🕰️

Goで作るGraphQLサーバーのdataloaderについて

2024/10/25に公開

今回はGoで作るGraphQLサーバーのdataloaderについて書いていきたいと思います。

dataloaderとは

dataloaderはアプリケーションのデータ取得レイヤーで効率良くデータ取得できるようにバッチ処理やキャッシュを行う機能です。Goでdataloaderを実現したパッケージはいくつかありますが、今回はgraph-gophers/dataloaderというパッケージを用いて実装した例について書いていきます。
https://github.com/graph-gophers/dataloader

使用する主要なパッケージ

  • gqlgen
  • echo
  • graph-gophers/dataloader

なぜdataloaderを使う必要があるのか

dataloaderを使う理由は、GraphQLクエリを実行したときに発生するN+1問題を解決するためです。dataloaderではLazyLoading(遅延読み込み)の仕組みを利用してN+1問題を解決するというアプローチを取ります。

EagerLoadingとLazyLoadingの違いについて

N+1問題を解決する方法にはEagerLoading(先読み込み)とLazyLoading(遅延読み込み)がありますが、この違いについても触れていこうと思います。

EagerLoading

EagerLoadingは必要となるデータを事前に読み込むことでN+1問題が発生するのを解決します。例えばuserと紐づくpostのデータを使って処理を行いたいとなったとき、user情報をデータベースから取得する際に一緒にそれに紐づくpostの情報も取得しておき、必要に応じてpostの情報を使って処理を行います。
仮にEagerLoadingしなかった場合、IDが1、2、3のuserが存在し、それぞれのuserに紐づくpostの情報を取得しようとすると以下のようなSQLが走ることになるでしょう。

// userの数だけSQLが走るので非効率
select * from posts where user_id = 1;
select * from posts where user_id = 2;
select * from posts where user_id = 3;

このような問題が発生するのを防ぐために事前に紐づく情報を取得しておくことでデータベースへクエリを投げる量が減って、結果的にパフォーマンスの向上に繋がります。

LazyLoading

LazyLoadingは、処理を行う際に必要なデータの情報を何かしらの方法で宣言し、必要になったときにまとめて取得する方法です。先ほどの例で言うと以下のようなイメージです。

// これだと非効率なので
select * from posts where user_id = 1;
select * from posts where user_id = 2;
select * from posts where user_id = 3;

// どのuser_idに紐づく情報が必要になるのかが分かってからまとめて取得する。
select * from posts where user_id in(1, 2, 3);

EagerLoadingとLazyLoadingのメリット、デメリットは以下のようなものがあげられます。

  • EagerLoading

    • メリット
      フレームワークやORMなどに標準で備わっていることも多いので実装が簡単。
    • デメリット
      どの情報が必要になるのか詳細は確認せずに一括でデータ取得を行うので無駄なデータ取得が多くなる場合がある。
  • LazyLoading

    • メリット
      必要なデータしか取得しないので効率が良い。
    • デメリット
      実装が複雑になりがち。

なぜdataloaderではLazyLoadingを使うのか

データを効率的に取得する方法として2パターンを紹介しましたが、dataloaderではなぜLazyLoadingを使うのでしょうか。その理由はGraphQLのデータ取得方法の特性にあります。以下はGoのresolverのコードの一例です。

func (r *queryResolver) User(ctx context.Context, id int64) (*graphql1.UserDetail, error) {
	return r.UserUsecase.Fetch(ctx, int64(id))
}

func (r *userDetailResolver) Posts(ctx context.Context, obj *graphql1.UserDetail) ([]*graphql1.PostDetail, error) {
	return r.PostUsecase.FetchByUserID(ctx, obj.ID)
}

func (r *postDetailResolver) Comments(ctx context.Context, obj *graphql1.PostDetail) ([]*graphql1.CommentDetail, error) {
	return r.CommentUsecase.FetchByPostID(ctx, obj.ID)
}

このresolverのコードに以下のようなクエリを投げるとします。

query {
  User(ID: 1) {
    name
    posts{
      title
      comments{
        content
      }
    }
  }
}

このとき、user_id = 1で取得できたpostのidが[1, 2, 3]だった場合、resolverは返り値一つ一つに対して処理を行うことになるので何も対策をしないと以下のような非効率なSQLが走ることになります。

select * from comments where post_id = 1;
select * from comments where post_id = 2;
select * from comments where post_id = 3;

resolverはいつ実行されるかや、resolverの結果を使ってどのresolverが次に実行されるかを先読みすることは基本的に行いません。そのため先に後々必要になるデータを取得しておくEagerLoadingではなくLazyLoadingを使用する必要があるのです。

具体的な実装

一通りdataloaderを使用する前段階の説明が終わったところで、具体的な実装例について解説していきます。今回の実装例ではざっくり以下のような処理の流れになります。

  1. server.goでリクエストを受け取る
  2. server.goでcontextにdataloaderをキーバリューの形でセット
  3. リクエストの内容に応じてキーを使ってdataloaderを呼び出してデータ取得

順を追って解説していきます。

1. server.goでリクエストを受け取る

まずはリクエストを受け取る部分についてです。

func main() {
    // 略
	g := e.Group("/query") // パスをグループ化
	g.Use(dataloader.DataLoaderMiddleWare()) // コンテキストにdataloaderをセットするためのミドルウェアを設定
    // 略
}

パスのグループ化についてですが、これはechoの機能です。/queryで始まるエンドポイント(GraphQLのqueryやmutationを処理するエンドポイント)を効率良く管理するためにグループ化を行います。グループ化を行うことで、GraphQL関連のリクエストに対してのみdataloaderに関するミドルウェアを適用しています。

2. server.goでcontextにdataloaderをキーバリューの形でセット

続いてcontextにdataloaderをセットする箇所についてです。手順1の例のコードの以下の箇所でそれを行っています。

g.Use(dataloader.DataLoaderMiddleWare())

ここのdataloader.DataLoaderMiddleWare()は以下のような実装になっており、ここでcontextにdataloaderをセットしています。

func DataLoaderMiddleWare() echo.MiddlewareFunc {
	return func(next echo.HandlerFunc) echo.HandlerFunc {
		return func(c echo.Context) error {
                        // キーバリューの形式でcontextにdataloaderをセット
			ctx := context.WithValue(
				c.Request().Context(),
				dataloader.CommentLoadKey, // キー
				dataloader.NewBatchedLoader(BatchGetComments()) // dataloader
			)
			c.SetRequest(c.Request().WithContext(ctx))
			return next(c)
		}
	}
}

NewBatchedLoader関数は実際にdataloaderとして機能するLoader構造体のインスタンスを作成するためのコンストラクタ関数です(詳細は後述します)。BatchGetComments()の部分がデータ取得を行うバッチ関数で、これがdataloaderにおける中心的なロジックと言えます。BatchGetComments()の実装は以下のようなイメージです。

func (d *commentDataloaderImpl) BatchGetComments(ctx context.Context, keys []int64) []*dataloader.Result[*model.Comment] {
        // データを取得
	comments, err := d.repository.FetchByPostIDs(ctx, keys)
	if err != nil {
            return ErrorResults[int64, *model.Comment](keys, err)
        }
        // 取得したデータを並び替えるためのmapを作成
        commentMap := make(map[int64]*model.Comment, len(keys))
        for _, comment := range comments {
            commentMap[comment.ID] = append(commentMap[comment.ID], commentDetail)
        }
        return SortResults[int64, *model.Comment](keys, commentMap, true)
}

func ErrorResults[T comparable, U any](keys []T, err error) []*dataloader.Result[U] {
        // エラーが発生した場合は、dataloader.Result構造体のErrorフィールドにエラー内容を格納して返す。
	results := make([]*dataloader.Result[U], len(keys))
        // キーの数と同じだけレスポンス(Result構造体)を返す必要がある。
	for i := range results {
		results[i] = &dataloader.Result[U]{
			Error: err,
		}
	}
	return results
}

func SortResults[T comparable, U any](keys []T, m map[T]U, nullable bool) []*dataloader.Result[U] {
        // キーと同じ順序で紐づくデータが返却できるように並び替える
	results := make([]*dataloader.Result[U], len(keys))
	for i, key := range keys {
		obj, ok := m[key]
		if !ok && !nullable {
			results[i] = &dataloader.Result[U]{
				Error: fmt.Errorf("data not found. key: %v", key),
			}
			continue
		}
		results[i] = &dataloader.Result[U]{
			Data: obj,
		}
	}
	return results
}

バッチ処理の返り値はdataloader.Result構造体として返されます。この構造体には以下のように取得したデータもしくはデータ取得時に発生したエラーが含まれています。

type Result[V any] struct {
	Data  V
	Error error
}

このバッチ関数で返すデータは、引数で受け取ったIDなどと同じ順序で返す必要があります(例:IDが[1,2,3]の順で引数に渡された場合、[ID1に紐づくデータ,ID2に紐づくデータ,ID3に紐づくデータ]の順に紐づくデータを返す必要がある)。そのため、適切な順序にデータを並び替える必要があります。

また、引数で受け取ったキーの数と返却するResult構造体の数が一致しない場合もエラーが発生してしまいます(詳細は後述します)。そのため、ErrorResults関数で処理しているように、データ取得時などにエラーが発生した場合もキーの数だけResult構造体を返すように実装する必要があります。

3. リクエストの内容に応じてキーを使ってdataloaderを呼び出してデータ取得

では最後にdataloaderを呼び出す部分のコードを見ていきます。

func (r *postDetailResolver) Comments(ctx context.Context, obj *graphql1.PostDetail) ([]*graphql1.CommentDetail, error) {
        // contextからdataloaderを取得
	loader := dbDataloader.DataloaderFromCtx[dbDataloader.CommentDataloaderKey, dataloader.Interface[int64, []*graphql1.CommentDetail]](ctx, dbDataloader.CommentLoadKey)
        // データ取得処理を予約。即時実行はされない。一定時間待機した後、その時間内に実行されたLoadメソッドにより登録された複数の値(この場合ID)をまとめてバッチ関数に渡してデータ取得処理が実行される。
	thunk := loader.Load(ctx, obj.ID)
        // thunk関数を実行する。thunk関数の実行はデータ取得処理が行われるまでブロックされる。データ取得処理完了後、戻り値として取得したデータを返す。
	result, err := thunk()
	if err != nil {
		return nil, err
	}
	return result, nil
}

// キーを使ってcontextから適切なdataloaderを返す
func DataloaderFromCtx[T, U any](ctx context.Context, key T) U {
	l, ok := ctx.Value(key).(U)
	if !ok {
		panic(fmt.Sprintf("no loader for key %v", key))
	}
	return l
}

ここでは手順2でcontextに登録したdataloaderをキーを使って取得して使用しています。Loadメソッドは一定時間待機することで時間内のリクエストを取りまとめ、それをバッチ関数に渡してまとめてデータ取得が行えるようにしています。要はここでLazyLoadingを実現しているということです。取得されたデータはLoadメソッドから返されるthunk関数の返り値として取得できるという仕組みになっています(Loadメソッドの詳細は後述します)。

かなり簡単に書きましたが、以上がdataloaderを使ってデータ取得を行う際の流れになります。

graph-gophers/dataloaderの実装

ここからは使用しているパッケージのgraph-gophers/dataloaderの内部実装の主要箇所について見ていきたいと思います。

NewBatchedLoader関数

まずは前述の手順2でdataloaderをcontextに登録する際に使用したNewBatchedLoader関数についてです。実装は以下のようになっており、バッチ関数とオプションを引数に取り、Loader構造体を作成しています。

// NewBatchedLoader constructs a new Loader with given options.
func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts ...Option[K, V]) *Loader[K, V] {
	loader := &Loader[K, V]{
		batchFn:  batchFn,
		inputCap: 1000,
		wait:     16 * time.Millisecond,
	}

	// Apply options
	for _, apply := range opts {
		apply(loader)
	}

	// Set defaults
	if loader.cache == nil {
		loader.cache = NewCache[K, V]()
	}

	if loader.tracer == nil {
		loader.tracer = NoopTracer[K, V]{}
	}

	return loader
}

オプションはデフォルトの最大キューサイズ(inputCap)や待機時間(wait)を編集したい時などに使用することができます。例えば待機時間を変更したいときは以下のように実装することで対応できます。

// dataloaderのオプションの設定を格納する変数を定義。
var opt dataloader.Option[int64, []*graphql.Comment]
// WithWait関数を使って待機時間を上書きするオプションを変数に格納。
/*
WithWait関数の実装
func WithWait[K comparable, V any](d time.Duration) Option[K, V] {
	return func(l *Loader[K, V]) {
		l.wait = d
	}
}
*/
opt = dataloader.WithWait[int64, []*graphql.Comment](50*time.Millisecond)
// オプションをNewBatchedLoaderメソッドに渡して適用させる。
dataloader.NewBatchedLoader(BatchGetComments(), opt)

こうすることでデフォルトでは16*time.Millisecondとなっている待機時間を50*time.Millisecondに上書きすることができます。

Loader構造体

NewBatchedLoader関数によって作成されるLoader構造体は以下のような実装になっています。

// Loader implements the dataloader.Interface.
type Loader[K comparable, V any] struct {
	// the batch function to be used by this loader
	batchFn BatchFunc[K, V]

	// the maximum batch size. Set to 0 if you want it to be unbounded.
	batchCap int

	// the internal cache. This packages contains a basic cache implementation but any custom cache
	// implementation could be used as long as it implements the `Cache` interface.
	cacheLock sync.Mutex
	cache     Cache[K, V]
	// should we clear the cache on each batch?
	// this would allow batching but no long term caching
	clearCacheOnBatch bool

	// count of queued up items
	count int

	// the maximum input queue size. Set to 0 if you want it to be unbounded.
	inputCap int

	// the amount of time to wait before triggering a batch
	wait time.Duration

	// lock to protect the batching operations
	batchLock sync.Mutex

	// current batcher
	curBatcher *batcher[K, V]

	// used to close the sleeper of the current batcher
	endSleeper chan bool

	// used by tests to prevent logs
	silent bool

	// can be set to trace calls to dataloader
	tracer Tracer[K, V]
}

Loader構造体は中身のフィールドが非公開となっているため、NewBatchedLoader関数をコンストラクタ関数としてインスタンスを作成する必要があります。

dataloaderのインターフェース

このLoader構造体は以下のインターフェースを実装しており、これによりdataloaderの機能を実現しています。

type Interface[K comparable, V any] interface {
        // 1つの値K(idなど)をもとにデータ取得を行うメソッド。
	Load(context.Context, K) Thunk[V]
        // 値Kを複数渡すことができるLoadメソッド。
	LoadMany(context.Context, []K) ThunkMany[V]
        // 値Kに紐づくキャッシュを削除するメソッド。
	Clear(context.Context, K) Interface[K, V]
        // キャッシュを全て削除するメソッド。
	ClearAll() Interface[K, V]
        // キャッシュをセットするメソッド。
	Prime(ctx context.Context, key K, value V) Interface[K, V]
}

Loadメソッド

Loadメソッドについても少し深ぼってみます。

func (l *Loader[K, V]) Load(originalContext context.Context, key K) Thunk[V] {
	ctx, finish := l.tracer.TraceLoad(originalContext, key)

	c := make(chan *Result[V], 1)
	var result struct {
		mu    sync.RWMutex
		value *Result[V]
	}

	// lock to prevent duplicate keys coming in before item has been added to cache.
	l.cacheLock.Lock()
	if v, ok := l.cache.Get(ctx, key); ok {
		defer finish(v)
		defer l.cacheLock.Unlock()
		return v
	}

	thunk := func() (V, error) {
		result.mu.RLock()
		resultNotSet := result.value == nil
		result.mu.RUnlock()

		if resultNotSet {
			result.mu.Lock()
			if v, ok := <-c; ok {
				result.value = v
			}
			result.mu.Unlock()
		}
		result.mu.RLock()
		defer result.mu.RUnlock()
		var ev *PanicErrorWrapper
		if result.value.Error != nil && errors.As(result.value.Error, &ev) {
			l.Clear(ctx, key)
		}
		return result.value.Data, result.value.Error
	}
	defer finish(thunk)

	l.cache.Set(ctx, key, thunk)
	l.cacheLock.Unlock()

	// this is sent to batch fn. It contains the key and the channel to return
	// the result on
	req := &batchRequest[K, V]{key, c}

	l.batchLock.Lock()
	// start the batch window if it hasn't already started.
	if l.curBatcher == nil {
		l.curBatcher = l.newBatcher(l.silent, l.tracer)
		// start the current batcher batch function
		go l.curBatcher.batch(originalContext)
		// start a sleeper for the current batcher
		l.endSleeper = make(chan bool)
		go l.sleeper(l.curBatcher, l.endSleeper)
	}

	l.curBatcher.input <- req

	// if we need to keep track of the count (max batch), then do so.
	if l.batchCap > 0 {
		l.count++
		// if we hit our limit, force the batch to start
		if l.count == l.batchCap {
			// end the batcher synchronously here because another call to Load
			// may concurrently happen and needs to go to a new batcher.
			l.curBatcher.end()
			// end the sleeper for the current batcher.
			// this is to stop the goroutine without waiting for the
			// sleeper timeout.
			close(l.endSleeper)
			l.reset()
		}
	}
	l.batchLock.Unlock()

	return thunk
}

Loadメソッドにはデータを取得する処理やキャッシュの仕組みが含まれています。キャッシュが存在する場合はキャッシュされたthunk関数を返し、存在しなければ新たにthunk関数を作成してそれを返すような仕組みになっています。キャッシュの確認は以下の箇所で行われています。

// 複数のゴルーチンがキャッシュにアクセスしないようにロック。
l.cacheLock.Lock()
// キャッシュがあればそれを返して処理終了。なければ後続の処理が続く。
if v, ok := l.cache.Get(ctx, key); ok {
    defer finish(v)
    defer l.cacheLock.Unlock()
    return v
}

thunk関数は以下の箇所で作成されています。

thunk := func() (V, error) {
    // 他のゴルーチンがresultにアクセスしないようにロック。
    result.mu.RLock()
    resultNotSet := result.value == nil
    result.mu.RUnlock()

    // result.valueがセットされていない場合チャネルからデータを受け取るまで待機。
    if resultNotSet {
        result.mu.Lock()
        if v, ok := <-c; ok {
            result.value = v
        }
        result.mu.Unlock()
    }
    result.mu.RLock()
    defer result.mu.RUnlock()
    var ev *PanicErrorWrapper
    if result.value.Error != nil && errors.As(result.value.Error, &ev) {
        l.Clear(ctx, key)
    }
    return result.value.Data, result.value.Error
}

ここのチャネルに対してデータを返すリクエストは以下のように実装されています。

// バッチ処理のリクエストを作成
req := &batchRequest[K, V]{key, c}

// 他のゴルーチンがバッチ処理にアクセスしないようロック。
l.batchLock.Lock()
// バッチ処理が開始されていない場合は開始する。
if l.curBatcher == nil {
    l.curBatcher = l.newBatcher(l.silent, l.tracer)
    // ここでバッチ処理を行う。キーとResult構造体の数が合わないとここでエラーが出る。
    go l.curBatcher.batch(originalContext)
    l.endSleeper = make(chan bool)
    // バッチ処理の待機時間を管理
    go l.sleeper(l.curBatcher, l.endSleeper)
}
// バッチ処理のキューにリクエストを追加
l.curBatcher.input <- req

手順2で引数で受け取ったキーの数と返却するResult構造体の数が一致しない場合もエラーが発生すると書いていましたが、このエラーを返す処理が上のl.curBatcher.batch(originalContext)の部分です。実装は以下のようになっています。

func (b *batcher[K, V]) batch(originalContext context.Context) {
// 略
        // キーと取得したデータの数が一致するか見ている。
	if len(items) != len(keys) {
		err := &Result[V]{Error: fmt.Errorf(`
			The batch function supplied did not return an array of responses
			the same length as the array of keys.

			Keys:
			%v

			Values:
			%v
		`, keys, items)}

		for _, req := range reqs {
			req.channel <- err
			close(req.channel)
		}

		return
	}
// 略
}

公開されている他のメソッドについての解説は今回は割愛します。気になる方はぜひ調べてみてください。

最後に

今回はGoで実装するGraphQLのdataloaderについて簡単にまとめてみました。dataloaderはGraphQLを使用する際には必須の技術ですが、実装が少々難しいのが難点です。ただ、パッケージを使えば割とシンプルに実装できるので便利ですね。

最後までご覧いただきありがとうございました。

参考

https://github.com/graph-gophers/dataloader

https://zenn.dev/hsaki/books/golang-graphql

https://speakerdeck.com/syumai/xonokotosheng-cheng-tegqlgennodataloadershi-zhuang-wole-nisitahua

https://tech.layerx.co.jp/entry/2022/06/13/120000

Discussion