멀티 프로세싱 모듈의 풀 클래스 와 비슷한 워커 스레드에 대한 풀 클래스가 있습니까?
예를 들어지도 기능을 병렬화하는 쉬운 방법이 마음에 듭니다.
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
그러나 나는 새로운 프로세스를 만드는 오버 헤드없이 그것을하고 싶습니다.
나는 길에 대해 알고있다. 그러나 유스 케이스에서 함수는 파이썬 랩퍼가 실제 함수 호출 전에 GIL을 해제하는 IO 바운드 C 함수입니다.
내 스레딩 풀을 작성해야합니까?
답변
난 그냥 사실이 있음을 발견 하다 에서 스레드 기반의 풀 인터페이스 multiprocessing
그러나 그것은 다소 숨겨져 제대로 문서화되지, 모듈.
통해 가져올 수 있습니다
from multiprocessing.pool import ThreadPool
파이썬 스레드를 래핑하는 더미 프로세스 클래스를 사용하여 구현됩니다. 이 스레드 기반 프로세스 클래스는 docsmultiprocessing.dummy
에서 간단히 언급 할 수 있습니다 . 이 더미 모듈은 아마도 스레드를 기반으로 한 전체 멀티 프로세싱 인터페이스를 제공합니다.
답변
Python 3에서는을 사용할 수 있습니다 concurrent.futures.ThreadPoolExecutor
.
executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)
자세한 내용 과 예제 는 문서 를 참조하십시오 .
답변
그렇습니다. 동일한 API를 가진 것으로 보입니다.
import multiprocessing
def worker(lnk):
....
def start_process():
.....
....
if(PROCESS):
pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
initializer=start_process)
pool.map(worker, inputs)
....
답변
매우 간단하고 가벼운 무언가 ( 여기서 약간 수정 됨 ) :
from Queue import Queue
from threading import Thread
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception, e:
print e
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
if __name__ == '__main__':
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(100)]
def wait_delay(d):
print 'sleeping for (%d)sec' % d
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
작업 완료시 콜백을 지원하려면 작업 튜플에 콜백을 추가하면됩니다.
답변
안녕하세요 파이썬에서 스레드 풀을 사용하려면이 라이브러리를 사용할 수 있습니다 :
from multiprocessing.dummy import Pool as ThreadPool
그리고이 라이브러리를 사용하려면 다음과 같이하십시오.
pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results
스레드는 원하는 스레드 수이며 작업은 대부분 서비스에 매핑되는 작업 목록입니다.
답변
마지막으로 사용한 결과는 다음과 같습니다. 위의 dgorissen에 의해 수정 된 클래스 버전입니다.
파일: threadpool.py
from queue import Queue, Empty
import threading
from threading import Thread
class Worker(Thread):
_TIMEOUT = 2
""" Thread executing tasks from a given tasks queue. Thread is signalable,
to exit
"""
def __init__(self, tasks, th_num):
Thread.__init__(self)
self.tasks = tasks
self.daemon, self.th_num = True, th_num
self.done = threading.Event()
self.start()
def run(self):
while not self.done.is_set():
try:
func, args, kwargs = self.tasks.get(block=True,
timeout=self._TIMEOUT)
try:
func(*args, **kwargs)
except Exception as e:
print(e)
finally:
self.tasks.task_done()
except Empty as e:
pass
return
def signal_exit(self):
""" Signal to thread to exit """
self.done.set()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads, tasks=[]):
self.tasks = Queue(num_threads)
self.workers = []
self.done = False
self._init_workers(num_threads)
for task in tasks:
self.tasks.put(task)
def _init_workers(self, num_threads):
for i in range(num_threads):
self.workers.append(Worker(self.tasks, i))
def add_task(self, func, *args, **kwargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kwargs))
def _close_all_threads(self):
""" Signal all threads to exit and lose the references to them """
for workr in self.workers:
workr.signal_exit()
self.workers = []
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def __del__(self):
self._close_all_threads()
def create_task(func, *args, **kwargs):
return (func, args, kwargs)
수영장을 이용하려면
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(30)]
def wait_delay(d):
print('sleeping for (%d)sec' % d)
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
답변
새 프로세스를 만드는 데 드는 오버 헤드는 최소화되며 특히 4 개에 불과한 경우에는 더욱 그렇습니다. 이것이 응용 프로그램의 성능 핫스팟인지 의심합니다. 간단하게 유지하고 필요한 위치와 프로파일 링 결과가 가리키는 위치를 최적화하십시오.