🎉
Kafka Cliの基本的な使い方(サンプル付き)
検証環境
対象 | Version | 備考 |
---|---|---|
OS | Ubuntu 20.04.4 LTS(WSL2) | |
JDK | Temurin-17.0.3+7 | |
Kafka | 6.2.4-ccs | ローカル開発用のKafka環境構築 |
インストール
-
JDK
jdk 1.8以降のバージョンは基本的に問題ありません。OSに合わせてインストールしてください。
-
Kafka Cli
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz tar -xzf kafka_2.12-2.6.2.tgz
基本操作
export ZOOKEEPER_CONNECT_STRING=localhost:2181
export BOOTSTRAP_SERVER_CONNECT_STRING=localhost:9092
Topic
作成
$ bin/kafka-topics.sh --create --zookeeper ${ZOOKEEPER_CONNECT_STRING} --replication-factor 1 --partitions 1 --topic mytopic
Created topic mytopic.
zookeeperやbrokerのどちらを指定すれば操作できます。高可用性を考慮してzookeeperの接続情報を指定しましょう。
-
zookeeperに接続
export ZOOKEEPER_CONNECT_STRING=localhost:2181 bin/kafka-topics.sh --create --zookeeper ${ZOOKEEPER_CONNECT_STRING} --replication-factor 1 --partitions 1 --topic mytopic
-
brokerに接続
export BOOTSTRAP_SERVER_CONNECT_STRING=localhost:9092 bin/kafka-topics.sh --create --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --replication-factor 1 --partitions 1 --topic mytopic
詳細
$ bin/kafka-topics.sh --describe --zookeeper ${ZOOKEEPER_CONNECT_STRING} --topic mytopic
Topic: mytopic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: mytopic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
削除
$ bin/kafka-topics.sh --delete --zookeeper ${ZOOKEEPER_CONNECT_STRING} --topic mytopic
# 存在しないTopicを指定した場合、エラーが発生します。
一覧
$ bin/kafka-topics.sh --list --zookeeper ${ZOOKEEPER_CONNECT_STRING}
mytopic
プロデューサ
-
コンソールでメッセージを送信する。
bin/kafka-console-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic
-
数字のメッセージデータを送信する。
bin/kafka-verifiable-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --max-messages 10000
コンシューマ
-
コンソールでメッセージを受信する。
bin/kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group gekal
-
コンシューマを検証する。
bin/kafka-verifiable-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group-id gekal
コンシューマグループ
-
コンシューマグループを取得する。
bin/kafka-consumer-groups.sh --list --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING}
-
コンシューマグループの詳細情報を表示する。
bin/kafka-consumer-groups.sh --describe --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --group gekal --offsets
-
グループを削除する。
bin/kafka-consumer-groups.sh --delete --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --group gekal
-
オフセット情報をリセットする。
bin/kafka-consumer-groups.sh --reset-offsets --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group gekal --to-earliest
-
オフセット情報を削除する。
bin/kafka-consumer-groups.sh --delete-offsets --bootstrap-server ${BOOTSTRAP_SERVER_CONNECT_STRING} --topic mytopic --group gekal
Discussion