Go言語での Ordered な 並列処理について

4 min read 1

TL;DR

書けはしたけど、シーケンシャルに処理するより遅いので多分何か間違っている。そのため参考にしないこと。何が間違っているかわかる方はコメント欄にどうぞ

Go での並列処理について

Go言語では、 goroutine を用いることにより並列処理を手軽に書くことができます。

例えば以下のような処理

package main

import (
	"fmt"
	"time"
)

func say(s string) {
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println(s)
	}
}

func main() {
	go say("world")
	say("hello")
}

https://tour.golang.org/concurrency/1

簡単ですね。Javaとかでやると Thread がどうたらとか、色々面倒な印象です。(最近はマシですが)

並列処理の問題点について

今どき並列処理なんてのはそこまで珍しくないですが、知らない人からするとハマりがちな挙動って多いですよね。例えば以下のようなコード

package main

import (
	"fmt"
)

func say(i int) {
	fmt.Println(i)
}

func main() {
	for i := 0; i < 5; i++ {
		go say(i)
	}
}

環境にもよるのですが、実行すると何も表示されずに終わります。これは、mainスレッドがgoroutineより早く終了するためです。

雑に対処するなら、以下のように sleep を挟んでやります。

package main

import (
	"fmt"
	"time"
)

func say(i int) {
	fmt.Println(i)
}

func main() {
	for i := 0; i < 5; i++ {
		go say(i)
	}
	time.Sleep(50)
}

こうすると、環境にもよりますが何かが出力されると思います。筆者の環境では以下のように出力されました。

4
0
1
2
3

期待する順番とは異なってしまいますね。並列処理では、基本的に処理の順番は不定です。これもハマリポイントの一つです。

出力の順番を入力の順番と同じにするには

ここでは、あるテキストファイルを一行ずつ読み、何か処理をした上で、入力を受けた順番に処理を返すことを考えます。
処理は何でもいいのですが、例えば sha256 でハッシュ化する処理を考えます。
以下では、 input_text_path は入力となるテキストファイルのあるパス、出力は []string とします。

非パラレル版

凄くシンプル。並列処理で以下の実装と同じ出力を得ることを目指します。

func serial(input_text_path string) []string {
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	result := make([]string, 0)
	s := bufio.NewScanner(f)
	for s.Scan() {
		line := s.Bytes()
		encoded := sha256.Sum256(line)
		result = append(result, hex.EncodeToString(encoded[:]))
	}
	return result
}

駄目実装①

以下は駄目な実装例です。さあ何が駄目か考えてみましょう。

func parallel_broken(input_text_path string) []string {
	channel := make(chan string, 100_000)
	result := make([]string, 0)
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	s := bufio.NewScanner(f)
	length := 0
	for s.Scan() {
		line := s.Bytes()
		go func() {
			encoded := sha256.Sum256(line)
			tmp := hex.EncodeToString(encoded[:])
			channel <- tmp
		}()
		length++
	}
	for i := 0; i < length; i++ {
		result = append(result, <-channel)
	}
	return result
}

駄目実装②

以下も駄目です。困った困った。何故駄目なのでしょうね。

func parallel1(input_text_path string) []string {
	channel := make(chan struct{}, 100_000)
	result := make([]string, 0)
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	s := bufio.NewScanner(f)

	length := 0
	for s.Scan() {
		line := s.Bytes()
		encoded := sha256.Sum256(line)
		tmp := hex.EncodeToString(encoded[:])
		go func() {
			result = append(result, tmp)
			channel <- struct{}{}
		}()
		length++
	}

	for j := 0; j < length; j++ {
		<-channel
	}
	return result
}

駄目実装③

これでどや!仕様は満たしているやろ!

func parallel3(input_text_path string) []string {
	channel := make(chan struct{}, 100_000)
	f, err := os.Open(input_text_path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	s := bufio.NewScanner(f)
	var m sync.Map
	length := 0
	for s.Scan() {
		line := s.Bytes()
		new_line := make([]byte, len(line))
		_ = copy(new_line, line)
		go func(i int) {
			encoded := sha256.Sum256(new_line)
			tmp := hex.EncodeToString(encoded[:])
			m.Store(i, tmp)
			channel <- struct{}{}
		}(length)
		length++
	}

	for j := 0; j < length; j++ {
		<-channel
	}
	result := make([]string, length)
	for i := 0; i < length; i++ {
		tmp, _ := m.Load(i)
		result[i] = string(tmp.(string))
	}
	return result
}

ところが・・・

ベンチマーク結果

\(^o^)/


$ go test -bench .
...
BenchmarkSerial-4               1000000000               0.06522 ns/op
BenchmarkParallel3-4            1000000000               0.2431 ns/op

まとめ

後アイディアとしてあるのは、例えば sync.Map を使わずにarrayでやるとか。ただこれもそんなに早くなかったんですよね。
https://github.com/tejzpr/ordered-concurrently みたいなライブラリもあるみたいで、これは試していないのだが、どういう実装なのだろうか。