2010-12-19 16 views
8

Tengo un servidor AMQP (RabbitMQ) del que me gustaría publicar y leer en un Tornado web server. Para hacer esto, pensé que usaría una biblioteca asíncrona de python amqp; en particular Pika (una variación de la que supuestamente es compatible con Tornado).Uso de Tornado con Pika para monitoreo de colas asíncronas

He escrito código que aparece a leer con éxito de la cola, excepto que al final de la solicitud, me sale una excepción (el navegador vuelve fina):

[E 101219 01:07:35 web:868] Uncaught exception GET/(127.0.0.1) 
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'}) 
    Traceback (most recent call last): 
     File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context 
     yield 
     File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext 
     yield 
     File "/usr/lib/python2.6/contextlib.py", line 113, in nested 
     yield vars 
     File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped 
     callback(*args, **kwargs) 
     File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events 
     self._handle_read() 
     File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read 
     self.on_data_available(chunk) 
     File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available 
     self.channels[frame.channel_number].frame_handler(frame) 
    KeyError: 1 

No estoy del todo seguro Estoy usando esta biblioteca correctamente, así que podría estar haciendo algo descaradamente mal. El flujo básico de mi código es:

  1. petición llega
  2. Crear conexión con RabbitMQ usando TornadoConnection; especifique una devolución de llamada
  3. En la devolución de llamada de conexión, cree un canal, declare/enlace mi cola, y llame a basic_consume; especifique una devolución de llamada
  4. En la devolución de llamada de consumo, cierre el canal y llame a la función de finalización de Tornado.
  5. Ver excepción.

Mis preguntas son algunas de ellas:

  1. Es este flujo uniforme correcto? No estoy seguro de cuál es el propósito de la devolución de llamada de conexión, excepto que no funciona si no lo uso.
  2. ¿Debo crear una conexión AMQP por solicitud web? La documentación de RabbitMQ sugiere que no, no debería, pero preferiría seguir creando canales. Pero, ¿dónde crearía la conexión y cómo intentaré reconectarla si se cortara brevemente?
  3. Si estoy creando una conexión AMQP por solicitud web, ¿dónde debería estar cerrando? Llamar a amqp.close() en mi devolución de llamada parece arruinar las cosas aún más.

Voy a tratar de tener un código de muestra un poco más tarde, pero los pasos que describí anteriormente muestran el lado consumidor de las cosas bastante completamente. También estoy teniendo problemas con la publicación, pero el consumo de colas es más apremiante.

+0

al ver el código en sí es mucho mejor que la lectura una explicación verbal de eso. –

Respuesta

0

Alguien ha informado de éxito en la fusión de Tornado y Pika here. Por lo que puedo decir, no es tan simple como llamar a Pika desde Tornado, ya que ambas bibliotecas quieren tener sus propios loops de eventos a cargo.

+0

Sí, estoy usando una variación de Pika que admite Tornado específicamente. Creo que lo he arreglado todo. Voy a publicar una solución más adelante si tengo confianza. –

8

Ayudaría a ver algún código fuente, pero utilizo este mismo módulo de pika para soportar tornado sin problema en más de un proyecto de producción.

No desea crear una conexión por solicitud. Cree una clase que envuelva todas sus operaciones AMQP y cree una instancia como singleton en el nivel de aplicación de tornado que se puede usar en todas las solicitudes (y entre los controladores de solicitudes). Hago esto en una función 'runapp()' que hace algunas cosas como esta y luego inicia el ioloop tornado principal.

Aquí hay una clase llamada 'Eventos'. Es una implementación parcial (en concreto, no me defino 'self.handle_event' aquí. Eso depende de usted.

class Event(object): 
    def __init__(self, config): 
    self.host = 'localhost' 
    self.port = '5672' 
    self.vhost = '/' 
    self.user = 'foo' 
    self.exchange = 'myx' 
    self.queue = 'myq' 
    self.recv_routing_key = 'msgs4me' 
    self.passwd = 'bar' 

    self.connected = False 
    self.connect() 


    def connect(self): 

    credentials = pika.PlainCredentials(self.user, self.passwd) 

    parameters = pika.ConnectionParameters(host = self.host, 
             port = self.port, 
             virtual_host = self.vhost, 
             credentials = credentials) 

    srs = pika.connection.SimpleReconnectionStrategy() 

    logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host, 
                   self.port)) 
    self.connection = tornado_adapter.TornadoConnection(parameters, 
                 wait_for_open = False, 
                 reconnection_strategy = srs, 
                 callback = self.on_connected) 

    def on_connected(self): 

    # Open the channel 
    logging.debug("Events: Opening a channel") 
    self.channel = self.connection.channel() 

    # Declare our exchange 
    logging.debug("Events: Declaring the %s exchange" % self.exchange) 
    self.channel.exchange_declare(exchange = self.exchange, 
           type = "fanout", 
           auto_delete = False, 
           durable = True) 

    # Declare our queue for this process 
    logging.debug("Events: Declaring the %s queue" % self.queue) 
    self.channel.queue_declare(queue = self.queue, 
          auto_delete = False, 
          exclusive = False, 
          durable = True) 


    # Bind to the exchange 
    self.channel.queue_bind(exchange = self.exchange, 
          queue = self.queue, 
          routing_key = self.recv_routing_key) 

    self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True) 

    # We should be connected if we made it this far 
    self.connected = True 

Y entonces me puso que en un archivo llamado 'events.py'.Mis RequestHandlers y cualquier código de back-end utilizan un módulo 'common.py' que envuelve el código que es útil para ambos (mis RequestHandlers no llaman directamente a ningún método de módulo amqp, igual para db, cache, etc.), así que definir 'sucesos = None' a nivel de módulo en common.py, y una instancia del objeto de sucesos poco como esto:

import events 

def runapp(config): 
    if myapp.common.events is None: 
     myapp.common.events = myapp.events.Event(config) 
    logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events) 
    http_server = tornado.httpserver.HTTPServer(app, 
              xheaders=config['HTTPServer']['xheaders'], 
              no_keep_alive=config['HTTPServer']['no_keep_alive']) 
    http_server.listen(port) 
    main_loop = tornado.ioloop.IOLoop.instance() 
    logging.debug("MAIN IOLOOP: %s", main_loop) 
    main_loop.start() 

feliz año nuevo :-D