작업을 동시에 나누기 위해 multiprocessing
의 Pool.map()
기능 을 사용하려고합니다 . 다음 코드를 사용하면 정상적으로 작동합니다.
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
그러나 더 객체 지향적 인 접근 방식으로 사용하면 작동하지 않습니다. 오류 메시지는 다음과 같습니다.
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
다음이 내 주요 프로그램 인 경우에 발생합니다.
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
다음은 내 someClass
수업입니다.
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
누구나 문제가 무엇인지, 쉽게 해결할 수 있는지 알고 있습니까?
답변
문제는 멀티 프로세싱이 프로세스간에 슬링하기 위해 피클을 처리해야하며 바인딩 된 메서드는 피클 링 할 수 없다는 것입니다. 해결 방법 ( “쉬운”여부와 상관없이)은 프로그램에 인프라를 추가하여 그러한 메소드를 피클 할 수있게하고 copy_reg 표준 라이브러리 메소드에 등록하는 것입니다 .
예를 들어, 스레드 의 끝까지이 스레드에 대한 Steven Bethard의 기여는를 통해 메소드 산 세척 / 언 피클 링을 수행 할 수있는 완벽하게 실행 가능한 방법 중 하나를 보여줍니다 copy_reg
.
답변
표준 라이브러리를 벗어나지 않으면 멀티 프로세싱 및 피클 링이 중단되고 제한되기 때문에 이러한 솔루션은 모두보기 흉한 상태입니다.
당신의 포크를 사용하는 경우 multiprocessing
전화를 pathos.multiprocesssing
직접의 멀티 프로세싱에 클래스와 클래스 메소드를 사용할 수있는 map
기능. 이 때문입니다 dill
대신 사용 pickle
하거나 cPickle
, 및 dill
파이썬에서 거의 모든 것을 직렬화 할 수 있습니다.
pathos.multiprocessing
또한 비동기지도 기능을 제공합니다 … 그리고 수 map
(예 : 여러 인수 기능 map(math.pow, [1,2,3], [4,5,6])
)
참조 :
어떤 멀티 프로세싱 할 수 있고 함께 할 딜?
및 :
http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
그리고 명백하게, 당신은 당신이 처음에하고 싶었던 것을 정확하게 원할 수 있고, 원한다면 통역사로부터 그것을 할 수 있습니다.
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
https://github.com/uqfoundation/pathos 에서 코드를 얻으십시오.
답변
__call__()
내부 에 메소드를 정의 할 수도 있습니다.이 메소드 someClass()
를 호출 someClass.go()
한 다음 인스턴스를 someClass()
풀에 전달합니다 . 이 객체는 피클 가능하며 잘 작동합니다 …
답변
Steven Bethard의 솔루션에는 몇 가지 제한 사항이 있습니다.
클래스 메소드를 함수로 등록하면 메소드 처리가 완료 될 때마다 클래스의 소멸자가 놀랍게도 호출됩니다. 따라서 메소드의 n 배를 호출하는 클래스의 인스턴스가 1 회있는 경우 멤버는 2 번 실행 사이에 사라지고 메시지 malloc: *** error for object 0x...: pointer being freed was not allocated
(예 : 열린 멤버 파일) 또는 pure virtual method called,
(사용 한 멤버 객체의 수명 이보다 짧았습니다. 내가 생각한 것). 수영장 크기보다 큰 n을 처리 할 때 이것을 얻었습니다. 다음은 간단한 예입니다.
terminate called without an active exception
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
산출:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
__call__
[없음 …] 결과로부터 판독되지 않으므로 방법은 매우 동등하지 않다 :
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
따라서 두 방법 중 어느 것도 만족스럽지 않습니다 …
답변
클래스 인스턴스의 내용에 따라 비효율적 일 수 있지만 사용할 수있는 또 다른 바로 가기가 있습니다.
모든 사람들이 말했듯이, multiprocessing
코드는 코드가 시작한 서브 프로세스로 전송하는 것을 피클해야하고 피커는 인스턴스 메소드를 수행하지 않는다는 것입니다.
그러나 인스턴스 메소드를 전송하는 대신 실제 클래스 인스턴스와 호출 할 함수의 이름을 getattr
인스턴스 메소드를 호출하는 데 사용되는 일반 함수 로 전송하여 Pool
서브 프로세스 에서 바운드 메소드를 작성할 수 있습니다. 이것은 __call__
둘 이상의 멤버 함수를 호출 할 수 있다는 점을 제외하고 메소드 정의와 유사 합니다.
@EricH.의 코드를 그의 답변에서 훔쳐서 약간 주석을 달았습니다 (모든 이름이 변경되어 어떤 이유로 든 이름이 바뀌어 붙여 넣기가 더 쉬워 보입니다 :-)).
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
결과는 실제로 생성자가 한 번 (원래 pid에서) 호출되고 소멸자가 9 번 (필요에 따라 각 사본마다 한 번씩 = 풀 작업자 프로세스 당 2 ~ 3 번, 원본에서 한 번) 호출된다는 것을 보여줍니다 방법). 기본 피커는 전체 인스턴스의 복사본을 만들고 (반)이 비밀리에 다시 채우기 때문에이 경우처럼 다음과 같이 수행하는 것이 좋습니다.
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
그렇기 때문에 3 개의 작업자 프로세스에서 소멸자가 8 번 호출 되더라도 매번 1에서 0으로 카운트 다운됩니다. 물론 이러한 방식으로 문제가 발생할 수 있습니다. 필요한 경우 자신의 것을 제공 할 수 있습니다 __setstate__
.
def __setstate__(self, adict):
self.count = adict['count']
이 경우 예를 들어.
답변
__call__()
내부 에 메소드를 정의 할 수도 있습니다.이 메소드 someClass()
를 호출 someClass.go()
한 다음 인스턴스를 someClass()
풀에 전달합니다 . 이 객체는 피클 가능하며 잘 작동합니다 …
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
답변
위의 parisjohn 의 솔루션은 나와 잘 작동합니다. 또한 코드가 깨끗하고 이해하기 쉽습니다. 필자의 경우 Pool을 사용하여 호출하는 함수가 몇 가지 있으므로 parisjohn의 코드를 약간 아래에서 수정했습니다. 내가 만든 전화 몇 가지 함수를 호출 할 수 있도록하고, 함수 이름에서 인수 DICT에 전달됩니다 go()
:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()