イベントソーシング, CQRSのためのデータキャプチャの仕組みをkubernetes上に構築する
はじめに
この記事では、イベントソーシングのための簡易的なインフラを構築するために、
Kubernetes上にKafka, Debeziumを使った変更データキャプチャ(Change Data Capture, CDC)を設定する方法について解説します。
いろいろな記事で書かれているとおり、イベントソーシングは拡張性などの観点でメリットが挙げられる一方で、
システムが複雑になるため学習難易度が高い・構築に時間がかかる等というデメリットがあります。
私もイベントソーシングを軽く試したいと思い、ちょうど家にあったKubernetesを使ってサクッと構築してみようと思ったのですが、
いくつかつまづいた部分があったので、振り返りも兼ねてまとめたいと思います。
本記事で解説する構成
この記事では以下の図のような構成を作る方法について解説します。(図中の矢印はデータの流れを示しています)
あくまで検証用途の設定になっているので、本番用途での構築を想定されている場合は、適宜スケーリングや永続化等の設定を見直してください。
ちなみに最終的なゴールは以下のようなイベントソーシング/CQRSの構成を目指していますが、本記事ではアプリの部分は解説しません。
kafka
概要
Kafkaは、ストリーミングデータをリアルタイムで取り込んで処理するための分散メッセージングシステムです[2]。
ここでは、Write DBにデータが保存されたことを検出し、別のサーバへその情報を通知するために利用します。
設定
以下のようにKafkaがwatchするnamespaceをvaluesで指定し、kafka operatorをhelm で installします。
watchAnyNamespace: false
watchNamespaces:
- "kafka"
helm install strimzi-cluster-operator oci://quay.io/strimzi-helm/strimzi-kafka-operator \
--namespace kafka \
--create-namespace \
--values values.yaml
カスタムリソースにを用いた設定例と簡単な説明を以下に記載しますが、設定値の詳細についてはkafka operatorの公式docを参照ください。
- 以下のマニフェストはZookeeperは使用しない設定になっています。
- 検証用途のためreplica設定などは最小限にしています。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: pool-a
namespace: kafka
labels:
strimzi.io/cluster: kafka-cluster
spec:
replicas: 1
roles:
- controller
storage:
type: jbod
volumes:
- id: 0
type: ephemeral
kraftMetadata: shared
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: broker
namespace: kafka
labels:
strimzi.io/cluster: kafka-cluster
spec:
replicas: 1
roles:
- broker # broker role
storage:
type: jbod
volumes:
- id: 0
type: ephemeral
kraftMetadata: shared
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: kafka-cluster
namespace: kafka
annotations:
strimzi.io/kraft: enabled
strimzi.io/node-pools: enabled
spec:
kafka:
version: "3.9.0" # 使用する Kafka バージョン
metadataVersion: 3.9-IV0
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
default.replication.factor: 1
entityOperator:
topicOperator: {}
userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: domain-events
namespace: kafka
labels:
strimzi.io/cluster: kafka-cluster
spec:
partitions: 3
replicas: 1
config:
retention.ms: 60480000 # イベント保持期間
cleanup.policy: delete
postgresql
今回は検証用途なので簡単に構築・削除できることを優先し、永続化や冗長化はしません。
ここではBitnamiが提供するPostgres の Helm チャートを利用します。
まずは、postgresの初期設定に必要なpassword情報をSecretとして生成します。
apiVersion: v1
kind: Secret
metadata:
name: postgres-secret
namespace: postgres
type: Opaque
stringData:
password: password
adminPassword: password
values.yamlを以下のように定義し、helm installを実施します。
auth:
username: "myuser"
database: "mydb"
existingSecret: postgres-secret # 上記で作成したSecret名
secretKeys:
adminPasswordKey: adminPassword
userPasswordKey: password
primary:
persistence:
enabled: false
extendedConfiguration: |-
wal_level = 'logical'
helm install postgres oci://registry-1.docker.io/bitnamicharts/postgresql \
--version 16.4.16 \
--namespace postgres \
--values values.yaml
一点注意すべきは primary.extendedConfiguration
の設定です。
後述のdebeziumがPostgreSQL のレプリケーションスロットを作成するような挙動になるため、ログ先行書き込み(WAL)のレベルをlogicalに設定しておく必要があります。[3]
helmで作ったpostgresqlに、outboxテーブルを作成しておきます。
kubectl exec コマンドを用いて、helm installしたpostgresqlのPodのshellに入れば、psql cliを利用することができるので、その中でSQLを実行しておきます。
CREATE TABLE IF NOT EXISTS outbox (
id BIGSERIAL PRIMARY KEY,
event_id BIGINT NOT NULL,
payload JSON NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
published BOOLEAN NOT NULL DEFAULT FALSE
);
debezium
概要
debeziumは、データベースの変更をリアルタイムで検出し、kafkaなどのストリーミングシステムに転送することを目的としています。
正しくDBの接続情報を設定しておけば、対象のテーブルの変更を監視しイベントを発行してくれるような挙動になります。
設定
今回はpostgresを監視するため、postgres プラグインをいれたdebeziumのコンテナが必要になります。
debeziumを利用するために用意した仕組みは以下の図のとおりです。
早速ハマりポイントだったのですが、プラグイン入りのConnector用コンテナは自前で作る必要があるようです。
てっきりpostgresプラグインが入ったコンテナを使えるのかと思ったのですが公式では提供されていなさそう[4]で、
おそらくkafka には様々なプラグインがあるために各自好きなプラグインをコンテナ化するような運用になっているのかなと想像しています。
そのため自前で作ってしまうのが一番早いと思います。少し手間ですがバージョンも明示的に管理できるので案外良いかもしれません。
(マニフェストの設定でビルドまでやってしまう方法もあります。それについては後述します。)
参考までに、Dockerfileのサンプルと、GitHub Container RegistryへPushするコマンドを記載します。
FROM docker.io/alpine:3.21.2 AS builder
ARG DEBEZIUM_VERSION=3.0.7.Final
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
# debezium-connector-postgres
# https://mvnrepository.com/artifact/io.debezium/debezium-connector-postgres \
&& wget --no-verbose --output-document=debezium-connector-postgres.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/${DEBEZIUM_VERSION}/debezium-connector-postgres-${DEBEZIUM_VERSION}-plugin.tar.gz \
&& tar --extract --file=debezium-connector-postgres.tar.gz -C /opt/kafka/plugins/ \
&& rm -f debezium-connector-postgres.tar.gz
USER 1001
FROM quay.io/strimzi/kafka:0.45.0-kafka-3.9.0
COPY /opt/kafka/plugins/ /opt/kafka/plugins/
#!/usr/bin/env bash
# エラー時にスクリプトを終了させる
set -e
# 必要な環境変数が設定されているかチェック
: "${GITHUB_USERNAME:?Need to set GITHUB_USERNAME}"
: "${GITHUB_TOKEN:?Need to set GITHUB_TOKEN}"
GHCR_PREFIX="ghcr.io/${GITHUB_USERNAME}/"
APP="my-app"
# Dockerイメージのビルド
# ここではARM64アーキテクチャ向けのイメージを作成したかったため、Dockerのマルチプラットフォームビルド機能を利用してイメージをビルドしている
docker buildx build --platform linux/arm64 \
-t "${GHCR_PREFIX}${APP}:latest" \
-f "${APP}/Dockerfile" .
# GHCRへログイン
echo "${GITHUB_TOKEN}" | docker login ghcr.io -u "${GITHUB_USERNAME}" --password-stdin
# イメージをプッシュ
docker push "${GHCR_PREFIX}${APP}:latest"
Kafka Connect の設定のためのマニフェストは次のとおりです。
# KafkaConnect クラスターのマニフェスト
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-cdc
namespace: kafka
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: <コンテナレジストリのURL>:latest
replicas: 1
bootstrapServers: kafka-cluster-kafka-bootstrap:9092 # kafka CRDsの名前や設定によってエンドポイントが変わる
config:
group.id: kafka-connect-cdc-group
config.storage.topic: kafka-connect-cdc-configs
offset.storage.topic: kafka-connect-cdc-offsets
status.storage.topic: kafka-connect-cdc-status
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
session.timeout.ms: 30000
heartbeat.interval.ms: 10000
template:
pod:
imagePullSecrets:
- name: ghcr-secret
# # # build セクションにより、必要なプラグイン(Debezium、JDBC Sink など)を含むカスタムイメージを指定
# build:
# output:
# type: docker
# image: <コンテナレジストリのURL>
# pushSecret: ghcr-secret
# plugins:
# - name: debezium-postgres-connector
# artifacts:
# - type: tgz
# url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz
# sha512sum: 962a12151bdf9a5a30627eebac739955a4fd95a08d373b86bdcea2b4d0c27dd6e1edd5cb548045e115e33a9e69b1b2a352bee24df035a0447cb820077af00c03
---
# Debezium PostgreSQL CDC コネクタのマニフェスト
# ソース側 PostgreSQL の変更をキャプチャして、Kafka の domain-events トピックに配信
# PostgreSQL の変更(CDC: Change Data Capture)を監視し、その変更内容(INSERT、UPDATE、DELETE など)をリアルタイムに Kafka のトピック(この場合は「domain-events」)へ配信
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: postgres-cdc-connector
namespace: kafka
labels:
strimzi.io/cluster: kafka-connect-cdc
spec:
class: io.debezium.connector.postgresql.PostgresConnector # 適切にpostgres pluginがinstallされていればこのclassの設定が使えるようになっています
tasksMax: 1
config:
# PostgreSQL 接続設定(※環境に合わせてホスト名や認証情報を調整)
database.hostname: postgres-postgresql.postgres.svc.cluster.local # PostgreSQL の エンドポイント。ここではkubernetes内のServiceによる名前解決
database.port: "5432"
database.user: postgres
database.password: password
database.dbname: mydb
topic.prefix: "domain-events"
# Debezium 固有の設定
database.server.name: commanddbserver # CDC イベントの論理サーバー名(Kafka トピック名の接頭辞などに利用)
plugin.name: pgoutput # PostgreSQL 用プラグイン
slot.name: debezium_slot # 複製スロットの名前
publication.name: debezium_publication # publish 設定名
table.include.list: "public.outbox" # 変更対象としたいテーブル名をschema.table の形式で表現
動作検証
ここまででpostgres, debezium, kafka によるCDCの仕組みが完成したので、最後に動作検証をします。
受け取り側のダミーアプリを用意し、postgresの変更を検知してくれるかを検証します。上図で Server と表記してある部分にあたります。
use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use std::env;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 環境変数から設定を取得。存在しない場合はデフォルト値を使用
let group_id = env::var("KAFKA_CONSUMER_GROUP").unwrap_or_else(|_| "viewer_group".to_string());
let bootstrap_servers =
env::var("KAFKA_BOOTSTRAP_SERVERS").unwrap_or_else(|_| "localhost:9092".to_string());
let topic = env::var("KAFKA_TOPIC").unwrap_or_else(|_| "events".to_string());
// Kafka の Consumer を作成
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", &group_id)
.set("bootstrap.servers", &bootstrap_servers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
// 必要に応じて他の設定も追加
.create()?;
// 指定したトピックを subscribe する
consumer.subscribe(&[&topic])?;
println!(
"Viewer service started. Waiting for Kafka messages from topic '{}'...",
topic
);
let mut message_stream = consumer.stream();
while let Some(message) = message_stream.next().await {
match message {
Ok(m) => {
// メッセージのペイロードを文字列として取得
if let Some(payload) = m.payload_view::<str>() {
match payload {
Ok(text) => {
println!("Received event: {}", text);
// ここで受信したイベントをパースして、読み取り用 DB のリードモデルを更新する処理を実装
}
Err(e) => eprintln!("Error parsing message payload: {:?}", e),
}
} else {
eprintln!("No payload in received message");
}
}
Err(e) => eprintln!("Kafka error: {:?}", e),
}
}
Ok(())
}
以下はrustアプリをビルドするためのDockerfileの例です。
(rustのrdkafkaライブラリがOpenSSLに依存していたり、デプロイ先がlinuxであったりしたのでちょっと面倒なことをしていますが、単にビルドしてるだけです。)
FROM rust:latest AS builder
WORKDIR /app
# use rdkafka-sys compiling
RUN apt-get update && apt-get install -y cmake pkg-config clang libssl-dev && rm -rf /var/lib/apt/lists/*
RUN rustup target add aarch64-unknown-linux-gnu
COPY . .
WORKDIR /app/viewer_service
RUN cargo build --release --target=aarch64-unknown-linux-gnu
FROM debian:bookworm-slim
# install runtime libraries
RUN apt-get update && apt-get install -y \
ca-certificates libssl3 && \
rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY /app/viewer_service/target/aarch64-unknown-linux-gnu/release/viewer_service /myapp
rustアプリをデプロイするためのPodのマニフェストは以下ようになります。
apiVersion: v1
kind: Pod
metadata:
name: viewer
namespace: kafka
spec:
containers:
- name: app
image: <コンテナレジストリのURL>:latest
command: ["/myapp"]
env:
- name: KAFKA_CONSUMER_GROUP
value: "viewer_group"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka-cluster-kafka-bootstrap:9092" # kafka CRDsの名前や設定によってエンドポイントが変わる
- name: KAFKA_TOPIC
value: "domain-events.public.outbox" # KafkaTopicやKafkaConnectorでも設定した、topic名やwatchするテーブル名によってvalueが決まる
imagePullSecrets:
- name: ghcr-secret # ここではdebeziumのコンテナイメージをpullする際にも使ったsecretをそのまま流用している
以上で準備ができたので、Postgresqlのoutboxテーブルにデータを入れてみます。
kubectl exec -it postgres-postgresql-0 -n postgres -- env PGPASSWORD="password" psql -U postgres -d mydb -c "INSERT INTO outbox (event_id, payload) VALUES (3, '{\"data\": \"new\"}');"
このコマンドを実行した直後にRustアプリ(Pod)にログが出れば成功です。
さいごに
Kubernetesを使えば簡単に構築できるだろうと思ってやってみたところ、構築するリソースが多くて案外大変でした。
とはいえ、サービス間通信が容易で、かつ OSSの Custom Resource を使って簡単にインフラができてしまうので、VMなどに比べれば手間はかからないと思います。
また、マニフェストをベースとして簡単に作り直したり拡張したりできるので検証用途にも最適だと思います。
本番用途に使う際は、例えばDBはPostgres Operatorを使って適切に永続化・冗長化していくといった選択もできます。
...とまぁKubernetesは良いぞという本筋ではない締めくくりになりましたが、Kubernetesを使える環境にある方はぜひ試してみてください。
-
debezium用に設定されたpostgresqlのイメージもあるようです https://hub.docker.com/r/debezium/postgres ↩︎
-
debeziumの提供するコンテナイメージ は存在するようですが、特定のプラグインが入ったコンテナイメージはなさそうです。 ↩︎
Discussion