Debeziumでメッセージフィルタリングを行う
はじめに
Debeziumではfilter single message transform(SMT)
を使用することでApache Kafkaに送信するメッセージをフィルタリングすることができます。この記事では公式ドキュメントを参考に、スクリプトエンジンであるJSR223
をプラグインとしてfilter SMT
を動かしてみます。
準備
Apache KafkaのDockerイメージはConfluent社が提供しているものを使用し、Debezium周りは「DebeziumでCDCを構築してみた」の記事を参考にさせていただきました。
2023年12月上旬時点のStableバージョンで動作検証します。
Apache Kafka: 3.5
Debezium: 2.4
設定を追加する前のdocker-compose.yaml
は以下の通りです。
設定
Debezium connector for MySQLでfilter SMT
を動かすための手順です。
-
セキュリティの観点からDebezium connectorには
filter SMT
が含まれていないので、公式ドキュメントの1. Download the scripting SMT archive
からdebezium-scripting-2.4.1.Final.tar.gz
を取得します。 -
Debeziumで式言語を使用するためにGroovyのStableバージョンのbinaryをダウンロードします。今回は
4.0.16
を使用しました。 -
取得した圧縮ファイルを解凍し、以下のjarファイルをDebeziumのプラグインを管理しているディレクトリに配置します。Dockerでは
/kafka/connect/debezium-connector-mysql/
に配置しました。
debezium-scripting-2.4.1.Final.jar
groovy-4.0.16.jar
groovy-json-4.0.16.jar
groovy-jsr223-4.0.16.jar
- Debeziumの設定に以下を追加します。今回はイベント値を元にメッセージをフィルタリングします。この設定では行が削除されたときに発生するtombstoneイベントもオフにしているため、データの作成と更新のイベントのみをKafkaに送信します。
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'c' || value.op == 'u'",
"tombstones.on.delete": "false"
変更の内容は以下のPRを参照してください。
動作検証
以下のコマンドを実行し、データの作成・更新・削除を行ったときにKafkaに送信されるメッセージを確認します。
docker exec mysql mysql -u root -ppassword playground -e "INSERT INTO items (name) VALUE ('Tシャツ');"
docker exec mysql mysql -u root -ppassword playground -e "UPDATE items SET name = 'ロングスリーブTシャツ' WHERE id = 1;"
docker exec mysql mysql -u root -ppassword playground -e "DELETE FROM items WHERE id = 1;"
メッセージフィルタリングを導入する前は作成・更新・削除のイベントがすべてKafkaに送信されています。最後のnull
はtombstoneイベントによるものです。
docker exec kafka-broker kafka-console-consumer --bootstrap-server kafka-broker:9092 \
--topic debezium_playground_topic.playground.items --from-beginning
{"before":null,"after":{"id":1,"name":"Tシャツ","registered_at":1702278557988,"updated_at":1702278557988},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278557000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":398,"row":0,"thread":12,"query":null},"op":"c","ts_ms":1702278558014,"transaction":null}
{"before":{"id":1,"name":"Tシャツ","registered_at":1702278557988,"updated_at":1702278557988},"after":{"id":1,"name":"ロングスリーブTシャツ","registered_at":1702278557988,"updated_at":1702278563519},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278563000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":748,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1702278563530,"transaction":null}
{"before":{"id":1,"name":"ロングスリーブTシャツ","registered_at":1702278557988,"updated_at":1702278563519},"after":null,"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278569000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":1133,"row":0,"thread":14,"query":null},"op":"d","ts_ms":1702278569055,"transaction":null}
null
メッセージフィルタリングを導入した後は作成・更新のイベントのみがKafkaに送信されています。
docker exec kafka-broker kafka-console-consumer --bootstrap-server kafka-broker:9092 \
--topic debezium_playground_topic.playground.items --from-beginning
{"before":null,"after":{"id":1,"name":"Tシャツ","registered_at":1702278727374,"updated_at":1702278727374},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278727000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":398,"row":0,"thread":12,"query":null},"op":"c","ts_ms":1702278727394,"transaction":null}
{"before":{"id":1,"name":"Tシャツ","registered_at":1702278727374,"updated_at":1702278727374},"after":{"id":1,"name":"ロングスリーブTシャツ","registered_at":1702278727374,"updated_at":1702278736121},"source":{"version":"2.4.1.Final","connector":"mysql","name":"debezium_playground_topic","ts_ms":1702278736000,"snapshot":"false","db":"playground","sequence":null,"table":"items","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":748,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1702278736132,"transaction":null}
おわりに
filter SMT
を使用することでメッセージのフィルタリングを行うことができました。データをローテートするときにメッセージを送らないようにするなど、活用方法はあると思うのでぜひ利用してみてください。
Discussion