Open4

並行処理

engineer rebornengineer reborn

目次

  • サクッとまとめてみたの記事
  • zen並行処理参考
  • errgroup
    • 最初のエラーを返す
    • 他のgoroutineはcontext cancelしない限り動く
engineer rebornengineer reborn

サクッとまとめてみた

  • 並行と並列の違い

    • 並行は複数のタスクを1箇所で同時に扱うこと
      • メインゴルーチン、サブゴルーチン
    • 並列処理は複数のタスクを複数箇所で同時に実行すること
  • 注意点

    • コードの実行順番が予測できない
      • 実行順序に依存する設計はだめ
    • 競合を避ける
      • データの読み書きで競合する可能性があるので排他制御をする必要がある
    • ゴールーチンリーク
      • ゴルーチンが不要になったにもかかわらず、終了しないで実行し続ける状態
        • メモリとcpuを消費してパフォーマンス低下、クラッシュもありえる
    • 早くなるとか限らない
      • ゴールーチンの作成やスケジューリング、同期メカニズムの使用など追加的な処理が発生します。これらの追加的な処理の影響が大きい場合、並行処理による性能向上が相殺される可能性があります

参考

engineer rebornengineer reborn

https://zenn.dev/hsaki/books/golang-concurrency/viewer/term

並行と並列処理の違い

  • 並行は複数の処理を同時にやっているように見える!

  • 並列は時間軸は同じで3つの動作を同じにする
    • コアが3つ必要

並行と並列の違和感に

ランタイムスケジューラ

どのゴルーチンを今 CPU に実行させるかを決めているもの
ゴルーチン(タスク)をCPU(作業台)にスケジューラー(管理者)

  • 高速切り替え(コンテキストスイッチ)
例えばゴルーチンAを少し動かして

途中でゴルーチンBに切り替えて

さらにCに切り替えて

またAに戻る
  • シンプルな並行
for i := 0; i < 3; i++ {
    /*
        go func() {
            fmt.Println(i)
        }()
    */
    go func(i int) {
        fmt.Println(i)
    }(i)
}
  • 複数
    ループ1は 1,2,3回が少しずつずれて実行されて、その後にループ2も 1,2,3回が少しずつずれて実行されるイメージ
for i := 0; i < 3; i++ {
    /*
        go func() {
            fmt.Println(i)
        }()
    */
    go func(i int) {
        fmt.Println(i)
    }(i)
}

for i := 0; i < 3; i++ {
    /*
        go func() {
            fmt.Println(i)
        }()
    */
    go func(i int) {
        fmt.Println(i)
    }(i)
}

基本

チャネルの大事な理解⭐️⭐️⭐️

  • チャネル
    • 送受信できる条件
      • バッファ無し
        • 受信側が待機してる時に送信できる
      • バッファあり
        • バッファに空きがあるとき、送信できる
  • range チャネル
    • closeされるまで受信する
package main

import (
	"fmt"
)

func main() {
	ch := make(chan int)

	// 値を送信する goroutine
	go func() {
		for i := 1; i <= 5; i++ {
			ch <- i
		}
		close(ch) // チャネルを閉じると range が終了する
	}()

	// チャネルを range で受信(close されるまで繰り返す)
	for v := range ch {
		fmt.Println("受信:", v)
	}
}

チャネルバッファ有無の理解は大事かも

  • バッファなしチャネルが同期の役割を果たす
  • バッファありチャネルがセマフォのような役割を果たす
    • リソース数に制限を設けて、それ以上処理が進まないように制御する仕組み
    • 同時に実行できる制限を設ける感じらしい
  • ブロックの有無につながるので

よくあるバグ

  • 値を正しく参照できない
    • iの渡し方
// bad
for i := 0; i < 3; i++ {
    go func() {
        fmt.Println(i)
    }()
}
/*
(実行結果)
2
2
2
*/

// good

for i := 0; i < 3; i++ {
    /*
        go func() {
            fmt.Println(i)
        }()
    */
    go func(i int) {
        fmt.Println(i)
    }(i)
}
/*
(実行結果)
0
2
1
(0,1,2が順不同で出力)
*/
  • 実行されずに終わった
    • 同期してないので、メインルーチンが先に終わる
// bad
func getLuckyNum() {
	// (前略)
	num := rand.Intn(10)
	fmt.Printf("Today's your lucky number is %d!\n", num)
}

func main() {
	fmt.Println("what is today's lucky number?")
	go getLuckyNum()
}

// good 1

func main() {
	fmt.Println("what is today's lucky number?")

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		getLuckyNum()
	}()

	wg.Wait()
}

// good 2

func getLuckyNum(c chan<- int) {
	// (前略)
	num := rand.Intn(10)
	c <- num
}

func main() {
	fmt.Println("what is today's lucky number?")

	c := make(chan int)
	go getLuckyNum(c)

	num := <-c
}
  • データの競合
    • 追加作業などをやるときに下記のステップが行われるが、同時にやると空きの状態やメモリの確保が正しく行えないので、結果がおかしくなる
      • 空きの確認
      • メモリの確認
      • 要素追加
// bad

unc main() {
	src := []int{1, 2, 3, 4, 5}
	dst := []int{}

	// srcの要素毎にある何か処理をして、結果をdstにいれる
	for _, s := range src {
		go func(s int) {
			// 何か(重い)処理をする
			result := s * 2

			// 結果をdstにいれる
			dst = append(dst, result)
		}(s)
	}

	time.Sleep(time.Second)
	fmt.Println(dst)
}
// 期待
[2 4 6 8 10](順不同)

// 結果
$ go run main.go
[2 6 10]
$ go run main.go
[6 4 8 10]
$ go run main.go
[2 10]

// good 1
// チャンネルで、送信と受信の処理を書く
// 送信は並行
// 受信は順列だけど意味があるか?
// 1つの処理を重い処理と軽い処理に分けて、重い処理を並行にしてる
// ⭐️バッファ無しは、受信が来ないと送信できない。バッファありはこのブロックはない

func main() {
	src := []int{1, 2, 3, 4, 5}
	dst := []int{}

	c := make(chan int) //バッファなしだと、受信を待って送信できないので、バッファありがよき⭐️

	for _, s := range src {
		go func(s int, c chan int) {
			result := s * 2
			c <- result
		}(s, c)
	}

	for _ = range src {
		num := <-c
		dst = append(dst, num)
	}

	fmt.Println(dst)
	close(c)
}

// good 2
// Mutex (WaitGroupではない)
// 書き込む時は、1つのgoroutineだけを扱う

func main() {
	src := []int{1, 2, 3, 4, 5}
	dst := []int{}

	var mu sync.Mutex

	for _, s := range src {
		go func(s int) {
			result := s * 2
			mu.Lock()
			dst = append(dst, result)
			mu.Unlock()
		}(s)
	}

	time.Sleep(time.Second)
	fmt.Println(dst)
}

ランタイム

  • ランタイムとは
    • 実行時に必要になるあれこれの部品・環境(runtimeパッケージ)
      • カーネルから割り当てられたメモリを分割し、必要なところに割り当てる
      • ガベージコレクタを動かす
      • ゴールーチンのスケジューリングを行う
engineer rebornengineer reborn

errGroup

sync.WaitGroupと対比

  • エラー処理
    • go routineの待機だけでエラーや戻り値を処理しない
    • errgroupは、エラーが起きた場合にゴルーチンをキャンセルして、
  • コード量
    • 少なくかける
  • 並行数の制御
    • SetLimit(3)メソッド

挙動

  • エラーは、最初に発生したエラーを返す
  • 他のgoroutineは止まらない
if err := g.Wait(); err != nil {
        fmt.Printf("Encountered an error: %v\n", err)
    }

よくある実装

  • ユーザー情報を並行で取得
    • 1つでもエラーがあれば、internalserver エラーとして返す

g, _ := errgroup.WithContext(ctx)

var user *User
var skill *Skill
var account *Account

g.Go(func() error {
	u, err := fetchUser(userID)
	if err != nil {
		return err
	}
	user = u
	return nil
})

g.Go(func() error {
	s, err := fetchSkill(userID)
	if err != nil {
		return err
	}
	skill = s
	return nil
})

g.Go(func() error {
	a, err := fetchAccount(userID)
	if err != nil {
		return err
	}
	account = a
	return nil
})

if err := g.Wait(); err != nil {
	return err // ← ここが最初のエラーしか返さない
}

参考記事

https://qiita.com/Leapcell/items/eb26c4c7dd340d65855e