[python] django-celery로 단위 테스트?

django-celery 프로젝트에 대한 테스트 방법론을 생각해 내려고합니다 . 문서 의 메모를 읽었 지만 실제로 무엇을해야하는지에 대한 좋은 아이디어를 얻지 못했습니다. 나는 실제 데몬에서 작업을 테스트하는 것에 대해 걱정하지 않고 코드 의 기능에 대해서만 걱정 합니다. 주로 궁금합니다.

  1. task.delay()테스트 중에 어떻게 우회 할 수 CELERY_ALWAYS_EAGER = True있습니까 ( 설정을 시도 했지만 차이가 없음)?
  2. settings.py를 실제로 변경하지 않고 권장되는 테스트 설정을 어떻게 사용합니까 (최선의 방법 인 경우)?
  3. 그래도 사용할 수 있습니까 manage.py test아니면 맞춤형 러너를 사용해야합니까?

전반적으로 셀러리 테스트에 대한 힌트 나 팁이 매우 도움이 될 것입니다.



답변

설정 시도 :

BROKER_BACKEND = 'memory'

( asksol 의 의견에 감사드립니다 .)


답변

완료하려면 셀러리 결과가 필요한 테스트에서 override_settings 데코레이터를 사용하고 싶습니다.

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

이것을 모든 테스트에 적용하려면 http://docs.celeryproject.org/en/2.5/django/unit-testing.html에 설명 된대로 셀러리 테스트 실행기를 사용할 수 있습니다. 기본적으로 ( BROKER_BACKEND = 'memory')를 제외하고 동일한 설정을 설정 합니다.

설정에서 :

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

CeleryTestSuiteRunner의 소스를 보면 무슨 일이 일어나고 있는지 매우 분명합니다.


답변

다음은 apply_async메서드 를 스텁하고 이에 대한 호출을 기록 하는 테스트 기본 클래스에서 발췌 한 것입니다 (포함) Task.delay. 약간 복잡하지만 지난 몇 달 동안 사용하고있는 내 필요에 맞게 관리되었습니다.

from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_sync with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        self.applied_tasks.append((task_class, tuple(args), kwargs))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)

다음은 테스트 케이스에서 사용하는 방법에 대한 “정상적인”예제입니다.

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)


답변

여전히 검색 결과에 표시되므로 설정이

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

Celery Docs에 따라 나를 위해 일했습니다.


답변

이것이 내가 한 일입니다

myapp.tasks.py 안에 다음이 있습니다.

from celery import shared_task

@shared_task()
def add(a, b):
    return a + b

myapp.test_tasks.py 안에 다음이 있습니다.

from django.test import TestCase, override_settings
from myapp.tasks import add


class TasksTestCase(TestCase):

    def setUp(self):
        ...

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True)
    def test_create_sections(self):
        result= add.delay(1,2)
        assert result.successful() == True
        assert result.get() == 3


답변

2019 년에 여기에 온 모든 사람 : 작업을 동기식으로 호출하는 등 다양한 전략을 다루는 이 기사 를 확인 하세요 .


답변