programing

하위 프로세스에 대한 비차단 읽기입니다.Python의 파이프

firstcheck 2022. 10. 2. 22:52
반응형

하위 프로세스에 대한 비차단 읽기입니다.Python의 파이프

하위 프로세스 모듈을 사용하여 하위 프로세스를 시작하고 해당 출력 스트림(표준 출력)에 연결합니다.표준 출력으로 논블로킹 판독을 실행할 수 있도록 하고 싶다.호출하기 전에 .readline을 논블로킹하거나 스트림에 데이터가 있는지 확인하는 방법이 있습니까?.readlineWindows Linux の 에에에에에에에에에에에에 。

.readline「 」 「 」 、 「 」

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

fcntl, 이 경우 도움이 되지 않습니다.

운영체제에 관계없이 차단하지 않고 스트림을 읽는 신뢰성 높은 방법은 다음과 같습니다.

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Python (stdin)에서 사용자 입력을 동시에 받아들이면서 주요 기능을 실행할 수 있어야 합니다. 처리 되지 않습니다.readline()차단하고 타임아웃은 없습니다.을 더 종료하고 , 수 . 왜냐하면 "Doese"는 "Doese"를 사용하기 때문입니다.readline()다른 스레드에서 아직 회선을 기다리고 있습니다.로 만드는 입니다.fcntl 모듈입니다.

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

제 생각에는 이 문제를 해결하기 위해 선택 모듈이나 신호 모듈을 사용하는 것보다 조금 더 깨끗하지만 UNIX에서만 작동합니다.

Python 3.4는 비동기 IO -- 모듈을 위한 새로운 잠정 API를 도입했습니다.

이 접근법은 @Bryan Ward의 기반 응답과 유사합니다. 프로토콜을 정의하면 데이터가 준비되는 즉시 메서드가 호출됩니다.

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

문서의 "하위 프로세스"를 참조하십시오.

수준의 가 있습니다.asyncio.create_subprocess_exec()coroutine(/ Python 3.5+ 구문 포함)await을 사용하여 비동기적으로 행을 읽을 있는 개체를 반환합니다.

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill()는 다음합니다.

  • 시작 하위 프로세스, 파이프에 stdout 리디렉션
  • 서브프로세스의 stdout에서 비동기적으로 행을 읽다
  • 킬 서브프로세스
  • 그것이 끝나기를 기다리다.

필요에 따라서, 각 스텝을 타임 아웃 초단위로 제한할 수 있습니다.

Unix 계열의 시스템과 Python 3.5+에서는, 그것이 말하는 대로 동작합니다.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

출력은 다음과 같습니다.

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

★★★★★★★★★★★★★★★★ os.set_blocking★★★★★★★★★★★★★★★★★★:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

비동기 모듈을 시험해 보겠습니다.예를 들어 다음과 같습니다.

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

모듈은 S의 제안에 따라 모든 스레딩을 처리합니다.많이.

트위스트에서는 정말 쉽게 할 수 있어요.기존 코드 베이스에 따라서는 사용하기 쉽지 않을 수 있지만, 트위스트된 애플리케이션을 구축하고 있다면 이와 같은 것은 거의 하찮아집니다.작성하다ProcessProtocol.outReceived()에 따라 다름)은 보통 큰 꼬임일 뿐입니다.select()다른 파일 기술자(많은 경우 네트워크 소켓)의 데이터를 처리하기 위해 콜백이 설치된 루프입니다. 그...outReceived()는 단순히 하여 콜백으로 입니다.STDOUT

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted 문서에는 이에 대한 몇 가지 유용한 정보가 있습니다.

Twisted를 기반으로 전체 애플리케이션을 구축하면 로컬이든 원격이든 다른 프로세스와 비동기식으로 통신할 수 있습니다. 이렇게 우아합니다.한편, 프로그램이 Twisted를 기반으로 구축되어 있지 않은 경우에는 그다지 도움이 되지 않습니다.이것이 당신의 특정 응용 프로그램에 적용되지 않더라도 다른 독자들에게 도움이 될 수 있기를 바랍니다.

선택 및 읽기(1)를 사용합니다.

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

readline()-like의 경우:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

한 가지 해결책은 다른 프로세스를 만들어 프로세스의 판독을 수행하거나 프로세스의 스레드를 타임아웃으로 만드는 것입니다.

타임아웃 함수의 스레드 버전은 다음과 같습니다.

http://code.activestate.com/recipes/473878/

단, stdout이 들어올 때 읽을 필요가 있습니까?다른 해결 방법은 출력을 파일에 덤프하고 프로세스가 p.wait()를 사용하여 완료될 때까지 기다리는 것입니다.

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

부분 라인을 포함하여 하위 프로세스에서 나오는 모든 출력을 최대한 빨리 캡처하는 데 사용되는 내 코드입니다.펌프는 거의 올바른 순서로 stdout과 stderr를 동시에 펌프합니다.

Python 2.7 Linux 및 Windows에서 테스트 및 올바르게 동작.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

현대의 Python은 상황이 훨씬 더 좋다.

다음은 간단한 어린이 프로그램인 "hello.py"입니다.

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

그리고 그것과 상호작용할 수 있는 프로그램:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

다음과 같이 출력:

b'hello bob\n'
b'hello alice\n'

실제 패턴은 여기에서도 관련 질문에서도 거의 모든 이전 답변에 따라 달라집니다.실제 패턴은 자녀의 stdout 파일 기술자를 non-blocking으로 설정한 후 일종의 선택 루프에서 폴링하는 것입니다.물론 오늘날에는 이 루프가 비동기식으로 제공됩니다.

면책사항: 이는 토네이도에 대해서만 적용됩니다.

이를 수행하려면 fd를 nonblocking으로 설정하고 ioloop을 사용하여 콜백을 등록합니다.토네이도_서브프로세스라는 에그에 포장해서 파이를 통해 설치할 수 있습니다.PI:

easy_install tornado_subprocess

이제 다음과 같은 작업을 수행할 수 있습니다.

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

RequestHandler와 함께 사용할 수도 있습니다.

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

기존 솔루션이 작동하지 않았습니다(자세한 내용은 아래 참조).마지막으로 성공한 것은 read(1)를 사용하여 readline을 구현하는 것이었습니다( 답변에 근거).후자는 차단하지 않습니다.

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

기존 솔루션이 작동하지 않는 이유:

  1. 읽기 행이 필요한 솔루션(큐 기반 솔루션 포함)은 항상 차단됩니다.readline을 실행하는 스레드를 끊는 것은 어렵습니다(불가능).생성된 프로세스가 종료되었을 때만 중지되고 출력 생성 프로세스가 종료되었을 때는 중지되지 않습니다.
  2. anonnn이 지적한 바와 같이 낮은 수준의 fcntl과 높은 수준의 읽기 회선 콜을 조합하면 올바르게 동작하지 않을 수 있습니다.
  3. select.poll()을 사용하는 것은 간단하지만 python 문서에 따르면 Windows에서는 작동하지 않습니다.
  4. 서드파티 라이브러리를 사용하면 이 태스크에 과도한 영향을 미쳐 종속성을 추가할 수 있습니다.

하위 프로세스를 읽기 위해 이 문제를 추가합니다.펑펑 터지다.다음은 내 논블로킹 읽기 솔루션입니다.

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

다음은 스레드에 기반한 간단한 솔루션입니다.

  • 와 에서 동작합니다( 에 의존하지 ).select를 참조해 주세요.
  • 다 읽다stdout ★★★★★★★★★★★★★★★★★」stderr비동기적으로
  • 는 임의의 대기시간(CPU 친화적)에 의한 액티브 폴링에 의존하지 않습니다.
  • 않다asyncio(서양간)
  • 는자프로세스가종료될때까지실행됩니다.

프린터.화이

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

이 버전의 논블로킹 읽기에는 특별한 모듈이 필요하지 않으며 대부분의 Linux Distros에서 즉시 사용할 수 있습니다.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

원래 질문자의 문제가 있지만 스레드를 호출하고 싶지 않습니다..read()파이프와 라인 판독을 위한 나만의 버퍼링(단, 서브프로세스 ping은 항상 완전한 라인< a system page size)을 작성합니다).고브젝트 등록 IO 워치로 읽기만 하면 통화중 대기를 피할 수 있습니다.요즘은 스레드를 피하기 위해 고브젝트 MainLoop 내에서 코드를 실행합니다.

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

감시자는

def watch(f, *other):
    print 'reading',f.read()
    return True

그리고 메인 프로그램은 ping을 설정하고 나서 gobject 메일 루프를 호출한다.

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

기타 작업은 gobject의 콜백에 첨부됩니다.

여기에 이 답변을 추가하면 Windows 및 Unix에서 논블로킹 파이프를 설정할 수 있습니다.

★★★★★★★★★★★★★★★★★.ctypes자세한 내용은 @techtonik의 답변 덕분입니다.

Unix 시스템과 Windows 시스템 모두에서 사용할 수 있도록 약간 수정된 버전이 있습니다.

  • Python3 호환(작은 변경만 필요).
  • posix 버전을 포함하며 둘 중 하나에 사용할 예외를 정의합니다.

이렇게 하면 Unix 및 Windows 코드에 동일한 기능과 예외를 사용할 수 있습니다.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

불완전한 데이터를 읽는 것을 피하기 위해, 나는 나만의 readline generator(각 행의 바이트 문자열을 반환하는)를 쓰게 되었습니다.

발전기라서 예를 들면...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

선택 모듈을 통해 다음 유용한 입력 위치를 확인할 수 있습니다.

하지만, 여러분은 거의 항상 분리된 실이 더 행복합니다.하나는 stdin을 읽고, 다른 하나는 당신이 차단하고 싶지 않은 곳을 읽습니다.

왜 실타래를 괴롭히는가?readline()과 달리 BufferedReader.read1()은 \r\n을 기다리는 것을 차단하지 않습니다.출력이 착신되면 가능한 한 빨리 반환됩니다.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

저 같은 경우에는 백그라운드 어플리케이션의 출력을 캡처하여 확대하는 로깅 모듈이 필요했습니다(타임 스탬프, 색상 추가 등).

실제 I/O를 수행하는 백그라운드 스레드를 갖게 되었습니다.다음 코드는 POSIX 플랫폼 전용입니다.나는 불필요한 부분을 제거했다.

이 기능을 장기간 사용하는 경우 오픈 디스크립터 관리를 고려해 주십시오.저 같은 경우에는 큰 문제가 되지 않았습니다.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

실행 중인 프로세스에서 stdout과 stderr를 모두 수집하고 싶었지만 위젯에서 출력을 생성하면서 렌더링하고 싶었기 때문에 문제는 조금 다릅니다.

큐 또는 추가 스레드를 사용하여 제안된 많은 회피책을 사용하고 싶지 않습니다.이는 다른 스크립트를 실행하여 출력을 수집하는 일반적인 작업을 수행할 필요가 없기 때문입니다.

제안 솔루션과 python 문서를 읽고 아래 구현으로 문제를 해결했습니다. 저는 POSIX를 하고 있기 할 수 있습니다.select함수 호출

이러한 일반적인 스크립트 작업에서는 문서가 혼란스럽고 구현이 어색하다는 것에 동의합니다.합니다.Popen설명이 달라서 혼란이 많았어요.2. 와.5.2 의으로 동작하고 것 같습니다.

열쇠는 '세팅'이었다.bufsize=1는, 다음에 「」를 참조해 주세요.universal_newlines=Truebufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG 및 VERBOSE는 단순히 출력을 단말기에 인쇄하는 매크로입니다.

IMHO 99.99%를 사용하기 에 IMHO 의 효과가 .readline따라서 서브프로세스가 양호하다고 가정하고 완전한 라인을 출력합니다.

저는 아직 Python에 익숙하지 않기 때문에 솔루션 개선을 위한 피드백을 환영합니다.

처음은 아니지만 아마도 마지막은 아닐 것입니다.저는 두 가지 다른 방법으로 stdout PIPE 읽기를 차단하지 않는 패키지를 만들었습니다.하나는 J.F.의 작업에 기초하고 있습니다.Sebastian(@jfs)의 답변입니다.다른 하나는 타임아웃을 체크하기 위한 스레드가 있는 단순한 communicate() 루프입니다.

stdout 캡처 방법 모두 Linux와 Windows 모두에서 작동하도록 테스트되었으며, Python 버전은 2.7에서 3.9까지입니다.

비차단 기능이기 때문에 여러 자녀 및 손자 프로세스, 그리고 Python 2.7 하에서도 타임아웃 적용을 보장합니다.

또한 이 패키지는 바이트 및 텍스트 stdout 인코딩을 모두 처리하므로 EOF를 잡으려고 할 때 악몽이 됩니다.

패키지는 https://github.com/netinvent/command_runner 에서 찾을 수 있습니다.

제대로 테스트된 논블로킹 읽기 구현이 필요한 경우 테스트해 보십시오(또는 코드 해킹).

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

는 에 ._poll_process() ★★★★★★★★★★★★★★★★★」_monitor_process()사용하는 포획 방법에 따라 달라집니다.여기서 원하는 것을 해킹하거나 전체 패키지를 사용하여 하위 프로세스 대체로 명령을 실행할 수 있습니다.

J. F. Sebastian의 솔루션을 기반으로 라이브러리를 만들었습니다.쓸 수 있어요.

https://github.com/cenkalti/what

J.F.에서 일해요.Sebastian의 답변과 다른 몇 가지 출처를 간단한 서브프로세스 매니저로 정리했습니다.또한 요청 논블로킹 판독과 여러 프로세스를 병렬로 실행할 수 있습니다.OS 고유의 콜을 사용하지 않기 때문에 어디서든 동작할 수 있습니다.

수 그냥 pypi로 .pip install shelljob예시와 전체 문서는 프로젝트 페이지를 참조하십시오.

편집: 이 구현은 여전히 차단됩니다.J.F.를 사용합니다.대신 세바스찬의 대답.

제가 한번 해봤는데상위 답변이지만 스레드 코드의 추가 위험과 유지보수가 우려됩니다.

를 훑어보다io module (및 2.6으로 제한됨)이 Buffered Reader를 찾았습니다.이것은 스레드리스 논블로킹 솔루션입니다.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

이것은 서브프로세스에서 인터랙티브명령어를 실행하는 예이며 stdout은 의사 단말기를 사용하여 인터랙티브합니다.https://stackoverflow.com/a/43012138/3555925 를 참조해 주세요.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

에서는 「」를 하고 있습니다.select모듈을 사용하여 IO 스트림에서 "사용 가능한 모든 데이터 읽기"를 수행합니다.이 함수는 처음에는 데이터를 사용할 수 있을 때까지 차단하지만 이후 사용할 수 있고 더 이상 차단되지 않는 데이터만 읽습니다.

는 '이 문양'을 에,select유닉스

코드는 PEP8에 완전히 준거하고 있다.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

Jesse가 기술한 문제에 직면하여 Bradley, Andy 및 다른 사람들이 그랬던 것처럼 "select"를 사용하지만 비지 루프를 피하기 위해 블로킹 모드로 문제를 해결했습니다.가짜 스틴으로 더미 파이프를 사용합니다.블록을 선택하고 stdin 또는 파이프가 준비될 때까지 기다립니다.키를 누르면 선택 블록이 해제되고 키 값이 read(1)로 검색됩니다.다른 스레드가 파이프에 쓰이면 파이프가 선택 항목을 차단 해제하고 stdin의 필요성이 끝났음을 나타내는 것으로 간주할 수 있습니다.다음은 참조 코드입니다.

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

wexpect를 사용해 보십시오.이것은 pexpect의 윈도 대체입니다.

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()

다음은 python에서 논블로킹 읽기 및 백그라운드 쓰기를 지원하는 모듈입니다.

https://pypi.python.org/pypi/python-nonblock

기능을 제공합니다.

nonblock_read 스트림에서 데이터를 읽을 수 있는 경우 빈 문자열을 반환합니다(또는 스트림이 반대편에서 닫혀 있고 가능한 모든 데이터를 읽은 경우 없음).

python-subprocess2 모듈도 고려할 수 있습니다.

https://pypi.python.org/pypi/python-subprocess2

하위 프로세스 모듈에 추가됩니다.서브프로세스에서 반환된 오브젝트.Popen"은 runInBackground라는 추가 메서드가 추가되었습니다.그러면 스레드가 시작되고 메인 스레드를 차단하지 않고 stdout/stderr에 항목이 기록될 때 자동으로 채워지는 개체가 반환됩니다.

맛있게 드세요!

언급URL : https://stackoverflow.com/questions/375427/a-non-blocking-read-on-a-subprocess-pipe-in-python

반응형