[python] 다중 처리의 공유 메모리 개체

메모리에 numpy 배열이 크고이 func거대한 배열을 입력으로 받는 함수 가 있다고 가정 합니다 (다른 매개 변수와 함께). func다른 매개 변수를 사용하여 병렬로 실행할 수 있습니다. 예를 들면 :

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

다중 처리 라이브러리를 사용하면 거대한 배열이 여러 번 다른 프로세스로 복사됩니다.

다른 프로세스가 동일한 어레이를 공유하도록하는 방법이 있습니까? 이 배열 개체는 읽기 전용이며 수정되지 않습니다.

arr이 배열이 아니라 임의의 파이썬 객체 인 경우 더 복잡한 것은 무엇입니까? 공유하는 방법이 있습니까?

[편집 됨]

나는 대답을 읽었지만 여전히 약간 혼란 스럽습니다. fork ()는 copy-on-write이므로 python 다중 처리 라이브러리에서 새 프로세스를 생성 할 때 추가 비용을 호출해서는 안됩니다. 그러나 다음 코드는 엄청난 오버 헤드가 있음을 나타냅니다.

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

출력 (그런데 어레이의 크기가 증가함에 따라 비용이 증가하므로 메모리 복사와 관련된 오버 헤드가 여전히 있다고 생각합니다) :

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

어레이를 복사하지 않았다면 왜 그렇게 엄청난 오버 헤드가 발생합니까? 그리고 공유 메모리는 어떤 부분을 저장합니까?



답변

쓰기시 복사 fork()의미 체계 (일반적인 유닉스와 같은) 를 사용하는 운영 체제를 사용하는 경우 데이터 구조를 변경하지 않는 한 추가 메모리를 차지하지 않고 모든 하위 프로세스에서 사용할 수 있습니다. 특별한 작업을 수행 할 필요가 없습니다 (객체를 변경하지 않도록 절대적으로 확인하는 것 제외).

가장 효율적인 것은 당신이 당신의 문제를 위해 할 수있는가 (이용하여 효율적인 배열 구조로 배열을 포장하는 것 numpy또는 array), 공유 메모리에, 그것을 포장 그 장소 multiprocessing.Array, 그리고 기능에 그것을 전달합니다. 이 대답은 그 방법을 보여줍니다 .

쓰기 가능한 공유 객체 를 원한다면 일종의 동기화 또는 잠금으로 래핑해야합니다. 이 작업을 수행하는 두 가지 방법을multiprocessing 제공합니다 . 하나는 공유 메모리 (단순 값, 배열 또는 ctypes에 적합) 또는 프록시 (한 프로세스가 메모리를 보유하고 관리자가 다른 프로세스 (네트워크를 통해))에서 액세스를 중재 하는 프록시입니다.Manager

Manager접근 방식은 임의의 Python 객체와 함께 사용할 수 있지만 객체를 직렬화 / 역 직렬화하고 프로세스간에 전송해야하기 때문에 공유 메모리를 사용하는 동등한 것보다 느립니다.

Python에서 사용할 수 있는 다양한 병렬 처리 라이브러리와 접근 방식이 있습니다 . multiprocessing훌륭하고 둥근 라이브러리이지만 특별한 요구 사항이있는 경우 다른 접근 방식 중 하나가 더 좋을 수 있습니다.


답변

나는 같은 문제를 겪고 그것을 해결하기 위해 약간의 공유 메모리 유틸리티 클래스를 작성했습니다.

나는 multiprocessing.RawArray(lockfree)를 사용 하고 있으며 배열에 대한 액세스가 전혀 동기화되지 않았습니다 (lockfree). 자신의 발을 쏘지 않도록 조심하십시오.

이 솔루션을 사용하면 쿼드 코어 i7에서 약 3 배의 속도 향상을 얻을 수 있습니다.

코드는 다음과 같습니다. 자유롭게 사용하고 개선하고 버그가 있으면 신고 해주세요.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))


답변

이것은 병렬 및 분산 Python 용 라이브러리 인 Ray 의 의도 된 사용 사례입니다 . 내부적으로 Apache Arrow 데이터 레이아웃 (무 복사 형식)을 사용하여 개체를 직렬화 하고 공유 메모리 개체 저장소에 저장 하므로 복사본을 만들지 않고도 여러 프로세스에서 액세스 할 수 있습니다.

코드는 다음과 같습니다.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

호출하지 않으면 ray.put배열은 여전히 ​​공유 메모리에 저장되지만을 호출 할 때마다 한 번씩 수행되며 func원하는 것이 아닙니다.

이것은 배열뿐만 아니라 배열을 포함하는 객체 ( 예 : 아래와 같이 int를 배열에 매핑하는 사전)에서도 작동합니다.

IPython에서 다음을 실행하여 Ray와 pickle의 직렬화 성능을 비교할 수 있습니다.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Ray를 사용한 직렬화는 피클보다 약간 빠르지 만 공유 메모리를 사용하기 때문에 역 직렬화가 1000 배 더 빠릅니다 (물론이 숫자는 객체에 따라 다름).

Ray 문서를 참조하십시오 . Ray 및 Arrow를 사용한 빠른 직렬화 에 대해 자세히 알아볼 수 있습니다 . 저는 Ray 개발자 중 한 명입니다.


답변

Robert Nishihara가 언급했듯이 Apache Arrow는 특히 Ray가 구축 된 Plasma 인 메모리 개체 저장소를 사용하여이를 쉽게 만듭니다.

저는 특별히 이런 이유로 뇌 플라즈마를 만들었습니다 . Flask 앱에서 큰 개체를 빠르게로드하고 다시로드하는 것입니다. pickle.NET에서 생성 된 ‘d 바이트 문자열을 포함하여 Apache Arrow 직렬화 가능 객체를위한 공유 메모리 객체 네임 스페이스입니다 pickle.dumps(...).

Apache Ray와 Plasma의 주요 차이점은 개체 ID를 추적한다는 것입니다. 로컬에서 실행되는 모든 프로세스, 스레드 또는 프로그램은 모든 Brain개체 에서 이름을 호출하여 변수 값을 공유 할 수 있습니다 .

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]


답변