Kafka使えば無条件に高速になると、いつから錯覚していた?
対象読者
- 「Kafkaって、なんかすごいやつでしょ」って思てる人
- 分散コンピューティングに興味がある人
この記事を読むとわかること
- Kafkaとはなんぞや
- Kafkaってどうやって使うの?
- どういうときは高速化できるの?
↓ コードはこちら!
docker compose up
とするだけで、Apache Kafkaを利用することができます。
また、ベンチマークの方法はREADMEに記載しています。
Apache Kafkaとは?
異なるシステム間で大量のメッセージを高速・安全に配達できるプラットフォームです。
また、色のついた矢印で示すように、右側のどのサービスが
左側のどのメッセージを受け取るかというのを指定することができます。
(※ ハッシュタグ検索をして、それに引っかかるメッセージがリアルタイムに受信できるイメージ)
どういうときに使う?
- リアルタイムデータ処理
- Web サイトのユーザーアクティビティ(クリック、閲覧履歴など)の追跡
- IoTセンサーからの大量データのリアルタイム収集・分析
- システム間の疎結合な連携
- マイクロサービス間のメッセージング基盤
- 複数のアプリケーション間でのイベント共有
- ログ集約
- アプリケーションメトリクスの収集
- セキュリティイベントの監視
- イベント駆動型アーキテクチャの実現
- オーダー処理や在庫管理などの処理
- プッシュ通知やアラートの配信
誰がどんなふうに使っている?
企業 | 用途 |
---|---|
Netflix | ユーザーの視聴データの収集 |
LinkedIn(Kafkaの開発元) | アクティビティログの収集 |
Uber | リアルタイム配車管理 |
Spotify | 分析データの収集 |
Kafkaの登場人物
主な登場人物は以下の3人です。
- Producer
- データを作り出す人
- 例:IoTデバイス(センサデータ), Webサーバ(ユーザアクティビティ)
- Broker(Kafkaサーバ)
- メッセージを受け取り、保存する人
- Consumer
- 収集したデータを処理する人
- 例:DBに保存、分析処理
Kafkaのひみつ道具
Kafkaは以下のようなひみつ道具を使って、分散メッセージングを実現しています。
- Topic
- メッセージを分類する単位(ラベルやハッシュタグのようなイメージ?)
- 例:ユーザー登録イベント」「決済イベント」など
- Producer, Broker, ConsumerはTopic単位でやり取りする
- Partition
- 同じTopicに含まれる複数のメッセージを分散させる単位
- メッセージは各Partitionに振り分けられ、順序は不変
- 👍:パーティションを増やすことで並列処理が可能
- Kafka Cluster
- 複数のBrokerで構成される
- クラスター内の各 Broker は他の Broker の状態を監視する
- パーティションはクラスター内の複数の Broker に分散配置される
-
ZooKeeper
またはKRaft
を使用してクラスターの調整を行う - 👍:一つのBrokerが壊れても、別のBrokerが生き残る
Kafkaを軽く触ってみる
以下のような構成で「データを作成」→ 「データを消費」をやってみます。
プロジェクト構成
コードを公開しているので詳細は省略して、コアな部分だけを紹介します。
- backend
- app
- main.py # Producer(サーバ)
- consumer.py # Consumer
- frontend
- index.html # Producerを呼び出すUI
-
Producer
Kafkaにメッセージを送信するためにKafkaProducer
を作成しています。
/send-event/kafka
にメッセージが来たら、kafkaにメッセージを送信しています。backend/app/main.py# Kafka Producerの設定 producer = KafkaProducer( bootstrap_servers=os.getenv('localhost:9092'), # kafkaサーバ value_serializer=lambda v: json.dumps(v).encode('utf-8'), ) @app.post("/send-event/kafka") async def send_event_kaka(event_data: dict): # イベントをKafkaに送信 producer.send('events', value=event_data) return {"status": "success", "message": "Event sent to Kafka"}
-
Consumer
Kafkaからメッセージを受け取り、CSVファイルに書き込みます。
backend/app/consumer.pyconsumer = KafkaConsumer( # 受信するTopicを指定 'events', # kafkaサーバ bootstrap_servers='localhost:9092', auto_offset_reset='earliest', enable_auto_commit=True, group_id='event_consumer_group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) ) for message in consumer: # メッセージを受信 print(f"Received message: {message.value}") with open(log_file_path, mode='a') as f: writer = csv.writer(f) writer.writerow([event_type, event_data])
-
Frontend
Webサーバにリクエストを送るのが大変なので、Webページを用意しました。
「決済イベント」等を発火させる仮のアプリケーションだと思ってください。
「送信」ボタンを押すと、入力した内容がWebサーバに送信されます。
KafkaをDockerで起動する
docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
healthcheck:
test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
interval: 10s
timeout: 5s
retries: 5
backend:
build: ./backend
ports:
- "8000:8000"
depends_on:
kafka:
condition: service_healthy
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
volumes:
- ./backend:/app
frontend:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./frontend:/usr/share/nginx/html
consumer:
build: ./backend
depends_on:
kafka:
condition: service_healthy
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
command: python -m app.consumer
volumes:
- ./backend/logs:/app/logs
いろいろ書いてありますが、以下のようにサーバが起動します。
- zookeeper(Brokerをまとめる人): 2181ポート
- kafkaサーバ: 9092ポート
- Webサーバ: 8000ポート
docker containerで起動するので kafka:9029
としてアクセスすることもできます。
producer = KafkaProducer(
bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
# ...
)
ためしてみる
docker compose up -d --build
localhost:80
にアクセスし、以下のような内容を入力して送信すると
backend/logs/event.csv
に以下のように追記されます。
hoge,{'v': 'hoge'}
Partition, Consumerを増やして並列化してみる
以下の図のようにPartitionとConsumerを増やして並列化をしてみます。
単純に複製をするだけなので、docker-compose.ymlの設定を変えるだけでできます。
kafka:
environment:
KAFKA_NUM_PARTITIONS: 3
# ...
consumer:
deploy:
mode: replicated
replicas: 3 # Consumerを3つ複製
# ...
docker compose up
で起動すると、3台のConsumerが起動していることがわかります。
並列化すれば高速化するってホント?
- 結論:部分的にそう
「確実に高速化するよ!」と言えれば楽なのですが
「システムがどのような状態になっているか」によって高速化できるかどうかが変わるみたいです。
どういう状態だったら、Kafkaの性能が発揮されるの?
kafkaは時間のかかる処理をConsumerに投げることによって、Webサーバのレスポンスまでにかかる時間を短縮しています。
そして、当たり前かもしれませんが、Kafkaサーバにメッセージを送るのにも時間がかかります。
したがって、
Webサーバで行っていた処理の実行時間
>Kafkaサーバとのやり取りの時間
となる場合は、もとの実装よりも高速化できると思います。
では、実際にどの程度時間のかかる処理であれば、Kafkaの効果が発揮されるのでしょうか?
書き込みがすぐに終わる場合
Kafkaと比較するために、新しくエンドポイントを作成します。
このエンドポイントではKafkaを経由せずに、直接ログの書き込みを行います。
@app.post("/send-event/direct")
async def send_event_direct(event_data: dict):
# 直接、CSVファイルに書き込む
with open(log_file_path, mode='a') as f:
writer = csv.writer(f)
writer.writerow([event_type, event_data])
return {"status": "success", "message": "Event sent to Kafka"}
以下の条件で、どちらが早く捌き切ることができるかを比較してみました。
- 1万リクエスト
- クライアント x 50(1万リクエストが50のクライアントで分割されて並列で行われる)
- Consumer x 3
書き込み処理がすぐに終わる場合は、kafkaを使ったほうが倍近くかかってしまいます 😮
/send-event/direct | /send-event/kafka | |
---|---|---|
機能 | 受け取ったログをWebサーバで直接書き込み | 受け取ったログをKafkaに送信してConsumerがログに書き込み |
終了まで(sec) | 7.272 | 12.12 |
req/sec | 1375 | 825.4 |
ms/req | 36.36 | 60.57 |
Apache Benchのスクリーンショット |
書き込みに時間がかかる場合
ログを書き込みする処理の前に、100ms
のsleepをおいてみます。
await asyncio.sleep(0.1) # 大規模ログが書き込まれている想定
with open(log_file_path, mode='a') as f:
writer = csv.writer(f)
writer.writerow([event_type, event_data])
直接書き込みの方は遅くなっていますが、kafkaの方はほとんど変わらないですね。
/send-event/direct | /send-event/kafka | |
---|---|---|
機能 | 受け取ったログをWebサーバで直接書き込み | 受け取ったログをKafkaに送信してConsumerがログに書き込み |
終了まで(sec) | 30.73 | 12.20 |
req/sec | 325.3 | 819.2 |
ms/req | 153.7 | 61.03 |
Apache Bench |
結論
kafkaが効果を発揮するのは
- Webサーバ上で100ms程度、時間がかかる処理がある
- 処理の内容はレスポンスに直接的に関係がない
- 処理はリアルタイムに行わなくてもいい
kafkaを使うと
- Consumerに処理を分散できる(Webサーバの負荷が小さくなる)
- Consumerの台数が増えれば、全体の処理時間が短くなる
疑問
- Q:Brokerから同じメッセージを複数回受け取ることはあるか?
- A:ある
- Consumer Groupを使った場合
- 異なるConsumer Groupに属するConsumerはそれぞれ独立してメッセージを受信する
- 同じトピックのメッセージを異なる目的で処理したい場合に使う
- ProducerやConsumer側の問題
- Consumerがクラッシュして再読み込みするとき
- ネットワークの一時的な問題で再送が発生した場合
- Q:Kafkaが起動する前にWebサーバが起動して、通信できずにおちる
- A:docker composeの設定で、Kafkaが起動するのを待つように設定することができます
コード
kafka: # ... healthcheck: test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"] interval: 10s timeout: 5s retries: 5 backend: depends_on: kafka: condition: service_healthy consumer: build: ./backend depends_on: kafka: condition: service_healthy environment: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 command: python -m app.consumer volumes: - ./backend/logs:/app/logs
Discussion