[node.js] RabbitMQ / AMQP : 단일 대기열, 동일한 메시지에 대한 여러 소비자?

나는 RabbitMQ와 AMQP를 일반적으로 사용하기 시작했습니다.

  • 메시지 대기열이 있습니다
  • 여러 소비자가 있는데 동일한 메시지로 다른 일을하고 싶습니다 .

RabbitMQ 문서의 대부분은 라운드 로빈에 중점을 둔 것으로 보입니다. 즉, 단일 메시지가 단일 소비자에 의해 소비되고 각 소비자간에로드가 분산되는 경우입니다. 이것은 실제로 내가 목격하는 행동입니다.

예 : 생산자에게 단일 대기열이 있고 2 초마다 메시지를 보냅니다.

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

그리고 여기 소비자가 있습니다 :

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

소비자를 두 번 시작하면 각 소비자가 라운드 로빈 동작으로 대체 메시지를 소비하고 있음을 알 수 있습니다. 예를 들어, 한 터미널에는 메시지 1, 3, 5, 다른 터미널에는 2, 4, 6 메시지가 표시됩니다 .

내 질문은 :

  • 각 소비자에게 동일한 메시지를 받도록 할 수 있습니까? 즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까? 이것을 AMQP / RabbitMQ에서 무엇이라고하나요? 일반적으로 어떻게 구성됩니까?

  • 이것은 일반적으로 수행됩니까? 교환기가 단일 소비자 대신 두 개의 별도 대기열로 메시지를 라우팅해야합니까?



답변

각 소비자에게 동일한 메시지를 받도록 할 수 있습니까? 즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까? 이것을 AMQP / RabbitMQ에서 무엇이라고하나요? 일반적으로 어떻게 구성됩니까?

소비자가 동일한 대기열에있는 경우 아닙니다. RabbitMQ의 AMQP 개념 안내서에서 :

AMQP 0-9-1에서 메시지는 소비자간에로드 균형 조정됨을 이해해야합니다.

이것은 큐 내의 라운드 로빈 동작이 지정 되어 있으며 구성 할 수 없음을 의미합니다. 즉, 여러 소비자가 동일한 메시지 ID를 처리하려면 별도의 큐가 필요합니다.

이것은 일반적으로 수행됩니까? 교환기가 단일 소비자 대신 두 개의 별도 대기열로 메시지를 라우팅해야합니까?

아니요, 각 소비자가 동일한 메시지 ID를 처리하는 단일 대기열 / 여러 소비자는 불가능합니다. 교환기가 메시지를 두 개의 개별 대기열로 라우팅하는 것이 실제로 더 좋습니다.

너무 복잡한 라우팅이 필요하지 않기 때문에 팬 아웃 교환 이이를 잘 처리합니다. node-amqp에는 메시지를 직접 연결에 게시 할 수있는 ‘기본 교환’개념이 있기 때문에 Exchange에 너무 집중하지 않았지만 대부분의 AMQP 메시지는 특정 교환에 게시됩니다.

팬 아웃 교환은 다음과 같습니다. 송수신

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})


답변

rabbitmq 튜토리얼을 읽으십시오 . 대기열이 아닌 교환을 위해 메시지를 게시합니다. 그런 다음 적절한 대기열로 라우팅됩니다. 귀하의 경우 각 소비자에 대해 별도의 큐를 바인딩해야합니다. 이렇게하면 메시지를 완전히 독립적으로 사용할 수 있습니다.


답변

마지막 몇 가지 답변은 거의 정확합니다. 다른 소비자와 끝내야하는 프로세스를 매우 간단하게 해야하는 메시지를 생성하는 수많은 앱이 있습니다.

여러 소비자를 동일한 메시지에 표시하려면 다음 절차를 수행하십시오.

각 큐 특성에서 메시지를 수신 할 각 앱마다 하나씩 여러 개의 큐를 작성하여 amq.direct 교환과 라우팅 태그를 “바인드”하십시오. amq.direct로 보내도록 게시 앱을 변경하고 대기열이 아닌 라우팅 태그를 사용하십시오. 그런 다음 AMQP는 동일한 바인딩으로 각 큐에 메시지를 복사합니다. 매력처럼 작동합니다 🙂

예 : 생성 한 JSON 문자열이 있다고 가정하고 라우팅 태그 “new-sales-order”를 사용하여 “amq.direct”교환에 게시하고 주문을 인쇄하는 order_printer 앱에 대한 대기열이 있습니다. 주문 사본을 발송하고 고객에게 송장을 보내는 청구 시스템 대기열이 있고, 나는 과거 / 적합성 이유로 주문을 보관하는 웹 보관 시스템을 가지고 있으며 다른 정보가 들어 오면 주문을 추적하는 클라이언트 웹 인터페이스가 있습니다. 주문.

따라서 내 대기열은 order_printer, order_billing, order_archive 및 order_tracking입니다. 모두 바인딩 태그 “new-sales-order”가 바인딩되어 있으며 4 개 모두 JSON 데이터를 가져옵니다.

이는 게시 앱이 수신 앱을 알거나 신경 쓰지 않고 데이터를 보내는 이상적인 방법입니다.


답변

예, 각 소비자는 동일한 메시지를받을 수 있습니다. http://www.rabbitmq.com/tutorials/tutorial-three-python.html
http://www.rabbitmq.com/tutorials/tutorial-four-python.html
http : //www.rabbitmq를 살펴보십시오
. com / tutorials / tutorial-five-python.html

다양한 방법으로 메시지를 라우팅합니다. 나는 그들이 파이썬과 자바를위한 것이라는 것을 알고 있지만 원리를 이해하고 당신이 무엇을하고 있는지 결정한 다음 JS에서 그것을하는 방법을 찾는 것이 좋습니다. 간단한 팬 아웃 ( 자습서 3 ) 을 수행하려는 것처럼 들리며 , 이는 교환기에 연결된 모든 큐에 메시지를 보냅니다.

수행중인 작업과 수행하려는 작업의 차이점은 기본적으로 팬 아웃을 설정 및 교환하거나 입력한다는 것입니다. 팬 아웃 예외는 연결된 모든 대기열에 모든 메시지를 보냅니다. 각 대기열에는 모든 메시지에 개별적으로 액세스 할 수있는 소비자가 있습니다.

그렇습니다. 이것은 일반적으로 이루어지며 AMPQ의 기능 중 하나입니다.


답변

송신 패턴은 일대일 관계입니다. 둘 이상의 수신자에게 “보내기”하려면 pub / sub 패턴을 사용해야합니다. 자세한 내용은 http://www.rabbitmq.com/tutorials/tutorial-three-python.html 을 참조하십시오.


답변

RabbitMQ / AMQP : 단일 대기열, 동일한 메시지 및 페이지 새로 고침을위한 여러 소비자.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });


답변

원하는 동작을 얻으려면 각 소비자가 자체 대기열에서 소비하도록하십시오. 모든 대기열에 메시지를 한 번에 가져 오려면 비 직접 교환 유형 (주제, 헤더, 팬 아웃)을 사용해야합니다.