파이썬의 멀티 프로세싱 패키지 에서 큐와 파이프의 근본적인 차이점은 무엇입니까 ?
어떤 시나리오에서 다른 시나리오를 선택해야합니까? 언제 사용하는 것이 유리 Pipe()
합니까? 언제 사용하는 것이 유리 Queue()
합니까?
답변
사용시기
의사 소통에 두 개 이상의 포인트가 필요한 경우을 사용하십시오 Queue()
.
절대적인 성능이 필요한 경우에 기반으로 Pipe()
하기 때문에 a 가 훨씬 빠릅니다 .Queue()
Pipe()
성능 벤치마킹
두 개의 프로세스를 생성하고 가능한 빨리 그들 사이에 메시지를 보내려고한다고 가정 해 봅시다. 이것 Pipe()
과 Queue()
…를 사용하는 비슷한 테스트 사이의 드래그 레이스의 타이밍 결과입니다 . 이것은 Ubuntu 11.10을 실행하는 ThinkpadT61과 Python 2.7.2에 있습니다.
참고로, 나는 JoinableQueue()
보너스로 결과를 던졌다 . 호출 JoinableQueue()
될 때 작업을 queue.task_done()
설명합니다 (특정 작업에 대해서는 알지 못하고 대기열에서 완료되지 않은 작업 만 계산 함). 따라서 queue.join()
작업이 완료 되었음을 알 수 있습니다.
이 답변의 맨 아래에있는 각 코드는 …
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
요약하면 Pipe()
a보다 약 3 배 빠릅니다 Queue()
. JoinableQueue()
당신이 정말로 이점을 가져야 만하지 않는 한 생각조차하지 마십시오 .
보너스 자료 2
멀티 프로세싱은 정보 흐름에 미묘한 변경 사항을 도입하여 일부 단축키를 모르는 경우 디버깅을 어렵게 만듭니다. 예를 들어, 여러 조건에서 사전을 통해 색인을 생성 할 때 제대로 작동하지만 특정 입력에서 실패하는 스크립트가있을 수 있습니다.
일반적으로 우리는 전체 파이썬 프로세스가 충돌 할 때 실패에 대한 단서를 얻습니다. 그러나 다중 처리 기능이 충돌하는 경우 원치 않는 충돌 추적이 콘솔에 인쇄되지 않습니다. 알려지지 않은 멀티 프로세스 크래시를 추적하는 것은 프로세스 크래시의 단서 없이는 어렵습니다.
멀티 프로세스 충돌 정보를 추적하는 가장 간단한 방법은 전체 멀티 프로세싱 기능을 try
/ 로 감싸서 except
사용하는 것입니다 traceback.print_exc()
.
import traceback
def run(self, args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
이제 충돌을 발견하면 다음과 같은 내용이 표시됩니다.
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
소스 코드:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
답변
Queue()
주목할만한 추가 기능 중 하나 는 피더 스레드입니다. 이 섹션은 “프로세스가 처음 큐에 항목을 넣을 때 피더 스레드가 시작되어 버퍼에서 파이프로 오브젝트를 전송합니다.” 차단에 Queue()
대한 호출없이 무한한 수의 (또는 최대 크기) 항목을 삽입 할 수 있습니다 queue.put()
. 이를 통해 Queue()
프로그램에서 처리 할 준비가 될 때까지 여러 항목을에 저장할 수 있습니다.
Pipe()
반면, 한 연결로 전송되었지만 다른 연결에서 수신되지 않은 항목에 대한 저장 용량은 유한합니다. 이 저장소를 모두 사용한 후에 connection.send()
는 전체 항목을 쓸 공간이있을 때까지 호출 이 차단됩니다. 다른 스레드가 파이프에서 읽을 때까지 쓰레드가 쓰기 작업을 중단합니다. Connection
객체는 기본 파일 디스크립터에 대한 액세스를 제공합니다. * nix 시스템에서이 기능을 connection.send()
사용하여 통화가 차단되는 것을 방지 할 수 있습니다 os.set_blocking()
. 그러나 파이프 파일에 맞지 않는 단일 항목을 보내려고하면 문제가 발생합니다. 최신 버전의 Linux에서는 파일 크기를 늘릴 수 있지만 허용되는 최대 크기는 시스템 구성에 따라 다릅니다. 따라서 Pipe()
데이터 버퍼링에 의존해서는 안됩니다 . 전화connection.send
파이프에서 데이터를 읽을 때까지 차단할 수 있습니다.
결론적으로, 큐는 데이터를 버퍼링해야 할 때 파이프보다 더 나은 선택입니다. 두 지점간에 만 통신해야하는 경우에도 마찬가지입니다.