2010-10-29 24 views
71

Estoy tratando de encontrar una metodología de prueba para nuestro proyecto django-celery. He leído las notas en el documentation, pero no me dio una buena idea de qué hacer realmente. No estoy preocupado por probar las tareas en los daemons reales, solo la funcionalidad de mi código. Principalmente me pregunto:Pruebas unitarias con django-apio?

  1. ¿Cómo podemos pasar por alto task.delay() durante la prueba (I intentado fijar CELERY_ALWAYS_EAGER = True pero no hizo ninguna diferencia)?
  2. ¿Cómo utilizamos la configuración de prueba que se recomienda (si es la mejor) sin cambiar realmente nuestra settings.py?
  3. ¿Todavía podemos usar manage.py test o tenemos que usar un corredor personalizado?

En general, cualquier sugerencia o consejo para probar con apio sería muy útil.

+1

¿qué quiere decir 'CELERY_ALWAYS_EAGER' hace ninguna diferencia? – asksol

+0

Aún recibo errores acerca de no poder contactar con rabbitmq. –

+0

¿Tiene el traceback? Supongo que algo más que '.delay' podría estar tratando de establecer una conexión. – asksol

Respuesta

40

Intente configurar:

BROKER_BACKEND = 'memory' 

(Gracias a comentario asksol 's.)

+0

Gracias amablemente. –

+5

Creo que esto ya no es necesario cuando se establece CELERY_ALWAYS_EAGER. – mlissner

+1

En apio 4 esto no funciona –

16

He aquí un extracto de mi clase base de pruebas que aplasta el método apply_async y registros de las llamadas a la misma (que incluye Task.delay.) Es un poco bruto, pero se logró satisfacer mis necesidades durante los últimos meses' lo he estado usando

from django.test import TestCase 
from celery.task.base import Task 
# For recent versions, Task has been moved to celery.task.app: 
# from celery.app.task import Task 
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html 

class CeleryTestCaseBase(TestCase): 

    def setUp(self): 
     super(CeleryTestCaseBase, self).setUp() 
     self.applied_tasks = [] 

     self.task_apply_async_orig = Task.apply_async 

     @classmethod 
     def new_apply_async(task_class, args=None, kwargs=None, **options): 
      self.handle_apply_async(task_class, args, kwargs, **options) 

     # monkey patch the regular apply_sync with our method 
     Task.apply_async = new_apply_async 

    def tearDown(self): 
     super(CeleryTestCaseBase, self).tearDown() 

     # Reset the monkey patch to the original method 
     Task.apply_async = self.task_apply_async_orig 

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options): 
     self.applied_tasks.append((task_class, tuple(args), kwargs)) 

    def assert_task_sent(self, task_class, *args, **kwargs): 
     was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2] 
         for task in self.applied_tasks) 
     self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args)) 

    def assert_task_not_sent(self, task_class): 
     was_sent = any(task_class == task[0] for task in self.applied_tasks) 
     self.assertFalse(was_sent, 'Task was not expected to be called, but was. Applied tasks: %s' %     self.applied_tasks) 

He aquí un "de la parte superior de la cabeza" ejemplo de cómo se utilizaría en los casos de prueba:

mymodule.py

from my_tasks import SomeTask 

def run_some_task(should_run): 
    if should_run: 
     SomeTask.delay(1, some_kwarg=2) 

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase): 
    def test_should_run(self): 
     run_some_task(should_run=True) 
     self.assert_task_sent(SomeTask, 1, some_kwarg=2) 

    def test_should_not_run(self): 
     run_some_task(should_run=False) 
     self.assert_task_not_sent(SomeTask) 
+0

Esto es excelente como referencia. Muchas gracias. – droidballoon

59

me gusta usar el decorador override_settings en las pruebas que necesitan resultados de apio para completar.

from django.test import TestCase 
from django.test.utils import override_settings 
from myapp.tasks import mytask 

class AddTestCase(TestCase): 

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, 
         CELERY_ALWAYS_EAGER=True, 
         BROKER_BACKEND='memory') 
    def test_mytask(self): 
     result = mytask.delay() 
     self.assertTrue(result.successful()) 

Si desea aplicar esto a todas las pruebas que puede utilizar el corredor de prueba de apio como se describe en http://docs.celeryproject.org/en/2.5/django/unit-testing.html que básicamente establece estos mismos parámetros excepto (BROKER_BACKEND = 'memory').

En la configuración:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner' 

Mira la fuente de CeleryTestSuiteRunner y que está bastante claro lo que está pasando.

+0

Respuesta perfecta; limpio, compacto, y funciona ;-) ¡Gracias! –

4

ya que todavía veo que esto se presenta en los resultados de búsqueda, ajustes anulan con

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner' 

trabajó para mí como por Celery Docs