[python] 여러 인수에 대한 Python 다중 처리 pool.map

Python 다중 처리 라이브러리에 여러 인수를 지원하는 pool.map의 변형이 있습니까?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()



답변

이에 대한 대답은 버전 및 상황에 따라 다릅니다. 최신 버전의 Python (3.3 이후)에 대한 가장 일반적인 대답은 JF Sebastian에 의해 처음 설명되었습니다 . 1Pool.starmap 일련의 인수 튜플을 허용하는 메소드를 사용합니다 . 그런 다음 각 튜플에서 인수를 자동으로 압축 해제하여 주어진 함수에 전달합니다.

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

이전 버전의 Python에서는 인수를 명시 적으로 풀기 위해 도우미 함수를 작성해야합니다. 을 사용하려면 컨텍스트 관리자 with로 전환 할 래퍼도 작성해야합니다 Pool. ( 이 점을 지적한 뮤온 에게 감사합니다 .)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

더 간단한 경우에는 고정 된 두 번째 인수 partial로을 사용할 수 있지만 Python 2.7 이상에서만 사용할 수 있습니다 .

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1.이 중 많은 부분이 그의 대답에서 영감을 받았으며, 아마도 대신에 받아 들여졌을 것입니다. 그러나 이것이 최상위에 붙어 있기 때문에 미래 독자를 위해 개선하는 것이 가장 좋았습니다.


답변

여러 인수를 지원하는 pool.map의 변형이 있습니까?

Python 3.3에는 pool.starmap()메소드가 포함되어 있습니다 .

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

이전 버전의 경우 :

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

산출

1 1
2 1
3 1

여기서 어떻게 itertools.izip()그리고 어떻게 itertools.repeat()사용 되는지 주목 하십시오.

@unutbu가 언급 한 버그 로 인해 functools.partial()Python 2.6에서 유사한 기능을 사용할 수 없으므로 간단한 래퍼 함수 func_star()를 명시 적으로 정의해야합니다. 에서 제안한 해결 방법 도 참조하십시오 .uptimebox


답변

나는 아래가 더 좋을 것이라고 생각한다.

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

산출

[3, 5, 7]


답변

파이썬 3.3 이상 과 함께 사용 하기pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"]

pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()

결과:

1 --- 4
2 --- 5
3 --- 6

원하는 경우 더 많은 인수를 zip () 할 수도 있습니다. zip(a,b,c,d,e)

경우에 당신은 당신이 사용할 필요가 인수로 전달 상수 값 갖고 싶어 import itertools다음과 zip(itertools.repeat(constant), a)예를.


답변

JF Sebastian의 itertools에 대해 배웠기 때문에 한 단계 더 나아가 파이썬-2.7 및 python-3.2 (및 그 이후 버전)의 parmap병렬화, 오퍼링 mapstarmap함수를 처리 하여 여러 위치 인수를 취할 수 있는 패키지를 작성하기로 결정했습니다. .

설치

pip install parmap

병렬화하는 방법 :

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

PyPI 및 github 저장소에 파맵을 업로드했습니다 .

예를 들어 다음과 같이 질문에 대답 할 수 있습니다.

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)


답변

# “다수의 주장을 취하는 방법”.

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4)

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)


답변

필요없는 pathos ( 주 : github의 버전 사용)multiprocessing 라는 포크가 있습니다 .지도 함수는 파이썬의 맵에 대한 API를 미러링하므로 map은 여러 인수를 취할 수 있습니다. 을 사용하면 일반적으로 블록 에 갇히지 않고 인터프리터에서 멀티 프로세싱을 수행 할 수도 있습니다 . Pathos는 약간의 업데이트 후 릴리스가 예정되어 있습니다. 주로 python 3.x 로의 변환입니다.starmappathos__main__

  Python 2.7.5 (default, Sep 30 2013, 20:15:49)
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathos의 정확한 동작을 얻을 수있는 몇 가지 방법이 있습니다 starmap.

>>> def add(*x):
...   return sum(x)
...
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>>