DebeziumからのMessageを.NETで処理する方法
はじめに
前回の記事[1] でMongoDB ✕ Debezium Server ✕ RabbitMQ でMongoDBの変更データをRabbitMQで受信しました
今回は、RabbitMQからC#で変更データを処理していきます
変更データをDeserializeするポイントも併せて紹介します
環境
項目 | バージョン |
---|---|
.NET | 8.0 |
RabbitMQ.Client | 7.1.2 |
Newtonsoft.Json | 13.0.3 |
変更データの受信
1. APIサーバテンプレートの作成
Visual Studioを起動し 新しいプロジェクトの作成
-> ワーカーサービス
を選択し、プロジェクトを作成します
2. RabbitMQとの接続に必要なライブラリの追加
プロジェクトのNuGetパッケージの追加から、RabbitMQ.Client
を追加します
3. Consumerの追加
-
RabbitMQからデータを受信するため、Consumerをバックグラウンドサービスとして実装します
RabbitMqConsumerService.csusing RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; public class RabbitMqConsumerService : BackgroundService { public required IConnection _connection; public required IChannel _channel; public RabbitMqConsumerService(ILogger<RabbitMqConsumerService> logger) { InitializeAsync().Wait(); } private async Task InitializeAsync() { var factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest", ConsumerDispatchConcurrency = 5 // 並列処理数の指定 }; _connection = await factory.CreateConnectionAsync(); _channel = await _connection.CreateChannelAsync(); await _channel.QueueDeclareAsync("debezium.growi.queues", durable: true, exclusive: false, autoDelete: false); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += async (sender, ea) => { var body = ea.Body.ToArray(); // ReadOnlyMemory<byte> → byte[] var message = Encoding.UTF8.GetString(body); Console.WriteLine($"Received: {message}"); // 処理後にACK await _channel.BasicAckAsync(ea.DeliveryTag, multiple: false); }; await _channel.BasicConsumeAsync("debezium.growi.queues", autoAck: false, consumer); } }
-
Program.csでバックグラウンドサービスを追加します
Program.csbuilder.Services.AddHostedService<RabbitMqConsumerService>();
-
ここまでで、MongoDBの変更データがC#で処理出来る様になりました
message
を見てみると、以下の様になっています
MongoDB ✕ Debezium Server ✕ RabbitMQ のパターンでは、全てのコレクションの変更が同じQueueに格納され、schema.name
でコレクションを判定する様になります{ "schema":{ "type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false, "name":"debezium.growi.growi.pages.Envelope" }, "payload":{ "before":null, "after":"{\"_id\": {\"$oid\": \"68a436bde14d4984851fd3d6\"},\"parent\": {\"$oid\": \"68a4364de14d4984851fd1f6\"},\"descendantCount\": 0,\"isEmpty\": false,\"status\": \"published\",\"grant\": 1,\"grantedUsers\": [],\"liker\": [],\"seenUsers\": [{\"$oid\": \"68a4364de14d4984851fd200\"}],\"commentCount\": 0,\"grantedGroups\": [],\"updatedAt\": {\"$date\": 1756270981089},\"path\": \"/テスト\",\"creator\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"lastUpdateUser\": {\"$oid\": \"68a4364de14d4984851fd200\"},\"createdAt\": {\"$date\": 1755592381185},\"__v\": 1,\"latestRevisionBodyLength\": 23,\"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}}","updateDescription":{"removedFields":null,"updatedFields":"{\"latestRevisionBodyLength\": 23, \"revision\": {\"$oid\": \"68ae918597bf8cc222686b60\"}, \"updatedAt\": {\"$date\": \"2025-08-27T05:03:01.089Z\"}}","truncatedArrays":null},"source":{"version":"2.5.4.Final","connector":"mongodb","name":"debezium.growi","ts_ms":1756270981000,"snapshot":"false","db":"growi","sequence":null,"rs":"growi-mongodb-cluster","collection":"pages","ord":2,"lsid":null,"txnNumber":null,"wallTime":1756270981091},"op":"u","ts_ms":1756270981191,"transaction":null } }
JsonのDeserialize
変更データはJsonになっているため、そのままでは使い辛い状態です
使い易い様に、JsonをクラスにDeserializeします
Jsonの構成
Debeziumが登録するデータは以下の様な構成になっています
- schema: テーブル定義
- payload: 実データ
- 更新前( before ) / 更新後( after )
- MongoDB Connecterの場合、JSON文字列
1. Jsonの変換に必要なライブラリの追加
プロジェクトのNuGetパッケージの追加から、Newtonsoft.Json
を追加します
2. 型定義
Debeziumが登録するデータを以下の DebeziumMessage<Page>
型にDeserializeします
public class DebeziumMessage<T>
where T : class
{
public SchemaPrimary Schema { get; set; }
public Payload<T> Payload { get; set; }
}
public class SchemaPrimary
{
public string Type { get; set; }
public bool Optional { get; set; }
public string Name { get; set; }
}
public class Payload<T>
where T:class
{
public T Before { get; set; }
public T After { get; set; }
}
サンプルデータのコレクション用の型を定義します
項目が多いので、Idだけにしています
using Newtonsoft.Json;
public class Page
{
[JsonProperty("_id")]
public OidWrapper Id { get; set; } = default!;
}
public class User
{
[JsonProperty("_id")]
public OidWrapper Id { get; set; } = default!;
}
public class OidWrapper
{
[JsonProperty("$oid")]
public string Oid { get; set; } = "";
}
3. JsonConverterの作成
Before / AfterはJson文字列の為、DebeziumMessage<Page>
に直接Deserializeするとエラーになります
Json文字列を変換するJsonConverterを作成します
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public class JsonStringToObjectConverter<T> : JsonConverter where T : class
{
public override bool CanConvert(Type objectType)
{
return typeof(T).IsAssignableFrom(objectType);
}
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
// JSON文字列として受け取る
var jsonString = (string)JToken.Load(reader);
if (string.IsNullOrEmpty(jsonString))
return null;
// 文字列をT型にデシリアライズ
return JsonConvert.DeserializeObject<T>(jsonString);
}
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
// オブジェクトをJSON文字列にシリアライズ
var jsonString = JsonConvert.SerializeObject(value);
writer.WriteValue(jsonString);
}
}
作成したJsonStringToObjectConverter
は以下の様に、Before / Afterのコレクションの型を指定して使います
var settings = new JsonSerializerSettings();
var converter = new JsonStringToObjectConverter<Page>();
settings.Converters.Add(converter);
var debeziumMessage = JsonConvert.DeserializeObject(message, typeof(DebeziumMessage<Page>),
settings);
4. コレクション毎の変換
コレクション名とそれに対応する型を以下の様に定義します
private Dictionary<string, Type> _nameMapper = new Dictionary<string, Type>
{
{ "debezium.growi.growi.pages.Envelope", typeof(Page) },
{ "debezium.growi.growi.users.Envelope", typeof(User) },
};
message にあるコレクション名から、対応する型へ変換します
これで、DebeziumMessage<T>
型で変更データが扱えるようになります
// コレクション名から対応する型を取得
_nameMapper.TryGetValue(
JsonConvert.DeserializeObject<DebeziumMessage<string>>(message)?.Schema?.Name ?? "",
out var targetType);
if (targetType != null)
{
// コレクションの型への変換ルールを追加
var settings = new JsonSerializerSettings();
var converterType = typeof(JsonStringToObjectConverter<>).MakeGenericType(targetType);
var converter = (JsonConverter)Activator.CreateInstance(converterType);
settings.Converters.Add(converter);
// DebeziumMessage<T>型へのDeserialize
var genericType = typeof(DebeziumMessage<>).MakeGenericType(targetType);
var deserializedMessage = JsonConvert.DeserializeObject(message, genericType, settings);
// ここで deserializedMessage を使って必要な処理を行う
Console.WriteLine($"Processed message for type: {targetType.Name}");
}
まとめ
- RabbitMQのメッセージをC#で扱うコードを作ってみました
- JsonをDeserializeする部分を共通化しようとしたため、一手間掛かったコードになっています
Discussion