Tengo un escenario donde necesito distribuir y procesar trabajos extremadamente rápido. Tendré cerca de 45 trabajos ocupados en la cola rápidamente y puedo procesar aproximadamente 20 simultáneamente (5 máquinas, 4 núcleos cada una). Cada trabajo requiere una cantidad de tiempo variable, y para complicar las cosas, la recolección de basura es un problema, así que tengo que ser capaz de llevar al consumidor fuera de línea para la recolección de basura.Suscribirse a una cola, recibir 1 mensaje y luego darse de baja
Actualmente, tengo todo funcionando con pop (cada consumidor aparece cada 5 ms). Esto parece indeseable porque se traduce en 600 solicitudes pop por segundo a rabbitmq.
Me encantaría que hubiera un comando pop que actuara como suscribirse, pero solo para un mensaje. (el proceso se bloquearía, a la espera de la entrada de la conexión rabbitMQ, a través de algo parecido a Kernel.select)
He intentado engañar a la gema AMQP para hacer algo como esto, pero no está funcionando: no puedo anular la suscripción hasta que la cola esté vacía y no se envíen más mensajes al consumidor. Otros métodos para cancelar la suscripción temo perderán mensajes.
consume_1.rb:
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
producer.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
me lanzaré consume_many.rb y producer.rb. Los mensajes fluirán como se espera.
Cuando ejecuto consume_1.rb, recibe todos los demás mensajes (como se esperaba). Pero NUNCA cancela la suscripción porque nunca termina de procesar todos sus mensajes ... así sucesivamente.
¿Cómo obtengo consume_1.rb para suscribirme a la lista de espera, recibir un mensaje único y luego salir solo del anillo equilibrador de carga para que pueda hacer su trabajo, sin perder trabajos pendientes adicionales que puedan estar en la cola y de otra manera estarían programados para ser enviados al proceso?
Tim
Escribir un servidor que se comporte de esta manera sería estúpido, fácil en ruby ... ¿tal vez usar la solución zero-mq para implementar mi propio intermediario es la solución? –