💭

Pub/Sub の BigQuery Change Data Capture 機能について

2023/11/20に公開

1. はじめに

こんにちは、クラウドエース データML ディビジョンの木村です。
クラウドエースの IT エンジニアリングを担うシステム開発部の中で、特にデータ基盤構築・分析基盤構築からデータ分析までを含む一貫したデータ課題の解決を専門とするのがデータ ML ディビジョンです。

データ ML ディビジョンでは活動の一環として、毎週 Google Cloud の新規リリースを調査・発表し、データ領域のプロダクトのキャッチアップをしています。その中でも重要と考えるリリースを本ページのように記事として公開しています。

今回紹介するリリースは、Pub/Sub の BigQuery サブスクリプションにおける BigQuery の変更データキャプチャ(CDC) についてです。
BigQuery の CDC では、ストリーミングされた変更を処理し、既存のデータに適用することで BigQuery テーブルを更新することができます。
本記事では Pub/Sub の BigQuery サブスクリプションにおける CDC の概要と、実際の使い方について紹介していきます。

2. リリースの概要

この節では、まず BigQuery の 変更データキャプチャについて触れ、次に Pub/Sub の BigQuery サブスクリプションについて説明します。
そして、最後に今回のリリースで追加された BigQuery サブスクリプションでの変更データキャプチャについて紹介します。

BigQuery の変更データキャプチャ(CDC)

BigQuery 変更データ キャプチャ(CDC)は、ストリーミングされた変更を処理して既存のデータに適用することで、BigQuery テーブルの行を更新します。
この更新は、BigQuery Storage Write API によってリアルタイムにストリーミングされる、行の更新および削除操作によって行われます。
こちらの詳細についてはリンクからご覧いただけたらと思います。

Pub/Sub の BigQuery サブスクリプションとは

BigQuery サブスクリプションは、Pub/Sub メッセージ受信時に既存の BigQuery テーブルに直接メッセージを書き込むことができる機能です。
メッセージを BigQuery テーブルへ保存する際に Dataflow などで別のサブスクライバーを構成する必要がなく、運用負荷やコストを抑えることができます。

ワークフローのイメージは以下の通りです。

  1. Pub/Sub が BigQuery Storage Write API を使用して、データを BigQuery テーブルに送信します。
  2. メッセージは、BigQuery テーブルにバッチで送信されます。
  3. 書き込みオペレーションが正常に完了すると、API から OK レスポンスが返されます。
  4. 書き込みオペレーションでエラーが発生した場合、Pub/Sub メッセージ自体に否定応答が行われます。その後、メッセージが再送信されます。

今回のリリース内容について

今回のリリースでは、BigQuery サブスクリプションの機能として先述の BigQuery の変更データキャプチャ(CDC) が追加されました。
BigQuery サブスクリプションの BigQuery 変更データキャプチャ(CDC)を用いると、Pub/Sub メッセージを送信することで、BigQuery テーブルの行更新が可能となります。
つまり、今まで BigQuery サブスクリプションでは行の追加しかできなかったのに対し、指定した行の変更や削除も可能となりました。

Pub/Sub で BigQuery CDC を使うための注意点は、BigQuery の宛名テーブルで主キーを定義する必要があるということです。

CDC を使用するには、行の変更をストリーミングする際に、"_CHANGE_TYPE" を設定します。
"_CHANGE_TYPE" には、UPSERTDELETE を指定できます。

  • UPSERT : メッセージに指定した主キーと同じ主キーを持つレコードがテーブル内に存在する場合はそのレコードをメッセージの内容に更新し、存在しない場合はメッセージの内容を新しい行として挿入
  • DELETE : メッセージに指定した主キーと同じ主キーを持つレコードがテーブル内に存在する場合はそのレコードの行を削除

3. 実際に使ってみた

流れとしては以下の通りです。

  • 1, 2 → BigQuery、PubSub のセッティング
    • BigQuery テーブルを用意し、メッセージを送るための Pub/Sub の設定を行う
  • 3 ~ 5 → CDC を用いたデータ更新
    • "_CHANGE_TYPE" を使用し、BigQuery テーブルのデータを更新、削除する

1. BigQuery テーブルの作成

まず、あらかじめ書き込み先として BigQuery テーブルを作成します。
今回は、 Id (テーブル内で一意)、 fruits (果物の名前)、 price (果物の値段) で構成されるテーブルを作成します。

  • データセット名 : CDC
  • テーブル名 : CDC_test
  • スキーマ
    • フィールド : Id (INTEGER), fruits (STRING), price (INTEGER)
  • 主キー : Id

次のクエリを実行します。

CREATE TABLE CDC.CDC_test (
  Id     INTEGER,
  fruits string,
  price  INTEGER,

  PRIMARY KEY (Id) NOT ENFORCED
)

これで、Id を主キー(PK)とした以下のようなテーブルが作成できました。

2. Pub/Sub のセッティング

ここからは、Pub/Sub のセッティングを行なっていきます。
メッセージを送信する際に用いるスキーマを設定します。

コンソールで、Pub/Sub メニューバーから [Schemas] を選択し、[CREATE SCHEMA] をクリックします。

先ほど作成した BigQuery テーブルと同様のスキーマと、最後に "_CHANGE_TYPE" を定義しています。
"_CHANGE_TYPE" は "string" で定義します。

{
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "Id",
      "type": "int"
    },
    {
      "name": "fruits",
      "type": "string"
    },
    {
      "name": "price",
      "type": "int"
    },
    {
      "name": "_CHANGE_TYPE",
      "type": "string"
    }
  ]
}

次に、Topic の作成を行います。
メニューバーの [Topic] を選択し、[CREATE TOPIC] をクリックします。

[Use a shema] にチェックを入れ、[Select a Pub/Sub schema] で先ほど作成したスキーマを選択し、[CREATE] をクリックします。

続いて、トピックに紐付ける サブスクリプションを作成します。
メニューバーの [Subscription] を選択し、[CREATE SUBSCRIPTION] をクリックします。

[Subscription ID] は任意の ID を入力し、[Select a Cloud Pub/Sub topic] に先ほど作成した Topic を選択します。

BigQuery Storage Write API を使用するので、[Delivery type] で、[Write to BigQuery] を選択します。
[Dataset][Table] には先ほど作成したものを入力します。
その下にある [Use topic Schema] にもチェックを入れ、[CREATE] をクリックします。

これで、Pub/Sub 側の設定も完了です。

3. UPSERT を使用して BigQuery テーブルにデータを挿入する

実際に BigQuery テーブルへデータを送信していきます。
メニューバーの [Topic] から先ほど作成した トピックをクリックします。
[MESSAGES] を選択し、[Message body] に以下のように入力します。

{
    "Id" : 1,
    "fruits" : "orange",
    "price" : 100, 
    "_CHANGE_TYPE" : "UPSERT"
}

"_CHANGE_TYPE""UPSERT" に設定している場合、同一の主キーを持たない時は新しい行を挿入します。
現在 BigQuery のテーブルは空の状態なので、送信したメッセージがテーブルに挿入されるはずです。

[PUBLISH] をクリックしてメッセージを送信します。
BigQuery のコンソール画面に戻り、以下のようなクエリを実行してテーブルの情報を確認します。

SELECT *
FROM #テーブルID

以下のように、テーブルにデータを挿入することができました。

同様にいくつかのメッセージを送信し、以下のようなテーブルを作成します。

4. UPSERT を使用して BigQuery テーブルのデータを変更する

では、オレンジの値段を、100 円から 130 円に変更してみましょう。
次のようなメッセージを送信します。

{
    "Id" : 1,
    "fruits" : "orange",
    "price" : 130, 
    "_CHANGE_TYPE" : "UPSERT"
}

すると、次のような結果となります。

Id が 1 のものはすでにテーブルに存在するので、今回の場合は挿入ではなく変更となります。
画像のように、オレンジの値段が 130 円に変更されました。
また、Id が 1 の果物を ぶどう(200 円)のように変換したい場合は "Id" に 1 、"fruit" に "grape"、"price" に 200 を入力して送信すれば変更することができます。

このように、"_CHANGE_TYPE"UPSERT では対象の主キーを指定した行の変更や、挿入をすることができます。

5. DELETE を使用して BigQuery テーブルのデータを削除する

"_CHANGE_TYPE"DELETE を試してみましょう。
DELETE に設定すると、同じ主キーを持つ行を削除することができます。
今回は、Id が 2 の行を削除します。
以下のようにメッセージを送信します。

{
    "Id" : 2,
    "fruits" : "apple",
    "price" : 120, 
    "_CHANGE_TYPE" : "DELETE"
}

そうすると、以下のように apple の列が削除されました。

補足

今回のリリースでは "_CHANGE_TYPE" の他にもう 1 つフィールドが追加されています。
それが、"_CHANGE_SEQUENCE_NUMBER" です。
CDC を使う際には メッセージのスキーマに "_CHANGE_TYPE" を定義することが必須となりますが、"_CHANGE_SEQUENCE_NUMBER" は省略可能なフィールドとなっています。
"_CHANGE_SEQUENCE_NUMBER" を使用する場合でも、BigQuery 側でのスキーマ設定は必要ありません。

"_CHANGE_SEQUENCE_NUMBER" は、BigQuery テーブルに対する更新と削除を確実に行うために設定するものであり、単調に増加する値が含まれる必要があります。
1 つの行に対して処理された最大シーケンス番号よりも小さいシーケンス番号を持つメッセージは、BigQuery テーブルの行には影響しません。
つまり、更新の一貫性と順序付けを保証するためのフィールドとなります。

例えば同じ主キーである 2 つのメッセージが送信され、片方の "_CHANGE_SEQUENCE_NUMBER" が 1 、もう一方が 2 であった場合、後者の変更が適用されます。
また、既にテーブルに存在する行であっても、それ以下の "_CHANGE_SEQUENCE_NUMBER" を持つメッセージは無視されることになります。

試しに、 "_CHANGE_SEQUENCE_NUMBER" を定義してメッセージを送ってみます。
対象のBigQuery テーブルは、3. UPSERT を使用して BigQuery テーブルにデータを挿入する で用意したものと同じものだとします。

以下のように "_CHANGE_SEQUENCE_NUMBER" を追加し、値段を 200 円に変更するようなメッセージを送信します。

{
    "Id" : 1,
    "fruits" : "orange",
    "price" : 200, 
    "_CHANGE_TYPE" : "UPSERT",
    "_CHANGE_SEQUENCE_NUMBER" : 2
}

"_CHANGE_TYPE" のみを指定した時と同様にオレンジの値段が変更されました。

次に、以下のように "_CHANGE_SEQUENCE_NUMBER" を 1 に設定して オレンジの値段を 130円にするようにメッセージを送信してみます。

{
    "Id" : 1,
    "fruits" : "orange",
    "price" : 130, 
    "_CHANGE_TYPE" : "UPSERT",
    "_CHANGE_SEQUENCE_NUMBER" : 1
}

今度は "_CHANGE_SEQUENCE_NUMBER" の値が先ほどよりも小さい値なのでテーブルの変更はされませんでした。

このように、"_CHANGE_SEQUENCE_NUMBER" を設定することでメッセージに処理順序をつけることができます。

4. まとめ

本記事では、Pub/Sub の BigQuery サブスクリプションにおける BigQuery の変更データキャプチャ(CDC) について紹介しました。
BigQuery サブスクリプションは、受信時に既存の BigQuery テーブルに直接メッセージを書き込むことができる機能です。
メッセージを BigQuery テーブルへ保存する際に 別のサブスクライバーを構成する必要がなく、運用負荷やコストを抑えることができます。

今回リリースされた BigQuery サブスクリプションの機能である変更データキャプチャ(CDC)では、Pub/Sub メッセージに "_CHANGE_TYPE" を指定することで BigQuery テーブルへの行挿入、更新、削除ができます。
また、"_CHANGE_SEQUENCE_NUMBER" を使用するとメッセージに処理順序を定義することもできます。

Pub/Sub からのメッセージを BiqQuery に書き込む際は是非この機能をご活用ください。

Discussion