🛠️

GCP Cloud SpannerのBatchWriteを触ってみた

2024/06/12に公開

はじめに

仕事でSpannerの大規模なデータ処理を行うバッチを実装する機会が多いのですが、パフォーマンスや利便性向上に使える新機能はないかとなんとなく探ってみたところBatchWriteを見つけたので試してみました。

なお2024/06/10現在まだプレビュー機能です。

以下はGo言語の利用を前提に書きます。

BatchWriteの特徴

従来の書き込みではDMLの実行かMutationのApplyによりSpannerへ変更を加えていました。BatchWriteは大量のMutationを扱うための仕組みであり、複数のMutationをまとめたMutationGroupを作成し更にこのGroupを複数束ねて一括で送信できます。

これにより多数のMutationを順次Applyする際に発生するオーバーヘッドを削減しより高いパフォーマンスを得られます。またPartitioned DMLではサポートされないINSERTも行えます。

構造を図示すると以下のようになります。

Partitioned DMLとの違い

大規模なデータ更新という点で類似していますが根本的な仕組みが異なります。Partitioned DMLでは条件に合致するデータの一括変更であるのに対し、BatchWriteはMutationの一括適用を行います。

参考: Compare DML and Mutations

DELETEを例にすると以下のような捉え方で概ね正しいかと思います。

partitioned_dml.sql
-- 最終アクセスが 2024-01-01 00:00:00 以前のユーザーを全て削除する
DELETE user WHERE last_access < TIMESTAMP('2024-01-01 00:00:00', 'Asia/Tokyo');
batch_write.sql
-- 大量の特定ユーザーを個別にID指定して削除する
DELETE user WHERE user_id = 'xxx';
DELETE user WHERE user_id = 'yyy';
DELETE user WHERE user_id = 'zzz';
...

BatchUpdateとの違い

名前が似ていて紛らわしいのですが、BatchUpdateは単一のReadWriteトランザクションで複数のStatement DMLをまとめて投げるためのものなのでBatchWriteとは全く違う機能です。

脱線するのでこの記事ではこれ以上触れません。

https://pkg.go.dev/cloud.google.com/go/spanner@v1.63.0#ReadWriteTransaction.BatchUpdate

注意点

MutationGroup間はアトミックではない

以下のように4つのMutation(m1, m2, m3, m4)を2つのGroup(mg1, mg2)に分けてBatchWriteする場合を考えます。この場合m1とm2又はm3とm4はアトミックな処理となりますが、mg1とmg2はまとめてリクエストするにもかかわらずアトミックな処理とはなりません。

m1, _ := spanner.InsertStruct("table", a)
m2, _ := spanner.InsertStruct("table", b)
m3, _ := spanner.InsertStruct("table", c)
m4, _ := spanner.InsertStruct("table", d)

mg1 := &spanner.MutationGroup{
    Mutations: []*spanner.Mutation{m1, m2}, // atomic between m1 and m2
}
mg2 := &spanner.MutationGroup{
    Mutations: []*spanner.Mutation{m3, m4}, // atomic between m3 and m4
}

mgs := []*spanner.MutationGroup{mg1, mg2} // not atomic between mg1(m1, m2) and mg2(m3, m4)
iter := cli.BatchWrite(ctx, mgs)

つまりm1とm2は書き込みに成功したがm3とm4は失敗する、というようなケースが起こり得ます。またどのGroupが先に書き込みを試みるのかも不定で、Spanner側が採番するTimestamp(TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true) + spanner.CommitTimestamp)を利用する場合Groupが異なるとタイムスタンプも異なる可能性があります。

Mutation制限を意識しなくてよいわけではない

BatchWriteという名前から大量のデータを気軽に放り込むと後はヨロシク処理してくれる印象を受けます。しかしMutationGroup内部、つまりアトミックな処理範囲では従来と同じくMutation制限(2024/06/10時点で80,000)があります。

これを超過している場合は従来通りのエラーが発生し、もし超過しているGroupと超過していないGroupが混在している場合は未超過のGroupだけがApplyされます。以下は100,000件のGroup(超過)と100件のGroup(未超過)を同時に投げ込んだ場合の結果です。100件の方だけが書き込まれているのがわかります。

$ go run main.go
2024/06/12 01:06:34 indexes:0  status:{code:3  message:"The transaction contains too many mutations. Insert and update operations count with the multiplicity of the number of columns they affect. For example, inserting values into one key column and four non-key columns count as five mutations total for the insert. Delete and delete range operations count as one mutation regardless of the number of columns affected. The total mutation count includes any changes to indexes that the transaction generates. Please reduce the number of writes, or use fewer indexes. (Maximum number: 80000)"  details:{[type.googleapis.com/google.rpc.Help]:{links:{description:"Cloud Spanner limits documentation."  url:"https://cloud.google.com/spanner/docs/limits"}}}}
2024/06/12 01:06:34 indexes:1  status:{}  commit_timestamp:{seconds:1718121994  nanos:800435000}
failed to batch write: 1 error occurred:
	* rpc error: code = InvalidArgument desc = The transaction contains too many mutations. Insert and update operations count with the multiplicity of the number of columns they affect. For example, inserting values into one key column and four non-key columns count as five mutations total for the insert. Delete and delete range operations count as one mutation regardless of the number of columns affected. The total mutation count includes any changes to indexes that the transaction generates. Please reduce the number of writes, or use fewer indexes. (Maximum number: 80000)


exit status 1
$
$
$ spanner-cli -e "SELECT COUNT(*) FROM Users;"

100

メッセージサイズ上限を超えやすい

こちらも従来からある制限事項ですが、大量のMutationをまとめて投げるというユースケース的に1メッセージあたり100MBという制限を超過しやすいです。これは上記のMutation制限とは別の話しで、複数のGroupを束ねた時の容量です。

rpc error: code = ResourceExhausted desc = SERVER: Received message larger than max (145000945 vs. 104857600)

私が適当に考えた下記の小さいスキーマで試したところ1リクエストで総数700,000Mutation程度が限界でした。もしこれを超える規模のMutationを投げたい場合は分割して逐次リクエストしなければなりません。

BatchWriteを実用する場合は時間の経過と共にメッセージサイズが肥大化してある日突然エラーが発生した、等とならないようMutationの分割だけでなく複数のMutationGroupもチャンクとして分割し順次リクエストするような実装が必要になるでしょう。

CREATE TABLE Users (
    UserID STRING(MAX) NOT NULL,
    CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),
    UpdatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),
) PRIMARY KEY (UserID);
type User struct {
	UserID    string    `spanner:"UserID"`
	CreatedAt time.Time `spanner:"CreatedAt"`
	UpdatedAt time.Time `spanner:"UpdatedAt"`
}

サンプルコード

雑に書いたのであまり見易くないですが検証に使ったコードを貼っておきます。上記のMutation制限とメッセージサイズ上限をケアする実装になっているので、数百万行あるような大量のデータでもエラーを起こさず書き込めるはずです。

samples
.envrc
export SPANNER_PROJECT_ID=<your_project_id>
export SPANNER_INSTANCE_ID=sample-instance
export SPANNER_DATABASE_ID=sample-database
schema.sql
CREATE TABLE Users (
    UserID STRING(MAX) NOT NULL,
    CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),
    UpdatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),
) PRIMARY KEY (UserID);
Makefile
spanner-create:
	gcloud --project $$SPANNER_PROJECT_ID spanner instances create $$SPANNER_INSTANCE_ID --processing-units 100 --config regional-asia-northeast1 --description $$SPANNER_INSTANCE_ID
	gcloud --project $$SPANNER_PROJECT_ID spanner databases create $$SPANNER_DATABASE_ID --instance $$SPANNER_INSTANCE_ID --ddl-file schema.sql

spanner-reset:
	-gcloud --project $$SPANNER_PROJECT_ID spanner databases delete $$SPANNER_DATABASE_ID --instance $$SPANNER_INSTANCE_ID -q
	-gcloud --project $$SPANNER_PROJECT_ID spanner databases create $$SPANNER_DATABASE_ID --instance $$SPANNER_INSTANCE_ID --ddl-file schema.sql

spanner-delete:
	-gcloud --project $$SPANNER_PROJECT_ID spanner instances delete $$SPANNER_INSTANCE_ID -q
main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"cloud.google.com/go/spanner"
	"github.com/google/uuid"
	multierr "github.com/hashicorp/go-multierror"
	"google.golang.org/api/iterator"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	tableUsers = "Users"

	totalMutations      = 1_000_000
	mutationsPerCommit  = 25_000
	maxMutationsInGroup = 700_000
)

type User struct {
	UserID    string    `spanner:"UserID"`
	CreatedAt time.Time `spanner:"CreatedAt"`
	UpdatedAt time.Time `spanner:"UpdatedAt"`
}

func main() {
	if err := run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

func run() error {
	uu, err := genUsers(totalMutations)
	if err != nil {
		return err
	}

	mgs, err := genMutationGroups(uu, mutationsPerCommit)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	dsn := fmt.Sprintf("projects/%s/instances/%s/databases/%s", os.Getenv("SPANNER_PROJECT_ID"), os.Getenv("SPANNER_INSTANCE_ID"), os.Getenv("SPANNER_DATABASE_ID"))
	cli, err := spanner.NewClient(ctx, dsn)
	if err != nil {
		return err
	}
	defer cli.Close()

	for _, mg := range mgs {
		if err := batchWrite(ctx, cli, mg); err != nil {
			return fmt.Errorf("failed to batch write: %w", err)
		}
	}

	return nil
}

func genMutationGroups(uu []*User, perCommit int) ([][]*spanner.MutationGroup, error) {
	var mgs [][]*spanner.MutationGroup
	var mg []*spanner.MutationGroup
	var mm []*spanner.Mutation

	for i, u := range uu {
		m, err := spanner.InsertStruct(tableUsers, u)
		if err != nil {
			return nil, err
		}
		mm = append(mm, m)

		if i != 0 && i%perCommit == 0 {
			mg = append(mg, &spanner.MutationGroup{Mutations: mm})
			mm = []*spanner.Mutation{}
			if i != 0 && i%maxMutationsInGroup == 0 {
				mgs = append(mgs, mg)
				mg = []*spanner.MutationGroup{}
			}
		}
	}

	mg = append(mg, &spanner.MutationGroup{Mutations: mm})
	mgs = append(mgs, mg)

	return mgs, nil
}

func genUsers(count uint64) ([]*User, error) {
	uu := make([]*User, count)
	for i := uint64(0); i < count; i++ {
		v4, err := uuid.NewRandom()
		if err != nil {
			return nil, err
		}
		u := &User{
			UserID:    v4.String(),
			CreatedAt: spanner.CommitTimestamp,
			UpdatedAt: spanner.CommitTimestamp,
		}
		uu[i] = u
	}
	return uu, nil
}

func batchWrite(ctx context.Context, cli *spanner.Client, mgs []*spanner.MutationGroup) error {
	var merr error
	iter := cli.BatchWrite(ctx, mgs)
	defer iter.Stop()

	for {
		resp, err := iter.Next()
		if err != nil {
			if err == iterator.Done {
				return merr
			}
			return multierr.Append(merr, err)
		}

		st := status.FromProto(resp.GetStatus())
		if st.Code() != codes.OK {
			merr = multierr.Append(merr, st.Err())
		}
		log.Println(resp.String())
	}
}

パフォーマンス例

上記コードで100PUのインスタンスに対し総数1,000,000レコードを1Groupあたり25,000Mutations、1リクエストあたり700,000Mutationsで書き込んだところ約1分10秒かかりました。

$ make spanner-reset && go run main.go
gcloud --project $SPANNER_PROJECT_ID spanner databases delete $SPANNER_DATABASE_ID --instance $SPANNER_INSTANCE_ID -q
gcloud --project $SPANNER_PROJECT_ID spanner databases create $SPANNER_DATABASE_ID --instance $SPANNER_INSTANCE_ID --ddl-file schema.sql
Creating database...done.
2024/06/12 01:43:24 indexes:0 status:{} commit_timestamp:{seconds:1718124203 nanos:972607000}
...
2024/06/12 01:44:09 indexes:27 status:{} commit_timestamp:{seconds:1718124248 nanos:224101000}
2024/06/12 01:44:14 indexes:0 status:{} commit_timestamp:{seconds:1718124253 nanos:427264000}
...
2024/06/12 01:44:34 indexes:11 status:{} commit_timestamp:{seconds:1718124274 nanos:36695000}
$
$
$ spanner-cli -e "SELECT COUNT(*) FROM Users;"

1000000

参考文献

Discussion