2012-09-14 14 views
11

Tengo un cliente python worker que genera 10 trabajadores que se enganchan en una cola RabbitMQ. Un poco como esto:Pika + RabbitMQ: establecer basic_qos para prefetch = 1 todavía parece consumir todos los mensajes en la cola

#!/usr/bin/python 
worker_count=10 

def mqworker(queue, configurer): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost')) 
    channel = connection.channel() 
    channel.queue_declare(queue=qname, durable=True) 
    channel.basic_consume(callback,queue=qname,no_ack=False) 
    channel.basic_qos(prefetch_count=1) 
    channel.start_consuming() 


def callback(ch, method, properties, body): 
    doSomeWork(); 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

if __name__ == '__main__': 
    for i in range(worker_count): 
     worker = multiprocessing.Process(target=mqworker) 
     worker.start() 

El problema que tengo es que a pesar de la configuración basic_qos en el canal, el primer trabajador a iniciar acepta todos los mensajes de la cola, mientras que los otros se sientan allí inactivo. Puedo ver esto en la interfaz de rabbitmq, que incluso cuando establezco worker_count en 1 y vuelco 50 mensajes en la cola, los 50 entran en el depósito "no reconocido", mientras que yo esperaría que 1 fuera no reconocido y los otros 49 en estar listo.

¿Por qué no funciona?

Respuesta

14

Parece que he resuelto esto moviéndome a donde se llama basic_qos.

Colocarlo justo después de channel = connection.channel() parece alterar el comportamiento de lo que esperaba.

+0

gracias! eso solucionó el problema. y por cierto esto es muy difícil de depurar ... – Sajuuk

+0

@Hiagara Sí, acabo de toparme con esto hoy mismo. Es sorprendente que casi 5 años después, esto todavía no esté claro ni documentado en la API. – Jordan

Cuestiones relacionadas