RabbitMQ / AMQP: queue unique, plusieurs consommateurs pour un même message?

Je commence tout juste à utiliser RabbitMQ et AMQP en général.

  • J’ai une file de messages
  • J’ai plusieurs consommateurs, et j’aimerais faire différentes choses avec le même message .

La plupart de la documentation de RabbitMQ semble être centrée sur le round-robin, c’est-à-dire où un seul consommateur consum un seul message, la charge étant répartie entre chaque consommateur. C’est bien le comportement dont je suis témoin.

Un exemple: le producteur dispose d’une seule queue et envoie des messages toutes les 2 secondes:

var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { var sendMessage = function(connection, queue_name, payload) { var encoded_payload = JSON.ssortingngify(payload); connection.publish(queue_name, encoded_payload); } setInterval( function() { var test_message = 'TEST '+count sendMessage(connection, "my_queue_name", test_message) count += 1; }, 2000) }) 

Et voici un consommateur:

 var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); connection.on('ready', function () { connection.queue("my_queue_name", function(queue){ queue.bind('#'); queue.subscribe(function (message) { var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) }) 

Si je lance le consommateur deux fois, je peux voir que chaque consommateur consum des messages alternatifs dans un comportement à tour de rôle. Par exemple, je verrai les messages 1, 3, 5 dans un terminal, 2, 4 et 6 dans l’autre .

Ma question est:

  • Puis-je demander à chaque consommateur de recevoir les mêmes messages? C’est-à-dire que les deux consommateurs reçoivent le message 1, 2, 3, 4, 5, 6? Qu’est-ce que cela s’appelle dans AMQP / RabbitMQ parle? Comment est-il normalement configuré?

  • Est-ce couramment fait? Dois-je simplement que le message achemine le message en deux files d’attente distinctes, avec un seul consommateur, à la place?

    Puis-je demander à chaque consommateur de recevoir les mêmes messages? C’est-à-dire que les deux consommateurs reçoivent le message 1, 2, 3, 4, 5, 6? Qu’est-ce que cela s’appelle dans AMQP / RabbitMQ parle? Comment est-il normalement configuré?

    Non, pas si les consommateurs sont dans la même queue. Extrait du guide Concepts AMQP de RabbitMQ:

    il est important de comprendre que, dans l’AMQP 0-9-1, les messages sont équilibrés entre les consommateurs.

    Cela semble impliquer que le comportement de round-robin dans une queue est un comportement donné et non configurable. C’est-à-dire que des files d’attente distinctes sont requirejses pour que le même identifiant de message soit traité par plusieurs consommateurs.

    Est-ce couramment fait? Dois-je simplement que le message achemine le message en deux files d’attente distinctes, avec un seul consommateur, à la place?

    Non, ce n’est pas possible, une queue unique / plusieurs consommateurs, chaque consommateur traitant le même ID de message. Avoir la route d’échange du message sur deux files d’attente distinctes est en effet préférable.

    Comme je n’ai pas besoin d’un routage trop complexe, un échange de fanout se passera bien. Je ne me suis pas trop concentré sur les échanges, car node-amqp a le concept d’un «échange par défaut» qui vous permet de publier des messages directement sur une connexion. Cependant, la plupart des messages AMQP sont publiés dans un échange spécifique.

    Voici mon échange de fanout, à la fois envoyer et recevoir:

     var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) { var sendMessage = function(exchange, payload) { console.log('about to publish') var encoded_payload = JSON.ssortingngify(payload); exchange.publish('', encoded_payload, {}) } // Recieve messages connection.queue("my_queue_name", function(queue){ console.log('Created queue') queue.bind(exchange, ''); queue.subscribe(function (message) { console.log('subscribed to queue') var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) setInterval( function() { var test_message = 'TEST '+count sendMessage(exchange, test_message) count += 1; }, 2000) }) }) 

    Il suffit de lire le tutoriel rabbitmq . Vous publiez un message à échanger, pas à faire la queue; il est ensuite acheminé vers les files d’attente appropriées. Dans votre cas, vous devez lier une queue distincte pour chaque consommateur. De cette façon, ils peuvent consumr des messages de manière totalement indépendante.

    Les deux dernières réponses sont presque correctes: j’ai des tonnes d’applications qui génèrent des messages qui doivent aboutir à différents consommateurs. Le processus est donc très simple.

    Si vous souhaitez plusieurs consommateurs pour le même message, procédez comme suit.

    Créez plusieurs files d’attente, une pour chaque application devant recevoir le message, dans chaque propriété de queue, “liez” une balise de routage avec l’échange amq.direct. Modifiez votre application de publication pour l’envoyer à amq.direct et utilisez la balise de routage (pas une queue). AMQP copiera ensuite le message dans chaque queue avec la même liaison. Fonctionne liek 🙂

    Exemple: Disons que j’ai une chaîne JSON que je génère, que je la publie dans l’échange “amq.direct” en utilisant la balise de routage “new-sales-order”, j’ai une queue pour mon application order_printer qui imprime la commande queue pour mon système de facturation qui enverra une copie de la commande et facturera le client et j’ai un système d’archive Web où j’archive les commandes pour des raisons historiques / de conformité et j’ai une interface Web client où les commandes sont suivies une commande.

    Donc, mes files d’attente sont: order_printer, order_billing, order_archive et order_tracking Tous ont la balise de liaison “new-sales-order” liée à eux, tous les 4 obtiendront les données JSON.

    C’est un moyen idéal d’envoyer des données sans que l’application de publication ne connaisse ou ne se soucie des applications de réception.

    Le modèle d’envoi est une relation un à un. Si vous voulez “envoyer” à plusieurs récepteurs, vous devriez utiliser le modèle pub / sub. Voir http://www.rabbitmq.com/tutorials/tutorial-three-python.html pour plus de détails.

    Oui, chaque consommateur peut recevoir les mêmes messages. voir http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http: //www.rabbitmq. com / tutoriels / tutorial-five-python.html

    pour différentes manières d’acheminer des messages. Je sais qu’ils sont pour python et java mais c’est bien de comprendre les principes, de décider ce que vous faites et ensuite de trouver comment le faire dans JS. Cela sonne comme si vous vouliez faire un simple fanout ( tutoriel 3 ), qui envoie les messages à toutes les files d’attente connectées au central.

    La différence avec ce que vous faites et ce que vous voulez faire est fondamentalement que vous allez configurer et échanger ou taper fanout. Les sorties de fanout envoient tous les messages à toutes les files d’attente connectées. Chaque file aura un consommateur qui aura access à tous les messages séparément.

    Oui, cela se fait couramment, c’est l’une des fonctionnalités de l’AMPQ.

    RabbitMQ / AMQP: queue unique, plusieurs consommateurs pour un même message et actualisation de la page.

     rabbit.on('ready', function () { }); sockjs_chat.on('connection', function (conn) { conn.on('data', function (message) { try { var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, '')); if (obj.header == "register") { // Connect to RabbitMQ try { conn.exchange = rabbit.exchange(exchange, { type: 'topic', autoDelete: false, durable: false, exclusive: false, confirm: true }); conn.q = rabbit.queue('my-queue-'+obj.agentID, { durable: false, autoDelete: false, exclusive: false }, function () { conn.channel = 'my-queue-'+obj.agentID; conn.q.bind(conn.exchange, conn.channel); conn.q.subscribe(function (message) { console.log("[MSG] ---> " + JSON.ssortingngify(message)); conn.write(JSON.ssortingngify(message) + "\n"); }).addCallback(function(ok) { ctag[conn.channel] = ok.consumerTag; }); }); } catch (err) { console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack); } } else if (obj.header == "typing") { var reply = { type: 'chatMsg', msg: utils.escp(obj.msga), visitorNick: obj.channel, customField1: '', time: utils.getDateTime(), channel: obj.channel }; conn.exchange.publish('my-queue-'+obj.agentID, reply); } } catch (err) { console.log("ERROR ----> " + err.stack); } }); // When the visitor closes or reloads a page we need to unbind from RabbitMQ? conn.on('close', function () { try { // Close the socket conn.close(); // Close RabbitMQ conn.q.unsubscribe(ctag[conn.channel]); } catch (er) { console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack); } }); }); 

    Pour obtenir le comportement souhaité, il suffit que chaque consommateur consum dans sa propre queue. Vous devrez utiliser un type d’échange non direct (sujet, en-tête, fanout) pour envoyer le message à toutes les files d’attente à la fois.

    Comme j’évalue votre cas est:

    • J’ai une queue de messages (votre source pour recevoir des messages, appelons-la q111)

    • J’ai plusieurs consommateurs, et j’aimerais faire différentes choses avec le même message.

    Votre problème ici est que pendant que 3 messages sont reçus par cette queue, le message 1 est consommé par un consommateur A, les autres consommateurs B et C consumnt les messages 2 et 3. Où vous avez besoin d’une configuration où rabbitmq transmet les mêmes copies de tous ces trois messages (1,2,3) aux trois consommateurs connectés (A, B, C) simultanément.

    Bien que de nombreuses configurations puissent être faites pour y parvenir, une méthode simple consiste à utiliser le concept en deux étapes suivant:

    • Utilisez un rabbitmq-shovel dynamic pour récupérer les messages de la queue souhaitée (q111) et les publier dans un échange de fanout (échange exclusivement créé et dédié à cet effet).
    • Reconfigurez maintenant vos consommateurs A, B et C (qui écoutaient la queue (q111)) pour écouter directement cet échange Fanout en utilisant une queue exclusive et anonyme pour chaque consommateur.

    Remarque: Lorsque vous utilisez ce concept, ne consumz pas directement depuis la queue source (q111), car les messages déjà consommés ne seront pas transférés à votre échange Fanout.

    Si vous pensez que cela ne répond pas à vos exigences exactes, n’hésitez pas à poster vos suggestions 🙂

    Si vous utilisez la bibliothèque amqplib telle qu’elle est, ils ont un exemple pratique d’implémentation du didacticiel Publier / S’abonner RabbitMQ, que vous pourriez trouver utile.

    Je pense que vous devriez vérifier l’envoi de vos messages en utilisant l’échangeur de sortance. De cette façon, vous recevrez le même message pour différents consommateurs, sous la table RabbitMQ crée différentes files d’attente pour chacun de ces nouveaux consommateurs / abonnés.

    Ceci est le lien pour voir l’exemple de tutoriel en javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html