다중 처리를 통해 생성 된 자식 프로세스 는 프로그램에서 이전에 생성 된 객체를 공유합니까?
다음 설정이 있습니다.
do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here
if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))
큰 개체를 메모리에로드 한 다음 해당 큰 개체를 사용해야하는 작업자 풀을 만듭니다. 큰 개체는 읽기 전용으로 액세스되며 프로세스간에 수정 사항을 전달할 필요가 없습니다.
내 질문은 : 내가 유닉스 / C에서 프로세스를 생성 한 경우와 같이 빅 객체가 공유 메모리에로드됩니까? 아니면 각 프로세스가 자체 빅 객체 사본을로드합니까?
업데이트 : 더 명확히하기 위해-big_lookup_object는 공유 조회 개체입니다. 나는 그것을 나누고 별도로 처리 할 필요가 없습니다. 사본 한 장을 보관해야합니다. 분할해야하는 작업은 많은 다른 대용량 파일을 읽고 조회 개체에 대해 해당 대용량 파일의 항목을 찾는 것입니다.
추가 업데이트 : 데이터베이스는 훌륭한 솔루션이고, memcached가 더 나은 솔루션 일 수 있으며, 디스크상의 파일 (shelve 또는 dbm)이 더 좋을 수 있습니다. 이 질문에서 저는 특히 메모리 솔루션에 관심이있었습니다. 최종 솔루션으로는 hadoop을 사용할 것이지만 로컬 메모리 버전도 사용할 수 있는지 확인하고 싶었습니다.
답변
“멀티 프로세싱을 통해 생성 된 자식 프로세스는 프로그램 초기에 생성 된 객체를 공유합니까?”
아니요 (3.8 이전의 Python), 3.8에서는 예 ( https://docs.python.org/3/library/multiprocessing.shared_memory.html#module-multiprocessing.shared_memory )
프로세스에는 독립적 인 메모리 공간이 있습니다.
해결책 1
작업자가 많은 대형 구조를 최대한 활용하려면 이렇게하십시오.
-
각 작업자를 “필터”로 작성 – stdin에서 중간 결과를 읽고, 작동하며, stdout에 중간 결과를 씁니다.
-
모든 작업자를 파이프 라인으로 연결합니다.
process1 <source | process2 | process3 | ... | processn >result
각 프로세스는 읽고, 작업하고, 씁니다.
이것은 모든 프로세스가 동시에 실행되기 때문에 매우 효율적입니다. 쓰기 및 읽기는 프로세스 간의 공유 버퍼를 통해 직접 전달됩니다.
해결 방법 2
경우에 따라 더 복잡한 구조 (종종 “팬 아웃”구조)가 있습니다. 이 경우 여러 자녀가있는 부모가 있습니다.
-
부모는 소스 데이터를 엽니 다. 부모는 여러 자녀를 포크합니다.
-
부모는 소스를 읽고 소스의 일부를 동시에 실행중인 자식으로 팜합니다.
-
부모가 끝에 도달하면 파이프를 닫습니다. 자식은 파일의 끝을 얻고 정상적으로 끝납니다.
각 어린이는 단순히 읽기 때문에 어린이 부분은 쓰기가 즐겁습니다 sys.stdin
.
부모는 모든 자식을 생성하고 파이프를 올바르게 유지하는 데 약간의 멋진 발놀림을 가지고 있지만 그렇게 나쁘지는 않습니다.
팬인은 반대 구조입니다. 독립적으로 실행되는 여러 프로세스는 입력을 공통 프로세스에 삽입해야합니다. 수집기는 많은 소스에서 읽어야하기 때문에 쓰기가 쉽지 않습니다.
많은 명명 된 파이프에서 읽기는 종종 select
어떤 파이프에 보류중인 입력이 있는지 확인하기 위해 모듈을 사용하여 수행됩니다 .
해결책 3
공유 조회는 데이터베이스의 정의입니다.
솔루션 3A – 데이터베이스를로드합니다. 작업자가 데이터베이스의 데이터를 처리하도록합니다.
솔루션 3B – werkzeug (또는 유사)를 사용하여 매우 간단한 서버를 생성 하여 작업자가 서버를 쿼리 할 수 있도록 HTTP GET에 응답하는 WSGI 애플리케이션을 제공합니다.
해결 방법 4
공유 파일 시스템 객체. Unix OS는 공유 메모리 개체를 제공합니다. 이들은 메모리에 매핑 된 파일 일 뿐이므로보다 규칙적인 버퍼 읽기 대신 I / O 스왑이 수행됩니다.
여러 가지 방법으로 Python 컨텍스트에서이를 수행 할 수 있습니다.
-
(1) 원래의 거대한 객체를 더 작은 객체로 나누고 (2) 각각 더 작은 객체로 작업자를 시작하는 시작 프로그램을 작성하십시오. 작은 객체는 파일 읽기 시간을 절약하기 위해 피클 링 된 Python 객체 일 수 있습니다.
-
(1) 원래의 거대한 객체를 읽고
seek
작업을 사용하여 페이지 구조의 바이트 코드 파일을 작성하는 시작 프로그램을 작성하여 간단한 탐색으로 개별 섹션을 쉽게 찾을 수 있도록합니다. 이것이 데이터베이스 엔진이하는 일입니다. 데이터를 페이지로 나누고seek
.이 큰 페이지 구조 파일에 액세스 할 수있는 작업자를 생성합니다. 각 작업자는 관련 부분을 찾고 거기에서 작업을 수행 할 수 있습니다.
답변
다중 처리를 통해 생성 된 자식 프로세스 는 프로그램에서 이전 에 생성 된 객체를 공유 합니까?
때에 따라 다르지. 전역 읽기 전용 변수의 경우 (소비 된 메모리와는 별도로) 그렇게 간주 될 수 있습니다.
multiprocessing 의 문서에 따르면 다음과 같습니다.
Better to inherit than pickle/unpickle
Windows에서 다중 처리의 많은 유형은 하위 프로세스에서 사용할 수 있도록 선택 가능해야합니다. 그러나 일반적으로 파이프 나 큐를 사용하여 공유 객체를 다른 프로세스로 보내지 않아야합니다. 대신 다른 곳에서 생성 된 공유 리소스에 액세스해야하는 프로세스가 조상 프로세스에서 상속 할 수 있도록 프로그램을 정렬해야합니다.
Explicitly pass resources to child processes
Unix에서 하위 프로세스는 전역 리소스를 사용하여 상위 프로세스에서 생성 된 공유 리소스를 사용할 수 있습니다. 그러나 개체를 자식 프로세스의 생성자에 인수로 전달하는 것이 좋습니다.
코드를 (잠재적으로) Windows와 호환되도록 만드는 것 외에도 자식 프로세스가 여전히 활성 상태 인 한 개체가 부모 프로세스에서 가비지 수집되지 않습니다. 이는 상위 프로세스에서 개체가 가비지 수집 될 때 일부 리소스가 해제되는 경우 중요 할 수 있습니다.
Global variables
자식 프로세스에서 실행되는 코드가 전역 변수에 액세스하려고하면 표시되는 값 (있는 경우)이 Process.start ()가 호출 될 때 부모 프로세스의 값과 동일하지 않을 수 있습니다. .
예
Windows (단일 CPU) :
#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool
x = 23000 # replace `23` due to small integers share representation
z = [] # integers are immutable, let's try mutable object
def printx(y):
global x
if y == 3:
x = -x
z.append(y)
print os.getpid(), x, id(x), z, id(z)
print y
if len(sys.argv) == 2 and sys.argv[1] == "sleep":
time.sleep(.1) # should make more apparant the effect
if __name__ == '__main__':
pool = Pool(processes=4)
pool.map(printx, (1,2,3,4))
와 함께 sleep
:
$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4
없이 sleep
:
$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4
답변
S.Lott 가 맞습니다. Python의 다중 처리 단축키는 효과적으로 별도의 중복 된 메모리 청크를 제공합니다.
대부분의 * nix 시스템에서 하위 수준 호출을 사용 os.fork()
하면 실제로 기록 중 복사 메모리를 제공 할 수 있습니다. 이론적으로 AFAIK는 가능한 가장 단순한 프로그램에서 복제하지 않고도 해당 데이터에서 읽을 수 있습니다.
그러나 파이썬 인터프리터에서는 그렇게 간단하지 않습니다. 개체 데이터와 메타 데이터는 동일한 메모리 세그먼트에 저장되므로 개체가 변경되지 않더라도 해당 개체에 대한 참조 카운터와 같은 항목이 증가하면 메모리 쓰기가 발생하므로 복사본이 발생합니다. “print ‘hello'”이상을 수행하는 거의 모든 Python 프로그램은 참조 횟수를 증가 시키므로 쓰기시 복사의 이점을 결코 깨닫지 못할 것입니다.
누군가가 파이썬에서 공유 메모리 솔루션을 해킹 했더라도 프로세스 전반에 걸쳐 가비지 수집을 조정하려는 시도는 아마도 상당히 고통 스러울 것입니다.
답변
유닉스에서 실행 하는 경우 포크가 작동하는 방식 으로 인해 동일한 객체를 공유 할 수 있습니다 (즉, 자식 프로세스는 별도의 메모리를 가지고 있지만 쓰기시 복사이므로 아무도 수정하지 않는 한 공유 할 수 있습니다). 다음을 시도했습니다.
import multiprocessing
x = 23
def printx(y):
print x, id(x)
print y
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(printx, (1,2,3,4))
다음 출력을 얻었습니다.
$ ./mtest.py 23 22995656 1 23 22995656 2 23 22995656 삼 23 22995656 4
물론 이것은 복사본이 만들어 지지 않았다는 것을 증명 하지는 않지만 ps
각 하위 프로세스가 사용하는 실제 메모리의 양을 확인하기 위해 의 출력을보고 상황에서 확인할 수 있어야합니다 .
답변
프로세스마다 주소 공간이 다릅니다. 인터프리터의 다른 인스턴스를 실행하는 것과 같습니다. 이것이 바로 IPC (프로세스 간 통신)의 목적입니다.
이를 위해 큐 또는 파이프를 사용할 수 있습니다. 나중에 네트워크를 통해 프로세스를 배포하려는 경우 rpc over tcp를 사용할 수도 있습니다.
http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes
답변
다중 처리 자체와 직접 관련이 없지만 귀하의 예에서는 shelve 모듈 또는 이와 유사한 것을 사용할 수있는 것처럼 보입니다 . “big_lookup_object”가 실제로 완전히 메모리에 있어야합니까?
답변
아니요,하지만 데이터를 하위 프로세스로로드하고 데이터를 다른 하위 프로세스와 공유하도록 허용 할 수 있습니다. 아래를 참조하십시오.
import time
import multiprocessing
def load_data( queue_load, n_processes )
... load data here into some_variable
"""
Store multiple copies of the data into
the data queue. There needs to be enough
copies available for each process to access.
"""
for i in range(n_processes):
queue_load.put(some_variable)
def work_with_data( queue_data, queue_load ):
# Wait for load_data() to complete
while queue_load.empty():
time.sleep(1)
some_variable = queue_load.get()
"""
! Tuples can also be used here
if you have multiple data files
you wish to keep seperate.
a,b = queue_load.get()
"""
... do some stuff, resulting in new_data
# store it in the queue
queue_data.put(new_data)
def start_multiprocess():
n_processes = 5
processes = []
stored_data = []
# Create two Queues
queue_load = multiprocessing.Queue()
queue_data = multiprocessing.Queue()
for i in range(n_processes):
if i == 0:
# Your big data file will be loaded here...
p = multiprocessing.Process(target = load_data,
args=(queue_load, n_processes))
processes.append(p)
p.start()
# ... and then it will be used here with each process
p = multiprocessing.Process(target = work_with_data,
args=(queue_data, queue_load))
processes.append(p)
p.start()
for i in range(n_processes)
new_data = queue_data.get()
stored_data.append(new_data)
for p in processes:
p.join()
print(processes)