하위 프로세스에 대한 비차단 읽기입니다.Python의 파이프
하위 프로세스 모듈을 사용하여 하위 프로세스를 시작하고 해당 출력 스트림(표준 출력)에 연결합니다.표준 출력으로 논블로킹 판독을 실행할 수 있도록 하고 싶다.호출하기 전에 .readline을 논블로킹하거나 스트림에 데이터가 있는지 확인하는 방법이 있습니까?.readline
Windows Linux の 에에에에에에에에에에에에 。
.readline
「 」 「 」 、 「 」
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
fcntl
, 이 경우 도움이 되지 않습니다.
운영체제에 관계없이 차단하지 않고 스트림을 읽는 신뢰성 높은 방법은 다음과 같습니다.
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Python (stdin)에서 사용자 입력을 동시에 받아들이면서 주요 기능을 실행할 수 있어야 합니다. 처리 되지 않습니다.readline()
차단하고 타임아웃은 없습니다.을 더 종료하고 , 수 . 왜냐하면 "Doese"는 "Doese"를 사용하기 때문입니다.readline()
다른 스레드에서 아직 회선을 기다리고 있습니다.로 만드는 입니다.fcntl 모듈입니다.
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
제 생각에는 이 문제를 해결하기 위해 선택 모듈이나 신호 모듈을 사용하는 것보다 조금 더 깨끗하지만 UNIX에서만 작동합니다.
Python 3.4는 비동기 IO -- 모듈을 위한 새로운 잠정 API를 도입했습니다.
이 접근법은 @Bryan Ward의 기반 응답과 유사합니다. 프로토콜을 정의하면 데이터가 준비되는 즉시 메서드가 호출됩니다.
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
수준의 가 있습니다.asyncio.create_subprocess_exec()
coroutine(/ Python 3.5+ 구문 포함)await
을 사용하여 비동기적으로 행을 읽을 수 있는 개체를 반환합니다.
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
는 다음합니다.
- 시작 하위 프로세스, 파이프에 stdout 리디렉션
- 서브프로세스의 stdout에서 비동기적으로 행을 읽다
- 킬 서브프로세스
- 그것이 끝나기를 기다리다.
필요에 따라서, 각 스텝을 타임 아웃 초단위로 제한할 수 있습니다.
Unix 계열의 시스템과 Python 3.5+에서는, 그것이 말하는 대로 동작합니다.
import os
import time
import subprocess
cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()
출력은 다음과 같습니다.
1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
★★★★★★★★★★★★★★★★ os.set_blocking
★★★★★★★★★★★★★★★★★★:
0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
비동기 모듈을 시험해 보겠습니다.예를 들어 다음과 같습니다.
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
모듈은 S의 제안에 따라 모든 스레딩을 처리합니다.많이.
트위스트에서는 정말 쉽게 할 수 있어요.기존 코드 베이스에 따라서는 사용하기 쉽지 않을 수 있지만, 트위스트된 애플리케이션을 구축하고 있다면 이와 같은 것은 거의 하찮아집니다.작성하다ProcessProtocol
.outReceived()
에 따라 다름)은 보통 큰 꼬임일 뿐입니다.select()
다른 파일 기술자(많은 경우 네트워크 소켓)의 데이터를 처리하기 위해 콜백이 설치된 루프입니다. 그...outReceived()
는 단순히 하여 콜백으로 입니다.STDOUT
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
Twisted 문서에는 이에 대한 몇 가지 유용한 정보가 있습니다.
Twisted를 기반으로 전체 애플리케이션을 구축하면 로컬이든 원격이든 다른 프로세스와 비동기식으로 통신할 수 있습니다. 이렇게 우아합니다.한편, 프로그램이 Twisted를 기반으로 구축되어 있지 않은 경우에는 그다지 도움이 되지 않습니다.이것이 당신의 특정 응용 프로그램에 적용되지 않더라도 다른 독자들에게 도움이 될 수 있기를 바랍니다.
선택 및 읽기(1)를 사용합니다.
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
readline()-like의 경우:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
한 가지 해결책은 다른 프로세스를 만들어 프로세스의 판독을 수행하거나 프로세스의 스레드를 타임아웃으로 만드는 것입니다.
타임아웃 함수의 스레드 버전은 다음과 같습니다.
http://code.activestate.com/recipes/473878/
단, stdout이 들어올 때 읽을 필요가 있습니까?다른 해결 방법은 출력을 파일에 덤프하고 프로세스가 p.wait()를 사용하여 완료될 때까지 기다리는 것입니다.
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
부분 라인을 포함하여 하위 프로세스에서 나오는 모든 출력을 최대한 빨리 캡처하는 데 사용되는 내 코드입니다.펌프는 거의 올바른 순서로 stdout과 stderr를 동시에 펌프합니다.
Python 2.7 Linux 및 Windows에서 테스트 및 올바르게 동작.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
현대의 Python은 상황이 훨씬 더 좋다.
다음은 간단한 어린이 프로그램인 "hello.py"입니다.
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
그리고 그것과 상호작용할 수 있는 프로그램:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
다음과 같이 출력:
b'hello bob\n'
b'hello alice\n'
실제 패턴은 여기에서도 관련 질문에서도 거의 모든 이전 답변에 따라 달라집니다.실제 패턴은 자녀의 stdout 파일 기술자를 non-blocking으로 설정한 후 일종의 선택 루프에서 폴링하는 것입니다.물론 오늘날에는 이 루프가 비동기식으로 제공됩니다.
면책사항: 이는 토네이도에 대해서만 적용됩니다.
이를 수행하려면 fd를 nonblocking으로 설정하고 ioloop을 사용하여 콜백을 등록합니다.토네이도_서브프로세스라는 에그에 포장해서 파이를 통해 설치할 수 있습니다.PI:
easy_install tornado_subprocess
이제 다음과 같은 작업을 수행할 수 있습니다.
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
RequestHandler와 함께 사용할 수도 있습니다.
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
기존 솔루션이 작동하지 않았습니다(자세한 내용은 아래 참조).마지막으로 성공한 것은 read(1)를 사용하여 readline을 구현하는 것이었습니다(이 답변에 근거).후자는 차단하지 않습니다.
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
기존 솔루션이 작동하지 않는 이유:
- 읽기 행이 필요한 솔루션(큐 기반 솔루션 포함)은 항상 차단됩니다.readline을 실행하는 스레드를 끊는 것은 어렵습니다(불가능).생성된 프로세스가 종료되었을 때만 중지되고 출력 생성 프로세스가 종료되었을 때는 중지되지 않습니다.
- anonnn이 지적한 바와 같이 낮은 수준의 fcntl과 높은 수준의 읽기 회선 콜을 조합하면 올바르게 동작하지 않을 수 있습니다.
- select.poll()을 사용하는 것은 간단하지만 python 문서에 따르면 Windows에서는 작동하지 않습니다.
- 서드파티 라이브러리를 사용하면 이 태스크에 과도한 영향을 미쳐 종속성을 추가할 수 있습니다.
하위 프로세스를 읽기 위해 이 문제를 추가합니다.펑펑 터지다.다음은 내 논블로킹 읽기 솔루션입니다.
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
다음은 스레드에 기반한 간단한 솔루션입니다.
- 와 에서 동작합니다( 에 의존하지 ).
select
를 참조해 주세요. - 다 읽다
stdout
★★★★★★★★★★★★★★★★★」stderr
비동기적으로 - 는 임의의 대기시간(CPU 친화적)에 의한 액티브 폴링에 의존하지 않습니다.
- 않다
asyncio
(서양간) - 는자프로세스가종료될때까지실행됩니다.
프린터.화이
import time
import sys
sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
reader.py
import queue
import subprocess
import sys
import threading
def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()
def enqueue_process(process, queue):
process.wait()
queue.put('x')
p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()
while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2': # stderr
sys.stdout.write("\033[0;31m") # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m") # reset ANSI code
sys.stdout.flush()
tp.join()
to.join()
te.join()
이 버전의 논블로킹 읽기에는 특별한 모듈이 필요하지 않으며 대부분의 Linux Distros에서 즉시 사용할 수 있습니다.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
원래 질문자의 문제가 있지만 스레드를 호출하고 싶지 않습니다..read()
파이프와 라인 판독을 위한 나만의 버퍼링(단, 서브프로세스 ping은 항상 완전한 라인< a system page size)을 작성합니다).고브젝트 등록 IO 워치로 읽기만 하면 통화중 대기를 피할 수 있습니다.요즘은 스레드를 피하기 위해 고브젝트 MainLoop 내에서 코드를 실행합니다.
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
감시자는
def watch(f, *other):
print 'reading',f.read()
return True
그리고 메인 프로그램은 ping을 설정하고 나서 gobject 메일 루프를 호출한다.
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
기타 작업은 gobject의 콜백에 첨부됩니다.
여기에 이 답변을 추가하면 Windows 및 Unix에서 논블로킹 파이프를 설정할 수 있습니다.
★★★★★★★★★★★★★★★★★.ctypes
자세한 내용은 @techtonik의 답변 덕분입니다.
Unix 시스템과 Windows 시스템 모두에서 사용할 수 있도록 약간 수정된 버전이 있습니다.
- Python3 호환(작은 변경만 필요).
- posix 버전을 포함하며 둘 중 하나에 사용할 예외를 정의합니다.
이렇게 하면 Unix 및 Windows 코드에 동일한 기능과 예외를 사용할 수 있습니다.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
불완전한 데이터를 읽는 것을 피하기 위해, 나는 나만의 readline generator(각 행의 바이트 문자열을 반환하는)를 쓰게 되었습니다.
발전기라서 예를 들면...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
선택 모듈을 통해 다음 유용한 입력 위치를 확인할 수 있습니다.
하지만, 여러분은 거의 항상 분리된 실이 더 행복합니다.하나는 stdin을 읽고, 다른 하나는 당신이 차단하고 싶지 않은 곳을 읽습니다.
왜 실타래를 괴롭히는가?readline()과 달리 BufferedReader.read1()은 \r\n을 기다리는 것을 차단하지 않습니다.출력이 착신되면 가능한 한 빨리 반환됩니다.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
저 같은 경우에는 백그라운드 어플리케이션의 출력을 캡처하여 확대하는 로깅 모듈이 필요했습니다(타임 스탬프, 색상 추가 등).
실제 I/O를 수행하는 백그라운드 스레드를 갖게 되었습니다.다음 코드는 POSIX 플랫폼 전용입니다.나는 불필요한 부분을 제거했다.
이 기능을 장기간 사용하는 경우 오픈 디스크립터 관리를 고려해 주십시오.저 같은 경우에는 큰 문제가 되지 않았습니다.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
실행 중인 프로세스에서 stdout과 stderr를 모두 수집하고 싶었지만 위젯에서 출력을 생성하면서 렌더링하고 싶었기 때문에 문제는 조금 다릅니다.
큐 또는 추가 스레드를 사용하여 제안된 많은 회피책을 사용하고 싶지 않습니다.이는 다른 스크립트를 실행하여 출력을 수집하는 일반적인 작업을 수행할 필요가 없기 때문입니다.
제안 솔루션과 python 문서를 읽고 아래 구현으로 문제를 해결했습니다. 저는 POSIX를 하고 있기 할 수 있습니다.select
함수 호출
이러한 일반적인 스크립트 작업에서는 문서가 혼란스럽고 구현이 어색하다는 것에 동의합니다.합니다.Popen
설명이 달라서 혼란이 많았어요.2. 와.5.2 의으로 동작하고 것 같습니다.
열쇠는 '세팅'이었다.bufsize=1
는, 다음에 「」를 참조해 주세요.universal_newlines=True
를 bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG 및 VERBOSE는 단순히 출력을 단말기에 인쇄하는 매크로입니다.
IMHO 99.99%를 사용하기 에 IMHO 의 효과가 .readline
따라서 서브프로세스가 양호하다고 가정하고 완전한 라인을 출력합니다.
저는 아직 Python에 익숙하지 않기 때문에 솔루션 개선을 위한 피드백을 환영합니다.
처음은 아니지만 아마도 마지막은 아닐 것입니다.저는 두 가지 다른 방법으로 stdout PIPE 읽기를 차단하지 않는 패키지를 만들었습니다.하나는 J.F.의 작업에 기초하고 있습니다.Sebastian(@jfs)의 답변입니다.다른 하나는 타임아웃을 체크하기 위한 스레드가 있는 단순한 communicate() 루프입니다.
stdout 캡처 방법 모두 Linux와 Windows 모두에서 작동하도록 테스트되었으며, Python 버전은 2.7에서 3.9까지입니다.
비차단 기능이기 때문에 여러 자녀 및 손자 프로세스, 그리고 Python 2.7 하에서도 타임아웃 적용을 보장합니다.
또한 이 패키지는 바이트 및 텍스트 stdout 인코딩을 모두 처리하므로 EOF를 잡으려고 할 때 악몽이 됩니다.
패키지는 https://github.com/netinvent/command_runner 에서 찾을 수 있습니다.
제대로 테스트된 논블로킹 읽기 구현이 필요한 경우 테스트해 보십시오(또는 코드 해킹).
pip install command_runner
from command_runner import command_runner
exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')
는 에 ._poll_process()
★★★★★★★★★★★★★★★★★」_monitor_process()
사용하는 포획 방법에 따라 달라집니다.여기서 원하는 것을 해킹하거나 전체 패키지를 사용하여 하위 프로세스 대체로 명령을 실행할 수 있습니다.
J. F. Sebastian의 솔루션을 기반으로 라이브러리를 만들었습니다.쓸 수 있어요.
https://github.com/cenkalti/what
J.F.에서 일해요.Sebastian의 답변과 다른 몇 가지 출처를 간단한 서브프로세스 매니저로 정리했습니다.또한 요청 논블로킹 판독과 여러 프로세스를 병렬로 실행할 수 있습니다.OS 고유의 콜을 사용하지 않기 때문에 어디서든 동작할 수 있습니다.
수 그냥 pypi로 .pip install shelljob
예시와 전체 문서는 프로젝트 페이지를 참조하십시오.
편집: 이 구현은 여전히 차단됩니다.J.F.를 사용합니다.대신 세바스찬의 대답.
제가 한번 해봤는데상위 답변이지만 스레드 코드의 추가 위험과 유지보수가 우려됩니다.
를 훑어보다io module (및 2.6으로 제한됨)이 Buffered Reader를 찾았습니다.이것은 스레드리스 논블로킹 솔루션입니다.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
이것은 서브프로세스에서 인터랙티브명령어를 실행하는 예이며 stdout은 의사 단말기를 사용하여 인터랙티브합니다.https://stackoverflow.com/a/43012138/3555925 를 참조해 주세요.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
에서는 「」를 하고 있습니다.select
모듈을 사용하여 IO 스트림에서 "사용 가능한 모든 데이터 읽기"를 수행합니다.이 함수는 처음에는 데이터를 사용할 수 있을 때까지 차단하지만 이후 사용할 수 있고 더 이상 차단되지 않는 데이터만 읽습니다.
는 '이 문양'을 에,select
유닉스
코드는 PEP8에 완전히 준거하고 있다.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Jesse가 기술한 문제에 직면하여 Bradley, Andy 및 다른 사람들이 그랬던 것처럼 "select"를 사용하지만 비지 루프를 피하기 위해 블로킹 모드로 문제를 해결했습니다.가짜 스틴으로 더미 파이프를 사용합니다.블록을 선택하고 stdin 또는 파이프가 준비될 때까지 기다립니다.키를 누르면 선택 블록이 해제되고 키 값이 read(1)로 검색됩니다.다른 스레드가 파이프에 쓰이면 파이프가 선택 항목을 차단 해제하고 stdin의 필요성이 끝났음을 나타내는 것으로 간주할 수 있습니다.다음은 참조 코드입니다.
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
wexpect를 사용해 보십시오.이것은 pexpect의 윈도 대체입니다.
import wexpect
p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.') // regex pattern of any character
output_str = p.after()
다음은 python에서 논블로킹 읽기 및 백그라운드 쓰기를 지원하는 모듈입니다.
https://pypi.python.org/pypi/python-nonblock
기능을 제공합니다.
nonblock_read 스트림에서 데이터를 읽을 수 있는 경우 빈 문자열을 반환합니다(또는 스트림이 반대편에서 닫혀 있고 가능한 모든 데이터를 읽은 경우 없음).
python-subprocess2 모듈도 고려할 수 있습니다.
https://pypi.python.org/pypi/python-subprocess2
하위 프로세스 모듈에 추가됩니다.서브프로세스에서 반환된 오브젝트.Popen"은 runInBackground라는 추가 메서드가 추가되었습니다.그러면 스레드가 시작되고 메인 스레드를 차단하지 않고 stdout/stderr에 항목이 기록될 때 자동으로 채워지는 개체가 반환됩니다.
맛있게 드세요!
언급URL : https://stackoverflow.com/questions/375427/a-non-blocking-read-on-a-subprocess-pipe-in-python
'programing' 카테고리의 다른 글
부울을 문자열로 변환하는 방법 (0) | 2022.10.02 |
---|---|
Bootstrap Vue 테이블의 첫 번째 필드로 확인란을 설정하는 방법 (0) | 2022.10.02 |
물음표와 함께 검은색 다이아몬드가 거의 표시되지 않는 PHP 출력 (0) | 2022.09.26 |
대부분의 거대한 방법은 존재하지 않을지도 모른 파일을 삭제합니다. (0) | 2022.09.26 |
필드에 $x가 포함되지 않는 SQL 쿼리 (0) | 2022.09.26 |