[python] Celery에서 작업 상태를 확인하는 방법은 무엇입니까?

작업이 celery에서 실행 중인지 어떻게 확인합니까 (특히 celery-django를 사용하고 있습니다)?

설명서를 읽고 봤지만 다음과 같은 전화를 볼 수 없습니다.

my_example_task.state() == RUNNING

내 사용 사례는 트랜스 코딩을위한 외부 (자바) 서비스가 있다는 것입니다. 트랜스 코딩 할 문서를 보낼 때 해당 서비스를 실행하는 작업이 실행 중인지 확인하고 그렇지 않은 경우 (다시) 시작하고 싶습니다.

현재 안정된 버전 인 2.4를 사용하고 있습니다.



답변

task_id (.delay ()에서 제공됨)를 반환하고 나중에 상태에 대해 셀러리 인스턴스에 문의합니다.

x = method.delay(1,2)
print x.task_id

질문 할 때 다음 task_id를 사용하여 새 AsyncResult를 가져옵니다.

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()


답변

만들기 AsyncResult작업 ID와 객체 것은 입니다 에서 권장하는 방법 자주 묻는 질문 당신이 가진 유일한 것은 작업 ID입니다 때 작업 상태를 얻을.

그러나 Celery 3.x부터는 사람들이주의를 기울이지 않으면 물릴 수있는 중요한주의 사항이 있습니다. 실제로 특정 사용 사례 시나리오에 따라 다릅니다.

기본적으로 Celery는 “실행 중”상태를 기록하지 않습니다.

위해 셀러리는 작업이 실행되는 것을 기록하기 위해, 당신은 설정해야합니다 task_track_started으로 True. 다음은이를 테스트하는 간단한 작업입니다.

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

task_track_started입니다 False기본 인, 상태 표시는 PENDING작업이 시작에도 불구하고. 사용자가 설정 한 경우 task_track_startedTrue, 그 상태가 될 것입니다 STARTED.

상태 PENDING는 “모름”을 의미합니다.

AsyncResult상태로는 PENDING더 셀러리는 작업의 상태를 모르는 것보다 것을하지 평균 아무것도 않습니다. 이것은 여러 가지 이유 때문일 수 있습니다.

우선 AsyncResult잘못된 작업 ID로 구성 할 수 있습니다. 이러한 “작업”은 Celery에서 보류중인 것으로 간주됩니다.

>>> task.AsyncResult("invalid").status
'PENDING'

좋아, 아무도 분명히 유효하지 않은 ID를 AsyncResult. 충분히 AsyncResult공평하지만 성공적으로 실행되었지만 Celery가 잊혀진 작업을 고려하는PENDING 효과도 있습니다 . 다시 말하지만, 일부 사용 사례 시나리오에서는 이것이 문제가 될 수 있습니다. 문제의 일부는 결과 백엔드에서 “삭제 표시”의 가용성에 따라 다르기 때문에 작업 결과를 유지하도록 Celery를 구성하는 방법에 달려 있습니다. ( “묘비는”기록이 작업이 어떻게 끝났는지하는 데이터 청크에 대한 셀러리 설명서의 용어 사용이다.)를 사용 AsyncResult하면 전혀하지 않습니다 일 task_ignore_result입니다 True. 더 짜증나는 문제는 셀러리가 기본적으로 삭제 표시를 만료한다는 것입니다. 그만큼result_expires기본 설정은 24 시간으로 설정됩니다. 따라서 작업을 시작하고 장기 저장소에 ID를 기록하고 24 시간 후에 작업을 생성 AsyncResult하면 상태는 PENDING.

모든 “실제 작업”은 PENDING주 에서 시작됩니다 . 따라서 PENDING작업을 수행한다는 것은 작업이 요청되었지만 (어떤 이유로 든) 이보다 더 진행되지 않았 음을 의미 할 수 있습니다. 또는 작업이 실행되었지만 Celery가 상태를 잊어 버렸음을 의미 할 수 있습니다.

아야! AsyncResult나를 위해 작동하지 않습니다. 그 밖에 무엇을 할 수 있습니까?

작업 자체를 추적하는 것보다 목표를 추적하는 것을 선호 합니다 . 나는 몇 가지 작업 정보를 유지하지만 목표를 추적하는 것은 정말 부차적입니다. 목표는 셀러리와는 독립적 인 저장소에 저장됩니다. 요청이 계산을 수행해야 할 때 달성 된 목표에 따라 달라지며, 목표가 이미 달성되었는지 확인하고, 그렇다면이 캐시 된 목표를 사용하고, 그렇지 않으면 목표에 영향을 미칠 작업을 시작하고 HTTP 요청을 한 클라이언트가 결과를 기다려야 함을 나타내는 응답입니다.


위의 변수 이름과 하이퍼 링크는 Celery 4.x 용입니다. 3.X에 대응하는 변수와 하이퍼 링크이다 : CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.


답변

모든 Task개체에는 개체 .request를 포함 하는 속성이 있습니다 AsyncRequest. 따라서 다음 줄은 Task의 상태를 제공합니다 task.

task.AsyncResult(task.request.id).state


답변

또한 사용자 지정 상태를 생성하고 작업 실행 중 값을 업데이트 할 수 있습니다. 이 예제는 문서에서 가져온 것입니다.

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states


답변

오래된 질문이지만 최근 에이 문제가 발생했습니다.

task_id를 얻으려는 경우 다음과 같이 할 수 있습니다.

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

이제 task_id가 정확히 무엇인지 알았으며 이제이를 사용하여 AsyncResult를 가져올 수 있습니다.

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4


답변

셀러리 FAQ 에서이 API를 사용하십시오.

result = app.AsyncResult(task_id)

이것은 잘 작동합니다.


답변

2020 년 답변 :

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"