Goの並行処理をスッキリ安全に書ける「sourcegraph/conc」ええやんって話

2024/02/26に公開

Goの並行処理は簡単に実装できますが、panic処理、goroutineリーク、デッドロック、エラー伝搬などなど、正しく扱うのは簡単ではありませんよね。
そこでおすすめなのが、今回紹介する、sourcegraph/conc です。(以下、「conc」と表記します)

https://github.com/sourcegraph/conc

conc

concはGoの並行処理を扱う際に役立つ、便利で安全なツールや機能を提供するパッケージです。
concの主な目標は以下です。

  • ゴルーチンのリークをより防ぎやすくする
  • パニックを優雅に処理する
  • 並行処理のコードをより読みやすくする

コード見たほうが早いと思うので、
panic処理をconcを使うとどうなるか見てみましょう。

標準ライブラリ ver

type caughtPanicError struct {
	val   any
	stack []byte
}

func (e *caughtPanicError) Error() string {
	return fmt.Sprintf(
		"panic: %q\n%s",
		e.val,
		string(e.stack),
	)
}

func main() {
	done := make(chan error)
	go func() {
		defer func() {
			if v := recover(); v != nil {
				done <- &caughtPanicError{
					val:   v,
					stack: debug.Stack(),
				}
			} else {
				done <- nil
			}
		}()
		doSomethingThatMightPanic()
	}()
	err := <-done
	if err != nil {
		panic(err)
	}
}

Playground

conc ver

func main() {
	var wg conc.WaitGroup
	wg.Go(doSomethingThatMightPanic)
	wg.Wait()
}

Playground

劇的ビフォーアフターですね。

concは、すべての並行処理はスコープ内に収めるべきだという主張をしています。
つまり、ゴルーチンはオーナーを持つべきであり、そのオーナーは所有するゴルーチンのライフサイクルを管理しましょうねということです。

concでは、ゴルーチンのオーナーはconc.WaitGroupです。
ゴルーチンは(*WaitGroup).Go()でWaitGroup内に生成され、WaitGroupがスコープ外となる前に(*WaitGroup).Waitを呼び出すことでオーナーによるライフサイクル管理を実現しています。

いやいや、sync.Addとパニック処理をラップしてるだけやん、同時実行数の制御とかマルチエラーとかどうすんのよ!安心してください、poolで出来ます。

pool

poolには、ゴルーチンの同時実行数制限、エラーの集約、エラー時のコンテキスト・キャンセルなどの機能が用意されています。

タスクが返すT型の値とerrorを集約した例

func resultErrorPool() ([]int, error) {
    // NewWithResults:    T型の結果を持つタスク用に新しい ResultPool 作成
    // WithErrors:        ResultPoolをResultErrorPoolに変換、タスクがT型の結果とエラーを返す。
    // WithMaxGoroutines: プール内のゴルーチン数を制限、デフォルトは無制限。n < 1 の場合はパニックになる
    p := pool.NewWithResults[int]().WithErrors().WithMaxGoroutines(5)
    for i := 1; i < 10; i++ {
        p.Go(func() (int, error) {
            if i%3 == 0 {
                return i, fmt.Errorf(fmt.Sprintf("%d is a multiple of 3", i))
            }
            return i, nil
        })
    }
    return p.Wait()
}

func main() {
    numbers, err := resultErrorPool()
	if err != nil {
		spew.Dump(err)
		// (*errors.joinError)(0xc000098000)
		// (3 is multiple number 9 is multiple number 6 is multiple number)
	}
	fmt.Printf("numbers is %v\n", numbers)
	// numbers is [1 2 5 8 4 7]
}

Playground

func (*ResultErrorPool[T]) Waitはデフォルトで複合エラーを返します。
これをfunc (*ResultErrorPool[T]) WithFirstErrorを使うことでその名の通り、最初のエラーのみを返すようにすることもできます。

また、WithContextでcontextを扱うことも出来ます。

func fetchLastNames_pool(ctx context.Context, firstNames []string) ([]string, error) {
	p := pool.NewWithResults[string]().WithContext(ctx)
	for _, firstName := range firstNames {
		p.Go(func(ctx context.Context) (string, error) {
			return fetchLastName(ctx, firstName)
		})
	}
	return p.Wait()
}

Building conc: Better structured concurrency for Go より引用

iter

iterは、スライスの各要素の並行処理処理を安全に行うための機能が用意されています。
iter.Mapはスライスの各要素の操作を並行に処理し、新しいスライスを生成できます。
iter.ForEachでは、スライスの各要素を並行に処理することが出来ます。

names := []string{"frieren", "stark", "fern"}
names = iter.Map[string, string](names, func(s *string) string {
    return strings.ToUpper(*s)
})
fmt.Println(names)
// [FRIEREN STARK FERN]

iter.Iterator[string]{}.ForEach(names, func(s *string) {
    convertJPName(s)
})
fmt.Println(names)
// [フリーレン シュタルク フェルン]

PlayGround

stream

streamは、並行処理の順序を保持することができます。

s := stream.New().WithMaxGoroutines(15)
for i := 1; i <= 15; i++ {
    s.Go(func() stream.Callback {
        res := heavyFizzBuzz(i)
        return func() { fmt.Println(res) }
    })
}
s.Wait()
// 1
// 2
// 3 Fizz
// ...

PlayGround

まとめ

concは、Goの並行処理を安全かつ簡単に扱えるよう抽象化しています。
作者は、並行処理の誤用を最小限に抑えることに重点を置いています。そのため、channelを意図的に組み込んでいいないようです。
個人的には並行処理のボイラーが減りレビューも行いやすくなるように感じたので積極的に使っていきたいなと思いました。

GitHubで編集を提案

Discussion