🐺

errorgroup.SetLimitとTryGoでgoroutineの同時実行数を制御する

2022/12/09に公開約2,600字

TL;DR

  • 今年の5月にerrgroupにSetLimitとTryGoが追加されてた
  • SetLimiteg.Go() で実行するgoroutineの同時実行数を指定して、TryGo はSetLimitの実行数よりgoroutineが下回っていたら差し込めるよって感じみたい。

今までの方法

goroutineの実行数を制御する方法としてよく見かけるのは x/sync/semaphore を使って手動で重さを確保・リリースすることだと思います。

ctx := context.Background()
eg, egctx := errgroup.WithContext(ctx)
sem := semaphore.NewWeighted(10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		if err := sem.Acquire(egctx, 1); err != nil {
			return err
		}
		defer sem.Release(1)
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		return nil
	})
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

また、もっとシンプルな方法として、channelに値を入れれるまで待機する仕様を使い、実行するときに入れて終わるときに取り出すという方法もあります。
https://mattn.kaoriya.net/software/lang/go/20171221111857.htm

var eg errgroup.Group
limit := make(chan struct{}, 10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		limit <- struct{}{}
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		<-limit
		return nil
	})
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

errgroup.SetLimitとTryGoを使う方法

goroutineの実行終了を待機するのにsync.Waitgroup を使うなら上記の方法でやることになりますが、その場合はgoroutineでErrorを返せないので x/sync/errgroup も多いはずです。
(これはこれでWaitしたときに最初にgoroutineから返したErrorしか返さないので、複数Errorを受け取りたい場合は自作する必要があります)

errgroupを使う場合は行終了を待機の他に新たにgoroutineの同時実行数を制限するSetLimitを使えるようになりました。
SetLimitに最大で同時に実行できる数をセットするとerrgroup内で実行するgoroutineが制限を超えないように自動的に待機します。

ctx := context.Background()
eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		return nil
	})
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

また、goroutineの同時実行数が最大を下回っているときだけ実行したい場合はTryGoを使うことで一時的に実行数を超えしまうということが無いように調整できます。こちらは最大数を下回っていたらgoroutineの実行を予約するわけではないので注意が必要です。
(予約する目的ならeg.Goで追加すればいいので)

ctx := context.Background()
eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		return nil
	})
	result := eg.TryGo(func() error {
		return nil
	})
	fmt.Printf("%d の差し込みは %v\n", i, result)
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

使用例としてはgoroutine内でwarnを受け取ったときにwarn解決のためのgoroutineを追加実行する、TryGoできなかったらgoroutineの実行数が埋まっているので別の経路で処理をするなどが思いつくでしょうか。

Playground
https://go.dev/play/p/e03Lk9HOJP4

Discussion

ログインするとコメントできます