Pub/Sub の BigQuery Change Data Capture 機能について
1. はじめに
こんにちは、クラウドエース データML ディビジョンの木村です。
クラウドエースの IT エンジニアリングを担うシステム開発部の中で、特にデータ基盤構築・分析基盤構築からデータ分析までを含む一貫したデータ課題の解決を専門とするのがデータ ML ディビジョンです。
データ ML ディビジョンでは活動の一環として、毎週 Google Cloud の新規リリースを調査・発表し、データ領域のプロダクトのキャッチアップをしています。その中でも重要と考えるリリースを本ページのように記事として公開しています。
今回紹介するリリースは、Pub/Sub の BigQuery サブスクリプションにおける BigQuery の変更データキャプチャ(CDC) についてです。
BigQuery の CDC では、ストリーミングされた変更を処理し、既存のデータに適用することで BigQuery テーブルを更新することができます。
本記事では Pub/Sub の BigQuery サブスクリプションにおける CDC の概要と、実際の使い方について紹介していきます。
2. リリースの概要
この節では、まず BigQuery の 変更データキャプチャについて触れ、次に Pub/Sub の BigQuery サブスクリプションについて説明します。
そして、最後に今回のリリースで追加された BigQuery サブスクリプションでの変更データキャプチャについて紹介します。
BigQuery の変更データキャプチャ(CDC)
BigQuery 変更データ キャプチャ(CDC)は、ストリーミングされた変更を処理して既存のデータに適用することで、BigQuery テーブルの行を更新します。
この更新は、BigQuery Storage Write API によってリアルタイムにストリーミングされる、行の更新および削除操作によって行われます。
こちらの詳細についてはリンクからご覧いただけたらと思います。
BigQuery サブスクリプションとは
Pub/Sub のBigQuery サブスクリプションは、Pub/Sub メッセージ受信時に既存の BigQuery テーブルに直接メッセージを書き込むことができる機能です。
メッセージを BigQuery テーブルへ保存する際に Dataflow などで別のサブスクライバーを構成する必要がなく、運用負荷やコストを抑えることができます。
ワークフローのイメージは以下の通りです。
- Pub/Sub が BigQuery Storage Write API を使用して、データを BigQuery テーブルに送信します。
- メッセージは、BigQuery テーブルにバッチで送信されます。
- 書き込みオペレーションが正常に完了すると、API から OK レスポンスが返されます。
- 書き込みオペレーションでエラーが発生した場合、Pub/Sub メッセージ自体に否定応答が行われます。その後、メッセージが再送信されます。
今回のリリース内容について
今回のリリースでは、BigQuery サブスクリプションの機能として先述の BigQuery の変更データキャプチャ(CDC) が追加されました。
BigQuery サブスクリプションの BigQuery 変更データキャプチャ(CDC)を用いると、Pub/Sub メッセージを送信することで、BigQuery テーブルの行更新が可能となります。
つまり、今まで BigQuery サブスクリプションでは行の追加しかできなかったのに対し、指定した行の変更や削除も可能となりました。
Pub/Sub で BigQuery CDC を使うための注意点は、BigQuery の宛名テーブルで主キーを定義する必要があるということです。
CDC を使用するには、行の変更をストリーミングする際に、"_CHANGE_TYPE" を設定します。
"_CHANGE_TYPE" には、UPSERT と DELETE を指定できます。
- UPSERT : メッセージに指定した主キーと同じ主キーを持つレコードがテーブル内に存在する場合はそのレコードをメッセージの内容に更新し、存在しない場合はメッセージの内容を新しい行として挿入
- DELETE : メッセージに指定した主キーと同じ主キーを持つレコードがテーブル内に存在する場合はそのレコードの行を削除
3. 実際に使ってみた
流れとしては以下の通りです。
-
1, 2 → BigQuery、PubSub のセッティング
- BigQuery テーブルを用意し、メッセージを送るための Pub/Sub の設定を行う
-
3 ~ 5 → CDC を用いたデータ更新
- "_CHANGE_TYPE" を使用し、BigQuery テーブルのデータを更新、削除する
1. BigQuery テーブルの作成
まず、あらかじめ書き込み先として BigQuery テーブルを作成します。
今回は、 Id (テーブル内で一意)、 fruits (果物の名前)、 price (果物の値段) で構成されるテーブルを作成します。
- データセット名 : CDC
- テーブル名 : CDC_test
- スキーマ
- フィールド : Id (INTEGER), fruits (STRING), price (INTEGER)
- 主キー : Id
次のクエリを実行します。
CREATE TABLE CDC.CDC_test (
Id INTEGER,
fruits string,
price INTEGER,
PRIMARY KEY (Id) NOT ENFORCED
)
これで、Id を主キー(PK)とした以下のようなテーブルが作成できました。
2. Pub/Sub のセッティング
ここからは、Pub/Sub のセッティングを行なっていきます。
メッセージを送信する際に用いるスキーマを設定します。
コンソールで、Pub/Sub メニューバーから [Schemas] を選択し、[CREATE SCHEMA] をクリックします。
先ほど作成した BigQuery テーブルと同様のスキーマと、最後に "_CHANGE_TYPE" を定義しています。
"_CHANGE_TYPE" は "string" で定義します。
{
"type": "record",
"name": "Avro",
"fields": [
{
"name": "Id",
"type": "int"
},
{
"name": "fruits",
"type": "string"
},
{
"name": "price",
"type": "int"
},
{
"name": "_CHANGE_TYPE",
"type": "string"
}
]
}
次に、Topic の作成を行います。
メニューバーの [Topic] を選択し、[CREATE TOPIC] をクリックします。
[Use a shema] にチェックを入れ、[Select a Pub/Sub schema] で先ほど作成したスキーマを選択し、[CREATE] をクリックします。
続いて、トピックに紐付ける サブスクリプションを作成します。
メニューバーの [Subscription] を選択し、[CREATE SUBSCRIPTION] をクリックします。
[Subscription ID] は任意の ID を入力し、[Select a Cloud Pub/Sub topic] に先ほど作成した Topic を選択します。
BigQuery Storage Write API を使用するので、[Delivery type] で、[Write to BigQuery] を選択します。
[Dataset] と [Table] には先ほど作成したものを入力します。
その下にある [Use topic Schema] にもチェックを入れ、[CREATE] をクリックします。
これで、Pub/Sub 側の設定も完了です。
3. UPSERT を使用して BigQuery テーブルにデータを挿入する
実際に BigQuery テーブルへデータを送信していきます。
メニューバーの [Topic] から先ほど作成した トピックをクリックします。
[MESSAGES] を選択し、[Message body] に以下のように入力します。
{
"Id" : 1,
"fruits" : "orange",
"price" : 100,
"_CHANGE_TYPE" : "UPSERT"
}
"_CHANGE_TYPE" を "UPSERT" に設定している場合、同一の主キーを持たない時は新しい行を挿入します。
現在 BigQuery のテーブルは空の状態なので、送信したメッセージがテーブルに挿入されるはずです。
[PUBLISH] をクリックしてメッセージを送信します。
BigQuery のコンソール画面に戻り、以下のようなクエリを実行してテーブルの情報を確認します。
SELECT *
FROM #テーブルID
以下のように、テーブルにデータを挿入することができました。
同様にいくつかのメッセージを送信し、以下のようなテーブルを作成します。
4. UPSERT を使用して BigQuery テーブルのデータを変更する
では、オレンジの値段を、100 円から 130 円に変更してみましょう。
次のようなメッセージを送信します。
{
"Id" : 1,
"fruits" : "orange",
"price" : 130,
"_CHANGE_TYPE" : "UPSERT"
}
すると、次のような結果となります。
Id が 1 のものはすでにテーブルに存在するので、今回の場合は挿入ではなく変更となります。
画像のように、オレンジの値段が 130 円に変更されました。
また、Id が 1 の果物を ぶどう(200 円)のように変換したい場合は "Id" に 1 、"fruit" に "grape"、"price" に 200 を入力して送信すれば変更することができます。
このように、"_CHANGE_TYPE" の UPSERT では対象の主キーを指定した行の変更や、挿入をすることができます。
5. DELETE を使用して BigQuery テーブルのデータを削除する
"_CHANGE_TYPE" の DELETE を試してみましょう。
DELETE に設定すると、同じ主キーを持つ行を削除することができます。
今回は、Id が 2 の行を削除します。
以下のようにメッセージを送信します。
{
"Id" : 2,
"fruits" : "apple",
"price" : 120,
"_CHANGE_TYPE" : "DELETE"
}
そうすると、以下のように apple の列が削除されました。
補足
今回のリリースでは "_CHANGE_TYPE" の他にもう 1 つフィールドが追加されています。
それが、"_CHANGE_SEQUENCE_NUMBER" です。
CDC を使う際には メッセージのスキーマに "_CHANGE_TYPE" を定義することが必須となりますが、"_CHANGE_SEQUENCE_NUMBER" は省略可能なフィールドとなっています。
"_CHANGE_SEQUENCE_NUMBER" を使用する場合でも、BigQuery 側でのスキーマ設定は必要ありません。
"_CHANGE_SEQUENCE_NUMBER" は、BigQuery テーブルに対する更新と削除を確実に行うために設定するものであり、単調に増加する値が含まれる必要があります。
1 つの行に対して処理された最大シーケンス番号よりも小さいシーケンス番号を持つメッセージは、BigQuery テーブルの行には影響しません。
つまり、更新の一貫性と順序付けを保証するためのフィールドとなります。
例えば同じ主キーである 2 つのメッセージが送信され、片方の "_CHANGE_SEQUENCE_NUMBER" が 1 、もう一方が 2 であった場合、後者の変更が適用されます。
また、既にテーブルに存在する行であっても、それ以下の "_CHANGE_SEQUENCE_NUMBER" を持つメッセージは無視されることになります。
試しに、 "_CHANGE_SEQUENCE_NUMBER" を定義してメッセージを送ってみます。
対象のBigQuery テーブルは、3. UPSERT を使用して BigQuery テーブルにデータを挿入する で用意したものと同じものだとします。
以下のように "_CHANGE_SEQUENCE_NUMBER" を追加し、値段を 200 円に変更するようなメッセージを送信します。
{
"Id" : 1,
"fruits" : "orange",
"price" : 200,
"_CHANGE_TYPE" : "UPSERT",
"_CHANGE_SEQUENCE_NUMBER" : 2
}
"_CHANGE_TYPE" のみを指定した時と同様にオレンジの値段が変更されました。
次に、以下のように "_CHANGE_SEQUENCE_NUMBER" を 1 に設定して オレンジの値段を 130円にするようにメッセージを送信してみます。
{
"Id" : 1,
"fruits" : "orange",
"price" : 130,
"_CHANGE_TYPE" : "UPSERT",
"_CHANGE_SEQUENCE_NUMBER" : 1
}
今度は "_CHANGE_SEQUENCE_NUMBER" の値が先ほどよりも小さい値なのでテーブルの変更はされませんでした。
このように、"_CHANGE_SEQUENCE_NUMBER" を設定することでメッセージに処理順序をつけることができます。
4. まとめ
本記事では、Pub/Sub の BigQuery サブスクリプションにおける BigQuery の変更データキャプチャ(CDC) について紹介しました。
BigQuery サブスクリプションは、受信時に既存の BigQuery テーブルに直接メッセージを書き込むことができる機能です。
メッセージを BigQuery テーブルへ保存する際に 別のサブスクライバーを構成する必要がなく、運用負荷やコストを抑えることができます。
今回リリースされた BigQuery サブスクリプションの機能である変更データキャプチャ(CDC)では、Pub/Sub メッセージに "_CHANGE_TYPE" を指定することで BigQuery テーブルへの行挿入、更新、削除ができます。
また、"_CHANGE_SEQUENCE_NUMBER" を使用するとメッセージに処理順序を定義することもできます。
Pub/Sub からのメッセージを BiqQuery に書き込む際は是非この機能をご活用ください。
Discussion