파이썬에서 60 초마다 반복적으로 함수를 반복하고 싶습니다 ( Objective C 의 NSTimer 와 마찬가지로 ). 이 코드는 데몬으로 실행되며 크론을 사용하여 매분마다 파이썬 스크립트를 호출하는 것과 비슷하지만 사용자가 설정하지 않아도됩니다.
에서 파이썬으로 구현 된 크론에 대한이 질문에 솔루션을 효과적으로 단지에 나타나는 수면 () x 초합니다. 나는 그러한 고급 기능이 필요하지 않으므로 아마도 이와 같은 것이 작동 할 것입니다.
while True:
# Code executed here
time.sleep(60)
이 코드에 예측 가능한 문제가 있습니까?
답변
프로그램에 이벤트 루프가없는 경우, 범용 이벤트 스케줄러를 구현하는 sched 모듈을 사용하십시오 .
import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc):
print("Doing stuff...")
# do your stuff
s.enter(60, 1, do_something, (sc,))
s.enter(60, 1, do_something, (s,))
s.run()
이미 같은 이벤트 루프 라이브러리를 사용하는 경우 asyncio
, trio
, tkinter
, PyQt5
, gobject
, kivy
, 그리고 많은 다른 – 단지 대신, 기존의 이벤트 루프 라이브러리의 방법을 사용하여 작업을 예약.
답변
다음과 같이 시간 루프를 시스템 시계에 고정하십시오.
import time
starttime = time.time()
while True:
print "tick"
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
답변
Reactor Pattern 을 구현하는 Python 네트워킹 라이브러리 인 Twisted 를 고려할 수 있습니다 .
from twisted.internet import task, reactor
timeout = 60.0 # Sixty seconds
def doWork():
#do work here
pass
l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds
reactor.run()
“while : sleep (60)”은 아마도 작동하지만 Twisted는 아마도 여러분이 궁극적으로 필요로하는 많은 기능 (bobince가 지적한 데몬 처리, 로깅 또는 예외 처리)을 이미 구현하고 있으며보다 강력한 솔루션 일 것입니다
답변
무한 루프를 차단하는 대신 비 차단 방식으로 주기적으로 함수를 실행하려면 스레드 타이머를 사용합니다. 이렇게하면 코드가 계속 실행되고 다른 작업을 수행 할 수 있으며 n 초마다 함수를 호출 할 수 있습니다. 이 기술은 긴 CPU / 디스크 / 네트워크 집약적 작업에 대한 진행 정보를 인쇄하는 데 많이 사용됩니다.
다음은 start () 및 stop () 컨트롤을 사용하여 비슷한 질문에 게시 한 코드입니다.
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
용법:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
풍모:
- 표준 라이브러리 만, 외부 종속성 없음
start()
그리고stop()
타이머가 이미 시작된 경우에도 여러 번 전화를 안전 / 정지- 호출 할 함수는 위치 및 명명 된 인수를 가질 수 있습니다.
interval
언제든지 변경할 수 있으며 다음 실행 후 효과적입니다. 동일에args
,kwargs
심지어function
!
답변
내가 믿는 더 쉬운 방법은 다음과 같습니다.
import time
def executeSomething():
#code here
time.sleep(60)
while True:
executeSomething()
이렇게하면 코드가 실행 된 다음 60 초 동안 기다린 다음 다시 실행, 대기, 실행 등을 복잡하게 할 필요가 없습니다.
답변
import time, traceback
def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# in production code you might want to have this instead of course:
# logger.exception("Problem while executing repetitive task.")
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay
def foo():
print("foo", time.time())
every(5, foo)
나머지 코드를 차단하지 않고이 작업을 수행하려면이 코드를 사용하여 자체 스레드에서 실행할 수 있습니다.
import threading
threading.Thread(target=lambda: every(5, foo)).start()
이 솔루션은 다른 솔루션에서 거의 찾아 볼 수없는 몇 가지 기능을 결합합니다.
- 예외 처리 : 이 수준에서 가능한 한 예외가 올바르게 처리됩니다. 즉, 프로그램을 중단하지 않고 디버깅 목적으로 기록됩니다.
- 체인 없음 : 많은 응답에서 발견되는 일반적인 체인과 같은 구현은 스케줄링 메커니즘 (
threading.Timer
또는 무엇이든)에 문제가 있으면 체인을 종료 한다는 측면에서 취하기가 쉽지 않습니다. 문제의 원인이 이미 수정 된 경우에도 더 이상 실행되지 않습니다. 간단한 루프와 간단한 대기sleep()
가 훨씬 강력합니다. - 표류 없음 : 내 솔루션은 실행 예정 시간을 정확하게 추적합니다. 실행 시간에 따라 다른 많은 솔루션 에서처럼 드리프트가 없습니다.
- 건너 뛰기 : 한 번의 실행에 너무 많은 시간이 걸리면 솔루션이 작업을 건너 뜁니다 (예 : 5 초마다 X를 수행하지만 X는 6 초가 걸렸습니다). 이것은 표준 크론 동작입니다. 다른 많은 솔루션은 지연없이 작업을 여러 번 연속해서 실행합니다. 대부분의 경우 (예 : 정리 작업) 이것은 바람직하지 않습니다. 이 경우 되고 싶다고, 단순히 사용하는
next_time += delay
대신.
답변
다음은 시간이 지남에 따라 드리핑을 피하는 MestreLion의 코드 업데이트입니다.
여기서 RepeatedTimer 클래스는 OP에서 요청한대로 “간격”초마다 지정된 함수를 호출합니다. 스케줄은 함수를 실행하는 데 걸리는 시간에 의존하지 않습니다. 외부 라이브러리 종속성이 없기 때문에이 솔루션을 좋아합니다. 이것은 순수한 파이썬입니다.
import threading
import time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
샘플 사용법 (MestreLion의 답변에서 복사) :
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!