Comment testez-vous une tâche de céleri?

La documentation Celery mentionne le test de Celery dans Django, mais n’explique pas comment tester une tâche Celery si vous n’utilisez pas Django. Comment est-ce que tu fais ça?

Il est possible de tester des tâches de manière synchrone en utilisant n’importe quelle lib la plus courante. Je fais normalement 2 sessions de test différentes lorsque je travaille avec des tâches de céleri. Le premier (comme je le suggère ci-dessous) est complètement synchrone et devrait être celui qui garantit que l’algorithme fait ce qu’il doit faire. La deuxième session utilise l’intégralité du système (y compris le courtier) et vérifie que je n’ai pas de problèmes de sérialisation ou d’autres problèmes de dissortingbution.

Alors:

from celery import Celery celery = Celery() @celery.task def add(x, y): return x + y 

Et votre test:

 from nose.tools import eq_ def test_add_task(): rst = add.apply(args=(4, 4)).get() eq_(rst, 8) 

J’espère que cela pourra aider!

J’utilise ceci:

 with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True): ... 

Docs: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER vous permet d’exécuter votre tâche de manière synchrone et vous n’avez pas besoin d’un serveur Celery.

Dépend de ce que vous voulez tester exactement.

  • Testez directement le code de la tâche. N’appelez pas “task.delay (…)” appelez simplement “task (…)” à partir de vos tests unitaires.
  • Utilisez CELERY_ALWAYS_EAGER . Cela provoquera l’appel immédiat de vos tâches au point que vous avez dit “task.delay (…)”, afin que vous puissiez tester l’intégralité du chemin (mais pas tout comportement asynchrone).

Test de l’unité

 import unittest from myproject.myapp import celeryapp class TestMyCeleryWorker(unittest.TestCase): def setUp(self): celeryapp.conf.update(CELERY_ALWAYS_EAGER=True) 

luminaires py.test

 # conftest.py from myproject.myapp import celeryapp @pytest.fixture(scope='module') def celery_app(request): celeryapp.conf.update(CELERY_ALWAYS_EAGER=True) return celeryapp # test_tasks.py def test_some_task(celery_app): ... 

Addendum: rendez send_task respecté

 from celery import current_app def send_task(name, args=(), kwargs={}, **opts): # https://github.com/celery/celery/issues/581 task = current_app.tasks[name] return task.apply(args, kwargs, **opts) current_app.send_task = send_task 

Pour ceux sur le céleri 4 c’est:

 @override_settings(CELERY_TASK_ALWAYS_EAGER=True) 

Comme les noms de parameters ont été modifiés et doivent être mis à jour si vous choisissez de mettre à niveau, voir

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

À partir de Celery 3.0 , une façon de définir CELERY_ALWAYS_EAGER dans Django est la suivante:

 from django.test import TestCase, override_settings from .foo import foo_celery_task class MyTest(TestCase): @override_settings(CELERY_ALWAYS_EAGER=True) def test_foo(self): self.assertTrue(foo_celery_task.delay()) 

Dans mon cas (et j’en suppose beaucoup d’autres), tout ce que je voulais, c’était tester la logique interne d’une tâche en utilisant pytest.

TL; DR; fini par se moquer de tout ( OPTION 2 )


Exemple d’utilisation :

proj/tasks.py

 @shared_task(bind=True) def add_task(self, a, b): return a+b; 

tests/test_tasks.py

 from proj import add_task def test_add(): assert add_task(1, 2) == 3, '1 + 2 should equal 3' 

mais comme le décorateur shared_task fait beaucoup de logique interne au céleri, ce n’est pas vraiment un test unitaire.

Donc, pour moi, il y avait 2 options:

OPTION 1: Séparer la logique interne

proj/tasks_logic.py

 def internal_add(a, b): return a + b; 

proj/tasks.py

 from .tasks_logic import internal_add @shared_task(bind=True) def add_task(self, a, b): return internal_add(a, b); 

Cela semble très étrange, et à part le rendre moins lisible, il faut extraire et passer manuellement des atsortingbuts qui font partie de la requête, par exemple task_id au cas où vous en auriez besoin, ce qui rend la logique moins pure.

OPTION 2: mock
se moquer des internes du céleri

tests/__init__.py

 # noinspection PyUnresolvedReferences from celery import shared_task from mock import patch def mock_signature(**kwargs): return {} def mocked_shared_task(*decorator_args, **decorator_kwargs): def mocked_shared_decorator(func): func.signature = func.si = func.s = mock_signature return func return mocked_shared_decorator patch('celery.shared_task', mocked_shared_task).start() 

ce qui me permet ensuite de simuler l’object request (encore une fois, au cas où vous auriez besoin d’éléments de la requête, comme l’identifiant ou le compteur de tentatives).

tests/test_tasks.py

 from proj import add_task class MockedRequest: def __init__(self, id=None): self.id = id or 1 class MockedTask: def __init__(self, id=None): self.request = MockedRequest(id=id) def test_add(): mocked_task = MockedTask(id=3) assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3' 

Cette solution est beaucoup plus manuelle, mais elle me donne le contrôle dont j’ai besoin pour tester les unités , sans me répéter, et sans perdre la scope du céleri.

Depuis Celery v4.0 , les fixtures py.test sont fournies pour démarrer un ouvrier de céleri juste pour le test et l’arrêt lorsque vous avez terminé:

 def test_myfunc_is_executed(celery_session_worker): # celery_session_worker:  assert myfunc.delay().wait(3) 

Parmi les autres appareils décrits sur http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test , vous pouvez modifier les options par défaut du céleri en redéfinissant le luminaire celery_config manière suivante:

 @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['json', 'pickle'], 'result_serializer': 'pickle', } 

Par défaut, l’agent de test utilise un courtier en mémoire et un moteur de résultat. Pas besoin d’utiliser un Redis ou un RabbitMQ local si vous ne testez pas des fonctionnalités spécifiques.