🐏

Debeziumでメッセージフィルタリングを行う

2023/12/12に公開

はじめに

Debeziumではfilter single message transform(SMT)を使用することでApache Kafkaに送信するメッセージをフィルタリングすることができます。この記事では公式ドキュメントを参考に、スクリプトエンジンであるJSR223をプラグインとしてfilter SMTを動かしてみます。

準備

Apache KafkaのDockerイメージはConfluent社が提供しているものを使用し、Debezium周りは「DebeziumでCDCを構築してみた」の記事を参考にさせていただきました。

2023年12月上旬時点のStableバージョンで動作検証します。

Apache Kafka: 3.5
Debezium: 2.4

設定を追加する前のdocker-compose.yamlは以下の通りです。
https://github.com/kohei-kohei/debezium-playground/blob/2268ec6fe3dc507bb066607895e0da40479a049a/docker-compose.yaml

設定

Debezium connector for MySQLでfilter SMTを動かすための手順です。

  1. セキュリティの観点からDebezium connectorにはfilter SMTが含まれていないので、公式ドキュメント1. Download the scripting SMT archiveからdebezium-scripting-2.4.1.Final.tar.gzを取得します。

  2. Debeziumで式言語を使用するためにGroovyのStableバージョンのbinaryをダウンロードします。今回は4.0.16を使用しました。

  3. 取得した圧縮ファイルを解凍し、以下のjarファイルをDebeziumのプラグインを管理しているディレクトリに配置します。Dockerでは/kafka/connect/debezium-connector-mysql/に配置しました。

debezium-scripting-2.4.1.Final.jar
groovy-4.0.16.jar
groovy-json-4.0.16.jar
groovy-jsr223-4.0.16.jar
  1. Debeziumの設定に以下を追加します。今回はイベント値を元にメッセージをフィルタリングします。この設定では行が削除されたときに発生するtombstoneイベントもオフにしているため、データの作成と更新のイベントのみをKafkaに送信します。
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'c' || value.op == 'u'",
"tombstones.on.delete": "false"

変更の内容は以下のPRを参照してください。
https://github.com/kohei-kohei/debezium-playground/pull/3

動作検証

以下のコマンドを実行し、データの作成・更新・削除を行ったときにKafkaに送信されるメッセージを確認します。

docker exec mysql mysql -u root -ppassword playground -e "INSERT INTO items (name) VALUE ('Tシャツ');"
docker exec mysql mysql -u root -ppassword playground -e "UPDATE items SET name = 'ロングスリーブTシャツ' WHERE id = 1;"
docker exec mysql mysql -u root -ppassword playground -e "DELETE FROM items WHERE id = 1;"

メッセージフィルタリングを導入する前は作成・更新・削除のイベントがすべてKafkaに送信されています。最後のnullはtombstoneイベントによるものです。

docker exec kafka-broker kafka-console-consumer --bootstrap-server kafka-broker:9092 \
       --topic debezium_playground_topic.playground.items --from-beginning

{"before":null,"after":{"id":1,"name":"Tシャツ","registered_at":1702278557988,"updated_at":1702278557988},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278557000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":398,"row":0,"thread":12,"query":null},"op":"c","ts_ms":1702278558014,"transaction":null}
{"before":{"id":1,"name":"Tシャツ","registered_at":1702278557988,"updated_at":1702278557988},"after":{"id":1,"name":"ロングスリーブTシャツ","registered_at":1702278557988,"updated_at":1702278563519},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278563000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":748,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1702278563530,"transaction":null}
{"before":{"id":1,"name":"ロングスリーブTシャツ","registered_at":1702278557988,"updated_at":1702278563519},"after":null,"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278569000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":1133,"row":0,"thread":14,"query":null},"op":"d","ts_ms":1702278569055,"transaction":null}
null

メッセージフィルタリングを導入した後は作成・更新のイベントのみがKafkaに送信されています。

docker exec kafka-broker kafka-console-consumer --bootstrap-server kafka-broker:9092 \
       --topic debezium_playground_topic.playground.items --from-beginning

{"before":null,"after":{"id":1,"name":"Tシャツ","registered_at":1702278727374,"updated_at":1702278727374},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278727000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":398,"row":0,"thread":12,"query":null},"op":"c","ts_ms":1702278727394,"transaction":null}
{"before":{"id":1,"name":"Tシャツ","registered_at":1702278727374,"updated_at":1702278727374},"after":{"id":1,"name":"ロングスリーブTシャツ","registered_at":1702278727374,"updated_at":1702278736121},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278736000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":748,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1702278736132,"transaction":null}

おわりに

filter SMTを使用することでメッセージのフィルタリングを行うことができました。データをローテートするときにメッセージを送らないようにするなど、活用方法はあると思うのでぜひ利用してみてください。

GitHubで編集を提案

Discussion