[python] x 초마다 함수를 반복적으로 실행하는 가장 좋은 방법은 무엇입니까?

파이썬에서 60 초마다 반복적으로 함수를 반복하고 싶습니다 ( Objective C 의 NSTimer 와 마찬가지로 ). 이 코드는 데몬으로 실행되며 크론을 사용하여 매분마다 파이썬 스크립트를 호출하는 것과 비슷하지만 사용자가 설정하지 않아도됩니다.

에서 파이썬으로 구현 된 크론에 대한이 질문에 솔루션을 효과적으로 단지에 나타나는 수면 () x 초합니다. 나는 그러한 고급 기능이 필요하지 않으므로 아마도 이와 같은 것이 작동 할 것입니다.

while True:
    # Code executed here
    time.sleep(60)

이 코드에 예측 가능한 문제가 있습니까?



답변

프로그램에 이벤트 루프가없는 경우, 범용 이벤트 스케줄러를 구현하는 sched 모듈을 사용하십시오 .

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc):
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

이미 같은 이벤트 루프 라이브러리를 사용하는 경우 asyncio, trio, tkinter, PyQt5, gobject, kivy, 그리고 많은 다른 – 단지 대신, 기존의 이벤트 루프 라이브러리의 방법을 사용하여 작업을 예약.


답변

다음과 같이 시간 루프를 시스템 시계에 고정하십시오.

import time
starttime = time.time()
while True:
    print "tick"
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))


답변

Reactor Pattern 을 구현하는 Python 네트워킹 라이브러리 인 Twisted 를 고려할 수 있습니다 .

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

“while : sleep (60)”은 아마도 작동하지만 Twisted는 아마도 여러분이 궁극적으로 필요로하는 많은 기능 (bobince가 지적한 데몬 처리, 로깅 또는 예외 처리)을 이미 구현하고 있으며보다 강력한 솔루션 일 것입니다


답변

무한 루프를 차단하는 대신 비 차단 방식으로 주기적으로 함수를 실행하려면 스레드 타이머를 사용합니다. 이렇게하면 코드가 계속 실행되고 다른 작업을 수행 할 수 있으며 n 초마다 함수를 호출 할 수 있습니다. 이 기술은 긴 CPU / 디스크 / 네트워크 집약적 작업에 대한 진행 정보를 인쇄하는 데 많이 사용됩니다.

다음은 start () 및 stop () 컨트롤을 사용하여 비슷한 질문에 게시 한 코드입니다.

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

용법:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

풍모:

  • 표준 라이브러리 만, 외부 종속성 없음
  • start()그리고 stop()타이머가 이미 시작된 경우에도 여러 번 전화를 안전 / 정지
  • 호출 할 함수는 위치 및 명명 된 인수를 가질 수 있습니다.
  • interval언제든지 변경할 수 있으며 다음 실행 후 효과적입니다. 동일에 args, kwargs심지어 function!

답변

내가 믿는 더 쉬운 방법은 다음과 같습니다.

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

이렇게하면 코드가 실행 된 다음 60 초 동안 기다린 다음 다시 실행, 대기, 실행 등을 복잡하게 할 필요가 없습니다.


답변

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

나머지 코드를 차단하지 않고이 작업을 수행하려면이 코드를 사용하여 자체 스레드에서 실행할 수 있습니다.

import threading
threading.Thread(target=lambda: every(5, foo)).start()

이 솔루션은 다른 솔루션에서 거의 찾아 볼 수없는 몇 가지 기능을 결합합니다.

  • 예외 처리 : 이 수준에서 가능한 한 예외가 올바르게 처리됩니다. 즉, 프로그램을 중단하지 않고 디버깅 목적으로 기록됩니다.
  • 체인 없음 : 많은 응답에서 발견되는 일반적인 체인과 같은 구현은 스케줄링 메커니즘 ( threading.Timer또는 무엇이든)에 문제가 있으면 체인을 종료 한다는 측면에서 취하기가 쉽지 않습니다. 문제의 원인이 이미 수정 된 경우에도 더 이상 실행되지 않습니다. 간단한 루프와 간단한 대기 sleep()가 훨씬 강력합니다.
  • 표류 없음 : 내 솔루션은 실행 예정 시간을 정확하게 추적합니다. 실행 시간에 따라 다른 많은 솔루션 에서처럼 드리프트가 없습니다.
  • 건너 뛰기 : 한 번의 실행에 너무 많은 시간이 걸리면 솔루션이 작업을 건너 뜁니다 (예 : 5 초마다 X를 수행하지만 X는 6 초가 걸렸습니다). 이것은 표준 크론 동작입니다. 다른 많은 솔루션은 지연없이 작업을 여러 번 연속해서 실행합니다. 대부분의 경우 (예 : 정리 작업) 이것은 바람직하지 않습니다. 이 경우 되고 싶다고, 단순히 사용하는 next_time += delay대신.

답변

다음은 시간이 지남에 따라 드리핑을 피하는 MestreLion의 코드 업데이트입니다.

여기서 RepeatedTimer 클래스는 OP에서 요청한대로 “간격”초마다 지정된 함수를 호출합니다. 스케줄은 함수를 실행하는 데 걸리는 시간에 의존하지 않습니다. 외부 라이브러리 종속성이 없기 때문에이 솔루션을 좋아합니다. 이것은 순수한 파이썬입니다.

import threading
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

샘플 사용법 (MestreLion의 답변에서 복사) :

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!