[python] 하위 프로세스에서 비 블로킹 읽기 파이썬의 PIPE

하위 프로세스 모듈 을 사용하여 하위 프로세스 를 시작하고 출력 스트림 (stdout)에 연결합니다. stdout에서 비 블로킹 읽기를 실행할 수 있기를 원합니다. .readline을 비 블로킹하거나 호출하기 전에 스트림에 데이터가 있는지 확인하는 방법이 .readline있습니까? 나는 이것이 휴대용이거나 최소한 Windows와 Linux에서 작동하기를 원합니다.

여기에 내가 지금하는 방법이 있습니다 (사용 가능한 .readline데이터가없는 경우 차단됩니다 ).

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()



답변

fcntl, select, asyncproc이 경우 도움이되지 않습니다.

운영 체제와 상관없이 차단하지 않고 스트림을 읽는 안정적인 방법은 다음을 사용하는 것입니다 Queue.get_nowait().

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line


답변

나는 종종 비슷한 문제를 겪었다. 필자가 자주 쓰는 Python 프로그램은 명령 줄 (stdin)에서 사용자 입력을받는 동시에 일부 기본 기능을 실행할 수 있어야합니다. 사용자 입력 처리 기능을 다른 스레드에 넣는 것만으로도 문제가 해결 readline()되지 않으며 시간이 초과되지 않습니다. 기본 기능이 완료되고 더 이상 사용자 입력을 기다릴 필요가 없으면 일반적으로 프로그램을 종료하고 싶지만 readline()다른 스레드에서 여전히 라인을 기다리는 중이므로 차단할 수 없습니다 . 이 문제에 대한 해결책은 fcntl 모듈을 사용하여 stdin을 비 차단 파일로 만드는 것입니다.

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

내 의견으로는 이것은이 문제를 해결하기 위해 선택 또는 신호 모듈을 사용하는 것보다 약간 더 깨끗하지만 다시 UNIX에서만 작동합니다 …


답변

파이썬 3.4는 비동기식 IO 모듈을 위한 새로운 임시 API 를 도입했습니다 .asyncio

이 접근 방식은 twisted@Bryan Ward의 기반 답변 과 유사 합니다. 프로토콜을 정의하면 데이터가 준비되는 즉시 해당 메소드가 호출됩니다.

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

docs의 “Subprocess”를 참조하십시오 .

코 루틴
( / Python 3.5+ 구문 사용 )을 사용하여 비동기 적으로 행을 읽을 수있는 객체asyncio.create_subprocess_exec() 를 반환하는 고급 인터페이스 가 있습니다 .ProcessStreamReader.readline()asyncawait

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() 다음 작업을 수행합니다.

  • 하위 프로세스를 시작하고 stdout을 파이프로 리디렉션
  • 서브 프로세스의 stdout에서 행을 비동기 적으로 읽습니다.
  • 하위 프로세스 종료
  • 종료 될 때까지 기다리십시오

필요한 경우 각 단계는 시간 초과로 제한 될 수 있습니다.


답변

asyncproc 모듈을 사용해보십시오 . 예를 들면 다음과 같습니다.

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

모듈은 S.Lott에서 제안한대로 모든 스레딩을 처리합니다.


답변

Twisted 에서이 작업을 쉽게 수행 할 수 있습니다 . 기존 코드 기반에 따라 사용하기 쉽지 않을 수도 있지만, 트위스트 된 응용 프로그램을 구축하는 경우 이와 같은 작업은 거의 사소 해집니다. ProcessProtocol클래스를 만들고 outReceived()메서드를 재정의합니다 . 트위스트 (사용 된 리액터에 따라 다름)는 일반적으로 select()다른 파일 디스크립터 (종종 네트워크 소켓)의 데이터를 처리하기 위해 콜백이 설치된 큰 루프입니다. 따라서이 outReceived()방법은에서 오는 데이터를 처리하기위한 콜백을 설치하는 것입니다 STDOUT. 이 동작을 보여주는 간단한 예는 다음과 같습니다.

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

트위스트 문서는 이에 대한 좋은 정보가 있습니다.

Twisted를 중심으로 전체 응용 프로그램을 구축하면 로컬 또는 원격의 다른 프로세스와 비동기식으로 통신 할 수 있습니다. 반면에, 프로그램이 Twisted를 기반으로 구축되지 않은 경우 실제로 그렇게 도움이되지는 않습니다. 바라건대 이것은 특정 응용 프로그램에 적용되지 않더라도 다른 독자에게 도움이 될 수 있기를 바랍니다.


답변

select & read (1)를 사용하십시오.

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''):
  while (select.select([proc.stdout],[],[],0)[0]!=[]):
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

readline ()과 같은 경우 :

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a


답변

한 가지 해결책은 프로세스를 읽도록 다른 프로세스를 만들거나 시간 초과로 프로세스 스레드를 만드는 것입니다.

다음은 타임 아웃 함수의 스레드 버전입니다.

http://code.activestate.com/recipes/473878/

그러나 들어올 때 stdout을 읽어야합니까? 다른 해결책은 출력을 파일로 덤프하고 p.wait () 사용하여 프로세스가 완료 될 때까지 기다리는 것입니다 .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()