🐡

Kafka 色々試してみる

2022/07/26に公開約3,000字

インストール方法

Confluent のパッケージから Linux サーバーへインストールする。

(1) jdk のインストール

yum -y install java-1.8.0-openjdk-devel

(2) Confluent のリポジトリ情報登録

rpm --import https://packages.confluent.io/rpm/5.3/archive.key

cat << EOF > /etc/yum.repos.d/confluent.repo
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF

(3) Confluent のパッケージインストール

yum -y install confluent-platform-2.12

ZooKeeperの設定

kafka1/kafka2/kafka3の3台のクラスタで構成する前提

Configuration

/etc/kafka/zookeeper.properties に追記
dataDir=/var/lib/zookeeper
tickTime=3000
initLimit=10
syncLimit=5

server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888

Parameter

https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html
  • dataDir
    • ZooKeeperのデータディレクトリ
  • tickTime
    • タイムアウト系のパラメータの単位(ms)
  • initLimit
    • ZooKeeperクラスタのleaderノードへの接続タイムアウト
  • syncLimit
    • ZooKeeperクラスタのfollowerが同期するタイムアウト。
    • このタイムアウトを超過して同期できないとクラスタから離脱。
  • server.x(x=数字)
    • 各ノード情報を記載。
    • server.<myid>=<サーバーのホスト名>:<サーバーのポート番号:1>:<サーバーのポート番号:2>
    • myid はこれで登録しておく。
    • echo $(uname -n | cut -c6) | sudo -u cp-kafka tee /var/lib/zookeeper/myid

Start

systemctl restart confluent-zookeeper

Kafka Broker の設定

kafka1/kafka2/kafka3の3台のクラスタで構成する前提

Configuration

/etc/kafka/server.properties に追記
# brokerノードごとに一意の値を設定
broker.id=x
broker.id.generation.enable=false
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181

Parameter

https://kafka.apache.org/documentation/#brokerconfigs
  • broker.id
    • brokerごとのidでbrokerごとに異なる値を設定する
  • broker.id.generation.enable
    • trueの場合、brokerのidを自動生成する。この場合、broker.id はコメントアウトしておく。
  • zookeeper.connect
    • zookeeper への接続情報

Start

systemctl restart confluent-kafka

トピック

トピックの作成
kafka-topics --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --create --topic first-test --partitions 3 --replication-factor 3
トピックの削除
kafka-topics --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --delete --topic first-test
  • 削除時には kafka 側の server.property で以下の設定を入れておかないと削除できない。
    • delete.topic.enable=true
  • 削除時にはConsumerは停止しておく。
トピック一覧
kafka-topics --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
トピックの状態確認
kafka-topics --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --describe --topic first-test

Producer と Consumer のメッセージ送受信テスト

producer
kafka-console-producer --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic first-test
consumer
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic first-test

producer側のコンソールで何らかのメッセージを入力するとすぐさまconsumer側のコンソールへ表示されることがわかる

Discussion

ログインするとコメントできます