💪

Goで開発して3年のプラクティスまとめ(3/4): concurrent GO編

2024/07/01に公開

Goで開発して3年のプラクティスまとめ(3/4): concurrent GO編

yet another入門記事です。

ご質問やご指摘がございましたらこの記事のコメントでお願いします。
(ほかの媒体やリンク先に書かれた場合、筆者は気付きません)

Overview

  • concurrentなプログラムを書くためのツールキットであるgoroutine / chan / sync/sub-repositoryのsyncパッケージを紹介します
    • なぜそれらを使わないといけないか(data race)についても述べます
  • signal handling

筆者は「並行」と「並列」がどっちがどっちのことを言ってるのかたびたびわからなくなってしまうので concurrent, concurrency および parallel, parallelism と記述します。

2種の想定読者

記事中では仮想的な「対象読者」と「ベテランとして取り扱われるその他の読者」が想定されています。

対象読者

記事中で「対象読者」と呼ばれる人々は以下のことを指します。

  • 会社の同僚
  • いままでGoを使ってこなかった人
  • ある程度コンピュータとネットワークとプログラムを理解している人
  • pythonとかNode.jsで開発したことある
  • gitは使える。
  • 高校生レベルの英語能力
    • 作ってるところがアメリカ企業なので英語のリンクが全般的に多い

part1以降はA Tour of Goを完了していることと、
ポインター、メモリアロケーション、POSIX(もしくはLinux) syscallなどの基礎的概念がわかっていることが前提条件になっています。

そのほかの読者

特に断りがない時、他の読者も聴衆として想定されます。

  • 筆者と同程度かそれ以上にGoに長じており
  • POSIX APIや通信プロトコル、他のプログラミング言語でよくやられる方法を知っている

というベテラン的な人々です。

記事中に他にいい方法があったら教えてくださいとか書いてますが、大概はこのベテランな人たちに向けて書いているのであって、対象読者は当面気にしないでください(もちろんあったら教えてください)。

対象環境

  • 下層の仕組みに言及するとき、特に述べない限りlinux/amd64を想定します。
  • OS/archに依存するコードは書きません。

version

検証はgo 1.22.0、リンクとして貼るドキュメントは1.22.3のものになります。

## go version
go version go1.22.0 linux/amd64

最近追加されたAPIをちょいちょい使うので1.22.0以降でないと動かないコードがたくさんあります。

直近の3~4 minor versionのみサポートするライブラリが多いとして、Go 1.18でできなくてそれ以降できるようになったことは、○○以降となるだけ書くようにします。

サンプルコードのrepository

サンプルコードの一部は下記にアップロードされます。

https://github.com/ngicks/go-basics-example

concurrent Go

Goではnet/httpなどでサーバープログラムを書く場合、意識することなく concurrent に処理がなされ、大抵の場合 concurrent な処理は parallel に実行されるため、
multi-threadなプログラムで生じうる諸般の問題が起きないようにうまくプログラムを書く必要があります。

https://go.dev/blog/waza-talk

上記の11年前のRob Pikeの講演によれば concurrency とはタスクをいかに分解するかという表現方法/構造であり、分解されたタスクは parallel に(いくつか同時に)実行できます。
Concurrentに問題を分割しておけば、CPUの個数などが増加したときに全体の処理スピードが(理論上)増加量だけ速くすることができるということを述べています。
このアイデアはTony Hoare 1978 Communicating Sequential Processes.という論文に書かれていたものであり、
concurrent に物事を解くためのツールキットとして、channelやgoroutineが存在しています。
Rob Pikeは講演上で「この話が沁みたら家に帰ってCSPの論文を読め」と言っていますね。

ところがこのキーコンセプトを全く理解していなくてもnet/httpでサーバーを書くと

https://github.com/golang/go/blob/go1.22.3/src/net/http/server.go#L3285

という感じで新しいgoroutineでハンドラが実行されます。

goroutine

https://go.dev/ref/spec#Go_statements

Specificationによれば"go"キーワードの後に関数かメソッド呼び出しのExpressionを書くことで、an independent concurrent thread of control, goroutine でそれが実行されます。

goはstatementです。つまり返り値は何もありません。goroutineを特定する方法はありませんし、goroutineを指定して終了させるような方法もありません。
これはpthread_create(3)pthread_tでthead idを返したり、RustのasyncFutureというステートマシンを返したりするのとは対照的です。

pthreadpthread_join(3)で終了を待てるのに対して、goroutineを指定して終了を待つ方法がありません。goroutineとそれを呼び出すコード間でchansync.WaitGroup(後述)などの変数を共有し、goroutineで動作する関数がそれらを通じて明示的に終了を通知する必要があります。

goroutineで動作する関数が終了すればgoroutineもexitします。きちんと終了できるようにするのはユーザーの責任です。

goroutineGOMAXPROCSと同数(デフォルトでは(論理)CPUコアの個数)まで concurrent に実行されます。
GOMAXPROCSかプロセスに与えられたCPUのコア数の少ないほうまで parallel にコードを実行することができます。

playground

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	go func() {
		for {
			select {
			case <-ticker.C:
				fmt.Println("tick tok")
			case <-ctx.Done():
				return
			}
		}
	}()

	<-ctx.Done()
	/*
		tick tok
		tick tok
		tick tok
		tick tok
	*/
}()

A Tour of Goで網羅済みの内容ですので今更の紹介は必要ないかもしれませんが、このようにmain goroutineがブロックしてる間も動作しつづけていることと、
プログラム内のほかの変数(この場合tickerctx)にアクセスできることから同じアドレススペースで動いていることもわかると思います。

対象読者はNode.jsの経験があるため、もしかしたらgoroutinePromiseに近い何かだと思うかもしれませんが、
Promiseがコンストラクタに与えられたコールバック関数をeagerに実行するのに対して、goroutineは別段実行順序を保証しません(spec中に記載がありません)。

テストコードなどで明確にgoroutineに処理が切り替わるのを待つ必要がある場合は以下のように何かしらの方法でタイミングをそろえる必要があります。(探したらstd内でテストで似たようなことしてるところがあった)

switchCh := make(chan struct{})
go func() {
	<-switchCh
	// ... do tasks ...
	close(switchCh)
}()
switchCh <- struct{}{} // block until the function above starts working
<-switchCh // wait until the goroutine above exits
goroutineのメモリ消費量

goroutineは非常にメモリの負荷が非常に低いです。必要に応じて大量に作っても問題ありません。

goroutineは非常に軽量なスタック(見たところ2KiBを最低値として,windowsならさらに4KiB, plan9なら追加で512byte, iosかつarmならば追加で1KiB=2~6KiB)をallocateします。その後必要に応じて拡張されていきます。

GOMEMLIMITのデフォルトがmath.MaxInt64(=無制限)であることから、例えばメモリが64GiBのLinux環境であれば

> (64 * 1024**3) / (2*1024)
33554432

ということで3300万個ぐらいのgoroutineを生成するとメモリを使い果たします。

実際に測ってみましょう。
以下のようなコードで100万個のgoroutineを無意味に待たせてみます。

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"sync"
)

var (
	num = flag.Uint("n", 1_000_000, "num of goroutines")
)

func main() {
	fmt.Printf("pid = %d\n", os.Getpid())
	flag.Parse()
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()
	var wg sync.WaitGroup
	for range *num {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ctx.Done()
		}()
	}
	wg.Wait()
	<-ctx.Done()
}

おおよそ2.5g使います。

top - 15:53:29 up  4:43,  0 users,  load average: 1.93, 0.96, 0.45
Tasks:   1 total,   0 running,   1 sleeping,   0 stopped,   0 zombie
%Cpu(s):  0.9 us,  0.4 sy,  0.0 ni, 98.5 id,  0.0 wa,  0.0 hi,  0.1 si,  0.0 st
MiB Mem :  31661.5 total,  14527.8 free,   8854.9 used,   8278.9 buff/cache
MiB Swap:   8192.0 total,   8192.0 free,      0.0 used.  22346.2 avail Mem

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
61144 root      20   0 3812376   2.5g    808 S   0.0   8.0   0:14.08 main

計算より0.5gほど多いですね。まあ計算のほうが正しくないんでしょうね。

まあそんな感じで、実際上はもっとメモリを消費したり(システムがgoroutineを作ったり、*gのallocationがあったり,runtimeがいろいろallocateしたり、そもそも書いてるプログラムが大きなsliceをallocateするなど)、そもそも制限がかかっていたり(linuxによるプロセスごとのメモリリミット、(コンテナランタイムによる)cgroupsなど)ので、そこまでたくさんのgoroutineを作ることはできないかもしれません。実際上記のコードだとsingalのトラップ部分でも結構いろいろallocateされる動きが見えます。

ただgoroutineは大量に生成しても問題ないよっていうことを強調しておきます。実際の上限は(swapが起きて固まるのが怖いので)筆者は測るつもりはないです。

goroutineはすべて終了できるようにする

goroutineは最低2KiB,最大1GBのstackを持ち、詳細な条件は調べてないのでわかりませんが、freeListに入れられて再利用されます。

goroutineがexitしなければこれらがfreeListに戻るなりしないので、leakyなgoroutineはそれだけ無駄なメモリを消費します。

例えばgoplsLanguage Server Protocolに従ってstdin/stdoutごしにjsonrcpで通信を行います。
そのための2つのファイルを1つのduplexなファイルであるかのように見せるために以下のようなfakeConnnet.Connとして返し、

https://github.com/golang/tools/blob/b6235391adb3b7f8bcfc4df81055e8f023de2688/internal/fakenet/conn.go#L18-L29

使われ方そのものはここでは重要ではありませんが、Listenの代わりにこのfakeConnを直接使ってjsonrpcの接続を確立するために使われます。

https://github.com/golang/tools/blob/b6235391adb3b7f8bcfc4df81055e8f023de2688/gopls/internal/cmd/serve.go#L136

NewConngoキーワードが書かれています。
こういう感じで関数がgoroutineを新しく作ることはよくあるわけなんですが、
この場合は以下のようにClosegoroutineを終了できるようにしてあります。

https://github.com/golang/tools/blob/b6235391adb3b7f8bcfc4df81055e8f023de2688/internal/fakenet/conn.go#L59-L65

https://github.com/golang/tools/blob/b6235391adb3b7f8bcfc4df81055e8f023de2688/internal/fakenet/conn.go#L86-L93

このように、自然なリソース解放処理で作られたgoroutineはすべてexitできるようになっているべきです(特にライブラリを作るときは)。
もし、リソース解放に時間がかかる場合は、解放のリクエストと、goroutineのexitを待つ処理を分けて実装するとよいでしょう。

例えば以下みたいに、全部のgoroutineを終了を依頼するメソッドと、全部のgoroutineの終了を待てるメソッドを作っておくほうがよいでしょう。

type Watcher struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func New() *Watcher {
	ctx, cancel := context.WithCancel(context.Background())
	return &Watcher{
		ctx:    ctx,
		cancel: cancel,
	}
}

type Target interface {
	Events() <-chan any
}

func (w *Watcher) Add(target Target) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-w.ctx.Done():
				// Closeが呼ばれるとreturnする
				return
			case ev := <-target.Events():
				// ... watch logic ...
			}
		}
	}()
}

// Closeで、Addで作られたgoroutineの終了を依頼する。
func (w *Watcher) Close() {
	w.cancel()
}

// WaitはAddで作られたgoroutineがすべてexitするまでブロックする
func (w *Watcher) Wait() {
	w.wg.Wait()
}

Multi-threadedプログラムで起こる典型的問題: data race(race condition)

race condition

Wikipedia articleによれば、ソフトウェア文脈におけるrace conditionとは、複数のコードパスが同時に動作しており、それぞれのコードパスがそれぞれに予測とは異なった時間がかかってしまうことで、予測とは違った順序で処理を完了することで起きます。

これは、ファイルの読み書きであるとか、設定の書き換えであるとかを、複数のスレッド/複数のプログラムが行うことも含みます。状態に依存したり、状態を操作したりする処理をタイミングを合わせる方法なしで行うことで、予測しない状態を観測したりする問題のことを指していると読み取れます。

例を挙げるとプログラムの実行状態を保存したファイルがあって、プログラムは動作を完了するたびにあるフィールドの値をインクリメントするとします。プログラムAとBがあるとき、Aがファイルを読んで、書きこむまでの間にBが書き込みを行ったら、Aはその書き込みに対して上書きを行ってしまい、Bの増加分をなかったことにしてしまうので矛盾した状態になりますよね。

そういったrace conditionの典型の一つにdata raceがあります。

data race

Data Race Detectorによれば、Goにおけるdata raceは複数のgoroutineが同じ変数にアクセスしており、少なくとも1つ以上がwriteであると起きます。
以下があげられているdata raceを起こすコードのスニペットです

func main() {
	c := make(chan bool)
	m := make(map[string]string)
	go func() {
		m["1"] = "a" // First conflicting access.
		c <- true
	}()
	m["2"] = "b" // Second conflicting access.
	<-c
	for k, v := range m {
		fmt.Println(k, v)
	}
}

FAQ: Why are map operations not defined to be atomic?によると、map[T]Uへのconcurrentなアクセスはruntime panicが起こるように特別なチェックがかかっていますので上記のコードもうまくいけば(うまくいかなければ?)fatal panicが起きます(試した限りrecoverできない)。

map[T]Uはruntimeにおいてはhmapという構造体です。これはmapのheaderで内部にポインターを持ちます。mapへのwriteはあれこれ計算したうえでこの構造体に代入を行います(この場合mapassign_faststr)。
readして計算してwriteを行いますので、複数のスレッドが同時にそのような処理を試みるとwrite中にreadして、途中のよくわからない状態を観測しまうことが考えれます。

...が、これを手元で動かしてみると特にエラーなく実行されることが多いですね。

例として試してもらえるように、もう少し不正状態の起きやすいサンプルを示しておきます。
以下のコードを筆者環境で何度か実行するとdata raceによる不正な状態を観測することができました。

a []intに、0から99の値をappendするという処理を、GOMAXPROCSと同数のgoroutineの中で実行します。
data raceが起きていないとき、aは0-99の値のセットを順序不定でGOMAXPROCSと同数だけ持つことになるはずですが、
実際にはdata raceによって全く違う状態を持つこともあるのです。(前述したとおり、数度に1回しか不正な状態は起きません)

snippet

package main

import (
	"fmt"
	"runtime"
	"slices"
	"sync"
)

func main() {
	var a []int

	var wg sync.WaitGroup
	for range runtime.GOMAXPROCS(0) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range 100 {
				a = append(a, i)
			}
		}()
	}
	wg.Wait()

	group := map[int]int{}
	var keys []int
	for _, num := range a {
		if !slices.Contains(keys, num) {
			keys = append(keys, num)
		}
		group[num] = group[num] + 1
	}
	slices.Sort(keys)
	for _, key := range keys {
		num, count := key, group[key]
		if count != runtime.GOMAXPROCS(0) {
			fmt.Printf(
				"data race caused invalid state at num = %d, count = %d, expected = %d\n",
				num, count, runtime.GOMAXPROCS(0),
			)
		}
	}
	fmt.Printf("result = %#v\n", group)
	/*
	   data race caused invalid state at num = 0, count = 2, expected = 24
	   data race caused invalid state at num = 1, count = 2, expected = 24
	   data race caused invalid state at num = 2, count = 2, expected = 24
	   ...省略..
	   data race caused invalid state at num = 97, count = 2, expected = 24
	   data race caused invalid state at num = 98, count = 2, expected = 24
	   data race caused invalid state at num = 99, count = 2, expected = 24
	   result = map[int]int{0:2, 1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 11:2, 12:2, 13:2, 14:2, 15:2, 16:2, 17:2, 18:2, 19:2, 20:2, 21:2, 22:2, 23:2, 24:2, 25:2, 26:2, 27:2, 28:2, 29:2, 30:2, 31:2, 32:2, 33:2, 34:2, 35:2, 36:2, 37:2, 38:2, 39:2, 40:2, 41:2, 42:2, 43:2, 44:2, 45:2, 46:2, 47:2, 48:2, 49:2, 50:2, 51:2, 52:2, 53:2, 54:2, 55:2, 56:2, 57:2, 58:2, 59:2, 60:2, 61:2, 62:2, 63:2, 64:2, 65:2, 66:2, 67:2, 68:2, 69:2, 70:2, 71:2, 72:2, 73:2, 74:2, 75:2, 76:2, 77:2, 78:2, 79:2, 80:2, 81:2, 82:2, 83:2, 84:2, 85:2, 86:2, 87:2, 88:2, 89:2, 90:2, 91:2, 92:2, 93:2, 94:2, 95:2, 96:2, 97:2, 98:2, 99:2}
	*/
}

race detector

ブログポストData Race Detectorで紹介される通り、Goにはrace detectorというdata raceを検知するためのツールが組み込まれています。

ブログポストで述べられる通り、ビルドを行う各種コマンドに-raceというフラグを追加することでrace detectorが有効になります。

$ go test -race mypkg    // to test the package
$ go run -race mysrc.go  // to run the source file
$ go build -race mycmd   // to build the command
$ go install -race mypkg // to install the package

上記のコードスニペットをrace detector付きで実行すると、以下のように警告がstderrにプリントされます。
(ソースのコードパスは一応--redacted--に置き換える編集をしています。実際にはソースのローカルストレージ上の絶対パスが出力されます)

## go run -race ./snipet/data-race-example/main.go
==================
WARNING: DATA RACE
Read at 0x00c000130000 by goroutine 7:
  main.main.func1()
      /--redacted--/snipet/data-race-example/main.go:19 +0xd0

Previous write at 0x00c000130000 by goroutine 11:
  main.main.func1()
      /--redacted--/snipet/data-race-example/main.go:19 +0x154

Goroutine 7 (running) created at:
  main.main()
      /--redacted--/snipet/data-race-example/main.go:16 +0xb4

Goroutine 11 (finished) created at:
  main.main()
      /--redacted--/snipet/data-race-example/main.go:16 +0xb4
==================
...省略...
==================
WARNING: DATA RACE
Write at 0x00c0002989b0 by goroutine 15:
  main.main.func1()
      /--redacted--/snipet/data-race-example/main.go:19 +0x131

Previous write at 0x00c0002989b0 by goroutine 23:
  main.main.func1()
      /--redacted--/snipet/data-race-example/main.go:19 +0x131

Goroutine 15 (running) created at:
  main.main()
      /--redacted--/snipet/data-race-example/main.go:16 +0xb4

Goroutine 23 (running) created at:
  main.main()
      /--redacted--/snipet/data-race-example/main.go:16 +0xb4
==================
data race caused invalid state at num = 0, count = 6, expected = 24
data race caused invalid state at num = 1, count = 7, expected = 24
data race caused invalid state at num = 2, count = 9, expected = 24
...省略...
data race caused invalid state at num = 97, count = 6, expected = 24
data race caused invalid state at num = 98, count = 6, expected = 24
data race caused invalid state at num = 99, count = 6, expected = 24
result = map[int]int{...省略...}
Found 4 data race(s)
exit status 66

テストは-raceあり/なしどっちもで実行しよう

race detectorは有効/無効でふるまいに違いが出てきます。(特にテストは)-raceあり/なしどちらでも実行してうまく動作するか確認したほうがよいでしょう。

筆者は何度かrace detectorを無効にすると通過しなくなるテストを経験しています。
これはたぶんdata raceではないrace conditionが生じていたのだと思われます。
ですので、タイミングが重要なタイプのライブラリに対しては、テストはrace detectorあり/なしで2度以上実行することをお勧めします。

もちろんしっかりsynchronizationを(テストの中で)とれるように実装する必要はあります。そのためのツールキットは次の節以降で触れていきます。

chan

https://go.dev/ref/spec#Channel_types

Specificationによれば、channelはconcurrentに実行されている関数(goroutine)間でコミュニケートするメカニズムを提供します。

A Tour of Goで網羅済みの内容なので、基本的なことは以下のスニペットのみで省略します。

ch := make(chan int) // unbufferd
// ch := make(chan int, 0) // unbuffered
ch := make(chan int, 100) // buffered

// https://go.dev/ref/spec#Length_and_capacity
len(ch) // number of elements queued in channel buffer
cap(ch) // channel buffer capacity

go func () {
	for {
		// ...何かの処理...
		ch <- 4768 // send
	}
}()

go func () {
	for {
		num := <-ch // recv
	}
}()

Effective Goのchannelの項目で少し進んだ使い方も紹介されているので一通り読んでおくといいでしょう。
ただ1つだけ変わったこととして、

The bug is that in a Go for loop, the loop variable is reused for each iteration

Go1.22以降では正しくありません。for i, v := range s {}i, vはgoroutineをまたいで参照されるのを検知するなどしたら再利用されなくなりました。

それ以外の細かい挙動としては

  • closeを呼ぶことでchennelを閉じることができます。
  • closeされたchannelからの受信は即座にアンブロックします
    • 二つ目の返り値がfalseになることで検知できます
  • nil channelやcloseされたchannelのcloseはパニックです
  • nil channelへの送受信は永久にブロックします

以下のスニペットで挙動をしめします。

playground

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	unbuffered := make(chan struct{})
	buffered := make(chan struct{}, 100)
	fmt.Printf("cap(unbuffred) = %d, cap(buffered) = %d\n", cap(unbuffered), cap(buffered)) // cap(unbuffred) = 0, cap(buffered) = 100

	for range 10 {
		buffered <- struct{}{}
	}
	fmt.Printf("len(unbuffred) = %d, len(buffered) = %d\n", len(unbuffered), len(buffered)) // len(unbuffred) = 0, len(buffered) = 10

	// recv from / send on closed channel
	func() {
		c := make(chan struct{})
		defer func() {
			rec := recover()
			fmt.Printf("recovered: %#v\n", rec) // recovered: "send on closed channel"
		}()
		close(c)
		_, ok := <-c
		fmt.Printf("ok = %t\n", ok) // ok = false
		c <- struct{}{}
	}()

	// close of closed channel
	func() {
		c := make(chan struct{})
		defer func() {
			rec := recover()
			fmt.Printf("recovered: %#v\n", rec) // recovered: "close of closed channel"
		}()
		close(c)
		close(c)
	}()

	// close of nil channel
	func() {
		var c chan struct{}
		defer func() {
			rec := recover()
			fmt.Printf("recovered: %#v\n", rec) // recovered: "close of nil channel
		}()
		close(c)
	}()

	// close of receive-only channel is compilation error
	// cc := make(chan struct{})
	// var c <-chan struct{} = cc
	// close(c) // invalid operation: cannot close receive-only channel c (variable of type <-chan struct{})

	// recv from nil channel
	func() {
		var c chan struct{}
		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
		defer cancel()
		select {
		case <-c:
		case <-ctx.Done():
			fmt.Printf("timed out: recv on nil channel\n") // timed out: recv on nil channel
		}
	}()

	// send on nil channel
	func() {
		var c chan struct{}
		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
		defer cancel()
		select {
		case c <- struct{}{}:
		case <-ctx.Done():
			fmt.Printf("timed out: send on nil channel\n") // timed out: send on nil channel
		}
	}()
}

buffer-sizeはどう決めるか?

  • 0: synchronizationしたい
  • 1: 送信でブロックしたくないけど受信はブロックしたい(i.e. asyncに通知を行いたい)
    • e.g. Go 1.23以前のtime.Timerの内部のchannelはbuffer-size 1
    • e.g. signal.Notifyで受け取るchannelは1以上のバッファーをつける必要がある。
      • select { case c <- sig; default: }でnon-blockingに送信するため
    • 後述のnotifier
  • N: pipeline風にデータを引き渡しており、consumerとproducerの間に処理速度差があり、producerがburstyならば、バックプレッシャーをかけ始めたい任意のサイズN
    • 現実的に起こるイベントはほとんどそう(だから在庫管理が一つの学問になるん)だろうという突っ込みはあります。
    • ただし、channelがキューイングしているデータの中身を観測する方法は筆者が知る限り普通にはないため、queueを管理したいならchannelではなく別のqueueを定義したほうが良い。
      • 単なるFIFO queueならchannelを利用するだけで便利だと思いますが、queueがpriority queueであってほしいとかだと特に別の実装が必要になります。

だいたいこんなものでしょうか?何かしらを引用して経験的にこうと述べたかったんですが、goproxyに登録されているすべてのモジュールのmake(chan T)を検索するぐらいしか思いつかなくて、
結局引用なしで筆者がどこかで見たことのまとめになってしまいました。

buffer-size n > 1が便利な場面もあるけど、大抵の場合は0か1だと思います。

channelはどのようにcloseするか

  • closeしない
    • closeしなくてもGCに回収される
    • time.Timerなどは<-chan time.Timeを返してくるが、これらをcloseする方法はないことから、このことがわかる。
      • ただしStopはしよう。Stopを呼ばないとfinalizerが実行されない(Go1.22.0時点)。
  • いろんなところでcloseするのは避ける
    • close of closed channel, send on closed channelでパニックが起きるため、closeの責務を負うのは誰なのかを明確にすべき
    • 例1: make(chan T)を呼んだ関数/structが責任をもってcloseを呼ぶ
    • 例2: chan<- Tを関数が返す時、closeしてもらうことで終了を通知する。ドキュメントにそのように書く。
    • 例3: 関数が<-chan Tを返す時は,呼び出し側はcloseによって終了を通知する。ドキュメントにそのように書く。
  • 場合によってはsync.OnceFuncなどを使って1度しかcloseが呼ばれないのを保証する。
  • structのフィールドにchannelを引き渡すようなケースの場合大分ややこしいのでcloseによる終了の通知を避けることも多い
    • channelから値の読み出しを行うメソッドなりの第一期比数をcontext.Contextとし、Done()で返されるchannelを通じて終了を通知する。

特定のchannelを優先するには1段selectで包む

初期の講演でRob Pike自身も述べていますがselectで送受信が同時に可能になっている場合ランダムにどれかが選ばれるので、優先して送受信したいチャネルがある場合は、1段selectで包む必要があります

for {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-otherChannels:
		case someOther <- 547891:
		}
	}
}

Example: notifier

channel-close型: 受け取り側が複数の場合

<-chan struct{}を返して、返したchannelをcloseすることでnotifyするパターン。
通知される側が複数の時に用いられるのを見たことがある。

特定のタイミング(=Chan呼び出しタイミング)以後にあるイベントの発生(=Notify呼び出し)がわかる。イベントの起こる回数とか頻度の情報が必要なときには使いづらい。

// Single-producer multi-consumer notifier
type SpmcNotifier struct{
	mu sync.Mutex
	ch chan struct{}
}

func (n *SpmcNotifier) Chan() <-chan struct{} {
	n.mu.Lock()
	defer d.mu.Unlock()
	if n.ch == nil {
		n.ch = make(chan struct{})
	}
	return n.ch
}

func (n *SpmcNotifier) Notify() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch	!= nil {
		ch := n.ch
		n.ch = nil
		close(ch)
	}
}

1-buffered channel型: 受け取り側が単数で、イベントの発生だけわかればいい場合

buffer-size 1のchannelを使ってイベントがあったことだけを保存する。
こちらは通知される側が1つだけ、あるいは1度だけの時によく使うというイメージ。

  • time.Timerが返すCはbuffer-size 1でchannelが埋まっていなければ時間をsendするようになっています。
    • ただしGo.1.23から改善が入るのでそれ以降はこの記述は正しくならなくなりました。
  • signal.Notifyは第一引数でchannelを渡して、プロセスがシグナルを受けとるたび、それを受け取ったchannelにsendされます。
    • このchannelは1以上のバッファーをつける必要があります。
    • select { case c <- sig; default: }でnon-blockingに送信するためです。
// Multi-producer single-consumer notifier
type MpscNotifier struct{
	ch chan struct{}
}

func New() *MpscNotifier {
	return &MpscNotifier{
		ch: make(chan struct{}, 1)
	}
}

func (n *MpscNotifier) Chan() <-chan struct{} {
	return n.ch
}

func (n *MpscNotifier) Notify() {
	select {
		case n.ch <- struct{}{}:
		default:
	}
}

Example: send / recv one of

今まで述べてきた性質を利用して、動的な個数のchannelを使ったselectを実装します。

channelの普通の使い方は難しくないので、この記事の目的に照らし合わせて突っ込んだ話をここでします。

nil channelの送受信は永久にブロックします。逆に言えば、[N]chan Tに対し、任意のインデックスinilを代入すればNを上限とした動的な個数のchannelに送受信できます。
reflectを利用すれば現実的な上限なしの動的な個数のchannelの送受信を実装できます。

以下で与えられた[]chan Tのうちどれか一つからsend / recvする関数を実装してみます。

func Send[T ~[]C, C ~(chan<- E), E any](chans T, v E, cancel <-chan struct{}) (chosen int, sent bool) {
	var c [4]C
	_ = copy(c[:], chans)
	return Send4(c, v, cancel)
}

func Send4[T ~[4]C, C ~(chan<- E), E any](chans T, v E, cancel <-chan struct{}) (chosen int, sent bool) {
	sent = true
	select {
	case <-cancel:
		sent = false
	case chans[0] <- v:
		chosen = 0
	case chans[1] <- v:
		chosen = 1
	case chans[2] <- v:
		chosen = 2
	case chans[3] <- v:
		chosen = 3
	}
	return
}

func Recv[T ~[]C, C ~(<-chan E), E any](chans T, cancel <-chan struct{}) (v E, chosen int, received bool) {
	var c [4]C
	_ = copy(c[:], chans)
	return Recv4(c, cancel)
}

func Recv4[T ~[4]C, C ~(<-chan E), E any](chans T, cancel <-chan struct{}) (v E, chosen int, received bool) {
	received = true
	select {
	case <-cancel:
		received = false
	case v = <-chans[0]:
		chosen = 0
	case v = <-chans[1]:
		chosen = 1
	case v = <-chans[2]:
		chosen = 2
	case v = <-chans[3]:
		chosen = 3
	}
	return
}

このSend4/Recv4を任意数まで実装すれば何個でも対応できます

func Send[T ~[]C, C ~(chan<- E), E any](chans T, v E, cancel <-chan struct{}) (chosen int, sent bool) {
	switch x := len(chans); {
	case x == 0:
		panic("zero chans")
	case x <= 4:
		var c [4]C
		_ = copy(c[:], chans)
		return Send4(c, v, cancel)
	case x <= 8:
		var c [8]C
		_ = copy(c[:], chans)
		return Send8(c, v, cancel)
	case x <= 16:
		var c [16]C
		_ = copy(c[:], chans)
		return Send16(c, v, cancel)
	// ... and so on ...
	}
}

32, 64, 128...と実装していくこと自体は簡単です; code generatorを整備したら簡単にいくらでも増やせます。
ただそうするとコードサイズがすさまじく大きくなりそうなので、現実的ではないですよね。ということで、reflectを使って本当の意味で任意数なchannelに対応します。

func SendN[T ~[]C, C ~(chan<- E), E any](chans T, v E, cancel <-chan struct{}) (chosen int, sent bool) {
	cases := []reflect.SelectCase{{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(cancel),
	}}
	for _, ch := range chans {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectSend,
			Chan: reflect.ValueOf(ch),
			Send: reflect.ValueOf(v),
		})
	}
	chosen, _, _ = reflect.Select(cases)
	if chosen == 0 {
		return
	}
	return chosen - 1, true
}

func RecvN[T ~[]C, C ~(<-chan E), E any](chans T, cancel <-chan struct{}) (value E, chosen int, received bool) {
	cases := []reflect.SelectCase{{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(cancel),
	}}
	for _, ch := range chans {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}
	chosen, recv, _ := reflect.Select(cases)
	if chosen == 0 {
		return
	}
	return recv.Interface().(E), chosen - 1, true
}

func Send[T ~[]C, C ~(chan<- E), E any](chans T, v E, cancel <-chan struct{}) (chosen int, sent bool) {
	switch x := len(chans); {
	// ... cases ...
	default:
		return SendN(chans, v, cancel)
	}
}

func Recv[T ~[]C, C ~(<-chan E), E any](chans T, cancel <-chan struct{}) (v E, chosen int, received bool) {
	switch x := len(chans); {
	// ... cases ...
	default:
		return RecvN(chans, cancel)
	}
}

Sendを何度も実行し、成功するたびに送受信できたchannelをnilにフォールバックすれば「同じ値をすべてのchannelに1度ずつ送る」が実現できますね。
つまり以下のような感じです。

(サンプルはランダムな値を送りたかったのでEの代わりにfunc() Eを渡せるようにしています)

func SendEach[T ~[]C, C ~(chan<- E), E any](chans T, fn func() E, cancel <-chan struct{}) (sent []int, completed bool) {
	chans = slices.Clone(chans)
	sent = make([]int, 0, len(chans))
	completed = true

	for len(chans) != len(sent) {
		chosen, ok := Send(chans, fn(), cancel)
		if !ok {
			completed = false
			break
		}
		sent = append(sent, chosen)
		chans[chosen] = nil
	}
	return
}

実装物は以下に置いてあります

https://github.com/ngicks/go-basics-example/tree/main/snipet/chan-one-of

一応動いています。(リンク先はlocal-onlyモジュールなのでimportできません。単なるサンプルです。)

現実的にfan-in / fan-outを実装するときにこういうのは使いますかね?
SendEachの使い道は筆者はぱっと思いつかなかったです。なので対象読者にとっても向こう数年は不要かもしれません。
タイミングが重要なライブラリはたびたび複数のチャネルの中からどれか1つだけがreceiveするのをテストしたくなる時がありますので、ごくまれに何かの使いどころがあるかも。

多分reflectを使うとオーバーヘッドがかかるのでlen(chans)<=16みたいな適当な小さい数までは固定数版に分岐する処理が妥当だと思って実装してみましたが、これが本当にいいことなのかはよくわかっていない(ベンチをとっていない)ので参考までに、という感じです。

context.Context

https://pkg.go.dev/context@go1.22.3

context.Contextはhttp requestのrequest-scopeのようなスコープに紐づく情報を運ぶ手段を提供するinterfaceです。

  • Valuecontext.Contextに収められた任意の値を取り出すことができます
    • context.WithValueで値を収めることができます。
    • ただこれらのWith*関数はcontext.Contextを入れ子にして情報を付け足しながら新しいcontext.Context返すので、上位のスコープにValueを伝搬したい場合は収めた値自体に工夫が必要です。
      • e.g. 上位で*sync.Mapを収めて下層で操作してもらう。
  • Done<-chan struct{}を返します。context.Contextがcancelされたときこのchannelはcloseされます。
  • cancelされた場合Errがnon-nil errorを返すようになります。
  • context.Causecontext.WithCancelCauseなどで返されたcontext.CancelCauseFuncに渡されたerrorを取り出せます。

処理時間の長くかかる関数/メソッドはとりあえず第一引数にcontext.Contextを受けとって起き、ctx.Done()や、ctx.Err()によってcancellationされた時を検知して即座にreturnできるようにするとよいでしょう。

func longLoongJob(ctx context.Context) error {
	// ... do something ...

	timer := time.NewTimer(dur)
	select {
	// ctx.Done()はcancelされるとcloseされる。
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
	}

	// ctx.Err()はcancelされるとnon-nil
	if err := ctx.Err(); err != nil {
		return err
	}

	// ... do something ...
	return nil
}

そもそも大抵の長い処理を行う関数はcontext.Contextを第一引数で受け取るようになっていると思うので、そのうち自然とこの様式がわかるようになると思います。

context.WithCancelのドキュメントに以下のようにありますが

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

入れ子にするだけなのに何をリリースするねん?と思うかもしれませんが、親contextのcancelを伝搬するために親contextが既知の(≒contextパッケージで実装された)ものでないとき、新しいgoroutineの中で親contextのcancelをDoneで監視するので、cancelはこのgoroutineのような暗黙的に確保されたリソースをリリースされます。

sync

https://pkg.go.dev/sync@go1.22.3
https://pkg.go.dev/sync/atomic@go1.22.3

syncおよびsync/atomicパッケージではsynchronization primitiveが実装されています。
Goにはchannelという第一級市民が存在していますが、mutexatomicを使うほうが問題をシンプルに解決できるときがたびたびあります。

おそらく対象読者にはmutexatomic自体がなじみない概念だと思います。

mutex(MUTual EXclusion=相互排他)はあるコードパス(よくcritical sectionと呼ばれる)を実行しているthread of executionが一つであることを保証する機能のことを指します。
前述のとおり、複数のthreadが同じ変数がアクセスし、少なくとも一つがwriteである場合data raceを起こします。一般的なmutexはこれらのリソースへのアクセスをたった1つのプロセス、あるいはスレッドに制限することでdata raceが起きるのを防ぐことができます。

mutexは大抵LockUnlockができるインターフェイスを備え、Lockによってコードパスをロックします。Lockを保持している状態で他のthreadLockを呼び出すと、Lockを保持しているthreadUnlockを呼び出すまでそのthreadはブロックしつづけます。

atomicはCPUに備わった命令(x86ではLOCK prefix)を利用してある変数の観測と変更をほかのスレッドにわりこまれないようにするものです。mutexより粒度が小さくできることが限られています。mutexの実装にも使われています。

Node.jsもといjavascriptはスクリプトの実行はシングルスレッドですし、pythonにはGILがあるのでpythonコードはシリアライズされて実行されます。
(ただしpythonにおいてはPEP703が実装されるとデフォルトがGIL無効になるかもしれないが)
single threadedな両者でも時にconcurrentなリソースへアクセスする際に制限をする必要があることがあるので、async-mutexなどを利用した対象読者も多いかもしれません。

mutexatomic変数以外のsynchronization primitiveはmutexatomicを使って実装されています。

sync/atomicの概要

sync/atomicで各種atomicな変数の操作が実装されています。各int/uint variantに対してAdd*, CompareAndSwap*, Load*, Store*, Swap*が実装されています。
さらにGo1.19より便利なatomic typeが実装されています。

基本はtypeを使っておくほうが良いと思います。
なお見て分かると思いますが、Add*はあってもSub*はないので引き算が必要ならばUint*ではなくInt*を使います。

大体はmutexなしでconcurrent-safeに変数を書き換えるのに使います。

var working atomic.Bool
if !working.CompareAndSwap(false, true) { // oldがfalseだった場合のみtrueに置き換える。
	// swapできなかった=oldがtrueだった
	return ErrAlreadyWorking
}
defer working.Store(false)
// ... work ...

TODO: add progress reader example

余談ですが、C++やRustはatomic accessのorderingを複数選択可能です(The Rustonomicon::atomics)が、Goはsequentially consistentしかありません

syncの概要

syncGoのstdが提供するツールキットの中でもとりわけ重要できちんと理解しておく必要があります。
以下で現時点(=Go1.22.3)でexportされている関数や型を適当な順番でざっくり説明します。
詳細は上記ドキュメントを見てください。

  • Mutex
    • mutexです
    • 見た感じ何度かspinlockを試みるので呼び出しがFIFOになるとは限らないっぽいです。
  • RWMutex
    • 複数のReadLock、もしくは1つのWriteLockを同時にとることができます。
    • Readerが多くてWriteする頻度が低い時はこちらを用いるとよいのでしょう。
  • Once
    • 1度だけ実行されるのを保証するprimitiveです
    • 同時にDoを呼ばれると、二つ目以降の呼び出しは一つ目の呼び出しが終了するまで待ちます。
    • パッケージやstructの遅延初期化によく使います。
    • Doに渡す関数はコードパスごとに別のものでよいですが、Doが実行されるのはOnceに対して1度きりです。
    • 下記Once*Go1.21.0より追加されたため、これが直接使われるとき自動的により複雑なユースケースを実現しようとしていると思われるようになりました。
      • つまり、コードパスごとにDoに渡す関数が別のものじゃないとOnceを直接使う理由があんまりないです。
      • e.g. context.AfterFuncここここ
  • OnceFunc (Go1.21.0より)
    • Onceのラッパーで典型的な使い道の1つである「特定の関数を1度だけ実行する」というのができます。
    • 逆にOnceを直接使ってる場合自動的にもっと複雑なユースケースなのかと思われるようになりました。
    • 渡されたfunc()panicした際OnceFuncから返される関数の呼び出しすべてにpanicが伝搬するように実装されています。
  • OnceValue(Go1.21.0より)
    • OnceFuncと同様ですが渡す関数が返り値を1つもてます。
  • OnceValues(Go1.21.0より)
    • OnceFuncと同様ですが渡す関数が返り値を2つもてます。
    • おそらくエラーしうる初期化に用いるためにあるのだと思われます。
  • WaitGroup
    • アトミックにincrement/decrementできるカウンターで、Waitでカウンターが0になるまで待つことができます。
    • Addでn個increment、Doneで1つdecrementします
    • goroutineが終了するのを待つのによく使います。
      • もちろんそれ以外のことに使ってもよいです。あくまでカウンターです。
    • 基本的にDonedefer wg.Done()で呼び出したほうがよいでしょう
      • goroutineで呼び出す関数がpanicしたり、runtime.Goexitを読んだとしてもDoneが呼び出せるからです。
      • panic時にrecoverする気がないならdeferじゃなくてもいいです。
    • Waitを呼ぶとカウントが0になるまでそのgoroutineブロックします。すでに0ならすぐアンブロックします。
    • これまでのコードサンプルは特に何も断らずにこれを使ってきているので使い方はきっともうわかっていることでしょう。
  • Pool
    • 初期化にそれなりにコストを伴なったり実行中にallocationが起きる一時的なオブジェクトをためておくプールです
    • 典型的ユースケースは後述
  • Map
    • concurrent-safeなmap[any]anyみたいなものです
    • key, valueともany型ですがkeyはcomparableである必要があります。
    • map[T]Uは実装上keyごとのlockを行うようなことはできないため、ランダムなkeyにconcurrentにアクセスする場合は適しているとドキュメントされています。
    • 典型的なユースケースは後述
    • Go1.20.0より追加された(*sync.Map).CompareAndDelete(*sync.Map).CompareAndSwapを利用する際にはvalueの型もcomparableである必要があります。
  • Cond
    • ある状態を監視する複数のgoroutineに対して、状態の変更の通知を効率的に行う仕組み・・・というべきなんでしょうか?
    • 便利な使い道はあるんですがいつ使うべきか説明しにくい機能です。
    • もしかしたら対象読者は向こう数年使わないかもしれません。
    • こういう概念自体はPOSIX APIにもあって広く認知されています
    • 一応使用例を後述します

sync.Poolの典型的ユースケース: buf pool

sync.Poolの典型的ユースケースにbuf poolがあります。

大きな[]byte*bytes.Bufferが必要な関数を何度も実行する場合はpoolを使ってallocationの負荷を低減(あるいはzero allocationに)できます。
前述通りgoroutineの初期スタックサイズは2-6KiBなので大きなbyte array[N]byteを宣言してしまうとおそらくスタック成長が起きてパフォーマンスが落ちるはずなので、[]byteのallocation自体は避けられません。

よくio.CopyBufferと一緒に使います。
io.Copyは引数のsrcio.WriterTodstio.ReaderFromをそれぞれどちらも実装しない場合、実行のたびに32KiB(srcio.LimitedReaderである場合はsrc.N)の[]byteをallocateしてしまうので、普通はio.CopyBufferを使うほうがよいでしょう。

playground(full)

// 8KiBにしているのは単なるサンプルで、
// ファイルを読み書きするなら64KiB、
// io.CopyBuffer向けに使うなら32KiBとかの適当な値に増やしたほうがよいでしょうね。
const bufSize = 8 * 1024

var bytesPool = &sync.Pool{
	New: func() any {
		b := make([]byte, bufSize, bufSize)
		return &b
	},
}

func getBytes() *[]byte {
	return bytesPool.Get().(*[]byte)
}

func putBytes(b *[]byte) {
	if b == nil || len(*b) != bufSize || cap(*b) != bufSize {
		// reject grown / shrunk
		return
	}
	bytesPool.Put(b)
}
var bufPool = &sync.Pool{
	New: func() any {
		return new(bytes.Buffer)
	},
}

func getBuf() *bytes.Buffer {
	return bufPool.Get().(*bytes.Buffer)
}

func putBuf(b *bytes.Buffer) {
	if b.Cap() > 64*1024 {
		// See https://golang.org/issue/23199
		return
	}
	b.Reset()
	bufPool.Put(b)
}

コメントでリンクされている通り、大体同じサイズの要素をプールしないと効率が悪いので大きすぎるものはPutしないようにしてあります。
fmtのプールも同じことをしていますね。

https://github.com/golang/go/blob/go1.22.3/src/fmt/print.go#L160-L182

bytesPoolが返す[]byteはappendを呼ばれるとcap(s)を超えてしまうので、
新しいarrayがallocateされてしまいます。
なので、appendが必要なsliceのプーリングはもう少し考慮が必要です。

具体的には以下のような感じで、

https://github.com/golang/go/blob/go1.22.3/src/crypto/tls/conn.go#L985-L995

Putする前にgrowしたスライスをescape済みのポインターに再代入するとallocationの頻発を防げます。

sync.Mapの典型的ユースケース: キャッシュ

sync.Mapの想定される典型的ユースケースにcacheがあります

https://pkg.go.dev/sync@go1.22.3#Map

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

これの(1)のほうですね。

実際std内でもcacheとして使われています。

実装例としてimage loaderを作って置きます。"image" dirの下のディレクトリにあるnameの画像を.pngと思ってロードしてimage.Imageとして返します。
sync.Mapを使ってconcurrent-safeなキャッシュを実装できます。

この例ではエラーも永続化してしまいます。実用するならfs.ErrNotExist以外のエラー時はキーを消すとか必要なんですがexampleなので深く考ないことにします。
実際には何かしらのLRU,LFU実装(max-ageをサポートしているとなおよい)をキャッシュとしたほうが良いかもしれません。

var cache sync.Map

func loadImage(name string) (image.Image, error) {
	v, ok := cache.Load(name)
	if !ok {
		v, _ = cache.LoadOrStore(
			name,
			sync.OnceValues(func() (image.Image, error) {
				f, err := os.Open(filepath.Join("image", name))
				if err != nil {
					return nil, err
				}
				return png.Decode(f)
			}),
		)
	}
	return v.(func() (image.Image, error))()
}

sync.Condの基本的な使い方

sync.Condは筆者的にはわかりにくかったので基本的な使い方から入ります。

c.L.Lock()
defer c.L.Unlock()
for !condition(someVar) {
	c.Wait()
}
// do some task using someVar...

これが基本的な使い方になります。

c.L.Lockで保護された何かしらのリソースがconditionを満たすまで待ち、その後そのリソースを使う、という感じです。

Waitは別のgoroutineからBroadcastもしくは、Signalが呼ばれるまでブロックします。Broadcast/Signalの言い回しからわかる通り、Waitでブロックしているすべてのgoroutineあるいは単一のgoroutineをそれぞれアンブロックします。

これがなかなか筆者にはピンときませんでした。

Waitをインライン展開すると、

c.L.Lock()
defer c.L.Unlock()
for !condition(someVar) {
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

となります。

runtime_notifyListWaitがランタイムによりブロックされるWaitの本体のようなロジックです。見てのとおり、c.L.Unlockが呼ばれるので、Wait中はロックは解除されています。
runtime_notifyListAddc.notify.waiterカウンターを1つインクリメントして、-1して(=つまりインクリメントする前の数値)返します。
c.Broadcastc.Signalc.notify.notifyをインクリメントし、t < c.notify.notifyになるまで待つ、という感じっぽいです。

runtime_notifyListWaitがアンブロックしてからc.L.Lock()が取れるまでに発生したBroadcastはとり逃してしまうのでここにrace conditionがあります。

Example: Cond Wait

sync.Condの利用例を以下のように実装します。

pthread_cond_init(3p)のExmpalesセクションで紹介されているものと似ています。
しかしこれを典型と言い切っていいのかはよくわかりません。もっといろいろな使い方ができますからね。

状態を変数として持つcondWorkerdoメソッドで特定のstate doIfの時のみアクションf func()を実行しますが、それ以外にもwaitIfにマッチする場合はsync.Condを利用してその状態になるか、もしくは別の状態になるまで待ちます。waitIfがマッチするstateは十分に短い時間で別のstateに遷移することがよく知られています。
こういうケースではchanmutexを使うよりシンプルに問題を解けているはず・・・たぶん。

実際のコードではchangeStateメソッドではなく*condWorker自体がいろいろな仕事を自らするなり、メソッド呼び出しで依頼されるなりすることで内部のstateが切り替わるはずです。
何かの仕事doを行う際に、それをしてもよい状態のみならず、待つこと許される状態というのを定義できると便利だと思います。

ただしsync.CondWaitしている間context.Contextのcancellationを受けとるなどできませんから、sync.Mutexと同じで場合によって長い時間ブロックすることもあり得ます。
そのためロック期間を十分認識した設計が必要です。

playground

package main

import (
	"errors"
	"fmt"
	"slices"
	"sync"
	"time"
)

var (
	ErrNotEligibleState = errors.New("not eligible state")
)

type state int

const (
	stateA state = iota + 1
	stateB
	stateC
)

type condWorker struct {
	s    state
	cond *sync.Cond
}

func newCondWorker() *condWorker {
	return &condWorker{
		s:    stateA,
		cond: sync.NewCond(&sync.Mutex{}),
	}
}

func (w *condWorker) changeState(s state) {
	w.cond.L.Lock()
	defer w.cond.L.Unlock()
	w.cond.Broadcast()
	w.s = s
}

func (w *condWorker) do(doIf state, waitIf func(state) bool, f func()) error {
	w.cond.L.Lock()
	defer w.cond.L.Unlock()
	if w.s == doIf {
		f()
		return nil
	}
	if waitIf == nil || !waitIf(w.s) {
		return ErrNotEligibleState
	}
	for {
		w.cond.Wait() // lock is freed while being blocked on Wait
		// lock is now held.
		if w.s == doIf {
			break
		}
		if !waitIf(w.s) {
			return ErrNotEligibleState
		}
	}
	f()
	return nil
}

type varSet struct {
	doIf   state
	waitIf []state
}

func main() {
	w := newCondWorker()
	sChan := make(chan state)
	doChan := make(chan varSet)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for s := range sChan {
			fmt.Printf("changing state to %d\n", s)
			w.changeState(s)
			fmt.Printf("changed state to %d\n", s)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range doChan {
			fmt.Printf("doing if %d, would wait if s is one of %v\n", v.doIf, v.waitIf)
			err := w.do(
				v.doIf,
				func(s state) bool { return slices.Contains(v.waitIf, s) },
				func() { fmt.Println("...working...") },
			)
			fmt.Printf("done with %v\n", err)
		}
	}()
	doChan <- varSet{doIf: stateA}
	/*
		doing if 1, would wait if s is one of []
		...working...
		done with <nil>
	*/
	doChan <- varSet{doIf: stateB}
	/*
		doing if 2, would wait if s is one of []
		done with not eligible state
	*/
	doChan <- varSet{doIf: stateB, waitIf: []state{stateA}}
	/*
		doing if 2, would wait if s is one of [1]
	*/
	fmt.Println("sleeping...")
	time.Sleep(time.Millisecond)
	fmt.Println("woke up!")
	sChan <- stateB
	/*
		changing state to 2
		changed state to 2
		...working...
		done with <nil>
	*/
	doChan <- varSet{doIf: stateC, waitIf: []state{stateB}}
	/*
		doing if 3, would wait if s is one of [2]
	*/
	fmt.Println("sleeping...")
	time.Sleep(time.Millisecond)
	fmt.Println("woke up!")
	sChan <- stateA
	/*
		changing state to 1
		changed state to 1
		done with not eligible state
	*/
	close(sChan)
	close(doChan)
	wg.Wait()
}

sub-repositoryのsync

https://pkg.go.dev/golang.org/x/sync

sub-repositoryにもsyncが実装されています。

errgroup.Group

https://pkg.go.dev/golang.org/x/sync@v0.7.0/errgroup#Group

errgroup.Groupはざっくりいうと以下のパターンをライブラリ化したものです。

(何度も書きますがGo1.22.0以降でないと動かきません)

snippet

func tasksNaiveGroup(ctx context.Context, tasks []task, work func(ctx context.Context, t task) error) error {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)

	sem := make(chan struct{}, 15)

	wg.Add(len(tasks))
	for _, t := range tasks {
		sem <- struct{}{}
		go func() {
			defer func() {
				wg.Done()
				<-sem
			}()
			e := work(ctx, t)
			if e != nil {
				cancel(e)
			}
		}()
	}
	wg.Wait()
	return context.Cause(ctx)
}

これをerrgroup.Groupを使うように変更すると以下のようになります。

func tasksErrgroup(ctx context.Context, tasks []task, work func(ctx context.Context, t task) error) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(15)
	for _, t := range tasks {
		g.Go(func() error {
			return work(ctx, t)
		})
	}
	return g.Wait()
}
  • (*errgroup.Group).Goで任意の関数を新しいgoroutineの中で実行できます。
  • (*errgroup.Group).SetLimitで同時に実行するgoroutineの数を制限でき、
  • (*errgroup.Group).Waitで実行されたすべての関数が終わるまで待ち、
    • Goに渡された関数が最初に返したnon-nil errorを返します。
  • errgroup.WithContext*errgroup.Groupと親contextを受け継いだcontext.Contextを返します
    • このcontext.ContextGoに渡された関数のがnon-nilエラーを返すとそのエラーでcancelされます。
    • これによってGoに渡す関数をキャンセルできるようにするとエラーで全体を中断できます。

めちゃ便利ですね。これはすごく多用します。
しいて言えばGoに渡す関数がpanicしたときの手当てが特にまだ実装されていない(v0.7.0時点)ので、渡す関数の中でrecoverしてg.Goを呼び出しているgoroutinere-panicするように呼び出し側で工夫する必要があります。

以下のような感じでしょうか

func tasksErrgroupRepanic(ctx context.Context, tasks []task, work func(ctx context.Context, t task) error) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(15)
	var (
		panicOnce sync.Once
		panicked  any
	)
	for _, t := range tasks {
		g.Go(func() error {
			var err error
			defer func() {
				rec := recover()
				if rec == nil {
					return
				}
				var set bool
				panicOnce.Do(func() {
					set = true
					panicked = rec
				})
				if set {
					err = fmt.Errorf("panicked: %v", rec)
				}
			}()
			err = work(ctx, t)
			return err
		})
	}
	err := g.Wait()
	if panicked != nil {
		panic(panicked)
	}
	return err
}

panicよる強制終了は他のgoroutineのdeferを実行せずに終わってしまいます。
panicは伝搬させられるならなるだけしたほうが良いと思います。
とはいえ、筆者の体感上panicはほとんど遭遇しないので困る機会は少ないかもしれません。

semaphore.Weighted

https://pkg.go.dev/golang.org/x/sync@v0.7.0/semaphore#Weighted

token量で制限するsemaphoreです。
対象読者にはsemaphoreという概念になじみないでしょうか?semaphoreは一般的にmutexと同じくリソースにアクセスできるthreadの数を制限するものです。mutexがロック区間にたった一つのthreadしかアクセスできないようにするのに対し、counting semaphoreは任意の数nまでに制限します。

semaphore.Weightedweighted semaphoreなので任意のtoken量nに対して、(*semaphore.Weighted).Acquireは任意のmを取得します。もし現在m個のtokenが利用可能でなければ、利用可能になるまでブロックし続けます。

singleflight.Group

https://pkg.go.dev/golang.org/x/sync@v0.7.0/singleflight#Group

(*singleflight.Group).Doにkeyと関数を受け取り実行します。
同じkeyに対して同時に複数の呼び出しがあった場合、2つ目以降の呼び出しは実際に関数を実行せずに1つ目の実行の結果を待って同じ結果をえます。

keyの型がstringだけなので、重い関数の呼び出しパラメータをkeyにしたい場合一旦パラメータをシリアライズする必要があります。
json.Marshalはほとんどのケースでstableな結果を返すので普通はjson.Marshalで必要十分であると思われます。

json.Marshalがstableじゃないときはいつか

筆者が思いつくのは以下の時

  • MarshalJSON() ([]byte, error)を実装している型が含まれ、その実装がstableでない
  • nil mapmap[T]U{}が意味的に同じ(≒遅延初期化される)とき
    • nil mapnullが出力される
    • assignment to entry in nil mapのpanicがあるのでこれはあんまりないかもしれません。
  • nil slice[]T{}が意味的に同じ(≒遅延初期化される)とき
    • []Tnilのときnullが出力される
    • nil sliceappend, len, capすべて機能するのでこのケースは多い気がします。
  • Goバージョンをまたいだことでjson.Marshalがunicode sequenceにescapeする文字種が変わったとき
    • 確か1度はescapeする文字種が変わっていたと思います。確認取れたらこの行は編集されます。
    • 別システムから来たjsonを未編集で使ったり、ファイルにキャッシュしている場合などで起きます。
      • つまり普通は起きません。

for k, v := range map[T]U{} {}したとき順序がランダムになるのはGoを書いているならばよくご存じでしょうが、json.Marshalはmapのkeyをすべて取ってsortする処理が挟まるので、ここに関してはstableな出力を得られます。

使用例としてはSNMPなどでUDPのbroadcastを送信してn秒待つような検索処理はsingleflightで処理できるとよいでしょう。というか筆者はNode.jsでそういうものを実装したことがあります。Node.jsではstdに近いライブラリにsingleflightに当たるものがなくて困ったことがあります。

signal handling

サーバープログラムは通常明示的に終了されるまで動作し続けます。
終了は大抵の場合SIGTERMなどのシグナルによってされることが多いです。

Goはruntimeが起動時にすべてのsignalのsignal handlerをインストールします。
つまり下記の「pid 1だとシグナルのデフォルトアクションによる終了が起きない問題」が起きません。

https://man7.org/linux/man-pages/man2/kill.2.html#NOTES

The only signals that can be sent to process ID 1, the init
process, are those for which init has explicitly installed signal
handlers.

runtimeによってキャッチされたsignalはos/signalパッケージによって受け取ることができます。

https://pkg.go.dev/os/signal@go1.22.3

色々注意点が述べられているので上記ドキュメントは呼んでおくほうがよいでしょう。

(os/signal).Notifyによってsignalをchannelに通知します。
(os/signal).NotifyContextで、signalを受けたときcancelされるcontext.Contextを得られます。

// buffer-sizeはとりあえずn > 0ならなんでもよい。
// channelのバッファが埋まるとsignalは捨てられる(Go1.22.3時点)。
// signalの使い方によっては何度signalされたかも重要なはず。その場合は適当なサイズまで増量する。
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)

Notify/NotifyContextのvariadicな第二引数で受け取るsignalを指定できますが、ここに何も渡さないとすべてのsignalを受け取れます。

しかし基本的にはこうしません。なぜなら、unixではpreemptiveなスケジューリングのためにSIGURGを使用するので、ノイズとなるsyscall.SIGURGをたくさん受け取ってしまうためです。
とりたいsignalだけを指定するようにするとよいでしょう。

終了を通知されたいだけのケースの場合、とりあえずは以下二つだけを受けとっておけば困らないと思います

おわりに

GitHubで編集を提案

Discussion