[amazon-web-services] Amazon SQS에서 DLQ에서 메시지를 이동하는 가장 좋은 방법은 무엇입니까?

배달 못한 편지 대기열에서 Amazon SQS의 원래 대기열로 메시지를 다시 이동하는 모범 사례는 무엇입니까?

일 것이다

  1. DLQ에서 메시지 받기
  2. 대기열에 메시지 쓰기
  3. DLQ에서 메시지 삭제

아니면 더 간단한 방법이 있습니까?

또한 AWS는 결국 콘솔에 DLQ에서 메시지를 이동하는 도구를 갖게 될까요?



답변

다음은 빠른 해킹입니다. 이것은 확실히 최선이나 권장 옵션이 아닙니다.

  1. 최대 수신이 1 인 실제 DLQ에 대한 DLQ로 기본 SQS 대기열을 설정합니다.
  2. DLQ의 콘텐츠보기 (실제 DLQ에 대한 DLQ이므로 메시지가 기본 대기열로 이동 됨)
  3. 기본 대기열이 실제 DLQ의 DLQ가되지 않도록 설정을 제거하십시오.


답변

이 작업을 수행하는 몇 가지 스크립트가 있습니다.

# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url]


답변

메시지 중복, 복구 시나리오, 메시지 손실, 중복 제거 검사 등과 같은 다른 많은 문제가 발생하므로 메시지를 이동할 필요가 없습니다.

다음은 우리가 구현 한 솔루션입니다.

일반적으로 우리는 영구적 인 오류가 아닌 일시적인 오류에 DLQ를 사용합니다. 그래서 아래 접근 방식을 취했습니다.

  1. 일반 대기열처럼 DLQ에서 메시지 읽기

    혜택

    • 중복 메시지 처리를 방지하려면
    • DLQ에 대한 더 나은 제어-내가 확인한 것처럼 일반 대기열이 완전히 처리 될 때만 처리합니다.
    • DLQ의 메시지를 기반으로 프로세스 확장
  2. 그런 다음 일반 대기열이 따르는 동일한 코드를 따르십시오.

  3. 작업을 중단하거나 처리하는 동안 프로세스가 종료 된 경우 (예 : 인스턴스가 종료되거나 프로세스가 종료 된 경우) 더 안정적입니다.

    혜택

    • 코드 재사용 성
    • 오류 처리
    • 복구 및 메시지 재생
  4. 다른 스레드가 메시지를 처리하지 않도록 메시지 가시성을 확장하십시오.

    이익

    • 여러 스레드에서 동일한 레코드를 처리하지 마십시오.
  5. 영구적 인 오류가 있거나 성공한 경우에만 메시지를 삭제하십시오.

    이익

    • 일시적인 오류가 발생할 때까지 처리를 계속하십시오.


답변

최선의 선택 인 것 같습니다. 2 단계 이후에 프로세스가 실패 할 가능성이 있습니다.이 경우 메시지를 두 번 복사하게되지만 응용 프로그램은 메시지 재전송을 처리해야합니다 (또는 상관 없음).


답변

여기:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()


답변

한 줄의 코드를 작성하지 않고도이를 달성 할 수있는 또 다른 방법이 있습니다. 실제 큐 이름이 SQS_Queue이고 DLQ가 SQS_DLQ라고 고려하십시오. 이제 다음 단계를 따르십시오.

  1. SQS_Queue를 SQS_DLQ의 dlq로 설정합니다. SQS_DLQ는 이미 SQS_Queue의 dlq이기 때문에. 이제 둘 다 다른 것의 dlq 역할을합니다.
  2. SQS_DLQ의 최대 수신 횟수를 1로 설정합니다.
  3. 이제 SQS_DLQ 콘솔에서 메시지를 읽습니다. 메시지 수신 횟수가 1이므로 실제 SQS_Queue 큐인 자체 dlq로 모든 메시지를 보냅니다.


답변

boto3 lib를 사용하여이를 수행하기 위해 작은 파이썬 스크립트를 작성했습니다.

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

링크 에서이 스크립트를 얻을 수 있습니다.

이 스크립트는 기본적으로 임의의 대기열간에 메시지를 이동할 수 있습니다. message_group_id필드를 제공 할 수있을뿐만 아니라 fifo 대기열을 지원 합니다.