배달 못한 편지 대기열에서 Amazon SQS의 원래 대기열로 메시지를 다시 이동하는 모범 사례는 무엇입니까?
일 것이다
- DLQ에서 메시지 받기
- 대기열에 메시지 쓰기
- DLQ에서 메시지 삭제
아니면 더 간단한 방법이 있습니까?
또한 AWS는 결국 콘솔에 DLQ에서 메시지를 이동하는 도구를 갖게 될까요?
답변
다음은 빠른 해킹입니다. 이것은 확실히 최선이나 권장 옵션이 아닙니다.
- 최대 수신이 1 인 실제 DLQ에 대한 DLQ로 기본 SQS 대기열을 설정합니다.
- DLQ의 콘텐츠보기 (실제 DLQ에 대한 DLQ이므로 메시지가 기본 대기열로 이동 됨)
- 기본 대기열이 실제 DLQ의 DLQ가되지 않도록 설정을 제거하십시오.
답변
이 작업을 수행하는 몇 가지 스크립트가 있습니다.
- npm / nodejs 기반 : http://github.com/garryyao/replay-aws-dlq
# install
npm install replay-aws-dlq;
# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source
# use
sqsmover -s [source_queue_url] -d [dest_queue_url]
답변
메시지 중복, 복구 시나리오, 메시지 손실, 중복 제거 검사 등과 같은 다른 많은 문제가 발생하므로 메시지를 이동할 필요가 없습니다.
다음은 우리가 구현 한 솔루션입니다.
일반적으로 우리는 영구적 인 오류가 아닌 일시적인 오류에 DLQ를 사용합니다. 그래서 아래 접근 방식을 취했습니다.
-
일반 대기열처럼 DLQ에서 메시지 읽기
혜택
- 중복 메시지 처리를 방지하려면
- DLQ에 대한 더 나은 제어-내가 확인한 것처럼 일반 대기열이 완전히 처리 될 때만 처리합니다.
- DLQ의 메시지를 기반으로 프로세스 확장
-
그런 다음 일반 대기열이 따르는 동일한 코드를 따르십시오.
-
작업을 중단하거나 처리하는 동안 프로세스가 종료 된 경우 (예 : 인스턴스가 종료되거나 프로세스가 종료 된 경우) 더 안정적입니다.
혜택
- 코드 재사용 성
- 오류 처리
- 복구 및 메시지 재생
-
다른 스레드가 메시지를 처리하지 않도록 메시지 가시성을 확장하십시오.
이익
- 여러 스레드에서 동일한 레코드를 처리하지 마십시오.
-
영구적 인 오류가 있거나 성공한 경우에만 메시지를 삭제하십시오.
이익
- 일시적인 오류가 발생할 때까지 처리를 계속하십시오.
답변
최선의 선택 인 것 같습니다. 2 단계 이후에 프로세스가 실패 할 가능성이 있습니다.이 경우 메시지를 두 번 복사하게되지만 응용 프로그램은 메시지 재전송을 처리해야합니다 (또는 상관 없음).
답변
여기:
import boto3
import sys
import Queue
import threading
work_queue = Queue.Queue()
sqs = boto3.resource('sqs')
from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)
from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)
def process_queue():
while True:
messages = work_queue.get()
bodies = list()
for i in range(0, len(messages)):
bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})
to_q.send_messages(Entries=bodies)
for message in messages:
print("Coppied " + str(message.body))
message.delete()
for i in range(10):
t = threading.Thread(target=process_queue)
t.daemon = True
t.start()
while True:
messages = list()
for message in from_q.receive_messages(
MaxNumberOfMessages=10,
VisibilityTimeout=123,
WaitTimeSeconds=20):
messages.append(message)
work_queue.put(messages)
work_queue.join()
답변
한 줄의 코드를 작성하지 않고도이를 달성 할 수있는 또 다른 방법이 있습니다. 실제 큐 이름이 SQS_Queue이고 DLQ가 SQS_DLQ라고 고려하십시오. 이제 다음 단계를 따르십시오.
- SQS_Queue를 SQS_DLQ의 dlq로 설정합니다. SQS_DLQ는 이미 SQS_Queue의 dlq이기 때문에. 이제 둘 다 다른 것의 dlq 역할을합니다.
- SQS_DLQ의 최대 수신 횟수를 1로 설정합니다.
- 이제 SQS_DLQ 콘솔에서 메시지를 읽습니다. 메시지 수신 횟수가 1이므로 실제 SQS_Queue 큐인 자체 dlq로 모든 메시지를 보냅니다.
답변
boto3 lib를 사용하여이를 수행하기 위해 작은 파이썬 스크립트를 작성했습니다.
conf = {
"sqs-access-key": "",
"sqs-secret-key": "",
"reader-sqs-queue": "",
"writer-sqs-queue": "",
"message-group-id": ""
}
import boto3
client = boto3.client(
'sqs',
aws_access_key_id = conf.get('sqs-access-key'),
aws_secret_access_key = conf.get('sqs-secret-key')
)
while True:
messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)
if 'Messages' in messages:
for m in messages['Messages']:
print(m['Body'])
ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
print(ret)
client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
else:
print('Queue is currently empty or messages are invisible')
break
이 링크 에서이 스크립트를 얻을 수 있습니다.
이 스크립트는 기본적으로 임의의 대기열간에 메시지를 이동할 수 있습니다. message_group_id
필드를 제공 할 수있을뿐만 아니라 fifo 대기열을 지원 합니다.
