🎉

Kafka Cliの基本的な使い方(サンプル付き)

2022/07/05に公開

検証環境

対象 Version 備考
OS Ubuntu 20.04.4 LTS(WSL2)
JDK Temurin-17.0.3+7
Kafka 6.2.4-ccs ローカル開発用のKafka環境構築

インストール

  1. JDK

    jdk 1.8以降のバージョンは基本的に問題ありません。OSに合わせてインストールしてください。

  2. Kafka Cli

    wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
    tar -xzf kafka_2.12-2.6.2.tgz
    

基本操作

export ZOOKEEPER_CONNECT_STRING=localhost:2181
export BOOTSTRAP_SERVER_CONNECT_STRING=localhost:9092

Topic

作成

$ bin/kafka-topics.sh --create --zookeeper ${ZOOKEEPER_CONNECT_STRING} --replication-factor 1 --partitions 1 --topic mytopic
Created topic mytopic.

zookeeperやbrokerのどちらを指定すれば操作できます。高可用性を考慮してzookeeperの接続情報を指定しましょう。

  1. zookeeperに接続

    export ZOOKEEPER_CONNECT_STRING=localhost:2181
    bin/kafka-topics.sh --create --zookeeper ${ZOOKEEPER_CONNECT_STRING} --replication-factor 1 --partitions 1 --topic mytopic
    
  2. brokerに接続

    export BOOTSTRAP_SERVER_CONNECT_STRING=localhost:9092
    bin/kafka-topics.sh --create --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --replication-factor 1 --partitions 1 --topic mytopic
    

詳細

$ bin/kafka-topics.sh --describe --zookeeper ${ZOOKEEPER_CONNECT_STRING} --topic mytopic
Topic: mytopic  PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: mytopic  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

削除

$ bin/kafka-topics.sh --delete --zookeeper ${ZOOKEEPER_CONNECT_STRING} --topic mytopic
# 存在しないTopicを指定した場合、エラーが発生します。

一覧

$ bin/kafka-topics.sh --list --zookeeper ${ZOOKEEPER_CONNECT_STRING}
mytopic

プロデューサ

  1. コンソールでメッセージを送信する。

    bin/kafka-console-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic
    
  2. 数字のメッセージデータを送信する。

    bin/kafka-verifiable-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --max-messages 10000
    

コンシューマ

  1. コンソールでメッセージを受信する。

    bin/kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group gekal
    
  2. コンシューマを検証する。

    bin/kafka-verifiable-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group-id gekal
    

コンシューマグループ

  1. コンシューマグループを取得する。

    bin/kafka-consumer-groups.sh --list --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING}
    
  2. コンシューマグループの詳細情報を表示する。

    bin/kafka-consumer-groups.sh --describe --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --group gekal --offsets
    
  3. グループを削除する。

    bin/kafka-consumer-groups.sh --delete --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --group gekal
    
  4. オフセット情報をリセットする。

    bin/kafka-consumer-groups.sh --reset-offsets --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group gekal --to-earliest
    
  5. オフセット情報を削除する。

    bin/kafka-consumer-groups.sh --delete-offsets --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group gekal
    

参照

  1. APACHE KAFKA QUICKSTART

Discussion