2012-04-17 20 views
37

Si he definido una función de la siguiente manera:Cómo agregar dinámicamente/eliminar tareas periódicas de apio (celerybeat)

def add(x,y): 
    return x+y 

¿Hay una manera de añadir dinámicamente esta función como un PeriodicTask apio y golpearlo con el pie apagado en tiempo de ejecución? Me gustaría ser capaz de hacer algo así (pseudocódigo):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30")) 
celery.beat.start(some_unique_task_id) 

También me gustaría detener o eliminar esa tarea de forma dinámica con algo como (pseudocódigo):

celery.beat.remove_task(some_unique_task_id) 

o

celery.beat.stop(some_unique_task_id) 

FYI No estoy usando djcelery, que le permite administrar tareas periódicas a través de django admin.

Respuesta

18

No, lo siento, esto no es posible con el celerybeat regular.

Pero es fácilmente extensible para hacer lo que desee, p. el planificador de django-apio es solo una subclase que lee y escribe el cronograma en la base de datos (con algunas optimizaciones en la parte superior).

También puede usar el planificador django-apio incluso para proyectos que no son de Django.

Algo como esto:

  • Instalar Django Django + apio:

    $ PIP instalar Django Django -U-apio

  • Añadir los siguientes ajustes a su celeryconfig:

    DATABASES = { 
        'default': { 
         'NAME': 'celerybeat.db', 
         'ENGINE': 'django.db.backends.sqlite3', 
        }, 
    } 
    INSTALLED_APPS = ('djcelery',) 
    
  • Crear las tablas de la base de datos :

    celerybeat
    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig 
    
  • de inicio con el programador de la base de datos:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ 
        -S djcelery.schedulers.DatabaseScheduler 
    

También existe el comando djcelerymon que se puede utilizar para no Django proyecta para comenzar celerycam y un servidor web Django administración en el mismo proceso, usted puede usar eso para también editar sus tareas periódicas en una buena interfaz web:

$ djcelerymon 

(Nota por alguna razón djcelerymon no se puede detener mediante Ctrl + C, que tiene que usar Ctrl + Z + mata% 1)

+1

¿Puede mencionar el código para agregar tareas y eliminarlas? Lo siento, no estoy recibiendo. –

+6

¿Algún cambio en esto de 2012 a 2016? – Tanay

32

Esta pregunta fue respondida en google groups.

yo no soy el autor, todo el crédito va a Jean Mark

he aquí una solución adecuada para esto.Trabajo confirmado, en mi escenario, Subclasifiqué la tarea periódica y creé un modelo a partir de ella, ya que puedo agregar otros campos al modelo según sea necesario y también para poder agregar el método "terminar". Debe establecer la propiedad activada de la tarea periódica en False y guardarla antes de eliminarla. La subclase no es obligatoria, el método schedule_every es el que realmente hace el trabajo. Cuando esté listo para finalizar su tarea (si no la subclasificó) puede simplemente usar PeriodicTask.objects.filter (name = ...) para buscar su tarea, deshabilitarla y luego eliminarla.

Espero que esto ayude!

from djcelery.models import PeriodicTask, IntervalSchedule 
from datetime import datetime 

class TaskScheduler(models.Model): 

    periodic_task = models.ForeignKey(PeriodicTask) 

    @staticmethod 
    def schedule_every(task_name, period, every, args=None, kwargs=None): 
    """ schedules a task by name every "every" "period". So an example call would be: 
     TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
     that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """ 
     permissible_periods = ['days', 'hours', 'minutes', 'seconds'] 
     if period not in permissible_periods: 
      raise Exception('Invalid period specified') 
     # create the periodic task and the interval 
     ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task 
     interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) 
     if interval_schedules: # just check if interval schedules exist like that already and reuse em 
      interval_schedule = interval_schedules[0] 
     else: # create a brand new interval schedule 
      interval_schedule = IntervalSchedule() 
      interval_schedule.every = every # should check to make sure this is a positive int 
      interval_schedule.period = period 
      interval_schedule.save() 
     ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) 
     if args: 
      ptask.args = args 
     if kwargs: 
      ptask.kwargs = kwargs 
     ptask.save() 
     return TaskScheduler.objects.create(periodic_task=ptask) 

    def stop(self): 
     """pauses the task""" 
     ptask = self.periodic_task 
     ptask.enabled = False 
     ptask.save() 

    def start(self): 
     """starts the task""" 
     ptask = self.periodic_task 
     ptask.enabled = True 
     ptask.save() 

    def terminate(self): 
     self.stop() 
     ptask = self.periodic_task 
     self.delete() 
     ptask.delete() 
+1

Esta debería ser la respuesta aceptada. – kai

+1

@kai 'IntervalSchedule',' PeriodicTask', etc., son clases 'djcelery', y el OP dice que no está usando' djcelery'. Definitivamente útil, sin embargo. – Chris

2

Se puede extraer de este flask-djcelery que configura frasco y djcelery y también proporciona browseable resto api

2

Hay una biblioteca llamada django-apio-beat que proporciona los modelos que uno necesita. Para hacer que cargue dinámicamente nuevas tareas periódicas, uno debe crear su propio Programador.

from django_celery_beat.schedulers import DatabaseScheduler 


class AutoUpdateScheduler(DatabaseScheduler): 

    def tick(self, *args, **kwargs): 
     if self.schedule_changed(): 
      print('resetting heap') 
      self.sync() 
      self._heap = None 
      new_schedule = self.all_as_schedule() 

      if new_schedule: 
       to_add = new_schedule.keys() - self.schedule.keys() 
       to_remove = self.schedule.keys() - new_schedule.keys() 
       for key in to_add: 
        self.schedule[key] = new_schedule[key] 
       for key in to_remove: 
        del self.schedule[key] 

     super(AutoUpdateScheduler, self).tick(*args, **kwargs) 

    @property 
    def schedule(self): 
     if not self._initial_read and not self._schedule: 
      self._initial_read = True 
      self._schedule = self.all_as_schedule() 

     return self._schedule 
+0

Gracias. No funcionó de inmediato, pero usando 'to_add = [tecla para clave en new_schedule.keys() si key no en self.schedule.keys()]' y similar para 'to_remove' hizo el truco. ¿Por qué no es esta una opción estándar? Hasta ahora, tuve que realizar tareas de Apio para llamar a otras tareas de Apio con una cuenta regresiva. Eso no me parece muy bueno. – freethebees

Cuestiones relacionadas