[python] Python 멀티 프로세싱 PicklingError : <type ‘function’>을 (를) 피클 할 수 없습니다

더 간단한 예제로 오류를 재현 할 수 없어서 죄송합니다. 코드가 너무 복잡하여 게시 할 수 없습니다. 일반 파이썬 대신 IPython 쉘에서 프로그램을 실행하면 문제가 해결됩니다.

이 문제에 대한 이전 메모를 찾아 보았습니다. 풀을 사용하여 클래스 함수 내에 정의 된 함수를 호출했기 때문에 모두 발생했습니다. 그러나 이것은 나에게 해당되지 않습니다.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

도움을 주시면 감사하겠습니다.

업데이트 : 피클 링 된 기능은 모듈의 최상위 레벨에서 정의됩니다. 중첩 된 함수가 포함 된 함수를 호출하지만. 즉, f()전화 g()통화 h()중첩 된 기능을 가지고 i(), 나는 부르고있다 pool.apply_async(f). f(), g(), h()모든 상단 레벨에서 정의됩니다. 이 패턴으로 더 간단한 예를 시도했지만 작동합니다.



답변

다음은 절임 가능한 항목의 목록입니다 . 특히 함수는 모듈의 최상위 수준에서 정의 된 경우에만 선택할 수 있습니다.

이 코드 조각 :

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

게시 한 것과 거의 동일한 오류가 발생합니다.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

문제는 pool모두 메소드를 사용하여 mp.SimpleQueue작업을 작업자 프로세스에 전달한다는 것입니다. 통과하는 모든 항목은 선택 가능 mp.SimpleQueue해야하며 foo.work모듈의 최상위 레벨에 정의되어 있지 않으므로 선택 가능하지 않습니다.

최상위 레벨에서 함수를 정의하여 해결할 수 있습니다 foo.work().

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

공지 foo이후, pickable입니다 Foo최고 수준에서 정의된다 foo.__dict__picklable입니다.


답변

내가 사용하는 것 pathos.multiprocesssing대신에, multiprocessing. 사용 pathos.multiprocessing하는 포크입니다 . 파이썬에서 거의 모든 것을 직렬화 할 수 있으므로 병렬로 더 많이 보낼 수 있습니다. 당신은 클래스 메소드를 위해 필요로하는 포크는 또한 여러 인자 기능을 직접 작업 할 수있는 능력을 가지고 있습니다.multiprocessingdilldillpathos

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

https://github.com/uqfoundation 에서 pathos원하는 정보를 얻으십시오 dill.


답변

다른 사람들이 말했듯 multiprocessing이 파이썬 객체는 피클 가능한 작업자 프로세스로만 전송할 수 있습니다. unutbu에서 설명한대로 코드를 재구성 할 수없는 경우 아래에 표시된대로 dill데이터 (특히 코드 데이터)를 전송하기 위해 확장 된 산세 / 비산 세 기능을 사용할 수 있습니다 .

이 솔루션은 다음 dill과 같은 다른 라이브러리 의 설치 만 필요합니다 pathos.

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()


답변

필자는 프로파일 러를 사용하여 완벽하게 작동하는 코드에서 오류 출력을 정확하게 생성 할 수도 있음을 발견했습니다.

이것은 Windows (포크가 약간 우아하지 않은)에 있음을 유의하십시오.

나는 달리고 있었다 :

python -m profile -o output.pstats <script> 

그리고 프로파일 링을 제거하면 오류가 제거되고 프로파일 링을 배치하면 오류가 복원됩니다. 코드가 작동하는 것을 알고 있기 때문에 나에게도 배티를 몰고있었습니다. 나는 무언가가 pool.py를 업데이트했는지 확인하고 있었고 … 싱크 느낌이 있었고 프로파일 링을 제거했습니다.

다른 사람이 보관할 수 있도록 여기에 보관소에 게시하십시오.


답변

이 문제가 발생 multiprocessing하면 간단한 해결책이에서로 전환 Pool됩니다 ThreadPool. 가져 오기 이외의 코드 변경없이 수행 할 수 있습니다.

from multiprocessing.pool import ThreadPool as Pool

ThreadPool은 새로운 프로세스를 생성하지 않고 메인 스레드와 메모리를 공유하기 때문에 작동합니다. 이는 산세가 필요하지 않음을 의미합니다.

이 방법의 단점은 파이썬이 스레드를 처리하는 데 가장 큰 언어가 아니라는 것입니다. 글로벌 인터프리터 잠금 (Global Interpreter Lock)이라는 것을 사용하여 스레드를 안전하게 유지하므로 일부 사용 사례가 느려질 수 있습니다. 그러나 주로 다른 시스템 (HTTP 명령 실행, 데이터베이스와 대화, 파일 시스템에 쓰기)과 상호 작용하는 경우 코드가 CPU에 의해 구속되지 않고 큰 타격을받지 않습니다. 실제로 HTTP / HTTPS 벤치 마크를 작성할 때 여기에 사용 된 스레드 모델의 오버 헤드와 지연이 적다는 것을 알았습니다. 새 프로세스를 생성하는 오버 헤드가 새 스레드를 생성하는 오버 헤드보다 훨씬 높기 때문입니다.

따라서 파이썬 사용자 공간에서 많은 것을 처리하는 경우 이것이 최선의 방법이 아닐 수도 있습니다.


답변

이 솔루션은 dill 설치 만 필요하며 pathos로 다른 라이브러리는 필요하지 않습니다.

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

numpy 배열에서도 작동합니다.


답변

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

비동기 작업으로 전달 된 모델 객체 내에 내장 함수가있는 경우에도이 오류가 발생합니다.

전달 된 모델 객체 에 내장 함수가 없는지 확인하십시오 . (이 경우 모델 내에서 django-model-utilsFieldTracker() 기능을 사용하여 특정 필드를 추적했습니다). 다음은 관련 GitHub 문제에 대한 링크 입니다.