Goに入門する - 並行処理 -
Goroutine
Goroutine(ゴルーチン)とは、Goで並行処理を行うための軽量スレッドのようなもの。
goroutine
を使って複数の処理を同時に実行することができる。
Goroutineの特徴
軽量
- Goroutineは非常に軽量で、大量のGoroutineを生成してもシステムのリソースを大量に消費しない
- OSのスレッドよりも効率的にリソースを使用する
独立した処理
- Goroutineは関数やメソッドを非同期に実行し、他の処理と並行して実行される
- 異なるGoroutineはそれぞれ独立して動作しますが、同期や通信はchannelという仕組みで行える
Goランタイムによる管理
- GoroutineはGoのランタイムによって管理されており、OSのスレッドに対して自動的にスケジュールされる
- これにより、開発者がスレッドを直接管理する必要はない
並行処理と並列処理
並行処理(Concurrency)
- 複数のタスクが「同時に進行している」状態を指す
- 1つのCPUでも、タスクを切り替えて実行することで、あたかも複数の作業が同時に進んでいるように見える
- 例えば、AタスクとBタスクが交互に進行することで、どちらも時間をシェアしながら実行される
並列処理(Parallelism)
- 複数のタスクが「同時に実行される」状態を指す
- 複数のCPUやコアを持つシステムで、異なるタスクが完全に同時に実行される
- 例えば、Aタスクは1つのCPUで、Bタスクは別のCPUで同時に実行される
Goroutine使い方
手始めにGoroutineを起動してみる。
func main() {
// goをつけることでgoroutineを起動できる
go func() {
fmt.Println("goroutine invoked")
}()
// NumGoroutineで起動中のgoroutineの数を取得
fmt.Printf("num of working goroutines: %d\n", runtime.NumGoroutine()) // 2
fmt.Println("main func finish")
}
実行結果は次のようになる。
Goroutineの内容が出力されていないのは、Goroutineには起動に少々時間がかかり起動している間に、mainで実行が終わってしまったために出力されていない。
また、Goroutineの数が2になっているのは、エントリであるmain()も1つのGoroutineであるためである。
$ go run main.go
num of working goroutines: 2
main func finish
main()から分岐したGoroutineの処理を待って(=JOIN)、mainの処理を完了とする必要があるため、以下のコードに書き換える。
func main() {
var wg sync.WaitGroup
wg.Add(1) // カウンタを+1する
go func() {
wg.Done() // カウンタを-1する
fmt.Println("goroutine invoked")
}()
wg.Wait() // カウントの値が0になるまでGoroutineの完了を待つ
fmt.Printf("num of working goroutines: %d\n", runtime.NumGoroutine())
fmt.Println("main func finish")
}
実行結果
$ go run main.go
goroutine invoked
num of working goroutines: 1
main func finish
channel
概要
channelによって、複数のGoroutine間でデータの送受信が可能になる。
channelに矢印が入っていく方が「書き込み」、channelから矢印が出ていく方が「読み込み」を表現する。
func main() {
// channel作成
ch := make(chan int)
var wg sync.WaitGroup
wg.Add(1)
// channelに書き込みするGoroutine
go func() {
defer wg.Done()
ch <- 10
time.Sleep(500 * time.Millisecond)
}()
// channel読み込み
fmt.Println(<-ch)
// 立ち上げたGoroutineが完了するまで処理を待つ
wg.Wait()
}
実行結果。
$ go run main.go
10
Goroutine Leak
下記の例は、読み込みのみが記載されている。そのため書き出し操作をずっと待ち続けた状態になる。待ち状態なのでメモリが解放されず、いわゆるGoroutineリーク状態となる。
ch1 := make(chan int)
// 読み出し状態のGoroutine(書き出し操作をずっと待っている状態・・・)
go func() {
fmt.Println(<-ch1)
}()
fmt.Printf("num of working goroutines: %d\n", runtime.NumGoroutine()) // 2
ちなみにGoroutineリークしているかテストで確かめることができる。
下記は外部パッケージを使用して確かめている例。
package main
import (
"testing"
"go.uber.org/goleak"
)
func TestLeak(t *testing.T) {
defer goleak.VerifyNone(t)
main()
}
テスト実行すると、GotoutineLeakが検知できていることがわかる。
$ go test -v .
=== RUN TestLeak
num of working goroutines: 3
main_test.go:12: found unexpected goroutines:
[Goroutine 19 in state chan receive, with go-basics.main.func1 on top of the stack:
go-basics.main.func1()
/Users/kamiyashion/dev/go-basics/main.go:31 +0x28
created by go-basics.main in goroutine 18
/Users/kamiyashion/dev/go-basics/main.go:30 +0x6c
]
--- FAIL: TestLeak (0.44s)
FAIL
FAIL go-basics 0.681s
FAIL
バッファ
基本的にチェネルの受信を開始していないと、チャネルへ送信ができない。
下記の例だと、受信を開始する前に送信しているためエラーが発生する。
ch2 := make(chan int)
ch2 <- 2
fmt.Println(<-ch2) // // fatal error: all goroutines are asleep - deadlock!
しかし、バッファを設定することで受信を開始する前に送信が可能。
// バッファ1のチェネルを作成
ch2 := make(chan int, 1)
ch2 <- 2
fmt.Println(<-ch2) // 2
closed channel
channelを閉じる。
func main() {
ch1 := make(chan int)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(<-ch1) // 10
}()
ch1 <- 10
close(ch1)
// 二つ目の返り値にchannelが開いているか閉じているかフラグが返される
v, ok := <-ch1
fmt.Printf("%v %v\n", v, ok) // 0 false
wg.Wait()
}
バッファのあるchannelを閉じる。
バッファが尽きてようやく閉じられるため、バッファが尽きる前に閉じてもフラグがfalse
にならないことを示している。
// バッファ2のchannel
ch2 := make(chan int, 2)
ch2 <- 1
ch2 <- 2
close(ch2)
v, ok := <-ch2
fmt.Printf("%v %v\n", v, ok) // 1 true
v, ok = <-ch2
fmt.Printf("%v %v\n", v, ok) // 2 true
v, ok = <-ch2
fmt.Printf("%v %v\n", v, ok) // 0 false
capsel
channelを関数の中に閉じ込めカプセル化する。
func main() {
ch3 := generateCountStream()
for v := range ch3 {
fmt.Println(v)
}
}
// 読み出し(readonly)専用返り値を返す関数
func generateCountStream() <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i <= 5; i++ {
ch <- i
}
}()
return ch
}
実行結果。
0
1
2
3
4
5
select
select を使うことで複数のチャネルで受信することができる。
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
time.Sleep(500 * time.Millisecond)
ch1 <- "A"
}()
go func() {
defer wg.Done()
time.Sleep(800 * time.Millisecond)
ch2 <- "B"
}()
// ch1またはch2がnilになるまでループ
for ch1 != nil || ch2 != nil {
select {
// ch1に書き込みがあった場合下記が実行される
case v := <-ch1:
fmt.Println(v)
ch1 = nil
// ch2に書き込みがあった場合下記が実行される
case v := <-ch2:
fmt.Println(v)
ch2 = nil
}
}
wg.Wait()
fmt.Println("finish")
}
実行結果
A
B
finish
select: default case
const bufSize = 3
func main() {
var wg sync.WaitGroup
ch := make(chan string, bufSize)
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < bufSize; i++ {
time.Sleep(1000 * time.Millisecond)
ch <- "hello"
}
}()
for i := 0; i < 3; i++ {
select {
case m := <-ch:
fmt.Println(m)
default:
fmt.Println("no msg arrived")
}
time.Sleep(1500 * time.Millisecond)
}
}
実行結果
$ go run main.go
no msg arrived
hello
hello
mutex
以下の文を実行してみる。
func main() {
var wg sync.WaitGroup
var i int
wg.Add(2)
go func() {
defer wg.Done()
i++
}()
go func() {
defer wg.Done()
i++
}()
wg.Wait()
fmt.Println(i)
}
実行結果
$ go run main.go
2
ただしインクリメントするタイミングがそろえば1
が出力されるケースもある。
データ競合(data race)が発生しているかは-race
をつければOK。
$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c00011a048 by goroutine 7:
main.main.func2()
/Users/kamiyashion/dev/go-basics/main.go:18 +0x74
Previous write at 0x00c00011a048 by goroutine 6:
main.main.func1()
/Users/kamiyashion/dev/go-basics/main.go:14 +0x84
Goroutine 7 (running) created at:
main.main()
/Users/kamiyashion/dev/go-basics/main.go:16 +0x1a8
Goroutine 6 (finished) created at:
main.main()
/Users/kamiyashion/dev/go-basics/main.go:12 +0x108
==================
2
Found 1 data race(s)
exit status 66
1件の data race が検知されているのがわかる。
このデータ競合は mutex を使って回避することができる。
以下に書き換える。
func main() {
var wg sync.WaitGroup
var mu sync.Mutex
var i int
wg.Add(2)
go func() {
defer wg.Done()
// iをロックして他から参照できないようにする
mu.Lock()
// 処理が終わったらロックを解除して資源を解放する
defer mu.Unlock()
i++
}()
go func() {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
i++
}()
wg.Wait()
fmt.Println(i)
}
そして再度実行すると data race を解消できていることがわかる。
$ go run -race main.go
2
rwMutex
以下を実行する。
func main() {
var wg sync.WaitGroup
var rwMu sync.RWMutex
var c int
wg.Add(4)
go write(&rwMu, &wg, &c)
go read(&rwMu, &wg, &c)
go read(&rwMu, &wg, &c)
go read(&rwMu, &wg, &c)
wg.Wait()
fmt.Println("finish")
}
func read(mu *sync.RWMutex, wg *sync.WaitGroup, c *int) {
defer wg.Done()
time.Sleep(1 * time.Millisecond)
mu.RLock()
defer mu.RUnlock()
fmt.Println("read lock")
fmt.Println(*c)
time.Sleep(1 * time.Second)
fmt.Println("read unlock")
}
func write(mu *sync.RWMutex, wg *sync.WaitGroup, c *int) {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
fmt.Println("write lock")
*c += 1
time.Sleep(1 * time.Second)
fmt.Println("write unlock")
}
実行結果。
$ go run main.go
write lock
write unlock
read lock
1
read lock
1
read lock
1
read unlock
read unlock
read unlock
finish
実行結果を見てわかるように、通常の mutext だと読み込みが終了するまで待つ必要があったが、rwMutex を使用すると複数の読み込みを同時に実行することができる。
context
context を使うことでメインのGoroutineからサブGoroutineを一斉キャンセルすることができる。
withTimeout
下記のコードを実行する。
func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
defer cancel()
wg.Add(3)
go subTask(ctx, &wg, "a")
go subTask(ctx, &wg, "b")
go subTask(ctx, &wg, "c")
wg.Wait()
}
func subTask(ctx context.Context, wg *sync.WaitGroup, id string) {
defer wg.Done()
t := time.NewTicker(500 * time.Millisecond)
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
case <-t.C:
t.Stop()
fmt.Println(id)
}
}
実行結果
$ go run main.go
b
c
a
タイムアウトの秒数を400
にして再度実行する。
ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
実行結果。
$ go run main.go
context deadline exceeded
context deadline exceeded
context deadline exceeded
このように、main の Goroutine から一斉にキャンセルすることができる。
errGroup
errorGroupは複数のGoroutineで発生するエラーを管理する機能。
下記のコードを実行してみる。
func main() {
eg := new(errgroup.Group)
s := []string{"task1", "fake1", "fake2", "task2"}
for _, v := range s {
task := v
eg.Go(func() error {
return doTask(task)
})
}
// errGroupのGoroutineを全て待機
if err := eg.Wait(); err != nil {
fmt.Printf("error %v\n", err)
}
fmt.Println("finish")
}
func doTask(task string) error {
if task == "fake1" || task == "fake2" {
return fmt.Errorf("%v failed", task)
}
fmt.Printf("task %v completed\n", task)
return nil
}
実行結果。タイミング的にfake2
が先に実行されてエラーになった。
$ go run main.go
task task2 completed
task task1 completed
error fake2 failed
finish