🐥

【気軽に試せる】Kafkaのハンズオン環境の作り方

2022/09/06に公開

初めに

長くエンジニアの仕事をしていると1度は耳にしたことがある言葉「Kafka

調べてググっても「謎の専門用語やお堅い文書ばかりで頭に入ってこない」といった方に向けて「手を動かして学べる」環境の作り方を説明します。

Kafkaの全体像を何となくで良いから掴みたいよ、という人向けに以下の記事も用意しています。

https://zenn.dev/amezousan/scraps/7df6c1d21d8600

前提条件

今回記述する内容は以下の環境、バージョン(執筆当時最新)をもとにしています。CLIは事前にダウンロードしてローカルに展開しておきます。

Kafkaのデータをウェブ上で簡単に見れるツール「Kowl」を使うとより理解しやすいので「Docker Desktop」も用意します。

項目 説明
OS macOS Monterey 12.4 - M1 Pro
Kafka v3.2.1 - CLI 2.13
Docker Desktop Mac with Apple Chip - 4.12.0

展開後のフォルダ構成は以下の通り。今後の作業は以下のパスから行います。

~/Workspace/zenn/kafka_2.13-3.2.1$ tree -d -L 2
.
├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
├── logs
└── site-docs

事前準備

  • config/server.properties ファイル内での変更作業

    • log.dirsでKafkaトピックの保存先が以下のように指定されています。各自の環境に合わせて適宜変更してください。
    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    
  • 事前にkafka/zookeeper サーバを別々のTerminalで立ち上げておきます。

# [terminal1]
./bin/zookeeper-server-start.sh ./config/zookeeper.properties

# [terminal2]
./bin/kafka-server-start.sh ./config/server.properties

📚【まめちしき】
Macのターミナル上で複数窓を作る場合には、以下のショートカットを使うと便利です。

Command + Shift + d で上下分割
Command + d で左右分割
Command + t で新規ターミナル

  • ローカルのIPアドレスを確認します。以下のコマンドをターミナル/コマンドプロンプト上で実行します。

    • Mac: ipconfig getifaddr en0
    • Windows: ipconfig
    $ ipconfig getifaddr en0
    192.168.10.6
    
  • Docker Desktopを起動させた状態で以下のコマンドを実行します。--add-host<your-local-ip>は自身のローカルIPを指定します。

# [Terminal3]
docker run -p 8080:8080 \
  -e KAFKA_BROKERS=localhost:9092 \
  --add-host=localhost:<your-local-ip> \
  quay.io/cloudhut/kowl:master

これで全ての作業は完了しました。早速どんな風に手を動かせるのかを見ていきます。

遊んでみる

トピックを作成

Kafkaで良く聞く「トピック(Topic)」を作ってみます。またまた新しいターミナルを開きます。

# [Terminal4]
./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic first-topic

http://localhost:8080」へアクセスしてみると、、、

先ほど作成したトピックが見えるようになりました!更に事前準備で確認した「log.dirs」のフォルダを開くと、、、(e.g., open /tmp/kafka-logs/)

フォルダ内のファイルをテキストエディタで開くと、バイナリデータ(人間ではなく機械が読みやすいデータ)として保存されているのが分かります。

ではここにメッセージ(Message)を入力して見ましょう。

メッセージを入力

先ほど作った「first-topic」にメッセージを入力します。

  • 新規ターミナル上で「first-topic」のコンシューマ(consumer; メッセージを受け付ける)を作成します。
# [Terminal4]
./bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic first-topic \
  --property print.key=true \
  --property key.separator=,
  • 新規ターミナル上で「first-topic」へ、プロデューサ(producer; メッセージを作成する)を用いてメッセージを入力します。
# [Terminal5]
./bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic first-topic \
  --property parse.key=true \
  --property key.separator=,
>key,value
>this is a pen,really?
>^C

「key,value」のようにカンマ区切りでメッセージを入力できます。コンシューマのターミナル上でも入力したメッセージがそのまま表示されます。いずれも終了する時は「Ctrl + c」です。

再度「http://localhost:8080」を見てみると、、、


↓「first-topic」リンクをクリック

先ほど入力したメッセージが表示されていますね!!

log.dirs」のフォルダを開くと、、、(e.g., open /tmp/kafka-logs/)

0バイトだったバイナリファイルが入力したメッセージ分、サイズが大きくなってますね!!

トピックを削除

http://localhost:8080」上では色々な仕掛けが用意されており、トピックの削除もできます。


↓ トピックの右端にある「ゴミ箱」アイコンをクリック。そのまま「Yes」をクリック。

その後のページではトピックが消えたかに見えますが「log.dirs」のフォルダを開くと、、、(e.g., open /tmp/kafka-logs/)

あれ?フォルダ名が謎の表記に変わっていますね。

これはKafkaの標準機能でトピックはいきなり削除されることはありません。Kafka Serverのターミナルを確認すると、、、

[2022-09-06 12:03:25,310] INFO Log for partition first-topic-0 is renamed to /tmp/kafka-logs/first-topic-0.6403205aae01482786742b5e793dcb19-delete and is scheduled for deletion (kafka.log.LogManager)
[2022-09-06 12:04:25,331] INFO [LocalLog partition=first-topic-0, dir=/tmp/kafka-logs] Deleting segments as the log has been deleted: LogSegment(baseOffset=0, size=164, lastModifiedTime=1662432858515, largestRecordTimestamp=Some(1662432857499)) (kafka.log.LocalLog)
[2022-09-06 12:04:25,343] INFO [LocalLog partition=first-topic-0, dir=/tmp/kafka-logs] Deleting segment files LogSegment(baseOffset=0, size=164, lastModifiedTime=1662432858515, largestRecordTimestamp=Some(1662432857499)) (kafka.log.LocalLog$)
[2022-09-06 12:04:25,346] INFO Deleted log /tmp/kafka-logs/first-topic-0.6403205aae01482786742b5e793dcb19-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
[2022-09-06 12:04:25,352] INFO Deleted offset index /tmp/kafka-logs/first-topic-0.6403205aae01482786742b5e793dcb19-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
[2022-09-06 12:04:25,353] INFO Deleted time index /tmp/kafka-logs/first-topic-0.6403205aae01482786742b5e793dcb19-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
[2022-09-06 12:04:25,368] INFO Deleted log for partition first-topic-0 in /tmp/kafka-logs/first-topic-0.6403205aae01482786742b5e793dcb19-delete. (kafka.log.LogManager)

削除を決定してから1分後に「segment files, offset/time index, partition」のような難しい単語のものを消したよ、と言っています。

でもよくよく考えるとKafkaで頻出の用語は、

  • topic」👉「フォルダ」
  • message」👉「バイナリファイルに含まれるデータ」
  • consumer」👉「メッセージを作成するもの」
  • producer」👉「メッセージを受付するもの」

のように考えることができます。Kafkaで使われるお堅い言葉は、実は普段馴染み深いものに特別な名称をつけただけなのかもしれません。

最後に

ここから先、どこまで深い知識を身につけるかは人それぞれ異なりますが、今回ご紹介した方法などで「そういえばKafkaのXXの挙動を知りたいから、自分のパソコンで試してみるか」のように気軽に自分で検証できるようになると非常に大きな強みとなります。

本記事の内容が少しでも皆さんのお役に立てたならば幸いです。ではまた!

参考にしたサイト

本記事を書くに当たって生じた疑問を解決してくれたサイトに感謝です。

Dockerのゲストからホストへ「localhost」でアクセスさせたい

https://qiita.com/kai_kou/items/5182965ea75c85cf1e3f

--add-host を利用するとコンテナ内の/etc/hosts ファイルに設定を追加できるそうです。

Discussion