😪

Go言語での Ordered な 並列処理について

2021/06/03に公開1

TL;DR

書けはしたけど、シーケンシャルに処理するより遅いので多分何か間違っている。そのため参考にしないこと。何が間違っているかわかる方はコメント欄にどうぞ

Go での並列処理について

Go言語では、 goroutine を用いることにより並列処理を手軽に書くことができます。

例えば以下のような処理

package main

import (
	"fmt"
	"time"
)

func say(s string) {
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println(s)
	}
}

func main() {
	go say("world")
	say("hello")
}

https://tour.golang.org/concurrency/1

簡単ですね。Javaとかでやると Thread がどうたらとか、色々面倒な印象です。(最近はマシですが)

並列処理の問題点について

今どき並列処理なんてのはそこまで珍しくないですが、知らない人からするとハマりがちな挙動って多いですよね。例えば以下のようなコード

package main

import (
	"fmt"
)

func say(i int) {
	fmt.Println(i)
}

func main() {
	for i := 0; i < 5; i++ {
		go say(i)
	}
}

環境にもよるのですが、実行すると何も表示されずに終わります。これは、mainスレッドがgoroutineより早く終了するためです。

雑に対処するなら、以下のように sleep を挟んでやります。

package main

import (
	"fmt"
	"time"
)

func say(i int) {
	fmt.Println(i)
}

func main() {
	for i := 0; i < 5; i++ {
		go say(i)
	}
	time.Sleep(50)
}

こうすると、環境にもよりますが何かが出力されると思います。筆者の環境では以下のように出力されました。

4
0
1
2
3

期待する順番とは異なってしまいますね。並列処理では、基本的に処理の順番は不定です。これもハマリポイントの一つです。

出力の順番を入力の順番と同じにするには

ここでは、あるテキストファイルを一行ずつ読み、何か処理をした上で、入力を受けた順番に処理を返すことを考えます。
処理は何でもいいのですが、例えば sha256 でハッシュ化する処理を考えます。
以下では、 input_text_path は入力となるテキストファイルのあるパス、出力は []string とします。

非パラレル版

凄くシンプル。並列処理で以下の実装と同じ出力を得ることを目指します。

func serial(input_text_path string) []string {
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	result := make([]string, 0)
	s := bufio.NewScanner(f)
	for s.Scan() {
		line := s.Bytes()
		encoded := sha256.Sum256(line)
		result = append(result, hex.EncodeToString(encoded[:]))
	}
	return result
}

駄目実装①

以下は駄目な実装例です。さあ何が駄目か考えてみましょう。

func parallel_broken(input_text_path string) []string {
	channel := make(chan string, 100_000)
	result := make([]string, 0)
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	s := bufio.NewScanner(f)
	length := 0
	for s.Scan() {
		line := s.Bytes()
		go func() {
			encoded := sha256.Sum256(line)
			tmp := hex.EncodeToString(encoded[:])
			channel <- tmp
		}()
		length++
	}
	for i := 0; i < length; i++ {
		result = append(result, <-channel)
	}
	return result
}

駄目実装②

以下も駄目です。困った困った。何故駄目なのでしょうね。

func parallel1(input_text_path string) []string {
	channel := make(chan struct{}, 100_000)
	result := make([]string, 0)
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	s := bufio.NewScanner(f)

	length := 0
	for s.Scan() {
		line := s.Bytes()
		encoded := sha256.Sum256(line)
		tmp := hex.EncodeToString(encoded[:])
		go func() {
			result = append(result, tmp)
			channel <- struct{}{}
		}()
		length++
	}

	for j := 0; j < length; j++ {
		<-channel
	}
	return result
}

駄目実装③

これでどや!仕様は満たしているやろ!

func parallel3(input_text_path string) []string {
	channel := make(chan struct{}, 100_000)
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	s := bufio.NewScanner(f)
	var m sync.Map
	length := 0
	for s.Scan() {
		line := s.Bytes()
		new_line := make([]byte, len(line))
		_ = copy(new_line, line)
		go func(i int) {
			encoded := sha256.Sum256(new_line)
			tmp := hex.EncodeToString(encoded[:])
			m.Store(i, tmp)
			channel <- struct{}{}
		}(length)
		length++
	}

	for j := 0; j < length; j++ {
		<-channel
	}
	result := make([]string, length)
	for i := 0; i < length; i++ {
		tmp, _ := m.Load(i)
		result[i] = string(tmp.(string))
	}
	return result
}

ところが・・・

ベンチマーク結果

\(^o^)/


$ go test -bench .
...
BenchmarkSerial-4               1000000000               0.06522 ns/op
BenchmarkParallel3-4            1000000000               0.2431 ns/op

まとめ

後アイディアとしてあるのは、例えば sync.Map を使わずにarrayでやるとか。ただこれもそんなに早くなかったんですよね。
https://github.com/tejzpr/ordered-concurrently みたいなライブラリもあるみたいで、これは試していないのだが、どういう実装なのだろうか。

Discussion

KZRNMKZRNM

sha256の計算が軽いのでマルチスレッドで動かすコストの方が大きいということでしょう。