Sujet Purger Kafka

J’ai poussé un message trop gros dans un sujet de message kafka sur ma machine locale, maintenant je reçois une erreur:

kafka.common.InvalidMessageSizeException: invalid message size 

Augmenter la fetch.size n’est pas idéal ici, car je ne veux pas accepter de messages aussi gros. Y a-t-il un moyen de purger le sujet dans kafka?

Mettez temporairement à jour le temps de rétention sur le sujet:

 kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000 

puis attendez que la purge prenne effet (environ une minute). Une fois purgé, restaurez la valeur précédente de retention.ms .

Voici les étapes à suivre pour supprimer un sujet nommé MyTopic :

  1. Arrêtez le démon Apache Kafka
  2. Supprimez le dossier de données de rubrique: rm -rf /tmp/kafka-logs/MyTopic-0
  3. Supprimez les métadonnées du sujet: zkCli.sh then rmr /brokers/MyTopic
  4. Démarrez le démon Apache Kafka

Si vous manquez l’étape 3, Apache Kafka continuera à signaler le sujet comme présent (par exemple, si vous exécutez kafka-list-topic.sh ).

Testé avec Apache Kafka 0.8.0.

Pour purger la queue, vous pouvez supprimer le sujet:

 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test 

puis recréez-le:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic test 

Bien que la réponse acceptée soit correcte, cette méthode est obsolète. La configuration du sujet devrait maintenant se faire via kafka-configs .

 kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic 

Les configurations définies via cette méthode peuvent être affichées avec la commande

 kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic 

Testé dans Kafka 0.8.2, pour l’exemple de démarrage rapide: D’abord, ajoutez une ligne au fichier server.properties sous le dossier config:

 delete.topic.enable=true 

vous pouvez alors exécuter cette commande:

 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test 

Oui, arrêtez kafka et supprimez manuellement tous les fichiers du sous-répertoire correspondant (il est facile de le trouver dans le répertoire de données de kafka). Après le redémarrage de kafka, le sujet sera vide.

kafka n’a pas de méthode directe pour le sujet de purge / nettoyage (files d’attente), mais peut le faire en supprimant ce sujet et en le recréant.

Tout d’abord, assurez-vous que le fichier sever.properties a et sinon ajoutez delete.topic.enable=true

ensuite, supprimer le sujet bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

puis créez-le à nouveau.

 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2 

Parfois, si vous avez un cluster saturé (trop de partitions, utilisez des données de sujet chiffrées, ou utilisez SSL, ou si le contrôleur est sur un nœud défectueux, ou que la connexion est instable, cela prendra beaucoup de temps pour purger ce sujet). .

Je suis ces étapes, particulièrement si vous utilisez Avro.

1: Exécuter avec les outils kafka:

 bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name  

2: nœud de registre Run on Schema:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic --new-consumer --from-beginning

3: Rétablissez la rétention de sujet au paramètre d’origine, une fois le sujet vide.

 bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name  

J’espère que cela aide quelqu’un, car il n’est pas facilement annoncé.

L’approche la plus simple consiste à définir la date des fichiers journaux individuels pour qu’ils soient plus anciens que la période de rétention. Ensuite, le courtier doit les nettoyer et les supprimer en quelques secondes. Cela offre plusieurs avantages:

  1. Pas besoin de faire tomber les courtiers, c’est une opération d’exécution.
  2. Évite la possibilité d’exceptions compensatoires non valides (plus d’informations à ce sujet ci-dessous).

D’après mon expérience de Kafka 0.7.x, la suppression des fichiers journaux et le redémarrage du courtier pourraient entraîner des exceptions de correction non valides pour certains consommateurs. Cela se produirait car le courtier redémarre les décalages à zéro (en l’absence de fichiers journaux existants) et un consommateur qui consommait auparavant du sujet se reconnecterait pour demander un décalage [une fois valide] spécifique. Si ce décalage se situe en dehors des limites des nouveaux journaux de sujets, alors le consommateur et le consommateur recommencent au début ou à la fin. Toutefois, si le décalage se situe dans les limites des nouveaux journaux de sujet, le courtier tente d’extraire le jeu de messages mais échoue car le décalage ne s’aligne pas sur un message réel.

Cela pourrait être atténué en effaçant également les compensations des consommateurs dans zookeeper pour ce sujet. Mais si vous n’avez pas besoin d’un sujet vierge et que vous souhaitez simplement supprimer le contenu existant, il vous suffit de toucher quelques journaux de sujet pour arrêter les courtiers, supprimer les journaux de sujets et effacer certains noeuds zookeeper. .

Le conseil de Thomas est génial mais malheureusement zkCli dans les anciennes versions de Zookeeper (par exemple 3.3.6) ne semble pas supporter rmr . Par exemple, comparez l’implémentation de ligne de commande dans Zookeeper moderne avec la version 3.3 .

Si vous êtes confronté à une ancienne version de Zookeeper, une solution consiste à utiliser une bibliothèque client telle que zc.zk for Python. Pour ceux qui ne sont pas familiers avec Python, vous devez l’installer avec pip ou easy_install . Ensuite, lancez un shell Python ( python ) et vous pouvez faire:

 import zc.zk zk = zc.zk.ZooKeeper('localhost:2181') zk.delete_recursive('brokers/MyTopic') 

ou même

 zk.delete_recursive('brokers') 

si vous voulez supprimer tous les sujets de Kafka.

Pour nettoyer tous les messages d’un sujet particulier en utilisant votre groupe d’applications (GroupName doit être identique au nom du groupe d’application kafka).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

Impossible d’append un commentaire à cause de la taille: Je ne suis pas sûr que cela soit vrai, mis à part la mise à jour retention.ms et retention.bytes, mais j’ai remarqué que la politique de nettoyage des sujets devait être “par défaut”. conserver les messages plus longtemps, c’est-à-dire que s’il est “compact”, vous devez également spécifier delete.retention.ms .

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

Aussi devait surveiller les premiers / derniers décalages devrait être la même pour confirmer que cela s’est bien passé, peut également vérifier le du -h / tmp / kafka-logs / test-topic-3-100- *

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

L’autre problème est que vous devez d’abord obtenir la configuration actuelle pour que vous vous ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics de revenir après la suppression: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics