⛴️

イベントソーシング, CQRSのためのデータキャプチャの仕組みをkubernetes上に構築する

2025/03/06に公開

はじめに

この記事では、イベントソーシングのための簡易的なインフラを構築するために、
Kubernetes上にKafka, Debeziumを使った変更データキャプチャ(Change Data Capture, CDC)を設定する方法について解説します。

いろいろな記事で書かれているとおり、イベントソーシングは拡張性などの観点でメリットが挙げられる一方で、
システムが複雑になるため学習難易度が高い・構築に時間がかかる等というデメリットがあります。

私もイベントソーシングを軽く試したいと思い、ちょうど家にあったKubernetesを使ってサクッと構築してみようと思ったのですが、
いくつかつまづいた部分があったので、振り返りも兼ねてまとめたいと思います。

本記事で解説する構成

この記事では以下の図のような構成を作る方法について解説します。(図中の矢印はデータの流れを示しています)
あくまで検証用途の設定になっているので、本番用途での構築を想定されている場合は、適宜スケーリングや永続化等の設定を見直してください。

ちなみに最終的なゴールは以下のようなイベントソーシング/CQRSの構成を目指していますが、本記事ではアプリの部分は解説しません。

kafka

概要

Kafkaは、ストリーミングデータをリアルタイムで取り込んで処理するための分散メッセージングシステムです[2]
ここでは、Write DBにデータが保存されたことを検出し、別のサーバへその情報を通知するために利用します。

設定

以下のようにKafkaがwatchするnamespaceをvaluesで指定し、kafka operatorをhelm で installします。

watchAnyNamespace: false
watchNamespaces:
  - "kafka"

helm install strimzi-cluster-operator oci://quay.io/strimzi-helm/strimzi-kafka-operator \
  --namespace kafka \
  --create-namespace \
  --values values.yaml

カスタムリソースにを用いた設定例と簡単な説明を以下に記載しますが、設定値の詳細についてはkafka operatorの公式docを参照ください。

  • 以下のマニフェストはZookeeperは使用しない設定になっています。
  • 検証用途のためreplica設定などは最小限にしています。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  replicas: 1
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: ephemeral
        kraftMetadata: shared
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  replicas: 1
  roles:
    - broker  # broker role
  storage:
    type: jbod
    volumes:
      - id: 0
        type: ephemeral
        kraftMetadata: shared
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: "3.9.0"        # 使用する Kafka バージョン
    metadataVersion: 3.9-IV0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      default.replication.factor: 1
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: domain-events
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 60480000        # イベント保持期間
    cleanup.policy: delete

postgresql

今回は検証用途なので簡単に構築・削除できることを優先し、永続化や冗長化はしません。
ここではBitnamiが提供するPostgres の Helm チャートを利用します。
まずは、postgresの初期設定に必要なpassword情報をSecretとして生成します。

apiVersion: v1
kind: Secret
metadata:
  name: postgres-secret
  namespace: postgres
type: Opaque
stringData:
  password: password
  adminPassword: password

values.yamlを以下のように定義し、helm installを実施します。

auth:
  username: "myuser"
  database: "mydb"
  existingSecret: postgres-secret # 上記で作成したSecret名
  secretKeys:
    adminPasswordKey: adminPassword
    userPasswordKey: password
primary:
  persistence:
    enabled: false
  extendedConfiguration: |-
    wal_level = 'logical'
helm install postgres oci://registry-1.docker.io/bitnamicharts/postgresql \
  --version 16.4.16 \
  --namespace postgres \
  --values values.yaml

一点注意すべきは primary.extendedConfiguration の設定です。
後述のdebeziumがPostgreSQL のレプリケーションスロットを作成するような挙動になるため、ログ先行書き込み(WAL)のレベルをlogicalに設定しておく必要があります。[3]

helmで作ったpostgresqlに、outboxテーブルを作成しておきます。

kubectl exec コマンドを用いて、helm installしたpostgresqlのPodのshellに入れば、psql cliを利用することができるので、その中でSQLを実行しておきます。

CREATE TABLE IF NOT EXISTS outbox (
    id BIGSERIAL PRIMARY KEY,
    event_id BIGINT NOT NULL,
    payload JSON NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    published BOOLEAN NOT NULL DEFAULT FALSE
);

debezium

概要

debeziumは、データベースの変更をリアルタイムで検出し、kafkaなどのストリーミングシステムに転送することを目的としています。
正しくDBの接続情報を設定しておけば、対象のテーブルの変更を監視しイベントを発行してくれるような挙動になります。

設定

今回はpostgresを監視するため、postgres プラグインをいれたdebeziumのコンテナが必要になります。
debeziumを利用するために用意した仕組みは以下の図のとおりです。

早速ハマりポイントだったのですが、プラグイン入りのConnector用コンテナは自前で作る必要があるようです。
てっきりpostgresプラグインが入ったコンテナを使えるのかと思ったのですが公式では提供されていなさそう[4]で、
おそらくkafka には様々なプラグインがあるために各自好きなプラグインをコンテナ化するような運用になっているのかなと想像しています。

そのため自前で作ってしまうのが一番早いと思います。少し手間ですがバージョンも明示的に管理できるので案外良いかもしれません。
(マニフェストの設定でビルドまでやってしまう方法もあります。それについては後述します。)

参考までに、Dockerfileのサンプルと、GitHub Container RegistryへPushするコマンドを記載します。

FROM docker.io/alpine:3.21.2 AS builder

ARG DEBEZIUM_VERSION=3.0.7.Final
USER root:root

RUN mkdir -p /opt/kafka/plugins/ \
  # debezium-connector-postgres
  # https://mvnrepository.com/artifact/io.debezium/debezium-connector-postgres \
  && wget --no-verbose --output-document=debezium-connector-postgres.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/${DEBEZIUM_VERSION}/debezium-connector-postgres-${DEBEZIUM_VERSION}-plugin.tar.gz \
  && tar --extract --file=debezium-connector-postgres.tar.gz -C /opt/kafka/plugins/ \
  && rm -f debezium-connector-postgres.tar.gz
USER 1001

FROM quay.io/strimzi/kafka:0.45.0-kafka-3.9.0
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
#!/usr/bin/env bash

# エラー時にスクリプトを終了させる
set -e

# 必要な環境変数が設定されているかチェック
: "${GITHUB_USERNAME:?Need to set GITHUB_USERNAME}"
: "${GITHUB_TOKEN:?Need to set GITHUB_TOKEN}"

GHCR_PREFIX="ghcr.io/${GITHUB_USERNAME}/"
APP="my-app"

# Dockerイメージのビルド  
# ここではARM64アーキテクチャ向けのイメージを作成したかったため、Dockerのマルチプラットフォームビルド機能を利用してイメージをビルドしている
docker buildx build --platform linux/arm64 \
  -t "${GHCR_PREFIX}${APP}:latest" \
  -f "${APP}/Dockerfile" .

# GHCRへログイン
echo "${GITHUB_TOKEN}" | docker login ghcr.io -u "${GITHUB_USERNAME}" --password-stdin

# イメージをプッシュ
docker push "${GHCR_PREFIX}${APP}:latest"

Kafka Connect の設定のためのマニフェストは次のとおりです。

# KafkaConnect クラスターのマニフェスト
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cdc
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: <コンテナレジストリのURL>:latest
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9092 # kafka CRDsの名前や設定によってエンドポイントが変わる
  config:
    group.id: kafka-connect-cdc-group
    config.storage.topic: kafka-connect-cdc-configs
    offset.storage.topic: kafka-connect-cdc-offsets
    status.storage.topic: kafka-connect-cdc-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    session.timeout.ms: 30000
    heartbeat.interval.ms: 10000
  template:
    pod:
      imagePullSecrets:
        - name: ghcr-secret
  # # # build セクションにより、必要なプラグイン(Debezium、JDBC Sink など)を含むカスタムイメージを指定
  # build:
  #   output:
  #     type: docker
  #     image: <コンテナレジストリのURL>
  #     pushSecret: ghcr-secret
  #   plugins:
  #     - name: debezium-postgres-connector
  #       artifacts:
  #         - type: tgz
  #           url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz
  #           sha512sum: 962a12151bdf9a5a30627eebac739955a4fd95a08d373b86bdcea2b4d0c27dd6e1edd5cb548045e115e33a9e69b1b2a352bee24df035a0447cb820077af00c03
---
# Debezium PostgreSQL CDC コネクタのマニフェスト
# ソース側 PostgreSQL の変更をキャプチャして、Kafka の domain-events トピックに配信
# PostgreSQL の変更(CDC: Change Data Capture)を監視し、その変更内容(INSERT、UPDATE、DELETE など)をリアルタイムに Kafka のトピック(この場合は「domain-events」)へ配信
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-cdc-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-connect-cdc
spec:
  class: io.debezium.connector.postgresql.PostgresConnector # 適切にpostgres pluginがinstallされていればこのclassの設定が使えるようになっています
  tasksMax: 1
  config:
    # PostgreSQL 接続設定(※環境に合わせてホスト名や認証情報を調整)
    database.hostname: postgres-postgresql.postgres.svc.cluster.local  # PostgreSQL の エンドポイント。ここではkubernetes内のServiceによる名前解決
    database.port: "5432"
    database.user: postgres
    database.password: password
    database.dbname: mydb
    topic.prefix: "domain-events"
    # Debezium 固有の設定
    database.server.name: commanddbserver       # CDC イベントの論理サーバー名(Kafka トピック名の接頭辞などに利用)
    plugin.name: pgoutput                       # PostgreSQL 用プラグイン
    slot.name: debezium_slot                    # 複製スロットの名前
    publication.name: debezium_publication      # publish 設定名
    table.include.list: "public.outbox" # 変更対象としたいテーブル名をschema.table の形式で表現

動作検証

ここまででpostgres, debezium, kafka によるCDCの仕組みが完成したので、最後に動作検証をします。

受け取り側のダミーアプリを用意し、postgresの変更を検知してくれるかを検証します。上図で Server と表記してある部分にあたります。

use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 環境変数から設定を取得。存在しない場合はデフォルト値を使用
    let group_id = env::var("KAFKA_CONSUMER_GROUP").unwrap_or_else(|_| "viewer_group".to_string());
    let bootstrap_servers =
        env::var("KAFKA_BOOTSTRAP_SERVERS").unwrap_or_else(|_| "localhost:9092".to_string());
    let topic = env::var("KAFKA_TOPIC").unwrap_or_else(|_| "events".to_string());

    // Kafka の Consumer を作成
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", &group_id)
        .set("bootstrap.servers", &bootstrap_servers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        // 必要に応じて他の設定も追加
        .create()?;

    // 指定したトピックを subscribe する
    consumer.subscribe(&[&topic])?;
    println!(
        "Viewer service started. Waiting for Kafka messages from topic '{}'...",
        topic
    );

    let mut message_stream = consumer.stream();

    while let Some(message) = message_stream.next().await {
        match message {
            Ok(m) => {
                // メッセージのペイロードを文字列として取得
                if let Some(payload) = m.payload_view::<str>() {
                    match payload {
                        Ok(text) => {
                            println!("Received event: {}", text);
                            // ここで受信したイベントをパースして、読み取り用 DB のリードモデルを更新する処理を実装
                        }
                        Err(e) => eprintln!("Error parsing message payload: {:?}", e),
                    }
                } else {
                    eprintln!("No payload in received message");
                }
            }
            Err(e) => eprintln!("Kafka error: {:?}", e),
        }
    }
    Ok(())
}

以下はrustアプリをビルドするためのDockerfileの例です。
(rustのrdkafkaライブラリがOpenSSLに依存していたり、デプロイ先がlinuxであったりしたのでちょっと面倒なことをしていますが、単にビルドしてるだけです。)

FROM rust:latest AS builder

WORKDIR /app

# use rdkafka-sys compiling
RUN apt-get update && apt-get install -y cmake pkg-config clang libssl-dev && rm -rf /var/lib/apt/lists/*
RUN rustup target add aarch64-unknown-linux-gnu

COPY . .

WORKDIR /app/viewer_service

RUN cargo build --release --target=aarch64-unknown-linux-gnu

FROM debian:bookworm-slim

# install runtime libraries
RUN apt-get update && apt-get install -y \
    ca-certificates libssl3 && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY --from=builder /app/viewer_service/target/aarch64-unknown-linux-gnu/release/viewer_service /myapp

rustアプリをデプロイするためのPodのマニフェストは以下ようになります。

apiVersion: v1
kind: Pod
metadata:
  name: viewer
  namespace: kafka
spec:
  containers:
    - name: app
      image: <コンテナレジストリのURL>:latest
      command: ["/myapp"]
      env:
        - name: KAFKA_CONSUMER_GROUP
          value: "viewer_group"
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka-cluster-kafka-bootstrap:9092"  # kafka CRDsの名前や設定によってエンドポイントが変わる
        - name: KAFKA_TOPIC
          value: "domain-events.public.outbox" # KafkaTopicやKafkaConnectorでも設定した、topic名やwatchするテーブル名によってvalueが決まる
  imagePullSecrets:
    - name: ghcr-secret # ここではdebeziumのコンテナイメージをpullする際にも使ったsecretをそのまま流用している

以上で準備ができたので、Postgresqlのoutboxテーブルにデータを入れてみます。

kubectl exec -it postgres-postgresql-0 -n postgres -- env PGPASSWORD="password" psql -U postgres -d mydb -c "INSERT INTO outbox (event_id, payload) VALUES (3, '{\"data\": \"new\"}');"

このコマンドを実行した直後にRustアプリ(Pod)にログが出れば成功です。

さいごに

Kubernetesを使えば簡単に構築できるだろうと思ってやってみたところ、構築するリソースが多くて案外大変でした。
とはいえ、サービス間通信が容易で、かつ OSSの Custom Resource を使って簡単にインフラができてしまうので、VMなどに比べれば手間はかからないと思います。
また、マニフェストをベースとして簡単に作り直したり拡張したりできるので検証用途にも最適だと思います。
本番用途に使う際は、例えばDBはPostgres Operatorを使って適切に永続化・冗長化していくといった選択もできます。

...とまぁKubernetesは良いぞという本筋ではない締めくくりになりましたが、Kubernetesを使える環境にある方はぜひ試してみてください。

脚注
  1. https://medium.com/@cobch7/kafka-connectors-8fb71ee27cb4 ↩︎

  2. https://docs.confluent.io/kafka/introduction.html ↩︎

  3. debezium用に設定されたpostgresqlのイメージもあるようです https://hub.docker.com/r/debezium/postgres ↩︎

  4. debeziumの提供するコンテナイメージ は存在するようですが、特定のプラグインが入ったコンテナイメージはなさそうです。 ↩︎

Discussion