Lo que se necesita es establecer
CELERY_ACKS_LATE = True
Late ACK significa que los mensajes de tareas serán reconocidas después de la tarea se ha ejecutado, no sólo antes, que es el comportamiento por defecto. De esta forma, si el trabajador se bloquea, MQ de conejo seguirá teniendo el mensaje.
Obviamente, de un bloqueo total (Rabbit + workers) al mismo tiempo, no hay forma de recuperar la tarea, excepto si implementa un inicio de sesión en el inicio de la tarea y el final de la tarea. Personalmente escribo en mongodb una línea cada vez que se inicia una tarea y otra cuando termina la tarea (independientemente del resultado), de esta forma puedo saber qué tarea se interrumpió al analizar los registros de mongo.
Puede hacerlo fácilmente anulando los métodos __call__
y after_return
de la clase de tarea base de apio.
A continuación, ve un fragmento de mi código que utiliza una clase taskLogger como gestor de contexto (con punto de entrada y salida). La clase taskLogger simplemente escribe una línea que contiene la información de la tarea en una instancia de mongodb.
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
#Inizialize context managers
self.taskLogger = TaskLogger(args, kwargs)
self.taskLogger.__enter__()
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point for context managers
self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
espero que esto podría ayudar a