🍷

errorgroup.SetLimitとTryGoでgoroutineの同時実行数を制御する

2022/12/09に公開6

TL;DR

  • 今年の5月にerrgroupにSetLimitとTryGoが追加されてた
  • SetLimiteg.Go() で実行するgoroutineの同時実行数を指定して、TryGo はSetLimitの実行数よりgoroutineが下回っていたら差し込めるよって感じみたい。

今までの方法

goroutineの実行数を制御する方法としてよく見かけるのは x/sync/semaphore を使って手動で重さを確保・リリースすることだと思います。

ctx := context.Background()
eg, egctx := errgroup.WithContext(ctx)
sem := semaphore.NewWeighted(10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		if err := sem.Acquire(egctx, 1); err != nil {
			return err
		}
		defer sem.Release(1)
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		return nil
	})
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

また、もっとシンプルな方法として、channelに値を入れれるまで待機する仕様を使い、実行するときに入れて終わるときに取り出すという方法もあります。
https://mattn.kaoriya.net/software/lang/go/20171221111857.htm

var eg errgroup.Group
limit := make(chan struct{}, 10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		limit <- struct{}{}
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		<-limit
		return nil
	})
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

errgroup.SetLimitとTryGoを使う方法

goroutineの実行終了を待機するのにsync.Waitgroup を使うなら上記の方法でやることになりますが、その場合はgoroutineでErrorを返せないので x/sync/errgroup も多いはずです。
(これはこれでWaitしたときに最初にgoroutineから返したErrorしか返さないので、複数Errorを受け取りたい場合は自作する必要があります)

errgroupを使う場合は行終了を待機の他に新たにgoroutineの同時実行数を制限するSetLimitを使えるようになりました。
SetLimitに最大で同時に実行できる数をセットするとerrgroup内で実行するgoroutineが制限を超えないように自動的に待機します。

ctx := context.Background()
eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		return nil
	})
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

また、goroutineの同時実行数が最大を下回っているときだけ実行したい場合はTryGoを使うことで一時的に実行数を超えしまうということが無いように調整できます。こちらは最大数を下回っていたらgoroutineの実行を予約するわけではないので注意が必要です。
(予約する目的ならeg.Goで追加すればいいので)

ctx := context.Background()
eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(10)
for i := 0; i < 50; i++ {
	i := i
	eg.Go(func() error {
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Hello, 世界 %d\n", i)
		return nil
	})
	result := eg.TryGo(func() error {
		return nil
	})
	fmt.Printf("%d の差し込みは %v\n", i, result)
}
if err := eg.Wait(); err != nil {
	log.Fatal(err)
}

使用例としてはgoroutine内でwarnを受け取ったときにwarn解決のためのgoroutineを追加実行する、TryGoできなかったらgoroutineの実行数が埋まっているので別の経路で処理をするなどが思いつくでしょうか。

Playground
https://go.dev/play/p/e03Lk9HOJP4

Discussion

mjhdmjhd

ドキュメント読んだり動かしてみて、GoとTryGoの唯一の違いは実行数の上限に達していた場合に、ブロックするか、falseを返すだけかだと思いました。
逆に言えばGoでもTryGoでも新しいgoroutineを発行できます。

goroutineの同時実行数が最大を下回っているときだけ実行したい場合はTryGoを使うことで一時的に実行数を超えしまうということが無いように調整できます。

これはGoでも一緒なのかなと思いました。

こちらは最大数を下回っていたらgoroutineの実行を予約するわけではないので注意が必要です。

最大数を下回っていれば、goroutineが発行されます。

TryGoの主な使い道は、別のシグナルを待っているようなループの中でGoしたいときなど、ループがブロックしてしまうと他の処理に影響が出てしまう場合が思いつきました。

Rustのmpscのrecv/try_recvなどもブロック、非ブロックの違いがあって似ているかなと思いました。
https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.try_recv

kolukukoluku

コメントありがとうございます。
1年前の記事なので、「あれそうだったけ」と思いながら返しますね。

ドキュメント読んだり動かしてみて、GoとTryGoの唯一の違いは実行数の上限に達していた場合に、ブロックするか、falseを返すだけかだと思いました。

逆に言えばGoでもTryGoでも新しいgoroutineを発行できます。

TryGoは同時実行数を超えている場合はやっぱりgoroutineを作らずにfalseを返しますね。
https://cs.opensource.google/go/x/sync/+/refs/tags/v0.5.0:errgroup/errgroup.go;l=91-98

即時にgoroutineを動かしたいけど同時実行数を超えたくないという場面に役に立つのがTryGoなのかなと思います。

これはGoでも一緒なのかなと思いました。

ここはちょっと日本語がおかしいですね。ありがとうございます。
同時実行数を超えないように動かすのはGoでもTryGoでも同じですが、goroutineを作って動くまで投げっぱなしにできるGoとは大きく違うと思います。

TryGoの主な使い道は、別のシグナルを待っているようなループの中でGoしたいときなど、ループがブロックしてしまうと他の処理に影響が出てしまう場合が思いつきました。

Web APIのように呼び出し制限があるようなものに対してGoでキューを組んだりするのが基本だとして、TryGoで先方のリソースが開放されるのを待つワーカーのような使い方になるかなと思いつきましたが、goroutineの中で待機したほうが無難な気もするというところでやはり使いたい場面が思いつかなかったです。

mjhdmjhd

TryGoは同時実行数を超えている場合はやっぱりgoroutineを作らずにfalseを返しますね。

そうです。Goの場合はその時点でブロックします。

即時にgoroutineを動かしたいけど同時実行数を超えたくないという場面に役に立つのがTryGoなのかなと思います。

これはGoもTryGoも一緒で即時ではないです。どちらもgoroutineの実行タイミングに違いはないはずです。

同時実行数を超えないように動かすのはGoでもTryGoでも同じですが、goroutineを作って動くまで投げっぱなしにできるGoとは大きく違うと思います。

投げっぱなしにできる、というよりはその時点でブロックする、というのが正しい表現だと思います。ブロックしている間、goroutineは発火しません。(コード読んでも、goする前にチャンネル待ちしてる)

大きく違うのは、同時実行数が上限に至った場合に

  • Go: ブロックしてそこで処理がとまる
  • TryGo: ブロックせずにfalseだけ返して処理を続行する

だと思います。

参考に貼っていただいたコードをみるとこうなってますね

func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{} // 上限に達していたらここでブロックする
	}

         // 上限が緩和されたらgoroutineが発火する
	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err)
				}
			})
		}
	}()
}

func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}: // 上限が緩和されていれば続行する
		default: // 上限に達していたらブロックせずにfalseをreturnする
			return false
		}
	}

         // 上限が緩和されたらgoroutineを発火する
	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err)
				}
			})
		}
	}()
	return true
}

使用例

eg.SetLimit(10)

for i, _ := range data {
  eg.Go(...) // 10並列に達したら、ここでループが止まる。goroutineが一個終了すると、ループが再開される
  // 上でブロックしているので、並列数が解消されるまでここは処理されない
}

// 基本無限ループで使うことになりそう
for {
  result := eg.TryGo(...) // 10並列に達したら、falseが帰りgoroutineが発火せずにループが続く
  // ブロックしないのでselectでチャンネルのclose待ち、Contextのcancel待ちなど、その他の処理を続行できる
}
kolukukoluku

Go: ブロックしてそこで処理がとまる
TryGo: ブロックせずにfalseだけ返して処理を続行する

まとめていただいてありがとうございます。自分もこの認識であってます。

最終的にGoとTryGoをどう使い分けるのかというところに話が行くと思うのですが、SetLimitを使うということは

  • 似た処理を複数同時に走らせたい
  • channelに新しい値が入るたびにgoroutineで処理したい

ということになるのでGoでgoroutineの実行予約的な使い方になるはずです。

// job worker的な
go func() {
	var eg errgroup.Group
	eg.SetLimit(10)
	for c := range ch {
		c := c
		eg.Go(func() error {
			// something job
		})
	}
	if err := eg.Wait(); err != nil {
		log.Fatal(err)
	}
}()

TryGoも同じような文脈で考えるべきで、SetLimitの中でTryGoで動かすfuncがGoと違う処理をするのはerrgroupの文脈でないほうがいいので、プロポーザルのIssueにもあるようにgoroutineが同時実行数を超えるならしばらく逐次処理にするといったような使い方になりそうです。
https://github.com/golang/go/issues/27837#issuecomment-453178824

つまり元の記事に書いた

使用例としてはgoroutine内でwarnを受け取ったときにwarn解決のためのgoroutineを追加実行する、

は使い方としては微妙で、

TryGoできなかったらgoroutineの実行数が埋まっているので別の経路で処理をするなどが思いつくでしょうか。

こちらの考え方が合ってそうかなと感じました。

ただ、SetLimitの値が十分に小さくて重たくない・時間のかからない処理であればGoでも問題にならないはずなので、TryGoが使える場面があるのは想像できるが使いたい状況になるかは微妙な気がします。
なので、基本的にはSetLimitとGoを使って、TryGoは記憶の片隅に置いておくで良いかなと個人的に思います。

お陰様で理解が深まりました。ありがとうございました。

mjhdmjhd

プロポーザルのIssueにもあるようにgoroutineが同時実行数を超えるならしばらく逐次処理にするといったような使い方になりそうです。

なるほど、その発想はありませんでした。確かに使えますね。
僕の書いた使用例の、別のシグナルを待ちたいケースというのも一例だと思います。

基本的にはSetLimitとGoを使って、TryGoは記憶の片隅に置いておくで良いかなと個人的に思います。

そうですね。タイトルやTL;DR、記事本文はSetLimit/TryGoが重視されているので、読者の方は注意が必要ですね。

mjhdmjhd

僕の書いた使用例の、別のシグナルを待ちたいケースというのも一例だと思います。

これについては別の方からアドバイスを頂いて別goroutineに分けてchanで制御すべきとの助言を頂いたので撤回します。
ごちゃごちゃいってしまってすいませんでした:pray: