2012-03-21 19 views
9

Tengo un archivo ejecutable al que llamo utilizando subproceso.Popen. Entonces, tengo la intención de alimentar algunos datos a través de stdin utilizando un hilo que lee su valor de una cola que luego se completará en otro hilo. La salida debe leerse utilizando la tubería stdout en otra secuencia y volver a clasificarse en una cola.python: lectura del resultado del subproceso en hilos

Por lo que entiendo de mi investigación anterior, el uso de hilos con Queue es una buena práctica.

El archivo ejecutable externo, desafortunadamente, no me dará una respuesta rápida para cada línea que está conectada, de modo que la escritura simple, los ciclos de lectura no son una opción. El ejecutable implementa algunos subprocesos múltiples internos y deseo el resultado tan pronto como esté disponible, por lo tanto, el hilo lector adicional.

A modo de ejemplo para probar el ejecutable se acaba de mezclar cada línea (shuffleline.py):

#!/usr/bin/python -u 
import sys 
from random import shuffle 

for line in sys.stdin: 
    line = line.strip() 

    # shuffle line 
    line = list(line) 
    shuffle(line) 
    line = "".join(line) 

    sys.stdout.write("%s\n"%(line)) 
    sys.stdout.flush() # avoid buffers 

Tenga en cuenta que esto ya es sin almacenamiento como sea posible. ¿O no? Este es mi simplificada programa de pruebas:

#!/usr/bin/python -u 
import sys 
import Queue 
import threading 
import subprocess 

class WriteThread(threading.Thread): 
    def __init__(self, p_in, source_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_in 
     self.source_queue = source_queue 

    def run(self): 
     while True: 
      source = self.source_queue.get() 
      print "writing to process: ", repr(source) 
      self.pipe.write(source) 
      self.pipe.flush() 
      self.source_queue.task_done() 

class ReadThread(threading.Thread): 
    def __init__(self, p_out, target_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_out 
     self.target_queue = target_queue 

    def run(self): 
     while True: 
      line = self.pipe.readline() # blocking read 
      if line == '': 
       break 
      print "reader read: ", line.rstrip() 
      self.target_queue.put(line) 

if __name__ == "__main__": 

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered 
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

    source_queue = Queue.Queue() 
    target_queue = Queue.Queue() 

    writer = WriteThread(proc.stdin, source_queue) 
    writer.setDaemon(True) 
    writer.start() 

    reader = ReadThread(proc.stdout, target_queue) 
    reader.setDaemon(True) 
    reader.start() 

    # populate queue 
    for i in range(10): 
     source_queue.put("string %s\n" %i) 
    source_queue.put("") 

    print "source_queue empty: ", source_queue.empty() 
    print "target_queue empty: ", target_queue.empty() 

    import time 
    time.sleep(2) # expect some output from reader thread 

    source_queue.join() # wait until all items in source_queue are processed 
    proc.stdin.close() # should end the subprocess 
    proc.wait() 

esto dará la salida siguiente (python2.7):

writing to process: 'string 0\n' 
writing to process: 'string 1\n' 
writing to process: 'string 2\n' 
writing to process: 'string 3\n' 
writing to process: 'string 4\n' 
writing to process: 'string 5\n' 
writing to process: 'string 6\n' 
source_queue empty: writing to process: 'string 7\n' 
writing to process: 'string 8\n' 
writing to process: 'string 9\n' 
writing to process: '' 
True 
target_queue empty: True 

entonces nada durante 2 segundos ...

reader read: rgsn0i t 
reader read: nrg1sti 
reader read: tis n2rg 
reader read: snt gri3 
reader read: nsri4 tg 
reader read: stir5 gn 
reader read: gnri6ts 
reader read: ngrits7 
reader read: 8nsrt ig 
reader read: sg9 nitr 

La intercalación al principio se espera. Sin embargo, la salida del subproceso no aparece hasta que después de finalice el subproceso. Con más líneas conectadas obtengo algo de salida, así que asumo un problema de almacenamiento en caché en la tubería estándar. De acuerdo con otras preguntas publicadas aquí, flushing stdout (en el subproceso) debería funcionar, al menos en Linux.

Respuesta

7

Su problema no tiene nada que ver el módulo subprocess o hilos (problemático, ya que son), o incluso sub-procesos de mezcla y los hilos (una mala idea muy , incluso peor que el uso de hilos para empezar, a menos que esté utilizando el módulo de backport subproceso de Python 3.2 que se puede obtener a partir code.google.com/p/python-subprocess32) o el acceso a las mismas cosas desde varios subprocesos (como sus declaraciones print hacen.)

lo que pasa es que su shuffleline.py memorias intermedias del programa. No en salida, sino en entrada. Aunque no es muy obvio, cuando iteras sobre un objeto de archivo, Python lo leerá en bloques, generalmente 8k bytes.Desde sys.stdin es una FileObject, el bucle for almacenará hasta EOF o un bloque completo:

for line in sys.stdin: 
    line = line.strip() 
    .... 

Si desea no hacerlo, o bien utilizar un bucle while para llamar sys.stdin.readline() (que devuelve '' para EOF):

while True: 
    line = sys.stdin.readline() 
    if not line: 
     break 
    line = line.strip() 
    ... 

o utilizar el formulario de dos argumentos de iter(), lo que crea un iterador que llama al primer argumento hasta que el segundo argumento (el "centinela") se devuelve:

for line in iter(sys.stdin.readline, ''): 
    line = line.strip() 
    ... 

También sería negligente si no sugiriera no usar hilos para esto, sino E/S sin bloqueo en las tuberías del subproceso, o incluso algo como twisted.reactor.spawnProcess que tiene muchas formas de conectar procesos y otras cosas juntas como consumidores y productores.

+0

¡Gracias, esa es la solución! – muckl

+1

¿Puedo preguntar por qué la mezcla de subprocesos y subprocesos es un enfoque tan terrible? Parece más elegante que llamar a E/S sin bloqueo una y otra vez mientras nada sucede. Obviamente, los hilos no deberían tener acceso a ninguna estructura de datos que no sea segura contra hilos, pero leer y escribir desde o hacia una cola parece seguro. ¿Los cambios en el backport de Python3.2 son importantes para un caso tan simple como el mío? – muckl

+3

El problema con subprocesos y subprocesos específicamente es el problema de mezclar hilos y tenedor. Ver http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them y otros artículos similares. El backport del subproceso Python 3.2 trabaja en torno a esos problemas. En cuanto a los hilos en general, el principal problema es que son difíciles de controlar y depurar. Por ejemplo, no puede matarlos desde "afuera" del hilo, de modo que si un hilo está atascado en una lectura o escritura, no hay nada que pueda hacer al respecto. –

Cuestiones relacionadas