🌀

Goでクラウド上のフォルダ探索を非同期化する

に公開

株式会社ナレッジワークのmokekoです.

この記事では, Goでフォルダ探索を非同期化する際に起こった問題とその対処ついて紹介します💃

背景

ナレッジワークには, クラウドファイルストレージ (Google Drive, OneDriveなど) と連携して特定のフォルダの中身を一括で取り込む機能があります.

ある日, この機能がタイムアウトで動かなくなる問題が起こりました. 実行環境の制約によって10分以内に処理を終える必要があったのですが, 顧客が増える中で想定外に大きいサイズのフォルダが指定されるようになり, 処理が終わらなくなったのです.

根本的にはアーキテクチャレベルでの変更が必要になりますが, それでは開発規模が大きくなりすぎるので, ひとまずコードレベルで対処することにしました.

元の実装

ボトルネックになっていたのは指定されたフォルダの子要素一覧を取得する部分であり, 元の実装は以下のようになっていました. 実際にはcontextやerrorのハンドリングが必要ですが, サンプルコードを短くするために省略しています.

// ファイルやディレクトリを表す構造体
type Item struct {
	Path string
	Type string // "file" or "dir"
}

type StorageAPI interface {
	// クラウドフォルダストレージのAPIを呼び出してディレクトリの中身を取得する
	FetchItems(dirPath string) []Item
}

type mockStorageAPI struct{}

func (m *mockStorageAPI) FetchItems(dirPath string) []Item {
	panic("mock implementation")
}

var storageAPI = &mockStorageAPI{} // 実際は本物のAPIクライアントをセットする

// rootPath以下の全てのファイルとディレクトリを同期的に列挙する
func ListItemsSync(rootPath string) []Item {
	allItems := storageAPI.FetchItems(rootPath)

	for _, item := range allItems {
		if item.Type == "dir" {
			items := ListItemsSync(item.Path)
			allItems = append(allItems, items...)
		}
	}

	return allItems
}

外部APIの呼び出しを同期的に行っているところが気になりますね. 非同期化すればすぐに早くなりそうな気がします. ひとつ注意点として, 非同期化する場合は同時実行数の制限は入れておく必要があります. 自分たちのサーバーや呼び出し先のサーバーを保護するためです.

うまくいかないパターン

問題を理解するために, まずは動きそうで動かない実装を2つ紹介します.

errgroup + SetLimit

1つ目はerrgroupを使うパターンです. Goで非同期化を考えるとき, まずはこれを試す場合が多いのではないでしょうか. errgroupは SetLimit によって同時実行数の制限もできます. 実装は以下のようになります.

import (
    "sync"

    "golang.org/x/sync/errgroup"
)

// 結果を収集するためのutility
type AtomicSlice[T any] struct {
	v  []T
	mu sync.RWMutex
}

func (a *AtomicSlice[T]) Append(v ...T) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.v = append(a.v, v...)
}

func (a *AtomicSlice[T]) Get() []T {
	a.mu.RLock()
	defer a.mu.RUnlock()
	return append([]T{}, a.v...)
}

// 非同期版の実装
func ListItemsAsyncWithErrGroup1(rootPath string) []Item {
	eg := &errgroup.Group{}
	eg.SetLimit(maxConcurrency)

	allItems := &AtomicSlice[Item]{}

	// errgroupは再帰関数の外で作って渡す
	listItemsAsyncWithErrGroup1(rootPath, eg, allItems)

	_ = eg.Wait()

	return allItems.Get()
}

func listItemsAsyncWithErrGroup1(path string, eg *errgroup.Group, allItems *AtomicSlice[Item]) {
	items := storageAPI.FetchItems(path)

	for _, item := range items {
		if item.Type == "dir" {
			// maxConcurrencyを超えるとeg.Goの呼び出しがブロックする
			eg.Go(func() error {
				listItemsAsyncWithErrGroup1(item.Path, eg, allItems)
				return nil
			})
		}
	}

	allItems.Append(items...)
}

なんとなく動きそうに見えますが, この実装はデッドロックを起こします. errgroupの実装を見てみると, SetLimit を設定済みで既に上限いっぱいまでgoroutineが動いている場合, eg.Go呼び出し側がブロックします.
よって

  • eg.Go の呼び出しが終わらないとそのgoroutineは終わらず, goroutineの数が減らない
  • goroutineの数が減らないと eg.Go の呼び出しが終わらない

という状況になり, デッドロックが発生します.

ワーカープール + chan

同時実行数を制限しつつ並列実行する他の方法として, ワーカープールも検討してみます.

const chanSize = 100

func ListItemsAsyncWithWorkerPool1(rootPath string) []Item {
	allItems := &AtomicSlice[Item]{}

	tasks := make(chan string, chanSize)
	taskWg := &sync.WaitGroup{}

	// ワーカーの起動
	workerWg := &sync.WaitGroup{}
	for range maxConcurrency {
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			worker1(tasks, allItems, taskWg)
		}()
	}

	// 初期タスクの投入
	taskWg.Add(1)
	tasks <- rootPath

	// タスクが完了したらchanを閉じてワーカーを終了させる
	go func() {
		taskWg.Wait()
		close(tasks)
	}()

	workerWg.Wait()

	return allItems.Get()
}

func worker1(tasks chan string, results *AtomicSlice[Item], taskWg *sync.WaitGroup) {
	for path := range tasks {
		items := storageAPI.FetchItems(path)
		for _, item := range items {
			if item.Type == "dir" {
				taskWg.Add(1)
				tasks <- item.Path // chanSizeを超えるとここでブロックする
			}
			results.Append(item)
		}
		taskWg.Done()
	}
}

こちらもerrgroupの場合と同じ問題があります. Goのチャンネルはバッファサイズが有限なので, それを超えると

  • タスクを終えるには次のタスクをchanに積む必要がある
  • タスクを終えなければchanが空かず, 次のタスクを積むことができない

という状況になり, デッドロックが発生します.

問題点

上に挙げた2つの実装はどちらもデッドロックの問題を抱えていました. この問題はなぜ起こるのでしょうか.

非同期処理を書くときのより簡単なケースでは, プロデューサー (タスクをキューに積む側の処理) とコンシューマー (タスクを消費する側の処理) が分かれています. その場合, 一時的にキューが埋まってプロデューサーが止まったとしても, コンシューマーが動き続ければいずれはキューが空くので問題ありません. 普段目にする頻度はこのパターンの方が多いのではないかと思います.

一方, 今回は再帰関数で書かれたフォルダ探索処理を非同期化しようとした結果, プロデューサーとコンシューマーが同じ処理となってしまいました. この状況ではチャンネルのバッファサイズや同時実行数など, 有限のキューサイズとして働く要素があるとデッドロックを起こしてしまいます.

解決策

シンプルな解決策として, タスク数の最大値が分かる状況ならば, 単にchanのバッファサイズを十分大きく取っておくという手もあります. しかし, サービスが成長する中で気づかないうちにタスク数が増える可能性は十分考えられるので, 個人的にはこの方法は取りたくありません. 実行環境のメモリがある限りはよしなに動いてほしいところです. メモリ使用率は基本的なモニタリングの対象になっているので, 使用率の増加に応じてメモリを増やしていけば対応できます.

容量制限のないキューを作る

実際にどう対応したかというと, 「バッファサイズが有限で困るなら無限にすればええんや!!!」ということで, Goのchanと似た挙動で上限がない非同期キューを作ることにしました.
キューの詳細な実装は省略[1]しますが, 連結リストをsync.Condやsync.Mutexで保護する作りになっています.
フォルダ探索の実装はワーカープールのパターンがベースになっており, chanを使っていた場所がそのまま自作のチャンネルに置き換わった形です.

type UnboundedChan[T any] interface {
	Send(T)             // ブロックしない
	Receive() (T, bool) // 通常のchanと同じ挙動
	Close()
}

type unboundedChan[T any] struct {
	l      list.List
	cond   *sync.Cond
	closed bool
}

func NewUnboundedChan[T any]() UnboundedChan[T] {
	return &unboundedChan[T]{
		l:    list.List{},
		cond: sync.NewCond(&sync.Mutex{}),
	}
}

// 省略

func ListItemsAsyncWithWorkerPool2(rootPath string) []Item {
	allItems := &AtomicSlice[Item]{}

	tasks := NewUnboundedChan[string]()
	taskWg := &sync.WaitGroup{}

	// ワーカーの起動
	workerWg := &sync.WaitGroup{}
	for range maxConcurrency {
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			worker2(tasks, allItems, taskWg)
		}()
	}

	// 初期タスクの投入
	taskWg.Add(1)
	tasks.Send(rootPath)

	// タスクが完了したらchanを閉じてワーカーを終了させる
	go func() {
		taskWg.Wait()
		tasks.Close()
	}()

	workerWg.Wait()

	return allItems.Get()
}

func worker2(tasks UnboundedChan[string], allItems *AtomicSlice[Item], taskWg *sync.WaitGroup) {
	for {
		path, ok := tasks.Receive()
		if !ok {
			return // closed
		}
		items := storageAPI.FetchItems(path)
		for _, item := range items {
			if item.Type == "dir" {
				taskWg.Add(1)
				tasks.Send(item.Path) // ブロックしない
			}
			allItems.Append(item)
		}
		taskWg.Done()
	}
}

背景で述べた「大容量のフォルダを扱うとタイムアウトしてしまう」という問題はこの対応でひとまず解決しました🎉
元々動かなかった処理を動くようにしたという経緯もあり, この対応の前後での正確なパフォーマンス比較はできていないのですが, 少なくとも従来比3倍程度の要素数を扱えるようになりました.

errgroup + 手動semaphore

上の対応を終えた後でよりシンプルな実装を知ったので, そちらも紹介します.
errgroup+SetLimitによる実装では eg.Go の呼び出し側がブロックするのが問題でした. この挙動自体はerrgroupの実装によるものであって変えられないのですが, errgroupの SetLimit を使わずに自分でsemaphoreを管理すればどこでブロックするのかを調整できます. eg.Go の呼び出し側ではなく呼び出し後のgoroutineがブロックするように変えればOKです. こちらの方が圧倒的にシンプルですね😇

func ListItemsAsyncWithErrGroup2(rootPath string) []Item {
	eg := &errgroup.Group{}
	sem := semaphore.NewWeighted(maxConcurrency)

	results := &AtomicSlice[Item]{}

	// errgroupは再帰関数の外で作って渡す
	listItemsAsyncWithErrGroup2(rootPath, eg, sem, results)

	_ = eg.Wait()

	return results.Get()
}

func listItemsAsyncWithErrGroup2(rootPath string, eg *errgroup.Group, sem *semaphore.Weighted, results *AtomicSlice[Item]) {
	items := storageAPI.FetchItems(rootPath)

	for _, item := range items {
		if item.Type == "dir" {
			// ブロックしない
			eg.Go(func() error {
				sem.Acquire(context.TODO(), 1) // ここでブロックするが, 別goroutineになっているので問題ない
				defer sem.Release(1)
				listItemsAsyncWithErrGroup2(item.Path, eg, sem, results)
				return nil
			})
		}
	}

	results.Append(items...)
}

2つの方法を比較する

動作する実装パターンを2つ見つけたので, これらを比較してみます.

2つの実装でコードの見た目は割と違うのですが, 抽象化して考えてみると実質的には同じことをやっていると見ていいのではないかと思います[2].
プロデューサーとコンシューマーが同じなのでキューのサイズが有限だとデッドロックするというのが問題なのでした. それに対して, 自作チャンネルのパターンでは陽に容量制限のないキューを使うことで対処しています. errgroup+semaphoreのパターンでは処理がブロックする場所を変えることで対処しているのですが, これは結局メモリがある限りgoroutineを増やし続けるということであり, goroutineのスケジューラーを無限キューとして使っていることになります.
対応としては次のようになります.

  • 自作チャンネル
    • タスク: path
    • キュー: 自作チャンネル
  • errgroup+semaphore
    • タスク: pathを含むgoroutine
    • キュー: goroutineスケジューラー

goroutineスケジューラーがタスクキューだというのは言われてみればそうなのですが, chanと比べるものとして見たことはなかったので, 個人的には新鮮でした.

次に, メリット/デメリットでいうと, errgroup+semaphoreのパターンが圧倒的にシンプルな実装になるので, 基本的にこちらを使うのがいいと思います. 自作チャンネルは雰囲気こそ組み込みのchanと合わせていますが, select 等の構文と組み合わせづらいのでどうしてもコードが長くなりがちです. サンプルコードでは省略しましたが, context cancellのハンドリングなどが入ってくると結構面倒になります.

自作チャンネルのメリットをあえて挙げるならメモリ効率でしょうか. 調べたところ, goroutineは基本的に1つあたり数KB程度のメモリを必要とするようです. 自作キューによる実装の場合, タスク数に応じて増えていくのは連結リストの長さだけで, goroutineの数はワーカー数以上には増えません. 要素がpathであれば10-100byte程度に収まるはずなので, メモリ効率は1-2桁は良いことになります.

おわりに

再帰関数として書かれたフォルダ探索処理を非同期化するときに起こる問題とその対処について説明しました.
Goはerrgroupが便利なので普段はそれに頼ることが多いのですが, 普段のパターンだけでは実装できない問題もシュッと解決できるように精進していこうと思いました💪

KNOWLEDGE WORK Blog Sprint, 明日はVPoEのhidekさんの投稿です. お楽しみに!

脚注
  1. 気になる方は https://github.com/mokeko/ubch をご覧ください ↩︎

  2. 筆者は並行処理について詳しくないので, この感想が妥当なのかについては全く自信はありません😇 ↩︎

株式会社ナレッジワーク

Discussion