🐡
Kafka 色々試してみる
インストール方法
Confluent のパッケージから Linux サーバーへインストールする。
(1) jdk のインストール
yum -y install java-1.8.0-openjdk-devel
(2) Confluent のリポジトリ情報登録
rpm --import https://packages.confluent.io/rpm/5.3/archive.key
cat << EOF > /etc/yum.repos.d/confluent.repo
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF
(3) Confluent のパッケージインストール
yum -y install confluent-platform-2.12
ZooKeeperの設定
kafka1/kafka2/kafka3の3台のクラスタで構成する前提
Configuration
/etc/kafka/zookeeper.properties に追記
dataDir=/var/lib/zookeeper
tickTime=3000
initLimit=10
syncLimit=5
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
Parameter
-
dataDir
- ZooKeeperのデータディレクトリ
-
tickTime
- タイムアウト系のパラメータの単位(ms)
-
initLimit
- ZooKeeperクラスタのleaderノードへの接続タイムアウト
-
syncLimit
- ZooKeeperクラスタのfollowerが同期するタイムアウト。
- このタイムアウトを超過して同期できないとクラスタから離脱。
-
server.x
(x=数字)- 各ノード情報を記載。
server.<myid>=<サーバーのホスト名>:<サーバーのポート番号:1>:<サーバーのポート番号:2>
- myid はこれで登録しておく。
echo $(uname -n | cut -c6) | sudo -u cp-kafka tee /var/lib/zookeeper/myid
Start
systemctl restart confluent-zookeeper
Kafka Broker の設定
kafka1/kafka2/kafka3の3台のクラスタで構成する前提
Configuration
/etc/kafka/server.properties に追記
# brokerノードごとに一意の値を設定
broker.id=x
broker.id.generation.enable=false
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
Parameter
-
broker.id
- brokerごとのidでbrokerごとに異なる値を設定する
-
broker.id.generation.enable
- trueの場合、brokerのidを自動生成する。この場合、broker.id はコメントアウトしておく。
-
zookeeper.connect
- zookeeper への接続情報
Start
systemctl restart confluent-kafka
トピック
トピックの作成
kafka-topics --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --create --topic first-test --partitions 3 --replication-factor 3
トピックの削除
kafka-topics --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --delete --topic first-test
- 削除時には kafka 側の server.property で以下の設定を入れておかないと削除できない。
- delete.topic.enable=true
- 削除時にはConsumerは停止しておく。
トピック一覧
kafka-topics --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
トピックの状態確認
kafka-topics --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --describe --topic first-test
Producer と Consumer のメッセージ送受信テスト
producer
kafka-console-producer --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic first-test
consumer
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic first-test
producer側のコンソールで何らかのメッセージを入力するとすぐさまconsumer側のコンソールへ表示されることがわかる
Discussion