🌀
【Go】並列実行を楽に、安全にする為にラッパー関数作る
概要
アプリケーションを実装していて、並列処理を書きたいタイミングは多々あると思う。
とはいえ、各々によって書き方が異なると
- 可読性の損失
- 書く人によって同時実行数を制御してなかったり、などのバグ
に繋がったりする。
そこでラッパー関数を用意してあげる事によって上記のような問題を回避する。
関数
特徴としては、
- errgroupを使って、goroutineの処理内のエラーをハンドリング出来るように
- semaphoreを使って、並列処理の最大同時実行数を制限出来るように
辺り。
semaphoreはあまり知られてないので、かなりオススメ。
import (
"context"
"fmt"
"runtime"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
type Function func(ctx context.Context) error
type Functions []Function
func RunParallelism(ctx context.Context, functions Functions, parallelism int64) error {
eg, ctx := errgroup.WithContext(ctx)
// 最大実行可能数となるWeight(重み)を決める。
sem := semaphore.NewWeighted(parallelism)
// 関数をgoroutineで実行していく
for _, f := range functions {
eg.Go(wrapRecoverParallelism(ctx, sem, f))
}
// 全ての関数を実行するまで待機
if err := eg.Wait(); err != nil {
return serrors.Stack(err)
}
return nil
}
func wrapRecoverParallelism(ctx context.Context, sem *semaphore.Weighted, f Function) func() error {
return func() (err error) {
// goroutineごとにpanicはrecoverしてあげる必要がある
defer func() {
if r := recover(); r != nil {
var stacktrace string
for depth := 0; ; depth++ {
_, file, line, ok := runtime.Caller(depth)
if !ok {
break
}
stacktrace += fmt.Sprintf(" %v:%d\n", file, line)
}
err = errors.New("panic recovered: %v\n%s", r, stacktrace)
}
}()
// 重みを+1して、関数を実行し終わったら-1する
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
if err := f(ctx); err != nil {
return err
}
return nil
}
}
実際に使うとき
超シンプルに、functionsというスライスを生成してRunParallelismに渡してあげるだけ。
list := []string{"1","2"}
functions := make(Functions, 0, len(list))
for _, e := range list {
// 実行したい処理をappend
functions = append(functions, func(ctx context.Context) error {
fmt.Println(e)
return nil
})
}
if err := RunParallelism(ctx, 2); err != nil {
return err
}
最後に
並列処理は使い所を間違えたりすると致命的な障害に繋がったりしてしまう(めっちゃメモリ使ってoomkillされてたり)ので、チームで書き方、使い方は統一しておいたほうが何かと良い。
Discussion