🎏

Spanner Change StreamsをDataflowで触ってみた

2022/06/30に公開

はじめに

Spanner Change Streamsの概要の続きで、試しにSpanner Change StreamsをDataflowからGCSに出力してみた記録です。

Google公式のテンプレートを使ってDMLを実行後に実際どんなデータが出てくるのかを見ます。

環境構築

実験用のGCPプロジェクトが作成済みで下記コマンドで認証完了していることを前提とします。

gcloud auth login
gcloud auth application-default login

途中のオペレーションでspanner-cliも使いますのでインストールしておいて下さい。
主に以下3つを作成します。

  • Spanner
  • GCS
  • Dataflow Job (Streaming)

構築前の準備

以下の環境変数が設定されていることを前提にコマンドを記載します。
空欄orコメントのところは各自の環境で変わるところだと思うので適宜入れて下さい。

export PROJECT_ID=

export SPANNER_INSTANCE_ID=
export SPANNER_DATABASE_ID=
export SPANNER_CHANGE_STREAM_NAME=ItemsTableStream

export GCS_LOCATION=asia-northeast1
export GCS_BUCKET=#no need gs://

export DATAFLOW_REGION=asia-northeast1
export DATAFLOW_WORKER_REGION=asia-northeast1

加えて下記のMakefileも配置しておいて下さい。
(コマンドベタ貼りだと検証時に面倒なので)

spanner-create:
	gcloud --project $$PROJECT_ID spanner instances create $$SPANNER_INSTANCE_ID --processing-units 100 --config regional-asia-northeast1 --description $$SPANNER_INSTANCE_ID
	gcloud --project $$PROJECT_ID spanner databases create $$SPANNER_DATABASE_ID --instance $$SPANNER_INSTANCE_ID --ddl-file schema.sql

spanner-reset:
	-gcloud --project $$PROJECT_ID spanner databases delete $$SPANNER_DATABASE_ID --instance $$SPANNER_INSTANCE_ID -q
	-gcloud --project $$PROJECT_ID spanner databases create $$SPANNER_DATABASE_ID --instance $$SPANNER_INSTANCE_ID --ddl-file schema.sql

spanner-delete:
	-gcloud --project $$PROJECT_ID spanner instances delete $$SPANNER_INSTANCE_ID -q

gcs-create:
	gsutil mb -p $$PROJECT_ID -l asia-northeast1 gs://$$GCS_BUCKET

gcs-delete:
	-gsutil rm -r gs://$$GCS_BUCKET
	-gsutil rb gs://$$GCS_BUCKET

DATAFLOW_JOB_NAME := cs-gcs-`uuidgen | tr "[:upper:]" "[:lower:]"`
dataflow-create:
	gcloud --project $$PROJECT_ID dataflow flex-template run $(DATAFLOW_JOB_NAME) --region $$DATAFLOW_REGION --worker-region $$DATAFLOW_WORKER_REGION \
		--template-file-gcs-location gs://dataflow-templates/latest/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
		--parameters \
	spannerProjectId=$$PROJECT_ID,\
	spannerInstanceId=$$SPANNER_INSTANCE_ID,\
	spannerDatabase=$$SPANNER_DATABASE_ID,\
	spannerMetadataInstanceId=$$SPANNER_INSTANCE_ID,\
	spannerMetadataDatabase=$$SPANNER_DATABASE_ID,\
	spannerChangeStreamName=$$SPANNER_CHANGE_STREAM_NAME,\
	gcsOutputDirectory=gs://$$GCS_BUCKET

dataflow-firewall:
	gcloud --project $$PROJECT_ID compute firewall-rules create dataflow-nodes \
		--action=allow \
		--direction=ingress \
		--network=default  \
		--target-tags=dataflow \
		--source-tags=dataflow \
		--priority=0 \
		--rules tcp:12345-12346

dataflow-delete:
	gcloud --project $$PROJECT_ID dataflow jobs cancel ${JOB_ID} --region $$DATAFLOW_REGION

up: spanner-create gcs-create dataflow-create

clean: dataflow-delete spanner-delete gcs-delete

以下のスキーマを schema.sql として保存しmakeファイルと同じディレクトリに配置しておいて下さい。

CREATE TABLE Users (
    UserID STRING(MAX) NOT NULL,
    Name STRING(MAX) NOT NULL,
    Age INT64 NOT NULL,
) PRIMARY KEY (UserID);

CREATE TABLE Items (
    ItemID STRING(MAX) NOT NULL,
    Name STRING(MAX) NOT NULL,
) PRIMARY KEY (ItemID);

CREATE CHANGE STREAM EverythingStream FOR ALL;

CREATE CHANGE STREAM UsersTableStream FOR Users;

CREATE CHANGE STREAM ItemsTableStream FOR Items;

CREATE CHANGE STREAM UsersNameAndItemsNameColumnsStream FOR Users(Name), Items(Name);

構築

$ make up
gcloud --project $PROJECT_ID spanner instances create $SPANNER_INSTANCE_ID --processing-units 100 --config regional-asia-northeast1 --description $SPANNER_INSTANCE_ID
Creating instance...done.
gcloud --project $PROJECT_ID spanner databases create $SPANNER_DATABASE_ID --instance $SPANNER_INSTANCE_ID --ddl-file schema.sql
Creating database...done.
gsutil mb -p $PROJECT_ID -l asia-northeast1 gs://$GCS_BUCKET
Creating gs://change-stream-bucket/...
gcloud --project $PROJECT_ID dataflow flex-template run cs-gcs-`uuidgen | tr "[:upper:]" "[:lower:]"` --region $DATAFLOW_REGION --worker-region $DATAFLOW_WORKER_REGION \
		--template-file-gcs-location gs://dataflow-templates/latest/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
		--parameters \
	spannerProjectId=$PROJECT_ID,\
	spannerInstanceId=$SPANNER_INSTANCE_ID,\
	spannerDatabase=$SPANNER_DATABASE_ID,\
	spannerMetadataInstanceId=$SPANNER_INSTANCE_ID,\
	spannerMetadataDatabase=$SPANNER_DATABASE_ID,\
	spannerChangeStreamName=$SPANNER_CHANGE_STREAM_NAME,\
	gcsOutputDirectory=gs://$GCS_BUCKET
job:
  createTime: '2022-06-27T15:01:10.118891Z'
  currentStateTime: '1970-01-01T00:00:00Z'
  id: 2022-06-27_08_01_09-801665975291223129
  location: asia-northeast1
  name: cs-gcs-b5a461ef-4734-4533-b708-373769864efd
  projectId: xxx
  startTime: '2022-06-27T15:01:10.118891Z'

確認

Spanner

4つのStreamが緑色になっていればOK
(ちなみに今回実際使うのは ItemsTableStream だけです)

GCS

空のBucketが存在すればOK

Dataflow

StreamのJobが作成されステータスが Running になればOK
裏で稼働するGCEが立ち上がるのに数分程度かかるので少し待つ必要があります。

もしJobを覗いてみて「インスタンス同士が通信できない」という趣旨の警告が出ている場合は make dataflow-firewall を試してみて下さい。Makefileを見ればわかりますが、デフォルトネットワーク内で dataflow タグを持つパケットを許可するルールを追加します。

クエリーを投げて挙動を確かめる

INSERT

まずはデータの新規作成から。
極単純なINSERTを発行後SELECTし実際に書き込みされていることを確認します。

$ spanner-cli -p $PROJECT_ID -i $SPANNER_INSTANCE_ID -d $SPANNER_DATABASE_ID -e "INSERT INTO Items (ItemID, Name) VALUES ('aaa', 'bbb')"
$ spanner-cli -p $PROJECT_ID -i $SPANNER_INSTANCE_ID -d $SPANNER_DATABASE_ID -e "SELECT * FROM Items"
ItemID	Name
aaa	bbb

その後しばらくするとGCSに output2022-06-23T22_15_00.000Z-2022-06-23T22_20_00.000Z-pane-0-last-13-of-20.avro という具合のavroファイルが生成されます。
これがStreamから出力された内容でトランザクションID等のメタデータも含まれています。

適当なリーダーでJSONパースしてやると下記のような具合になります。
(個人環境で漏洩を気にしなくてよいのであれば解析してくれるWebフォームに投げるのが手っ取り早いかと思います)。

{
    "partitionToken" : "__8BAYEGmLsIUAABgsBYg0l0ZW1zVGFibGVTdHJlYW0AAYSBBgxG3FAASIKAgwjDZAAAAAAAAIQE-27ed4VnNzJfMTI4NzMxNTcAAf__hf8F4iR4_893hv8F4iUnQ9wnh4DAZAEB__8",
    "commitTimestamp" : "2022-06-23 18:15:44.000074",
    "serverTransactionId" : "MTcxNTIwMDY3Mjk1OTM2NTEyNTM=",
    "isLastRecordInTransactionInPartition" : true,
    "recordSequence" : "00000000",
    "tableName" : "Items",
    "rowType" : [
        {
            "name" : "ItemID",
            "Type" : "STRING",
            "isPrimaryKey" : true,
            "ordinalPosition" : 1
        },
        {
            "name" : "Name",
            "Type" : "STRING",
            "isPrimaryKey" : false,
            "ordinalPosition" : 2
        }
    ],
    "mods" : [
        {
            "keysJson" : "{"ItemID":"aaa"}",
            "oldValuesJson" : "{}",
            "newValuesJson" : "{"Name":"bbb"}"
        }
    ],
    "modType" : "INSERT",
    "valueCaptureType" : "OLD_AND_NEW_VALUES",
    "numberOfRecordsInTransaction" : 1,
    "numberOfPartitionsInTransaction" : 1,
    "metadata" : {
        "partitionToken" : "__8BAYEGmLsIUAABgsBYg0l0ZW1zVGFibGVTdHJlYW0AAYSBBgxG3FAASIKAgwjDZAAAAAAAAIQE-27ed4VnNzJfMTI4NzMxNTcAAf__hf8F4iR4_893hv8F4iUnQ9wnh4DAZAEB__8",
        "recordTimestamp" : "2022-06-23 18:15:44.000074",
        "partitionStartTimestamp" : "2022-06-23 18:07:57.000681",
        "partitionEndTimestamp" : "9999-12-31 18:59:59.000999",
        "partitionCreatedAt" : "2022-06-23 18:10:43.000234",
        "partitionScheduledAt" : "2022-06-23 18:10:43.000964",
        "partitionRunningAt" : "2022-06-23 18:10:43.000982",
        "queryStartedAt" : "0001-01-02 19:00:00.000000",
        "recordStreamStartedAt" : "2022-06-23 18:15:44.000570",
        "recordStreamEndedAt" : "2022-06-23 18:15:44.000587",
        "recordReadAt" : "2022-06-23 18:15:44.000587",
        "totalStreamTimeMillis" : 17,
        "numberOfRecordsRead" : 1
    }
}

アプリケーションから読み込んで比較的シンプルな処理(順序非再現でPub/Subに投げる等)を行う場合であれば tableName, rowType, mods, modType 辺りを気にすればよさそうでしょうか。

順序やトランザクションを再現しようとするとメタデータを参照することになりかなり困難な実装になると思われます(1トランザクションが1JSONでストリームされるとは限らないため)。

UPDATE

先ほどINSERTしたレコードを更新しNameを aaa から ccc に変更してみます。
完了後SELECTをすると値が正しく変更されていることが確認できました。

$ spanner-cli -p $PROJECT_ID -i $SPANNER_INSTANCE_ID -d $SPANNER_DATABASE_ID -e "UPDATE Items SET Name = 'ccc' WHERE ItemID = 'aaa'"
$ spanner-cli -p $PROJECT_ID -i $SPANNER_INSTANCE_ID -d $SPANNER_DATABASE_ID -e "SELECT * FROM Items"
ItemID	Name
aaa	ccc

またしばらくするとavroが吐かれるはずなので再度パースしたものがこちら(言及しないデータは削除)。

{
    "tableName" : "Items",
    "rowType" : [
        {
            "name" : "ItemID",
            "Type" : "STRING",
            "isPrimaryKey" : true,
            "ordinalPosition" : 1
        },
        {
            "name" : "Name",
            "Type" : "STRING",
            "isPrimaryKey" : false,
            "ordinalPosition" : 2
        }
    ],
    "mods" : [
        {
            "keysJson" : "{"ItemID":"aaa"}",
            "oldValuesJson" : "{"Name":"bbb"}",
            "newValuesJson" : "{"Name":"ccc"}"
        }
    ],
    "modType" : "UPDATE",
}

tableName, rowType が同じなのは当然として mods では新しい値だけでなく更新前の値も含まれています。単純な処理では不要ですが、例えば特定のステータス遷移をトリガーに処理を行うようなケースでは有用でしょう。

DELETE

今度は同レコードを削除してみます。
恐らくご想像通りの結果になるので特に解説は不要でしょう。

$ spanner-cli -p $PROJECT_ID -i $SPANNER_INSTANCE_ID -d $SPANNER_DATABASE_ID -e "DELETE Items WHERE ItemID = 'aaa'"
$ spanner-cli -p $PROJECT_ID -i $SPANNER_INSTANCE_ID -d $SPANNER_DATABASE_ID -e "SELECT * FROM Items"

{
    "mods" : [
        {
            "keysJson" : "{"ItemID":"aaa"}",
            "oldValuesJson" : "{"Name":"ccc"}",
            "newValuesJson" : "{}"
        }
    ],
    "modType" : "DELETE",
}

終わりに

今回のテストではDataflowとテンプレートを使用して簡単にGCSへ出力できることがわかりました。また出力されるデータはかなり細かい情報が付加されるため 自分でApache Beam SDKのChange Streams Connectorを使って実装する気合があれば Transaction log tailingパターンやその他様々な用途で応用できそうです。

他のテンプレートとしてGCSではなくBigQueryに出力するものが用意されています。こちらについては商用環境に影響を与えないよう調査クエリーを発行する等のOLAP用リードレプリカとして利用価値がありそうです。ただスキーママイグレーション時どのような振る舞いになるのか気になるところです。

ちなみにユースケースに挙げられているPub/Subは公式テンプレートには無さそうです。理由は不明ですが恐らく単に全部そのまま投げ込むだけという要件は少なく(Ordering Key付けたいとかCustom Metadata足したいとか)自分で実装しましょうということなのではないでしょうか。

後片付け

Dataflowを作成する際に id: が出力されているはずなのでその内容をコピーして下記のように実行して下さい。先の例だと 2022-06-27_08_01_09-801665975291223129 がその値です。

$ make clean JOB_ID=2022-06-27_08_01_09-801665975291223129
gcloud --project $PROJECT_ID spanner instances delete $SPANNER_INSTANCE_ID -q
gsutil rm -r gs://$GCS_BUCKET
Removing gs://change-stream-bucket/...
gsutil rb gs://$GCS_BUCKET
Removing gs://change-stream-bucket/...
BucketNotFoundException: 404 gs://change-stream-bucket bucket does not exist.
make: [gcs-delete] Error 1 (ignored)
gcloud --project $PROJECT_ID dataflow jobs cancel 2022-06-27_08_01_09-801665975291223129 --region $DATAFLOW_REGION
Cancelled job [2022-06-27_08_01_09-801665975291223129]

最後に念の為コンソールからリソースが削除されているか確認しましょう。

Discussion