📚

DebeziumからのMessageを.NETで処理する方法

に公開

はじめに

前回の記事[1] でMongoDB ✕ Debezium Server ✕ RabbitMQ でMongoDBの変更データをRabbitMQで受信しました
今回は、RabbitMQからC#で変更データを処理していきます
変更データをDeserializeするポイントも併せて紹介します


環境

項目 バージョン
.NET 8.0
RabbitMQ.Client 7.1.2
Newtonsoft.Json 13.0.3

変更データの受信

1. APIサーバテンプレートの作成

Visual Studioを起動し 新しいプロジェクトの作成 -> ワーカーサービスを選択し、プロジェクトを作成します

2. RabbitMQとの接続に必要なライブラリの追加

プロジェクトのNuGetパッケージの追加から、RabbitMQ.Clientを追加します

3. Consumerの追加

  1. RabbitMQからデータを受信するため、Consumerをバックグラウンドサービスとして実装します

    RabbitMqConsumerService.cs
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    public class RabbitMqConsumerService : BackgroundService
    {
    
        public required IConnection _connection;
        public required IChannel _channel;
    
        public RabbitMqConsumerService(ILogger<RabbitMqConsumerService> logger)
        {
            InitializeAsync().Wait();
        }
    
        private async Task InitializeAsync()
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest",
                ConsumerDispatchConcurrency = 5 // 並列処理数の指定
            };
    
             _connection = await factory.CreateConnectionAsync();
            _channel = await _connection.CreateChannelAsync();
    
            await _channel.QueueDeclareAsync("debezium.growi.queues", durable: true, exclusive: false, autoDelete: false);
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var consumer = new AsyncEventingBasicConsumer(_channel);
            consumer.ReceivedAsync += async (sender, ea) =>
            {
                var body = ea.Body.ToArray(); // ReadOnlyMemory<byte> → byte[]
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"Received: {message}");
    
                // 処理後にACK
                await _channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
            };
    
            await _channel.BasicConsumeAsync("debezium.growi.queues", autoAck: false, consumer);
        }
    }
    
  2. Program.csでバックグラウンドサービスを追加します

    Program.cs
    builder.Services.AddHostedService<RabbitMqConsumerService>();
    
  3. ここまでで、MongoDBの変更データがC#で処理出来る様になりました
    messageを見てみると、以下の様になっています
    MongoDB ✕ Debezium Server ✕ RabbitMQ のパターンでは、全てのコレクションの変更が同じQueueに格納され、schema.nameでコレクションを判定する様になります

    {
      "schema":{
        "type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,
        "name":"debezium.growi.growi.pages.Envelope"
      },
      "payload":{
        "before":null,
        "after":"{\"_id\": {\"$oid\": \"68a436bde14d4984851fd3d6\"},\"parent\": {\"$oid\": \"68a4364de14d4984851fd1f6\"},\"descendantCount\": 0,\"isEmpty\": false,\"status\": \"published\",\"grant\": 1,\"grantedUsers\": [],\"liker\": [],\"seenUsers\": [{\"$oid\": \"68a4364de14d4984851fd200\"}],\"commentCount\": 0,\"grantedGroups\": [],\"updatedAt\": {\"$date\": 1756270981089},\"path\": \"/テスト\",\"creator\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"lastUpdateUser\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"createdAt\": {\"$date\": 1755592381185},\"__v\": 1,\"latestRevisionBodyLength\": 23,\"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}}","updateDescription":{"removedFields":null,"updatedFields":"{\"latestRevisionBodyLength\": 23, \"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}, \"updatedAt\": {\"$date\": \"2025-08-27T05:03:01.089Z\"}}","truncatedArrays":null},"source":{"version":"2.5.4.Final","connector":"mongodb","name":"debezium.growi","ts_ms":1756270981000,"snapshot":"false","db":"growi","sequence":null,"rs":"growi-mongodb-cluster","collection":"pages","ord":2,"lsid":null,"txnNumber":null,"wallTime":1756270981091},"op":"u","ts_ms":1756270981191,"transaction":null
      }
    }
    

JsonのDeserialize

変更データはJsonになっているため、そのままでは使い辛い状態です
使い易い様に、JsonをクラスにDeserializeします

Jsonの構成

Debeziumが登録するデータは以下の様な構成になっています

  • schema: テーブル定義
  • payload: 実データ
    • 更新前( before ) / 更新後( after )
    • MongoDB Connecterの場合、JSON文字列

1. Jsonの変換に必要なライブラリの追加

プロジェクトのNuGetパッケージの追加から、Newtonsoft.Jsonを追加します

2. 型定義

Debeziumが登録するデータを以下の DebeziumMessage<Page>型にDeserializeします

public class DebeziumMessage<T>
    where T : class
{
    public SchemaPrimary Schema { get; set; }
    public Payload<T> Payload { get; set; }
}
public class SchemaPrimary
{
    public string Type { get; set; }
    public bool Optional { get; set; }
    public string Name { get; set; }
}
public class Payload<T>
    where T:class
{
    public T Before { get; set; }
    public T After { get; set; }
}

サンプルデータのコレクション用の型を定義します
項目が多いので、Idだけにしています

using Newtonsoft.Json;
public class Page
{
    [JsonProperty("_id")]
    public OidWrapper Id { get; set; } = default!;
}
public class User
{
    [JsonProperty("_id")]
    public OidWrapper Id { get; set; } = default!;
}

public class OidWrapper
{
    [JsonProperty("$oid")]
    public string Oid { get; set; } = "";
}

3. JsonConverterの作成

Before / AfterはJson文字列の為、DebeziumMessage<Page>に直接Deserializeするとエラーになります
Json文字列を変換するJsonConverterを作成します

JsonStringToObjectConverter.cs
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public class JsonStringToObjectConverter<T> : JsonConverter where T : class
{
    public override bool CanConvert(Type objectType)
    {
        return typeof(T).IsAssignableFrom(objectType);
    }

    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        // JSON文字列として受け取る  
        var jsonString = (string)JToken.Load(reader);
        if (string.IsNullOrEmpty(jsonString))
            return null;
        // 文字列をT型にデシリアライズ  
        return JsonConvert.DeserializeObject<T>(jsonString);
    }

    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        // オブジェクトをJSON文字列にシリアライズ  
        var jsonString = JsonConvert.SerializeObject(value);
        writer.WriteValue(jsonString);
    }
}

作成したJsonStringToObjectConverterは以下の様に、Before / Afterのコレクションの型を指定して使います

var settings = new JsonSerializerSettings();
var converter = new JsonStringToObjectConverter<Page>();
settings.Converters.Add(converter);
var debeziumMessage = JsonConvert.DeserializeObject(message, typeof(DebeziumMessage<Page>),
    settings);

4. コレクション毎の変換

コレクション名とそれに対応する型を以下の様に定義します

RabbitMqConsumerService.cs
private Dictionary<string, Type> _nameMapper = new Dictionary<string, Type>
{
    { "debezium.growi.growi.pages.Envelope", typeof(Page) },
    { "debezium.growi.growi.users.Envelope", typeof(User) },
};

message にあるコレクション名から、対応する型へ変換します
これで、DebeziumMessage<T>型で変更データが扱えるようになります

RabbitMqConsumerService.cs
// コレクション名から対応する型を取得
_nameMapper.TryGetValue(
    JsonConvert.DeserializeObject<DebeziumMessage<string>>(message)?.Schema?.Name ?? "",
    out var targetType);

if (targetType != null)
{
    // コレクションの型への変換ルールを追加
    var settings = new JsonSerializerSettings();
    var converterType = typeof(JsonStringToObjectConverter<>).MakeGenericType(targetType);
    var converter = (JsonConverter)Activator.CreateInstance(converterType);
    settings.Converters.Add(converter);
    // DebeziumMessage<T>型へのDeserialize
    var genericType = typeof(DebeziumMessage<>).MakeGenericType(targetType);
    var deserializedMessage = JsonConvert.DeserializeObject(message, genericType, settings);
    // ここで deserializedMessage を使って必要な処理を行う
    Console.WriteLine($"Processed message for type: {targetType.Name}");
}

まとめ

  • RabbitMQのメッセージをC#で扱うコードを作ってみました
  • JsonをDeserializeする部分を共通化しようとしたため、一手間掛かったコードになっています
脚注
  1. MongoDBからDebeziumでRabbitMQにデータ転送する方法 ↩︎

セリオ株式会社 テックブログ

Discussion