RabbitMQ: message persistant avec échange de sujet

Je suis très nouveau sur RabbitMQ.

J’ai mis en place un échange de sujet. Les consommateurs peuvent être lancés après l’éditeur. J’aimerais que les consommateurs puissent recevoir les messages qui ont été envoyés avant qu’ils ne soient disponibles, et qui n’ont pas encore été consommés.

L’échange est configuré avec les parameters suivants:

exchange_type => 'topic' durable => 1 auto_delete => 0 passive => 0 

Les messages sont publiés avec ce paramètre:

 delivery_mode => 2 

Les consommateurs utilisent get () pour récupérer les messages de l’échange.

Malheureusement, tout message publié avant la mise en place d’un client est perdu. J’ai utilisé différentes combinaisons.

Je suppose que mon problème est que l’échange ne contient pas de messages. J’ai peut-être besoin d’une queue entre l’éditeur et la queue. Mais cela ne semble pas fonctionner avec un échange de «sujet» où les messages sont routés par une clé.

Toute idée de la manière dont je devrais procéder. J’utilise la liaison Perl Net :: RabbitMQ (ne devrait pas avoir d’importance) et RabbitMQ 2.2.0.

Vous devez disposer d’une queue durable pour stocker les messages si aucun consommateur connecté n’est disponible pour traiter les messages au moment de leur publication.

Un échange ne stocke pas les messages, mais une queue peut. La partie confuse est que les échanges peuvent être marqués comme “durables” mais tout ce que cela signifie vraiment, c’est que l’ échange luimême sera toujours là si vous redémarrez votre courtier, mais cela ne signifie pas que tous les messages envoyés à cet échange sont automatiquement conservés.

Cela étant, voici deux options:

  1. Effectuez une étape administrative avant de démarrer vos éditeurs pour créer vous-même la ou les files d’attente. Pour ce faire, vous pouvez utiliser l’interface Web ou les outils de ligne de commande. Assurez-vous de le créer en tant que queue durable afin qu’il stocke tous les messages qui lui sont acheminés, même s’il n’y a pas de consommateurs actifs.
  2. En supposant que vos consommateurs sont codés pour toujours déclarer (et donc créer automatiquement) leurs échanges et leurs files d’attente au démarrage (et qu’ils les déclarent durables), lancez tous vos consommateurs au moins une fois avant de lancer des éditeurs. Cela garantira que toutes vos files d’attente seront créées correctement. Vous pouvez ensuite arrêter les consommateurs jusqu’à ce qu’ils soient réellement nécessaires, car les files d’attente stockeront de manière persistante les futurs messages qui leur seront acheminés.

Je voudrais aller pour # 1. Il n’y a peut-être pas beaucoup d’étapes à effectuer et vous pouvez toujours écrire les étapes nécessaires pour pouvoir les répéter. De plus, si tous vos clients vont sortir de la même queue unique (plutôt que d’avoir une queue dédiée chacun), c’est vraiment une tâche administrative minimale.

Les files d’attente doivent être gérées et contrôlées correctement. Sinon, vous pourriez vous retrouver avec des consommateurs malhonnêtes déclarant des files d’attente durables, les utilisant pendant quelques minutes, mais jamais plus. Peu de temps après, vous aurez une queue en croissance constante, sans réduction de taille, et une apocalypse imminente du courtier.

Comme mentionné par Brian, un échange ne stocke pas de messages et est principalement responsable de l’acheminement des messages vers un autre commutateur ou une autre queue. Si l’échange n’est pas lié à une queue, tous les messages envoyés à cet échange seront «perdus».

Vous ne devriez pas avoir besoin de déclarer des files d’attente de clients fixes dans le script de l’éditeur, car cela pourrait ne pas être évolutif. Les files d’attente peuvent être créées dynamicment par vos éditeurs et acheminées en interne à l’aide d’une liaison d’échange à échange.

RabbitMQ prend en charge les liaisons d’échange à échange qui permettront une flexibilité de la topologie, un découplage et d’autres avantages. Vous pouvez en savoir plus ici à RabbitMQ Exchange to Exchange Bindings [AMPQ]

RabbitMQ Exchange pour échanger la liaison

Exemple de topologie

Exemple de code Python pour créer une liaison d’échange avec une persistance si aucun consommateur n’est présent à l’aide de la queue.

 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #Declares the entry exchange to be used by all producers to send messages. Could be external producers as well channel.exchange_declare(exchange='data_gateway', exchange_type='fanout', durable=True, auto_delete=False) #Declares the processing exchange to be used.Routes messages to various queues. For internal use only channel.exchange_declare(exchange='data_dissortingbutor', exchange_type='topic', durable=True, auto_delete=False) #Binds the external/producer facing exchange to the internal exchange channel.exchange_bind(destination='data_dissortingbutor',source='data_gateway') ##Create Durable Queues binded to the data_dissortingbutor exchange channel.queue_declare(queue='trade_db',durable=True) channel.queue_declare(queue='trade_stream_service',durable=True) channel.queue_declare(queue='ticker_db',durable=True) channel.queue_declare(queue='ticker_stream_service',durable=True) channel.queue_declare(queue='orderbook_db',durable=True) channel.queue_declare(queue='orderbook_stream_service',durable=True) #Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present channel.queue_bind(queue='orderbook_db',exchange='data_dissortingbutor',routing_key='*.*.orderbook') channel.queue_bind(queue='orderbook_stream_service',exchange='data_dissortingbutor',routing_key='*.*.orderbook') channel.queue_bind(queue='ticker_db',exchange='data_dissortingbutor',routing_key='*.*.ticker') channel.queue_bind(queue='ticker_stream_service',exchange='data_dissortingbutor',routing_key='*.*.ticker') channel.queue_bind(queue='trade_db',exchange='data_dissortingbutor',routing_key='*.*.trade') channel.queue_bind(queue='trade_stream_service',exchange='data_dissortingbutor',routing_key='*.*.trade')