[python] Python threading.timer- ‘n’초마다 함수 반복

0.5 초마다 기능을 실행하고 타이머를 시작 및 중지하고 재설정 할 수 있기를 원합니다. 저는 파이썬 스레드가 어떻게 작동하는지 잘 모르고 파이썬 타이머에 어려움을 겪고 있습니다.

그러나 두 번 RuntimeError: threads can only be started once실행하면 계속 threading.timer.start()됩니다. 이에 대한 해결 방법이 있습니까? threading.timer.cancel()매번 시작하기 전에 신청 해 보았습니다 .

의사 코드 :

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()



답변

가장 좋은 방법은 타이머 스레드를 한 번 시작하는 것입니다. 타이머 스레드 내에서 다음을 코딩합니다.

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function

타이머를 시작한 코드 set에서 중지 된 이벤트를 사용하여 타이머를 중지 할 수 있습니다 .

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()


답변

파이썬의 setInterval과 동등한 것에서 :

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

용법:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()

또는 다음 은 동일한 기능이지만 데코레이터 대신 독립형 함수입니다 .

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()

스레드를 사용하지 않고 수행하는 방법은 다음과 같습니다 .


답변

타이머 스레드 사용

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

이것은 멈출 수 있습니다 t.cancel()


답변

Hans Then의 답변 에 대해 조금 개선 하면 Timer 함수를 하위 클래스로 만들 수 있습니다. 다음은 전체 “반복 타이머”코드가되며 모든 동일한 인수를 사용하여 threading.Timer에 대한 드롭 인 대체로 사용할 수 있습니다.

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

사용 예 :

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

다음 출력을 생성합니다.

foo
foo
foo
foo

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

생산하다

bar
bar
bar
bar


답변

OP가 요청한 Timer를 사용하여 정답을 제공하기 위해 swapnil jariwala의 답변을 개선 하겠습니다 .

from threading import Timer


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()


답변

약간의 콘솔 시계를 만들기 위해 swapnil-jariwala 코드에서 일부 코드를 변경했습니다.

from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

산출

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

tkinter 그래픽 인터페이스가있는 타이머

이 코드는 tkinter를 사용하여 작은 창에 시계 타이머를 배치합니다.

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

플래시 카드 게임의 예 (일종)

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

산출:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...


답변

나는 프로젝트를 위해 이것을해야했다. 내가 한 일은 함수에 대해 별도의 스레드를 시작하는 것이 었습니다.

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

**** 하트 비트는 내 기능이고, 작업자는 내 주장 중 하나입니다 ****

내 심장 박동 기능 내부 :

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

따라서 스레드를 시작하면 함수는 반복적으로 5 초 동안 대기하고 모든 코드를 실행하고 무기한으로 수행합니다. 프로세스를 종료하려면 스레드를 종료하십시오.