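How to Transfer Data from MongoDB to RabbitMQ with Debezium
Introduction
This article shows how to build a CDC pipeline on Kubernetes that captures MongoDB changes with Debezium and sends them to RabbitMQ.
Change Data Capture (CDC) is a technique for detecting database changes in real time and forwarding them to other systems.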
Environment
Item | Version |
---|---|
mongodb-kubernetes-operator | 0.13.0 |
MongoDB | 6.0.5 |
RabbitMQ | 3-management |
debezium/server | 2.5 |
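Steps
1. Preparing MongoDB
The Debezium MongoDB connector works only with a replica set or a sharded cluster [1].
Here, the MongoDB Community Kubernetes Operator is used to build MongoDB in a cluster configuration.
Required settings
- Start MongoDB as a replica set
- Create a user for Debezium (read privilege and permission to run the hello command)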
1. Add the Helm repository
helm repo add mongodb https://mongodb.github.io/helm-charts
2. Deploy community-operator to Kubernetes
Run helm install to deploy community-operator to Kubernetes.
The namespace is specified and created in the same command.
helm install community-operator mongodb/community-operator \
--namespace growi-dev \
--create-namespace
At this point, the community-operator pod and the CRD have been created.
> kubectl get pod -n growi-dev
NAME READY STATUS RESTARTS AGE
mongodb-kubernetes-operator-5464c4d659-ktlt5 1/1 Running 0 114m
> kubectl get crd
NAME CREATED AT
mongodbcommunity.mongodbcommunity.mongodb.com 2025-07-09T00:44:17Z
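3. Deploy the MongoDB replica set
community-operator watches MongoDBCommunity resources.
Download the official sample and define a user with the privileges needed for CDC.
Because the growi database is the CDC target, the read privilege was tried first, but it did not work, so readWriteAnyDatabase is granted instead.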
apiVersion: mongodbcommunity.mongodb.com/v1
kind: MongoDBCommunity
metadata:
  namespace: growi-dev
  name: growi-mongodb-cluster
spec:
  # Three members, matching the three pods listed after applying this manifest
  members: 3
  type: ReplicaSet
  version: "6.0.5"
  security:
    authentication:
      modes: ["SCRAM"]
    # Custom role carrying the change stream privileges used for CDC
    roles:
      - role: cdcRole
        db: admin
        privileges:
          - resource:
              db: growi
              collection: ""
            actions: ["find", "changeStream"]
          - resource:
              db: admin
              collection: ""
            actions: ["find", "changeStream"]
          - resource:
              cluster: true
            actions: ["find", "changeStream"] #"aggregate",
          - resource:
              db: local
              collection: "oplog.rs"
            actions: ["find"]
        roles: []
  users:
    - name: mongo-admin
      db: admin
      passwordSecretRef:
        name: mongo-admin-password
      roles:
        - db: admin
          name: clusterAdmin
        - db: admin
          name: userAdminAnyDatabase
        - db: growi
          name: readWrite
        - db: admin
          name: cdcRole
        - db: admin
          name: readWriteAnyDatabase
      scramCredentialsSecretName: mongo-admin
  additionalMongodConfig:
    storage.wiredTiger.engineConfig.journalCompressor: zlib
---
apiVersion: v1
kind: Secret
metadata:
  namespace: growi-dev
  name: mongo-admin-password
type: Opaque
stringData:
  password: <your-password-here>
Applying the manifest creates the MongoDBCommunity resource, and the operator then brings up the MongoDB cluster.
> kubectl get MongoDBCommunity -n growi-dev
NAME PHASE VERSION
growi-mongodb-cluster Running 6.0.5
> kubectl get po -n growi-dev
NAME READY STATUS RESTARTS AGE
growi-mongodb-cluster-0 2/2 Running 0 4h8m
growi-mongodb-cluster-1 2/2 Running 0 4h7m
growi-mongodb-cluster-2 2/2 Running 0 4h6m
mongodb-kubernetes-operator-5464c4d659-ktlt5 1/1 Running 0 7d20h
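Before moving on, it helps to confirm that every replica set member is actually ready. The following is only a rough sketch of that check in Python, working on the text output of kubectl get po as shown above. The function name all_pods_ready is hypothetical and not part of the original setup.

```python
def all_pods_ready(kubectl_output: str) -> bool:
    """Return True if every pod row is Running with all of its containers ready."""
    lines = [ln for ln in kubectl_output.strip().splitlines() if ln.strip()]
    rows = [ln.split() for ln in lines[1:]]  # skip the header row
    if not rows:
        return False
    for _name, ready, status, *_rest in rows:
        ready_count, total = ready.split("/")  # e.g. "2/2"
        if status != "Running" or ready_count != total:
            return False
    return True


# Sample text copied from the output above
sample = """\
NAME READY STATUS RESTARTS AGE
growi-mongodb-cluster-0 2/2 Running 0 4h8m
growi-mongodb-cluster-1 2/2 Running 0 4h7m
growi-mongodb-cluster-2 2/2 Running 0 4h6m
mongodb-kubernetes-operator-5464c4d659-ktlt5 1/1 Running 0 7d20h
"""

print(all_pods_ready(sample))
```

In a real cluster, the same text could be obtained by running kubectl get po -n growi-dev and passing its standard output to the function.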
4. Add a database and collections to MongoDB
Add a database and collections to MongoDB.
For this verification, Growi was connected to MongoDB so that its migrations created the database.
2. Preparing RabbitMQ
Deploy RabbitMQ in the growi-dev namespace and add the following settings from the RabbitMQ management UI.
- Exchange: debezium.growi.exchange
- Queue: debezium.growi.queues
- Binding: connect the Exchange and the Queue without a routing key
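Instead of clicking through the management UI, the same three objects can be described in a definitions file and imported. The sketch below generates such a file in Python. The field layout follows the definitions export format of the management plugin as far as I know it. The fanout exchange type and the durable flags are assumptions, because the article does not state them; fanout is chosen here because it ignores routing keys. The function name build_definitions is hypothetical.

```python
import json


def build_definitions(exchange: str, queue: str, vhost: str = "/",
                      exchange_type: str = "fanout") -> dict:
    """Describe one exchange, one queue, and a binding with an empty routing key."""
    return {
        "exchanges": [{
            "name": exchange, "vhost": vhost, "type": exchange_type,  # type is an assumption
            "durable": True, "auto_delete": False, "internal": False,
            "arguments": {},
        }],
        "queues": [{
            "name": queue, "vhost": vhost,
            "durable": True, "auto_delete": False, "arguments": {},
        }],
        "bindings": [{
            "source": exchange, "vhost": vhost,
            "destination": queue, "destination_type": "queue",
            "routing_key": "",  # no routing key, as in the settings above
            "arguments": {},
        }],
    }


definitions = build_definitions("debezium.growi.exchange", "debezium.growi.queues")
print(json.dumps(definitions, indent=2))
```

Keeping the definitions in a file makes the RabbitMQ setup reproducible when the namespace is rebuilt.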
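3. Configuring Debezium
Debezium is normally deployed on top of Kafka, so Debezium Server is used here instead.
To send MongoDB changes to RabbitMQ, create a configuration file like the one below.
The collections monitored for CDC are specified in debezium.source.collection.include.list.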
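Each entry in the include list is a regular expression that is matched against the whole database.collection name. The sketch below approximates that matching in Python to show which namespaces a given list would capture. It is an illustration only: Debezium uses Java regular expressions, the handling of an empty list is simplified, and the function name is_captured is hypothetical.

```python
import re


def is_captured(namespace: str, include_list: str) -> bool:
    """Approximate check of a 'db.collection' name against a comma-separated regex list."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    if not patterns:
        # Simplification: with no include list, nothing is filtered out here
        return True
    # fullmatch: the pattern has to match the entire namespace, not a substring
    return any(re.fullmatch(p, namespace) for p in patterns)


include_list = "growi.pages,growi.users"
print(is_captured("growi.pages", include_list))     # True
print(is_captured("growi.pagetags", include_list))  # False
```

Note that an unescaped dot in a pattern matches any character, so growi.pages would also match a namespace such as growiXpages. Escaping the dot avoids that when it matters.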
Example application.properties
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://mongo-admin:<your-password-here>@growi-mongodb-cluster-svc:27017/?replicaSet=growi-mongodb-cluster&authSource=admin
debezium.source.topic.prefix=debezium.growi
debezium.source.mongodb.name=growi
debezium.source.collection.include.list=growi.pages,growi.users
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.exchange=debezium.growi.exchange
debezium.sink.rabbitmq.queue=debezium.growi.queues
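One detail of the connection string is easy to miss: characters such as @, : and / in the user name or password must be percent-encoded, otherwise the URI is parsed incorrectly. The following sketch builds the connection string with Python's standard library. The function name build_connection_string and the sample password are hypothetical.

```python
from urllib.parse import quote_plus


def build_connection_string(user: str, password: str, host: str, port: int,
                            replica_set: str, auth_source: str = "admin") -> str:
    """Build a MongoDB URI with percent-encoded credentials."""
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/"
        f"?replicaSet={replica_set}&authSource={auth_source}"
    )


# Hypothetical password containing characters that need encoding
uri = build_connection_string(
    "mongo-admin", "p@ss:w/rd",
    "growi-mongodb-cluster-svc", 27017, "growi-mongodb-cluster",
)
print(uri)
```

The result can be used as the value of debezium.source.mongodb.connection.string.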
1. Deploy Debezium Server
Deploy Debezium Server using the YAML in the Appendix.
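4. Verification
When data is inserted into MongoDB, an event is sent to the debezium.growi.queues queue in RabbitMQ.
You can check it in the RabbitMQ management UI or with a consumer application.
Updating a page in Growi updates the pages collection, and a message like the following is placed on the queue.
- schema: the structure definition of the event
- payload: the actual data. With this configuration, only the post-update state (after) is populated, and before is null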
{
"schema":{
"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,
"name":"debezium.growi.growi.pages.Envelope"
},
"payload":{
"before":null,
"after":"{\"_id\": {\"$oid\": \"68a436bde14d4984851fd3d6\"},\"parent\": {\"$oid\": \"68a4364de14d4984851fd1f6\"},\"descendantCount\": 0,\"isEmpty\": false,\"status\": \"published\",\"grant\": 1,\"grantedUsers\": [],\"liker\": [],\"seenUsers\": [{\"$oid\": \"68a4364de14d4984851fd200\"}],\"commentCount\": 0,\"grantedGroups\": [],\"updatedAt\": {\"$date\": 1756270981089},\"path\": \"/テスト\",\"creator\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"lastUpdateUser\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"createdAt\": {\"$date\": 1755592381185},\"__v\": 1,\"latestRevisionBodyLength\": 23,\"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}}","updateDescription":{"removedFields":null,"updatedFields":"{\"latestRevisionBodyLength\": 23, \"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}, \"updatedAt\": {\"$date\": \"2025-08-27T05:03:01.089Z\"}}","truncatedArrays":null},"source":{"version":"2.5.4.Final","connector":"mongodb","name":"debezium.growi","ts_ms":1756270981000,"snapshot":"false","db":"growi","sequence":null,"rs":"growi-mongodb-cluster","collection":"pages","ord":2,"lsid":null,"txnNumber":null,"wallTime":1756270981091},"op":"u","ts_ms":1756270981191,"transaction":null
}
}
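A consumer has to decode this message in two steps, because the after field is itself a JSON string (MongoDB Extended JSON) embedded in the payload. The sketch below shows one way to do this with the standard library. The function name summarize_event and the shape of the returned dictionary are hypothetical, and the sample message is a trimmed version of the one above.

```python
import json
from datetime import datetime, timezone

# Debezium operation codes
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_event(message: str) -> dict:
    """Extract the commonly needed fields from a Debezium change event."""
    payload = json.loads(message)["payload"]
    # "after" is a JSON string, so it needs a second json.loads
    after = json.loads(payload["after"]) if payload.get("after") else None
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "namespace": f'{source["db"]}.{source["collection"]}',
        "document_id": after["_id"] if after else None,
        "path": after.get("path") if after else None,
        # ts_ms is in milliseconds since the Unix epoch
        "captured_at": datetime.fromtimestamp(payload["ts_ms"] / 1000, tz=timezone.utc),
    }


# Trimmed sample based on the message shown above
sample_message = json.dumps({
    "payload": {
        "before": None,
        "after": json.dumps({"_id": {"$oid": "68a436bde14d4984851fd3d6"}, "path": "/テスト"}),
        "source": {"db": "growi", "collection": "pages"},
        "op": "u",
        "ts_ms": 1756270981191,
    }
})

summary = summarize_event(sample_message)
print(summary)
```

For delete events the after field is null, which is why the function falls back to None instead of assuming a document is always present.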
Summary
Using Debezium Server, MongoDB changes could be forwarded to RabbitMQ in real time.
This can serve as the basis for continuous backups, notification applications, and similar tools.
Appendix
YAML for deploying Debezium Server
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: growi-dev
  name: growi-debezium-server
  labels:
    app: growi-debezium-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: growi-debezium-server
  template:
    metadata:
      labels:
        app: growi-debezium-server
    spec:
      containers:
        - name: growi-debezium-server
          image: quay.io/debezium/server:2.5
          volumeMounts:
            - name: growi-debezium-server-v
              mountPath: /debezium/data
            - name: growi-debezium-server-application-properties
              mountPath: /debezium/conf
          ports:
            - containerPort: 80
              name: http
          envFrom:
            - secretRef:
                name: growi-debezium-server
      volumes:
        - name: growi-debezium-server-v
          persistentVolumeClaim:
            claimName: growi-debezium-server-pvc
        - name: growi-debezium-server-application-properties
          configMap:
            name: growi-debezium-server-application-properties
            items:
              - key: application.properties
                path: application.properties
---
apiVersion: v1
kind: Service
metadata:
  namespace: growi-dev
  name: growi-debezium-server
spec:
  type: ClusterIP
  selector:
    app: growi-debezium-server
  ports:
    - port: 80
      name: http
      targetPort: 80
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: growi-debezium-server-application-properties
  namespace: growi-dev
data:
  application.properties: |
    debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
    # debezium.source.mongodb.connection.string=mongodb://mongo-admin:<your-password-here>@growi-mongodb-cluster-svc:27017/?replicaSet=growi-mongodb-cluster&authSource=admin
    debezium.source.topic.prefix=debezium.growi
    debezium.source.mongodb.name=growi
    debezium.source.collection.include.list=growi.pages,growi.users
    debezium.sink.type=rabbitmq
    debezium.sink.rabbitmq.connection.host=rabbitmq
    debezium.sink.rabbitmq.connection.port=5672
    debezium.sink.rabbitmq.connection.username=guest
    debezium.sink.rabbitmq.connection.password=guest
    debezium.sink.rabbitmq.connection.virtual.host=/
    debezium.sink.rabbitmq.exchange=debezium.growi.exchange
    debezium.sink.rabbitmq.queue=debezium.growi.queues
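debezium.source.mongodb.connection.string contains sensitive information, so it is set through a Secret. The Deployment loads the Secret with envFrom, which passes the value to Debezium Server as an environment variable.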
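apiVersion: v1
kind: Secret
metadata:
  namespace: growi-dev
  name: growi-debezium-server
type: Opaque
# stringData accepts the plain connection string; values under data would have to be base64-encoded
stringData:
  DEBEZIUM_SOURCE_MONGODB_CONNECTION_STRING: "<MongoDB Connection String>"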
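Two conventions are at work in this Secret. First, the key is the property name written as an environment variable: upper case, with every non-alphanumeric character replaced by an underscore. Second, Kubernetes accepts plain text under stringData, while values placed under data must be base64-encoded. The sketch below produces a data entry following both conventions. The function names and the sample connection string are hypothetical.

```python
import base64
import re


def property_to_env_name(prop: str) -> str:
    """Convert a dotted property name to its environment variable form."""
    return re.sub(r"[^A-Za-z0-9]", "_", prop).upper()


def secret_data_entry(prop: str, value: str) -> dict:
    """Return a {ENV_NAME: base64(value)} pair suitable for a Secret's data field."""
    encoded = base64.b64encode(value.encode("utf-8")).decode("ascii")
    return {property_to_env_name(prop): encoded}


# Hypothetical connection string used only for illustration
entry = secret_data_entry(
    "debezium.source.mongodb.connection.string",
    "mongodb://user:secret@host:27017/?replicaSet=rs0&authSource=admin",
)
print(entry)
```

Base64 is an encoding, not encryption, so the manifest containing this value should still be kept out of version control or managed with a secret management tool.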