각 줄을 처리하고 (일부 작업 수행) 데이터베이스에 저장하려는 하나의 큰 텍스트 파일이 있습니다. 하나의 간단한 프로그램이 너무 오래 걸리기 때문에 여러 프로세스 또는 스레드를 통해 수행되기를 바랍니다. 각 스레드 / 프로세스는 해당 단일 파일에서 다른 데이터 (다른 줄)를 읽고 데이터 조각 (줄)에 대해 몇 가지 작업을 수행하고 데이터베이스에 넣어 결국 전체 데이터를 처리하고 내 데이터베이스는 필요한 데이터로 덤프됩니다.
그러나 나는 이것에 접근하는 방법을 알 수 없습니다.
답변
당신이 찾고있는 것은 생산자 / 소비자 패턴입니다.
기본 스레딩 예
다음은 스레딩 모듈을 사용하는 기본 예 입니다 (다중 처리 대신).
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
스레드와 파일 객체를 공유하지 않습니다. 데이터 줄을 큐 에 제공하여 작업을 생성 합니다. 그런 다음 각 스레드는 라인을 선택하고 처리 한 다음 대기열에 반환합니다.
목록 및 특별한 종류의 Queue 와 같이 데이터를 공유하기 위해 다중 처리 모듈 에 내장 된 고급 기능이 있습니다 . 다중 처리와 스레드를 사용하는 데는 절충점이 있으며 작업이 CPU 바인딩인지 IO 바인딩인지에 따라 다릅니다.
기본 다중 처리 풀 예제
다음은 다중 처리 풀의 정말 기본적인 예입니다.
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
풀 은 자체 프로세스를 관리하는 편리한 개체입니다. 열린 파일은 라인을 반복 할 수 있으므로 파일을으로 전달할 수 있습니다. 그러면 파일을 반복 pool.map()
하고 작업자 함수에 라인을 전달합니다. 블록을 매핑 하고 완료되면 전체 결과를 반환합니다. 이것은 지나치게 단순화 된 예이며 pool.map()
는 작업을 수행하기 전에 한 번에 전체 파일을 메모리로 읽어 들일 것입니다. 대용량 파일이 예상되는 경우이를 염두에 두십시오. 생산자 / 소비자 설정을 설계하는 더 고급 방법이 있습니다.
제한 및 라인 재 분류가있는 수동 “풀”
이것은 Pool.map 의 수동 예제 이지만 한 번에 전체 이터 러블 을 소비하는 대신 대기열 크기를 설정하여 처리 할 수있는 한 빨리 하나씩 만 공급하도록 할 수 있습니다. 또한 줄 번호를 추가하여 나중에 원할 경우 추적하고 참조 할 수 있습니다.
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)
답변
여기 내가 만든 정말 멍청한 예가 있습니다.
import os.path
import multiprocessing
def newlinebefore(f,n):
f.seek(n)
c=f.read(1)
while c!='\n' and n > 0:
n-=1
f.seek(n)
c=f.read(1)
f.seek(n)
return n
filename='gpdata.dat' #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)
#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)
#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.
with open(filename,'r') as f:
start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))
end_byte=[i-1 for i in start_byte] [1:] + [None]
def process_piece(filename,start,end):
with open(filename,'r') as f:
f.seek(start+1)
if(end is None):
text=f.read()
else:
nbytes=end-start+1
text=f.read(nbytes)
# process text here. createing some object to be returned
# You could wrap text into a StringIO object if you want to be able to
# read from it the way you would a file.
returnobj=text
return returnobj
def wrapper(args):
return process_piece(*args)
filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)
pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)
#Now take your results and write them to the database.
print "".join(result) #I just print it to make sure I get my file back ...
여기서 까다로운 부분은 줄 바꿈 문자로 파일을 분할하여 어떤 줄도 놓치지 않도록 (또는 부분 줄만 읽도록) 확인하는 것입니다. 그런 다음 각 프로세스는 파일의 일부를 읽고 주 스레드가 데이터베이스에 넣을 수있는 개체를 반환합니다. 물론 모든 정보를 한 번에 메모리에 보관할 필요가 없도록이 부분을 청크 단위로 수행해야 할 수도 있습니다. (이것은 매우 쉽게 수행됩니다. “args”목록을 X 청크로 분할하고 호출 pool.map(wrapper,chunk)
하십시오. 여기를 참조 하십시오. )
답변
하나의 큰 파일을 여러 개의 작은 파일로 나누고 각 파일을 별도의 스레드에서 처리하십시오.