2012-05-22 20 views
6

Estoy usando Django y apio y estoy intentando configurar el enrutamiento en varias colas. Cuando especifico una tarea como routing_key y exchange (en el decorador de tareas o usando apply_async()), la tarea no se agrega al intermediario (que es Kombu que se conecta a mi base de datos MySQL).Django y apio: problemas de enrutamiento

Si especifico el nombre de la cola en el decorador de tareas (lo que significa que se ignora la clave de enrutamiento), la tarea funciona bien. Parece ser un problema con la configuración de enrutamiento/intercambio.

¿Alguna idea de cuál podría ser el problema?

Aquí está la configuración:

settings.py

INSTALLED_APPS = (
    ... 
    'kombu.transport.django', 
    'djcelery', 
) 
BROKER_BACKEND = 'django' 
CELERY_DEFAULT_QUEUE = 'default' 
CELERY_DEFAULT_EXCHANGE = "tasks" 
CELERY_DEFAULT_EXCHANGE_TYPE = "topic" 
CELERY_DEFAULT_ROUTING_KEY = "task.default" 
CELERY_QUEUES = { 
    'default': { 
     'binding_key':'task.#', 
    }, 
    'i_tasks': { 
     'binding_key':'important_task.#', 
    }, 
} 

tasks.py

from celery.task import task 

@task(routing_key='important_task.update') 
def my_important_task(): 
    try: 
     ... 
    except Exception as exc: 
     my_important_task.retry(exc=exc) 

iniciar la tarea:

from tasks import my_important_task 
my_important_task.delay() 
+0

¿Cómo se pasa routing_key ? Con async_apply? – mher

+0

Estoy usando el método 'delay()', que es solo un atajo para 'apply_async()'. Estoy tratando de mantener la especificación 'routing_key' con el método de la tarea (a través del decorador) en lugar de cuando se llama. Intenté pasar la clave usando 'apply_async()' pero estoy obteniendo el mismo problema. –

+0

delay no acepta la palabra clave routing_key. Es una versión simplificada de apply_async, pero no son lo mismo. – mher

Respuesta

43

Está utilizando el ORM de Django como un corredor, lo que significa declaraciones sólo se almacenan en la memoria (ver el, indiscutiblemente difícil de encontrar, tabla de comparación de transporte en http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison)

Así que cuando se aplica esta tarea con routing_key important_task.update no podrá enrutarlo, porque aún no ha declarado la cola.

Funcionará si hace esto:

@task(queue="i_tasks", routing_key="important_tasks.update") 
def important_task(): 
    print("IMPORTANT") 

Pero sería mucho más simple para que pueda utilizar la función de enrutamiento automático, ya que aquí no hay nada que muestre que usted necesita utilizar un ' intercambio tema', utilizar el enrutamiento automático simplemente eliminar la configuración:

  • CELERY_DEFAULT_QUEUE,
  • CELERY_DEFAULT_EXCHANGE,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

y declare que su tarea como esta:

@task(queue="important") 
def important_task(): 
    return "IMPORTANT" 

y luego para iniciar un trabajador que consume a partir de esa cola:

$ python manage.py celeryd -l info -Q important 

o consumir tanto de la (celery) cola de defecto y el important cola:

$ python manage.py celeryd -l info -Q celery,important 

Otra buena práctica es la de no codificar los nombres de cola en la tarea y utilizar CELERY_ROUTES vez :

@task 
def important_task(): 
    return "DEFAULT" 

luego en la configuración:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}} 

Si todavía insiste en usar intercambios tema, entonces podría añadir este router para declarar automáticamente todas las colas de la primera vez que se envía una tarea:

class PredeclareRouter(object): 
    setup = False 

    def route_for_task(self, *args, **kwargs): 
     if self.setup: 
      return 
     self.setup = True 
     from celery import current_app, VERSION as celery_version 
     # will not connect anywhere when using the Django transport 
     # because declarations happen in memory. 
     with current_app.broker_connection() as conn: 
      queues = current_app.amqp.queues 
      channel = conn.default_channel 
      if celery_version >= (2, 6): 
       for queue in queues.itervalues(): 
        queue(channel).declare() 
      else: 
       from kombu.common import entry_to_queue 
       for name, opts in queues.iteritems(): 
        entry_to_queue(name, **opts)(channel).declare() 
CELERY_ROUTES = (PredeclareRouter(),) 
+0

¡Gracias por la explicación! –

+2

¿Este problema con las declaraciones de cola y los intercambios se resolvió en Apio 3? Estoy usando el nuevo 'CELERY_QUEUES = (Queue (...), ...)' en la configuración, ¿significa esto que las colas están siendo declaradas correctamente? –

+0

Nota: en Celery 4.0 en adelante, CELERY_ROUTES se ha reemplazado por CELERY_TASK_ROUTES. Podría salvarle el tiempo a alguien. –