[python] Python 다중 처리를 사용하여 난처한 병렬 문제 해결

당황스럽게 병렬 문제 를 해결하기 위해 다중 처리 를 어떻게 사용 합니까?

당황스러운 병렬 문제는 일반적으로 세 가지 기본 부분으로 구성됩니다.

  1. 파일, 데이터베이스, tcp 연결 등에서 입력 데이터를 읽습니다 .
  2. 각 계산이 다른 계산과 독립적 인 입력 데이터에 대해 계산을 실행 합니다.
  3. 계산 결과를 작성 합니다 (파일, 데이터베이스, tcp 연결 등).

프로그램을 두 가지 차원으로 병렬화 할 수 있습니다.

  • 파트 2는 각 계산이 독립적이므로 여러 코어에서 실행할 수 있습니다. 처리 순서는 중요하지 않습니다.
  • 각 부품은 독립적으로 실행할 수 있습니다. 파트 1은 데이터를 입력 큐에 배치 할 수 있고, 파트 2는 데이터를 입력 큐에서 가져와 출력 큐에 넣을 수 있으며, 파트 3은 결과를 출력 큐에서 가져 와서 쓸 수 있습니다.

이것은 동시 프로그래밍에서 가장 기본적인 패턴으로 보이지만 여전히 해결하려고 노력하지 않고 있으므로 multiprocessing을 사용하여 이것이 어떻게 수행되는지 설명하는 표준 예제를 작성해 보겠습니다 .

다음은 예제 문제입니다. 입력으로 정수 행이 있는 CSV 파일 이 주어지면 합계를 계산합니다. 문제를 세 부분으로 분리하면 모두 병렬로 실행될 수 있습니다.

  1. 입력 파일을 원시 데이터 (정수 목록 / 반복 가능)로 처리
  2. 병렬로 데이터의 합계를 계산합니다.
  3. 합계 출력

다음은 이러한 세 가지 작업을 해결하는 전통적인 단일 프로세스 바인딩 Python 프로그램입니다.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser


def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row


def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers
    as the first element, and the sum of the list of integers as the
    second element.

    The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second element

    """
    for i, row in rows:
        yield i, sum(row)


def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

이 프로그램을 사용하여 위에서 설명한 세 부분을 병렬화하기 위해 다중 처리를 사용하도록 다시 작성해 보겠습니다. 아래는 주석의 부분을 다루기 위해 구체화해야하는이 새로운 병렬 프로그램의 골격입니다.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

이러한 코드와 테스트 목적으로 예제 CSV 파일생성 할 수있는 다른 코드는 github에서 찾을 수 있습니다 .

동시성 전문가 가이 문제에 어떻게 접근하는지에 대한 통찰력을 높이고 싶습니다.


이 문제에 대해 생각할 때 몇 가지 질문이 있습니다. 일부 / 모두 해결에 대한 보너스 포인트 :

  • 데이터를 읽고 큐에 배치하기위한 자식 프로세스가 있어야합니까, 아니면 모든 입력을 읽을 때까지 차단하지 않고 주 프로세스가이를 수행 할 수 있습니까?
  • 마찬가지로, 처리 된 큐에서 결과를 작성하기위한 자식 프로세스가 있어야합니까? 아니면 모든 결과를 기다릴 필요없이 주 프로세스가이를 수행 할 수 있습니까?
  • 합계 작업에 프로세스 풀 을 사용해야 합니까?
    • 그렇다면 입력 및 출력 프로세스도 차단하지 않고 입력 대기열로 들어오는 결과를 처리하기 위해 풀에서 어떤 메서드를 호출해야합니까? apply_async () ? map_async () ? imap () ? imap_unorder () ?
  • 데이터가 입력 될 때 입력 및 출력 큐를 빼낼 필요가 없지만 모든 입력이 구문 분석되고 모든 결과가 계산 될 때까지 기다릴 수 있다고 가정합니다 (예 : 모든 입력 및 출력이 시스템 메모리에 적합하다는 것을 알고 있기 때문입니다). 어떤 방식 으로든 알고리즘을 변경해야합니까 (예 : I / O와 동시에 프로세스를 실행하지 않음)?



답변

내 솔루션에는 출력 순서가 입력 순서와 동일한 지 확인하기 위해 추가 벨과 휘슬이 있습니다. 저는 multiprocessing.queue를 사용하여 프로세스간에 데이터를 전송하고 중지 메시지를 전송하여 각 프로세스가 큐 확인을 종료하도록 알립니다. 나는 소스의 코멘트가 무슨 일이 일어나고 있는지 명확하게해야한다고 생각하지만 알려주지 않는다면 말이다.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input process sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes so open/close
        # and use it all in the same process or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])


답변

파티에 늦게 오는 중 …

joblib 에는 병렬 for 루프를 만드는 데 도움이되는 다중 처리 위에 레이어가 있습니다. 작업의 지연 디스패치와 같은 기능과 매우 간단한 구문 외에도 더 나은 오류보고를 제공합니다.

면책 조항으로서 저는 joblib의 원저자입니다.


답변

나는 파티에 조금 늦었다는 것을 알고 있지만 최근에 GNU parallel을 발견 했으며이 일반적인 작업을 얼마나 쉽게 수행 할 수 있는지 보여주고 싶습니다.

cat input.csv | parallel ./sum.py --pipe > sums

다음과 같은 작업이 수행됩니다 sum.py.

#!/usr/bin/python

from sys import argv

if __name__ == '__main__':
    row = argv[-1]
    values = (int(value) for value in row.split(','))
    print row, ':', sum(values)

병렬은 (물론 병렬 sum.py로) 모든 라인에 대해 실행 된 input.csv다음 결과를 sums. multiprocessing번거 로움 보다 분명히 낫다


답변

오래된 학교.

p1.py

import csv
import pickle
import sys

with open( "someFile", "rb" ) as source:
    rdr = csv.reader( source )
    for line in eumerate( rdr ):
        pickle.dump( line, sys.stdout )

p2.py

import pickle
import sys

while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    pickle.dump( i, sum(row) )

p3.py

import pickle
import sys
while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    print i, row

다음은 다중 처리 최종 구조입니다.

python p1.py | python p2.py | python p3.py

예, 쉘은 OS 수준에서 이들을 함께 묶었습니다. 나에게 더 간단 해 보이며 매우 잘 작동합니다.

예, pickle (또는 cPickle)을 사용하는 데 약간의 오버 헤드가 있습니다. 그러나 단순화는 노력할만한 가치가있는 것 같습니다.

파일 이름을 인수로 사용하려는 경우 p1.py 사용하려면 쉽게 변경할 수 있습니다.

더 중요한 것은 다음과 같은 기능이 매우 편리하다는 것입니다.

def get_stdin():
    while True:
        try:
            yield pickle.load( sys.stdin )
        except EOFError:
            return

이를 통해 다음을 수행 할 수 있습니다.

for item in get_stdin():
     process item

이것은 매우 간단하지만 P2.py의 여러 복사본을 실행하는 것을 쉽게 허용 .

팬 아웃과 팬인의 두 가지 문제가 있습니다. P1.py는 어떻게 든 여러 P2.py로 팬 아웃되어야합니다. 그리고 P2.py는 어떻게 든 결과를 단일 P3.py로 병합해야합니다.

팬 아웃에 대한 구식 접근 방식은 매우 효과적인 “푸시”아키텍처입니다.

이론적으로 공통 큐에서 가져 오는 여러 P2.py가 최적의 리소스 할당입니다. 이것은 종종 이상적이지만 상당한 양의 프로그래밍이기도합니다. 프로그래밍이 정말 필요합니까? 아니면 라운드 로빈 처리가 충분할까요?

실제로, P1.py가 여러 P2.py를 처리하는 간단한 “라운드 로빈”을 수행하도록하는 것이 상당히 좋을 수 있습니다. 명명 된 파이프를 통해 P2.py의 n 개의 복사본 을 처리하도록 P1.py를 구성했을 것 입니다. P2.py는 각각 적절한 파이프에서 읽습니다.

하나의 P2.py가 모든 “최악의 경우”데이터를 가져 와서 뒤처진다면 어떨까요? 예, 라운드 로빈은 완벽하지 않습니다. 그러나 하나의 P2.py보다 낫고 간단한 무작위 화로 이러한 편향을 해결할 수 있습니다.

여러 P2.py에서 하나의 P3.py 로의 팬인은 여전히 ​​조금 더 복잡합니다. 이 시점에서 구식 접근 방식은 유리하지 않습니다. P3.py는 읽기 select를 인터리브하기 위해 라이브러리를 사용하여 여러 명명 된 파이프 에서 읽어야합니다.


답변

파트 1에도 약간의 병렬 처리를 도입 할 수 있습니다. 아마도 CSV처럼 간단한 형식의 문제는 아니지만 입력 데이터 처리가 데이터를 읽는 것보다 눈에 띄게 느리면 더 큰 청크를 읽은 다음 “행 구분 기호”( CSV의 경우 개행이지만 읽은 형식에 따라 다릅니다. 형식이 충분히 복잡한 경우 작동하지 않습니다).

각각 여러 항목을 포함하고있을 가능성이있는 이러한 청크는 대기열에서 작업을 읽는 병렬 프로세스 군중으로 파밍되어 파싱 및 분할 된 다음 2 단계의 대기열에 배치 될 수 있습니다.


답변