Goの並行処理をスッキリ安全に書ける「sourcegraph/conc」ええやんって話
Goの並行処理は簡単に実装できますが、panic処理、goroutineリーク、デッドロック、エラー伝搬などなど、正しく扱うのは簡単ではありませんよね。
そこでおすすめなのが、今回紹介する、sourcegraph/conc です。(以下、「conc」と表記します)
conc
concはGoの並行処理を扱う際に役立つ、便利で安全なツールや機能を提供するパッケージです。
concの主な目標は以下です。
- ゴルーチンのリークをより防ぎやすくする
- パニックを優雅に処理する
- 並行処理のコードをより読みやすくする
コード見たほうが早いと思うので、
panic処理をconcを使うとどうなるか見てみましょう。
標準ライブラリ ver
type caughtPanicError struct {
val any
stack []byte
}
func (e *caughtPanicError) Error() string {
return fmt.Sprintf(
"panic: %q\n%s",
e.val,
string(e.stack),
)
}
func main() {
done := make(chan error)
go func() {
defer func() {
if v := recover(); v != nil {
done <- &caughtPanicError{
val: v,
stack: debug.Stack(),
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic()
}()
err := <-done
if err != nil {
panic(err)
}
}
conc ver
func main() {
var wg conc.WaitGroup
wg.Go(doSomethingThatMightPanic)
wg.Wait()
}
劇的ビフォーアフターですね。
concは、すべての並行処理はスコープ内に収めるべきだという主張をしています。
つまり、ゴルーチンはオーナーを持つべきであり、そのオーナーは所有するゴルーチンのライフサイクルを管理しましょうねということです。
concでは、ゴルーチンのオーナーはconc.WaitGroup
です。
ゴルーチンは(*WaitGroup).Go()
でWaitGroup内に生成され、WaitGroupがスコープ外となる前に(*WaitGroup).Wait
を呼び出すことでオーナーによるライフサイクル管理を実現しています。
いやいや、sync.Addとパニック処理をラップしてるだけやん、同時実行数の制御とかマルチエラーとかどうすんのよ!安心してください、poolで出来ます。
pool
poolには、ゴルーチンの同時実行数制限、エラーの集約、エラー時のコンテキスト・キャンセルなどの機能が用意されています。
タスクが返すT型の値とerrorを集約した例
func resultErrorPool() ([]int, error) {
// NewWithResults: T型の結果を持つタスク用に新しい ResultPool 作成
// WithErrors: ResultPoolをResultErrorPoolに変換、タスクがT型の結果とエラーを返す。
// WithMaxGoroutines: プール内のゴルーチン数を制限、デフォルトは無制限。n < 1 の場合はパニックになる
p := pool.NewWithResults[int]().WithErrors().WithMaxGoroutines(5)
for i := 1; i < 10; i++ {
p.Go(func() (int, error) {
if i%3 == 0 {
return i, fmt.Errorf(fmt.Sprintf("%d is a multiple of 3", i))
}
return i, nil
})
}
return p.Wait()
}
func main() {
numbers, err := resultErrorPool()
if err != nil {
spew.Dump(err)
// (*errors.joinError)(0xc000098000)
// (3 is multiple number 9 is multiple number 6 is multiple number)
}
fmt.Printf("numbers is %v\n", numbers)
// numbers is [1 2 5 8 4 7]
}
func (*ResultErrorPool[T]) Wait
はデフォルトで複合エラーを返します。
これをfunc (*ResultErrorPool[T]) WithFirstError
を使うことでその名の通り、最初のエラーのみを返すようにすることもできます。
また、WithContextでcontextを扱うことも出来ます。
func fetchLastNames_pool(ctx context.Context, firstNames []string) ([]string, error) {
p := pool.NewWithResults[string]().WithContext(ctx)
for _, firstName := range firstNames {
p.Go(func(ctx context.Context) (string, error) {
return fetchLastName(ctx, firstName)
})
}
return p.Wait()
}
iter
iterは、スライスの各要素の並行処理処理を安全に行うための機能が用意されています。
iter.Mapはスライスの各要素の操作を並行に処理し、新しいスライスを生成できます。
iter.ForEachでは、スライスの各要素を並行に処理することが出来ます。
names := []string{"frieren", "stark", "fern"}
names = iter.Map[string, string](names, func(s *string) string {
return strings.ToUpper(*s)
})
fmt.Println(names)
// [FRIEREN STARK FERN]
iter.Iterator[string]{}.ForEach(names, func(s *string) {
convertJPName(s)
})
fmt.Println(names)
// [フリーレン シュタルク フェルン]
stream
streamは、並行処理の順序を保持することができます。
s := stream.New().WithMaxGoroutines(15)
for i := 1; i <= 15; i++ {
s.Go(func() stream.Callback {
res := heavyFizzBuzz(i)
return func() { fmt.Println(res) }
})
}
s.Wait()
// 1
// 2
// 3 Fizz
// ...
まとめ
concは、Goの並行処理を安全かつ簡単に扱えるよう抽象化しています。
作者は、並行処理の誤用を最小限に抑えることに重点を置いています。そのため、channelを意図的に組み込んでいいないようです。
個人的には並行処理のボイラーが減りレビューも行いやすくなるように感じたので積極的に使っていきたいなと思いました。
Discussion