[python] 다중 처리 : tqdm을 사용하여 진행률 표시 줄 표시

내 코드를보다 “pythonic”하고 빠르게 만들기 위해 “multiprocessing”과 맵 함수를 사용하여 a) 함수 및 b) 반복 범위를 보냅니다.

이식 된 솔루션 (즉, tqdm.tqdm (range (0, 30)) 범위에서 직접 tqdm 호출)은 다중 처리 (아래 코드에서 공식화 됨)에서 작동하지 않습니다.

진행률 표시 줄은 0 ~ 100 % (python이 코드를 읽을 때?)로 표시되지만지도 기능의 실제 진행률을 나타내지는 않습니다.

‘지도’기능이 어느 단계에 있는지 나타내는 진행률 표시 줄을 표시하는 방법은 무엇입니까?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

모든 도움이나 제안을 환영합니다 …



답변

처리 된 값의 반복자를 반환하는 map 대신 imap을 사용합니다.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))


답변

솔루션을 찾았습니다 : 조심하세요! 다중 처리로 인해 추정 시간 (루프 당 반복, 총 시간 등)이 불안정 할 수 있지만 진행률 표시 줄은 완벽하게 작동합니다.

참고 : 풀용 컨텍스트 관리자는 Python 버전 3.3에서만 사용할 수 있습니다.

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()


답변

p_tqdm대신 사용할 수 있습니다 .

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))


답변

늦어서 죄송 합니다만 동시지도 만 있으면 최신 버전 ( tqdm>=4.42.0)에 다음이 내장되어 있습니다.

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

참조 : https://tqdm.github.io/docs/contrib.concurrent/https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py


답변

Xavi Martínez의 답변을 바탕으로 함수를 작성했습니다 imap_unordered_bar. imap_unordered처리 막대가 표시되는 유일한 차이점을 제외 하고 동일한 방식으로 사용할 수 있습니다 .

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))


답변

병렬 실행 함수에서 결과를 다시 가져와야 할 때에 대한 필자의 견해입니다. 이 기능은 몇 가지 작업을 수행하지만 (더 자세히 설명하는 다른 게시물이 있음) 핵심은 대기중인 작업 대기열과 완료된 작업 대기열이 있다는 것입니다. 작업자가 대기중인 대기열의 각 작업을 완료하면 작업 완료 대기열에 결과를 추가합니다. tqdm 진행률 표시 줄을 사용하여 작업 완료 대기열에 검사를 래핑 할 수 있습니다. 여기에 do_work () 함수의 구현을 넣는 것이 아닙니다. 여기에있는 메시지는 작업 완료 대기열을 모니터링하고 결과가 나올 때마다 진행률 표시 줄을 업데이트하는 것이므로 관련이 없습니다.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results


답변

import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))