Closed11

Goに入門する - 並行処理 -

ShionShion

Goroutine

Goroutine(ゴルーチン)とは、Goで並行処理を行うための軽量スレッドのようなもの。
goroutineを使って複数の処理を同時に実行することができる。

Goroutineの特徴

軽量

  • Goroutineは非常に軽量で、大量のGoroutineを生成してもシステムのリソースを大量に消費しない
  • OSのスレッドよりも効率的にリソースを使用する

独立した処理

  • Goroutineは関数やメソッドを非同期に実行し、他の処理と並行して実行される
  • 異なるGoroutineはそれぞれ独立して動作しますが、同期や通信はchannelという仕組みで行える

Goランタイムによる管理

  • GoroutineはGoのランタイムによって管理されており、OSのスレッドに対して自動的にスケジュールされる
  • これにより、開発者がスレッドを直接管理する必要はない

並行処理と並列処理

並行処理(Concurrency)

  • 複数のタスクが「同時に進行している」状態を指す
  • 1つのCPUでも、タスクを切り替えて実行することで、あたかも複数の作業が同時に進んでいるように見える
  • 例えば、AタスクとBタスクが交互に進行することで、どちらも時間をシェアしながら実行される

並列処理(Parallelism)

  • 複数のタスクが「同時に実行される」状態を指す
  • 複数のCPUやコアを持つシステムで、異なるタスクが完全に同時に実行される
  • 例えば、Aタスクは1つのCPUで、Bタスクは別のCPUで同時に実行される
ShionShion

Goroutine使い方

手始めにGoroutineを起動してみる。

func main() {
	// goをつけることでgoroutineを起動できる
	go func() {
		fmt.Println("goroutine invoked")
	}()
	// NumGoroutineで起動中のgoroutineの数を取得
	fmt.Printf("num of working goroutines: %d\n", runtime.NumGoroutine()) // 2
	fmt.Println("main func finish")
}

実行結果は次のようになる。
Goroutineの内容が出力されていないのは、Goroutineには起動に少々時間がかかり起動している間に、mainで実行が終わってしまったために出力されていない。
また、Goroutineの数が2になっているのは、エントリであるmain()も1つのGoroutineであるためである。

$ go run main.go
num of working goroutines: 2
main func finish

main()から分岐したGoroutineの処理を待って(=JOIN)、mainの処理を完了とする必要があるため、以下のコードに書き換える。

func main() {
	var wg sync.WaitGroup

	wg.Add(1) // カウンタを+1する
	go func() {
		wg.Done() // カウンタを-1する
		fmt.Println("goroutine invoked")
	}()

	wg.Wait() // カウントの値が0になるまでGoroutineの完了を待つ
	fmt.Printf("num of working goroutines: %d\n", runtime.NumGoroutine())
	fmt.Println("main func finish")
}

実行結果

$ go run main.go
goroutine invoked
num of working goroutines: 1
main func finish
ShionShion

channel

概要

channelによって、複数のGoroutine間でデータの送受信が可能になる。
channelに矢印が入っていく方が「書き込み」、channelから矢印が出ていく方が「読み込み」を表現する。

func main() {
	// channel作成
	ch := make(chan int)

	var wg sync.WaitGroup
	wg.Add(1)

	// channelに書き込みするGoroutine
	go func() {
		defer wg.Done()
		ch <- 10
		time.Sleep(500 * time.Millisecond)
	}()

	// channel読み込み
	fmt.Println(<-ch)

	// 立ち上げたGoroutineが完了するまで処理を待つ
	wg.Wait()
}

実行結果。

$ go run main.go         
10

Goroutine Leak

下記の例は、読み込みのみが記載されている。そのため書き出し操作をずっと待ち続けた状態になる。待ち状態なのでメモリが解放されず、いわゆるGoroutineリーク状態となる。

ch1 := make(chan int)
// 読み出し状態のGoroutine(書き出し操作をずっと待っている状態・・・)
go func() {
    fmt.Println(<-ch1)
}()
fmt.Printf("num of working goroutines: %d\n", runtime.NumGoroutine()) // 2

ちなみにGoroutineリークしているかテストで確かめることができる。
下記は外部パッケージを使用して確かめている例。

main_test.go
package main

import (
	"testing"

	"go.uber.org/goleak"
)

func TestLeak(t *testing.T) {
	defer goleak.VerifyNone(t)
	main()
}

テスト実行すると、GotoutineLeakが検知できていることがわかる。

$ go test -v .
=== RUN   TestLeak
num of working goroutines: 3
    main_test.go:12: found unexpected goroutines:
        [Goroutine 19 in state chan receive, with go-basics.main.func1 on top of the stack:
        go-basics.main.func1()
                /Users/kamiyashion/dev/go-basics/main.go:31 +0x28
        created by go-basics.main in goroutine 18
                /Users/kamiyashion/dev/go-basics/main.go:30 +0x6c
        ]
--- FAIL: TestLeak (0.44s)
FAIL
FAIL    go-basics       0.681s
FAIL

バッファ

基本的にチェネルの受信を開始していないと、チャネルへ送信ができない
下記の例だと、受信を開始する前に送信しているためエラーが発生する。

ch2 := make(chan int)
ch2 <- 2
fmt.Println(<-ch2) // // fatal error: all goroutines are asleep - deadlock!

しかし、バッファを設定することで受信を開始する前に送信が可能

// バッファ1のチェネルを作成
ch2 := make(chan int, 1)
ch2 <- 2
fmt.Println(<-ch2) // 2
ShionShion

closed channel

channelを閉じる。

func main() {
	ch1 := make(chan int)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println(<-ch1) // 10
	}()
	ch1 <- 10
	close(ch1)

	// 二つ目の返り値にchannelが開いているか閉じているかフラグが返される
	v, ok := <-ch1
	fmt.Printf("%v %v\n", v, ok) // 0 false
	wg.Wait()
}

バッファのあるchannelを閉じる。
バッファが尽きてようやく閉じられるため、バッファが尽きる前に閉じてもフラグがfalseにならないことを示している。

// バッファ2のchannel
ch2 := make(chan int, 2)
ch2 <- 1
ch2 <- 2
close(ch2)
v, ok := <-ch2
fmt.Printf("%v %v\n", v, ok) // 1 true
v, ok = <-ch2
fmt.Printf("%v %v\n", v, ok) // 2 true
v, ok = <-ch2
fmt.Printf("%v %v\n", v, ok) // 0 false
ShionShion

capsel

channelを関数の中に閉じ込めカプセル化する。

func main() {
	ch3 := generateCountStream()
	for v := range ch3 {
		fmt.Println(v)
	}
}

// 読み出し(readonly)専用返り値を返す関数
func generateCountStream() <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i <= 5; i++ {
			ch <- i
		}
	}()
	return ch
}

実行結果。

0
1
2
3
4
5
ShionShion

select

select を使うことで複数のチャネルで受信することができる。

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		time.Sleep(500 * time.Millisecond)
		ch1 <- "A"
	}()
	go func() {
		defer wg.Done()
		time.Sleep(800 * time.Millisecond)
		ch2 <- "B"
	}()
	// ch1またはch2がnilになるまでループ
	for ch1 != nil || ch2 != nil {
		select {
		// ch1に書き込みがあった場合下記が実行される
		case v := <-ch1:
			fmt.Println(v)
			ch1 = nil
		// ch2に書き込みがあった場合下記が実行される
		case v := <-ch2:
			fmt.Println(v)
			ch2 = nil
		}
	}
	wg.Wait()
	fmt.Println("finish")
}

実行結果

A
B
finish
ShionShion

select: default case

const bufSize = 3

func main() {
	var wg sync.WaitGroup
	ch := make(chan string, bufSize)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < bufSize; i++ {
			time.Sleep(1000 * time.Millisecond)
			ch <- "hello"
		}
	}()
	for i := 0; i < 3; i++ {
		select {
		case m := <-ch:
			fmt.Println(m)
		default:
			fmt.Println("no msg arrived")
		}
		time.Sleep(1500 * time.Millisecond)
	}
}

実行結果

$ go run main.go        
no msg arrived
hello
hello
ShionShion

mutex

以下の文を実行してみる。

func main() {
	var wg sync.WaitGroup
	var i int
	wg.Add(2)
	go func() {
		defer wg.Done()
		i++
	}()
	go func() {
		defer wg.Done()
		i++
	}()
	wg.Wait()
	fmt.Println(i)
}

実行結果

$ go run main.go                      
2

ただしインクリメントするタイミングがそろえば1が出力されるケースもある。
データ競合(data race)が発生しているかは-raceをつければOK。

$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c00011a048 by goroutine 7:
  main.main.func2()
      /Users/kamiyashion/dev/go-basics/main.go:18 +0x74

Previous write at 0x00c00011a048 by goroutine 6:
  main.main.func1()
      /Users/kamiyashion/dev/go-basics/main.go:14 +0x84

Goroutine 7 (running) created at:
  main.main()
      /Users/kamiyashion/dev/go-basics/main.go:16 +0x1a8

Goroutine 6 (finished) created at:
  main.main()
      /Users/kamiyashion/dev/go-basics/main.go:12 +0x108
==================
2
Found 1 data race(s)
exit status 66

1件の data race が検知されているのがわかる。
このデータ競合は mutex を使って回避することができる。

以下に書き換える。

func main() {
	var wg sync.WaitGroup
	var mu sync.Mutex
	var i int
	wg.Add(2)
	go func() {
		defer wg.Done()
		// iをロックして他から参照できないようにする
		mu.Lock()
		// 処理が終わったらロックを解除して資源を解放する
		defer mu.Unlock()
		i++
	}()
	go func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		i++
	}()
	wg.Wait()
	fmt.Println(i)
}

そして再度実行すると data race を解消できていることがわかる。

$ go run -race main.go
2
ShionShion

rwMutex

以下を実行する。

func main() {
	var wg sync.WaitGroup
	var rwMu sync.RWMutex
	var c int

	wg.Add(4)
	go write(&rwMu, &wg, &c)
	go read(&rwMu, &wg, &c)
	go read(&rwMu, &wg, &c)
	go read(&rwMu, &wg, &c)

	wg.Wait()
	fmt.Println("finish")
}

func read(mu *sync.RWMutex, wg *sync.WaitGroup, c *int) {
	defer wg.Done()
	time.Sleep(1 * time.Millisecond)
	mu.RLock()
	defer mu.RUnlock()
	fmt.Println("read lock")
	fmt.Println(*c)
	time.Sleep(1 * time.Second)
	fmt.Println("read unlock")
}
func write(mu *sync.RWMutex, wg *sync.WaitGroup, c *int) {
	defer wg.Done()
	mu.Lock()
	defer mu.Unlock()
	fmt.Println("write lock")
	*c += 1
	time.Sleep(1 * time.Second)
	fmt.Println("write unlock")
}

実行結果。

$ go run main.go      
write lock
write unlock
read lock
1
read lock
1
read lock
1
read unlock
read unlock
read unlock
finish

実行結果を見てわかるように、通常の mutext だと読み込みが終了するまで待つ必要があったが、rwMutex を使用すると複数の読み込みを同時に実行することができる

ShionShion

context

context を使うことでメインのGoroutineからサブGoroutineを一斉キャンセルすることができる。

withTimeout

下記のコードを実行する。

func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
	defer cancel()

	wg.Add(3)
	go subTask(ctx, &wg, "a")
	go subTask(ctx, &wg, "b")
	go subTask(ctx, &wg, "c")
	wg.Wait()
}
func subTask(ctx context.Context, wg *sync.WaitGroup, id string) {
	defer wg.Done()
	t := time.NewTicker(500 * time.Millisecond)
	select {
	case <-ctx.Done():
		fmt.Println(ctx.Err())
		return
	case <-t.C:
		t.Stop()
		fmt.Println(id)
	}
}

実行結果

$ go run main.go
b
c
a

タイムアウトの秒数を400にして再度実行する。

ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond)

実行結果。

$ go run main.go
context deadline exceeded
context deadline exceeded
context deadline exceeded

このように、main の Goroutine から一斉にキャンセルすることができる。

ShionShion

errGroup

errorGroupは複数のGoroutineで発生するエラーを管理する機能。

下記のコードを実行してみる。

func main() {
	eg := new(errgroup.Group)
	s := []string{"task1", "fake1", "fake2", "task2"}
	for _, v := range s {
		task := v
		eg.Go(func() error {
			return doTask(task)
		})
	}
	// errGroupのGoroutineを全て待機
	if err := eg.Wait(); err != nil {
		fmt.Printf("error %v\n", err)
	}
	fmt.Println("finish")
}

func doTask(task string) error {
	if task == "fake1" || task == "fake2" {
		return fmt.Errorf("%v failed", task)
	}
	fmt.Printf("task %v completed\n", task)
	return nil
}

実行結果。タイミング的にfake2が先に実行されてエラーになった。

$ go run main.go
task task2 completed
task task1 completed
error fake2 failed
finish
このスクラップは1ヶ月前にクローズされました