GCP SpannerにおけるStatement DMLとMutationの挙動の違い
はじめに
アプリケーションからGCP Cloud Spannerのデータを操作するには主にStatement DMLを使う方法とMutationを使う方法の2種類がありますが、一部の振る舞いにおいて異なる点があります。
覚えてしまえば特に難しいことはないのですがたまに忘れてハマるので備忘のため簡単にまとめてGoのサンプルコードと共に紹介したいと思います。
環境構築
以下のスキーマとベースコードを前提に検証します。
CREATE TABLE Users (
UserID STRING(MAX),
Name STRING(MAX),
) PRIMARY KEY (UserID);
package main
import (
"context"
"fmt"
"os"
"time"
"cloud.google.com/go/spanner"
)
type User struct {
UserID string `spanner:"UserID"`
Name string `spanner:"Name"`
}
const (
UsersTableName = "Users"
)
func main() {
if err := run(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
func printUser(ctx context.Context, userID string, c *spanner.Client) error {
row, err := c.ReadOnlyTransaction().ReadRow(ctx, UsersTableName, spanner.Key{userID}, []string{"UserID", "Name"})
if err != nil {
return err
}
var u User
err = row.ToStruct(&u)
if err != nil {
return err
}
fmt.Printf("%+v\n", u)
return nil
}
func run() error {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
projectID := os.Getenv("SPANNER_PROJECT_ID")
instanceID := os.Getenv("SPANNER_INSTANCE_ID")
databaseID := os.Getenv("SPANNER_DATABASE_ID")
dsn := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID)
c, err := spanner.NewClient(ctx, dsn)
if err != nil {
return err
}
Read Your Writes (RYW)
RYWは同一トランザクション内で何らかの更新したデータを再度読み取ることを指します。Statement DMLでは対応可能ですがMutationではサポートされていません。
Mutationでは書き込み時に BufferWrite()
関数を使用しますがこれは名前の通りクライアント側でバッファーしておきコミット時にまとめて送信する、という挙動になります。そのため同一トランザクション内部で BufferWrite()
-> SELECT という順でコードを書いたとしても読み取ることができません。
Statement DML
下記サンプルコードでは insertStmt
の内容を同じトランザクション内から発行した selectStmt
で読み取れていることがわかります。この例ではINSERT文を使用しましたがUPDATEやDELETE等でも同様の挙動となります。
$ go run main.go
{UserID:sample user ID Name:hoge}
{UserID:sample user ID Name:hoge}
userID := "sample user ID"
insertStmt := spanner.Statement{
SQL: `INSERT INTO Users (UserID, Name) VALUES (@userID, @name)`,
Params: map[string]interface{}{
"userID": userID,
"name": "hoge",
},
}
selectStmt := spanner.Statement{
SQL: `SELECT * FROM Users WHERE UserID = @userID`,
Params: map[string]interface{}{
"userID": userID,
},
}
_, err = c.ReadWriteTransaction(ctx, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
// INSERT new User by statement DML
_, err := rwt.Update(ctx, insertStmt)
if err != nil {
return err
}
// Read by SELECT statement
iter := rwt.Query(ctx, selectStmt)
defer iter.Stop()
err = iter.Do(func(r *spanner.Row) error {
var u User
err := r.ToStruct(&u)
if err != nil {
return err
}
fmt.Printf("%+v\n", u) // ==> {UserID:sample user ID Name:hoge}
return nil
})
if err != nil {
return err
}
// Read by Single
row, err := rwt.ReadRow(ctx, UsersTableName, spanner.Key{userID}, []string{"UserID", "Name"})
if err != nil {
return err
}
var u User
err = row.ToStruct(&u)
if err != nil {
return err
}
fmt.Printf("%+v\n", u) // ==> {UserID:sample user ID Name:hoge}
return nil
})
Mutation
その一方でMutationは適用した順番でトランザクションのコミット時にまとめて反映されるため、同一トランザクション内では BufferWrite()
の後からでも読み取ることはできません。サンプルとしてINSERTを利用していますがその他のDMLでも同様です。
The effects of the write won't be visible to any reads (including reads done in the same transaction) until the transaction commits.
下記コードではReadWriteTransactionを2回実行していますが、1回目のINSERTの反映は読み取り後に回るためSELECT時にUserを見つけることができません。2回目の時点は1回目のトランザクションが終了しておりMutationが反映されているためUserを読み取ることができます。
userID := "sample user ID"
selectStmt := spanner.Statement{
SQL: `SELECT * FROM Users WHERE UserID = @userID`,
Params: map[string]interface{}{
"userID": userID,
},
}
u := User{
UserID: userID,
Name: "hoge",
}
m, err := spanner.InsertStruct(UsersTableName, &u)
if err != nil {
return err
}
_, err = c.ReadWriteTransaction(ctx, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
// INSERT new User by Mutation
err := rwt.BufferWrite([]*spanner.Mutation{m})
if err != nil {
return err
}
// Read by SELECT statement
iter := rwt.Query(ctx, selectStmt)
defer iter.Stop()
err = iter.Do(func(r *spanner.Row) error {
var u User
err := r.ToStruct(&u)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
fmt.Printf("row count: %d\n", iter.RowCount) // ==> 0
// Read by Single
_, err = rwt.ReadRow(ctx, UsersTableName, spanner.Key{userID}, []string{"UserID", "Name"})
if err != nil {
fmt.Println(err) // ==> spanner: code = "NotFound", desc = "row not found(Table: Users, PrimaryKey: (\"sample user ID\"))"
}
return nil
})
if err != nil {
return err
}
_, err = c.ReadWriteTransaction(ctx, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
// Read by SELECT statement
iter := rwt.Query(ctx, selectStmt)
defer iter.Stop()
err = iter.Do(func(r *spanner.Row) error {
var u User
err := r.ToStruct(&u)
if err != nil {
return err
}
fmt.Printf("%+v\n", u) // ==> {UserID:sample user ID Name:hoge}
return nil
})
if err != nil {
return err
}
// Read by Single
row, err := rwt.ReadRow(ctx, UsersTableName, spanner.Key{userID}, []string{"UserID", "Name"})
if err != nil {
return err
}
var u User
err = row.ToStruct(&u)
if err != nil {
return err
}
fmt.Printf("%+v\n", u) // ==> {UserID:sample user ID Name:hoge}
return nil
})
$ go run main.go
row count: 0
spanner: code = "NotFound", desc = "row not found(Table: Users, PrimaryKey: (\"sample user ID\"))"
{UserID:sample user ID Name:hoge}
{UserID:sample user ID Name:hoge}
UPSERT
指定したレコード(PrimaryKey)が存在しない場合はINSERT、存在する場合はUPDATEとして実行するSpanner以外でもよくある機能です。
Statement DML
DMLの場合特別な便利機能は無いため、INSERT時にSpannerが返すエラーをハンドリングしAlreadyExistsの場合はUPDATEを打つような形で愚直に実装する必要があります(或いは事前にSELECTを打って確認する方法でもよいでしょう)。
userID := "sample user ID"
insertStmt := spanner.Statement{
SQL: `INSERT INTO Users (UserID, Name) VALUES (@userID, @name)`,
Params: map[string]interface{}{
"userID": userID,
"name": "init name",
},
}
updateStmt := spanner.Statement{
SQL: `UPDATE Users SET Name = @name WHERE UserID = @userID`,
Params: map[string]interface{}{
"userID": userID,
"name": "updated name",
},
}
insert := func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
_, err := rwt.Update(ctx, insertStmt)
if err != nil {
return err
}
return nil
}
update := func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
_, err := rwt.Update(ctx, updateStmt)
if err != nil {
return err
}
return nil
}
upsert := func(ctx context.Context) error {
_, err := c.ReadWriteTransaction(ctx, insert)
if err != nil {
if spanner.ErrCode(err) == codes.AlreadyExists {
_, err := c.ReadWriteTransaction(ctx, update)
if err != nil {
return err
}
return nil
}
return err
}
return nil
}
err = upsert(ctx)
if err != nil {
return err
}
err = printUser(ctx, userID, c) // ==> {UserID:sample user ID Name:init name}
if err != nil {
return err
}
err = upsert(ctx)
if err != nil {
return err
}
err = printUser(ctx, userID, c) // ==> {UserID:sample user ID Name:updated name}
if err != nil {
return err
}
$ go run main.go
{UserID:sample user ID Name:init name}
{UserID:sample user ID Name:updated name}
Mutation
こちらは比較的簡単で単純に InsertOrUpdate~
でMutationを作成し適用してやるだけです。1回目の読み取りと2回目の読み取りでNameの値が変わっているのがわかります。
userID := "sample user ID"
m1, err := spanner.InsertOrUpdateStruct(UsersTableName, &User{
UserID: userID,
Name: "init name",
})
if err != nil {
return err
}
m2, err := spanner.InsertOrUpdateStruct(UsersTableName, &User{
UserID: userID,
Name: "updated name",
})
if err != nil {
return err
}
_, err = c.ReadWriteTransaction(ctx, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
err := rwt.BufferWrite([]*spanner.Mutation{m1}) // Insert new user
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = printUser(ctx, userID, c) // ==> {UserID:sample user ID Name:init name}
if err != nil {
return err
}
_, err = c.ReadWriteTransaction(ctx, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
err := rwt.BufferWrite([]*spanner.Mutation{m2}) // Update to new Name
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = printUser(ctx, userID, c) // ==> {UserID:sample user ID Name:updated name}
if err != nil {
return err
}
$ go run main.go
{UserID:sample user ID Name:init name}
{UserID:sample user ID Name:updated name}
データベース制約
SpannerにもDBレイヤーで様々な制約(例えばNOT NULLやSTRING(10)等)をかけることができますが、これらの評価タイミングにも違いがあります。
Statement DML
コードの直後に評価され違反があるとエラーが返却されます。
userID := "sample user ID"
insertStmt := spanner.Statement{
SQL: `INSERT INTO Users (UserID, Name) VALUES (@userID, @name)`,
Params: map[string]interface{}{
"userID": userID,
"name": "init name",
},
}
_, err = c.ReadWriteTransaction(ctx, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
_, err := rwt.Update(ctx, insertStmt)
if err != nil {
return err
}
_, err = rwt.Update(ctx, insertStmt)
if err != nil {
fmt.Println("error in the transaction")
return err
}
return nil
})
if err != nil {
return err
}
$ go run main.go
error in the transaction
spanner: code = "AlreadyExists", desc = "Row [sample user ID] in table Users already exists"
exit status 1
Mutation
前述の通り BufferWrite()
した時点ではSpannerに送られないため評価が行われず、トランザクションの最後で評価されます。また本題とは直接関係無いですが、返却されるエラーがAlreadyExistsではなくInvalidArgumentなのもの興味深い点です。
userID := "sample user ID"
m := spanner.Insert(UsersTableName, []string{"UserID", "Name"}, []interface{}{userID, "init name"})
_, err = c.ReadWriteTransaction(ctx, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
err := rwt.BufferWrite([]*spanner.Mutation{m})
if err != nil {
return err
}
err = rwt.BufferWrite([]*spanner.Mutation{m})
if err != nil {
return err
}
fmt.Println("no error in the transaction")
return nil
})
if err != nil {
return err
}
$ go run main.go
no error in the transaction
spanner: code = "InvalidArgument", desc = "Row [sample user ID] in table Users was already created in this transaction."
exit status 1
まとめ
- SQL文を直接書けるStatement DMLとデータ操作をオブジェクトとして扱うMutationではコード上での表現以外にも一部の振る舞いに差がある
- Statement DMLではSQL文を一つずつ実行していく形に近い感覚で扱うことができる
- Mutationは各命令を最後にまとめて実行するためRYWのようなトリッキーなことが困難
Discussion