Chapter 05

Goで並行処理(応用編)

さき(H.Saki)
さき(H.Saki)
2021.06.18に更新

この章について

ここからは、実際にゴールーチンやチャネルをうまく使うための実践的なノウハウを列挙形式で紹介していきます。

なお、この章に書かれている内容のほとんどが、以下のセッション・本の叩き直しです。必要な方は原本の方も参照ください。

"Share by communicating"思想

異なるゴールーチンで何かデータをやり取り・共有したい場合、とりうる手段としては主に2つあります。

  • チャネルをつかって値を送受信することでやり取りする
  • sync.Mutex等のメモリロックを使って同じメモリを共有する

このどちらをとるべきか、Go言語界隈で有名な格言があります。

Do not communicate by sharing memory; instead, share memory by communicating.
出典:Effective Go

Goのチャネルはもともとゴールーチンセーフ[1]になるように設計されています。

そのため「実装が難しい危険なメモリ共有をするくらいなら、チャネルを使って値をやり取りした方が安全」という考え方をするのです。

Instead of explicitly using locks to mediate access to shared data, Go encourages the use of channels to pass references to data between goroutines.
This approach ensures that only one goroutine has access to the data at a given time.

(訳)共有メモリ上のデータアクセス制御のために明示的なロックを使うよりは、Goではチャネルを使ってゴールーチン間でデータの参照結果をやり取りすることを推奨しています。
このやり方によって、ある時点で多くても1つのゴールーチンだけがデータにアクセスできることが保証されます。

出典:The Go Blog: Share Memory By Communicating

「拘束」

Goによる並行処理本4.1節にて述べられた方法です。

このように、受信専用チャネルを返り値として返す関数を定義します。

func restFunc() <-chan int {
	// 1. チャネルを定義
	result := make(chan int)

	// 2. ゴールーチンを立てて
	go func() {
		defer close(result) // 4. closeするのを忘れずに

		// 3. その中で、resultチャネルに値を送る処理をする
		// (例)
		for i := 0; i < 5; i++ {
			result <- 1
		}
	}()

	// 5. 返り値にresultチャネルを返す
	return result
}

resultチャネル変数が使えるスコープをrestFunc内に留める(=拘束する)ことで、あらぬところから送信が行われないように保護することができ、安全性が高まります。

select文

言語仕様書では、select文はこのように定義されています。

A "select" statement chooses which of a set of possible send or receive operations will proceed.
(訳)select文は、送受信を実行できるチャネルの中からどれかを選択し実行します。
出典:The Go Programming Language Specification#Select_statements

例えば、以下のようなコードを考えます。

gen1, gen2 := make(chan int), make(chan int)

// goルーチンを立てて、gen1やgen2に送信したりする

if n1, ok := <-gen1; ok {
	// 処理1
	fmt.Println(n1)
} else if n2, ok := <-gen2; ok {
	// 処理2
	fmt.Println(n2)
} else {
	// 例外処理
	fmt.Println("neither cannot use")
}

gen1チャネルで受け取れるなら処理1をする、gen2チャネルで受け取れるなら処理2をする、どちらも無理なら例外処理という意図で書いています。

実はこれ、うまく動かずデットロックになることがあります。

fatal error: all goroutines are asleep - deadlock!

どういうときにうまくいかないかというと、一つの例としてgen1に値が何も送信されていないときです。
gen1から何も値を受け取れないときは、その受信側のゴールーチンはブロックされるので、if n1, ok := <-gen1から全く動かなくなります。

デッドロックの危険性を回避しつつ、複数のチャネルを同時に1つのゴールーチン上で扱いたい場合にselect文は威力を発揮します。

select文を使って手直し

select {
case num := <-gen1:  // gen1から受信できるとき
	fmt.Println(num)
case num := <-gen2:  // gen2から受信できるとき
	fmt.Println(num)
default:  // どっちも受信できないとき
	fmt.Println("neither chan cannot use")
}

gen1とgen2がどっちも使えるときは、どちらかがランダムに選ばれます。

書き込みでも同じことができます。

select {
case num := <-gen1:  // gen1から受信できるとき
	fmt.Println(num)
case channel<-1: // channelに送信できるとき
	fmt.Println("write channel to 1")
default:  // どっちも受信できないとき
	fmt.Println("neither chan cannot use")
}

バッファありチャネルはセマフォの役割

「バッファなしチャネルが同期の役割を果たす」ということを前述しましたが、じゃあバッファありは何なんだ?と思う方もいるでしょう。
これもEffective Goの中で言及があります。

A buffered channel can be used like a semaphore.
(訳)バッファありチャネルはセマフォのように使うことができます。
出典:Effective Go

具体例

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // Wait for active queue to drain.
    process(r)  // May take a long time.
    <-sem       // Done; enable next request to run.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // Don't wait for handle to finish.
    }
}

ここで Serveでやっているのは「queueチャネルからリクエストを受け取って、それをhandleする」ということです。
ですが、このままだと際限なくhandle関数を実行するゴールーチンが立ち上がってしまいます。それをセマフォとして制御するのがバッファありのsemチャネルです。

handle関数の中で、

  • リクエストを受け取ったらsemに値を1つ送信
  • リクエストを処理し終えたらsemから値を1つ受信

という操作をしています。
もしもsemチャネルがいっぱいになったら、sem <- 1の実行がブロックされます。そのため、semチャネルの最大バッファ数以上のゴールーチンが立ち上がることを防いでいます。

メインルーチンからサブルーチンを停止させる

状況

例えば、以下のようなジェネレータを考えます。

func generator() <-chan int {
	result := make(chan int)
	go func() {
		defer close(result)
		for {
			result <- 1
		}
	}()
	return result
}

int型の1を永遠に送るジェネレータです。これをmain関数で5回使うとしたらこうなります。

func main() {
	result := generator()
	for i := 0; i < 5; i++ {
		fmt.Println(<-result)
	}
}

5回使ったあとは、もうこのジェネレータは不要です。別のゴールーチン上にあるジェネレータを止めるにはどうしたらいいでしょうか。

解決策

ここでもチャネルの出番です。doneチャネルを作って、「メインからサブに止めてという情報を送る」ようにしてやればいいのです。

- func generator() <-chan int {
+ func generator(done chan struct{}) <-chan int {
	result := make(chan int)
	go func() {
		defer close(result)
+	LOOP:
		for {
-			result <- 1           

+			select {
+			case <-done:
+				break LOOP
+			case result <- 1:
+			}
		}
	}()
	return result
}

func main() {
+	done := make(chan struct{})

-	result := generator()
+	result := generator(done)
	for i := 0; i < 5; i++ {
		fmt.Println(<-result)
	}
+	close(done)
}

select文は、doneチャネルがcloseされたことを感知してbreak LOOPを実行します。
こうすることで、サブルーチン内で実行されているfunc generator関数を確実に終わらせることができます。

FanIn

複数個あるチャネルから受信した値を、1つの受信用チャネルの中にまとめる方法をFanInといいます。

基本(Google I/O 2012 ver.)

まとめたいチャネルの数が固定の場合は、select文を使って簡単に実装できます。

func fanIn1(done chan struct{}, c1, c2 <-chan int) <-chan int {
	result := make(chan int)

	go func() {
		defer fmt.Println("closed fanin")
		defer close(result)
		for {
			// caseはfor文で回せないので(=可変長は無理)
			// 統合元のチャネルがスライスでくるとかだとこれはできない
			// →応用編に続く
			select {
			case <-done:
				fmt.Println("done")
				return
			case num := <-c1:
				fmt.Println("send 1")
				result <- num
			case num := <-c2:
				fmt.Println("send 2")
				result <- num
			default:
				fmt.Println("continue")
				continue
			}
		}
	}()

	return result
}

このFanInを使用例は、例えばこんな感じになります。

func main() {
	done := make(chan struct{})

	gen1 := generator(done, 1) // int 1をひたすら送信するチャネル(doneで止める)
	gen2 := generator(done, 2) // int 2をひたすら送信するチャネル(doneで止める)

	result := fanIn1(done, gen1, gen2) // 1か2を受け取り続けるチャネル
	for i := 0; i < 5; i++ {
		<-result
	}
	close(done)
	fmt.Println("main close done")

	// これを使って、main関数でcloseしている間に送信された値を受信しないと
	// チャネルがブロックされてしまってゴールーチンリークになってしまう恐れがある
	for {
		if _, ok := <-result; !ok {
			break
		}
	}
}

応用(並行処理本ver.)

FanInでまとめたいチャネル群が可変長変数やスライスで与えられている場合は、select文を直接使用することができません。
このような場合でも動くようなFanInが、並行処理本の中にあったので紹介します。

func fanIn2(done chan struct{}, cs ...<-chan int) <-chan int {
	result := make(chan int)

	var wg sync.WaitGroup
	wg.Add(len(cs))

	for i, c := range cs {
		// FanInの対象になるチャネルごとに
		// 個別にゴールーチンを立てちゃう
		go func(c <-chan int, i int) {
			defer wg.Done()

			for num := range c {
				select {
				case <-done:
					fmt.Println("wg.Done", i)
					return
				case result <- num:
					fmt.Println("send", i)
				}
			}
		}(c, i)
	}

	go func() {
		// selectでdoneが閉じられるのを待つと、
		// 個別に立てた全てのゴールーチンを終了できる保証がない
		wg.Wait()
		fmt.Println("closing fanin")
		close(result)
	}()

	return result
}

タイムアウトの実装

処理のタイムアウトを、select文とチャネルを使ってスマートに実装することができます。

Google I/O 2012 - Go Concurrency Patternsの23:22で述べられていた方法です。

time.Afterの利用

time.After関数は、引数d時間経ったら値を送信するチャネルを返す関数です。

func After(d Duration) <-chan Time

出典:pkg.go.dev - time#After

一定時間selectできなかったらタイムアウト

例えば、「1秒以内にselectできるならずっとそうする、できなかったらタイムアウト」とするには、time.After関数を用いて以下のようにします。

for {
		select {
		case s := <-ch1:
			fmt.Println(s)
		case <-time.After(1 * time.Second): // ch1が受信できないまま1秒で発動
			fmt.Println("time out")
			return
		/*
		// これがあると無限ループする
		default:
			fmt.Println("default")
			time.Sleep(time.Millisecond * 100)
		*/
		}
	}

タイムアウトのタイミングはtime.Afterが呼ばれた場所から計測されます。
今回の例だと、「select文にたどり着いてから1秒経ったらタイムアウト」という挙動になります。

time.After関数を呼ぶタイミングを工夫することで、異なる動きをさせることもできます。

一定時間selectし続けるようにする

例えば「select文を実行し続けるのを1秒間行う」という挙動を作りたければ、select文を囲っているfor文の外でtime.Afterを呼べば実現できます。

timeout := time.After(1 * time.Second)

// このforループを1秒間ずっと実行し続ける
for {
	select {
	case s := <-ch1:
		fmt.Println(s)
	case <-timeout:
		fmt.Println("time out")
		return
	default:
		fmt.Println("default")
		time.Sleep(time.Millisecond * 100)
	}
}

time.NewTimerの利用

time.NewTimer関数でも同様のタイムアウトが実装できます。

// チャネルを内包する構造体
type Timer struct {
	C <-chan Time
	// contains filtered or unexported fields
}

func NewTimer(d Duration) *Timer

出典:pkg.go.dev - time#NewTimer

一定時間selectできなかったらタイムアウト

select文に入ってから1秒でタイムアウト」という挙動をtime.NewTimer関数で実装すると、このようになります。

for {
	t := time.NewTimer(1 * time.Second)
	defer t.Stop()

	select {
	case s := <-ch1:
		fmt.Println(s)
	case <-t.C:
		fmt.Println("time out")
		return
	}
}

一定時間selectし続けるようにする

「for文全体で1秒」という挙動は、time.NewTimer関数を使うとこのように書き換えられます。

t := time.NewTimer(1 * time.Second)
defer t.Stop()

for {
	select {
	case s := <-ch1:
		fmt.Println(s)
	case <-t.C:
		fmt.Println("time out")
		return
	default:
		fmt.Println("default")
		time.Sleep(time.Millisecond * 100)
	}
}

time.Afterとtime.NewTimerの使い分け

time.Aftertime.NewTimer、どちらを使うべきかについては、time.After関数のドキュメントにこのように記載されています。

It is equivalent to NewTimer(d).C.
The underlying Timer is not recovered by the garbage collector until the timer fires.
If efficiency is a concern, use NewTimer instead and call Timer.Stop if the timer is no longer needed.

(訳)time.After(d)で得られるものはNewTimer(d).Cと同じです。
内包されているタイマーは、作動されるまでガベージコレクトによって回収されることはありません。
効率を重視する場合、time.NewTimerの方を使い、タイマーが不要になったタイミングでStopメソッドを呼んでください。

出典:pkg.go.dev - time#After

定期実行の実装

タイムアウトに似たものとして、「1秒ごとに定期実行」といった挙動があります。
これもtime.After関数を使って書くこともできます。

for i := 0; i < 5; i++ {
	select {
	case <-time.After(time.Millisecond * 100):
		fmt.Println("tick")
	}
}

ですが前述した通り、time.Afterはガベージコレクトされないので、効率を求める場合にはあまり望ましくない場合があります。

time.NewTimerの類似として、time.NewTickerが定期実行の機能を提供しています。

+t := time.NewTicker(time.Millisecond * 100)
+defer t.Stop()

for i := 0; i < 5; i++ {
	select {
-	case <-time.After(time.Millisecond * 100):
+	case <-t.C:
		fmt.Println("tick")
	}
}

結果のどれかを使う

Go Blogにおいて、"moving on"という名前で紹介されている手法です。

例えば、データベースへのコネクションConnが複数個存在して、その中から得られた結果のうち一番早く返ってきたものを使って処理をしたいという場合があるかと思います。
このような「Connからデータを得る作業を並行に実行させておいて、その中のどれかを採用する」というやり方は、select文をうまく使えば実現することができます。

func Query(conns []Conn, query string) Result {
    ch := make(chan Result, len(conns))
	// connから結果を得る作業を並行実行
    for _, conn := range conns {
        go func(c Conn) {
            select {
            case ch <- c.DoQuery(query):
            default:
            }
        }(conn)
    }
    return <-ch
}

func main() {
	// 一番早くchに送信されたやつだけがここで受け取ることができる
	result := Query(conns, query)
	fmt.Println(result)
}

次章予告

ここまでで「Goのコードの中で、ゴールーチンやチャネルといった並行処理機構をどのように有効活用するか」ということについて触れてきました。

次章からは焦点を「Goコード」から「Goランタイム」に移して、「並行処理を実現するために、Goではどのようなランタイム処理を行っているのか」という内容について説明していきます。
次章は、その事柄の基礎となる用語解説を行います。

脚注
  1. 異なるゴールーチン間での排他処理を意識しなくてよい、ということです。 ↩︎