🔧

Cloud Spanner の Change Streams を Go で読む

2022/10/23に公開

はじめに

2022年5月に Cloud Spanner の Change Streams (変更ストリーム) が GA になりました。

機能としては AWS の DynamoDB Streams 的なものを期待していたのですが、実際に提供された Change Streams API は思ったより複雑な形でした。

比較的簡単に使う方法として公式から Dataflow のテンプレートが提供されていますが、転送先としては現状 Cloud Storage と BigQuery のみで選択肢が少ないです。BigQuery のストリーミングテンプレートを試してみましたが、テンプレート実装をある程度読んで何が起きているのか理解してから使いたい印象です。

テンプレート実装を読むならついでに Go で Change Streams API を読むところぐらいは作れるだろう、ということでテンプレート実装や先人の記事、ドキュメントなどを読みながら実装してみたので、この記事で解説します。

参考資料

最初に Change Streams API を扱うために読んだドキュメントや記事、実装などを挙げていきます。

Change Streams API の仕様
https://cloud.google.com/spanner/docs/change-streams/details?hl=ja

先人の記事
https://zenn.dev/ryo_yamaoka/scraps/f263b45d58a1a9
https://zenn.dev/ryo_yamaoka/articles/38c70cde3f97f0

Dataflow の公式 Template 実装
https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/fd9371f06ecf8da086ee2918ae2b2adac7057ea3/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery

Apache Beam SDK の Spanner コンポーネントたち
https://github.com/apache/beam/tree/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner

実装

ここからは具体的な実装方法について書いていきます。
なお、Change Streams 概要や作り方などは参考資料に書いてあるので適宜省略していきます。

クエリ

まずは実際に Change Stream を Cloud Spanner から読むところです。
クエリの各パラメータについてはドキュメントなどを参考にしてください。

type dao struct {
	client   *spanner.Client
}

func (d *dao) query(
	ctx context.Context,
	streamName string,
	startTimestamp time.Time,
	endTimestamp *time.Time,
	partitionToken *string,
	heartbeatMilliseconds int64,
) error {
	sql := fmt.Sprintf(`
		SELECT ChangeRecord FROM READ_%s (
			start_timestamp => @startTimestamp, 
			end_timestamp => @endTimestamp,
			partition_token => @partitionToken,
			heartbeat_milliseconds => @heartbeatMilliseconds
		)
	`, streamName)

	params := map[string]interface{}{
		"startTimestamp":        startTimestamp,
		"endTimestamp":          endTimestamp,
		"partitionToken":        partitionToken,
		"heartbeatMilliseconds": heartbeatMilliseconds,
	}

	return d.client.
		Single().
		Query(ctx, spanner.Statement{SQL: sql, Params: params}).
		Do(func(r *spanner.Row) error {
			v := []*ChangeRecord{}
			if err := r.Columns(&v); err != nil {
				return err
			}
			// TODO: ここで読んだ ChangeRecord を処理する
			return nil
		})
}

いくつかのパラメータについて説明します。

  • partitionToken
    変更ストリームを先頭から読む場合は nil を設定します。
  • startTimestamp
    現在時刻、または過去の時刻を設定します。
    遡れる範囲は Change Stream やデータベースの retention_period に基づきます。

クエリは Single() を使った単一読み取りの ReadOnlyTransaction で発行します。それ以外のトランザクションではクエリできません。

通常の SELECT クエリなら RowIterator を回しつつ slice に詰めて読み切ったら結果を返す、みたいな実装をするのが一般的です。しかし Change Streams API の場合は当然ながらストリームなのでパーティションのストリームが終わらない限りは読み取りがブロックされます。読んだそばから逐次処理していきましょう。

ChangeRecord の型

最初のややこしいポイントですが、この Change Stream から出てくるカラム型がまあまあ複雑です。

一旦適当な型で受けて JSON にして吐いてみればわかりやすいかなと思って試すものの、Spanner のカラムは struct をちゃんと作らないとデコードできないので結局ドキュメントと Apache Beam SDK を読みながら地道に作りました。

最終的にはこんな実装になりました。

type ChangeRecord struct {
	DataChangeRecords      []*DataChangeRecord      `spanner:"data_change_record"`
	HeartbeatRecords       []*HeartbeatRecord       `spanner:"heartbeat_record"`
	ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record"`
}

type DataChangeRecord struct {
	CommitTimestamp                      time.Time     `spanner:"commit_timestamp"`
	RecordSequence                       string        `spanner:"record_sequence"`
	ServerTransactionID                  string        `spanner:"server_transaction_id"`
	IsLastRecordInTransactionInPartition bool          `spanner:"is_last_record_in_transaction_in_partition"`
	TableName                            string        `spanner:"table_name"`
	ColumnTypes                          []*ColumnType `spanner:"column_types"`
	Mods                                 []*Mod        `spanner:"mods"`
	ModType                              string        `spanner:"mod_type"`
	ValueCaptureType                     string        `spanner:"value_capture_type"`
	NumberOfRecordsInTransaction         int64         `spanner:"number_of_records_in_transaction"`
	NumberOfPartitionsInTransaction      int64         `spanner:"number_of_partitions_in_transaction"`
	TransactionTag                       string        `spanner:"transaction_tag"`
	IsSystemTransaction                  bool          `spanner:"is_system_transaction"`
}

type ColumnType struct {
	Name            string           `spanner:"name"`
	Type            spanner.NullJSON `spanner:"type"`
	IsPrimaryKey    bool             `spanner:"is_primary_key"`
	OrdinalPosition int64            `spanner:"ordinal_position"`
}

type Mod struct {
	Keys      spanner.NullJSON `spanner:"keys"`
	NewValues spanner.NullJSON `spanner:"new_values"`
	OldValues spanner.NullJSON `spanner:"old_values"`
}

type HeartbeatRecord struct {
	Timestamp time.Time `spanner:"timestamp"`
}

type ChildPartitionsRecord struct {
	StartTimestamp  time.Time         `spanner:"start_timestamp"`
	RecordSequence  string            `spanner:"record_sequence"`
	ChildPartitions []*ChildPartition `spanner:"child_partitions"`
}

type ChildPartition struct {
	Token                 string   `spanner:"token"`
	ParentPartitionTokens []string `spanner:"parent_partition_tokens"`
}

注意点としては STRUCT 型のカラムは必ずポインタで受けること、JSON 型は spanner.NullJSON 型で受ける(なにか他の標準的な型で受けようとしてハマった)ことの2つです。

カラムが取り出せたらその中には以下の3種類のレコードのうちいずれかが含まれていますので取り出して種類別に処理していきましょう。このとき重要なポイントとして、クエリ時に指定した partitionToken は常にアクセス可能な場所に保存しておく必要があります。これはパーティションごとに "ここまで読んだ" を記録するためです。

type partitionStream struct {
	// ......
	partitionToken *string
	// ......
}

func (t *partitionStream) handle(ctx context.Context, records []*ChangeRecord) error {
	for _, cr := range records {
		for _, record := range cr.DataChangeRecords {
			if err := t.handleDataChangeRecord(ctx, record); err != nil {
				return err
			}
		}
		for _, record := range cr.HeartbeatRecords {
			if err := t.handleHeartbeatRecord(ctx, record); err != nil {
				return err
			}
		}
		for _, record := range cr.ChildPartitionsRecords {
			if err := t.handleChildPartitionsRecord(ctx, record); err != nil {
				return err
			}
		}
	}
	return nil
}

ここから各レコードについて解説します。

ハートビートレコード (heartbeat_record)

https://cloud.google.com/spanner/docs/change-streams/details#heartbeat-records

まずは簡単なものから。いわゆるハートビートですが、同時に "ここまで読んだ" を示すタイムスタンプとして timestamp が含まれています。

Change Streams API を利用するプロセスに中断が発生する場合、新規に起動するプロセスで現在時刻を指定すると中断していた間の変更が抜けてしまいます。

そこで、受け取るタイムスタンプをパーティションに対応する "ここまで読んだ" 情報としてどこか外部に記録し、再度プロセスを起動する際に記録しておいた情報から読み取り中のパーティションとタイムスタンプを復元してクエリを発行することで、中断時点からストリームの読み取りを再開できます。

なお、このタイムスタンプはハートビートレコードだけでなく、子パーティションレコードに含まれる start_timestamp、およびデータ変更レコードに含まれる commit_timestamp も同様の働きをします。

type partitionStream struct {
	// ......
	partitionToken *string
	// ここまで読んだを記録する関数
	watermarker func(ctx context.Context, partitionToken *string, timestamp time.Time) error
}

func (t *partitionStream) handleHeartbeatRecord(ctx context.Context, record *HeartbeatRecord) error {
	return t.watermarker(ctx, t.partitionToken, record.Timestamp)
}

子パーティションレコード (child_partitions_record)

https://cloud.google.com/spanner/docs/change-streams/details#child-partitions-records

Change Streams API の最も厄介な点として、ストリームが fork, join します。

あるストリーム(親パーティション)が fork, join した場合、これらのストリーム(子パーティション)には別の新しいパーティショントークンが割り当てられています。これを使って別の goroutine で新しいクエリを始める必要があります。

また、親パーティションの join が発生した場合、複数のストリームから同じ子パーティショントークンを受け取ることになります。このとき同じ子パーティショントークンに対して同じプロセス内で複数のクエリを発行しないように実装します。Cloud Spanner は同じパーティションに対するクエリを5セッションまでに制限しているようで、それ以上のセッションを張ろうとするとエラーが発生します。

type partitionStream struct {
	// ......
	partitionToken *string
	// ......
	partitionCh chan<- Partition
	// ......
}

// ......

func (t *partitionStream) handleChildPartitionsRecord(ctx context.Context, record *ChildPartitionsRecord) error {
	for _, cp := range record.ChildPartitions {
		p := Partition{
			PartitionToken: &cp.Token,
			StartTimestamp: record.StartTimestamp,
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case t.partitionCh <- p:
		}
	}

	return t.watermarker(ctx, t.partitionToken, record.StartTimestamp)
}
type Controller struct {
	partitionCh <-chan Partition

	trackingPartitions *partitionSet

	wg *errgroup.Group
}


func (t *Controller) Start(ctx context.Context) error {
	t.wg, ctx = errgroup.WithContext(ctx)

	t.wg.Go(func() error {
		for {
			select {
			case p := <-t.partitionCh:
				t.StartTrack(ctx, p.PartitionToken, p.StartTimestamp)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
	// ......
	return t.wg.Wait()
}

func (t *Controller) StartTrack(ctx context.Context, partitionToken string, startTimestamp time.Time) {
	if dup := t.trackingPartitions.add(partitionToken); dup {
		return
	}

	t.wg.Go(func() error {
		defer t.trackingPartitions.remove(partitionToken)

		// ここでクエリを始める
		return nil
	})
}

type partitionSet struct {
	m  map[string]struct{}
	mu sync.Mutex
}

func (s *partitionSet) add(t string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.m[t]; ok {
		return true
	}

	s.m[t] = struct{}{}
	return false
}

func (s *partitionSet) remove(t string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.m, t)
}

データ変更レコード (data_change_record)

最後にデータ変更レコードです。
INSERT, UPDATE, DELETE のレコード例を示します。

INSERT

INSERT INTO
  users(userId, userName, userProfile, age, created, updated)
  values("E722EE5A-88C1-4563-B896-2D85F77745D5", "alice", "My name is alice.", 20, CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())
data_change_record
{
  "CommitTimestamp": "2022-10-23T05:56:18.925263Z",
  "RecordSequence": "00000000",
  "ServerTransactionID": "MTUzNDI2ODUwMDAwMDAyMDY0Mg==",
  "IsLastRecordInTransactionInPartition": true,
  "TableName": "users",
  "ColumnTypes": [
    {
      "Name": "userId",
      "Type": {
        "code": "STRING"
      },
      "IsPrimaryKey": true,
      "OrdinalPosition": 1
    },
    {
      "Name": "userName",
      "Type": {
        "code": "STRING"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 2
    },
    {
      "Name": "userProfile",
      "Type": {
        "code": "STRING"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 3
    },
    {
      "Name": "updated",
      "Type": {
        "code": "TIMESTAMP"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 4
    },
    {
      "Name": "created",
      "Type": {
        "code": "TIMESTAMP"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 5
    },
    {
      "Name": "age",
      "Type": {
        "code": "INT64"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 6
    }
  ],
  "Mods": [
    {
      "Keys": {
        "userId": "E722EE5A-88C1-4563-B896-2D85F77745D5"
      },
      "NewValues": {
        "age": "20",
        "created": "2022-10-23T05:56:18.891196829Z",
        "updated": "2022-10-23T05:56:18.891196829Z",
        "userName": "alice",
        "userProfile": "My name is alice."
      },
      "OldValues": {}
    }
  ],
  "ModType": "INSERT",
  "ValueCaptureType": "OLD_AND_NEW_VALUES",
  "NumberOfRecordsInTransaction": 1,
  "NumberOfPartitionsInTransaction": 1,
  "TransactionTag": "",
  "IsSystemTransaction": false
}

UPDATE

UPDATE users
SET updated=CURRENT_TIMESTAMP, age=21
WHERE userId='E722EE5A-88C1-4563-B896-2D85F77745D5';
data_change_record
{
  "CommitTimestamp": "2022-10-23T05:59:59.356799Z",
  "RecordSequence": "00000000",
  "ServerTransactionID": "ODE1NzE2OTE3MzkzODM1NjYyMw==",
  "IsLastRecordInTransactionInPartition": true,
  "TableName": "users",
  "ColumnTypes": [
    {
      "Name": "userId",
      "Type": {
        "code": "STRING"
      },
      "IsPrimaryKey": true,
      "OrdinalPosition": 1
    },
    {
      "Name": "updated",
      "Type": {
        "code": "TIMESTAMP"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 4
    },
    {
      "Name": "age",
      "Type": {
        "code": "INT64"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 6
    }
  ],
  "Mods": [
    {
      "Keys": {
        "userId": "E722EE5A-88C1-4563-B896-2D85F77745D5"
      },
      "NewValues": {
        "age": "21",
        "updated": "2022-10-23T05:59:59.307657331Z"
      },
      "OldValues": {
        "age": "20",
        "updated": "2022-10-23T05:56:18.891196829Z"
      }
    }
  ],
  "ModType": "UPDATE",
  "ValueCaptureType": "OLD_AND_NEW_VALUES",
  "NumberOfRecordsInTransaction": 1,
  "NumberOfPartitionsInTransaction": 1,
  "TransactionTag": "",
  "IsSystemTransaction": false
}

DELETE

DELETE FROM users
WHERE userId='E722EE5A-88C1-4563-B896-2D85F77745D5';
data_change_record
{
  "CommitTimestamp": "2022-10-23T06:13:41.486559Z",
  "RecordSequence": "00000000",
  "ServerTransactionID": "MTYwNDI3NjgyMjMwMDM3NDUxNQ==",
  "IsLastRecordInTransactionInPartition": true,
  "TableName": "users",
  "ColumnTypes": [
    {
      "Name": "userId",
      "Type": {
        "code": "STRING"
      },
      "IsPrimaryKey": true,
      "OrdinalPosition": 1
    },
    {
      "Name": "userName",
      "Type": {
        "code": "STRING"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 2
    },
    {
      "Name": "userProfile",
      "Type": {
        "code": "STRING"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 3
    },
    {
      "Name": "updated",
      "Type": {
        "code": "TIMESTAMP"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 4
    },
    {
      "Name": "created",
      "Type": {
        "code": "TIMESTAMP"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 5
    },
    {
      "Name": "age",
      "Type": {
        "code": "INT64"
      },
      "IsPrimaryKey": false,
      "OrdinalPosition": 6
    }
  ],
  "Mods": [
    {
      "Keys": {
        "userId": "E722EE5A-88C1-4563-B896-2D85F77745D5"
      },
      "NewValues": {},
      "OldValues": {
        "age": "21",
        "created": "2022-10-23T05:56:18.891196829Z",
        "updated": "2022-10-23T05:59:59.307657331Z",
        "userName": "alice",
        "userProfile": "My name is alice."
      }
    }
  ],
  "ModType": "DELETE",
  "ValueCaptureType": "OLD_AND_NEW_VALUES",
  "NumberOfRecordsInTransaction": 1,
  "NumberOfPartitionsInTransaction": 1,
  "TransactionTag": "",
  "IsSystemTransaction": false
}

UPDATE の例を見るとわかりますが、変更データレコードに含まれるカラムの情報は変更したカラムに限られます。変更後のレコードを取得するには、commit_timestamp を元に「強力なステイル読み取り」機能を利用してクエリすると他のカラムにアクセスできます。

row, err := spannerClient.
	Single().
	WithTimestampBound(spanner.ReadTimestamp(commitTimestamp)).
	ReadRow(ctx, tableName, keys, columnNames)

ただし、そもそも変更データレコードに他のカラムの情報が含まれません。また、keys, new_values, old_valuesの実体は map であり、キーの順序は保証されていません。

このため、レコードにアクセスするために事前に information_schema.columns にアクセスして対象のテーブルのカラムを把握する必要があります。

一方で、information_schema.columns 自体の変更は Change Streams API では取得できません。よって、テーブルスキーマの変更に追従するためにはまた別の工夫が必要です。ちなみに、このためか今のところ Dataflow の BigQuery テンプレートでもテーブルスキーマの変更には対応していません。

おわりに

この記事で触れた実装は、ライブラリとして以下のリポジトリで公開しています。

https://github.com/toga4/spream

今のところ、比較的簡単に Change Streams をコンソールに流して眺めるぐらいはできています。

並行処理周りが全然よくわからなくて、かなり雑です。Go言語による並行処理も以前読んだはずなのですが、普段 Web API の開発で並行処理を書くことはあっても簡単なものばかりで全く手に馴染みがありません。

でも Change Streams API を Go で扱う上での参考にはなるんじゃないかと思うので良かったら見てみてください。

以上です。お粗末さまでした。

Discussion