[python] 멀티 프로세싱 : 클래스에 정의 된 함수에서 Pool.map을 사용하는 방법?

내가 다음과 같은 것을 실행할 때 :

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

잘 작동합니다. 그러나 이것을 클래스의 함수로 두는 것 :

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

다음과 같은 오류가 발생합니다.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Alex Martelli에서 같은 종류의 문제를 다루는 게시물을 보았지만 충분히 명확하지 않았습니다.



답변

또한 pool.map이 받아 들일 수있는 함수의 종류에 대한 제한에 짜증이났습니다. 나는 이것을 피하기 위해 다음을 썼다. Parmap을 재귀 적으로 사용하더라도 작동하는 것처럼 보입니다.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))


답변

“multiprocessing.Pool”을 사용하는 코드는 람다 식에서 작동하지 않고 “multiprocessing.Pool”을 사용하지 않는 코드는 작업 항목만큼 많은 프로세스를 생성하기 때문에 지금까지 게시 된 코드를 사용할 수 없습니다.

미리 정의 된 양의 작업자를 생성하고 유휴 작업자가있는 경우 입력 목록을 반복합니다. 또한 작업자 st ctrl-c 작업에 대해 “데몬”모드를 활성화했습니다.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))


답변

표준 라이브러리 외부로 이동하지 않으면 다중 처리 및 산세가 중단되고 제한됩니다.

당신의 포크를 사용하는 경우 multiprocessing전화를 pathos.multiprocesssing직접의 멀티 프로세싱에 클래스와 클래스 메소드를 사용할 수있는 map기능. 이 때문입니다 dill대신 사용 pickle하거나 cPickle, 및 dill파이썬에서 거의 모든 것을 직렬화 할 수 있습니다.

pathos.multiprocessing또한 비동기지도 기능을 제공합니다 … 그리고 수 map(예 : 여러 인수 기능 map(math.pow, [1,2,3], [4,5,6]))

토론 참조 :
멀티 프로세싱과 딜은 무엇을 함께 할 수 있습니까?

및 :
http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

심지어 수정하지 않고 해석기에서 처음 작성한 코드도 처리합니다. 더 취약하고 단일 사례에 특정한 다른 이유는 무엇입니까?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

https://github.com/uqfoundation/pathos 에서 코드를 얻으십시오.

그리고 그것이 할 수있는 일을 조금 더 보여주기 위해 :

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
...   return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]


답변

내가 아는 한, 현재 문제에 대한 해결책은 없습니다 : 당신이주는 기능은 map()모듈 가져 오기를 통해 액세스 할 수 있어야합니다. 이것이 robert의 코드가 작동하는 이유입니다 f(). 다음 코드를 가져 와서 함수 를 얻을 수 있습니다.

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

필자는 실제로 “main”섹션을 추가했는데, 이는 Windows 플랫폼에 대한 권장 사항을 따르기 때문 입니다 ( “예기치 않은 부작용을 일으키지 않고 새로운 Python 인터프리터가 메인 모듈을 안전하게 가져올 수 있는지 확인하십시오”).

또한 PEP 8Calculate 을 따르기 위해 대문자 앞에 대문자를 추가했습니다 . 🙂


답변

mrule의 해결책은 정확하지만 버그가 있습니다. 자식이 많은 양의 데이터를 다시 보내면 파이프 버퍼를 채우고 자식을 막을 수 pipe.send()있으며 부모는 자식이 종료되기를 기다리는 중입니다 pipe.join(). 해결책은 아이를 join()먹기 전에 아이의 데이터를 읽는 것입니다. 또한 어린이는 교착 상태를 방지하기 위해 부모의 파이프 끝을 닫아야합니다. 아래 코드는 그 문제를 해결합니다. 또한의 parmap요소 당 하나의 프로세스를 만듭니다 X. 보다 고급 솔루션은 여러 청크 multiprocessing.cpu_count()로 나누고 X결과를 병합하기 전에 사용하는 것입니다. 나는 mrule에 의한 좋은 대답의 간결성을 망치지 않기 위해 독자에게 연습으로 남겨 둡니다. 😉

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))


답변

나는 또한 이것으로 고투했다. 간단한 예제로 클래스의 데이터 멤버로서의 기능을 가졌습니다.

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add):
        self.f = my_add
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

동일한 클래스 내에서 Pool.map () 호출에서 self.f 함수를 사용해야했고 self.f는 튜플을 인수로 사용하지 않았습니다. 이 함수는 클래스에 포함되었으므로 다른 답변이 제안한 래퍼 유형을 작성하는 방법이 명확하지 않았습니다.

첫 번째 요소가 함수이고 나머지 요소가 eval_func_tuple (f_args)라는 해당 함수의 인수 인 튜플 /리스트를 사용하는 다른 래퍼를 사용하여이 문제를 해결했습니다. 이를 사용하여 문제가있는 줄을 return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2))로 바꿀 수 있습니다. 전체 코드는 다음과 같습니다.

파일 : util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

파일 : main.py

from multiprocessing import Pool
import itertools
import util

pool = Pool()
class Example(object):
    def __init__(self, my_add):
        self.f = my_add
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple,
            itertools.izip(itertools.repeat(self.f), list1, list2))

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

main.py를 실행하면 [11, 22, 33]이 나타납니다. 예를 들어 eval_func_tuple을 수정하여 키워드 인수를 사용할 수도 있습니다.

또 다른 참고로, 다른 답변에서, “parmap”기능은 사용 가능한 CPU 수보다 많은 프로세스의 경우 더 효율적으로 만들 수 있습니다. 아래에서 수정 된 버전을 복사하고 있습니다. 이것은 첫 번째 게시물이며 원래 답변을 직접 수정해야하는지 잘 모르겠습니다. 또한 일부 변수의 이름을 바꿨습니다.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe,x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
    numProcesses = len(processes)
    processNum = 0
    outputList = []
    while processNum < numProcesses:
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)
        for proc in processes[processNum:endProcessNum]:
            proc.start()
        for proc in processes[processNum:endProcessNum]:
            proc.join()
        for proc,c in pipe[processNum:endProcessNum]:
            outputList.append(proc.recv())
        processNum = endProcessNum
    return outputList

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))         


답변

나는 klaus se와 aganders3의 대답을 가져 와서 더 읽기 쉽고 하나의 파일로 보유하는 문서화 된 모듈을 만들었습니다. 프로젝트에 추가하면됩니다. 옵션 진행 바가 있습니다!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing:
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

편집 : @ alexander-mcfarlane 제안 및 테스트 기능 추가