【Go】Service層でSpannerのトランザクション管理をしたい
こんにちは。ymtdzzzです。
Zennの書き味にハマってしまい、個人ブログではなくこちらで書いちゃいます。
今日はSpannerとGoの話。
RepositoryパターンでWebアプリケーションを実装するときは、どこでトランザクション管理を行うかが論点になることが多いですが、恐らくServiceとかUsecase層(infraを意識しないビジネスロジックを扱う層)で行いたいのが通常かと思います。
ただ、Spanner Clientでトランザクション管理を行いたい場合、依存性を排除するのが難しくなってきます。
下記のように、トランザクション境界がライブラリの機能に閉じるため、どうやってそれを抽象化するかというのがポイントになってきそうです。
// この辺の扱いを抽象化したい
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
var balance int64
row, err := txn.ReadRow(ctx, "Accounts", spanner.Key{"alice"}, []string{"balance"})
if err != nil {
return err
}
if err := row.Column(0, &balance); err != nil {
return err
}
if balance <= 10 {
return errors.New("insufficient funds in account")
}
balance -= 10
m := spanner.Update("Accounts", []string{"user", "balance"}, []interface{}{"alice", balance})
return txn.BufferWrite([]*spanner.Mutation{m})
})
最終的にserviceから下記のような感じでトランザクション管理(DBに依存せずに)できるようにしたいと思います。また、Repositoryにおいては逆にトランザクションを意識しないで済む方法になっています。
func (hfs *hogefugaService) StoreHogeFuga(ctx context.Context, hoge *model.Hoge, fuga *model.Fuga) error {
// トランザクション開始
err := hfs.tx.BeginReadWriteTx(ctx)
if err != nil {
return err
}
// データ更新処理
err = hfs.hrepo.Create(ctx, hoge)
if err != nil {
// エラーだったらトランザクションロールバック
hfs.tx.RollbackTx(ctx)
return err
}
fuga := &model.Fuga{
// ...
}
err = hfs.frepo.Create(ctx, fuga)
if err != nil {
// エラーだったらトランザクションロールバック
hfs.tx.RollbackTx(ctx)
return err
}
// トランザクションコミット
return hfs.tx.CompleteTx(ctx)
}
検討事項
Serviceからの呼び方
1) ヘルパー的なものを作ってどこからでもトランザクションを開始・終了できるようにしてみる
→イメージとしてはLaravelのDB
Facadeのtransaction()
的な
2) 任意のRepository
に管理用関数を準備する
3) Repository
とService
の間に1枚レイヤーを挟む
今回は3)を採用し、トランザクション管理の責務を持ったstruct
/interface
を定義してみようと思います。
トランザクションオブジェクトの扱い
SpannerにはReadOnlyTransactionとReadWriteTransactionの2種類あり、それぞれClient上だと3種類の構造体に表現されています。
ReadWriteTransaction
を使いたい場合、冒頭の例のようにクロージャーで処理を完結させないといけなく、レイヤーをまたぐためにはそれ以外の2種類でトランザクション管理していく方向が良さそうです。
トランザクション管理方法
トランザクションオブジェクトをどこかに保持して扱うにしても、net/http
はリクエスト毎にgoroutine
(マルチスレッド)でHandler
で処理する形のため、下記の課題をクリアする必要があります。
- リクエストに該当するトランザクションを識別して管理する
- スレッドセーフである必要がある
リクエストに該当するトランザクションを識別して管理する
トランザクションとリクエストは1対1の関係になるので、どのリクエストのトランザクションかどうかを識別する必要があります。
幸いにも、リクエストスコープのデータを持ち回るためにcontext.Context
が利用できるので、それを使っていこうと思います。
スレッドセーフである必要がある
リクエストを判別する識別子をKey、トランザクションオブジェクトをValueとするmap
を使えば良さそうなのですが、ビルドインのmap
はスレッドセーフではないので、sync
パッケージのMap
を使うことにします。
依存関係
実際のコードから少し簡略化していますが、大まかな依存関係はこのような形になります。基本的にレイヤーが異なる場合はinterface
を経由しておくことで、依存関係を減らしています。
実装
では実装していきます。なお、コードは一部抜粋しています。
middleware
まずはmiddlewareで、リクエスト固有の識別子を発行してcontext.Context
に格納します。
(※middlewareの作り方や登録方法についてはこの辺りの記事参照。)
type Middleware struct {}
func NewMiddleware() Middleware {
return &Middleware{}
}
func (m *middleware) ExampleMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), "request-id", uuid.New().String())))
})
}
Service
Serviceを定義します。依存先はRepositoryとトランザクション管理(Transactional)のinterfaceです。
type HogeService interface {
StoreHoge(context.Context, *model.Hoge) error
GetHogeById(context.Context, string) (*model.Hoge, error)
}
type hogeService struct {
tx repository.Transactional // トランザクション管理機能のinterface
hrepo repository.HogeRepository // interface
frepo repository.FugaRepository // interface
}
// 実装は後述
Repository(interface)
トランザクション管理を行う関数群をTransactional interface
として定義します。
Repositoryへの結合後が高いのでrepository
パッケージに配置します。
type Transactional interface {
BeginSingleReadTx(context.Context) error
BeginReadOnlyTx(context.Context) error
BeginReadWriteTx(context.Context) error
CompleteTx(context.Context) error
RollbackTx(context.Context) error
ApplyMutationWithoutTx(context.Context) error
}
各Repositoryのinterfaceも定義しておきます。
type HogeRepository interface {
FindById(context.Context, string) (*model.Hoge, error)
Create(context.Context, *model.Hoge) error
}
type FugaRepository interface {
FindById(context.Context, string) (*model.Fuga, error)
Create(context.Context, *model.Fuga) error
}
Repository(implementation)
各Repositoryの実装です。
後述するSpannerConnection
をメンバーとして持っており、SpannerConnection
で定義した関数を呼び出すことができます。(Query()
とかAppendMutation()
とか。厳密にはembedされたWrappedTransaction
で定義されている。後述)。
注目すべき点は、ここではトランザクションを意識していないという点です。
読み取り系処理はReadOnlyTransaction
に定義されている関数を呼び出すことが可能です(読み取り処理はClientライブラリから行う場合も必ずReadOnlyTransaction経由で実行されるので、矛盾は無いはず)。
また、書き込み系処理はMutation
の生成のみで、Mutation
のApply
はここではしていません。
なお、各RepositoryはSpannerConnection
への参照を保持しているので、他Repositoryと共有しています。
type hogeRepository struct {
conn *SpannerConnection
}
func NewHogeRepository(conn *SpannerConnection) repository.HogeRepository {
return &hogeRepository{
conn,
}
}
func (hr *hogeRepository) FindById(ctx context.Context, id string) (*model.Hoge, error) {
stmt := spanner.NewStatement(fmt.Sprintf("SELECT * FROM %s WHERE HogeId = @HogeId LIMIT 1", tableNameHoge))
stmt.Params["HogeId"] = id
iter, err := cr.conn.Query(child, stmt)
defer iter.Stop()
row, err := iter.Next()
if err != nil {
return nil, err
}
hoge := &model.Hoge{}
if err := row.ToStruct(hoge); err != nil {
return nil, err
}
return hoge, nil
}
func (hr *hogeRepository) Create(ctx context.Context, hoge *model.Hoge) error {
m, err := spanner.InsertStruct(tableNameHoge, hoge)
if err != nil {
return err
}
cr.conn.AppendMutation(ctx, m)
return nil
}
SpannerConnection
Spanner DBへの接続情報と、トランザクション管理機能を備えた構造体です。
type SpannerConnection struct {
*spanner.Client
Timeout time.Duration
*WrappedTransaction
}
// 初期化方法はおまかせ
func NewSpannerConnection(cfg *config.SpannerConfig) (*SpannerConnection, error) {
ctx := context.Background()
c, err := spanner.NewClientWithConfig(ctx, cfg.Database, cfg.ClientConfig)
if err != nil {
return nil, err
}
conn := &SpannerConnection{
Client: c,
Timeout: cfg.TransactionTimeout,
WrappedTransaction: nil,
}
conn.WrappedTransaction = NewWrappedTransaction(conn)
return conn, nil
}
// 保持しているSpanner ClientをCloseする(mainとかでインスタンス化するときにdeferで呼び出す)
func (con *SpannerConnection) ClientClose() {
con.Client.Close()
}
WrappedTransaction
いよいよ、今回のトランザクション管理のキモになってくるWrappedTransaction
です。
(ネーミング的にはWrappedSpannerTransaction
とかの方が良いかも・・・)
この構造体は先程のTransactional
interfaceを実装しており、各serviceはinterface経由でこちらのトランザクション管理系の関数を呼び出すことができます。
やっていることは大まかに下記の通りです。
- 各リクエストに紐づく
ReadOnlyTransaction
を保持 - 各リクエストに紐づく
ReadWriteStmtBasedTransaction
を保持 - 各リクエストに紐づく
Mutation
を保持 - トランザクション管理処理
- トランザクション開始
- トランザクションロールバック
- トランザクションコミット
-
Mutation
の登録 - Spanner Clientのトランザクションオブジェクト(
ReadOnlyTransaction
/ReadWriteStmtBasedTransaction
)が持つ関数のラッパーを提供
ソースコードの全貌は下記の通りです。
(要点は後述する&長いのでざっと見るで大丈夫です)
type WrappedTransaction struct {
conn *SpannerConnection
tx sync.Map
rtx sync.Map
ms sync.Map
}
func NewWrappedTransaction(conn *SpannerConnection) *WrappedTransaction {
return &WrappedTransaction{
conn: conn,
tx: sync.Map{},
rtx: sync.Map{},
ms: sync.Map{},
}
}
// ここからトランザクション系の処理(Transactional interface)
// serviceから呼び出される想定
func (t *WrappedTransaction) BeginSingleReadTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
if rtx, ok := t.rtx.Load(reqId); ok {
if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
v.Close()
t.rtx.Delete(reqId)
}
}
t.rtx.Store(reqId, t.conn.Client.Single())
return nil
}
func (t *WrappedTransaction) BeginReadOnlyTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
if rtx, ok := t.rtx.Load(reqId); ok {
if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
v.Close()
t.rtx.Delete(reqId)
}
}
t.rtx.Store(reqId, t.conn.Client.ReadOnlyTransaction())
return nil
}
func (t *WrappedTransaction) BeginReadWriteTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
fmt.Println(reqId)
if tx, ok := t.tx.Load(reqId); ok {
if _, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
t.tx.Delete(reqId)
}
}
tx, err := spanner.NewReadWriteStmtBasedTransaction(ctx, t.conn.Client)
if err != nil {
return err
}
t.tx.Store(reqId, tx)
return nil
}
func (t *WrappedTransaction) CompleteTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
ms, _ := t.ms.LoadOrStore(reqId, []*spanner.Mutation{})
if ms, ok := ms.([]*spanner.Mutation); ok {
if tx, ok := t.tx.Load(reqId); ok {
if tx, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
err = tx.BufferWrite(ms)
if err != nil {
fmt.Println(err)
tx.Rollback(ctx)
} else {
_, err = tx.Commit(ctx)
if err != nil {
tx.Rollback(ctx)
fmt.Println(err)
return err
}
}
}
}
}
if rtx, ok := t.rtx.Load(reqId); ok {
if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
v.Close()
}
}
t.ms.Delete(reqId)
t.tx.Delete(reqId)
t.rtx.Delete(reqId)
return err
}
func (t *WrappedTransaction) RollbackTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
if tx, ok := t.tx.Load(reqId); ok {
if tx, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
tx.Rollback(ctx)
}
}
return nil
}
func (t *WrappedTransaction) ApplyMutationWithoutTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
if ms, ok := t.ms.LoadOrStore(reqId, []*spanner.Mutation{}); ok {
if msv, ok := ms.([]*spanner.Mutation); ok {
_, err := t.conn.Client.Apply(ctx, msv)
if err != nil {
return err
}
}
}
return nil
}
// Mutationを登録する
// repositoryから呼び出される想定
func (t *WrappedTransaction) AppendMutation(ctx context.Context, m *spanner.Mutation) error {
fmt.Println("AppendMutation ---")
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
fmt.Println(reqId)
if ms, ok := t.ms.Load(reqId); ok {
if ms, ok := ms.([]*spanner.Mutation); ok {
ms = append(ms, m)
t.ms.Store(reqId, ms)
}
} else {
newMs := []*spanner.Mutation{m}
t.ms.Store(reqId, newMs)
}
fmt.Println(t.ms)
return nil
}
// ここからSpanner Clientのトランザクションオブジェクトの関数のラッパー
// repositoryから呼び出される想定
func (t *WrappedTransaction) AnalyzeQuery(ctx context.Context, stmt spanner.Statement) (*sppb.QueryPlan, error) {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return nil, err
}
if tx, ok := t.tx.Load(reqId); ok {
if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
return txv.AnalyzeQuery(ctx, stmt)
}
}
if rtx, ok := t.rtx.Load(reqId); ok {
if rtxv, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
return rtxv.AnalyzeQuery(ctx, stmt)
}
}
return nil, errors.New("any transactions has not been started yet.")
}
func (t *WrappedTransaction) BatchUpdate(ctx context.Context, stmts []spanner.Statement) (_ []int64, err error) {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return nil, err
}
if tx, ok := t.tx.Load(reqId); ok {
if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
return txv.BatchUpdate(ctx, stmts)
}
}
if _, ok := t.rtx.Load(reqId); ok {
return nil, errors.New("ReadOnlyTransaction does not support operation: BatchUpdate")
}
return nil, errors.New("any transactions has not been started yet.")
}
func (t *WrappedTransaction) BufferWrite(ctx context.Context, ms []*spanner.Mutation) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
if tx, ok := t.tx.Load(reqId); ok {
if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
return txv.BufferWrite(ms)
}
}
if _, ok := t.rtx.Load(reqId); ok {
return errors.New("ReadOnlyTransaction does not support operation: BufferWrite")
}
return errors.New("any transactions has not been started yet.")
}
func (t *WrappedTransaction) Query(ctx context.Context, stmt spanner.Statement) (*spanner.RowIterator, error) {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return nil, err
}
if tx, ok := t.tx.Load(reqId); ok {
if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
return txv.Query(ctx, stmt), nil
}
}
if rtx, ok := t.rtx.Load(reqId); ok {
if rtxv, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
return rtxv.Query(ctx, stmt), nil
}
}
return nil, errors.New("any transactions has not been started yet.")
}
// TODO: 他にもSpanner Clientの関数で呼び出したいものがあれば適宜追加
// context.Contextからmiddlewareで入れたリクエスト識別子を取得する
func (t *WrappedTransaction) getReqIdFromCtx(ctx context.Context) (string, error) {
v := ctx.Value("request-id")
reqId, ok := v.(string)
if !ok {
return "", errors.New("context value 'request-id' is not found")
}
return reqId, nil
}
トランザクション管理系処理
func (t *WrappedTransaction) BeginSingleReadTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
if rtx, ok := t.rtx.Load(reqId); ok {
if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
v.Close()
t.rtx.Delete(reqId)
}
}
t.rtx.Store(reqId, t.conn.Client.Single())
return nil
}
こちらはReadOnlyTransactionの開始処理の抜粋です。
-
context.Context
からリクエスト識別子取得 - 一応既に該当キーにトランザクションオブジェクトが存在した場合は
Close()
して、Map
から削除 - Spanner Client経由(
t.conn.Client.Single()
)でトランザクションオブジェクトを生成してMap
に保存
他のトランザクション開始処理も同様です。
func (t *WrappedTransaction) CompleteTx(ctx context.Context) error {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return err
}
ms, _ := t.ms.LoadOrStore(reqId, []*spanner.Mutation{})
if ms, ok := ms.([]*spanner.Mutation); ok {
if tx, ok := t.tx.Load(reqId); ok {
if tx, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
err = tx.BufferWrite(ms)
if err != nil {
fmt.Println(err)
tx.Rollback(ctx)
} else {
_, err = tx.Commit(ctx)
if err != nil {
tx.Rollback(ctx)
fmt.Println(err)
return err
}
}
}
}
}
if rtx, ok := t.rtx.Load(reqId); ok {
if v, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
v.Close()
}
}
t.ms.Delete(reqId)
t.tx.Delete(reqId)
t.rtx.Delete(reqId)
return err
}
こちらはトランザクションの完了処理です。どの種類のトランザクションが流れていた場合でも対応できるように、それぞれの場合で処理を分岐させています。
(sync.Map
のデータがinterface{}
のせいで型アサーションまみれになっててすいません。もっと良い書き方ありそうです。。)
ReadWriteStmtBasedTransaction
Mutation
を取得しBufferWrite()
後Commit()
。失敗したらRollback()
。
ReadOnlyTransaction
Close()
するだけ。
共通
最後に該当キーの値を削除。
Spanner Clientのトランザクションオブジェクトの関数のラッパー
func (t *WrappedTransaction) Query(ctx context.Context, stmt spanner.Statement) (*spanner.RowIterator, error) {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return nil, err
}
if tx, ok := t.tx.Load(reqId); ok {
if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
return txv.Query(ctx, stmt), nil
}
}
if rtx, ok := t.rtx.Load(reqId); ok {
if rtxv, ok := rtx.(*spanner.ReadOnlyTransaction); ok {
return rtxv.Query(ctx, stmt), nil
}
}
return nil, errors.New("any transactions has not been started yet.")
}
ReadOnlyTransaction
とReadWriteStmtBasedTransaction
それぞれに存在している関数については、それぞれ該当関数を呼び出します。
どちらも存在しなければそもそもトランザクションが始まっていないのでエラーを発生させます(どちらかは必ずあるはず)。
func (t *WrappedTransaction) BatchUpdate(ctx context.Context, stmts []spanner.Statement) (_ []int64, err error) {
reqId, err := t.getReqIdFromCtx(ctx)
if err != nil {
return nil, err
}
if tx, ok := t.tx.Load(reqId); ok {
if txv, ok := tx.(*spanner.ReadWriteStmtBasedTransaction); ok {
return txv.BatchUpdate(ctx, stmts)
}
}
if _, ok := t.rtx.Load(reqId); ok {
return nil, errors.New("ReadOnlyTransaction does not support operation: BatchUpdate")
}
return nil, errors.New("any transactions has not been started yet.")
}
これはReadWriteStmtBasedTransaction
にか存在しない関数なので、ReadOnlyTransaction
の中で呼ばれたらエラーとしています。
Serviceでの使い方
最後に、先程説明をスキップしていた、Serviceでどうやってトランザクション管理するかについてです。
例として、複数テーブル(HogeテーブルとFugaテーブル)のデータ挿入をアトミックに実行する例です。
func (hfs *hogefugaService) StoreHogeFuga(ctx context.Context, hoge *model.Hoge, fuga *model.Fuga) error {
// トランザクション開始
err := hfs.tx.BeginReadWriteTx(ctx)
if err != nil {
return err
}
// データ更新処理
err = hfs.hrepo.Create(ctx, hoge)
if err != nil {
// エラーだったらトランザクションロールバック
hfs.tx.RollbackTx(ctx)
return err
}
fuga := &model.Fuga{
// ...
}
err = hfs.frepo.Create(ctx, fuga)
if err != nil {
// エラーだったらトランザクションロールバック
hfs.tx.RollbackTx(ctx)
return err
}
// トランザクションコミット
return hfs.tx.CompleteTx(ctx)
}
いい感じにServiceでトランザクション管理ができるようになりました!!
まとめ
この方法ですが、メリットとデメリットは下記の通りです。
メリット
- serviceでトランザクション管理ができる
- repository側でトランザクションを意識する必要が無い(=クエリ実行に集中できる)
- トランザクション管理を一箇所で管理できる
- トランザクション管理層への依存関係がまぁまぁ自然(serviceからはインターフェース経由、repositoryからは実態)
デメリット
- トランザクションオブジェクトの
Close()
し忘れをコンパイル時に検知できない- コードレビューなどで机上確認するのが原則になってきますが、セーフティネットとしてMiddlewareで必ず該当するトランザクションを
Close()
する処理を入れることで回避できそうです(middlewareがtransactionaのinterfaceに依存してしまう点は若干微妙) - 同様に、panic時も
recover()
で全トランザクションを(可能なら)終了するような処理が必要かも
- コードレビューなどで机上確認するのが原則になってきますが、セーフティネットとしてMiddlewareで必ず該当するトランザクションを
- Repositoryパターンとしては中途半端
- 例えばバックエンドがSpannerとRedisと混在してた場合どうしようっていう問題とか
- 本当の意味で透過的にトランザクション管理できているとは言えない
-
ReadWriteStmtBasedTransaction
はSpanner側の一時的なエラー発生時のリトライ処理までは面倒を見てくれないので、そこは自前でカバーする必要がある- クロージャー版の
ReadWriteTransaction
は面倒見てくれます
- クロージャー版の
試しに上記と同じ構成で実装してみて、MAXPROC=4でgoroutineたくさん生やしてみましたがきちんとトランザクション管理されているようでした(先述の通りAbortがいくつか発生していて、そこのリトライ処理はやはりちゃんとやっておかないといけない)
以上、参考になれば幸いです!
Discussion