[python] Celery에서 대기열에있는 작업 목록 검색

아직 처리되지 않은 대기열의 작업 목록을 어떻게 검색합니까?



답변

편집 : 대기열에서 작업 목록을 얻는 다른 답변을 참조하십시오.

현재 같아야합니다
셀러리 가이드 -의 검사 노동자

기본적으로 이것은 :

from celery.app.control import Inspect

# Inspect all nodes.
i = Inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

원하는 것에 따라


답변

rabbitMQ를 사용하는 경우 터미널에서 이것을 사용하십시오.

sudo rabbitmqctl list_queues

대기중인 작업 수가 많은 대기열 목록을 인쇄합니다. 예를 들면 다음과 같습니다.

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

오른쪽 열의 숫자는 대기열의 작업 수입니다. 위의 셀러리 큐에는 166 개의 보류중인 작업이 있습니다.


답변

우선 순위가 지정된 작업을 사용하지 않으면 Redis를 사용하는 경우 실제로 매우 간단 합니다. 작업 수를 얻으려면 :

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

그러나 우선 순위가 지정된 작업 은 redis에서 다른 키를 사용 하므로 전체 그림이 약간 더 복잡합니다. 전체 그림은 모든 작업 우선 순위에 대해 redis를 쿼리해야한다는 것입니다. 파이썬과 꽃 프로젝트에서 다음과 같이 보입니다.

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

실제 작업을 원하면 다음과 같이 사용할 수 있습니다.

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

거기에서 반환 된 목록을 직렬화 해제해야합니다. 제 경우에는 다음과 같은 방법으로 이것을 달성 할 수있었습니다.

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

역 직렬화에는 다소 시간이 걸릴 수 있으므로 위의 명령을 조정하여 다양한 우선 순위로 작업해야합니다.


답변

백엔드에서 작업을 검색하려면 다음을 사용하십시오.

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)


답변

당신이 사용하는 경우 셀러리 + 장고 작업을 당신의 터미널에서 직접 명령을 사용하여 검사하는 간단한 방법을 가상 환경 또는 사용하여 전체 경로 셀러리로를 :

문서 : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

또한 Celery + RabbitMQ 를 사용하는 경우 다음 명령을 사용하여 큐 목록을 검사 할 수 있습니다 .

자세한 정보 : https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues


답변

json 직렬화를 사용하는 Redis 용 복사-붙여 넣기 솔루션 :

def get_celery_queue_items(queue_name):
    import base64
    import json

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

장고와 함께 작동합니다. 변경하는 것을 잊지 마십시오 yourproject.celery.


답변

셀러리 검사 모듈은 작업자 관점에서의 작업 만 인식하는 것으로 보입니다. 대기열에있는 메시지를 보려면 (워커가 가져 오려고하지만) pyrabbit 을 사용하는 것이 좋습니다 . pyrabbit는 rabbitmq http api와 인터페이스하여 대기열에서 모든 종류의 정보를 검색 할 수 있습니다.

예는 여기에서 찾을 수 있습니다 :
셀러리 (RabbitMQ, 장고)와 큐 길이를 검색