🐕

TiDB Dedicated Cluster の Changefeed 機能を試す

2024/11/11に公開

普段このブログシリーズではTiDB Serverlessの無償で使える機能を中心にお届けしていますが、今回PingCapの関口さんにご相談させていただき、前から気になったていたDedicated ClusterのChangefeed機能を期間限定で触らせていただけるようになりましたので早速試してみました。

CDC と Changefeed

CDC (Change Data Capture)は一般的に用いられる言葉です。データベースのテーブルなどに対して何かの変更(Insert,Delete,Update等)が発生したタイミングでイベントが発生しデータ同期を行うのに必要なデータが指定した先に送出されます。この機能をTiDBではChangefeedと呼んでいます。
多少の遅延は発生する物の準リアルタイムでターゲットにデータを伝搬できるのが特徴です。

https://docs.pingcap.com/tidb/v5.4/ticdc-overview
にその機能概要がまとまっていますが、TiDBではChangefeed送出基盤を追加で作ることでその機能が実装されるようです。

この図でいうTiCDCという基盤がその機能を実装しています。とはいえTiDB CloudのDedicatedインスタンスでは自動で作成してくれますので特に意識する必要がありません。
一方、自前でインストールするバージョンではこの辺りの構築作業が発生します。構築手順はこちら。
https://docs.pingcap.com/tidb/v5.4/deploy-ticdc

Changefeed の対象

TiDBでは以下をChangefeedのデータ送出先としてサポートしています。
・MySQL
・Apache Kafka
・TiDB Cloud Serverless クラスター
・Amazon S3 or GCS
今日はAmazon S3を試します。

さっそくやってみる

では早速やっていきます。TiDB Cloud Dedicated Cluster の構築と外部からの接続はこの記事では割愛しますが以下にその手順がまとまっています。
https://docs.pingcap.com/ja/tidbcloud/connect-to-tidb-cluster

まず以下のSQLでテスト用のテーブルを作成しておきます。

CREATE TABLE users (
    id INT PRIMARY KEY,
    name VARCHAR(100)
);

次にTiDB Cloudのマネージメントコンソール、左ペインからChangefeedをクリックします。

画面右上Create Changefeedボタンを押します。
Amazon S3をせんたくします。Serverlessクラスターがグレーアウトされているのは同じリージョンにサーバレスクラスターが無いためです。

S3バケット名とIAMクレデンシャルを入力してNextをクリックします。

CDCをChangefeed対象にするテーブルを選択します。

イベントを発行するトリガー(DDLやDML)を選ぶこともできますが、一旦すべてで進めます。

出力形式を選択します。

ファイルフォーマットはCSVかCanal-JSONを選ぶことが出来ます。バイナリー形式のデータはBase64HEXを選ぶことが出来ます。

Cacal-JSON とは

Canalというツールがキャプチャしたデータベースの変更内容(INSERT、UPDATE、DELETE)をJSON形式で表現したフォーマットです。Canalは、MySQLやMariaDBなどのデータベースからバイナリログ(binlog)を利用して、データの変更をリアルタイムで取得し、その情報を他のシステムにストリーミングするツールです。このデータフォーマットをJSON形式にしたものがCanal-JSONです。ざっくりCDCにおける業界デファクトファイルフォーマットととらえても問題なさそうです。以下に例を記載しておきます。

{
  "database": "test_db",
  "table": "user",
  "type": "UPDATE",
  "before": {
    "id": 1,
    "name": "John Doe",
    "age": 30
  },
  "after": {
    "id": 1,
    "name": "John Doe",
    "age": 31
  },
  "ts": 1612908474000
}

では今回は使い慣れたCSVを指定し、CSVのデータフォーマットを指定します。すべてデフォルトで進めます。

データの同期タイミングをファイルサイズ、時間間隔で指定します。この二つの値は、どちらか一方が満たされた時点で同期が実行されます。

RCUを指定します。Dedicated Cluster とは別にChangefeed専用に費用が発生します。多くのRCUを指定すればデータ同期はスムーズになりますが、費用が上がってしまうことに注意してください。
NextSubmitをクリックすれば設定完了です。

この状態で以下のInsert SQLを実行します。

INSERT INTO users (id, name)
VALUES (1, 'Taro');

S3バケットでは、まずスキーマ用ディレクトリが作成され、その下にテーブルディレクトリが作成されます。

その下にCDCデータを含むcsvファイルが生成されています。

中身はこんなデータです。

"I","employees","test",2,"Taro"

"I"はInsertを意味します。同様に"U"はUpdate、"D"はDeleteを意味します。
一度のSQLで複数レコードのInsertを行った場合は以下のように1行ずつ表示されます。

"I","employees","test",5,"Jiro"
"I","employees","test",10,"Saburo"

Create tableSQLの場合はCSVではなく以下のようなJSONが出力されます。

CREATE TABLE cdc (
    id INT PRIMARY KEY,
    name VARCHAR(100)
);
{
    "Table": "test",
    "Schema": "test",
    "Version": 1,
    "TableVersion": 453853740661473297,
    "Query": "CREATE TABLE `test` (`id` INT PRIMARY KEY,`name` VARCHAR(100))",
    "Type": 3,
    "TableColumns": [
        {
            "ColumnName": "id",
            "ColumnType": "INT",
            "ColumnPrecision": "11",
            "ColumnNullable": "false",
            "ColumnIsPk": "true"
        },
        {
            "ColumnName": "name",
            "ColumnType": "VARCHAR",
            "ColumnPrecision": "100"
        }
    ],
    "TableColumnsTotal": 2
}

Discussion