RabbitMQ par exemple: plusieurs threads, canaux et files d’attente

Je viens de lire les docs de l’API Java de RabbitMQ et je l’ai trouvé très informatif et direct. L’exemple de configuration d’un Channel simple pour la publication / consommation est très simple à suivre et à comprendre. Mais c’est un exemple très simple / basique, et cela m’a laissé une question importante: comment puis-je configurer 1 Channels pour publier / consumr depuis et vers plusieurs files d’attente?

Disons que j’ai un serveur RabbitMQ avec 3 files d’attente: logging , security_events et customer_orders . Nous aurions donc besoin d’un seul Channel pour pouvoir publier / consumr sur les 3 files d’attente, ou plus probablement 3 Channels distincts, chacun dédié à une seule queue.

De plus, les meilleures pratiques de RabbitMQ exigent que nous configurions 1 Channel par fil de consommation. Pour cet exemple, disons que security_events convient à un seul thread consommateur, mais que la logging et customer_order ont besoin de 5 threads pour gérer le volume. Donc, si je comprends bien, cela signifie-t-il que nous avons besoin de:

  • 1 Channel et 1 thread consommateur pour la publication / consommation vers et depuis security_events ; et
  • 5 Channels et 5 threads grand public pour la publication / consommation vers et depuis la logging ; et
  • 5 Channels et 5 threads consommateurs pour la publication / consommation vers et depuis customer_orders ?

Si ma compréhension est erronée ici, veuillez commencer par me corriger. Quoi qu’il en soit, des vétérans de RabbitMQ, épuisés par la bataille, pourraient-ils m’aider à «relier les points» avec un exemple de code décent pour configurer des éditeurs / consommateurs répondant à mes exigences? Merci d’avance!

Je pense que vous avez plusieurs problèmes avec la compréhension initiale. Franchement, je suis un peu surpris de voir ce qui suit: les both need 5 threads to handle the volume . Comment avez-vous identifié que vous avez besoin de ce nombre exact? Avez-vous des garanties 5 threads suffiront?

RabbitMQ est mis au point et testé dans le temps, il est donc question de conception correcte et de traitement efficace des messages.

Essayons de revoir le problème et de trouver une solution appropriée. BTW, la queue de messages elle-même ne fournira aucune garantie que vous avez vraiment une bonne solution. Vous devez comprendre ce que vous faites et faire des tests supplémentaires.

Comme vous le savez certainement, de nombreuses configurations sont possibles:

entrer la description de l'image ici

Je vais utiliser la mise en page B comme le moyen le plus simple d’illustrer le problème des 1 producteurs. Puisque vous êtes si inquiet au sujet du débit. BTW, comme vous pouvez vous y attendre, RabbitMQ se comporte assez bien ( source ). Faites attention à prefetchCount , je le traiterai plus tard:

entrer la description de l'image ici

Il est donc probable que la logique de traitement des messages soit un bon endroit pour vous assurer un débit suffisant. Naturellement, vous pouvez étendre un nouveau fil chaque fois que vous avez besoin de traiter un message, mais cette approche finira par tuer votre système. En gros, plus vous aurez de temps de latence, plus vous aurez de temps de latence (vous pouvez vérifier la loi d’Amdahl si vous voulez).

entrer la description de l'image ici

(voir la loi d’Amdahl illustrée )

Astuce # 1: Soyez prudent avec les threads, utilisez ThreadPools ( détails )

Un pool de threads peut être décrit comme une collection d’objects Runnable (queue de travail) et une connexion de threads en cours d’exécution. Ces threads sont constamment en cours d’exécution et vérifient la requête de travail pour de nouveaux travaux. S’il y a du nouveau travail à faire, ils exécutent ce Runnable. La classe Thread fournit elle-même une méthode, par exemple execute (Runnable r) pour append un nouvel object Runnable à la queue de travail.

 public class Main { private static final int NTHREDS = 10; public static void main(Ssortingng[] args) { ExecutorService executor = Executors.newFixedThreadPool(NTHREDS); for (int i = 0; i < 500; i++) { Runnable worker = new MyRunnable(10000000L + i); executor.execute(worker); } // This will make the executor accept no new threads // and finish all existing threads in the queue executor.shutdown(); // Wait until all threads are finish executor.awaitTermination(); System.out.println("Finished all threads"); } } 

Conseil n ° 2: Soyez prudent avec le traitement des messages

Je dirais que c'est une technique d'optimisation évidente. Il est probable que vous enverrez de petits messages faciles à traiter. L'approche globale concerne des messages plus petits à définir et à traiter en continu. Les gros messages finiront par jouer une mauvaise blague, il est donc préférable d'éviter cela.

entrer la description de l'image ici

Il est donc préférable d'envoyer de minuscules informations, mais qu'en est-il du traitement? Il y a un surcoût à chaque fois que vous soumettez un travail. Le traitement par lots peut être très utile en cas de taux élevé de messages entrants.

entrer la description de l'image ici

Par exemple, supposons que nous ayons une simple logique de traitement des messages et que nous ne souhaitons pas avoir de surcharge spécifique à chaque thread lors du traitement du message. Pour optimiser ce très simple CompositeRunnable can be introduced :

 class CompositeRunnable implements Runnable { protected Queue queue = new LinkedList<>(); public void add(Runnable a) { queue.add(a); } @Override public void run() { for(Runnable r: queue) { r.run(); } } } 

Ou faites la même chose d'une manière légèrement différente, en collectant les messages à traiter:

 class CompositeMessageWorker implements Runnable { protected Queue queue = new LinkedList<>(); public void add(T message) { queue.add(message); } @Override public void run() { for(T message: queue) { // process a message } } } 

De cette façon, vous pouvez traiter les messages plus efficacement.

Conseil n ° 3: Optimiser le traitement des messages

Bien que vous sachiez pouvoir traiter les messages en parallèle ( Tip #1 ) et réduire les frais de traitement ( Tip #2 ), vous devez tout faire rapidement. Les étapes de traitement redondantes, les boucles lourdes, etc. peuvent affecter les performances. Veuillez voir une étude de cas intéressante:

entrer la description de l'image ici

Amélioration du débit de la queue de messages en choisissant le bon parsingur XML

Conseil n ° 4: Gestion des connexions et des canaux

  • Le lancement d'un nouveau canal sur une connexion existante implique un aller-retour sur le réseau.
  • Chaque connexion utilise un descripteur de fichier sur le serveur. Les canaux ne le font pas.
  • La publication d'un message volumineux sur un canal bloque une connexion pendant son extinction. A part cela, le multiplexage est assez transparent.
  • Les connexions en cours de publication peuvent être bloquées si le serveur est surchargé - il est conseillé de séparer la publication et la consommation de connexions
  • Soyez prêt à gérer les rafales de messages

( source )

Veuillez noter que tous les conseils fonctionnent parfaitement ensemble. N'hésitez pas à me faire savoir si vous avez besoin de détails supplémentaires.

Exemple de consommateur complet ( source )

Veuillez noter les points suivants:

  • channel.basicQos (prefetch) - Comme vous l'avez vu précédemment, prefetchCount peut être très utile:

    Cette commande permet au consommateur de choisir une fenêtre de lecture anticipée indiquant la quantité de messages non acquittés qu'il est prêt à recevoir. En définissant le nombre de prélèvements sur une valeur différente de zéro, le courtier ne livrera aucun message susceptible de violer cette limite. Pour faire avancer la fenêtre, le consommateur doit accuser réception d'un message (ou d'un groupe de messages).

  • ExecutorService threadExecutor - vous pouvez spécifier un service exécuteur correctement configuré.

Exemple:

 static class Worker extends DefaultConsumer { Ssortingng name; Channel channel; Ssortingng queue; int processed; ExecutorService executorService; public Worker(int prefetch, ExecutorService threadExecutor, , Channel c, Ssortingng q) throws Exception { super(c); channel = c; queue = q; channel.basicQos(prefetch); channel.basicConsume(queue, false, this); executorService = threadExecutor; } @Override public void handleDelivery(Ssortingng consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Runnable task = new VariableLengthTask(this, envelope.getDeliveryTag(), channel); executorService.submit(task); } } 

Vous pouvez également vérifier les éléments suivants:

  • Architecture de solution à l'aide de files d'attente?
  • Quelques théories sur les files d'attente: débit, latence et bande passante
  • Un benchmark de queue rapide: ActiveMQ, RabbitMQ, HornetQ, QPID, Apollo…

Comment puis-je configurer 1+ canaux pour publier / consumr vers et depuis plusieurs files d’attente?

Vous pouvez implémenter en utilisant des threads et des canaux. Tout ce dont vous avez besoin est un moyen de classer les choses, c.-à-d. Tous les éléments de la queue de la connexion, tous les éléments de la queue de security_events, etc.

ie: Chaque fois que vous ajoutez un élément à la queue, indiquez la clé de routage. Il sera ajouté comme élément de propriété. Vous pouvez ainsi obtenir les valeurs d’un événement particulier, par exemple la journalisation .

L’exemple de code suivant explique comment vous le faites du côté client.

Par exemple:

La clé de routage est utilisée pour identifier le type de canal et retracer les types.

Par exemple, si vous devez obtenir tous les canaux relatifs au type Login, vous devez spécifier la clé de routage en tant que login ou un autre mot-clé pour l’identifier.

  Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); ssortingng routingKey="login"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); 

Vous pouvez regarder ici pour plus de détails sur la catégorisation.


Fil de pièce

Une fois la publication terminée, vous pouvez lancer la partie thread.

Dans cette partie, vous pouvez obtenir les données publiées sur la base de la catégorie. c’est à dire; Clé de routage qui dans votre cas est la journalisation, security_events et customer_orders etc.

regardez dans l’exemple pour savoir comment récupérer les données dans les threads.

Par exemple :

  ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //**The threads part is as follows** channel.exchangeDeclare(EXCHANGE_NAME, "direct"); Ssortingng queueName = channel.queueDeclare().getQueue(); // This part will biend the queue with the severity (login for eg:) for(Ssortingng severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, routingKey); } boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(Ssortingng consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Ssortingng routingKey = envelope.getRoutingKey(); Ssortingng contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } }); 

Maintenant, un thread qui traite les données dans la queue du type login (clé de routage) est créé. De cette façon, vous pouvez créer plusieurs threads. Chacun servant un but différent.

regardez ici pour plus de détails sur la partie threads ..

Pourquoi tout mettre en œuvre par vous-même?

Essayez d’utiliser une sorte de cadre d’intégration. Disons que Camel a déjà un tas de connecteurs vers divers systèmes, Rabbit MQ inclut Camel Rabbit MQ .

Vous devez simplement définir vos itinéraires. Par exemple:

Vous souhaitez consumr des messages de la queue de journalisation par 5 clients simultanés dans un fichier.

 from("rabbitmq://localhost/Logging ?concurrentConsumers=5") .to("file://yourLoggingFile") 

Il existe de nombreuses options pour définir le consommateur de fichiers. Comme vous pouvez le voir, vous pouvez définir combien de consommateurs devraient apparaître juste en mettant concurrentConsumers=5 dans votre URI. Si vous le souhaitez, vous pouvez créer votre propre producteur ou consommateur en implémentant une interface de processeur.

Il est très polyvalent et puissant, vous pouvez faire beaucoup de travail simplement en utilisant les composants fournis. Le site Web du projet contient des tas d’exemples et de documentation.