[python] Python : Python 함수를 병렬로 실행하려면 어떻게해야합니까?

먼저 조사했지만 내 질문에 대한 답을 찾지 못했습니다. Python에서 여러 함수를 병렬로 실행하려고합니다.

다음과 같은 것이 있습니다.

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

func1과 func2를 호출하고 동시에 실행되도록하고 싶습니다. 함수는 서로 상호 작용하거나 동일한 객체에서 상호 작용하지 않습니다. 지금은 func2가 시작되기 전에 func1이 끝날 때까지 기다려야합니다. 다음과 같이 어떻게합니까?

process.py

from files import func1, func2

runBothFunc(func1(), func2())

매 분마다 생성되는 파일 수를 계산하기 때문에 두 디렉토리를 거의 같은 시간에 만들 수 있기를 원합니다. 디렉토리가 없으면 내 타이밍이 떨어질 것입니다.



답변

당신은 사용할 수 있습니다 threading또는 multiprocessing.

로 인해 CPython과의 특수성 , threading진정한 병렬 처리를 달성하기 어렵다. 이러한 이유로 multiprocessing일반적으로 더 나은 방법입니다.

다음은 완전한 예입니다.

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

자식 프로세스를 시작 / 결합하는 메커니즘은 다음과 같은 라인을 따라 함수로 쉽게 캡슐화 될 수 있습니다 runBothFunc.

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)


답변

이는 Python 코드를 쉽게 병렬화하고 배포 할 수있는 시스템 인 Ray로 우아하게 수행 할 수 있습니다 .

예제를 병렬화하려면 @ray.remote데코레이터로 함수를 정의한 다음 .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

두 함수에 동일한 인수를 전달하고 인수가 큰 경우이를 수행하는보다 효율적인 방법은 ray.put(). 이렇게하면 큰 인수가 두 번 직렬화되고 두 개의 메모리 복사본이 생성되는 것을 방지 할 수 있습니다.

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

만약 func1()func2()반환 결과는 다음과 같이 코드를 다시 작성해야합니다

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

다중 처리 모듈에 비해 Ray를 사용하면 많은 이점이 있습니다. 특히, 동일한 코드 가 단일 시스템과 시스템 클러스터에서 실행됩니다. Ray의 더 많은 장점은 이 관련 게시물을 참조하십시오 .


답변

함수가 주로 I / O 작업 (그리고 CPU 작업이 적음)을 수행하고 Python 3.2 이상을 사용하는 경우 ThreadPoolExecutor를 사용할 수 있습니다 .

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

함수가 주로 CPU 작업을 수행하고 (그리고 I / O 작업이 적다면) Python 2.6 이상을 사용하는 경우 다중 처리 모듈을 사용할 수 있습니다 .

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])


답변

Windows 사용자이고 python 3을 사용하는 경우이 게시물은 python에서 병렬 프로그래밍을 수행하는 데 도움이 될 것입니다. 일반적인 다중 처리 라이브러리의 풀 프로그래밍을 실행할 때 프로그램의 주요 기능과 관련된 오류가 발생합니다. 이는 창에 fork () 기능이 없기 때문입니다. 아래 게시물은 언급 된 문제에 대한 해결책을 제공합니다.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

파이썬 3을 사용하고 있었기 때문에 프로그램을 다음과 같이 약간 변경했습니다.

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

이 함수 후에는 위의 문제 코드도 다음과 같이 약간 변경됩니다.

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

그리고 출력은 다음과 같습니다.

[1, 8, 27, 64, 125, 216]

이 게시물이 일부 Windows 사용자에게 유용 할 것이라고 생각합니다.


답변

두 기능이 서로 동기화되어 실행되도록 보장 할 수있는 방법은 없습니다.

가장 좋은 방법은 함수를 여러 단계로 나눈 다음 Process.join@aix의 답변 언급과 같은 중요한 동기화 지점에서 둘 다 완료 될 때까지 기다리는 것입니다 .

time.sleep(10)정확한 타이밍을 보장 할 수 없기 때문에 더 좋습니다 . 명시 적으로 기다림을 사용하면 다음 단계로 이동하기 전에 해당 단계를 실행해야 함을 의미합니다. 10ms 이내에 수행 될 것이라고 가정하는 대신 컴퓨터에서 다른 작업이 진행되고 있는지에 따라 보장되지 않습니다.


답변

두 개의 다른 매개 변수를 호출해야하는 단일 함수가있는 것 같습니다. 이 우아의 조합을 사용하여 수행 할 수 있습니다 concurrent.futuresmap파이썬을 3.2 이상

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

이제 작업이 IO 바인딩 된 경우 다음 ThreadPoolExecutor과 같이 사용할 수 있습니다 .

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

map여기에서 map인수 목록에 대한 함수 가 어떻게 사용 되는지 확인 하십시오 .

이제 함수가 CPU 바운드 인 경우 다음을 사용할 수 있습니다. ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

확실하지 않은 경우 두 가지를 모두 시도하고 어느 것이 더 나은 결과를 제공하는지 확인할 수 있습니다.

마지막으로, 결과를 인쇄하려는 경우 다음과 같이하면됩니다.

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)


답변