[python] 다중 처리 Pool.map ()을 사용할 때 <type ‘instancemethod’>을 (를) 피클 할 수 없습니다

작업을 동시에 나누기 위해 multiprocessingPool.map()기능 을 사용하려고합니다 . 다음 코드를 사용하면 정상적으로 작동합니다.

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

그러나 더 객체 지향적 인 접근 방식으로 사용하면 작동하지 않습니다. 오류 메시지는 다음과 같습니다.

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

다음이 내 주요 프로그램 인 경우에 발생합니다.

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

다음은 내 someClass수업입니다.

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)
        print pool.map(self.f, range(10))

누구나 문제가 무엇인지, 쉽게 해결할 수 있는지 알고 있습니까?



답변

문제는 멀티 프로세싱이 프로세스간에 슬링하기 위해 피클을 처리해야하며 바인딩 된 메서드는 피클 링 할 수 없다는 것입니다. 해결 방법 ( “쉬운”여부와 상관없이)은 프로그램에 인프라를 추가하여 그러한 메소드를 피클 할 수있게하고 copy_reg 표준 라이브러리 메소드에 등록하는 것입니다 .

예를 들어, 스레드 의 끝까지이 스레드에 대한 Steven Bethard의 기여는를 통해 메소드 산 세척 / 언 피클 링을 수행 할 수있는 완벽하게 실행 가능한 방법 중 하나를 보여줍니다 copy_reg.


답변

표준 라이브러리를 벗어나지 않으면 멀티 프로세싱 및 피클 링이 중단되고 제한되기 때문에 이러한 솔루션은 모두보기 흉한 상태입니다.

당신의 포크를 사용하는 경우 multiprocessing전화를 pathos.multiprocesssing직접의 멀티 프로세싱에 클래스와 클래스 메소드를 사용할 수있는 map기능. 이 때문입니다 dill대신 사용 pickle하거나 cPickle, 및 dill파이썬에서 거의 모든 것을 직렬화 할 수 있습니다.

pathos.multiprocessing또한 비동기지도 기능을 제공합니다 … 그리고 수 map(예 : 여러 인수 기능 map(math.pow, [1,2,3], [4,5,6]))

참조 :
어떤 멀티 프로세싱 할 수 있고 함께 할 딜?

및 :
http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
...   return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

그리고 명백하게, 당신은 당신이 처음에하고 싶었던 것을 정확하게 원할 수 있고, 원한다면 통역사로부터 그것을 할 수 있습니다.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

https://github.com/uqfoundation/pathos 에서 코드를 얻으십시오.


답변

__call__()내부 에 메소드를 정의 할 수도 있습니다.이 메소드 someClass()를 호출 someClass.go()한 다음 인스턴스를 someClass()풀에 전달합니다 . 이 객체는 피클 가능하며 잘 작동합니다 …


답변

Steven Bethard의 솔루션에는 몇 가지 제한 사항이 있습니다.

클래스 메소드를 함수로 등록하면 메소드 처리가 완료 될 때마다 클래스의 소멸자가 놀랍게도 호출됩니다. 따라서 메소드의 n 배를 호출하는 클래스의 인스턴스가 1 회있는 경우 멤버는 2 번 실행 사이에 사라지고 메시지 malloc: *** error for object 0x...: pointer being freed was not allocated(예 : 열린 멤버 파일) 또는 pure virtual method called,
terminate called without an active exception
(사용 한 멤버 객체의 수명 이보다 짧았습니다. 내가 생각한 것). 수영장 크기보다 큰 n을 처리 할 때 이것을 얻었습니다. 다음은 간단한 예입니다.

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

산출:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

__call__[없음 …] 결과로부터 판독되지 않으므로 방법은 매우 동등하지 않다 :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

따라서 두 방법 중 어느 것도 만족스럽지 않습니다 …


답변

클래스 인스턴스의 내용에 따라 비효율적 일 수 있지만 사용할 수있는 또 다른 바로 가기가 있습니다.

모든 사람들이 말했듯이, multiprocessing코드는 코드가 시작한 서브 프로세스로 전송하는 것을 피클해야하고 피커는 인스턴스 메소드를 수행하지 않는다는 것입니다.

그러나 인스턴스 메소드를 전송하는 대신 실제 클래스 인스턴스와 호출 할 함수의 이름을 getattr인스턴스 메소드를 호출하는 데 사용되는 일반 함수 로 전송하여 Pool서브 프로세스 에서 바운드 메소드를 작성할 수 있습니다. 이것은 __call__둘 이상의 멤버 함수를 호출 할 수 있다는 점을 제외하고 메소드 정의와 유사 합니다.

@EricH.의 코드를 그의 답변에서 훔쳐서 약간 주석을 달았습니다 (모든 이름이 변경되어 어떤 이유로 든 이름이 바뀌어 붙여 넣기가 더 쉬워 보입니다 :-)).

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

결과는 실제로 생성자가 한 번 (원래 pid에서) 호출되고 소멸자가 9 번 (필요에 따라 각 사본마다 한 번씩 = 풀 작업자 프로세스 당 2 ~ 3 번, 원본에서 한 번) 호출된다는 것을 보여줍니다 방법). 기본 피커는 전체 인스턴스의 복사본을 만들고 (반)이 비밀리에 다시 채우기 때문에이 경우처럼 다음과 같이 수행하는 것이 좋습니다.

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

그렇기 때문에 3 개의 작업자 프로세스에서 소멸자가 8 번 호출 되더라도 매번 1에서 0으로 카운트 다운됩니다. 물론 이러한 방식으로 문제가 발생할 수 있습니다. 필요한 경우 자신의 것을 제공 할 수 있습니다 __setstate__.

    def __setstate__(self, adict):
        self.count = adict['count']

이 경우 예를 들어.


답변

__call__()내부 에 메소드를 정의 할 수도 있습니다.이 메소드 someClass()를 호출 someClass.go()한 다음 인스턴스를 someClass()풀에 전달합니다 . 이 객체는 피클 가능하며 잘 작동합니다 …

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):
     return self.f(x)

sc = someClass()
sc.go()


답변

위의 parisjohn 의 솔루션은 나와 잘 작동합니다. 또한 코드가 깨끗하고 이해하기 쉽습니다. 필자의 경우 Pool을 사용하여 호출하는 함수가 몇 가지 있으므로 parisjohn의 코드를 약간 아래에서 수정했습니다. 내가 만든 전화 몇 가지 함수를 호출 할 수 있도록하고, 함수 이름에서 인수 DICT에 전달됩니다 go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])

sc = someClass()
sc.go()