[python] 여러 프로세스에서 단일 파일 처리

각 줄을 처리하고 (일부 작업 수행) 데이터베이스에 저장하려는 하나의 큰 텍스트 파일이 있습니다. 하나의 간단한 프로그램이 너무 오래 걸리기 때문에 여러 프로세스 또는 스레드를 통해 수행되기를 바랍니다. 각 스레드 / 프로세스는 해당 단일 파일에서 다른 데이터 (다른 줄)를 읽고 데이터 조각 (줄)에 대해 몇 가지 작업을 수행하고 데이터베이스에 넣어 결국 전체 데이터를 처리하고 내 데이터베이스는 필요한 데이터로 덤프됩니다.

그러나 나는 이것에 접근하는 방법을 알 수 없습니다.



답변

당신이 찾고있는 것은 생산자 / 소비자 패턴입니다.

기본 스레딩 예

다음은 스레딩 모듈을 사용하는 기본 예 입니다 (다중 처리 대신).

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

스레드와 파일 객체를 공유하지 않습니다. 데이터 줄을 에 제공하여 작업을 생성 합니다. 그런 다음 각 스레드는 라인을 선택하고 처리 한 다음 대기열에 반환합니다.

목록 및 특별한 종류의 Queue 와 같이 데이터를 공유하기 위해 다중 처리 모듈 에 내장 된 고급 기능이 있습니다 . 다중 처리와 스레드를 사용하는 데는 절충점이 있으며 작업이 CPU 바인딩인지 IO 바인딩인지에 따라 다릅니다.

기본 다중 처리 풀 예제

다음은 다중 처리 풀의 정말 기본적인 예입니다.

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

은 자체 프로세스를 관리하는 편리한 개체입니다. 열린 파일은 라인을 반복 할 수 있으므로 파일을으로 전달할 수 있습니다. 그러면 파일을 반복 pool.map()하고 작업자 함수에 라인을 전달합니다. 블록을 매핑 하고 완료되면 전체 결과를 반환합니다. 이것은 지나치게 단순화 된 예이며 pool.map()는 작업을 수행하기 전에 한 번에 전체 파일을 메모리로 읽어 들일 것입니다. 대용량 파일이 예상되는 경우이를 염두에 두십시오. 생산자 / 소비자 설정을 설계하는 더 고급 방법이 있습니다.

제한 및 라인 재 분류가있는 수동 “풀”

이것은 Pool.map 의 수동 예제 이지만 한 번에 전체 이터 러블 을 소비하는 대신 대기열 크기를 설정하여 처리 할 수있는 한 빨리 하나씩 만 공급하도록 할 수 있습니다. 또한 줄 번호를 추가하여 나중에 원할 경우 추적하고 참조 할 수 있습니다.

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)


답변

여기 내가 만든 정말 멍청한 예가 있습니다.

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else:
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

여기서 까다로운 부분은 줄 바꿈 문자로 파일을 분할하여 어떤 줄도 놓치지 않도록 (또는 부분 줄만 읽도록) 확인하는 것입니다. 그런 다음 각 프로세스는 파일의 일부를 읽고 주 스레드가 데이터베이스에 넣을 수있는 개체를 반환합니다. 물론 모든 정보를 한 번에 메모리에 보관할 필요가 없도록이 부분을 청크 단위로 수행해야 할 수도 있습니다. (이것은 매우 쉽게 수행됩니다. “args”목록을 X 청크로 분할하고 호출 pool.map(wrapper,chunk) 하십시오. 여기를 참조 하십시오. )


답변

하나의 큰 파일을 여러 개의 작은 파일로 나누고 각 파일을 별도의 스레드에서 처리하십시오.


답변