2011-03-17 13 views
14

¿Hay alguna forma de determinar si se pierde alguna tarea y volver a intentarlo?Reintentar tareas perdidas o fallidas (Apio, Django y RabbitMQ)

Creo que el motivo de pérdida puede ser el error del asignador o el bloqueo del hilo del trabajador.

Estaba planeando volver a intentarlos, pero no estoy seguro de cómo determinar qué tareas deben volver a intentarse?

¿Y cómo hacer este proceso automáticamente? ¿Puedo usar mi propio programador personalizado que creará nuevas tareas?

Editar: Encontré en la documentación que RabbitMQ nunca pierde tareas, pero ¿qué sucede cuando el hilo de trabajo se bloquea en el medio de la ejecución de la tarea?

Respuesta

26

Lo que se necesita es establecer

CELERY_ACKS_LATE = True

Late ACK significa que los mensajes de tareas serán reconocidas después de la tarea se ha ejecutado, no sólo antes, que es el comportamiento por defecto. De esta forma, si el trabajador se bloquea, MQ de conejo seguirá teniendo el mensaje.

Obviamente, de un bloqueo total (Rabbit + workers) al mismo tiempo, no hay forma de recuperar la tarea, excepto si implementa un inicio de sesión en el inicio de la tarea y el final de la tarea. Personalmente escribo en mongodb una línea cada vez que se inicia una tarea y otra cuando termina la tarea (independientemente del resultado), de esta forma puedo saber qué tarea se interrumpió al analizar los registros de mongo.

Puede hacerlo fácilmente anulando los métodos __call__ y after_return de la clase de tarea base de apio.

A continuación, ve un fragmento de mi código que utiliza una clase taskLogger como gestor de contexto (con punto de entrada y salida). La clase taskLogger simplemente escribe una línea que contiene la información de la tarea en una instancia de mongodb.

def __call__(self, *args, **kwargs): 
    """In celery task this function call the run method, here you can 
    set some environment variable before the run of the task""" 

    #Inizialize context managers  

    self.taskLogger = TaskLogger(args, kwargs) 
    self.taskLogger.__enter__() 

    return self.run(*args, **kwargs) 

def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    #exit point for context managers 
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo) 

espero que esto podría ayudar a

Cuestiones relacionadas