2012-09-02 32 views
8

Estoy depurando una aplicación que reúne información de 2 sensores: una cámara web y un micrófono.¿Qué podría hacer un bloque connection.send()? (desde conn1, conn2 = multiprocesamiento.Pipe())

La arquitectura general es bastante simple:

  • el proceso principal envía mensajes (inicio, parada, get_data) a través de tuberías a los procesos secundarios (uno para cada uno).
  • procesos hijos se reúnen los datos y los envían al proceso principal

Niño & procesos principales están en bucles infinitos para procesar comandos (el principal proceso por parte del usuario, el proceso hijo del proceso principal).

Funciona globalmente, pero tengo problemas para detener los procesos secundarios.

He conectado el código y lo que parece suceder 2 cosas:

  1. El mensaje de 'stop' se envía, pero no consigue a través de la tubería.
  2. El proceso hijo continúa enviando datos y los bloques de conn.send (datos).

El comportamiento está claramente relacionado con el estado de la conexión, ya que los procesos secundarios que no devuelven nada no tienen este comportamiento. Aún así, no veo cómo depurar/modificar la arquitectura actual que parece razonable.

Entonces, ¿qué causa este comportamiento de bloqueo y cómo evitarlo?

Este es el código que se ejecuta para cada iteración del bucle infinito en el proceso hijo:

def do(self): 
     while self.cnx.poll(): 
      msg = self.cnx.recv() 
      self.queue.append(msg) 
     #== 
     if not self.queue: 
      func_name = 'default_action' 
      self.queue.append([func_name, ]) 
     #== 
     msg    = self.queue.pop() 
     func_name, args = msg[0], msg[1:] 
     #== 
     res = self.target.__getattribute__(func_name)(*args) 
     #== 
     running = func_name != 'stop' 
     #== 
     if res and self.send: 
      assert running 
      self.output_queue.append(res[0]) 
     if self.output_queue and running: 
      self.cnx.send(self.output_queue.popleft()) 
     #== 
     return running 

actualización: parece que la tubería no se puede escribir simultáneamente en ambos extremos. Funciona si cambiar las últimas líneas del código anterior para:

 if self.output_queue and running: 
      if not self.cnx.poll(): 
       self.cnx.send(self.output_queue.popleft()) 

La pregunta permanece abierta, aunque como los de tuberías están documentados como dúplex completo por defecto y este comportamiento no está documentado en absoluto. Debo haber entendido mal algo. Por favor, ilumíname!

actualización 2: para que quede claro, no hay conexión cerrada durante esta situación. Para describir la secuencia de eventos:

  • el proceso principal envía una messsage ("stop") (que se vacía la conexión antes de enviar el mensaje)
  • el proceso principal introduzca un bucle (infinita) que se detiene cuando el el proceso hijo finaliza.
  • Mientras tanto, el proceso hijo está bloqueado en el envío y nunca recibe el mensaje.
+1

¿Podría publicar su código? De lo contrario, es difícil resolver su problema. –

+0

He agregado el código del proceso hijo. (El padre solo está enviando el mensaje). – LBarret

+1

¿En qué SO se está ejecutando? 'multiprocesamiento.Pipe' se implementa de forma bastante diferente (y con algunas semánticas diferentes, creo) en Windows. – Blckknght

Respuesta

4

Un dúplex completo multiprocessing.Pipe se implementa como socketpair(). Llamar al .send puede bloquear por todos los motivos normales al hablar con un socket.Según su descripción, creo que es probable que el lector de su Pipe haya dejado de leer y los datos se hayan acumulado en los almacenamientos intermedios en el kernel hasta el punto donde se encuentran sus bloques .send.

Si explícitamente .close el lado de recepción probablemente obtendrá algún tipo de error (aunque posiblemente SIGPIPE también, no estoy seguro) cuando intente .send. Si su conexión de recepción saliera del alcance esto probablemente sucedería automáticamente. Es posible que pueda solucionar el problema simplemente teniendo más cuidado de no almacenar referencias (directas o indirectas) en el lado receptor para que se desasigne cuando el hilo se vaya.

demostración Trivial de bloquear .send:

import multiprocessing 

a, b = multiprocessing.Pipe() 

while True: 
    print "send!" 
    a.send("hello world") 

Ahora en cuenta que después de un tiempo se deja la impresión de "enviar"

+0

Lo consideré pero tanto el receptor como el emisor siguen vivos y no se cierra ninguna conexión. Actualicé el texto principal. – LBarret

Cuestiones relacionadas