2011-06-05 5 views
5

Aquí está mi script.zero mq pub/sub con multipart no funciona


#!/usr/bin/env python 

import traceback 
import sys 
import zmq 
from time import sleep 

print "Creating the zmq.Context" 
context = zmq.Context() 

print "Binding the publisher to the local socket at port 5557" 
sender = context.socket(zmq.PUB) 
sender.bind("tcp://*:5557") 

print "Binding the subscriber to the local socket at port 5557" 
receiver = context.socket(zmq.SUB) 
receiver.connect("tcp://*:5557") 

print "Setting the subscriber option to get only those originating from \"B\"" 
receiver.setsockopt(zmq.SUBSCRIBE, "B") 

print "Waiting a second for the socket to be created." 
sleep(1) 

print "Sending messages" 
for i in range(1,10): 
    msg = "msg %d" % (i) 
    env = None 
    if i % 2 == 0: 
     env = ["B", msg] 
    else: 
     env = ["A", msg] 
    print "Sending Message: ", env 
    sender.send_multipart(env) 

print "Closing the sender." 
sender.close() 

failed_attempts = 0 
while failed_attempts < 3: 
    try: 
     print str(receiver.recv_multipart(zmq.NOBLOCK)) 
    except: 
     print traceback.format_exception(*sys.exc_info()) 
     failed_attempts += 1 

print "Closing the receiver." 
receiver.close() 

print "Terminating the context." 
context.term() 

""" 
Output: 

Creating the zmq.Context 
Binding the publisher to the local socket at port 5557 
Binding the subscriber to the local socket at port 5557 
Setting the subscriber option to get only those originating from "B" 
Waiting a second for the socket to be created. 
Sending messages 
Sending Message: ['A', 'msg 1'] 
Sending Message: ['B', 'msg 2'] 
Sending Message: ['A', 'msg 3'] 
Sending Message: ['B', 'msg 4'] 
Sending Message: ['A', 'msg 5'] 
Sending Message: ['B', 'msg 6'] 
Sending Message: ['A', 'msg 7'] 
Sending Message: ['B', 'msg 8'] 
Sending Message: ['A', 'msg 9'] 
Closing the sender. 
['B', 'msg 2'] 
['B', 'msg 4'] 
['B', 'msg 6'] 
['B', 'msg 8'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
Closing the receiver. 
Terminating the context. 
""" 

Y, la pregunta es ... ¿por qué este código no funciona?

[EDIT] Después de conseguir una respuesta súper rápido la lista de correo zeromq, He actualizado el código de seguridad.

Respuesta

8

Crédito: Chuck Remes

Puede que necesite un "reposo" entre los pasos de creación hueca (bind, conectar, setsockopt) y la transmisión real de los mensajes. Las operaciones de conexión de vinculación & son asíncronas, por lo que es posible que no se completen cuando llegue a la lógica que envía todos los mensajes. En ese caso, todos los mensajes enviados a través de la toma PUB será cayeron desde una operación de zmq_bind() no crea una cola hasta que otro socket se ha conectado correctamente a la misma.

Como nota al margen, no es necesario para crear contextos 2 en este ejemplo. Ambos enchufes se pueden crear dentro del mismo contexto. No duele, pero tampoco es necesario.

Crédito: Pieter

Hay un "solucionador de problemas" al final del Ch1 que explica esto.

Algunos tipos de socket (ROUTER y PUB) colocarán silenciosamente mensajes para que no tienen destinatarios. La conexión es, como dijo Chuck, asíncrona y toma aproximadamente 100msec. Si inicia dos hilos, una el , conecte el otro, y luego comience inmediatamente a enviar datos sobre un tipo de socket, perderá los primeros 100msec de datos (aproximadamente).

Haciendo un sueño es un brutal "probar que funciona" opción. Realísticamente, se sincronizaría de alguna manera, o (más típicamente) esperaría la pérdida de mensaje como parte del inicio normal (es decir, ver los datos publicados como una emisión pura sin inicio ni final definitivo).

Ver actualización del tiempo ejemplo, syncpub y syncsub para más detalles.