📘

goroutine

2023/05/26に公開

goroutine(ゴルーチン)

参考:Go言語 プログラミングエッセンス

スレッドとは

プログラミングにおけるスレッド(スレッド)は、プログラムの実行フローの単位です。スレッドは、プロセス内で独立して動作する軽量な処理単位であり、複数のスレッドを同時に実行することで、並行性や並列性を実現することができます。

スレッドは、プロセス内のリソース(メモリ、ファイルハンドルなど)を共有しながら実行されます。つまり、複数のスレッドは同じメモリ空間にアクセスすることができ、データの共有や通信が比較的容易に行えます。ただし、スレッド同士が競合状態(データ競合)になることがあるため、注意が必要です。

スレッドを使用することで、複数のタスクを同時に実行したり、タスクの処理を非同期に行ったりすることができます。例えば、ウェブブラウザで複数のタブを同時に開いている場合、それぞれのタブの処理を個別のスレッドで実行することで、タブ間の応答性を向上させることができます。

一般的なプログラミング言語やフレームワークには、スレッドを作成・制御するための機能やライブラリが提供されています。スレッドの作成や終了、スレッド間のデータのやり取り、スレッドの同期などの操作を行うことができます。ただし、スレッドの管理には注意が必要であり、適切な同期や排他制御を行わないと、競合状態やデッドロックなどの問題が発生する可能性があリます。

goroutineとは

Goのランタイムで管理された軽量のスレッド

message := "hi"
go sendMessage(message)
  • goroutineを実行するには、関数呼び出しの前にgoをつけるだけです。
  • goキーワードをつけるたび、goroutineの数が増えます。

syncパッケージ

goroutineは関数の中で実行しても動き続けます。逆に言うと、main関数の中で実行するとgoroutineが実行中にも関わらずmain関数の終了とともに強制終了してしまいます。main関数がgoroutineの終了を待つためにはsyncパッケージを使う必要があります。

package main

import(
	"fmt"
	"sync"
)

func main(){
	var wg sync.WaitGroup
	wg.Add(1) // リファレンスカウンタを+1する
	go func(){
		defer wg.Done()
		// 重たい処理
	}()

	// 別の重たい処理

	wg.Wait() // リファレンスカウンタが0になるまで待つ
}
  • WaitGroupは、goroutineの数だけAddする必要があります。

無名関数

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}
  • }()の意味は、無名関数を定義して、すぐに実行するという意味です。

変数の注意点

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

↓実行結果

10
10
10
10
10
10
10
10
10
10
  • goroutineの中で参照している変数iの結果がfmt.Printlnで表示しようとしている時にはすでにforループが終了しているので、このような結果になります。
  • これを回避するには、ループのスコープの中で新たな変数を宣言します。
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		v := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(v)
		}()
	}
	wg.Wait()
}
  • もしくは、無名関数の引数として渡します。
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			fmt.Println(v)
		}(i)
	}
	wg.Wait()
}

↓実行結果

9
0
1
2
3
4
5
6
7
8

race condition

goroutineの呼び出し元とgoroutine内で同じ変数を参照・更新する場合にも注意が必要です。

race conditionが発生した場合にはどのような結果になるかが保証されていません。

データを保護するにはsync.Mutexを使って保護します

func main() {
	n := 0

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			n++
		}
	}()

	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			n++
		}
	}()

	wg.Wait()
	fmt.Println(n)
}

↓実行結果

1260
  • n++はnの値に+1するオペレータですが、元のnの値を得て+1したタイミングで、他のgoroutineがさらに+1を実行してしまう可能性があリマス。このようなrace conditionを避けるため、sync.Mutexを使います。
func main() {
	n := 0

	var mu sync.Mutex

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			mu.Lock()
			n++
			mu.Unlock()
		}
	}()

	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			mu.Lock()
			n++
			mu.Unlock()
		}
	}()

	wg.Wait()
	fmt.Println(n)
}
  • **sync.Mutexは排他制御を実現するための仕組みです。排他制御は、複数のスレッドやゴルーチンが共有のリソースにアクセスする際に、同時にアクセスしないように制御することです。sync.Mutexはそのためのツールであり、ロックを使用することで共有リソースへの同時アクセスを制限します。一度に1つのスレッドやゴルーチンのみがロックを取得できるため、他のスレッドやゴルーチンは待機します。これにより、競合状態やデータ破損などの問題を防ぐことができます。つまり、sync.Mutex**を使用することで、排他制御が実現されます。

channel

channelはgoroutineに対してデータを送受信できる仕組みです。

func server(ch chan string) {
	defer close(ch)
	ch <- "one"
	ch <- "two"
	ch <- "three"
}

func main() {
	var s string

	ch := make(chan string)
	go server(ch)

	s = <-ch
	fmt.Println(s)
	s = <-ch
	fmt.Println(s)
	s = <-ch
	fmt.Println(s)
}

↓実行結果

one
two
three
  • channelの作成にはmakeを使います。
ch := make(chan string)
  • chanの後に送信するデータの型を指定します。そしてchannelを閉じるにはcloseを使います。
close(ch)
  • channelにデータを送信するためにはch <-を使います。またchannelからデータを受信するためには<- chを使います。channnelからデータを受け取る処理に関してはforを使うことができます。
**for _,s := range ch {
	fmt.Println(s)
}**
  • このループを終了させるためにはgoroutine側でchannelを閉じる必要があります。

一見、単なるデータの受け渡しにしか見えないかもしれません。ですが、これが非同期処理に大きな影響をもたらします。

例えば10個のURL文字列があるとします。それらが示すHTTPサーバ上のCSVファイルをダウンロードし、全てをデータベースに登録する処理を書くとします。

HTTPサーバからダウンロードする処理はCPUをあまり使いません。また、データベースに登録する処理もCPUをあまり使いません。これらを並行で動作させることで、CPUを有効に使い全体の処理時間を短くすることができます。

goroutineとchannelを使うことでそういった処理を簡単に実装できます。

func downloadCSV(wg *sync.WaitGroup, urls []string, ch chan []byte) {
	defer wg.Done()
	defer close(ch) // 終わったら閉じる(5)

	// HTTPサーバーからのダウンロード
	for _, u := range urls {
		resp, err := http.Get(u)
		if err != nil {
			log.Println("cannot download CSV:", err)
			continue
		}
		b, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			resp.Body.Close()
			log.Println("cannot read content:", err)
			continue
		}
		resp.Body.Close()
		ch <- b // main関数にコンテンツを送信(3)
	}
}

func main() {
	urls := []string{
		"http://my-server.com/data01.csv",
		"http://my-server.com/data01.csv",
		"http://my-server.com/data01.csv",
		"http://my-server.com/data01.csv",
	}

	// バイト列を転送するためのchannelを作成(1)
	ch := make(chan []byte)

	var wg sync.WaitGroup
	wg.Add(1)
	go downloadCSV(&wg, urls, ch) //(2)

	// gotoutineからコンテンツを受け取る(4)
	for _, b := range ch {
		r := csv.NewReader(bytes.NewReader(b))
		for {
			records, err := r.Read()
			if err != nil {
				log.Fatal(err)
			}
			// レコードの登録
			insertRecords(records)
		}
	}
	wg.Wait()
}
  • main関数からgoroutineを起動し、10個のURLを渡します。
  • goroutineではループを使いHTTPサーバーからダウンロードします。ダウンロードが完了したら都度、コンテンツをmain関数に渡します。
  • main関数はダウンロードされたコンテンツを都度goroutineから受け取ってCSVをパースし、データベースに登録します。
  • このように実装することで、CSVファイルをパースしたりデータベースに登録したりしている最中であってもHTTPサーバからのダウンロードが継続されることになります。

この例でのchanenlを使ったデータのやり取りをまとめると以下の手順になります。

  1. makeを使ってchannelを作成
  2. goroutineにchannelを渡す
  3. goroutineにてchanenlへ送信
  4. main関数にてchannelから受信
  5. goroutineの終了時にchannelを閉じる

channelは基本、送信と受信それぞれをブロッキングします。つまり、main関数のforループは、downloadCSVがコンテンツを送信するまでブロックします。コンテンツが届いていないのにforループが回り始めることはありません。

逆に、downloadCSVのコンテンツ送信は、main関数がchannelから読み取りを行わない限りブロックします。つまり双方が準備ができて初めてブロックが解けることになります。

ここで、ダウンロード処理の方が早く終わってしまう場合を考えます。HTTPサーバがイントラネットにある場合にはデータベースへの登録処理よりも早く終わることになるでしょう。そう言った場合、downloadCSVでのchannelへの送信の際に発生するブロックが、main側の受信の際に発生するブロックよりも長くなってしまうことになります。

このような場合には、以下のようにchannelにバッファを持たせることができます。

ch := make(chan string,5)

このバッファを持ったchannelは、コンテンツが5つ登録されるまで送信がブロックされることはありません。また、コンテンツがいくつかバッファリングされている状態であれば、受信側もブロックされることはありません。うまく調整することで、非同期処理がただし、ダウンロードしたコンテンツが、最も多い場合で5つ分メモリに保持されることになります。用途に合わせて調整する必要があります。

非同期パターン

・非同期処理(Asynchronous processing)と同期処理(Synchronous processing)は、プログラムやシステムでのタスクの実行方法に関連しています。
・同期処理は、タスクが順番に実行され、前のタスクの完了を待ってから次のタスクが実行される方法です。つまり、各タスクは他のタスクが完了するのを待ってから実行されます。これにより、タスクの実行順序が明確になり、結果を取得するためにはタスクが完了するまで待つ必要があります。同期処理はシンプルで直感的ですが、タスクが長時間実行される場合や、他のタスクの完了を待つことでパフォーマンスが低下する可能性があります。
・一方、非同期処理は、タスクの実行順序が保証されず、タスクが同時に並行して実行される方法です。非同期処理では、タスクが開始された後、その完了を待たずに次のタスクが実行されます。非同期処理では、タスクがバックグラウンドで実行されるため、他の処理やタスクをブロックすることなく、パフォーマンスの向上が期待できます。ただし、結果を取得するためには、通常はコールバック関数やプロミスなどの仕組みを使用して、タスクの完了を待つ必要があります。
・要約すると、同期処理はタスクの実行順序が保証され、結果を取得するためにはタスクの完了を待つ必要があります。一方、非同期処理はタスクの実行順序が保証されず、他の処理をブロックせずにタスクを並行して実行できますが、結果を取得するためには明示的な待機が必要です。どちらの方法を選択するかは、特定の要件やシナリオによって異なります。

package main

import (
	"fmt"
	"time"
)

func generator(msg string) <-chan string {
	ch := make(chan string)
	go func() {
		for i := 0; ; i++ {
			ch <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Second)
		}
	}()
	return ch
}

func main() {
	ch := generator("Hello")
	for i := 0; i < 5; i++ {
		fmt.Println(<-ch)
	}
}

↓実行結果

Hello 0
Hello 1
Hello 2
Hello 3
Hello 4

このプログラムは、ジェネレータ(generator)という関数を使用して非同期なメッセージの生成と取得を行うものです。以下で各部分の解説をします。

func generator(msg string) <-chan string {
	ch := make(chan string)
	go func() {
		for i := 0; ; i++ {
			ch <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Second)
		}
	}()
	return ch
}

generator関数は、引数として文字列msgを受け取り、<-chan string型(送信専用の文字列チャネル)を返します。内部で非同期にゴルーチン(goroutine)を起動し、メッセージを生成してチャネルに送信する処理を行います。

具体的には、無限ループ内でfmt.Sprintfを使用してmsgとループ変数iを組み合わせた文字列を生成し、chチャネルに送信します。その後、time.Sleep(time.Second)を使って1秒待機します。これにより、メッセージ生成と送信が1秒ごとに繰り返されることになります。

func main() {
	ch := generator("Hello")
	for i := 0; i < 5; i++ {
		fmt.Println(<-ch)
	}
}

main関数では、generator関数を呼び出してchチャネルを取得します。その後、forループを使用して5回繰り返し、<-chを使ってチャネルからメッセージを受信し、それをfmt.Printlnで出力します。

結果として、5回の繰り返しでgenerator関数から生成されたメッセージが順番に取得され、それぞれが出力されます。

このプログラムの特徴は、generator関数が非同期にメッセージを生成しているため、メッセージの生成と取得が同時に進行します。つまり、chチャネルにメッセージが送信される度に<-chが実行され、それぞれのメッセージが順番に出力されることになります。また、generator関数は無限ループでメッセージを生成し続けるため、取得回数に制限はありません。

  • 関数generatorの中でgoroutineを起動し、goroutineから返されたchannelをmain関数で受信します。ここで注目して欲しいのは、関数宣言にある<-chanです。このようにchanを宣言することで、返されるchanが受信専用であることを明示できます。返されたchanに対して送信を行うとコンパイルエラーになります。

バッファ付きチャネル

Go言語におけるバッファ付きチャネル(Buffered Channel)は、チャネルに一定の容量を設定することができる機能です。通常のチャネルは、送信側と受信側が同期してデータのやり取りを行いますが、バッファ付きチャネルでは、一定数のデータを非同期に送受信することができます。

バッファ付きチャネルを作成するには、make()関数を使用します。例えば、以下のようにバッファサイズ3のチャネルを作成します。

ch := make(chan int, 3) // バッファサイズ3のバッファ付きチャネル

この例では、int型のデータを扱うチャネルを作成しています。

バッファ付きチャネルへのデータの送信は、通常のチャネルと同様に<-演算子を使用します。

ch <- 10 // チャネルにデータを送信

ただし、バッファ付きチャネルでは、チャネルのバッファが満杯の場合には送信側が一時停止し、バッファに空きができるまで待機します。バッファに空きができれば、送信が再開されます。

データの受信も通常のチャネルと同様に行います。

data := <-ch // チャネルからデータを受信

バッファ付きチャネルの場合、バッファが空の状態で受信を行うと、受信側が一時停止し、バッファにデータが送信されるまで待機します。バッファにデータが送信されれば、受信が再開されます。

バッファ付きチャネルでは、バッファが満杯の状態で送信を行うと、送信側が一時停止し、受信側がデータを受信するまで待機します。受信側がデータを受信すれば、送信が再開されます。

バッファ付きチャネルは、非同期の送受信を可能にするため、特にゴルーチン間の通信に便利です。バッファサイズがチャネルの容量を表し、送信側と受信側の間での同期を柔軟に制御することができます。

Discussion