goでワーカープール
はじめに
プール内にある複数タスクを複数ゴルーチンで並行に処理していくワーカープールの実装パターンをいくつか書いてみた
1. チャネルを使ってワーカーにタスクを受けわたすパターン
5つのタスクを二つのワーカーで手分けして並行処理している
package main
import (
"fmt"
"workerpool/pool"
)
func main() {
p := pool.New(2)
requests := []string{"1", "2", "3", "4", "5"}
tasks := make(chan func(), len(requests))
for _, r := range requests {
r := r
tasks <- func() {
fmt.Println(r)
}
}
p.Run(tasks)
}
// 1
// 2
// 4
// 5
// 3
package pool
import (
"sync"
)
type Pool struct {
parallelism int
}
func New(parallelism int) *Pool {
p := &Pool{
parallelism: parallelism,
}
return p
}
func (p *Pool) Run(tasks chan func()) {
var wg sync.WaitGroup
for i := 0; i < p.parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case t, ok := <-tasks:
if !ok {
return
}
t()
}
}
}()
}
close(tasks)
wg.Wait()
}
2. 各ワーカーにプールごとわたすパターン
5つのタスクを二つのワーカーで手分けして並行処理している
package main
import (
"fmt"
"workerpool/pool"
)
func main() {
p := pool.New(2)
requests := []string{"1", "2", "3", "4", "5"}
tasks := make(chan func(), len(requests))
for _, r := range requests {
r := r
tasks <- func() {
fmt.Println(r)
}
}
p.Run(tasks)
}
// 2
// 3
// 1
// 5
// 4
package pool
import (
"sync"
)
type Pool struct {
parallelism int
}
func New(parallelism int) *Pool {
p := &Pool{
parallelism: parallelism,
}
return p
}
func worker(tasks chan func(), wg *sync.WaitGroup) {
for task := range tasks {
task()
wg.Done()
}
}
func (p *Pool) Run(tasks chan func()) {
var wg sync.WaitGroup
wg.Add(len(tasks))
for i := 0; i < p.parallelism; i++ {
go worker(tasks, &wg)
}
close(tasks)
wg.Wait()
}
3. タスクの総数がわからないパターン その1
上述の 1 と 2 の実装パターンはいずれも、以下のようにタスクの総数がわかっている状態でのワーカープール実装だった。
// タスクの総数分、あらかじめバッファを取ってる
tasks := make(chan func(), len(requests))
タスクの総数がわからない状態でも利用できるワーカープールの実装をやってみたのが以下。
5つのタスクを runtime.NumCPU() 個のワーカーで手分けして並行処理している
package main
import (
"fmt"
"strconv"
"sync/atomic"
"workerpool5/pool"
)
func main() {
p := pool.New()
requests := []string{"1", "2", "3", "4", "5"}
for _, r := range requests {
r := r
p.AddTask(func() (int, error) {
return strconv.Atoi(r)
})
}
p.Wait()
// 結果確認
for {
select {
case result := <-p.Results:
if result.Err != nil {
fmt.Printf("%v", result.Err)
} else {
fmt.Println(result.Value)
}
atomic.AddInt64(&p.RemainTaskCount, -1)
case <-p.End:
if p.RemainTaskCount == 0 {
return
}
}
}
}
// 1
// 5
// 3
// 2
// 4
package pool
import (
"runtime"
"sync"
"sync/atomic"
)
type Task func() (int, error)
type Result struct {
Value int
Err error
}
type Pool struct {
taskQueue chan Task
End chan struct{}
once sync.Once
Results chan Result
RemainTaskCount int64
}
func New() *Pool {
p := &Pool{
taskQueue: make(chan Task),
End: make(chan struct{}),
Results: make(chan Result),
}
go p.dispatch()
return p
}
func (p *Pool) AddTask(task Task) {
if task != nil {
p.taskQueue <- task
atomic.AddInt64(&p.RemainTaskCount, 1)
}
}
func (p *Pool) dispatch() {
for i := 0; i < runtime.NumCPU(); i++ {
go worker(p.taskQueue, p.Results)
}
}
func worker(tasks <-chan Task, results chan<- Result) {
for t := range tasks {
r := Result{}
result, err := t()
if err != nil {
r.Err = err
} else {
r.Value = result
}
results <- r
}
}
func (p *Pool) Wait() {
p.once.Do(func() {
close(p.taskQueue)
})
close(p.End)
}
AddTask するたびに RemainTaskCount を増やし、タスクが実行されるたびに RemainTaskCount を減らす。最終的に、 RemainTaskCount が 0 になっていればプログラム終了させるイメージ。
RemainTaskCount を用いたタスクの残数確認を行わずに、 p.End の受信だけでプログラムを終了させると、全てのタスクが終了するより前に main が終了してしまう。
4. タスクの総数がわからないパターン その2
上述の 3 の実装パターンはタスクの終了を待つこと(RemainTaskCount の確認など)が利用者側に委ねられていた。
それを改善してみたのが以下。
先ほどの RemainTaskCount 相当のものは sync.WaitGroup での Wait に置き換わっているイメージ。
5つのタスクをタスクごとのワーカーで手分けして並行処理している。
package main
import (
"fmt"
"workerpool/pool"
)
func main() {
p := pool.New()
requests := []string{"1", "2", "3", "4", "5"}
for _, r := range requests {
r := r
p.AddTask(func() {
fmt.Println(r)
})
}
p.Wait()
}
// 1
// 5
// 3
// 2
// 4
package pool
import (
"sync"
)
type Pool struct {
taskQueue chan func()
end chan struct{}
once sync.Once
}
func New() *Pool {
p := &Pool{
taskQueue: make(chan func()),
end: make(chan struct{}),
}
go p.dispatch()
return p
}
func worker(task func(), wg *sync.WaitGroup) {
defer wg.Done()
task()
}
func (p *Pool) AddTask(task func()) {
if task != nil {
p.taskQueue <- task
}
}
func (p *Pool) dispatch() {
defer close(p.end) // 6
var wg sync.WaitGroup
LOOP:
for {
select {
case task, ok := <-p.taskQueue:
if !ok { // 3
break LOOP // 4
}
wg.Add(1)
go worker(task, &wg)
}
}
wg.Wait() // 5
}
func (p *Pool) Wait() {
p.once.Do(func() {
close(p.taskQueue) // 1
})
// 2
<-p.end
// 7
}
処理終了直前が、あっちこっちに処理が飛ぶので、コードにコメントで処理の流れの番号を書いている。補足が以下。
-
close(p.taskQueue)
でp.taskQueue
の受信側に処理が移る -
<-p.end
でWait()
の処理がブロックされる。このブロックが解放されるのは、dispatch()
のdefer
処理が動いたタイミング(後述)。 - 1 で
p.taskQueue
をclose
したことにより、task, ok := <-p.taskQueue
のok
がfalse
になる -
break LOOP
でfor {}
の無限ループを抜ける -
wg.Wait()
で、各ワーカーの処理終了を待つ。本来なら無限ループの後にあるこのコードは Unreachable なコードだが、break LOOP
したおかげでここに到達できる。 - 5 の 各ワーカーの処理が終了すると、
defer close(p.end)
で、p.end
チャネルがクローズされる - 6 で
p.end
チャネルがクローズされたので 2 でブロックされていた<-p.end
のブロックが解除されて、Wait()
は終了、main()
のゴルーチンも終了して処理終了
5. アクターモデル
これまでの実装は各ワーカーが共通のタスクキューからタスクをもらって処理をしていたが、各ワーカーごとにタスクキューを持ってるようなイメージの、アクターモデルというプログラミングモデルがある。
ワーカープールとアクターモデルは全然異なるものだが、イメージとしてはワーカープールでのワーカーはアクターモデルではアクター、ワーカープールでのタスクキューはアクターモデルではメールボックスと呼んだりする。
アクター同士で、メッセージをやりとりし、各アクターは自身のメールボックスにあるメッセージの処理をするような
go で簡易なアクターモデルをやってる記事があったので、貼り付け。サラッと読めるコード量。
アクターモデルといえば akka が有名だが、 golang でもライブラリは存在し、以下などがある。
Discussion