🌊

Datastream for BigQuery を本番導入しようとしている話

2023/06/30に公開

オープンロジという物流テック企業でエンジニアをやっている阿部です。オープンロジではデータ基盤の整備を進めており、その第一弾として Datastream for BigQuery を本番導入しようとしています。導入の経緯や、これまでに調査して分かったコトなどを書いていきたいと思います。

トリッキーな挙動になることもありますが、総じて非常に優れたマネージドサービスです。RDB と BigQuery を同期する仕組みとしては、かなり優秀です。まだ、本格的な稼働はしていませんが、いずれオープンロジのデータ基盤の中核を担うシステムになると思っています。

Datastream for BigQuery とは?

https://cloud.google.com/blog/ja/products/data-analytics/introducing-seamless-database-replication-to-bigquery

MySQL, PostgreSQL などのリレーショナルデータベース (RDB) から BigQuery に準リアルタイムでデータを同期する Google Cloud のサービスです。RDB がレプリケーションに利用する binary log (binlog) を Datastream が加工して BigQuery に差分同期する仕組みになっています。

導入の経緯

オープンロジでは「テクノロジーを使い、サイロされた物流をネットワーク化し、データを起点にモノの流れを革新する」というビジョンを掲げています。このビジョンが示す通り、社内では積極的にデータを活用しようという雰囲気があり、BigQuery に様々なデータを集約し、ダッシュボードや分析などの業務に活用しています。

今年に入り、長年運用してきたデータ基盤の課題が浮き彫りになりつつありため、データ基盤の刷新を進めています。特に本番データの同期は大きな問題になっており、最初に着手することになりました。従来は、セルフホスティングした Embulk を使って、本番アプリケーションの RDB のデータを BigQuery に差分同期していましたが、いくつかの問題を抱えています。

  • データ鮮度が悪い:同期が1日1回なので、当日分のデータを見ることができません。商品の出荷など1日に数回ほど重要な業務イベントが発生するので、当日データを見たいという要望がありました。
  • データ不整合:RDB と BigQuery のデータに矛盾があり、信頼性という点で大きな課題がありました。前提として、Embulk では RDB の各テーブルにある更新日時カラム (updated_at) が前日のレコードを BigQuery に INSERT する仕組みになっています。一部のテーブルに ON UPDATE CURRENT_TIMESTAMP が付与されていなかったり、updated_at をアプリケーション側で特殊な用途で利用しているなどの事情により、updated_at がレコードの更新日時としての正確性を欠いており、差分更新がうまくいかない状況に陥っていました。
  • Embulk の運用が面倒:Embulk 自体のバージョンアップやテーブルのスキーマ変更への追従などでしばしば運用の手間が発生していました。

これらの問題を解決する手段として、最近リリースされた Datastream for BigQuery に白羽の矢が立ちました。

インフラ構成

インフラ構成図: RDS -> Datastream -> BigQuery

本番 RDS (MySQL) から Datastream に binlog を流し、Datastream から BigQuery に書き込むという流れになります。RDS は AWS、Datastream, BigQuery は GCP に置いてあるので、直接通信はできません。今回はクイックに踏み台 EC2 で SSH port forwarding する方式をとっていますが、そのうち VPC peering に切り替えたいと思っています。

RDS の設定

https://cloud.google.com/datastream/docs/configure-your-source-mysql-database?hl=ja

基本的には、上記のドキュメントに書いてある通り、

  • 適切なパラメータグループを RDS に設定する(リードレプリカのみで良い)
  • RDS のバックアップ保存期間を 7 日にする
  • binlog 保存期間を変更する(リードレプリカのみで良い)
    call mysql.rds_set_configuration('binlog retention hours', 168);
    
  • Datastream ユーザーを作成する
    CREATE USER 'datastream'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';
    GRANT REPLICATION SLAVE, SELECT, RELOAD, REPLICATION CLIENT, LOCK TABLES, EXECUTE ON *.* TO 'datastream'@'%';
    FLUSH PRIVILEGES;
    

上記の設定を行い、RDS を再起動すれば、Datastream の同期ができるようになります。

データ操作の反映

RDB 側で発行した SQL 文によりデータが変更された場合、どのように BigQuery に反映されるか事前に調査しました。

DML (Data Manipulation Language)

DML はデータを操作する SQL 文で、一般的には SELECT, INSERT, UPDATE, DELETE が含まれます。今回は、INSERT, UPDATE, DELETE がデータを変更する可能性があるため、BigQuery 上でどのような挙動になるか確認しています。

主キーのあるテーブル

RDB 側で主キーが設定されているテーブルの場合、(同期遅延を除けば)BigQuery に RDB の最新のスナップショットが反映されるような振る舞いになります。
BigQuery の主キーの話ではありません。

INSERT 文については、RDB 側に INSERT したレコードに対応するレコードが BigQuery にも作成されます。同様に UPDATE すると対応するレコードが BigQuery で上書き更新され、DELETE すると対応するレコードが BigQuery で削除されます。要するに、UPDATE, DELETE されたレコードは最新の状態のみが BigQuery に反映され、過去の値は保持されません。

Datastream で同期されたレコードには、下図のように datastream_metadata というカラムが BigQuery 側で付与されます。uuid は恐らくレコードのハッシュ値のようなもので、UPDATE すると変化します。source_timestamp はレコードの最終更新タイムスタンプです。

datastream_metadata の例

主キーのないテーブル

RDB 側で主キーが設定されていないテーブルの場合、過去の更新履歴が全て残るような振る舞いになります。

INSERT の振る舞いは同じですが、UPDATE されると既存のレコードが上書きされず、新しい値のレコードが挿入されます。古いレコードが残留するため、過去の更新履歴レコードが全て残るような振る舞いになります。この場合、source_timestamp が最新のレコードを持ってくると、RDB のデータと辻褄が合います。

DELETE された場合、datastream_metadata.is_deletedtrue のレコードが挿入されます。is_deleted フィールドは主キーのあるテーブルを同期する場合には存在せず、主キーのないテーブルを同期した場合にのみ、追加されます。

datastream_metadata.is_deleted の例

ちなみに、こんな感じのクエリをかけば、主キーのあるテーブルと同じ結果になるはずです。

SELECT * EXCEPT(_rn)
FROM (
  SELECT
    *, ROW_NUMBER() OVER (
      PARTITION BY id
      ORDER BY datastream_metadata.source_timestamp DESC
    ) AS _rn
  FROM my_awesome_table
) t
WHERE _rn = 1 AND datastream_metadata.is_deleted = false

DDL (Data Definition Language)

CREATE TABLE や ALTER TABLE のようにテーブルの構造を変化させる SQL 文がどのように作用するかも調べました。基本的に、Datastream では DDL が直ちに BigQuery に反映されるわけではなく、DML によるデータ変更が反映されたタイミングにテーブル構造の変化も適用されます。例えば、ALTER TABLE ADD COLUMN だけでは BigQuery 上のテーブル構造は変化せず、そのテーブルに INSERT や UPDATE が走ったタイミングで初めてカラム追加が反映されます。

2023年2月時点では、以下のような振る舞いになりました。この辺りはバージョンアップで挙動が変化する可能性があります。DROP 系や Generated Column は少しトリッキーな挙動になるので、注意が必要です。

  • カラムの NOT NULL 制約
    • 全て無視され、BigQuery 上では全てのカラムが NULLABLE として扱われる。
  • テーブル・カラムのコメント
    • BigQuery には反映されない。
  • CREATE TABLE
    • Datastream で全テーブルを同期する設定にしている場合、新規テーブルは自動的に同期される。
    • RDB の主キーでクラスタ化されたテーブルが作成される(パーティションは未設定)
  • DROP TABLE
    • RDB 側でテーブルを削除しても、BigQuery 側では対応するテーブルが削除されない。
  • ALTER TABLE ADD COLUMN
    • BigQuery 上でも対応するカラムが追加される。
  • ALTER TABLE DROP COLUMN
    • BigQuery 上では対応するカラムは削除されず残留する。
    • 全てのレコードで対応するカラムの値が NULL に置き換わる。
  • ALTER TABLE RENAME COLUMN
    • 変更後の名前のカラムが追加される。
    • 変更前のカラムの値が NULL に置き換わり、カラム自体は残留する。
  • ALTER TABLE CHANGE COLUMN(型の変更)
    • 既存のカラムの型を変更した場合、BigQuery 上でも対応するカラムの型が変更される。
    • ここまで変更を反映してくれると思っていなかったので、少し驚いた。
  • ALTER TABLE DROP PARTITION
    • パーティションを DROP しても BigQuery からはデータが削除されない。
    • DELETE とは挙動が違うので注意。おそらく DML による削除しか反映されない。
    • 一方で、パーティションを追加した場合は、BigQuery 上にも対応するレコードが追加される。
  • MySQL Generated Column の追加
    • Generated Column を新規に追加した場合、カラム追加後に INSERT, UPDATE されたレコードだけ Generated Column の値が格納される。
    • カラム追加前から変更のないレコードでは、カラムの値は NULL になる。
    • ※ Generated Column は計算結果を値として仮想的に持っておくためのカラム。
  • MySQL Generated Column の計算式の変更
    • 計算式変更後に INSERT, UPDATE されたレコードだけ Generated Column の値が更新される。
    • 計算式変更前から変更のないレコードでは、古い計算式の計算結果がそのまま残る。

パーティション化テーブルへの書き込み

Datastream は愚直に設定すると、BigQuery 上にパーティションが設定されていないテーブルを作成して、そこに同期してしまいます。小さいテーブルだと問題になりませんが、巨大なテーブルではスキャンサイズ(=クエリコスト)が増大する恐れがあります。BigQuery 上でパーティション化テーブルに書き込むにはちょっと工夫が必要なので、その手順について説明します。

まず、Datastream を愚直に設定して同期処理を開始し、BigQuery にテーブルが作成されたらすぐに同期を停止します。この段階で、BigQuery にパーティションが設定されていないテーブルが作成された状態になります。このテーブルの DDL は以下の SQL で確認できます。

SELECT ddl
FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES`
WHERE table_name = '{table-name}'

例えば、こんな感じの DDL が返ってくると思います。

CREATE TABLE `{project}.{dataset}.{table-name}`
(
  id INT64,
  name STRING,
  created_at DATETIME,
  datastream_metadata STRUCT<uuid STRING, source_timestamp INT64>,
  PRIMARY KEY (id) NOT ENFORCED
)
CLUSTER BY id
OPTIONS(
  max_staleness=INTERVAL '0-0 0 0:15:0' YEAR TO SECOND
);

DDL を確認したら、テーブルを一旦 DROP します。そして、パーティションを設定したテーブルを新たに作り直します。PARTITON BY 以外の部分を変更すると同期に失敗する可能性があるので、注意してください。

DROP TABLE `{project}.{dataset}.{table-name}`;
CREATE TABLE `{project}.{dataset}.{table-name}`
(
  id INT64,
  name STRING,
  created_at DATETIME,
  datastream_metadata STRUCT<uuid STRING, source_timestamp INT64>,
  PRIMARY KEY (id) NOT ENFORCED
)
PARTITION BY DATE(created_at) -- ここを追加
CLUSTER BY id
OPTIONS(
  max_staleness=INTERVAL '0-0 0 0:15:0' YEAR TO SECOND
);

ここまでできたら、Datastream の同期を再開して、該当テーブルにバックフィルをかけます。

以下の SQL を BigQuery に投げて、コメントの有無でスキャンサイズが変化すれば成功です。

SELECT *
FROM `{project}.{dataset}.{table-name}`
-- WHERE created_at >= "YYYY-MM-DD" -- 適当な日付を入れる

BigQuery へのデータ反映の課金

Datastream は以下のような SQL を定期的に BigQuery に投げることで、差分データを BigQuery に適用します。

CALL BQ.APPLY_UPSERT_STREAM({project}.{dataset}.{table}')

この SQL は BigQuery 上だと Analysis pricing(クエリ実行の料金)として課金されます ($6/TB)。Streaming insert とかで課金されるわけではないの注意してください。上記の SQL によるスキャンサイズと Datastream の CDC データ量は一致しているようです。Datastream は CDC の処理データ量で課金されるので、Datastream for BigQuery の料金を見積もる時は参考にしてください。(そもそも CDC のデータ量を見積もるのが難しいですが)

RDB の負荷調整

Datastream には通常の差分同期を行う CDC (Change Data Capture) と全てのデータを一括で同期するバックフィルの 2 つの動作があります。両方とも RDB に負荷がかかりますが、並列実行するタスク数を制限することができます。

  • maxConcurrentCdcTasks: CDC の並列実行タスク数の上限
  • maxConcurrentBackfillTasks: バックフィルの並列実行タスク数の上限

terraform provider も対応しています。

https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/datastream_stream#max_concurrent_cdc_tasks

バックフィルだと巨大なテーブルをフルスキャンするため、かなりの負荷がかかります。設定しておいた方が無難でしょう。

監視

Datadog の GCP インテグレーションにより、stream freshness が計測できるので、Datadog を使って監視しています。また、Cloud Logging に Datastream のログが書き込まれるので、これもいずれ監視できるようにしたいと思っています。

OPENLOGI Tech Blog

Discussion