🐾

SpannerのInactiveTransactionRemovalOptionsを試す

2024/12/26に公開

はじめに

SpannerではReadOnlyTransactionやイテレーター処理で defer による終了処理を忘れると簡単にセッションリークが起こりやがてクエリーが詰まってしまいます。

まだPreviewですが自動検知・開放してくれる仕組みがしばらく前から提供されているので試してみます。

実験

以下のように最大セッション数を5つにしたクライアントを用意してforループの中で使い回すようにします。

	config := spanner.ClientConfig{
		SessionPoolConfig: spanner.SessionPoolConfig{
			MaxOpened:                         5,
			MinOpened:                         1,
		},
	}
	cli, err := spanner.NewClientWithConfig(ctx, dsn, config)
	if err != nil {
		return err
	}
	defer cli.Close()

リークしないパターン

まずは正常系としてクエリーをfor文で10連続で回してみます。

	for i := 0; i < 10; i++ {
		_, err := query(ctx, cli)
		if err != nil {
			return fmt.Errorf("failed to query: %w", err)
		}
		println(i)
	}
func query(ctx context.Context, cli *spanner.Client) ([]*User, error) {
	rotx := cli.ReadOnlyTransaction()
	defer rotx.Close()

	stmt := spanner.NewStatement("SELECT * FROM Users")
	iter := rotx.Query(ctx, stmt)
	defer iter.Stop()

	var uu []*User
	for {
		row, err := iter.Next()
		if err != nil {
			if err == iterator.Done {
				break
			}
			return uu, nil
		}
		var u User
		if err := row.ToStruct(&u); err != nil {
			return nil, err
		}
		uu = append(uu, &u)
	}

	return uu, nil
}

想定通り10回クエリーを投げて終了しました。

$ go run main.go
0
1
2
3
4
5
6
7
8
9

リークさせる

訓練されたGopherは大体イテレーターの類をdeferで閉じる癖があります(個人の見解です)。しかしSpannerのGoライブラリでReadOnlyTransactionを発行する際 Single() 以外ではイテレーターとは別にこちらも手動で閉じなければなりません(1敗)。

以下のようにコメントアウトして意図的にリークさせます。

func query(ctx context.Context, cli *spanner.Client) ([]*User, error) {
	rotx := cli.ReadOnlyTransaction()
//	defer rotx.Close()

想定通り6つ目のクエリーが詰まってしまいました。

$ go run main.go
0
1
2
3
4

自動解消オプションとロガーを入れる

通常の使い方としては以下のように検知時の挙動を指定してやればよいです。

spanner.InactiveTransactionRemovalOptions{
	ActionOnInactiveTransaction: spanner.WarnAndClose,
}

が、デフォルトのリークチェック間隔が1時間と非常に長いので実験に支障をきたしてしまいます。またこの設定は非公開フィールドなので真っ当な手段では変更できません。

https://github.com/googleapis/google-cloud-go/blob/e9a8e3ad04b655dcf202c989f0295892b62e2b37/spanner/session.go#L539-L540

そこで許されざる呪文「reflect + 生ポインター」を使いstructの非公開フィールドにアクセスして無理やり3秒に書き換えましょう。ついでなのでクリーンアップの実行間隔も同様にして縮め、更にロガーも仕込みましょう。

些細なミスでpanicしたりSpanner側が予期しない振る舞いをするかもしれないのでプロダクションコードでやってはいけません。

	lg := log.New(os.Stdout, "spanner-client: ", log.Lshortfile)

	itro := &spanner.InactiveTransactionRemovalOptions{
		ActionOnInactiveTransaction: spanner.WarnAndClose,
	}

	elm := reflect.ValueOf(itro).Elem()
	thresholdVal := elm.FieldByName("idleTimeThreshold")
	thresholdPo := (*time.Duration)(unsafe.Pointer(thresholdVal.UnsafeAddr()))
	*thresholdPo = 3 * time.Second

	freqVal := elm.FieldByName("executionFrequency")
	freqPo := (*time.Duration)(unsafe.Pointer(freqVal.UnsafeAddr()))
	*freqPo = 3 * time.Second

	config := spanner.ClientConfig{
		SessionPoolConfig: spanner.SessionPoolConfig{
			MaxOpened:                         5,
			MinOpened:                         1,
			InactiveTransactionRemovalOptions: *itro,
		},
		Logger: lg,
	}
	cli, err := spanner.NewClientWithConfig(ctx, dsn, config)
	if err != nil {
		return err
	}
	defer cli.Close()

その後実行してみると以下のようにセッションリークの検知・強制解消とその旨がログに出力され、無事に10回目までループできました。なお理由は不明ですが私の環境では3秒にセットしても実際に発動するまで1, 2分程度かかったので何か他の要素も影響しているようです。

$ go run main.go
0
1
2
3
4
spanner-client: client.go:1378: session projects/sample-project/instances/sample-instance/databases/sample-database/sessions/AIOVj2zT4Oy-y94dXfCJ1lz9pOZzmTk2OhaX0xVR-PPqr2KfikBj1Vhgolof8Q checked out of pool at 2024-12-26T18:16:40+09:00 is long running and will be removed due to possible session leak for goroutine:
Enable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session
spanner-client: client.go:1378: session projects/sample-project/instances/sample-instance/databases/sample-database/sessions/AIOVj2xmTEVc6y43B79m654T-aP0XKGR4PWKfE1kJYq56ApsS6TLclk7lzN1XQ checked out of pool at 2024-12-26T18:16:41+09:00 is long running and will be removed due to possible session leak for goroutine:
Enable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session
spanner-client: client.go:1378: session projects/sample-project/instances/sample-instance/databases/sample-database/sessions/AIOVj2y_CLscBvelcawnMWLjtQD2ky71O_PhbXDZalq8knj1p_BYtbCNHJBPNA checked out of pool at 2024-12-26T18:16:41+09:00 is long running and will be removed due to possible session leak for goroutine:
Enable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session
spanner-client: client.go:1378: session projects/sample-project/instances/sample-instance/databases/sample-database/sessions/AIOVj2z1qDSPgraNuCTl6c2UWv3iACKpSi4e48ELGzXh7YLevS4Wg8jGInSY-Q checked out of pool at 2024-12-26T18:16:41+09:00 is long running and will be removed due to possible session leak for goroutine:
Enable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session
spanner-client: client.go:1378: session projects/sample-project/instances/sample-instance/databases/sample-database/sessions/AIOVj2xZTl5ubHQEW3Nb-uzu0hjFPpsKvS1SnYQopjZboJ_sxSON3NUUu3XI-w checked out of pool at 2024-12-26T18:16:41+09:00 is long running and will be removed due to possible session leak for goroutine:
Enable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session
5
6
7
8
9

全コード

main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"reflect"
	"time"
	"unsafe"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

type User struct {
	UserID    string    `spanner:"UserID"`
	CreatedAt time.Time `spanner:"CreatedAt"`
}

func main() {
	if err := run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

func run() error {
	ctx := context.Background()
	dsn := fmt.Sprintf("projects/%s/instances/%s/databases/%s", os.Getenv("SPANNER_PROJECT_ID"), os.Getenv("SPANNER_INSTANCE_ID"), os.Getenv("SPANNER_DATABASE_ID"))
	lg := log.New(os.Stdout, "spanner-client: ", log.Lshortfile)

	itro := &spanner.InactiveTransactionRemovalOptions{
		ActionOnInactiveTransaction: spanner.WarnAndClose,
	}

	elm := reflect.ValueOf(itro).Elem()
	thresholdVal := elm.FieldByName("idleTimeThreshold")
	thresholdPo := (*time.Duration)(unsafe.Pointer(thresholdVal.UnsafeAddr()))
	*thresholdPo = 3 * time.Second

	freqVal := elm.FieldByName("executionFrequency")
	freqPo := (*time.Duration)(unsafe.Pointer(freqVal.UnsafeAddr()))
	*freqPo = 3 * time.Second

	config := spanner.ClientConfig{
		SessionPoolConfig: spanner.SessionPoolConfig{
			MaxOpened:                         5,
			MinOpened:                         1,
			InactiveTransactionRemovalOptions: *itro,
		},
		Logger: lg,
	}
	cli, err := spanner.NewClientWithConfig(ctx, dsn, config)
	if err != nil {
		return err
	}
	defer cli.Close()

	for i := 0; i < 10; i++ {
		_, err := query(ctx, cli)
		if err != nil {
			return fmt.Errorf("failed to query: %w", err)
		}
		println(i)
	}

	return nil
}

func query(ctx context.Context, cli *spanner.Client) ([]*User, error) {
	rotx := cli.ReadOnlyTransaction()
	defer rotx.Close()

	stmt := spanner.NewStatement("SELECT * FROM Users")
	iter := rotx.Query(ctx, stmt)
	defer iter.Stop()

	var uu []*User
	for {
		row, err := iter.Next()
		if err != nil {
			if err == iterator.Done {
				break
			}
			return uu, nil
		}
		var u User
		if err := row.ToStruct(&u); err != nil {
			return nil, err
		}
		uu = append(uu, &u)
	}

	return uu, nil
}

おわりに

実際の発動条件としてセッション使用率95%以上且つ検知まで最大1時間以上かかるのでクエリー詰まり防止としてはアテにできないでしょう。

あくまで保険としてSpannerクライアントにロガーを仕込んだ上でログ監視する等早期検知を目的にするのよさそうです。

参考文献

Discussion