💾

Go本体から学ぶTips:メモリサイズ制限をかけながら並行処理

2022/11/27に公開

この記事はQiita Advent Calendar 2022 - Goに参加しています。(と言っても先に公開してしまいますが)

はじめに

年の瀬にコタツに入りながらやることと言えば何があるでしょうか?
みかんを食べる、本を読む、Goのソースコードを読む、ですね。(強引)

"Goに入ってはGoに従え"というように、Goのソースコードを読んで学べることはたくさんあるように思います。今回はその中で最近学んだ面白いTipsを紹介させてもらいます。

まずはどんなコードか紹介

今回発見したTipsはcmd/gofmt/gofmt.goに含まれています。

https://cs.opensource.google/go/go/+/master:src/cmd/gofmt/gofmt.go;l=371;drc=9b89c380208ea2e85985ee6bf2b1d684274dfa1d

gofmt.go
func main() {
	// Arbitrarily limit in-flight work to 2MiB times the number of threads.
	//
	// The actual overhead for the parse tree and output will depend on the
	// specifics of the file, but this at least keeps the footprint of the process
	// roughly proportional to GOMAXPROCS.
	maxWeight := (2 << 20) * int64(runtime.GOMAXPROCS(0))
	s := newSequencer(maxWeight, os.Stdout, os.Stderr)

  // ...省略...
}

同時に走る処理のメモリサイズをざっくり2MiB x スレッド数にする、とあります。そしてそれを実装しているのがsequencerのようです。何をしているのでしょう?

gofmt.go
func newSequencer(maxWeight int64, out, err io.Writer) *sequencer {
	sem := semaphore.NewWeighted(maxWeight)
  // ...省略...
}

semaphoreパッケージを使っていますね。ここが肝になりますが、詳細は次節に持ち越して、これをどんな風に使ってメモリサイズを制限するのでしょうか?

Note: 以下のコードブロック内では説明のためにかなり処理を端折っています。オリジナルはリンク先にありますので適宜ご確認ください。

https://cs.opensource.google/go/go/+/master:src/cmd/gofmt/gofmt.go;l=133;drc=9b89c380208ea2e85985ee6bf2b1d684274dfa1d

gofmt.go
// 一番シンプルな使い方
info, _ := os.Stat("./foobar.go")
s.Add(info.Size(), fn) // fnは何かしらの処理を行う関数。簡単のため割愛。

func (s *sequencer) Add(weight int64, fn func(...)){
  s.sem.Acquire(context.TODO(), weight)
  go func() {
    // do something
    s.sem.Release(weight)
  } 
}

ふむふむ、maxWeightで作ったSemaphoreに対して、処理するファイルのサイズを与えてからgoroutineを走らせています。そのgoroutineの中では処理が完了したらファイルサイズ分だけリリースを行っています。このSemaphoreを使うことで並行処理で使うメモリ数を制限していそうですが、Semaphoreってなんでしたっけ…?

Semaphoreってなんだっけ?

Package semaphore provides a weighted semaphore implementation.

https://pkg.go.dev/golang.org/x/sync/semaphore

排他制御を行うためのパッケージのようです。goroutineを使う際に並行処理数の上限を決めておきたい場合がありますが、そのようなWorkerPoolパターンがパッケージ中のサンプルとして示されています。

処理の流れ:

  1. maxWeightを定めてSemaphoreを生成する。このmaxWeightWorkerPoolパターンであればワーカー数の上限にします。
  2. 並行処理を追加する前にSemaphore.Acquire(weight)する。このとき、すでにmaxWeightに達している場合は空きができるまで待機する。
  3. 処理が完了したらSemaphore.Release(weight)でプールに空きを追加する。

先ほどのgofmtのコードの中ではワーカー数ではなくファイルサイズをweightに使うことで、並行処理で使うメモリサイズを制限していたんですね。この処理はchannelを使っても実装できますが、多少煩雑なので、このように簡単に実装できるパッケージは便利ですね。

おわりに

「セマフォ」って言葉を聞いたことはありましたが、このようなパッケージがあって、サンプルに載っている以外にもこのような使い方があるんだなぁと学ぶことができました。皆さんもGoのコードを眺めてみて、何か発見したTipsがあれば教えてくださいね。

GitHubで編集を提案

Discussion