multiprocessing
모듈에 대한 문서는로 시작하는 프로세스에 큐를 전달하는 방법을 보여줍니다 multiprocessing.Process
. 하지만 시작된 비동기 작업자 프로세스와 큐를 공유하려면 어떻게해야 apply_async
합니까? 동적 조인이나 다른 것이 필요하지 않습니다. 작업자가 결과를 다시 기지에 (반복적으로)보고하는 방법 일뿐입니다.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
이것은 실패합니다 :
RuntimeError: Queue objects should only be shared between processes through inheritance
. 이것이 의미하는 바를 이해하고 산세 / 산세 제거 (및 모든 특수 Windows 제한 사항)를 요구하는 대신 상속하라는 조언을 이해합니다. 하지만 어떻게 합니까 내가 작업하는 방식으로 큐를 통과? 예를 찾을 수없고 여러 가지 방법으로 실패한 여러 대안을 시도했습니다. 도와주세요?
답변
사용해보십시오 multiprocessing.Manager를 대기열을 관리하고 또한 다른 근로자가 액세스 할 수 있도록 만들 수 있습니다.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
workers = pool.apply_async(worker, (33, q))
답변
multiprocessing.Pool
이미 공유 된 결과 큐가있는 경우 추가로 Manager.Queue
. 별도의 서버 프로세스에 있으며 프록시를 통해 노출되는 후드 아래에 Manager.Queue
있는 queue.Queue
(다중 스레드 대기열)입니다. 이는 풀의 내부 대기열에 비해 추가 오버 헤드를 추가합니다. 풀의 기본 결과 처리에 의존하는 것과 달리의 결과 Manager.Queue
도 순서가 보장되지 않습니다.
작업자 프로세스는로 시작 되지 않습니다 . 이는 .apply_async()
인스턴스화 할 때 이미 발생합니다 Pool
. 무엇 됩니다 당신이 호출 할 때 시작하는 pool.apply_async()
새로운 “작업”입니다. 풀의 작업자 프로세스 multiprocessing.pool.worker
는 내부적으로 기능을 실행합니다 . 이 함수는 풀의 내부를 통해 전송 된 새로운 “작업”을 처리 Pool._inqueue
하고 Pool._outqueue
. 지정한 항목 func
은 내에서 실행됩니다 multiprocessing.pool.worker
. func
에만이 return
뭔가 결과가 자동으로 부모에게 전송됩니다.
.apply_async()
즉시 (비동기 적으로) AsyncResult
객체 (의 별칭 )를 반환합니다 ApplyResult
. .get()
실제 결과를 받으려면 해당 개체 에서 호출 (차단)해야합니다. 또 다른 옵션은 결과가 준비되는 즉시 시작되는 콜백 함수 를 등록하는 것 입니다.
from multiprocessing import Pool
def busy_foo(i):
"""Dummy function simulating cpu-bound work."""
for _ in range(int(10e6)): # do stuff
pass
return i
if __name__ == '__main__':
with Pool(4) as pool:
print(pool._outqueue) # DEMO
results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
# `.apply_async()` immediately returns AsyncResult (ApplyResult) object
print(results[0]) # DEMO
results = [res.get() for res in results]
print(f'result: {results}')
출력 예 :
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
참고 :에- timeout
매개 변수를 지정해도 .get()
작업자 내에서 작업의 실제 처리가 중지되는 것은 아니며 multiprocessing.TimeoutError
.