2010-01-25 23 views
16

Tengo un proyecto de Django y estoy tratando de utilizar Celery para enviar tareas para el procesamiento en segundo plano (http://ask.github.com/celery/introduction.html). Aplery se integra bien con Django y he podido enviar mis tareas personalizadas y obtener resultados.¿Cómo puedo configurar Apio para llamar a una función de inicialización personalizada antes de ejecutar mis tareas?

El único problema es que no puedo encontrar una forma sensata de realizar una inicialización personalizada en el proceso del daemon. Necesito llamar a una función costosa que carga mucha memoria antes de comenzar a procesar las tareas, y no puedo permitirme llamar a esa función todo el tiempo.

¿Alguien ha tenido este problema antes? ¿Alguna idea de cómo solucionarlo sin modificar el código fuente de Aplery?

Gracias

+0

¿qué tipo de inicialización personalizada necesita ejecutar? – diegueus9

+0

necesito cargar una estructura de datos de ~ 10MB que se requiere para procesar cada tarea (la estructura es la misma para todas las tareas). – xelk

Respuesta

15

Usted puede escribir un cargador personalizado o utilizar las señales.

cargadores tienen el método on_task_init, que se llama cuando una tarea está a punto de ser ejecutado, y on_worker_init que se llama por el apio + proceso principal celerybeat.

El uso de señales es probablemente el más fácil, las señales disponibles son:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    distribuye cuando una tarea está a punto de ser ejecutado por el trabajador (o localmente si se usa apply/o si se ha configurado CELERY_ALWAYS_EAGER).

  • task_postrun(task_id, task, args, kwargs, retval) Se distribuye después de que se haya ejecutado una tarea en las mismas condiciones anteriores.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    llama cuando se aplica una tarea (no es bueno para las operaciones de larga duración)

señales adicionales disponibles en 0.9.x (rama principal actual en github):

  • worker_init()

    Se invoca cuando apical comenzó (antes de inicializar la tarea, por lo que si en un sistema es compatible con fork, los cambios de memoria se copiarían en los procesos de trabajo secundarios ).

  • worker_ready()

    llama cuando celeryd es capaz de recibir tareas.

  • worker_shutdown()

    llama cuando celeryd se está cerrando.

He aquí un ejemplo puede calcular previamente algo la primera vez que una tarea se ejecuta en el proceso:

from celery.task import Task 
from celery.registry import tasks 
from celery.signals import task_prerun 

_precalc_table = {} 

class PowersOfTwo(Task): 

    def run(self, x): 
     if x in _precalc_table: 
      return _precalc_table[x] 
     else: 
      return x ** 2 
tasks.register(PowersOfTwo) 


def _precalc_numbers(**kwargs): 
    if not _precalc_table: # it's empty, so haven't been generated yet 
     for i in range(1024): 
      _precalc_table[i] = i ** 2 


# need to use registered instance for sender argument. 
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name]) 

Si desea que la función que se ejecuta para todas las tareas, omita el argumento sender.

Cuestiones relacionadas