🚥

Kafka使えば無条件に高速になると、いつから錯覚していた?

2024/12/10に公開

対象読者

  • 「Kafkaって、なんかすごいやつでしょ」って思てる人
  • 分散コンピューティングに興味がある人

この記事を読むとわかること

  • Kafkaとはなんぞや
  • Kafkaってどうやって使うの?
  • どういうときは高速化できるの?

↓ コードはこちら!
https://github.com/zackerms/playground-apache-kafka

docker compose up とするだけで、Apache Kafkaを利用することができます。
また、ベンチマークの方法はREADMEに記載しています。

Apache Kafkaとは?

異なるシステム間で大量のメッセージを高速・安全に配達できるプラットフォームです。

また、色のついた矢印で示すように、右側のどのサービスが
左側のどのメッセージを受け取るかというのを指定することができます。
(※ ハッシュタグ検索をして、それに引っかかるメッセージがリアルタイムに受信できるイメージ)

どういうときに使う?

  1. リアルタイムデータ処理
    • Web サイトのユーザーアクティビティ(クリック、閲覧履歴など)の追跡
    • IoTセンサーからの大量データのリアルタイム収集・分析
  2. システム間の疎結合な連携
    • マイクロサービス間のメッセージング基盤
    • 複数のアプリケーション間でのイベント共有
  3. ログ集約
    • アプリケーションメトリクスの収集
    • セキュリティイベントの監視
  4. イベント駆動型アーキテクチャの実現
    • オーダー処理や在庫管理などの処理
    • プッシュ通知やアラートの配信

誰がどんなふうに使っている?

企業 用途
Netflix ユーザーの視聴データの収集
LinkedIn(Kafkaの開発元) アクティビティログの収集
Uber リアルタイム配車管理
Spotify 分析データの収集

Kafkaの登場人物

主な登場人物は以下の3人です。

  • Producer
    • データを作り出す人
    • 例:IoTデバイス(センサデータ), Webサーバ(ユーザアクティビティ)
  • Broker(Kafkaサーバ)
    • メッセージを受け取り、保存する人
  • Consumer
    • 収集したデータを処理する人
    • 例:DBに保存、分析処理

Kafkaのひみつ道具

Kafkaは以下のようなひみつ道具を使って、分散メッセージングを実現しています。

  • Topic
    • メッセージを分類する単位(ラベルやハッシュタグのようなイメージ?)
    • 例:ユーザー登録イベント」「決済イベント」など
    • Producer, Broker, ConsumerはTopic単位でやり取りする
  • Partition
    • 同じTopicに含まれる複数のメッセージを分散させる単位
    • メッセージは各Partitionに振り分けられ、順序は不変
    • 👍:パーティションを増やすことで並列処理が可能
  • Kafka Cluster
    • 複数のBrokerで構成される
    • クラスター内の各 Broker は他の Broker の状態を監視する
    • パーティションはクラスター内の複数の Broker に分散配置される
    • ZooKeeper またはKRaft を使用してクラスターの調整を行う
    • 👍:一つのBrokerが壊れても、別のBrokerが生き残る

Kafkaを軽く触ってみる

以下のような構成で「データを作成」→ 「データを消費」をやってみます。

プロジェクト構成

コードを公開しているので詳細は省略して、コアな部分だけを紹介します。

- backend
 - app
  - main.py # Producer(サーバ)
  - consumer.py # Consumer
- frontend
  - index.html # Producerを呼び出すUI
  • Producer
    Kafkaにメッセージを送信するためにKafkaProducerを作成しています。
    /send-event/kafka にメッセージが来たら、kafkaにメッセージを送信しています。

    backend/app/main.py
    # Kafka Producerの設定
    producer = KafkaProducer(
        bootstrap_servers=os.getenv('localhost:9092'), # kafkaサーバ
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    
    @app.post("/send-event/kafka")
    async def send_event_kaka(event_data: dict):
        # イベントをKafkaに送信
        producer.send('events', value=event_data)
        return {"status": "success", "message": "Event sent to Kafka"}
    
  • Consumer

    Kafkaからメッセージを受け取り、CSVファイルに書き込みます。

    backend/app/consumer.py
    consumer = KafkaConsumer(
        # 受信するTopicを指定
        'events',
        # kafkaサーバ
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='event_consumer_group',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    for message in consumer:
        # メッセージを受信
        print(f"Received message: {message.value}")
          with open(log_file_path, mode='a') as f:
            writer = csv.writer(f)
            writer.writerow([event_type, event_data])
    
  • Frontend

    Webサーバにリクエストを送るのが大変なので、Webページを用意しました。

    「決済イベント」等を発火させる仮のアプリケーションだと思ってください。

    「送信」ボタンを押すと、入力した内容がWebサーバに送信されます。

KafkaをDockerで起動する

docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
      interval: 10s
      timeout: 5s
      retries: 5

  backend:
    build: ./backend
    ports:
      - "8000:8000"
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    volumes:
      - ./backend:/app

  frontend:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./frontend:/usr/share/nginx/html

  consumer:
    build: ./backend
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    command: python -m app.consumer
    volumes:
      - ./backend/logs:/app/logs

いろいろ書いてありますが、以下のようにサーバが起動します。

  • zookeeper(Brokerをまとめる人): 2181ポート
  • kafkaサーバ: 9092ポート
  • Webサーバ: 8000ポート

docker containerで起動するので kafka:9029 としてアクセスすることもできます。

producer = KafkaProducer(
    bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
    # ...
)

ためしてみる

docker compose up -d --build

localhost:80 にアクセスし、以下のような内容を入力して送信すると

backend/logs/event.csv に以下のように追記されます。

hoge,{'v': 'hoge'}

Partition, Consumerを増やして並列化してみる

以下の図のようにPartitionとConsumerを増やして並列化をしてみます。

単純に複製をするだけなので、docker-compose.ymlの設定を変えるだけでできます。

kafka:
  environment:
    KAFKA_NUM_PARTITIONS: 3
    # ...
consumer:
  deploy:
      mode: replicated
      replicas: 3 # Consumerを3つ複製
  # ...

docker compose up で起動すると、3台のConsumerが起動していることがわかります。

並列化すれば高速化するってホント?

  • 結論:部分的にそう

「確実に高速化するよ!」と言えれば楽なのですが
「システムがどのような状態になっているか」によって高速化できるかどうかが変わるみたいです。

どういう状態だったら、Kafkaの性能が発揮されるの?

kafkaは時間のかかる処理をConsumerに投げることによって、Webサーバのレスポンスまでにかかる時間を短縮しています。

そして、当たり前かもしれませんが、Kafkaサーバにメッセージを送るのにも時間がかかります。

したがって、

Webサーバで行っていた処理の実行時間 > Kafkaサーバとのやり取りの時間

となる場合は、もとの実装よりも高速化できると思います。

では、実際にどの程度時間のかかる処理であれば、Kafkaの効果が発揮されるのでしょうか?

書き込みがすぐに終わる場合

Kafkaと比較するために、新しくエンドポイントを作成します。
このエンドポイントではKafkaを経由せずに、直接ログの書き込みを行います。

@app.post("/send-event/direct")
async def send_event_direct(event_data: dict):
    # 直接、CSVファイルに書き込む
    with open(log_file_path, mode='a') as f:
      writer = csv.writer(f)
      writer.writerow([event_type, event_data])
    return {"status": "success", "message": "Event sent to Kafka"}

以下の条件で、どちらが早く捌き切ることができるかを比較してみました。

  • 1万リクエスト
  • クライアント x 50(1万リクエストが50のクライアントで分割されて並列で行われる)
  • Consumer x 3

書き込み処理がすぐに終わる場合は、kafkaを使ったほうが倍近くかかってしまいます 😮

/send-event/direct /send-event/kafka
機能 受け取ったログをWebサーバで直接書き込み 受け取ったログをKafkaに送信してConsumerがログに書き込み
終了まで(sec) 7.272 12.12
req/sec 1375 825.4
ms/req 36.36 60.57
Apache Benchのスクリーンショット

書き込みに時間がかかる場合

ログを書き込みする処理の前に、100ms のsleepをおいてみます。

await asyncio.sleep(0.1) # 大規模ログが書き込まれている想定
with open(log_file_path, mode='a') as f:
  writer = csv.writer(f)
  writer.writerow([event_type, event_data])

直接書き込みの方は遅くなっていますが、kafkaの方はほとんど変わらないですね。

/send-event/direct /send-event/kafka
機能 受け取ったログをWebサーバで直接書き込み 受け取ったログをKafkaに送信してConsumerがログに書き込み
終了まで(sec) 30.73 12.20
req/sec 325.3 819.2
ms/req 153.7 61.03
Apache Bench

結論

kafkaが効果を発揮するのは

  • Webサーバ上で100ms程度、時間がかかる処理がある
  • 処理の内容はレスポンスに直接的に関係がない
  • 処理はリアルタイムに行わなくてもいい

kafkaを使うと

  • Consumerに処理を分散できる(Webサーバの負荷が小さくなる)
  • Consumerの台数が増えれば、全体の処理時間が短くなる

疑問

  • Q:Brokerから同じメッセージを複数回受け取ることはあるか?
    • A:ある
    1. Consumer Groupを使った場合
      • 異なるConsumer Groupに属するConsumerはそれぞれ独立してメッセージを受信する
      • 同じトピックのメッセージを異なる目的で処理したい場合に使う
    2. ProducerやConsumer側の問題
      • Consumerがクラッシュして再読み込みするとき
      • ネットワークの一時的な問題で再送が発生した場合
  • Q:Kafkaが起動する前にWebサーバが起動して、通信できずにおちる
    • A:docker composeの設定で、Kafkaが起動するのを待つように設定することができます
    コード
    kafka:
        # ...
      healthcheck:
        test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
        interval: 10s
        timeout: 5s
        retries: 5
    backend:
      depends_on:
        kafka:
          condition: service_healthy
    consumer:
      build: ./backend
      depends_on:
        kafka:
          condition: service_healthy
      environment:
        KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      command: python -m app.consumer
      volumes:
        - ./backend/logs:/app/logs
    

Discussion