나는 RabbitMQ와 AMQP를 일반적으로 사용하기 시작했습니다.
- 메시지 대기열이 있습니다
- 여러 소비자가 있는데 동일한 메시지로 다른 일을하고 싶습니다 .
RabbitMQ 문서의 대부분은 라운드 로빈에 중점을 둔 것으로 보입니다. 즉, 단일 메시지가 단일 소비자에 의해 소비되고 각 소비자간에로드가 분산되는 경우입니다. 이것은 실제로 내가 목격하는 행동입니다.
예 : 생산자에게 단일 대기열이 있고 2 초마다 메시지를 보냅니다.
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
그리고 여기 소비자가 있습니다 :
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
소비자를 두 번 시작하면 각 소비자가 라운드 로빈 동작으로 대체 메시지를 소비하고 있음을 알 수 있습니다. 예를 들어, 한 터미널에는 메시지 1, 3, 5, 다른 터미널에는 2, 4, 6 메시지가 표시됩니다 .
내 질문은 :
-
각 소비자에게 동일한 메시지를 받도록 할 수 있습니까? 즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까? 이것을 AMQP / RabbitMQ에서 무엇이라고하나요? 일반적으로 어떻게 구성됩니까?
-
이것은 일반적으로 수행됩니까? 교환기가 단일 소비자 대신 두 개의 별도 대기열로 메시지를 라우팅해야합니까?
답변
각 소비자에게 동일한 메시지를 받도록 할 수 있습니까? 즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까? 이것을 AMQP / RabbitMQ에서 무엇이라고하나요? 일반적으로 어떻게 구성됩니까?
소비자가 동일한 대기열에있는 경우 아닙니다. RabbitMQ의 AMQP 개념 안내서에서 :
AMQP 0-9-1에서 메시지는 소비자간에로드 균형 조정됨을 이해해야합니다.
이것은 큐 내의 라운드 로빈 동작이 지정 되어 있으며 구성 할 수 없음을 의미합니다. 즉, 여러 소비자가 동일한 메시지 ID를 처리하려면 별도의 큐가 필요합니다.
이것은 일반적으로 수행됩니까? 교환기가 단일 소비자 대신 두 개의 별도 대기열로 메시지를 라우팅해야합니까?
아니요, 각 소비자가 동일한 메시지 ID를 처리하는 단일 대기열 / 여러 소비자는 불가능합니다. 교환기가 메시지를 두 개의 개별 대기열로 라우팅하는 것이 실제로 더 좋습니다.
너무 복잡한 라우팅이 필요하지 않기 때문에 팬 아웃 교환 이이를 잘 처리합니다. node-amqp에는 메시지를 직접 연결에 게시 할 수있는 ‘기본 교환’개념이 있기 때문에 Exchange에 너무 집중하지 않았지만 대부분의 AMQP 메시지는 특정 교환에 게시됩니다.
팬 아웃 교환은 다음과 같습니다. 송수신
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {
var sendMessage = function(exchange, payload) {
console.log('about to publish')
var encoded_payload = JSON.stringify(payload);
exchange.publish('', encoded_payload, {})
}
// Recieve messages
connection.queue("my_queue_name", function(queue){
console.log('Created queue')
queue.bind(exchange, '');
queue.subscribe(function (message) {
console.log('subscribed to queue')
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(exchange, test_message)
count += 1;
}, 2000)
})
})
답변
rabbitmq 튜토리얼을 읽으십시오 . 대기열이 아닌 교환을 위해 메시지를 게시합니다. 그런 다음 적절한 대기열로 라우팅됩니다. 귀하의 경우 각 소비자에 대해 별도의 큐를 바인딩해야합니다. 이렇게하면 메시지를 완전히 독립적으로 사용할 수 있습니다.
답변
마지막 몇 가지 답변은 거의 정확합니다. 다른 소비자와 끝내야하는 프로세스를 매우 간단하게 해야하는 메시지를 생성하는 수많은 앱이 있습니다.
여러 소비자를 동일한 메시지에 표시하려면 다음 절차를 수행하십시오.
각 큐 특성에서 메시지를 수신 할 각 앱마다 하나씩 여러 개의 큐를 작성하여 amq.direct 교환과 라우팅 태그를 “바인드”하십시오. amq.direct로 보내도록 게시 앱을 변경하고 대기열이 아닌 라우팅 태그를 사용하십시오. 그런 다음 AMQP는 동일한 바인딩으로 각 큐에 메시지를 복사합니다. 매력처럼 작동합니다 🙂
예 : 생성 한 JSON 문자열이 있다고 가정하고 라우팅 태그 “new-sales-order”를 사용하여 “amq.direct”교환에 게시하고 주문을 인쇄하는 order_printer 앱에 대한 대기열이 있습니다. 주문 사본을 발송하고 고객에게 송장을 보내는 청구 시스템 대기열이 있고, 나는 과거 / 적합성 이유로 주문을 보관하는 웹 보관 시스템을 가지고 있으며 다른 정보가 들어 오면 주문을 추적하는 클라이언트 웹 인터페이스가 있습니다. 주문.
따라서 내 대기열은 order_printer, order_billing, order_archive 및 order_tracking입니다. 모두 바인딩 태그 “new-sales-order”가 바인딩되어 있으며 4 개 모두 JSON 데이터를 가져옵니다.
이는 게시 앱이 수신 앱을 알거나 신경 쓰지 않고 데이터를 보내는 이상적인 방법입니다.
답변
예, 각 소비자는 동일한 메시지를받을 수 있습니다. http://www.rabbitmq.com/tutorials/tutorial-three-python.html
http://www.rabbitmq.com/tutorials/tutorial-four-python.html
http : //www.rabbitmq를 살펴보십시오
. com / tutorials / tutorial-five-python.html
다양한 방법으로 메시지를 라우팅합니다. 나는 그들이 파이썬과 자바를위한 것이라는 것을 알고 있지만 원리를 이해하고 당신이 무엇을하고 있는지 결정한 다음 JS에서 그것을하는 방법을 찾는 것이 좋습니다. 간단한 팬 아웃 ( 자습서 3 ) 을 수행하려는 것처럼 들리며 , 이는 교환기에 연결된 모든 큐에 메시지를 보냅니다.
수행중인 작업과 수행하려는 작업의 차이점은 기본적으로 팬 아웃을 설정 및 교환하거나 입력한다는 것입니다. 팬 아웃 예외는 연결된 모든 대기열에 모든 메시지를 보냅니다. 각 대기열에는 모든 메시지에 개별적으로 액세스 할 수있는 소비자가 있습니다.
그렇습니다. 이것은 일반적으로 이루어지며 AMPQ의 기능 중 하나입니다.
답변
송신 패턴은 일대일 관계입니다. 둘 이상의 수신자에게 “보내기”하려면 pub / sub 패턴을 사용해야합니다. 자세한 내용은 http://www.rabbitmq.com/tutorials/tutorial-three-python.html 을 참조하십시오.
답변
RabbitMQ / AMQP : 단일 대기열, 동일한 메시지 및 페이지 새로 고침을위한 여러 소비자.
rabbit.on('ready', function () { });
sockjs_chat.on('connection', function (conn) {
conn.on('data', function (message) {
try {
var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));
if (obj.header == "register") {
// Connect to RabbitMQ
try {
conn.exchange = rabbit.exchange(exchange, { type: 'topic',
autoDelete: false,
durable: false,
exclusive: false,
confirm: true
});
conn.q = rabbit.queue('my-queue-'+obj.agentID, {
durable: false,
autoDelete: false,
exclusive: false
}, function () {
conn.channel = 'my-queue-'+obj.agentID;
conn.q.bind(conn.exchange, conn.channel);
conn.q.subscribe(function (message) {
console.log("[MSG] ---> " + JSON.stringify(message));
conn.write(JSON.stringify(message) + "\n");
}).addCallback(function(ok) {
ctag[conn.channel] = ok.consumerTag; });
});
} catch (err) {
console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
}
} else if (obj.header == "typing") {
var reply = {
type: 'chatMsg',
msg: utils.escp(obj.msga),
visitorNick: obj.channel,
customField1: '',
time: utils.getDateTime(),
channel: obj.channel
};
conn.exchange.publish('my-queue-'+obj.agentID, reply);
}
} catch (err) {
console.log("ERROR ----> " + err.stack);
}
});
// When the visitor closes or reloads a page we need to unbind from RabbitMQ?
conn.on('close', function () {
try {
// Close the socket
conn.close();
// Close RabbitMQ
conn.q.unsubscribe(ctag[conn.channel]);
} catch (er) {
console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
}
});
});
답변
원하는 동작을 얻으려면 각 소비자가 자체 대기열에서 소비하도록하십시오. 모든 대기열에 메시지를 한 번에 가져 오려면 비 직접 교환 유형 (주제, 헤더, 팬 아웃)을 사용해야합니다.