🛠️

【Go】Service層でSpannerのトランザクション管理をしたい

2021/12/29に公開

こんにちは。ymtdzzzです。
Zennの書き味にハマってしまい、個人ブログではなくこちらで書いちゃいます。

今日はSpannerとGoの話。

RepositoryパターンでWebアプリケーションを実装するときは、どこでトランザクション管理を行うかが論点になることが多いですが、恐らくServiceとかUsecase層(infraを意識しないビジネスロジックを扱う層)で行いたいのが通常かと思います。

ただ、Spanner Clientでトランザクション管理を行いたい場合、依存性を排除するのが難しくなってきます。

下記のように、トランザクション境界がライブラリの機能に閉じるため、どうやってそれを抽象化するかというのがポイントになってきそうです。

// この辺の扱いを抽象化したい
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    var balance int64
    row, err := txn.ReadRow(ctx, "Accounts", spanner.Key{"alice"}, []string{"balance"})
    if err != nil {
        return err
    }
    if err := row.Column(0, &balance); err != nil {
        return err
    }

    if balance <= 10 {
        return errors.New("insufficient funds in account")
    }
    balance -= 10
    m := spanner.Update("Accounts", []string{"user", "balance"}, []interface{}{"alice", balance})
    return txn.BufferWrite([]*spanner.Mutation{m})
})

最終的にserviceから下記のような感じでトランザクション管理(DBに依存せずに)できるようにしたいと思います。また、Repositoryにおいては逆にトランザクションを意識しないで済む方法になっています。

func (hfs *hogefugaService) StoreHogeFuga(ctx context.Context, hoge *model.Hoge, fuga *model.Fuga) error {
	// トランザクション開始
	err := hfs.tx.BeginReadWriteTx(ctx)
	if err != nil {
		return err
	}
	// データ更新処理
	err = hfs.hrepo.Create(ctx, hoge)
	if err != nil {
		// エラーだったらトランザクションロールバック
		hfs.tx.RollbackTx(ctx)
		return err
	}
	fuga := &model.Fuga{
		// ...
	}
	err = hfs.frepo.Create(ctx, fuga)
	if err != nil {
		// エラーだったらトランザクションロールバック
		hfs.tx.RollbackTx(ctx)
		return err
	}
	// トランザクションコミット
	return hfs.tx.CompleteTx(ctx)
}

検討事項

Serviceからの呼び方

1) ヘルパー的なものを作ってどこからでもトランザクションを開始・終了できるようにしてみる
→イメージとしてはLaravelのDB Facadeのtransaction()的な
2) 任意のRepositoryに管理用関数を準備する
3) RepositoryServiceの間に1枚レイヤーを挟む

今回は3)を採用し、トランザクション管理の責務を持ったstruct/interfaceを定義してみようと思います。

トランザクションオブジェクトの扱い

SpannerにはReadOnlyTransactionとReadWriteTransactionの2種類あり、それぞれClient上だと3種類の構造体に表現されています。

ReadWriteTransactionを使いたい場合、冒頭の例のようにクロージャーで処理を完結させないといけなく、レイヤーをまたぐためにはそれ以外の2種類でトランザクション管理していく方向が良さそうです。

トランザクション管理方法

トランザクションオブジェクトをどこかに保持して扱うにしても、net/httpはリクエスト毎にgoroutine(マルチスレッド)でHandlerで処理する形のため、下記の課題をクリアする必要があります。

  • リクエストに該当するトランザクションを識別して管理する
  • スレッドセーフである必要がある

リクエストに該当するトランザクションを識別して管理する

トランザクションとリクエストは1対1の関係になるので、どのリクエストのトランザクションかどうかを識別する必要があります。

幸いにも、リクエストスコープのデータを持ち回るためにcontext.Contextが利用できるので、それを使っていこうと思います。

スレッドセーフである必要がある

リクエストを判別する識別子をKey、トランザクションオブジェクトをValueとするmapを使えば良さそうなのですが、ビルドインのmapはスレッドセーフではないので、syncパッケージのMapを使うことにします。

依存関係

実際のコードから少し簡略化していますが、大まかな依存関係はこのような形になります。基本的にレイヤーが異なる場合はinterfaceを経由しておくことで、依存関係を減らしています。

実装

では実装していきます。なお、コードは一部抜粋しています。

middleware

まずはmiddlewareで、リクエスト固有の識別子を発行してcontext.Contextに格納します。
(※middlewareの作り方や登録方法についてはこの辺りの記事参照。)

http/middleware.go
type Middleware struct {}

func NewMiddleware() Middleware {
	return &Middleware{}
}

func (m *middleware) ExampleMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), "request-id", uuid.New().String())))
	})
}

Service

Serviceを定義します。依存先はRepositoryとトランザクション管理(Transactional)のinterfaceです。

service/hogefuga.go
type HogeService interface {
	StoreHoge(context.Context, *model.Hoge) error
	GetHogeById(context.Context, string) (*model.Hoge, error)
}

type hogeService struct {
	tx    repository.Transactional // トランザクション管理機能のinterface
	hrepo  repository.HogeRepository // interface
	frepo repository.FugaRepository // interface
}

// 実装は後述

Repository(interface)

トランザクション管理を行う関数群をTransactional interfaceとして定義します。
Repositoryへの結合後が高いのでrepositoryパッケージに配置します。

repository/transaction.go
type Transactional interface {
	BeginSingleReadTx(context.Context) error
	BeginReadOnlyTx(context.Context) error
	BeginReadWriteTx(context.Context) error
	CompleteTx(context.Context) error
	RollbackTx(context.Context) error
	ApplyMutationWithoutTx(context.Context) error
}

各Repositoryのinterfaceも定義しておきます。

repository/hoge.go
type HogeRepository interface {
	FindById(context.Context, string) (*model.Hoge, error)
	Create(context.Context, *model.Hoge) error
}
repository/fuga.go
type FugaRepository interface {
	FindById(context.Context, string) (*model.Fuga, error)
	Create(context.Context, *model.Fuga) error
}

Repository(implementation)

各Repositoryの実装です。

後述するSpannerConnectionをメンバーとして持っており、SpannerConnectionで定義した関数を呼び出すことができます。(Query()とかAppendMutation()とか。厳密にはembedされたWrappedTransactionで定義されている。後述)。

注目すべき点は、ここではトランザクションを意識していないという点です。
読み取り系処理はReadOnlyTransactionに定義されている関数を呼び出すことが可能です(読み取り処理はClientライブラリから行う場合も必ずReadOnlyTransaction経由で実行されるので、矛盾は無いはず)。

また、書き込み系処理はMutationの生成のみで、MutationApplyはここではしていません。

なお、各RepositoryはSpannerConnectionへの参照を保持しているので、他Repositoryと共有しています。

repository/spanner/hoge.go
type hogeRepository struct {
	conn *SpannerConnection
}

func NewHogeRepository(conn *SpannerConnection) repository.HogeRepository {
	return &hogeRepository{
		conn,
	}
}

func (hr *hogeRepository) FindById(ctx context.Context, id string) (*model.Hoge, error) {
	stmt := spanner.NewStatement(fmt.Sprintf("SELECT * FROM %s WHERE HogeId = @HogeId LIMIT 1", tableNameHoge))
	stmt.Params["HogeId"] = id

	iter, err := cr.conn.Query(child, stmt)

	defer iter.Stop()
	row, err := iter.Next()
	if err != nil {
		return nil, err
	}
	hoge := &model.Hoge{}
	if err := row.ToStruct(hoge); err != nil {
		return nil, err
	}

	return hoge, nil
}

func (hr *hogeRepository) Create(ctx context.Context, hoge *model.Hoge) error {
	m, err := spanner.InsertStruct(tableNameHoge, hoge)
	if err != nil {
		return err
	}

	cr.conn.AppendMutation(ctx, m)

	return nil
}

SpannerConnection

Spanner DBへの接続情報と、トランザクション管理機能を備えた構造体です。

repository/spanner/connection.go
type SpannerConnection struct {
	*spanner.Client
	Timeout time.Duration
	*WrappedTransaction
}

// 初期化方法はおまかせ
func NewSpannerConnection(cfg *config.SpannerConfig) (*SpannerConnection, error) {
	ctx := context.Background()
	c, err := spanner.NewClientWithConfig(ctx, cfg.Database, cfg.ClientConfig)
	if err != nil {
		return nil, err
	}
	conn := &SpannerConnection{
		Client:             c,
		Timeout:            cfg.TransactionTimeout,
		WrappedTransaction: nil,
	}
	conn.WrappedTransaction = NewWrappedTransaction(conn)
	return conn, nil
}

// 保持しているSpanner ClientをCloseする(mainとかでインスタンス化するときにdeferで呼び出す)
func (con *SpannerConnection) ClientClose() {
	con.Client.Close()
}

WrappedTransaction

いよいよ、今回のトランザクション管理のキモになってくるWrappedTransactionです。
(ネーミング的にはWrappedSpannerTransactionとかの方が良いかも・・・)

この構造体は先程のTransactionalinterfaceを実装しており、各serviceはinterface経由でこちらのトランザクション管理系の関数を呼び出すことができます。

やっていることは大まかに下記の通りです。

  • 各リクエストに紐づくReadOnlyTransactionを保持
  • 各リクエストに紐づくReadWriteStmtBasedTransactionを保持
  • 各リクエストに紐づくMutationを保持
  • トランザクション管理処理
    • トランザクション開始
    • トランザクションロールバック
    • トランザクションコミット
  • Mutationの登録
  • Spanner Clientのトランザクションオブジェクト(ReadOnlyTransaction/ReadWriteStmtBasedTransaction)が持つ関数のラッパーを提供

ソースコードの全貌は下記の通りです。
(要点は後述する&長いのでざっと見るで大丈夫です)

repository/spanner/transaction.go
type WrappedTransaction struct {
	conn *SpannerConnection
	tx   sync.Map
	rtx  sync.Map
	ms   sync.Map
}

func NewWrappedTransaction(conn *SpannerConnection) *WrappedTransaction {
	return &WrappedTransaction{
		conn: conn,
		tx:   sync.Map{},
		rtx:  sync.Map{},
		ms:   sync.Map{},
	}
}

// ここからトランザクション系の処理(Transactional interface)
// serviceから呼び出される想定
func (t *WrappedTransaction) BeginSingleReadTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	if rtx, ok := t.rtx.Load(reqId); ok {
		if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			v.Close()
			t.rtx.Delete(reqId)
		}
	}
	t.rtx.Store(reqId, t.conn.Client.Single())
	return nil
}

func (t *WrappedTransaction) BeginReadOnlyTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	if rtx, ok := t.rtx.Load(reqId); ok {
		if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			v.Close()
			t.rtx.Delete(reqId)
		}
	}
	t.rtx.Store(reqId, t.conn.Client.ReadOnlyTransaction())
	return nil
}

func (t *WrappedTransaction) BeginReadWriteTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}
	fmt.Println(reqId)

	if tx, ok := t.tx.Load(reqId); ok {
		if _, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			t.tx.Delete(reqId)
		}
	}
	tx, err := spanner.NewReadWriteStmtBasedTransaction(ctx, t.conn.Client)
	if err != nil {
		return err
	}
	t.tx.Store(reqId, tx)
	return nil
}

func (t *WrappedTransaction) CompleteTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	ms, _ := t.ms.LoadOrStore(reqId, []*spanner.Mutation{})
	if ms, ok := ms.([]*spanner.Mutation); ok {
		if tx, ok := t.tx.Load(reqId); ok {
			if tx, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
				err = tx.BufferWrite(ms)
				if err != nil {
					fmt.Println(err)
					tx.Rollback(ctx)
				} else {
					_, err = tx.Commit(ctx)
					if err != nil {
						tx.Rollback(ctx)
						fmt.Println(err)
						return err
					}
				}
			}
		}
	}
	if rtx, ok := t.rtx.Load(reqId); ok {
		if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			v.Close()
		}
	}
	t.ms.Delete(reqId)
	t.tx.Delete(reqId)
	t.rtx.Delete(reqId)
	return err
}

func (t *WrappedTransaction) RollbackTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	if tx, ok := t.tx.Load(reqId); ok {
		if tx, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			tx.Rollback(ctx)
		}
	}
	return nil
}

func (t *WrappedTransaction) ApplyMutationWithoutTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	if ms, ok := t.ms.LoadOrStore(reqId, []*spanner.Mutation{}); ok {
		if msv, ok := ms.([]*spanner.Mutation); ok {
			_, err := t.conn.Client.Apply(ctx, msv)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// Mutationを登録する
// repositoryから呼び出される想定
func (t *WrappedTransaction) AppendMutation(ctx context.Context, m *spanner.Mutation) error {
	fmt.Println("AppendMutation ---")
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}
	fmt.Println(reqId)

	if ms, ok := t.ms.Load(reqId); ok {
		if ms, ok := ms.([]*spanner.Mutation); ok {
			ms = append(ms, m)
			t.ms.Store(reqId, ms)
		}
	} else {
		newMs := []*spanner.Mutation{m}
		t.ms.Store(reqId, newMs)
	}
	fmt.Println(t.ms)

	return nil
}

// ここからSpanner Clientのトランザクションオブジェクトの関数のラッパー
// repositoryから呼び出される想定
func (t *WrappedTransaction) AnalyzeQuery(ctx context.Context, stmt spanner.Statement) (*sppb.QueryPlan, error) {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return nil, err
	}

	if tx, ok := t.tx.Load(reqId); ok {
		if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			return txv.AnalyzeQuery(ctx, stmt)
		}
	}
	if rtx, ok := t.rtx.Load(reqId); ok {
		if rtxv, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			return rtxv.AnalyzeQuery(ctx, stmt)
		}
	}
	return nil, errors.New("any transactions has not been started yet.")
}

func (t *WrappedTransaction) BatchUpdate(ctx context.Context, stmts []spanner.Statement) (_ []int64, err error) {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return nil, err
	}

	if tx, ok := t.tx.Load(reqId); ok {
		if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			return txv.BatchUpdate(ctx, stmts)
		}
	}
	if _, ok := t.rtx.Load(reqId); ok {
		return nil, errors.New("ReadOnlyTransaction does not support operation: BatchUpdate")
	}
	return nil, errors.New("any transactions has not been started yet.")
}

func (t *WrappedTransaction) BufferWrite(ctx context.Context, ms []*spanner.Mutation) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	if tx, ok := t.tx.Load(reqId); ok {
		if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			return txv.BufferWrite(ms)
		}
	}
	if _, ok := t.rtx.Load(reqId); ok {
		return errors.New("ReadOnlyTransaction does not support operation: BufferWrite")
	}
	return errors.New("any transactions has not been started yet.")
}

func (t *WrappedTransaction) Query(ctx context.Context, stmt spanner.Statement) (*spanner.RowIterator, error) {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return nil, err
	}

	if tx, ok := t.tx.Load(reqId); ok {
		if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			return txv.Query(ctx, stmt), nil
		}
	}
	if rtx, ok := t.rtx.Load(reqId); ok {
		if rtxv, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			return rtxv.Query(ctx, stmt), nil
		}
	}
	return nil, errors.New("any transactions has not been started yet.")
}

// TODO: 他にもSpanner Clientの関数で呼び出したいものがあれば適宜追加

// context.Contextからmiddlewareで入れたリクエスト識別子を取得する
func (t *WrappedTransaction) getReqIdFromCtx(ctx context.Context) (string, error) {
	v := ctx.Value("request-id")
	reqId, ok := v.(string)
	if !ok {
		return "", errors.New("context value 'request-id' is not found")
	}
	return reqId, nil
}

トランザクション管理系処理

func (t *WrappedTransaction) BeginSingleReadTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	if rtx, ok := t.rtx.Load(reqId); ok {
		if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			v.Close()
			t.rtx.Delete(reqId)
		}
	}
	t.rtx.Store(reqId, t.conn.Client.Single())
	return nil
}

こちらはReadOnlyTransactionの開始処理の抜粋です。

  • context.Contextからリクエスト識別子取得
  • 一応既に該当キーにトランザクションオブジェクトが存在した場合はClose()して、Mapから削除
  • Spanner Client経由(t.conn.Client.Single())でトランザクションオブジェクトを生成してMapに保存

他のトランザクション開始処理も同様です。

func (t *WrappedTransaction) CompleteTx(ctx context.Context) error {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return err
	}

	ms, _ := t.ms.LoadOrStore(reqId, []*spanner.Mutation{})
	if ms, ok := ms.([]*spanner.Mutation); ok {
		if tx, ok := t.tx.Load(reqId); ok {
			if tx, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
				err = tx.BufferWrite(ms)
				if err != nil {
					fmt.Println(err)
					tx.Rollback(ctx)
				} else {
					_, err = tx.Commit(ctx)
					if err != nil {
						tx.Rollback(ctx)
						fmt.Println(err)
						return err
					}
				}
			}
		}
	}
	if rtx, ok := t.rtx.Load(reqId); ok {
		if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			v.Close()
		}
	}
	t.ms.Delete(reqId)
	t.tx.Delete(reqId)
	t.rtx.Delete(reqId)
	return err
}

こちらはトランザクションの完了処理です。どの種類のトランザクションが流れていた場合でも対応できるように、それぞれの場合で処理を分岐させています。
sync.Mapのデータがinterface{}のせいで型アサーションまみれになっててすいません。もっと良い書き方ありそうです。。)

ReadWriteStmtBasedTransaction
Mutationを取得しBufferWrite()Commit()。失敗したらRollback()

ReadOnlyTransaction
Close()するだけ。

共通
最後に該当キーの値を削除。

Spanner Clientのトランザクションオブジェクトの関数のラッパー

func (t *WrappedTransaction) Query(ctx context.Context, stmt spanner.Statement) (*spanner.RowIterator, error) {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return nil, err
	}

	if tx, ok := t.tx.Load(reqId); ok {
		if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			return txv.Query(ctx, stmt), nil
		}
	}
	if rtx, ok := t.rtx.Load(reqId); ok {
		if rtxv, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
			return rtxv.Query(ctx, stmt), nil
		}
	}
	return nil, errors.New("any transactions has not been started yet.")
}

ReadOnlyTransactionReadWriteStmtBasedTransactionそれぞれに存在している関数については、それぞれ該当関数を呼び出します。

どちらも存在しなければそもそもトランザクションが始まっていないのでエラーを発生させます(どちらかは必ずあるはず)。

func (t *WrappedTransaction) BatchUpdate(ctx context.Context, stmts []spanner.Statement) (_ []int64, err error) {
	reqId, err := t.getReqIdFromCtx(ctx)
	if err != nil {
		return nil, err
	}

	if tx, ok := t.tx.Load(reqId); ok {
		if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
			return txv.BatchUpdate(ctx, stmts)
		}
	}
	if _, ok := t.rtx.Load(reqId); ok {
		return nil, errors.New("ReadOnlyTransaction does not support operation: BatchUpdate")
	}
	return nil, errors.New("any transactions has not been started yet.")
}

これはReadWriteStmtBasedTransactionにか存在しない関数なので、ReadOnlyTransactionの中で呼ばれたらエラーとしています。

Serviceでの使い方

最後に、先程説明をスキップしていた、Serviceでどうやってトランザクション管理するかについてです。

例として、複数テーブル(HogeテーブルとFugaテーブル)のデータ挿入をアトミックに実行する例です。

func (hfs *hogefugaService) StoreHogeFuga(ctx context.Context, hoge *model.Hoge, fuga *model.Fuga) error {
	// トランザクション開始
	err := hfs.tx.BeginReadWriteTx(ctx)
	if err != nil {
		return err
	}
	// データ更新処理
	err = hfs.hrepo.Create(ctx, hoge)
	if err != nil {
		// エラーだったらトランザクションロールバック
		hfs.tx.RollbackTx(ctx)
		return err
	}
	fuga := &model.Fuga{
		// ...
	}
	err = hfs.frepo.Create(ctx, fuga)
	if err != nil {
		// エラーだったらトランザクションロールバック
		hfs.tx.RollbackTx(ctx)
		return err
	}
	// トランザクションコミット
	return hfs.tx.CompleteTx(ctx)
}

いい感じにServiceでトランザクション管理ができるようになりました!!

まとめ

この方法ですが、メリットとデメリットは下記の通りです。

メリット

  • serviceでトランザクション管理ができる
  • repository側でトランザクションを意識する必要が無い(=クエリ実行に集中できる)
  • トランザクション管理を一箇所で管理できる
  • トランザクション管理層への依存関係がまぁまぁ自然(serviceからはインターフェース経由、repositoryからは実態)

デメリット

  • トランザクションオブジェクトのClose()し忘れをコンパイル時に検知できない
    • コードレビューなどで机上確認するのが原則になってきますが、セーフティネットとしてMiddlewareで必ず該当するトランザクションをClose()する処理を入れることで回避できそうです(middlewareがtransactionaのinterfaceに依存してしまう点は若干微妙)
    • 同様に、panic時もrecover()で全トランザクションを(可能なら)終了するような処理が必要かも
  • Repositoryパターンとしては中途半端
    • 例えばバックエンドがSpannerとRedisと混在してた場合どうしようっていう問題とか
    • 本当の意味で透過的にトランザクション管理できているとは言えない
  • ReadWriteStmtBasedTransactionはSpanner側の一時的なエラー発生時のリトライ処理までは面倒を見てくれないので、そこは自前でカバーする必要がある
    • クロージャー版のReadWriteTransactionは面倒見てくれます

試しに上記と同じ構成で実装してみて、MAXPROC=4でgoroutineたくさん生やしてみましたがきちんとトランザクション管理されているようでした(先述の通りAbortがいくつか発生していて、そこのリトライ処理はやはりちゃんとやっておかないといけない)

以上、参考になれば幸いです!

Discussion