DebeziumでCDCを構築してみた
こんにちは、ikkitangです。
この記事は スターフェスティバル Advent Calendar 2022 の9日目の記事です。
昨日は delucci さんの事業側と開発側、そんな垣根なく一緒にプロダクトをつくるポイントでした。
私も内山さんと同じチームなのですが、弊チームはお客様視点で見た時に窓口となる方々(営業やオペレーションの方)と一緒にプロダクト開発をしています。同じ方向を向いて仕事をしているはずが、どうしても完了イメージに差異が出たり...みたいな事が発生する事もあってもどかしかった所で内山さんやもう一人のPMロールのエンジニアの方と一緒にめちゃくちゃ会話する時間を設けたりなど、記事にあった改善をまさに目の前で見せてもらってる所だったりします(めちゃくちゃ頼りにしてますw)
本当記事中にあった、「絵を描いてみせる」重要だなぁぁぁって思います。
では、本題
今回は、業務の中でDebeziumでCDCを構築した機会があったので、「Debeziumとは何か?」「CDCとは何か?」みたいな所に主眼を置いて紹介をしたいと思います。
とはいえ、いきなり Debezium
とはというのを話をしても伝わらない程度には前提知識が必要だったりしており、まずは前提知識となる Apache Kafkaの話をしていきながら Debeziumについてお話出来ればと思います。
簡単なinputの為に概念的なのをまとめると以下の画像で表せます。
前提知識
Apache Kafkaについて
まずは、Apache Kafka です。これが無いと始まらない素晴らしいソフトウェアです。
これが何か?という所では、公式サイトでは、こう説明されています。
Apache Kafkaはオープンソースの分散型イベントストリーミングプラットフォームです
もう少し平たく言うと、大量のメッセージ(データ)を高速に(かつ少なくとも一回は成功する保障を持って)処理する為に作られた分散メッセージシステムという事も出来ます。送られてくるメッセージ(ひとかたまりのデータ)を受け取り、受け取ったメッセージを別のシステムに送る為に使用されます。
当初の生まれた背景とかから、以下の4要件を重要視して作られています。
- 高いスループットでリアルタイムに処理したい
- 任意のタイミングで読み出しをしたい
- 各種システムとの接続を容易にしたい
- メッセージをロストしたくない
上記を満たす為に、メッセージングモデルが採用されています。
ざっと書くと上記のような感じです。
キーワードとして、メッセージをKafkaに送信するシステムの総称をProducerといいます。自前のAPIからKafkaに送信する場合はそのAPIはProducerになりますし、Kafka Connect(フレームワーク)に対応したConnector(プラグイン)を用いる事で、接続情報を設定しさえすればSourceとする情報を簡単にKafkaのイベントに変換することが出来ます。例えば.csvや.logのデータを読み取ってKafkaにメッセージを送信する仕組みを作る事も出来ます。
逆にメッセージを受信して処理するシステムの総称をConsumerといいます。例えば、メッセージを受け取ってメッセージの分析をして分析結果を格納するといったConsumerを作るとリアルタイムに分析することを実現する事が出来ますね。
そして、メッセージを収集する部分を Brokerといいます。
これがある事によって、ProducerやConsumerはお互いの事を知る必要が無いため、変更に強いシンプルな設計をすることが出来ます。
Producer, Consumer, Broker がKafkaを活用したアーキテクチャを採用する時に必要なロールなのですが、もう少し詳細度があがる話をするとTopic, Messageの概念も理解が必要です。
Messageというのは、Kafka内で扱うデータの単位で、Key, Valueを持つ構造の物です。想像に難くないかと思いますが、皆様大好きなJSONもMessageとして利用出来ます。Topicというのは、Messageを種別ごとで管理する為のストレージです。ProducerはBrokerにMessageを送信する際Topicを指定しますし、ConsumerもMessageを購読する際は Topicを指定する必要があります。
Debezium について
それでは、今日の主題の Debeziumです。
公式サイトではこのように言及されています。
Debeziumは、CDCのためのオープンソース分散プラットフォームです。
Debeziumを起動すると、他のアプリケーションがデータベースにコミットする挿入、更新、削除の全てにアプリケーションが応答を始める事ができます。
2つ目の文章がDebeziumの特徴を全て表していると言ってよいですね。
CDCの説明にもなりますが、データベースの行レベルのINSERT/UPDATE/DELETEで発生したイベントを全てKafkaのイベントとして記録する事が可能になります。私も経験がありますが、「特定のデータが変更された時に何か処理を走らせる」といった非機能要件を達成する時に、バッチで数時間おきに実行する処理とするのか 短いタイミングでポーリングするのかどうかという選択を迫られる事があるかと思いますが、Debeziumを採用すると「特定のテーブルにデータが入った時をトリガーとして何か処理を走らせる」といった事が可能になります。
DebeziumはKafka Connect対応のコネクタがデフォルトで提供されており、MySQL, PostgreSQL, SQL Server など幅広いサポートがあります。
ローカルでDebeziumを動かすには
以下が達成されるとDebeziumが動く環境を手に入れる事が出来ます。
- Apache Kafkaが動いている状態
- データベースが稼働している状態
- DebeziumがデータベースをCDCしている状態
それぞれDocker Imageが公開されており、それを組み合わせる事で環境を簡単に構築する事が出来ます。
Kafkaの構築
こちらのイメージを使用します。
Kafkaと共にZookeeperというのもあります。KafkaのBrokerにおいて分散処理の為の管理を行ってくれるツールであります。例えば、先程Topicの概念を説明しましたが、それらの設定情報などを管理してくれています。
KafkaのDockerHub にある Apache Kafka development setup example
の項を見ながら、以下のような設定を行います。
version: "3.8"
services:
zookeeper:
image: "bitnami/zookeeper:latest"
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: "bitnami/kafka:latest"
ports:
- "9092:9092"
- "29092:29092"
environment:
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://local-kafka:9092,PLAINTEXT_HOST://127.0.0.1:29092
container_name: local-kafka
depends_on:
- zookeeper
設定項目 | 内容 |
---|---|
KAFKA_CFG_BROKER_ID | ブローカーID |
KAFKA_CFG_ZOOKEEPER_CONNECT | ZookeeperのHostへの接続情報 |
KAFKA_CFG_LISTENERS | Kafkaリスナーの設定をします |
KAFKA_CFG_ADVERTISED_LISTENERS | リスナーごとにIP,ポートを指定する |
主に、KAFKA_CFG_LISTENERS/KAFKA_CFG_ADVERTISED_LISTENERS を理解しておかないとハマりますw
Kafka Primer for Docker: ですごくわかりやすい解説があったので参考にしていただくのが良いと思います。
Dockerネットワーク外部なのか内部なのか、という点で接続時のポートが変わります。
Dockerネットワーク内から接続する場合は PLAINTEXT
リスナーが使用されるので、 local-kafka:9092
を用いて、接続を行います。 逆に Dockerネットワーク外から接続する場合は PLAINTEXT_HOST
が使用されるので、127.0.0.1:29092
を用いて接続しないといけません。
これを知っておかないと、なんでや!つながらんやんけ!
と時間を溶かす羽目になります。w
MySQLの構築
MySQLを起動する場合は、customのパラメータを設定する必要があります。内部的にはレプリケーション的な動きをしますので、binlogを吐き出すように設定する必要があります。ので、以下を設定しましょう。
- my.cnf
[mysqld]
# CDCの為の変更
server_id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 1
default_authentication_plugin = mysql_native_password
動作確認用のテーブルを作る為に以下を記述して置いておきます。
- mysql/init.d/0_init.sql
USE test;
DROP TABLE IF EXISTS user;
CREATE TABLE user(
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(25) NOT NULL
) DEFAULT CHARACTER SET=utf8mb4;
docker-compose.yml への記述はこんな感じですかね。
version: "3.8"
services:
zookeeper: 略
kafka: 略
mysql:
image: mysql:8.0
command: mysqld
environment:
MYSQL_DATABASE: test
MYSQL_ROOT_PASSWORD: password
volumes:
- ./my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/init.d:/docker-entrypoint-initdb.d
ports:
- 3306:3306
Debeziumの構築
Debeziumについても構築をする為のDockerが公式より提供されています。これを先程定義したKafkaと接続させれば良いですね。
version: "3.8"
services:
zookeeper: 略
kafka: 略
mysql: 略
debezium:
image: "debezium/connect:2.0"
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=local-kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=_kafka_connect_configs
- OFFSET_STORAGE_TOPIC=_kafka_connect_offsets
- STATUS_STORAGE_TOPIC=_kafka_connect_statuses
depends_on:
- zookeeper
- kafka
- mysql
環境変数についてはこんな感じですね。
設定項目 | 内容 |
---|---|
BOOTSTRAP_SERVERS | Kafkaの接続先 |
CONFIG_STORAGE_TOPIC | Kafka ConnectがConnector設定を保存するトピックの名前を設定する |
OFFSET_STORAGE_TOPIC | 各ConnectorのOffset値(どこまでメッセージを処理したか)を保持するトピックの名前 |
STATUS_STORAGE_TOPIC | 各Connectorのステータスを保持するトピックの名前 |
BOOTSTRAP_SERVERS
に先程出てきた Docker内部接続におけるポートを使うってのがポイントです。
全体構成
version: "3.8"
services:
zookeeper:
image: "bitnami/zookeeper:latest"
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: "bitnami/kafka:latest"
ports:
- "9092:9092"
- "29092:29092"
environment:
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://local-kafka:9092,PLAINTEXT_HOST://127.0.0.1:29092
container_name: local-kafka
depends_on:
- zookeeper
mysql:
image: mysql:8.0
command: mysqld
environment:
MYSQL_DATABASE: test
MYSQL_ROOT_PASSWORD: password
volumes:
- ./my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/init.d:/docker-entrypoint-initdb.d
ports:
- "3306:3306"
debezium:
image: "debezium/connect:2.0"
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=local-kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=_kafka_connect_configs
- OFFSET_STORAGE_TOPIC=_kafka_connect_offsets
- STATUS_STORAGE_TOPIC=_kafka_connect_statuses
depends_on:
- zookeeper
- kafka
- mysql
以下、コマンドで起動すると、起動が確認出来るかと思います。
$ docker-compose up -d
[+] Running 5/5
⠿ Network debezium-sample_debezium-test Created 0.0s
⠿ Container debezium-sample-mysql-1 Started 2.5s
⠿ Container debezium-sample-zookeeper-1 Started 2.5s
⠿ Container local-kafka Started 0.9s
⠿ Container debezium-sample-debezium-1 Started 1.3s
$ docker-compose ps
NAME COMMAND SERVICE STATUS PORTS
debezium-sample-debezium-1 "/docker-entrypoint.…" debezium running 0.0.0.0:8083->8083/tcp, 9092/tcp
debezium-sample-mysql-1 "docker-entrypoint.s…" mysql running 0.0.0.0:3306->3306/tcp, 33060/tcp
debezium-sample-zookeeper-1 "/opt/bitnami/script…" zookeeper running 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp
local-kafka "/opt/bitnami/script…" kafka running 0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp
この時点で、kafkaが起動しており、MySQLも起動していて、DebeziumもRunning状態になります。 残る所としては、CDCを構築する
部分になります。
CDCの構築
CDCを構築する場合はDebeziumのコンテナにConnecor定義を書く必要があります。
Connector定義は、Debeziumコンテナが提供するRESTのエンドポイントにより設定が可能です。今、Dockerの設定により8083ポートが空いているかと思いますが、そこに提供されています。
APIの定義はここを見るとわかりやすいです。
Connectorの設定はPOST /connectorsエンドポイントを使用します。ただ、このエンドポイントConnector定義を設定するのに必要なパラメータが異常に多く、例で最小限でもこんな感じのリクエストが必要なので、多少工夫して解決するのがおすすめです。
まずは、Connector定義をJSONファイルとして書き出します。( connector.json
としてます. )このファイル自体に、JSONファイルを書いていきます。
{
"name": "debezium-sample-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"schema.history.internal.kafka.bootstrap.servers": "local-kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.test",
"internal.key.converter.schemas.enable": "false",
"include.schema.changes": "false",
"decimal.handling.mode": "double",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"topic.prefix": "debezium_cdc_topic",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.id": "1",
"database.include.list": "test",
"table.include.list": "test.user",
"database.history.kafka.bootstrap.servers": "local-kafka:9092",
"database.history.kafka.topic": "schema-changes.test",
"event.processing.failure.handling.mode": "warn",
"database.connectionTimeZone": "UTC",
"inconsistent.schema.handling.mode": "warn",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter.schemas.enable": "false",
"snapshot.mode": "initial"
}
}
エグい記述量ですよね。 Connectorの設定については、https://debezium.io/documentation/reference/stable/connectors/mysql.html を参考にしました。
取り立ててピックアップすべき項目を上げるとこんな感じです。
設定項目 | 内容 |
---|---|
topic.prefix | TopicのPrefix名です。 DebeziumでCDCした情報については、{topic.prefix}.{MySQLのデータベース名}.{テーブル名} で決まるので一意の名前をつける必要があります。 |
database.hostname/port/user/password | MySQLへの接続情報です |
database.include.list | 監視対象にするMySQLのデータベース名です |
table.include.list | 監視対象にするMySQLのテーブル名です。 {データベース名}.{テーブル名} の形式で指定する必要があります |
これをリクエストする事で設定が可能です。
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors
-d @connector.json
HTTP/1.1 201 Created
Date: Wed, 07 Dec 2022 17:31:10 GMT
Location: http://localhost:8083/connectors/debezium-sample-connector
Content-Type: application/json
Content-Length: 1329
Server: Jetty(9.4.48.v20220622)
{"name":"debezium-sample-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","schema.history.internal.kafka.bootstrap.servers":"local-kafka:9092","schema.history.internal.kafka.topic":"schema-changes.test","internal.key.converter.schemas.enable":"false","include.schema.changes":"false","decimal.handling.mode":"double","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","topic.prefix":"debezium_cdc_topic","database.hostname":"mysql","database.port":"3306","database.user":"root","database.password":"password","database.server.id":"1","database.include.list":"test","table.include.list":"test.user","database.history.kafka.bootstrap.servers":"local-kafka:9092","database.history.kafka.topic":"schema-changes.test","event.processing.failure.handling.mode":"warn","database.connectionTimeZone":"UTC","inconsistent.schema.handling.mode":"warn","internal.key.converter":"org.apache.kafka.connect.json.JsonConverter","internal.value.converter":"org.apache.kafka.connect.json.JsonConverter","internal.value.converter.schemas.enable":"false","snapshot.mode":"initial","name":"debezium-sample-connector"},"tasks":[],"type":"source"}
$ curl -X GET http://localhost:8083/connectors/debezium-sample-connector
{"name":"debezium-sample-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.history.kafka.topic":"schema-changes.test","internal.key.converter.schemas.enable":"false","include.schema.changes":"false","topic.prefix":"debezium_cdc_topic","decimal.handling.mode":"double","schema.history.internal.kafka.topic":"schema-changes.test","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","database.user":"root","database.server.id":"1","database.history.kafka.bootstrap.servers":"local-kafka:9092","schema.history.internal.kafka.bootstrap.servers":"local-kafka:9092","event.processing.failure.handling.mode":"warn","database.port":"3306","inconsistent.schema.handling.mode":"warn","key.converter.schemas.enable":"false","internal.key.converter":"org.apache.kafka.connect.json.JsonConverter","database.hostname":"mysql","database.connectionTimeZone":"UTC","database.password":"password","internal.value.converter.schemas.enable":"false","value.converter.schemas.enable":"false","internal.value.converter":"org.apache.kafka.connect.json.JsonConverter","name":"debezium-sample-connector","table.include.list":"test.user","database.include.list":"test","snapshot.mode":"initial"},"tasks":[{"connector":"debezium-sample-connector","task":0}],"type":"source"}
動作チェック
以下のコマンドを打つことでTopicリストが得られます。 今はまだ、データを一件もInsertしていないので、Topicが作成されていません。
$ docker-compose exec kafka kafka-topics.sh --list --bootstrap-server local-kafka:9092
__consumer_offsets
_kafka_connect_configs
_kafka_connect_offsets
_kafka_connect_statuses
schema-changes.test
という事で、Insertをしてリストを出してみます。
$ docker-compose exec mysql mysql -u root -ppassword test -e "INSERT INTO user(name) VALUES ('SampleName1');"
$ docker-compose exec kafka kafka-topics.sh --list --bootstrap-server local-kafka:9092
__consumer_offsets
_kafka_connect_configs
_kafka_connect_offsets
_kafka_connect_statuses
debezium_cdc_topic.test.user
schema-changes.test
では、購読してみましょう。
$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server local-kafka:9092 --from-beginning --topic debezium_cdc_topic.test.user
{"before":null,"after":{"id":1,"name":"SampleName1"},"source":{"version":"2.0.0.Final","connector":"mysql","name":"debezium_cdc_topic","ts_ms":1670434632000,"snapshot":"false","db":"test","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":369,"row":0,"thread":13,"query":null},"op":"c","ts_ms":1670434632204,"transaction":null}
無事、"after" の部分にパラメータがセットされる事が達成出来ました。
Debezium構築時のおすすめな楽しみ方は、ターミナルを2個開いて片方でDBの更新をして、片方でDebeziumを購読する事なので、ぜひやってみてほしいと思います。
$ docker-compose exec mysql mysql -u root -ppassword test -e "INSERT INTO user(name) VALUES ('SampleName2');"
$ docker-compose exec mysql mysql -u root -ppassword test -e "UPDATE user SET name = 'SampleName2-Updated' WHERE id = 2;"
$ docker-compose exec mysql mysql -u root -ppassword test -e "DELETE FROM user WHERE id = 2;"
Messageの型については、Insert/Update/Deleteに応じて、以下のようなMessageを取得出来ます.
- INSERTの時
{
before: null;
after: { [key:string]: string|number|Date };
}
- Updateの時
{
before: { [key:string]: string|number|Date };
after: { [key:string]: string|number|Date };
}
- Deleteの時
{
before: { [key:string]: string|number|Date };
after: null;
}
後は、Consumerをつないでアプリケーションを作成する事でどうとでも処理が出来ます。各言語いろんな物が提供されていますが、特にTypeScriptでいうと、 KafkaJSを使ってもらうのがおすすめの構築かと思います。ドキュメントも充実してますし。
現状
今回、Debeziumを構築するチュートリアルについて記述させていただきました。本当は本番環境での運用経験を書きたかったんですが、まだノウハウが溜まっておらずなので、近い内書いていこうと思います。Debeziumの構築時、日本語記事が結構無かったりしたので非常に困りました。Debeziumを構築される誰かの一助になれば幸いです。
採用頑張っております
という事で、今採用活動頑張っております。
雰囲気もこの辺で感じてもらえそうです!
カジュアル面談からやっておりますので、必要な方は @ikkitangのTwitter までDMくださいませ〜〜〜!!
Discussion