2011-04-15 10 views
8

Tengo un escenario donde necesito distribuir y procesar trabajos extremadamente rápido. Tendré cerca de 45 trabajos ocupados en la cola rápidamente y puedo procesar aproximadamente 20 simultáneamente (5 máquinas, 4 núcleos cada una). Cada trabajo requiere una cantidad de tiempo variable, y para complicar las cosas, la recolección de basura es un problema, así que tengo que ser capaz de llevar al consumidor fuera de línea para la recolección de basura.Suscribirse a una cola, recibir 1 mensaje y luego darse de baja

Actualmente, tengo todo funcionando con pop (cada consumidor aparece cada 5 ms). Esto parece indeseable porque se traduce en 600 solicitudes pop por segundo a rabbitmq.

Me encantaría que hubiera un comando pop que actuara como suscribirse, pero solo para un mensaje. (el proceso se bloquearía, a la espera de la entrada de la conexión rabbitMQ, a través de algo parecido a Kernel.select)

He intentado engañar a la gema AMQP para hacer algo como esto, pero no está funcionando: no puedo anular la suscripción hasta que la cola esté vacía y no se envíen más mensajes al consumidor. Otros métodos para cancelar la suscripción temo perderán mensajes.

consume_1.rb:

require "amqp" 

EventMachine.run do 
    puts "connecting..." 
    connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/") 
    puts "Connected to AMQP broker" 

    channel = AMQP::Channel.new(connection) 
    queue = channel.queue("tasks", :auto_delete => true) 
    exchange = AMQP::Exchange.default(channel) 

    queue.subscribe do |payload| 
    puts "Received a message: #{payload}." 
    queue.unsubscribe { puts "unbound" } 
    sleep 3 
    end 
end 

consumer_many.rb:

require "amqp" 

# Imagine the command is something CPU - intensive like image processing. 
command = "sleep 0.1" 

EventMachine.run do 
    puts "connecting..." 
    connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/") 
    puts "Connected to AMQP broker" 

    channel = AMQP::Channel.new(connection) 
    queue = channel.queue("tasks", :auto_delete => true) 
    exchange = AMQP::Exchange.default(channel) 

    queue.subscribe do |payload| 
    puts "Received a message: #{payload}." 
    end 
end 

producer.rb:

require "amqp" 

i = 0 
EventMachine.run do 
    connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/") 
    puts "Connected to AMQP broker" 

    channel = AMQP::Channel.new(connection) 
    queue = channel.queue("tasks", :auto_delete => true) 
    exchange = AMQP::Exchange.default(channel) 

    EM.add_periodic_timer(1) do 
    msg = "Message #{i}" 
    i+=1 
    puts "~ publishing #{msg}" 
    end 
end 

me lanzaré consume_many.rb y producer.rb. Los mensajes fluirán como se espera.

Cuando ejecuto consume_1.rb, recibe todos los demás mensajes (como se esperaba). Pero NUNCA cancela la suscripción porque nunca termina de procesar todos sus mensajes ... así sucesivamente.

¿Cómo obtengo consume_1.rb para suscribirme a la lista de espera, recibir un mensaje único y luego salir solo del anillo equilibrador de carga para que pueda hacer su trabajo, sin perder trabajos pendientes adicionales que puedan estar en la cola y de otra manera estarían programados para ser enviados al proceso?

Tim

+0

Escribir un servidor que se comporte de esta manera sería estúpido, fácil en ruby ​​... ¿tal vez usar la solución zero-mq para implementar mi propio intermediario es la solución? –

Respuesta

13

Ésta es una característica de la gema AMQP simple, pero muy mal documentado, lo que necesita es la siguiente:

En su consumidor:

channel = AMQP::Channel.new(connection, :prefetch => 1) 

Y luego con su suscripción bloquear, hacer:

queue.subscribe(:ack => true) do |queue_header, payload| 
    puts "Received a message: #{payload}." 
    # Do long running work here 

    # Acknowledge message 
    queue_header.ack 
end 

¿Qué le dice esto a RabbitMQ que solo envíe su consumidor 1 mensaje a la vez, y no envíe otro mensaje hasta que llame al ack en el encabezado de la cola después de trabajando en una tarea larga por un tiempo.

Podría ser necesario corregir esto, pero creo que un intercambio direct sería más adecuado para esta tarea.

+0

Ivan, eres un genio. He estado buscando esto por varios meses. ¡GRACIAS! –

+0

No estoy medio sorprendido, estaba golpeando mi cabeza contra el escritorio durante meses antes de descifrarlo por leer el código fuente. – Ivan

+0

Ivan - No creo que el tipo de intercambio realmente importe aquí. Su problema está en el nivel de la cola porque parece que quiere que varios consumidores trabajen desde la misma cola, cada uno por turno para extraer un único elemento de trabajo y procesarlo. Por lo tanto, el tipo de intercambio en uso no importa porque el intercambio está esencialmente fuera de la imagen una vez que el mensaje se inserta en la cola. –

1

con el medio ambiente que tengo,

versión RabbitMQ: 3.3.3

amqp versión joya: 1.5.0

La solución de Ivan todavía resultó en todos los mensajes que se obtienen de la cola.

En cambio, el número de mensajes no confirmados mediante la suscripción a una cola puede limitarse configurando la QoS de un canal.

De acuerdo con el documento de la API de AMQP :: Canal,

#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object 

Una nota para el método en el que, si está ejecutando servidores RabbitMQ después de la versión 2.3.6, prefetch_size está en desuso.

channel = AMQP::Channel.new(connection, :prefetch => 1) 
channel.qos(0, 1) 

queue = channel.queue(queue_name, :auto_delete => false) 
queue.subscribe(:ack => true) do |metadata, payload| 
    puts "Received a message: #{payload}." 

    # Do long running work here 

    # Acknowledge message 
    metadata.ack 
end 

Espero que la solución ayude a alguien.

Saludos.

+0

Esta línea hizo el truco. Gracias. 'channel.qos (0, 1)' – mipmip

Cuestiones relacionadas