Tengo un archivo ejecutable al que llamo utilizando subproceso.Popen. Entonces, tengo la intención de alimentar algunos datos a través de stdin utilizando un hilo que lee su valor de una cola que luego se completará en otro hilo. La salida debe leerse utilizando la tubería stdout en otra secuencia y volver a clasificarse en una cola.python: lectura del resultado del subproceso en hilos
Por lo que entiendo de mi investigación anterior, el uso de hilos con Queue es una buena práctica.
El archivo ejecutable externo, desafortunadamente, no me dará una respuesta rápida para cada línea que está conectada, de modo que la escritura simple, los ciclos de lectura no son una opción. El ejecutable implementa algunos subprocesos múltiples internos y deseo el resultado tan pronto como esté disponible, por lo tanto, el hilo lector adicional.
A modo de ejemplo para probar el ejecutable se acaba de mezclar cada línea (shuffleline.py):
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
Tenga en cuenta que esto ya es sin almacenamiento como sea posible. ¿O no? Este es mi simplificada programa de pruebas:
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
esto dará la salida siguiente (python2.7):
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
entonces nada durante 2 segundos ...
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
La intercalación al principio se espera. Sin embargo, la salida del subproceso no aparece hasta que después de finalice el subproceso. Con más líneas conectadas obtengo algo de salida, así que asumo un problema de almacenamiento en caché en la tubería estándar. De acuerdo con otras preguntas publicadas aquí, flushing stdout (en el subproceso) debería funcionar, al menos en Linux.
¡Gracias, esa es la solución! – muckl
¿Puedo preguntar por qué la mezcla de subprocesos y subprocesos es un enfoque tan terrible? Parece más elegante que llamar a E/S sin bloqueo una y otra vez mientras nada sucede. Obviamente, los hilos no deberían tener acceso a ninguna estructura de datos que no sea segura contra hilos, pero leer y escribir desde o hacia una cola parece seguro. ¿Los cambios en el backport de Python3.2 son importantes para un caso tan simple como el mío? – muckl
El problema con subprocesos y subprocesos específicamente es el problema de mezclar hilos y tenedor. Ver http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them y otros artículos similares. El backport del subproceso Python 3.2 trabaja en torno a esos problemas. En cuanto a los hilos en general, el principal problema es que son difíciles de controlar y depurar. Por ejemplo, no puede matarlos desde "afuera" del hilo, de modo que si un hilo está atascado en una lectura o escritura, no hay nada que pueda hacer al respecto. –