[python] 파이썬의 멀티 프로세싱 풀과 키보드 인터럽트

파이썬의 다중 처리 풀로 KeyboardInterrupt 이벤트를 어떻게 처리 할 수 ​​있습니까? 다음은 간단한 예입니다.

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

위의 코드를 실행할 KeyboardInterrupt때을 누르면 ^C프로세스 가 발생 하지만 프로세스는 단순히 그 시점에서 중단되며 외부에서 종료해야합니다.

^C언제든지 누를 수 있고 모든 프로세스가 정상적으로 종료 되도록하고 싶습니다 .



답변

이것은 파이썬 버그입니다. threading.Condition.wait ()에서 조건을 기다리는 경우 KeyboardInterrupt가 전송되지 않습니다. 재현 :

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

KeyboardInterrupt 예외는 wait ()가 리턴 될 때까지 전달되지 않으며 리턴되지 않으므로 인터럽트가 발생하지 않습니다. KeyboardInterrupt는 거의 확실히 조건 대기를 중단해야합니다.

타임 아웃이 지정된 경우에는 이런 일이 발생하지 않습니다. cond.wait (1)은 즉시 인터럽트를 수신합니다. 따라서 해결 방법은 시간 초과를 지정하는 것입니다. 그렇게하려면 교체

    results = pool.map(slowly_square, range(40))

    results = pool.map_async(slowly_square, range(40)).get(9999999)

또는 유사합니다.


답변

내가 최근에 찾은 것 중에서 가장 좋은 해결책은 작업자 프로세스를 설정하여 SIGINT를 완전히 무시하고 모든 정리 코드를 부모 프로세스로 제한하는 것입니다. 이는 유휴 및 바쁜 작업자 프로세스 모두에 대한 문제를 해결하며 자식 프로세스에서 오류 처리 코드가 필요하지 않습니다.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

설명 및 전체 예제 코드는 각각 http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterrupt 에서 찾을 수 있습니다 .


답변

어떤 이유로 기본 Exception클래스 에서 상속 된 예외 만 정상적으로 처리됩니다. 이 문제를 해결하기 위해 다음 KeyboardInterrupt과 같이 Exception인스턴스를 다시 올릴 수 있습니다 .

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

일반적으로 다음과 같은 결과가 나타납니다.

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

따라서를 누르면 ^C다음을 얻을 수 있습니다.

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end


답변

일반적으로이 간단한 구조는 작동 CtrlC수영장에 :

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

몇 가지 유사한 게시물에 언급 된 바와 같이 :

try-except없이 파이썬에서 키보드 인터럽트 캡처


답변

투표 된 답변은 핵심 문제를 다루지는 않지만 유사한 부작용을 해결합니다.

멀티 프로세싱 라이브러리의 저자 인 Jesse Noller multiprocessing.Pool는 오래된 블로그 게시물 에서 CTRL + C를 올바르게 처리하는 방법을 설명합니다 .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()


답변

다중 처리 성가신 동안 예외를 만드는 두 가지 문제가있는 것 같습니다. 첫 번째 (Glenn에 의해 표시됨)는 즉각적인 응답을 얻기 위해 (즉, 전체 목록 처리를 완료하지 않고) map_async타임 아웃과 함께 사용해야한다는 것 map입니다. 두 번째 (Andrey로 표시)는 멀티 프로세싱에서 상속되지 않는 예외 Exception(예 :)를 포착하지 못한다는 것입니다 SystemExit. 다음은이 두 가지를 모두 다루는 솔루션입니다.

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results


답변

당분간 최선의 해결책은 multiprocessing.pool 기능을 사용하지 않고 자신의 풀 기능을 롤링하는 것입니다. apply_async로 오류를 보여주는 예제와 풀 기능을 모두 사용하지 않는 방법을 보여주는 예제를 제공했습니다.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/