【基礎】 GoでSpannerを使う
はじめに
GoogleのSpannerデータベースはまだまだ知名度が低く、日本語での文献も豊富ではないため、いざ使うとなるとかなり苦労する技術です。ここでは最低限の概念を説明することにつとめ、通常利用においてSpannerのハードルを下げようと思いこの記事を執筆しました。
基本的には以下の資料に載っている情報かと思いますが、実際にソースコードを見るとドキュメントの更新が追いついていない部分が多い印象でした。そのためクライアントライブラリのソースコードに可能な限り追従し、できるだけ平易な文章でまとめようと思います。
クライアントの生成
公式ドキュメントに記載があるため、説明は割愛します。
import (
"context"
"io"
"cloud.google.com/go/spanner"
database "cloud.google.com/go/spanner/admin/database/apiv1"
)
func createClients(w io.Writer, db string) error {
ctx := context.Background()
adminClient, err := database.NewDatabaseAdminClient(ctx)
if err != nil {
return err
}
defer adminClient.Close()
dataClient, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer dataClient.Close()
_ = adminClient
_ = dataClient
return nil
}
以下説明にはこのコードで生成したクライアントを利用している前提とさせていただきます。
トランザクションの概念
Spannerライブラリでは、トランザクションを強く意識した書き味になります。そのため基本的な処理の前にそれぞれのトランザクションを理解する必要があります。
Spannerで利用できるトランザクションには
- ReadOnlyTransaction
- BatchReadOnlyTransaction
- ReadWriteTransaction
の3つがあります。
ReadOnlyTransaction
読み取りに特化したトランザクションになります。公式ドキュメントに説明があります。
読み取り専用トランザクションはトランザクションの commit 履歴で整合性のあるプレフィックスを監視しているため、アプリケーションは常に整合性のあるデータを取得できます。
読み取りのみの場合はReadOnlyTransactionを利用することでトランザクションのロックもなく、整合性のあるデータを取得できるため積極的に利用するのが良いでしょう。
ReadOnlyTransactionを利用する方法は2つあります。
(c *Client) ReadOnlyTransaction()を利用する
clientのメソッドとしてReadOnlyTransaction()
が利用でき、トランザクションが結果として取得できます。このトランザクションから読み取りを実行できます。読み取りを実行後、トランザクションを明示的にCloseする必要があります。
type Task struct {
TaskID string `spanner:"task_id"`
Name string `spanner:"name"`
}
func (r *repository) GetTasks(ctx context.Context) ([]*Task, error) {
tx := r.c.ReadOnlyTransaction() // トランザクションを開始
defer tx.Close() // トランザクションを閉じる
query := spanner.NewStatement("SELECT * FROM tasks")
tasks := make([]*Task, 0)
iter := tx.Query(ctx, query)
iter.Do(func(row *spanner.Row) error {
var t Task
if err := row.ToStructLenient(&t); err != nil {
return err
}
tasks := append(tasks, &t)
return nil
})
return tasks, nil
}
トランザクション以外の部分については後述しますので、今は雰囲気で読んでください。
(c *Client) Single()を利用する
以下のように利用できます。
type Task struct {
TaskID string `spanner:"task_id"`
Name string `spanner:"name"`
}
func (r *repository) GetTasks(ctx context.Context) ([]*Task, error) {
tx := r.c.Single() // トランザクションを開始
query := spanner.NewStatement("SELECT * FROM tasks")
tasks := make([]*Task, 0)
iter := tx.Query(ctx, query)
iter.Do(func(row *spanner.Row) error {
var t Task
if err := row.ToStructLenient(&t); err != nil {
return err
}
tasks = append(tasks, &t)
return nil
})
return tasks, nil
}
基本的にはReadOnlyTransaction()
の場合とほぼ同じ使い方ができます。何が違うのかというと、Single()
は単一の読み取りクエリに最適化されているということです。そのため明示的なClose()
が必要なく、クエリが実行された時点でこのトランザクションは終了します。複数の読み取りの厳密な同時実行が必要でない場合はSingleを使ってReadOnlyTransactionを生成したほうが多くの場合効率的なようです。
BatchReadOnlyTransaction
こちらはほぼ利用しないかと思います。BatchReadOnlyTransactionは、データベースのスナップショットからパーティション化された読み取りやクエリを行うために使用するためのトランザクションになっており、複数マシンから同一のトランザクションを参照し、読み取り作業を分割したい、というユースケースに対応するためのものです。
type Task struct {
TaskID string `spanner:"task_id"`
Name string `spanner:"name"`
}
func (r *repository) GetTasks(ctx context.Context) ([]*Task, error) {
tx, err := r.c.BatchReadOnlyTransaction(ctx, spanner.StrongRead()) // トランザクションを開始
if err != nil {
return nil, err
}
defer func() {
tx.Close() // トランザクションを閉じる
tx.Cleanup(ctx) // すべてのクライアントのトランザクションが閉じられてからトランザクションをクリーンアップ
}()
query := spanner.NewStatement("SELECT * FROM tasks")
tasks := make([]*Task, 0)
iter := tx.Query(ctx, query)
iter.Do(func(row *spanner.Row) error {
var t Task
if err := row.ToStructLenient(&t); err != nil {
return err
}
tasks = append(tasks, &t)
return nil
})
return tasks, nil
}
コード例は単一クライアントでの例なので、Batchで処理をするものではないですが一応載せておきます。
ReadWriteTransaction
ReadWriteTransactionは書き込みに読み取りが必要になる場合に利用するトランザクションです。
type Task struct {
TaskID string `spanner:"task_id"`
Name string `spanner:"name"`
}
func (r *repository) DuplicateTask(ctx context.Context, taskID string) error {
_, err := r.c.ReadWriteTransaction(ctx, func(ctx context.Context, tx *spanner.ReadWriteTransaction) error {
// Read task
var task Task
row, err := tx.ReadRow(ctx, "tasks", spanner.Key{taskID}, []string{"task_id", "name"})
if err != nil {
return err
}
if err := row.ToStruct(&task); err != nil {
return err
}
// Duplicate task
task.TaskID = uuid.New().String()
m := spanner.Insert("tasks", []string{"task_id", "name"}, []interface{}{task.TaskID, task.Name})
tx.BufferWrite([]*spanner.Mutation{m})
return nil
})
if err != nil {
return err
}
return nil
}
タスクを複製する、という操作を例として上げてみます。書き込み操作については後述します。ReadWriteTransactionは内部で自動的にリトライを行っており、トランザクションがコミットできない場合、引数として渡しているfunctionを再度実行します。リトライ回数の設定はctxにデッドラインを設定することで実現できます。
複数の書き込みのみのトランザクションは?
ここまで複数の書き込みに該当するトランザクションがないことに違和感を持った人もいるでしょう。Spannerではmutationとして書き込み処理を複数保持し、トランザクションへのBufferWriteや、clientから実行するApplyによって一括で処理するため、一連のmutationを同じ配列に格納して置くことで、書き込み単体に対してのトランザクションを意識する必要はありません。
読み取り処理
データ取得の方法についてはかなり数と手法があるため、基本的なもののみを取り上げます。
まず先程の例でも出てきた、Query()
を使う方法です。Query()
はすべてのトランザクション種別で利用でき、RowIteratorを返す関数です。SpannerではGoogleSQLとPostgreSQLの2つのSQLを利用できます。しかしSpannerライブラリの使用はGoogleSQLの利用を想定している箇所が存在しているため、推奨しているようにGoogleSQLを利用するのが良いかと思います。
読み取り処理(Query)
type Task struct {
TaskID string `spanner:"task_id"`
Name string `spanner:"name"`
}
func (r *repository) GetTasksByName(ctx context.Context, name string) ([]*Task, error) {
tx := r.c.Single() // 単一読み取りトランザクションを開始
q := "SELECT * FROM tasks WHERE name = @name" // named query parameterを使用したクエリ
queryParams := map[string]interface{}{
"name": name, // クエリパラメータ
}
stmt := spanner.Statement{
SQL: q,
Params: queryParams,
}
tasks := make([]*Task, 0)
iter := tx.Query(ctx, stmt)
iter.Do(func(row *spanner.Row) error { // クエリ結果を処理
var t Task
if err := row.ToStructLenient(&t); err != nil { // クエリ結果を構造体に変換
return err
}
tasks := append(tasks, &t)
return nil
})
return tasks, nil
}
先ほど解説した単一読み取り処理のため、Single()
を利用してReadOnlyTransactionを開始しています(複数の読み取り処理が存在する場合はReadOnlyTransaction()
を利用してください)。そしてクエリとパラメタを別々に準備します。これはGoogleSQLのnamed query parameterという機能です。fmt.Sprintf
ですべて準備しても良いのですが、クオーテーションの有無だったりをパラメタの型によって自動で処理してくれたりするので、GoogleSQLを使ったほうが簡単に書けることが多いと思います。
クエリ文、パラメタのマップを使いSpanner.Statementを作成し、それをQuery()
に渡します。Query()
はRowIterator
を返すため、イテレーション処理によって結果を処理する必要があります。たとえ結果が一つだと分かりきっている場合でもです。
iter.Do()
によってそれぞれのrowを順番に処理していきます。row.ToStructLenient()
ではその結果をspannerタグのついた構造体に自動的にマッピングしていきます。タグ名と、Select結果のカラム名は一致している必要があります。
iter.Do()とiter.Next()
実はGoogleの公式ガイドにはiter.Do()の例がありません。すべてiter.Next()によって書かれています。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/spanner"
"google.golang.org/api/iterator"
)
func query(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
iter := client.Single().Query(ctx, stmt)
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
var singerID, albumID int64
var albumTitle string
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
}
}
iter.Next()
はRowIteretor
を一つ先に進め、読み取るデータが存在しない場合はiterator.Doneというエラーを返します。これらによってデータの読み取りを実現しています。データがすべて読み取られた後、iter.Stop()
を呼び、イテレータを終了する必要があります。
ではiter.Do()
は何をしているのでしょうか?
なんと上記の処理をすべてしてくれているではありませんか。読み取ったrowに対しての画一的な処理をクロージャとして渡すことにより、簡潔に処理を書けます。その代わり、iter.Do()
はループの中で一度でもエラーが起きた場合はそれ以上の処理をせずイテレーションを終了するのですが、iter.Next()
を使うと、エラーが起きても続行する。できる限りのrowを読む、といったことが可能になります。それぞれのイテレーションでrowに対してではなく、errなどに対して複雑な処理をしたい場合はiter.Next()
を使った明示的な処理を使うのがよく、それ以外のほぼすべてのユースケースに対してはiter.Do()
を使うのが良さそうです。
ToStructとToStructLenient
iterから取得できるそれぞれのrowデータを構造体にマッピングするには、ToStruct()
とToStructLenient()
のどちらかを使う必要があります。これらは、Selectしたカラムとマッピング先の構造体が一致していることを許容するかどうかが違います。Lenientは"寛大な"という意味です。ToStruct()
がマッピング先とカラムが完全に一致していない場合にエラーを返すのに対し、ToStructLenient()
は可能な限り一致するカラムを見つけ、たとえ一致していない場合でもエラーを返さず、初期値をセットする、結果を捨てる等の処理をしてくれます。これによってアプリケーションコードとデータベースのマイグレーションタイミングを厳密に合わせずとも、安全にアプリケーションを更新することができます。
読み取り処理(ReadRow)
Query()
だけでなく、ReadRow()
という関数も使用できます。
type Task struct {
TaskID string `spanner:"task_id"`
Name string `spanner:"name"`
}
func (r *repository) GetTaskByID(ctx context.Context, taskID string) (*Task, error) {
tx := r.c.Single() // 単一読み取りトランザクションを開始
row, err := tx.ReadRow(ctx, "tasks", spanner.Key{taskID}, []string{"task_id", "name"})
if err != nil {
return nil, err
}
var task Task
if err := row.ToStructLenient(&task); err != nil {
return nil, err
}
return &task, nil
}
ReadRow()
は結果が単一であることを期待しており、先程のようなイテレーション処理を書かずにすみます。ただし取得するカラム名を正確に指定する必要があり、Query()
ほどの自由さはありません。また検索条件にしているSpanner.Keyは個人的には使い勝手があまり良くなく、個人的にはすべての取得に関してはQuery()
を使って実装してしまいます。
書き込み処理(Insert)
SpannerのInsert()
関数を使用します。
func (r *repository) CreateTask(ctx context.Context) error {
var mutations []*spanner.Mutation
mutations = append(mutations, spanner.Insert("tasks", []string{"task_id", "name"}, []interface{}{uuid.New().String(), "task1"}))
mutations = append(mutations, spanner.Insert("tasks", []string{"task_id", "name"}, []interface{}{uuid.New().String(), "task2"}))
_, err := r.c.Apply(ctx, mutations)
if err != nil {
return err
}
return nil
}
Spannerでは、複数のmutationをまとめ、clientからApplyします(ReadWriteTransactionの場合はBufferWrite)。内部的にはApplyの際にReadWriteTransactionを発行し、そこで処理を行っているのですが、特に読み取りがない場合はApplyで実行しておくと簡潔に書けます。
書き込み処理(Update)
SpannerのUpdate()
関数を使用します。
func (r *repository) UpdateTask(ctx context.Context, taskID string) error {
var mutations []*spanner.Mutation
mutations = append(mutations, spanner.Update("tasks", []string{"task_id", "name"}, []interface{}{taskID, "update"}))
_, err := r.c.Apply(ctx, mutations)
if err != nil {
return err
}
return nil
}
ほぼCreateと同様ですが、指定した主キーに対応するレコードが存在していない場合はエラーになります。
番外編: yoでより簡単にSpannerを使う
Spanner周りのツールは、Googleが認定したcloudspannerecosystemというオーガニゼーションのリポジトリとして公開されています。マイグレーションツールであるwrenchや、コマンドラインからSpannerを操作するspanner-cliなどいろいろなリポジトリがありますが、特に便利なのがこれから紹介するyoです。
yoはコマンドラインからSpannerデータベースを読み取り、データベーススキーマから自動でspannerタグのついた構造体を生成するライブラリです。
細かな使用方法についてはこちらがよくまとまっているかと思うので割愛します。
yoを使うと、生成された構造体に対してメソッドがいくつか作られます。
例えば、Task構造体に対してInsertというメソッドが生成されます。
// Insert returns a Mutation to insert a row into a table. If the row already
// exists, the write or transaction fails.
func (t *Task) Insert(ctx context.Context) *spanner.Mutation {
values, _ := w.columnsToValues(TaskWritableColumns())
return spanner.Insert("tasks", TaskWritableColumns(), values)
}
このメソッドを使うことで、先程書いたinsertがより型安全に扱えます。
import yo "models" // yoが生成したパッケージ
func (r *repository) CreateTask(ctx context.Context) error {
var mutations []*spanner.Mutation
newTasks := []yo.Task{
{
TaskID: uuid.New().String(),
Name: "task1",
},
{
TaskID: uuid.New().String(),
Name: "task2",
},
}
for _, t := range newTasks {
mutations = append(mutations, task.Insert(ctx))
}
_, err := r.c.Apply(ctx, mutations)
if err != nil {
return err
}
return nil
}
yoが勝手にSpannerタグを付与した構造体を生成し、その構造体によしなに値を詰め、Insertメソッドを実行するだけで簡単にmutationの構築ができます。
取得系に関してはあまり自由度がないため引き続きQuery()
を使用することが多いのですが、マッピング先の型を自分で更新する必要がなく、yoにお任せしておけばマッピングに失敗することがないというのが良いところです。これからSpannerで開発をしていく方にはぜひ使っていただきたいです。
番外編: さらに怠惰に書く
個人的には以下のようなパッケージをアプリケーション内で作っており、単一の読み取りであれば脳死でデータを取得できるようにラップしてあります。よければお使いください。
package internal
import (
"context"
"cloud.google.com/go/spanner"
)
// ReadQueryResult Spannerのクエリ結果をDTOに変換して返す
//
// 使用例:
// queryResult, err := internal.ReadQueryResult[dto.SomethingDTO](
// ctx,
// spannerClient.Single(),
// spannerStatement,
// )
func ReadQueryResult[DTO any, spannerTx interface {
Query(ctx context.Context, stmt spanner.Statement) *spanner.RowIterator
}](ctx context.Context, tx spannerTx, stmt spanner.Statement) ([]*DTO, error) {
var results []*DTO
err := tx.Query(ctx, stmt).Do(func(r *spanner.Row) error {
var result DTO
if err := r.ToStructLenient(&result); err != nil {
return err
}
results = append(results, &result)
return nil
})
if err != nil {
return nil, err
}
return results, nil
}
おわりに
今回は基本的な機能をメインに解説しましたが、そもそもクライアントライブラリからは設定ができない便利な機能があり、またInterLeaveというSpannerを代表するような機能についても全く触れませんでした。今後Spannerを使っていく方は、この記事で基本的な使い方を理解し、深みを目指すハードルが下がったのであれば幸いです。
Discussion