TiDB Dedicated Cluster の Changefeed 機能を試す
普段このブログシリーズではTiDB Serverlessの無償で使える機能を中心にお届けしていますが、今回PingCapの関口さんにご相談させていただき、前から気になったていたDedicated ClusterのChangefeed機能を期間限定で触らせていただけるようになりましたので早速試してみました。
CDC と Changefeed
CDC (Change Data Capture)は一般的に用いられる言葉です。データベースのテーブルなどに対して何かの変更(Insert,Delete,Update等)が発生したタイミングでイベントが発生しデータ同期を行うのに必要なデータが指定した先に送出されます。この機能をTiDBではChangefeedと呼んでいます。
多少の遅延は発生する物の準リアルタイムでターゲットにデータを伝搬できるのが特徴です。
この図でいうTiCDC
という基盤がその機能を実装しています。とはいえTiDB CloudのDedicatedインスタンスでは自動で作成してくれますので特に意識する必要がありません。
一方、自前でインストールするバージョンではこの辺りの構築作業が発生します。構築手順はこちら。
Changefeed の対象
TiDBでは以下をChangefeedのデータ送出先としてサポートしています。
・MySQL
・Apache Kafka
・TiDB Cloud Serverless クラスター
・Amazon S3 or GCS
今日はAmazon S3を試します。
さっそくやってみる
では早速やっていきます。TiDB Cloud Dedicated Cluster の構築と外部からの接続はこの記事では割愛しますが以下にその手順がまとまっています。
まず以下のSQLでテスト用のテーブルを作成しておきます。
CREATE TABLE users (
id INT PRIMARY KEY,
name VARCHAR(100)
);
次にTiDB Cloudのマネージメントコンソール、左ペインからChangefeed
をクリックします。
画面右上Create Changefeed
ボタンを押します。
Amazon S3
をせんたくします。Serverlessクラスターがグレーアウトされているのは同じリージョンにサーバレスクラスターが無いためです。
S3バケット名とIAMクレデンシャルを入力してNext
をクリックします。
CDCをChangefeed対象にするテーブルを選択します。
イベントを発行するトリガー(DDLやDML)を選ぶこともできますが、一旦すべてで進めます。
出力形式を選択します。
ファイルフォーマットはCSVかCanal-JSONを選ぶことが出来ます。バイナリー形式のデータはBase64
かHEX
を選ぶことが出来ます。
Cacal-JSON とは
Canalというツールがキャプチャしたデータベースの変更内容(INSERT、UPDATE、DELETE)をJSON形式で表現したフォーマットです。Canalは、MySQLやMariaDBなどのデータベースからバイナリログ(binlog)を利用して、データの変更をリアルタイムで取得し、その情報を他のシステムにストリーミングするツールです。このデータフォーマットをJSON形式にしたものがCanal-JSONです。ざっくりCDCにおける業界デファクトファイルフォーマットととらえても問題なさそうです。以下に例を記載しておきます。
{
"database": "test_db",
"table": "user",
"type": "UPDATE",
"before": {
"id": 1,
"name": "John Doe",
"age": 30
},
"after": {
"id": 1,
"name": "John Doe",
"age": 31
},
"ts": 1612908474000
}
では今回は使い慣れたCSVを指定し、CSVのデータフォーマットを指定します。すべてデフォルトで進めます。
データの同期タイミングをファイルサイズ、時間間隔で指定します。この二つの値は、どちらか一方が満たされた時点で同期が実行されます。
RCUを指定します。Dedicated Cluster とは別にChangefeed専用に費用が発生します。多くのRCUを指定すればデータ同期はスムーズになりますが、費用が上がってしまうことに注意してください。
Next
→Submit
をクリックすれば設定完了です。
この状態で以下のInsert SQLを実行します。
INSERT INTO users (id, name)
VALUES (1, 'Taro');
S3バケットでは、まずスキーマ用ディレクトリが作成され、その下にテーブルディレクトリが作成されます。
その下にCDCデータを含むcsvファイルが生成されています。
中身はこんなデータです。
"I","employees","test",2,"Taro"
"I"
はInsertを意味します。同様に"U"
はUpdate、"D"
はDeleteを意味します。
一度のSQLで複数レコードのInsertを行った場合は以下のように1行ずつ表示されます。
"I","employees","test",5,"Jiro"
"I","employees","test",10,"Saburo"
Create table
SQLの場合はCSVではなく以下のようなJSONが出力されます。
CREATE TABLE cdc (
id INT PRIMARY KEY,
name VARCHAR(100)
);
{
"Table": "test",
"Schema": "test",
"Version": 1,
"TableVersion": 453853740661473297,
"Query": "CREATE TABLE `test` (`id` INT PRIMARY KEY,`name` VARCHAR(100))",
"Type": 3,
"TableColumns": [
{
"ColumnName": "id",
"ColumnType": "INT",
"ColumnPrecision": "11",
"ColumnNullable": "false",
"ColumnIsPk": "true"
},
{
"ColumnName": "name",
"ColumnType": "VARCHAR",
"ColumnPrecision": "100"
}
],
"TableColumnsTotal": 2
}
Discussion