메모리에 numpy 배열이 크고이 func
거대한 배열을 입력으로 받는 함수 가 있다고 가정 합니다 (다른 매개 변수와 함께). func
다른 매개 변수를 사용하여 병렬로 실행할 수 있습니다. 예를 들면 :
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
다중 처리 라이브러리를 사용하면 거대한 배열이 여러 번 다른 프로세스로 복사됩니다.
다른 프로세스가 동일한 어레이를 공유하도록하는 방법이 있습니까? 이 배열 개체는 읽기 전용이며 수정되지 않습니다.
arr이 배열이 아니라 임의의 파이썬 객체 인 경우 더 복잡한 것은 무엇입니까? 공유하는 방법이 있습니까?
[편집 됨]
나는 대답을 읽었지만 여전히 약간 혼란 스럽습니다. fork ()는 copy-on-write이므로 python 다중 처리 라이브러리에서 새 프로세스를 생성 할 때 추가 비용을 호출해서는 안됩니다. 그러나 다음 코드는 엄청난 오버 헤드가 있음을 나타냅니다.
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;
출력 (그런데 어레이의 크기가 증가함에 따라 비용이 증가하므로 메모리 복사와 관련된 오버 헤드가 여전히 있다고 생각합니다) :
construct array = 0.0178790092468
multiprocessing overhead = 0.252444982529
어레이를 복사하지 않았다면 왜 그렇게 엄청난 오버 헤드가 발생합니까? 그리고 공유 메모리는 어떤 부분을 저장합니까?
답변
쓰기시 복사 fork()
의미 체계 (일반적인 유닉스와 같은) 를 사용하는 운영 체제를 사용하는 경우 데이터 구조를 변경하지 않는 한 추가 메모리를 차지하지 않고 모든 하위 프로세스에서 사용할 수 있습니다. 특별한 작업을 수행 할 필요가 없습니다 (객체를 변경하지 않도록 절대적으로 확인하는 것 제외).
가장 효율적인 것은 당신이 당신의 문제를 위해 할 수있는가 (이용하여 효율적인 배열 구조로 배열을 포장하는 것 numpy
또는 array
), 공유 메모리에, 그것을 포장 그 장소 multiprocessing.Array
, 그리고 기능에 그것을 전달합니다. 이 대답은 그 방법을 보여줍니다 .
쓰기 가능한 공유 객체 를 원한다면 일종의 동기화 또는 잠금으로 래핑해야합니다. 이 작업을 수행하는 두 가지 방법을multiprocessing
제공합니다 . 하나는 공유 메모리 (단순 값, 배열 또는 ctypes에 적합) 또는 프록시 (한 프로세스가 메모리를 보유하고 관리자가 다른 프로세스 (네트워크를 통해))에서 액세스를 중재 하는 프록시입니다.Manager
이 Manager
접근 방식은 임의의 Python 객체와 함께 사용할 수 있지만 객체를 직렬화 / 역 직렬화하고 프로세스간에 전송해야하기 때문에 공유 메모리를 사용하는 동등한 것보다 느립니다.
Python에서 사용할 수 있는 다양한 병렬 처리 라이브러리와 접근 방식이 있습니다 . multiprocessing
훌륭하고 둥근 라이브러리이지만 특별한 요구 사항이있는 경우 다른 접근 방식 중 하나가 더 좋을 수 있습니다.
답변
나는 같은 문제를 겪고 그것을 해결하기 위해 약간의 공유 메모리 유틸리티 클래스를 작성했습니다.
나는 multiprocessing.RawArray
(lockfree)를 사용 하고 있으며 배열에 대한 액세스가 전혀 동기화되지 않았습니다 (lockfree). 자신의 발을 쏘지 않도록 조심하십시오.
이 솔루션을 사용하면 쿼드 코어 i7에서 약 3 배의 속도 향상을 얻을 수 있습니다.
코드는 다음과 같습니다. 자유롭게 사용하고 개선하고 버그가 있으면 신고 해주세요.
'''
Created on 14.05.2013
@author: martin
'''
import multiprocessing
import ctypes
import numpy as np
class SharedNumpyMemManagerError(Exception):
pass
'''
Singleton Pattern
'''
class SharedNumpyMemManager:
_initSize = 1024
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(SharedNumpyMemManager, cls).__new__(
cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.lock = multiprocessing.Lock()
self.cur = 0
self.cnt = 0
self.shared_arrays = [None] * SharedNumpyMemManager._initSize
def __createArray(self, dimensions, ctype=ctypes.c_double):
self.lock.acquire()
# double size if necessary
if (self.cnt >= len(self.shared_arrays)):
self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)
# next handle
self.__getNextFreeHdl()
# create array in shared memory segment
shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))
# convert to numpy array vie ctypeslib
self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)
# do a reshape for correct dimensions
# Returns a masked array containing the same data, but with a new shape.
# The result is a view on the original array
self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)
# update cnt
self.cnt += 1
self.lock.release()
# return handle to the shared memory numpy array
return self.cur
def __getNextFreeHdl(self):
orgCur = self.cur
while self.shared_arrays[self.cur] is not None:
self.cur = (self.cur + 1) % len(self.shared_arrays)
if orgCur == self.cur:
raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')
def __freeArray(self, hdl):
self.lock.acquire()
# set reference to None
if self.shared_arrays[hdl] is not None: # consider multiple calls to free
self.shared_arrays[hdl] = None
self.cnt -= 1
self.lock.release()
def __getArray(self, i):
return self.shared_arrays[i]
@staticmethod
def getInstance():
if not SharedNumpyMemManager._instance:
SharedNumpyMemManager._instance = SharedNumpyMemManager()
return SharedNumpyMemManager._instance
@staticmethod
def createArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)
@staticmethod
def getArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)
@staticmethod
def freeArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)
# Init Singleton on module load
SharedNumpyMemManager.getInstance()
if __name__ == '__main__':
import timeit
N_PROC = 8
INNER_LOOP = 10000
N = 1000
def propagate(t):
i, shm_hdl, evidence = t
a = SharedNumpyMemManager.getArray(shm_hdl)
for j in range(INNER_LOOP):
a[i] = i
class Parallel_Dummy_PF:
def __init__(self, N):
self.N = N
self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
self.pool = multiprocessing.Pool(processes=N_PROC)
def update_par(self, evidence):
self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))
def update_seq(self, evidence):
for i in range(self.N):
propagate((i, self.arrayHdl, evidence))
def getArray(self):
return SharedNumpyMemManager.getArray(self.arrayHdl)
def parallelExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_par(5)
print(pf.getArray())
def sequentialExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_seq(5)
print(pf.getArray())
t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")
print("Sequential: ", t1.timeit(number=1))
print("Parallel: ", t2.timeit(number=1))
답변
이것은 병렬 및 분산 Python 용 라이브러리 인 Ray 의 의도 된 사용 사례입니다 . 내부적으로 Apache Arrow 데이터 레이아웃 (무 복사 형식)을 사용하여 개체를 직렬화 하고 공유 메모리 개체 저장소에 저장 하므로 복사본을 만들지 않고도 여러 프로세스에서 액세스 할 수 있습니다.
코드는 다음과 같습니다.
import numpy as np
import ray
ray.init()
@ray.remote
def func(array, param):
# Do stuff.
return 1
array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)
result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)
호출하지 않으면 ray.put
배열은 여전히 공유 메모리에 저장되지만을 호출 할 때마다 한 번씩 수행되며 func
원하는 것이 아닙니다.
이것은 배열뿐만 아니라 배열을 포함하는 객체 ( 예 : 아래와 같이 int를 배열에 매핑하는 사전)에서도 작동합니다.
IPython에서 다음을 실행하여 Ray와 pickle의 직렬화 성능을 비교할 수 있습니다.
import numpy as np
import pickle
import ray
ray.init()
x = {i: np.ones(10**7) for i in range(20)}
# Time Ray.
%time x_id = ray.put(x) # 2.4s
%time new_x = ray.get(x_id) # 0.00073s
# Time pickle.
%time serialized = pickle.dumps(x) # 2.6s
%time deserialized = pickle.loads(serialized) # 1.9s
Ray를 사용한 직렬화는 피클보다 약간 빠르지 만 공유 메모리를 사용하기 때문에 역 직렬화가 1000 배 더 빠릅니다 (물론이 숫자는 객체에 따라 다름).
Ray 문서를 참조하십시오 . Ray 및 Arrow를 사용한 빠른 직렬화 에 대해 자세히 알아볼 수 있습니다 . 저는 Ray 개발자 중 한 명입니다.
답변
Robert Nishihara가 언급했듯이 Apache Arrow는 특히 Ray가 구축 된 Plasma 인 메모리 개체 저장소를 사용하여이를 쉽게 만듭니다.
저는 특별히 이런 이유로 뇌 플라즈마를 만들었습니다 . Flask 앱에서 큰 개체를 빠르게로드하고 다시로드하는 것입니다. pickle
.NET에서 생성 된 ‘d 바이트 문자열을 포함하여 Apache Arrow 직렬화 가능 객체를위한 공유 메모리 객체 네임 스페이스입니다 pickle.dumps(...)
.
Apache Ray와 Plasma의 주요 차이점은 개체 ID를 추적한다는 것입니다. 로컬에서 실행되는 모든 프로세스, 스레드 또는 프로그램은 모든 Brain
개체 에서 이름을 호출하여 변수 값을 공유 할 수 있습니다 .
$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma
from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)
brain['a'] = [1]*10000
brain['a']
# >>> [1,1,1,1,...]