Closed23

Goで並行処理を作る方法メモ

まきぞうまきぞう

「初めてのGo言語」、公式ドキュメントを軸に読んでいく。

まきぞうまきぞう

バッファってなんなん?

一時的にデータを保管するメモリ領域。

CPUによって実行されるプログラムは全て同じわけではないので、処理の速度に差が出たりすると。
そんな時に早く出た方の計算結果をバッファに貯めておくことで、CPUは次の命令を実行することができるようになるのか。

まきぞうまきぞう

「並行性と並列性は違う」

並列とは? → 独立したプロセッサ上で複数の命令が実行される性質
並行とは? → プロセッサのリソースを共有して同時に複数の命令をこなす性質

まきぞうまきぞう

複数のプロセッサを使用し、それぞれのプロセッサを最大限活用した状態が「並行性を持ったコードが並列に実行される」という意味かな?

まきぞうまきぞう

プログラムのステップ

  1. データを取り出す
  2. 変換
  3. 結果を出力する

並行性を利用すべきかは、各ステップでデータがどう流れるかに依存する

例)

  • フィボナッチ数列の計算: 結果を取得するためには、前の結果が必要になる。
    • 第n項における値を取得するために、1~n項を並行して処理することはできない
  • 複数のスレッドが同じデータに頻繁にアクセスするケース
    • 銀行の残高更新
    • 他のスレッドがデータを編集しているときに読み込んだり、割り込んで編集しようとすると一貫性が保たれない
まきぞうまきぞう

二つの処理が並行して実行できるケース

Aという処理で利用されるデータが、Bという処理では全く使用されないケース

まきぞうまきぞう

処理に長い時間を要さない処理は、並行で処理してもあまり意味はない。
当たり前よねw

まきぞうまきぞう

Goのプログラムが実行されるとき、初めに以下が生成される

  • いくつかのスレッド
  • 一つのゴルーチン

プログラムによっては途中で新しくゴルーチンが生成され、Goのランタイムスケジューラによってスレッドに割り当てられる。

まきぞうまきぞう

OSレベルのスレッドと比較したゴルーチンのメリット

  • ゴルーチンはスレッドではあるが、OSレベルのリソースを生成(CPUとかネットワークとかを割り当てたり)しているわけではないため、その分早い。
  • ゴルーチンはスタックサイズを必要に応じて大きくできるので、空いたメモリ領域を効率的に使える
  • ゴルーチン間のスイッチングはランタイム上で行われるため、高速、かつシステムコール(OSへの呼び出し)を必要としない
  • ゴルーチンのスケジューリングはライタイムのスケジューラが管理する。こいつはネットワーク操作などをブロックするゴルーチンを自動的に検出し、他のゴルーチンにスケジュールを移す。

基本的にはランタイム上で色々できるのとリソースを最小限に抑えられるのでパフォーマンスが良い的なメリットだった。

まきぞうまきぞう

ゴルーチン使い方

定義した関数の呼び出し時にgoというキーワードが置かれていると、ゴルーチンになる。
任意の関数をゴルーチンとして起動することができるが、基本的には「ビジネスロジックをラップするクロージャと共に起動する」のが一般的。

ビジネスロジックをラップするクロージャと共に起動する

一体どういうことか?
実際の処理をクロージャの中に閉じ込めることをいう。

func process(data string) {
    // 何らかのビジネスロジックを実行する
    fmt.Println("Processing:", data)
}

これをクロージャとしてラップしてみると、こうなる。

func main() {
    data := "example data"

    go func(d string) {
        process(d)
    }(data)
    
    // 他の処理
    time.Sleep(time.Second) // ゴルーチンが完了するのを待つために一時停止
}

ビジネスロジックを無名関数(クロージャ)に閉じ込め(ラップし)、それに引数を渡して起動している。

なぜクロージャを作って実行しているの?

並行して実行される処理の状態やリソースの管理がしやすくなるから。(ブックキーピングがしやすくなる)
クロージャを用いて管理しやすくなるものとしては、以下がある。

  • リソース管理
  • 複数のゴルーチンが共有リソースにアクセスする際の競合を防ぐための同期
  • 並行処理中に発生する可能性のあるエラー管理並行処理の進行状況や結果の管理

例えば以下の例では、並行処理の関数の実行が終わるまで待機する関数。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    data := "example data"

    wg.Add(1)
    go func(d string) {
        defer wg.Done()
        process(d)
    }(data)

    // 他の処理
    wg.Wait() // ゴルーチンが完了するのを待つ
}

func process(data string) {
    // 何らかのビジネスロジックを実行する
    fmt.Println("Processing:", data)
}

この例では、sync.WaitGroupを使ってゴルーチンの完了を待つための同期をしている。こうすることで、クロージャによって並行処理の状態を管理している。
また、ビジネスロジックを扱う関数が並行性について意識することなく関数がかけるというのもポイント。

まきぞうまきぞう

チャネル

ゴルーチンでは情報のやり取りにチャネルという型が使われる。

チャネルの作り方

make関数にキーワードchanと型指定する。

channel := make(chan int)

参照型とは?

データではなく、データのメモリアドレスを保持する型のこと。
複数の変数が同じデータを共有でき、データのコピーを作成せずに効率的に操作ができる

  • ポインタ
  • スライス
  • マップ
  • チャネル
  • 関数
  • インターフェース
package main

import "fmt"

func main() {
    arr := [5]int{1, 2, 3, 4, 5}
    s := arr[1:4] // スライスsはarrの一部を参照

    fmt.Println(s) // [2 3 4]
    s[0] = 10
    fmt.Println(arr) // [1 10 3 4 5] スライスを通じて元の配列が変更される
}

スライスを例にとると、main関数では、array自体は変えていないのに、変数に置き換えたsを編集すると、arrayまで変わっている。

読み込み、書き込み、バッファリング

チャネルとやり取りするためには、<-演算子を使う。

チャネルからの読み込みには、変数の左側に<-を置き、チャネルに書き込むにはチャネル変数の右側に<-を書き込む。

// 読み込み: 変数aにchの値を書き込む
a := <- ch

// 書き込み: チャネルに変数bの値を書き込む
ch <- b

チャネルに書き込まれた値は一度だけ読み込むことができる。
同じチャネルから複数のゴルーチンが読み込みを行っている場合、チャネルに書き込まれた値はそのうち一つのゴルーチンからのみ読み込まれる。
一つのゴルーチンが同じチャネルに対して読み込み、書き込み両方を行うのは一般的ではない。
チャネルに記憶されている値を変数あるいは構造体のフィールドに代入したり、その値を関数に渡す際には

ch <- chan int

といった具合で<-演算子をchanの前に置いてその変数などを含むゴルーチンがチャネルの読み込み専用であることを示すことができる。
また、書き込み(送信)専用も同じように以下のような記法で設定できる

ch chan <- int

それを踏まえて、チャネルの読み込み、書き込みの両方ができるゴルーチンを作ってみると、以下のようになる。

func runThingsConcurrently(chIn <- chan int, chOut chan <- string) {
    for val := range chIn {// このforはチャネルがクローズされるまで続き、chInに値が到着するたびに新たにゴルーチンを呼び出す
        go func(val int) {
            result := doBusinessLogic(val)
            resultString := fmt.Sprintf("%d -> %d", val, result)
            chOut <- resultString
        }(val)
    }
}
まきぞうまきぞう

チャネルとバッファリング

バッファリングとは、一時的な情報の格納を行うこと。
チャネルにバッファがあるということは一時的な情報が格納されているということ。

デフォルトでは、チャネルはバッファリングされない。
バッファリングされないと送信側と受信側で下記のことが起こる。
送信側: データを書き込むゴルーチン
受信側: データを読み込むゴルーチン

送信側

送信側は、受信側がチャネルからデータを受け取るまで受け取るまでブロックされる。

受信側

受信側は、送信側がチャネルへデータを書き込むまでブロックされる。

例として以下のコードがある。

package main

import "fmt"

func main() {
	ch := make(chan int)

	go func() {
		ch <- 42
		fmt.Println("Sent 42")
	}()

	value := <-ch
	fmt.Println("Received:", value)
}

このプログラムの結果は、以下のようになる。

Sent 42
Received: 42

順を追ってまとめると

  1. int型のチャネル、chが作られる
  2. ゴルーチン(以下サブゴルーチン)が起動される
  3. チャネルに42という値を書き込むが、読み込みがされていないので、サブゴルーチンの処理はブロックされる
  4. メインゴルーチンにてチャネルの値が読み込まれる
  5. チャネルの値が読み込まれたので、サブゴルーチンの処理が再開され、Sent 42が出力される
  6. 残りのメインゴルーチンのReceived: 42が出力される

という順番になる

複数のゴルーチンが並行していないとバッファリングされていないチャネルが動作しない理由

バッファリングされていないチャネルでは、送信者と受信者の両方が相互に同期する必要がある。
つまり、どちらか一方が他方の処理を待っている必要がある。
そのため、バッファリングされていないチャネルの送信や受信は少なくとも2つのゴルーチンが必要。一方が送信者、もう一方が受信者として機能するからだ。

バッファ付きチャネルではどうなる?

バッファ付きチャネルではチャネルにデータを一時保存できるので、書き込みされても読み込みを待つ必要はないし、読み込みされても書き込みを待つ必要はない。

package main

import "fmt"

func main() {
    ch := make(chan int, 1) // バッファサイズ1のバッファ付きチャネル

    ch <- 42
    fmt.Println("Sent 42")

    value := <-ch
    fmt.Println("Received:", value)
}

結果は変わらないが、以下のようになる

Sent 42
Received: 42

また、バッファにはサイズを指定できる。
バッファサイズがいっぱいになると、それ以降のチャネルへの書き込みは一旦停止され、書き込み側のゴルーチンはチャネルから読み込みされるまでブロックすることになる
同じように、バッファがからのチャネルから読み込みを行おうとするとブロックされる。

ほとんどの場合では、バッファリングされないチャネルを使うべきである

まきぞうまきぞう

チャネルのクローズ

チャネルへの書き込みが終わったら、組み込みの関数closeを使ってチャネルを閉じる。

close(ch)

チャネルがクローズされた後でチャネルの書き込みをしようとしたり、再度クローズしようとしたりするとパニックになる。
しかし、クローズされたチャネルの読み込みは常に成功する。それはクローズされたチャネルがバッファを持っており、まだ読まれていない値がある場合は、その値が順番に返される。
バッファリングされていないチャネル、またはバッファに値が残っていない場合はゼロ値(nil)が返される。
チャネルから読み込みを行う際、以下の2パターンを区別する必要がある。

  • 書き込まれたゼロ値
  • チャネルがクローズされていたために戻されたゼロ値

下記のコードで区別する

v, ok := <- ch

okがtrueであれば、チャネルがオープンな状態なので、vはチャネルchから読み込まれた値が入っている。
okがfalseならチャネルはクローズされている。

チャネルのクローズはチャネルに書き込むゴルーチンにある。
チャネルのクローズはチャネルのクローズを待っているゴルーチンがある時のみ必須である。

まきぞうまきぞう

チャネルの動作

チャネルには「状態」があり、それぞれの状態で

  • 読み込み
  • 書き込み
  • クローズ
    これらの動作をした時の挙動が異なる。
バッファ無 + 開 バッファ無 * 閉 バッファ有 + 開 バッファ有 + 閉 nil
書き込み 何かが読み込まれるまでポーズ ゼロ値を返す バッファがいっぱいならポーズ パニック 無限にハング
読み込み 何かが書き込まれるまでポーズ パニック バッファがからならポーズ バッファに残されている値を返す 無限にハング
クローズ 正常にクローズする パニック 正常にクローズする パニック パニック

基本的には書き込み側のゴルーチンが書き込むものがなくなった時にチャネルをクローズするのが標準的なパターン。

パニックになるパターンは以下の通り

  • 同じチャネルを複数回クローズ
  • 一つのゴルーチン内でチャネルをクローズした後で別のゴルーチンがそのチャネルへ書き込む
まきぞうまきぞう

select

Go言語にはselectという文がある。

並行操作の優先順を解決してくれるものだ。
selectキーワードを使うことで、複数のチャネルに対する読み込みあるいは書き込みの操作が可能になる。

select {
    case v := <- ch1:
        fmt.Println("ch1", v)
    case v := <- ch2:
        fmt.Println("ch2", v)
    case ch3 <- x:
        fmt.Println("ch3への書き込み", x)
    case <- ch4:
        fmt.Println("ch4から値をもらったが、値は無視した")
}

一つのcaseに対して読み込み操作あるいは書き込み操作が可能な場合に、その操作とcaseの本体が実行される。
複数のcaseが読み込み、もしくは書き込み可能なチャネルを持った場合、ランダムに実行される。つまりcaseの検証に実行の順序はない。
switch文では上から各caseを実行していき、最初にtrueを返したcaseのブロックを実行するが、selectは違うっぽい。

これによって、「整合性がない順番でロックを取得すること」を防いでくれるそう。

整合性がない順番でロックを取得すること
これはどういう意味か?
2つのゴルーチンが同じ2つのチャネルにアクセスすると、その場合2つのチャネルは両方のゴルーチンから同じ順番でアクセスしなければ、デッドロックになってしまう。
例えば

  • 一方がチャネルAから送信しようとし、もう一方がチャネルBから受信しようとしている
  • どちらも同じチャネルから送信しようとしている、あるいは同じチャネルから受信しようとしている
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        v := 1
        ch1 <- v
        v2 := <- ch2
        fmt.Println(v, v2)
    }()

    v := 2
    ch2 <- v
    v2 := <- ch1
    fmt.Println(v, v2)
}
$ go run main.go
fatal error: all goroutines are asleep - deadlock!

互いに相手を待つことになり、どちらも前に進めない。
Goアプリケーションのゴルーチンがデッドロック状態になった場合、Goのランタイムがプログラムを強制終了する。

この対処法は、mainゴルーチンのチャネルアクセスをselectで囲むこと。

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        v := 1
        ch1 <- v
        v2 := <- ch2
        fmt.Println(v, v2)
    }()

    v := 2
    var v2 int
    select {
        case ch2 <- v: // 2つ目のcaseが呼び出されたので、こちらは選択されない
        case v2 = <- ch1:// サブゴルーチンによってch1に値が書き込まれたら、v2に読み込む
    }
    fmt.Println(v, v2) // ここが呼び出される
}

結果は

$ go run
2 1

となる。

selectはforに埋め込まれて使われるケースがとても多い。

for {
    select {
        case <- done:
            return
        case v := <- ch:
            fmt.Println(v)
    }
}

よく使われるパターンなので、for-selectループと呼ばれる。
for-selectループを使うときは、ループを抜ける方法を含めなければならない。
実装方法は後述

まきぞうまきぞう

公開する機能のインターフェースに並行性は含めない

並行性というのは、何を実装するかでなく、どうやって実装するかという詳細情報。
実装の詳細は表に出さないようにするべき。優れたソフトウェア(またはパッケージ)は、内部でどのように動いているかを気にせず使えるものである。(疎結合)

具体的に言うと、インターフェースにチャネルを含めないということを意味する。
APIとして外部モジュールなどに公開する関数の引数としてチャネルを含めてはいけない。
チャネルを考慮すると、APIを使うために以下を気にする必要が出てくる。

  • チャネルがバッファリングされているか
  • クローズされたか
  • nilかどうか

想定されていない順序でチャネルにアクセスされ、意図せずデッドロックを招くことになってしまう。

まきぞうまきぞう

ゴルーチンにおけるforループ

並行性のあるコードを実装する上での多くのケースでは、ゴルーチンのクロージャには引数がない。
代わりに、クロージャが宣言された関数内の変数から値をとる。
そこで、forループの冒頭で宣言する値やインデックスの値を利用する場合、注意が必要。

func main() {
	a := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
	ch := make(chan int, len(a))
	for _, v := range a {
		go func() {
			ch <- v * 2
		}()
	}
	for i := 0; i < len(a); i++ {
		fmt.Println(<-ch, " ")
	}
}

以下を実行すると、なぜか重複した40という値が出力されている

40
4
8
40
40
40
40
40
40
40
4
24
40
40
40
40
40
40
40
40

これは、ゴルーチンの中でvの値がキャプチャされていないという状況が発生しているから。
以下のような事態が起こっている。

  • forループが素早く実行され、最初の方に起動したv=1の時のゴルーチンが実行し終わる前にvの値が20となる
  • ゴルーチンから見たら、vの値は20となっている。
  • ゴルーチンの最初の値が40になってしまう可能性が出てくる

ゴルーチン起動時に決めた値を渡すには、2つの方法がある。

  • ゴルーチンの引数に値を渡す
  • ループ内でシャドーイングする

ゴルーチンの引数に値を渡す

以下のようにクロージャに引数を渡すことで、ゴルーチンに渡すvの値を起動時の値として扱うことができる

func main() {
	a := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
	ch := make(chan int)
	for _, v := range a {
		v := v
		go func(input int) {
			ch <- input * 2
		}(v)
	}
	for i := 0; i < len(a); i++ {
		fmt.Println(<-ch, " ")
	}
}
4
16
8
12
24
20
32
28
36
40

もちろん、並行で実行しているので、順番はタイミングによって異なってくる

ループ内でシャドーイングする

シャドーイングとは?

シャドーイングとは、内側のスコープで定義された変数が外側のスコープで同じ名前の変数を「隠す」現象を指す。つまり、内側のスコープで定義された変数が優先され、外側のスコープの変数にアクセスできなくなる。

func main() {
	a := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
	ch := make(chan int)
	for _, v := range a {
		v := v
		go func() {
			ch <- v * 2
		}()
	}
	for i := 0; i < len(a); i++ {
		fmt.Println(<-ch, " ")
	}
}
4
12
8
16
28
40
32
20
36
24
まきぞうまきぞう

ゴルーチンの終了チェック

ゴルーチンとして起動された関数を起動する際には、確実に終了するようにしなければならない。
Goのランタイムは全く使われないゴルーチンを検知できない。
ゴルーチンが終了しない場合、スケジューラは定期的にゴルーチンに無駄な時間を割り振る。
こうなると全体の動作が遅くなってしまう。これをゴルーチンリンクという。

以下のようなコードで発生する。

func main() {
	ch := make(chan int)
	go func() {
		for {
			select {
			case v := <-ch:
				fmt.Println(v)
			}
		}
	}()
	time.Sleep(1 * time.Second)
}

このコードでは、メインゴルーチンとサブゴルーチンの2つが起動している。
サブゴルーチンでは、無限ループの中でselectを実行し、チャネルに値が書き込まれるのを待機している。
しかし、メインゴルーチンでは1秒待機するだけで終了し、サブゴルーチンはずっと書き込み待ち状態になってしまう。

これを解決するには、以下のようにチャネルを閉じることで回避する必要がある。

func main() {
	ch := make(chan int)
	done := make(chan bool)

	go func() {
		for {
			select {
			case v, ok := <-ch:
				if !ok {
					done <- true
					return
				}
				fmt.Println(v)
			}
		}
	}()

	go func() {
		time.Sleep(500 * time.Millisecond)
		close(ch)
	}()

	<-done
}
まきぞうまきぞう

doneチャネルパターン

ゴルーチンに対して、処理を終了すべきであるというシグナルを送ることができる形式があり、そのパターンをdoneチャネルパターンというものが存在する。

https://github.com/Keigo-Hirohara/Oreilly-Golang/blob/e431210e2b7298a85f8a5ca7e4aff578bb58d373/DoneChannelPattern/main.go

ここではdoneという名前のチャネルが宣言されていて、空のstructとして定義されている。
このチャネルは値を値を書き込まれることはなく、ただクローズするだけ。
渡された各searcherに対してごルーチンを起動し、ゴルーチン内のselect文は次のいづれかを待っている。

  1. searcher関数が値を返した時、resultチャネルへの書き込みをする
  2. doneチャネルからの値の読み込み

オープンされた(バッファリングされていない)チャネルからの読み込みは、値が書き込まれるまでブロックされる。
doneから読み込まれるケースはdoneがクローズされるまでポーズされていて、resultに(空配列であったとしても)最初の値が返された時にdoneチャネルがクローズされ、これがゴルーチンに対する終了のシグナルとなり、ゴルーチンリークを防止してくれる。

ゴルーチンリークとは?

ゴルーチンが終了せず、ずっと残ってしまうこと

どうしてdoneチャネルがクローズすると、全てのゴルーチンが終了するの?

select文は定義したcaseのうちどれか一つの条件が当てはまれば、それ以外のcaseは選択されなくなるよね?
doneチャネルが終了するとクローズしたチャネルが読み込めるようになり、以降のselect文ではresultではなくdoneチャネルが選択されるようになる。なので一つ目のcaseである関数の実行が選択されないので後ルーチンがすぐに終了するよという流れだ!

まきぞうまきぞう

キャンセレーション関数を用いたゴルーチンの終了

ゴルーチンにシグナルを送信して終了(キャンセル)させるという手法はdoneチャネルパターンによって実現できたが、実はゴルーチンを終了させる方法はもう一つある。

https://github.com/Keigo-Hirohara/Oreilly-Golang/blob/23ace5fc1cb93544cb3c5eafbf3dc3a2701c7adb/cancelation.go

countTo関数ではdoneチャネルパターンと同じようにdoneチャネルを生成し、クローズされているかをselectによって判定し、クローズされていればゴルーチンを終了するというロジックを組んでいる。

ここで、doneチャネルをクローズするクロージャをmain関数に渡し、main関数で実行することでゴルーチンを終了させている。

ここでいうcancelFuncをキャンセレーション関数という。

キャンセレーション関数を用意することで、チャネルのクローズだけでなくデータベースへの後処理を記述したりすることができるなどのメリットがある。

このスクラップは2ヶ月前にクローズされました