📌

「Pub/Sub アーキテクチャで実現するスケール&疎結合 ― 大量トラフィックを乗り切る設計パターン」

に公開

はじめに

今日のマイクロサービスやサーバーレス環境では、システム間の結合度をできるだけ低く保ち、かつ高スループットで稼働させることが成功の鍵です。

Pub/Sub(Publish/Subscribe)はその両方を実現する代表的なアーキテクチャです。本記事では、Pub/Sub がどのように「大量トラフィックを捌き」、「疎結合を保つ」のかを、理論・実装例・運用面まで掘り下げます。

Pub/Sub アーキテクチャとは

Pub/Sub は パブリッシャー(発行者) と サブスクライバー(購読者) がメッセージを「トピック」という中間領域でやり取りするモデルです。

  • トピック:メッセージのカテゴリ。複数のサブスクライバーが同じトピックを購読できる。
  • メッセージ:バイナリ/JSON など。スキーマは可変であり、サブスクライバー側が解釈する。

なぜ Pub/Sub がスケールに強いか

  1. 非同期処理:パブリッシャーはメッセージを送信するだけで相手までの着信を待つ必要がなく、待ち時間が短い。
  2. 水平スケーリング:トピックを複数パーティションに分割し、各サブスクライバーが別々のノードで並列処理。
  3. オフライン耐性:メッセージはブローカーに保存され、サブスクライバーが再接続した際に欠損なく受信。

大量トラフィックを捌く設計ポイント

項目 説明
パーティション分割 キーに基づきメッセージを複数パーティションへ分散。スループット向上とデータのローカリティ確保。
バッチ送信 小さなメッセージをまとめて送ることでオーバーヘッド削減。
バックプレッシャー サブスクライバーが処理遅延した場合、プロデューサー側に制限をかける。
スケールアウト ブローカークラスタのノード数を増やすことで同時接続数と書き込み性能を向上。
QoS(Quality of Service) 送信保証レベルを設定し、必要に応じて「at most once」「at least once」「exactly once」 を選択。

実装例:Kafka + Python

Python で Kafka を扱う代表的なライブラリは confluent-kafka-python(高性能)と kafka-python(純粋 Python)です。ここでは confluent-kafka-python を使った例を紹介します。

Producer

メッセージを Kafka に送信します。

# producer.py
from confluent_kafka import Producer
import json
import yaml

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)

producer_conf = {
    'bootstrap.servers': cfg['kafka']['bootstrap_servers'],
    'acks': 'all',
    'linger.ms': 5,
}

producer = Producer(producer_conf)

def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered message to {msg.topic()} [{msg.partition()}]")

def send_order(order):
    key = str(order["order_id"])
    value = json.dumps(order).encode('utf-8')
    producer.produce(
        topic=cfg['kafka']['topic'],
        key=key,
        value=value,
        callback=delivery_report
    )
    producer.poll(0)

# Example usage
if __name__ == "__main__":
    sample_order = {"order_id": 123, "product": "book", "qty": 2}
    send_order(sample_order)
    producer.flush()

Consumer

Kafka からメッセージを受信して処理します。

# consumer.py
from confluent_kafka import Consumer, KafkaException
import json
import yaml

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)

consumer_conf = {
    'bootstrap.servers': cfg['kafka']['bootstrap_servers'],
    'group.id': cfg['consumer']['group_id'],
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(consumer_conf)
consumer.subscribe([cfg['kafka']['topic']])

def process(msg):
    order = json.loads(msg.value().decode('utf-8'))
    # ここでビジネスロジックを実装
    print(f"Processing order {order['order_id']}")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        process(msg)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

config.yaml

設定値を外部化し、環境に応じて簡単に切替られるようにします。

kafka:
  bootstrap_servers: "localhost:9092"
  topic: orders

consumer:
  group_id: order-service
  • kafka.bootstrap_servers: Kafka ブローカーのアドレス。
  • kafka.topic: メッセージを送受信するトピック名。
  • consumer.group_id: コンシューマーグループ ID。Kafka は同じグループ内のコンシューマーでパーティションを分配し、1 つのメッセージは 1 つだけが処理します。

ポイント

  • linger.ms はバッチ送信を有効化し、スループット向上に寄与。
  • acks: all は「effectively once」の保証を近似的に実現。
  • コンシューマーは auto.offset.reset: earliest で過去のメッセージも拾えるようにしている。

疎結合を実現するデザインパターン

パターン 概要 主な用途 推奨シナリオ
イベントソーシング 状態遷移をイベント単位で保存し、サービスはイベントだけで再構築。 完全な履歴管理・監査 金融、ヘルスケア
CQRS(Command Query Responsibility Segregation) 書き込みと読み取りを分離し、各サブスクライバーが専用のビューを保持。 高速読み取り + スケーラブル E‑コマース、リアルタイムダッシュボード
サービスディスカバリ 直接 URL を知る必要がなく、ブローカーを経由して通信。 動的なサービス構成 マイクロサービス群の自動連携
サーバーレス関数 必要に応じてオンデマンドで起動し、サブスクライバーを自動スケール。 低負荷・イベント駆動 バッチ処理、Webhook 受信

実装のヒント

  • Python で Kafka を扱う場合は confluent-kafka-python が最も高速。
  • 監査ログやメトリクスは prometheus_client と組み合わせると可観測性が向上。
  • スキーマ管理は Confluent Schema Registry を使い、JSON/Avro で型安全に保守。

これらのパターンを組み合わせることで、「疎結合+スケーラブル」 なシステムを Python で構築できます。ぜひプロジェクトに取り入れてみてください!

イベントソーシング

コアアイデア

  • 状態を「イベント」列として永続化。ビジネスロジックは「状態を再構築する」ことで実現し、変更履歴が完全に残る。
  • 1 つのエンティティは「過去のイベントを順に再生」して現在状態を算出。

Pub/Sub での実装例

# eventstore.yaml
event_topic: "orders.events"

Publisher

# event_publisher.py
import json
import time
import yaml

from confluent_kafka import Producer

with open("eventstore.yaml") as f:
    cfg = yaml.safe_load(f)

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def publish_event(event_type, payload):
    event = {
        "type": event_type,
        "payload": payload,
        "timestamp": int(time.time())
    }
    producer.produce(cfg['event_topic'], json.dumps(event).encode('utf-8'))
    producer.flush()

Consumer

# event_consumer.py
from confluent_kafka import Consumer, KafkaException
import json, yaml

with open("eventstore.yaml") as f:
    cfg = yaml.safe_load(f)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe([cfg['event_topic']])

def apply_event(state, event):
    if event["type"] == "ORDER_CREATED":
        state[event["payload"]["order_id"]] = event["payload"]
    # 以降のイベントタイプに応じて状態更新
    return state

state = {}
while True:
    msg = consumer.poll(1.0)
    if msg is None: continue
    event = json.loads(msg.value().decode('utf-8'))
    state = apply_event(state, event)

メリット

  • 完全な履歴 → 監査・ロールバックが容易。
  • ビジネスルールの変更 → 新しいイベントタイプを追加するだけで既存データに影響なし。

デメリット

  • 状態再構築のコスト → 大量データになると初期化に時間がかかる。
  • 複雑なクエリ → 直接状態を問い合わせると「イベントの再生」が必要になる。

CQRS(Command Query Responsibility Segregation)

コアアイデア

  • 書き込み(Command) と 読み取り(Query) を完全に分離。
  • 書き込みはイベントソーシングや単純なトランザクションで行い、読み取りは専用ビュー(Read Model) に投影。

Pub/Sub での実装例

# cqrs.yaml
command_topic: "orders.commands"
event_topic:   "orders.events"
query_view:    "order_read_view"  # データベーステーブル

Handler

# command_handler.py (Python + SQLAlchemy)
import json
import yaml
from confluent_kafka import Consumer, KafkaException

from .handlers import publish_event
from .models import Order

with open("cqrs.yaml") as f:
    cfg = yaml.safe_load(f)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-command-handler',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe([cfg['command_topic']])

def handle_command(cmd):
    if cmd["type"] == "CreateOrder":
        order = Order(**cmd["payload"])
        db.session.add(order)
        db.session.commit()
        # イベントを発行
        publish_event("ORDER_CREATED", order.to_dict())

while True:
    msg = consumer.poll(1.0)
    if msg is None: continue
    cmd = json.loads(msg.value().decode('utf-8'))
    handle_command(cmd)

Updater

# query_view_updater.py (Python + Kafka Streams like Faust)
import yaml

import faust

with open("cqrs.yaml") as f:
    cfg = yaml.safe_load(f)

app = faust.App('order-view-updater', broker='kafka://localhost:9092')
event_topic = app.topic(cfg['event_topic'], value_type=dict)

class OrderView(faust.Record, serializer='json'):
    order_id: int
    status: str

order_view = app.Table('orders', default=OrderView)

@event_topic.register
async def update_order_view(event):
    if event["type"] == "ORDER_CREATED":
        payload = event["payload"]
        order_view[payload["order_id"]] = OrderView(order_id=payload["order_id"],
                                                    status="CREATED")

メリット

  • スケーラビリティ:読み取りは専用テーブルに対して高速クエリが可能。
  • セキュリティ:書き込み権限と読み取り権限を完全に分離。

デメリット

  • データ整合性:ビューと書き込みモデルの同期遅延が発生する可能性。
  • 開発コスト:2 つのモデル(Command + Query)を管理する必要がある。

サービスディスカバリ(Service Discovery)

コアアイデア

  • サービスが自動的に他のサービスを発見 できる仕組み。
  • Pub/Sub のトピックを「サービス名」として扱い、ブローカーがディレクトリ機能 を提供。

Pub/Sub での実装例

  • Kafka の Confluent Control Center や KRaft で Kafka Connect を使い、サービスメタデータを __consumer_offsets やカスタムトピックに保存。
  • Python では kafka-python の KafkaConsumer.partitions_for_topic() を呼び出して、現在利用可能なパーティション(=サービスインスタンス)を取得。
# service_discovery.py
from confluent_kafka import Consumer

def discover_services(topic):
    consumer = Consumer({'bootstrap.servers': 'localhost:9092'})
    partitions = consumer.partitions_for_topic(topic)
    # 各パーティションがサービスインスタンスを表す想定
    return [p.partition for p in partitions]

services = discover_services("orders.events")
print(f"Active services: {services}")

メリット

  • 自動スケーリング:サービスが起動/停止するとトピックのパーティション数が変化し、クライアントは自動で更新。
  • ロードバランシング:パーティションを均等に割り当てることで負荷分散。

デメリット

  • パーティションの限界:Kafka のパーティション数は上限(数千)しかない。
  • 複雑な構成:分散環境での正確な状態を保つには、Zookeeper/KRaft の設定が必要。

サーバーレス関数(Serverless Functions)

コアアイデア

  • イベント駆動型:関数は「メッセージ到着時にのみ起動」。
  • スケールアウトは自動:トラフィックに応じて関数インスタンスが増減。

Pub/Sub での実装例

  • AWS SNS + Lambda、Google Cloud Pub/Sub + Cloud Functions、Azure Event Grid + Functions が代表例。
  • Python で実装する場合、関数のエントリポイントを handler として定義。
# gcp_function.py (Google Cloud Functions)
import base64
import json

def process_order(event, context):
    """Triggered by a message on a Pub/Sub topic."""
    pubsub_message = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    # ここでオーダー処理
    print(f"Order received: {pubsub_message['order_id']}")

メリット

  • コスト最適化:実行時間分だけ課金。
  • 運用負荷低減:インフラ管理が不要。

デメリット

  • Cold start:初回呼び出しで遅延が発生。
  • ステートレス制限:関数は状態を保持できないため、永続化は別途必要。

モニタリング & アラート

Kafka のメトリクスは JMX で公開され、Prometheus + Grafana で可視化できます。

Python 側では prometheus_client を使ってカスタムメトリクスを追加する例もあります。

# metrics.py
from prometheus_client import Counter, start_http_server

messages_sent = Counter('kafka_messages_sent_total', 'Total messages sent to Kafka')
messages_received = Counter('kafka_messages_received_total', 'Total messages received from Kafka')

def start_metrics(port=8000):
    start_http_server(port)

プロデューサー/コンシューマーでメトリクスをインクリメントし、Grafana のダッシュボードに kafka_* メトリクスを描画します。

セキュリティ & コンプライアンス

Pub/Sub アーキテクチャを本番環境に導入する際は、「データの安全性」 と 「法令遵守(コンプライアンス)」 が最優先課題です。以下では、記事で触れた 4つの話題(認証・認可、暗号化、監査ログ/メタデータ管理、そしてロールベースのアクセス制御)を実装観点と業務視点から詳しく掘り下げます。

ポイント

  • 「セキュリティは **一度設定したら終わり」ではなく、継続的な運用と監査が必須。
  • すべての設定変更は コード化し、CI/CD のパイプラインで自動テストとレビューを行うことでヒューマンエラーを最小化。

これから紹介する 4 つのテーマをしっかり押さえることで、公開・非公開両方のデータを安全に扱いながら、規制(GDPR, PCI‑DSS, HIPAA 等)へのコンプライアンスを確実に満たす Pub/Sub アーキテクチャが構築できます。

認証・認可(Authentication & Authorization)

目的

誰がブローカーへ接続し、何をできるかを制御。

主な技術

  • SSL/TLS(相互認証)
  • SASL(PLAIN, SCRAM、GSSAPI)
  • OAuth 2.0 / OpenID Connect(Kubernetes‑in‑K8s、AWS IAM)

実装例

# Kafka Python Client(confluent-kafka)で SASL/SCRAM を有効化
conf = {
    'bootstrap.servers': 'kafka-broker:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'alice',
    'sasl.password': 'SecretPass1!'
}
producer = Producer(conf)

ベストプラクティス

  • TLSを必須化し、クライアント証明書とサーバ証明書を別管理(例:Vault)
  • SASL/SCRAMはパスワードベースより安全。SCRAM‑SHA‑512 が推奨。
  • OAuth2を使うと、Kubernetes の ServiceAccount と統合し IAM ロールで制御できる。

課題

  • 証明書のローテーションが手間。
  • SASL 認証情報をコードベースに埋め込むと漏洩リスク。

解決策

  • HashiCorp Vault で動的シークレットを取得。
  • CI/CD パイプラインに Secrets Manager を統合し、ランタイム時のみ取得。

データ暗号化(Encryption at Rest & In Transit)

目的

データが「通信中」または「保管時」に不正に読み取られないようにする。

主な技術

  • TLS: ブローカー間、クライアント↔︎ブローカーの通信を暗号化。
  • KMS / HSM: Kafka の encryption.key を KMS で管理。
  • Transparent Data Encryption (TDE): Kafka の内部ストレージに対して AES‑256 で暗号化。

実装例

# Kafka Config(server.properties)
ssl.keystore.type=PKCS12
ssl.keystore.location=/etc/kafka/keystore.p12
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/etc/kafka/truststore.p12
ssl.truststore.password=changeit
security.inter.broker.protocol=SASL_SSL

ベストプラクティス

  • 通信は常に TLS を有効化し、security.protocol=SASL_SSL で相互認証。
  • 暗号化キーは KMS(AWS KMS、Azure Key Vault、GCP Cloud KMS)で管理し、ローテーションを自動化。
  • TLS 1.3 を推奨し、古いバージョンは無効化。

課題

  • 暗号化による CPU 負荷増大。
  • キー管理とローテーションの運用コスト。

解決策

  • Kafka の encryption.key を外部 KMS に委譲し、オンデマンドキー取得で負荷を分散。* 監査ログで「暗号化キーの変更」をトリガーにアラートを発行。

監査ログ & メタデータ管理(Audit Logging & Metadata)

目的

すべての操作(プロデューサー・コンシューマーの接続、メッセージ送受信、設定変更)を可視化し、法的証拠 として保持。

主な技術

  • Kafka の kafka.server.log.message.format.version を 3.0+ に設定し、Audit Log API(kafka-audit)を有効化。
  • 外部ロギング:Elastic Stack(ELK)、Splunk、Datadog APM へ転送。

実装例

# audit-config.yaml
audit.enabled: true
audit.log.dir: /var/log/kafka/audit

Kafka を起動時に --config audit-config.yaml で読み込む。

ベストプラクティス

  • 監査ログは必ず 書き込み専用のファイルシステム(例:AWS S3、Azure Blob)に保管。
  • ログローテーションは 7 日間 以上保持し、必要に応じて暗号化。
  • 監査データは 不可変(append‑only)で、改ざん検知のためにハッシュチェーンを付与。

課題

  • ログ量が膨大になるとストレージコストが増加。
  • 監査ログの 検索性 と 可視化 が不十分。

解決策

  • メタデータを Kafka Connect で Elasticsearch に送信し、Kibana で検索。
  • 監査ログは Kafka の別トピック(audit.topic) でストリームし、リアルタイムアラートを設定。

ロールベースアクセス制御(RBAC / ABAC)

目的

ユーザー/サービスアカウントごとに細かい権限を付与し、最小権限の原則を実現。

主な技術

  • Kafka 2.8+ で導入された Authorizer(SimpleACLs、OAUTHBEARER、SASL)
  • クラウドプロバイダーの IAM(AWS IAM Policies, GCP IAM)と統合。

実装例

# server.properties
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false

ACL 例: kafka-acls.sh --add --allow-principal User:alice --operation Read --topic orders

ベストプラクティス

  • ACL は最小権限で作成:必要最低限の操作(Read, Write, Describe)だけを許可。
  • プロダクションとテスト環境で ACL を分離し、テスト中の権限漏れを防止。
  • ACL 変更は GitOps で管理し、レビュー後に自動デプロイ。

課題

  • ACL の管理が煩雑になるとミスが発生。
  • 複数のクラスタ間で ACL を同期する必要がある。

解決策

  • kafka-acls.sh を CI/CD スクリプトに組み込み、自動テストで ACL 正当性を検証。
  • クラウドプロバイダーの IAM を活用し、サービスアカウント単位で権限を委譲。

ケーススタディ

A社 (E‑コマース)

1.5M TPS を安定稼働。Python ベースのサーバーレス関数で自動スケール。

  • API Gateway → Lambda:リクエストを即座に受け取り、スケールアウトへとつなげる最初のフロントエンド。
  • Lambda → Kafka:非同期メッセージングで負荷を平滑化し、データ永続性とスループットを保証。
  • Kafka → Consumer Group:並列処理で注文を高速に完了させ、ビジネスロジックの可用性を確保。
  • Autoscaling:トラフィックに応じて自動でリソースを増減し、1.5M TPS の安定稼働を実現。

この 4 層構成が、E‑コマースにおける「高頻度注文処理」の課題を解決し、スケールと可用性の両立を可能にした鍵です。

API Gateway(フロントエンド)

高頻度の注文リクエスト(1.5M TPS)を一括で処理できないため、負荷を細かく分散し、スケールアウト時のダウンタイムを最小化します。

API Gateway

外部からの REST/HTTP リクエストを受け付ける最初の入り口。

  1. スロットリングで突発的なトラフィックを抑制
  2. 認証/認可(JWT, Cognito など)で不正アクセスを防止、
  3. CORS / TLSにより安全な通信を保証。

ロードバランサー

複数の Lambda 関数インスタンスへリクエストを分散。

  1. オートスケーリングによりリクエスト数が増加しても瞬時にインスタンスを追加、
  2. ヘルスチェックで障害ノードを除外。

Python Lambda(Producer)

Lambda のスケーラビリティを活かし、ピーク時でも「一瞬でメッセージをキューに投げ込む」ことが可能。Kafka が「永続化+スケールアウト」を担うことで、Lambda は純粋に処理ロジックだけに集中できます。

Python Serverless Function

各リクエストを非同期で Kafka Producer に渡す。

  1. イベント駆動でリクエストを受け取り、
  2. **バッチ化(可オプション)**で同時に複数メッセージを送信、
  3. エラーハンドリングで失敗時に再試行。

Kafka Produce

orders トピックへメッセージを投入。

  1. パーティション(例:256)で並列性を確保、
  2. セキュリティ(SASL/SSL)で通信を暗号化、
  3. QoS (at-least-once) でデータ損失を防止。

Kafka Broker(メッセージング層)

1.5M TPS のトラフィックは単一のノードでは処理できないため、Kafka のパーティション分散とレプリケーションで「水平スケーリング」と「耐障害性」を両立します。

Kubernetes‑managed Kafka

コンテナ化された Kafka クラスター。

  1. Pod Autoscalerでリソース不足時にノードを追加、
  2. StatefulSetで永続ボリューム(PVC)にデータを保持、
  3. K8s Operatorでライフサイクル管理。

Topic: orders

パーティション数とレプリケーションを設定し、可用性・スループットを最適化。

  1. レプリカ数=3でフェイルオーバーを確実、
  2. Retention(例:7日)で古いデータを自動削除、
    Compression(LZ4/ Snappy)でネットワーク帯域を節約。

Consumer Group(バックエンド処理)

コンシューマーは「Kafka が提供するスループット」に合わせて水平に増やせるため、注文処理を並列化し、処理時間を数秒以内に抑えることが可能です。

Consumer Group

複数インスタンスが同一トピックを並列で消費。

  1. オフセット管理で処理済みメッセージを追跡、
  2. 再試行ロジックで一時的失敗を補完。

Order Service

受信した注文データを DB に永続化。

  1. トランザクションで注文処理の一貫性を保証、
  2. キャッシュ(Redis 等)で重複注文チェック、
  3. CQRSパターンを採用して読み取りと書き込みを分離。

Autoscaling(スケールアウト)

1.5M TPS のピーク時に、Lambda が「同時実行数制限」を超える前に自動で拡張されることで、ダウンタイムをゼロに保ちつつコスト効率も最適化します。

Compute Cluster

Lambda のインスタンス数を動的に増減。

  1. CPU/メモリの閾値でスケール、
  2. Cold Start を最小化するために「プロビジョンドコンテナ」を併用。

B社 (金融)

取引履歴をイベントソーシングで再現し、レギュレーション遵守に成功。

次の構成により、B社は 「取引履歴の完全再現性」 と 「レギュレーション遵守(監査証跡・報告書)を自動化」 という二重の要件を同時に満たしています。

フロー 主な技術・概念 目的
Trading App → Producer REST/WebSocket + Kafka Producer 取引リクエストを即時に非同期でキューへ投げる
Kafka → Compaction Topic コンパクション 最新状態のみ保持し、ディスクを節約
Compaction → Event Store NoSQL + イベントソーシング 取引履歴を完全に保存し、再構築可能
Compliance Engine ストリーム処理 + レポート生成 法令遵守の監査証跡を自動化
Replay Engine スナップショット + イベント再生 取引状態を高速に再構築
Audit Log 別トピック + 永続保持 監査証跡を確実に保管

Trading Application → Kafka Producer

取引は「即時性」と「確実な永続化」を同時に満たす必要があるため、メッセージングレイヤーで「書き込みと処理」を分離します。

REST / WebSocket

エンドユーザー(トレーダー)が行う注文・売買リクエストを即時に受け取る。REST は CRUD 系、WebSocket は継続的な価格ストリームやリアルタイム注文確認に使用。

Kafka Producer

受け取ったリクエストを trade_events トピックへ書き込む。

  1. 非同期送信でレスポンス遅延を抑制、
  2. トランザクション(Kafka Tx)で「取引の永続化」と「アプリケーション処理」が同時に成功/失敗を保証。

Kafka → Compaction Topic

金融取引では「最新状態(残高・ポジション)」だけを必要とするケースが多く、コンパクションによりストレージコストを削減しつつ完全再現性を確保できます。

Compaction

trade_events はコンパクショントピックに設定。

  1. キー(例:trade_id)で重複イベントをまとめ、最新状態だけを保持。
  2. 永続化ストレージは「イベントソーシング」に最適で、過去全履歴を持ちつつディスク容量を抑制。

Compaction Topic → Event Store DB

取引履歴を「状態」ではなく「出来事」として保存することで、将来の再構築や監査が容易になります。

Event Store DB

コンパクション済みメッセージをデータベースに永続化。

  1. **NoSQL(Cassandra, DynamoDB 等)**を採用し、スキーマレスで高速読取。
  2. イベントソーシングの原則に従い、DB は「状態ではなくイベント列」を保持。

Compliance Engine

金融業界は「監査証跡の完全性」が法的義務。イベントソーシングにより**「すべての取引履歴」が保持され、即座に報告書を生成**できる点が大きな強みです。

Read Events

Event Store から全取引イベントをストリームで読み込み。

  1. リアルタイムフィルタ(例:金額閾値超過、ブラックリスト照合)を適用。

Regulatory Reports

取得したイベントをレギュレーション要件に合わせて集計・フォーマット。

  1. PDF / XML 出力、
  2. S3/Blob Storage へ安全に保管。

Replay Engine

取引データが破損した場合や「バグ修正後の状態再計算」が必要な時に、イベント履歴を使って完全かつ高速に状態を再構築できます。

Read Snapshot

Event Store からスナップショット(例:過去24hの状態)を取得。

Replay Service

そのスナップショットを元に「再構築」処理を行い、現在のポジションや残高を計算。

Trade Ledger

再構築結果をレジャーとして可視化・保存。

Audit Log(監査ログ)

取引と同時に「操作の痕跡」も必須。監査ログを別トピックに分離することで、レギュレーション対応(例:SOX, GDPR)を満たしつつ、パフォーマンスに影響を与えない設計が実現できます。

Audit Event

取引時に発生したすべての重要操作(例:認証失敗、権限変更)を別トピック audit_events に書き込む。

Audit Topic

コンパクションではなく、完全保持(Retention = 永続)で監査証跡を保存。

C社 (IoT)

デバイスからのデータを Kafka に集約し、Python の faust でリアルタイム分析。

次の構成により、C社は IoT フリートからの連続データを Kafka で集約し、Faust でリアルタイム分析・ダッシュボード化 した上で、Prometheus/Grafana によるモニタリングとアラートを実現しています。

フロー 技術・概念 主なメリット
Devices → Producer MQTT/HTTP + Kafka Producer 大量データを高速・確実に送信
Kafka Cluster K8s Operator + 多パーティション 自動スケール & 高可用性
Faust Processor Python, ウィンドウ集計 開発効率 + リアルタイム分析
Monitoring Prometheus/Grafana 運用可視化 + アラート自動化

Devices → Kafka Producer

デバイスは「連続的かつ大量」のデータを生成するため、Kafka Producer で「高スループット + ストリームの整合性」を確保し、上流処理(Faust)への遅延を最小化します。

MQTT / HTTP

IoT デバイスが周期的に温度・湿度・電力などのメトリクスを送信。MQTT は軽量でバッテリー消費が少ない、HTTP は既存の REST インフラに統合しやすい。

Kafka Producer

受信したペイロードを device_data トピックへ非同期で送信。

  1. バッチ送信(linger.ms)でネットワークコストを削減、
  2. パーティションキー(例:device_id)で同一デバイスのレコードを同じパーティションに集約。

Kafka Cluster (Kubernetes‑managed)

IoT フリートは「データ量が急増/減少」しやすく、K8s‑managed Kafka で「自動スケーリング + 高可用性」を同時に実現できる点が重要です。

Kubernetes Operator

Kafka をクラスタ化して自動スケール・フェイルオーバーを実現。

  1. StatefulSet により永続ボリュームを管理、
  2. Horizontal Pod Autoscaler (HPA) で負荷に応じてブローカー数を増減。

Topic: device_data

コンシューマーのスループットに合わせてパーティション数を設定(例:200 パーティション)。

  1. Replication Factor=3 で可用性を確保。

Faust Stream Processor

Python エコシステムを活かし、開発速度とメンテナンス性を両立。Faust のウィンドウ機能により「時系列分析・異常検知」がリアルタイムで可能です。

Faust

Python ベースのストリーム処理ライブラリ。

  1. アプリケーション(app = faust.App(...))で Kafka トピックをサブスクライブ、
  2. Agent でメッセージ処理ロジックを定義。

Monitoring (Prometheus / Grafana)

「リアルタイム処理が遅延している/デバイスの異常」が即座に把握でき、運用者は早期に対処できます。監視とアラートの統合は運用コストを大幅に削減します。

Windowed Aggregation

5 分間・1 時間などのウィンドウで集計。

例:app.agent(topic) で event.time_window(5*60, key='device_id') を適用し、平均温度・最大電力を算出。

Metrics Exporter

Faust アプリケーションに faust.metrics を組み込み、Kafka のレイテンシやバッファサイズ、処理件数を Prometheus にエクスポート。

Prometheus

収集したメトリクスを時系列データベースに格納。
① Alertmanager で閾値超過を検知し、Slack/PagerDuty に通知。

Grafana

Prometheus のデータをダッシュボード化し、デバイス状態・処理パフォーマンスを可視化。

まとめ & 次へのステップ

  • Pub/Sub は「スケール」と「疎結合」を同時に実現 できる設計。
  • Python での実装は confluent-kafka-python が最もパフォーマンスが高い。
  • 次に挑戦すべきテーマ:
    • ストリーム処理(faust, kafka-streams-python)との統合
    • スキーマ管理(Confluent Schema Registry)を使ったバージョン管理
    • Kubernetes Operator(Strimzi)で Kafka クラスタを IaC 化
codeciaoテックブログ

Discussion