
How to Transfer Data from MongoDB to RabbitMQ with Debezium


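Introduction

This article shows how to build a CDC pipeline on Kubernetes that captures changes in MongoDB with Debezium and sends them to RabbitMQ.
Change Data Capture (CDC) is a technique for detecting database changes in real time and forwarding them to other systems.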

Environment

Component                     Version
mongodb-kubernetes-operator   0.13.0
MongoDB                       6.0.5
RabbitMQ                      3-management
debezium/server               2.5

Steps

1. Preparing MongoDB

Debezium's MongoDB connector works only with replica sets or sharded clusters[1].

We use the MongoDB Community Kubernetes Operator to build MongoDB in a clustered configuration.

Required settings

  • Start MongoDB as a replica set
  • Create a user for Debezium (with read privileges and permission to run the hello command)
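
As a rough illustration of the replica-set requirement, the sketch below classifies the reply of MongoDB's hello command. The function name `deployment_kind` and the sample replies are hypothetical; the sketch relies only on the `setName` field (present on replica set members) and on `msg: "isdbgrid"` (returned by a mongos router).

```python
def deployment_kind(hello_reply: dict) -> str:
    """Classify a MongoDB deployment from the reply of the `hello` command."""
    if hello_reply.get("msg") == "isdbgrid":
        return "sharded"        # the reply came from a mongos router
    if "setName" in hello_reply:
        return "replica_set"    # the node is a member of a replica set
    return "standalone"         # no change streams, so Debezium cannot use it


# Hand-written sample replies, not captured from a real cluster.
print(deployment_kind({"setName": "growi-mongodb-cluster", "isWritablePrimary": True}))
print(deployment_kind({"isWritablePrimary": True}))
```

In practice the reply would come from running `db.hello()` in mongosh as the Debezium user, which also confirms that the user is allowed to run the command.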

1. Add the Helm repository

helm repo add mongodb https://mongodb.github.io/helm-charts

2. Deploy community-operator to Kubernetes

Deploy community-operator to Kubernetes with helm install.
Specify the namespace and create it at the same time.

helm install community-operator mongodb/community-operator \
    --namespace growi-dev \
    --create-namespace

At this point, the community-operator pod and its CRD have been created.

> kubectl get pod -n growi-dev
NAME                                           READY   STATUS    RESTARTS   AGE
mongodb-kubernetes-operator-5464c4d659-ktlt5   1/1     Running   0          114m

> kubectl get crd
NAME                                            CREATED AT
mongodbcommunity.mongodbcommunity.mongodb.com   2025-07-09T00:44:17Z

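3. Deploy the MongoDB replica set

community-operator watches MongoDBCommunity resources.
Download the official sample and add a user with the privileges needed for CDC.
Because the growi database is the CDC target, I first granted only read privileges, but that did not work, so the user is given readWriteAnyDatabase instead.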

apiVersion: mongodbcommunity.mongodb.com/v1
kind: MongoDBCommunity
metadata:
  namespace: growi-dev
  name: growi-mongodb-cluster
spec:
  members: 3
  type: ReplicaSet
  version: "6.0.5"
  security:
    authentication:
      modes: ["SCRAM"]
    roles:
      - role: cdcRole
        db: admin
        privileges:
          - resource:
              db: growi
              collection: ""
            actions: ["find", "changeStream"]
          - resource:
              db: admin
              collection: ""
            actions: ["find", "changeStream"]
          - resource:
              cluster: true
            actions: [ "find", "changeStream" ] #"aggregate", 
          - resource:
              db: local
              collection: "oplog.rs"
            actions: ["find"]
        roles: []
  users:
    - name: mongo-admin
      db: admin
      passwordSecretRef:
        name: mongo-admin-password
      roles:
        - db: admin
          name: clusterAdmin
        - db: admin
          name: userAdminAnyDatabase
        - db: growi
          name: readWrite
        - db: admin
          name: cdcRole
        - db: admin
          name: readWriteAnyDatabase
      scramCredentialsSecretName: mongo-admin
  additionalMongodConfig:
    storage.wiredTiger.engineConfig.journalCompressor: zlib

---
apiVersion: v1
kind: Secret
metadata:
  namespace: growi-dev
  name: mongo-admin-password
type: Opaque
stringData:
  password: <your-password-here>

Applying the manifest creates the MongoDBCommunity resource, and the operator then creates the MongoDB cluster.

> kubectl get MongoDBCommunity -n growi-dev
NAME                    PHASE     VERSION
growi-mongodb-cluster   Running   6.0.5

> kubectl get po -n growi-dev
NAME                                           READY   STATUS    RESTARTS        AGE
growi-mongodb-cluster-0                        2/2     Running   0               4h8m
growi-mongodb-cluster-1                        2/2     Running   0               4h7m
growi-mongodb-cluster-2                        2/2     Running   0               4h6m
mongodb-kubernetes-operator-5464c4d659-ktlt5   1/1     Running   0               7d20h

4. Add a database and collections to MongoDB

Add a database and collections to MongoDB.
For this verification, Growi was connected to MongoDB, and its migrations built the database.


2. Preparing RabbitMQ

Deploy RabbitMQ in the growi-dev namespace and add the following settings from the RabbitMQ management UI.

  • Exchange: debezium.growi.exchange
  • Queue: debezium.growi.queues
  • Binding: connect the Exchange to the Queue without a routing key
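
The three settings above can also be written down as a definitions document, the JSON format that the RabbitMQ management UI can import. The following is a minimal sketch, not the setup used in this article: the helper name `build_definitions` is hypothetical, and the exchange type (`fanout`) and the `durable` flags are assumptions, since the article creates these objects by hand and does not state them.

```python
import json


def build_definitions(exchange, queue, vhost="/", exchange_type="fanout"):
    """Build a RabbitMQ definitions document with one exchange, queue, and binding."""
    return {
        "exchanges": [{
            "name": exchange, "vhost": vhost,
            "type": exchange_type,  # assumption: the article does not state the type
            "durable": True, "auto_delete": False, "internal": False,
            "arguments": {},
        }],
        "queues": [{
            "name": queue, "vhost": vhost,
            "durable": True, "auto_delete": False, "arguments": {},
        }],
        "bindings": [{
            "source": exchange, "vhost": vhost,
            "destination": queue, "destination_type": "queue",
            "routing_key": "",  # bound without a routing key
            "arguments": {},
        }],
    }


definitions = build_definitions("debezium.growi.exchange", "debezium.growi.queues")
print(json.dumps(definitions, indent=2))
```

Keeping the definitions in a file makes the RabbitMQ setup reproducible when the namespace is rebuilt.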

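3. Configuring Debezium

Standard Debezium deployments assume Kafka, so we use Debezium Server instead.
To send MongoDB changes to RabbitMQ, create a configuration file like the one below.
The collections to monitor are specified with debezium.source.collection.include.list.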
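
The include list is a comma-separated list of regular expressions that are matched against the full `database.collection` name. The sketch below approximates that matching with Python's `re` module to show which namespaces a given list would select. Debezium itself uses Java regular expressions, and the helper name `is_captured` is hypothetical, so treat this as an illustration rather than the connector's exact behaviour.

```python
import re


def is_captured(namespace: str, include_list: str) -> bool:
    """Return True if `namespace` (db.collection) fully matches any pattern in the list."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, namespace) for p in patterns)


include_list = "growi.pages,growi.users"
print(is_captured("growi.pages", include_list))      # True
print(is_captured("growi.revisions", include_list))  # False
```

Note that an unescaped `.` is a regex wildcard, so `growi.pages` would also match a name such as `growiXpages`; writing `growi\.pages` avoids that.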

Example application.properties

debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://mongo-admin:<your-password-here>@growi-mongodb-cluster-svc:27017/?replicaSet=growi-mongodb-cluster&authSource=admin
debezium.source.topic.prefix=debezium.growi
debezium.source.mongodb.name=growi
debezium.source.collection.include.list=growi.pages,growi.users

debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.exchange=debezium.growi.exchange
debezium.sink.rabbitmq.queue=debezium.growi.queues
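
Passwords that contain characters such as `@`, `:`, or `/` must be percent-encoded before they are placed in a MongoDB connection string. A minimal sketch of assembling the value used above follows; the helper name `build_connection_string` and the sample password are hypothetical.

```python
from urllib.parse import quote_plus


def build_connection_string(user, password, host, replica_set,
                            auth_source="admin", port=27017):
    """Assemble a MongoDB URI with percent-encoded credentials."""
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/"
        f"?replicaSet={replica_set}&authSource={auth_source}"
    )


uri = build_connection_string(
    "mongo-admin",
    "p@ss:w/rd",  # made-up password containing reserved characters
    "growi-mongodb-cluster-svc",
    "growi-mongodb-cluster",
)
print(uri)
```

The host, replica set name, and authSource are the ones from the example configuration; only the credentials are encoded.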

1. Deploy Debezium Server

Deploy Debezium Server using the YAML manifests in the Appendix.


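4. Verification

When data is inserted into MongoDB, an event is sent to the debezium.growi.queues queue in RabbitMQ.
You can check it in the RabbitMQ management UI or with a consumer application.

Updating a page in Growi updates the pages collection, and a message like the following is published to the queue.

  • schema: the structure (field definitions) of the message
  • payload: the actual data; with this configuration only the post-update state (after) is populated and before is null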
{
  "schema":{
    "type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,
    "name":"debezium.growi.growi.pages.Envelope"
  },
  "payload":{
    "before":null,
    "after":"{\"_id\": {\"$oid\": \"68a436bde14d4984851fd3d6\"},\"parent\": {\"$oid\": \"68a4364de14d4984851fd1f6\"},\"descendantCount\": 0,\"isEmpty\": false,\"status\": \"published\",\"grant\": 1,\"grantedUsers\": [],\"liker\": [],\"seenUsers\": [{\"$oid\": \"68a4364de14d4984851fd200\"}],\"commentCount\": 0,\"grantedGroups\": [],\"updatedAt\": {\"$date\": 1756270981089},\"path\": \"/テスト\",\"creator\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"lastUpdateUser\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"createdAt\": {\"$date\": 1755592381185},\"__v\": 1,\"latestRevisionBodyLength\": 23,\"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}}","updateDescription":{"removedFields":null,"updatedFields":"{\"latestRevisionBodyLength\": 23, \"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}, \"updatedAt\": {\"$date\": \"2025-08-27T05:03:01.089Z\"}}","truncatedArrays":null},"source":{"version":"2.5.4.Final","connector":"mongodb","name":"debezium.growi","ts_ms":1756270981000,"snapshot":"false","db":"growi","sequence":null,"rs":"growi-mongodb-cluster","collection":"pages","ord":2,"lsid":null,"txnNumber":null,"wallTime":1756270981091},"op":"u","ts_ms":1756270981191,"transaction":null
  }
}
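
In this message the `after` field is itself a JSON string (MongoDB Extended JSON), so a consumer has to decode it a second time. The sketch below shows one way to pull the useful parts out of a message body. The function name `summarize_event` is hypothetical, and the sample is a heavily trimmed stand-in for the message shown here, not real captured output.

```python
import json

# Debezium operation codes: create, update, delete, and snapshot read.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_event(message: str) -> dict:
    """Extract the operation, namespace, and document fields from a change event."""
    payload = json.loads(message)["payload"]
    # `after` is a JSON document serialized as a string, so decode it again.
    after = json.loads(payload["after"]) if payload.get("after") else None
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "namespace": f'{source["db"]}.{source["collection"]}',
        "document_id": after.get("_id") if after else None,
        "path": after.get("path") if after else None,
    }


# Trimmed sample built to mirror the structure of the message.
sample = json.dumps({
    "payload": {
        "before": None,
        "after": json.dumps({"_id": {"$oid": "68a436bde14d4984851fd3d6"}, "path": "/テスト"}),
        "source": {"db": "growi", "collection": "pages"},
        "op": "u",
    }
})
print(summarize_event(sample))
```

A consumer application would call a function like this on each message body it receives from the debezium.growi.queues queue.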

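Summary

Using Debezium Server, we were able to forward MongoDB changes to RabbitMQ in real time.
This can be used to build things such as continuous backups or notification applications.

Appendix

YAML manifests for deploying Debezium Server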
deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: growi-dev
  name: growi-debezium-server
  labels:
    app: growi-debezium-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: growi-debezium-server
  template:
    metadata:
      labels:
        app: growi-debezium-server
    spec:
      containers:
      - name: growi-debezium-server
        image: quay.io/debezium/server:2.5
        volumeMounts:
          - name: growi-debezium-server-v
            mountPath: /debezium/data
          - name: growi-debezium-server-application-properties
            mountPath: /debezium/conf
        ports:
          - containerPort: 80
            name: http
        envFrom:
        - secretRef:
            name: growi-debezium-server
      volumes:
      - name: growi-debezium-server-v
        persistentVolumeClaim:
          claimName: growi-debezium-server-pvc
      - name: growi-debezium-server-application-properties
        configMap:
          name: growi-debezium-server-application-properties
          items:
            - key: application.properties
              path: application.properties

service.yaml

apiVersion: v1
kind: Service
metadata:
  namespace: growi-dev
  name: growi-debezium-server
spec:
  type: ClusterIP
  selector:
    app: growi-debezium-server
  ports:
    - port: 80
      name: http
      targetPort: 80

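persistentvolumeclaim.yml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  namespace: growi-dev
  # Name referenced by claimName in deployment.yaml
  name: growi-debezium-server-pvc
spec:
  accessModes:
    - ReadWriteOnce   # assumption: a single Debezium Server pod mounts the volume
  resources:
    requests:
      storage: 1Gi    # assumption: adjust the size to your environment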

configmap.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: growi-debezium-server-application-properties
  namespace: growi-dev
data:
  application.properties: |
    debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
    # debezium.source.mongodb.connection.string=mongodb://mongo-admin:<your-password-here>@growi-mongodb-cluster-svc:27017/?replicaSet=growi-mongodb-cluster&authSource=admin
    debezium.source.topic.prefix=debezium.growi
    debezium.source.mongodb.name=growi
    debezium.source.collection.include.list=growi.pages,growi.users
    
    debezium.sink.type=rabbitmq
    debezium.sink.rabbitmq.connection.host=rabbitmq
    debezium.sink.rabbitmq.connection.port=5672
    debezium.sink.rabbitmq.connection.username=guest
    debezium.sink.rabbitmq.connection.password=guest
    debezium.sink.rabbitmq.connection.virtual.host=/
    debezium.sink.rabbitmq.exchange=debezium.growi.exchange
    debezium.sink.rabbitmq.queue=debezium.growi.queues

debezium.source.mongodb.connection.string contains sensitive information, so it is set through a Secret.

secret.yaml
apiVersion: v1
kind: Secret
metadata:
  namespace: growi-dev
  name: growi-debezium-server
type: Opaque
stringData:
  DEBEZIUM_SOURCE_MONGODB_CONNECTION_STRING: "<MongoDB Connection String>"
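
The Secret key is the property name rewritten as an environment variable. Debezium Server runs on Quarkus, whose configuration follows the MicroProfile Config convention of replacing every character that is not alphanumeric or `_` with `_` and then upper-casing the result. The sketch below applies that rule; the helper name `to_env_var` is hypothetical.

```python
import re


def to_env_var(property_name: str) -> str:
    """Convert a property key to its environment-variable form."""
    return re.sub(r"[^A-Za-z0-9_]", "_", property_name).upper()


print(to_env_var("debezium.source.mongodb.connection.string"))
# DEBEZIUM_SOURCE_MONGODB_CONNECTION_STRING
```

The same conversion should apply to other sensitive properties, such as the RabbitMQ password, if they are moved into the Secret as well.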
Footnotes
  1. Debezium connector for MongoDB

セリオ株式会社 テックブログ
