📑

goでワーカープール

2022/03/06に公開

はじめに

プール内にある複数タスクを複数ゴルーチンで並行に処理していくワーカープールの実装パターンをいくつか書いてみた

1. チャネルを使ってワーカーにタスクを受けわたすパターン

5つのタスクを二つのワーカーで手分けして並行処理している

main.go
package main

import (
	"fmt"
	"workerpool/pool"
)

func main() {
	p := pool.New(2)
	requests := []string{"1", "2", "3", "4", "5"}
	tasks := make(chan func(), len(requests))
	for _, r := range requests {
		r := r
		tasks <- func() {
			fmt.Println(r)
		}
	}
	p.Run(tasks)
}
// 1
// 2
// 4
// 5
// 3
pool/pool.go
package pool

import (
	"sync"
)

type Pool struct {
	parallelism int
}

func New(parallelism int) *Pool {
	p := &Pool{
		parallelism: parallelism,
	}
	return p
}

func (p *Pool) Run(tasks chan func()) {
	var wg sync.WaitGroup
	for i := 0; i < p.parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case t, ok := <-tasks:
					if !ok {
						return
					}
					t()
				}
			}
		}()
	}
	close(tasks)
	wg.Wait()
}

2. 各ワーカーにプールごとわたすパターン

5つのタスクを二つのワーカーで手分けして並行処理している

main.go
package main

import (
	"fmt"
	"workerpool/pool"
)

func main() {
	p := pool.New(2)
	requests := []string{"1", "2", "3", "4", "5"}
	tasks := make(chan func(), len(requests))
	for _, r := range requests {
		r := r
		tasks <- func() {
			fmt.Println(r)
		}
	}
	p.Run(tasks)
}
// 2
// 3
// 1
// 5
// 4
pool/pool.go
package pool

import (
	"sync"
)

type Pool struct {
	parallelism int
}

func New(parallelism int) *Pool {
	p := &Pool{
		parallelism: parallelism,
	}
	return p
}

func worker(tasks chan func(), wg *sync.WaitGroup) {
	for task := range tasks {
		task()
		wg.Done()
	}
}

func (p *Pool) Run(tasks chan func()) {
	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for i := 0; i < p.parallelism; i++ {
		go worker(tasks, &wg)
	}
	close(tasks)
	wg.Wait()
}

3. タスクの総数がわからないパターン その1

上述の 1 と 2 の実装パターンはいずれも、以下のようにタスクの総数がわかっている状態でのワーカープール実装だった。

// タスクの総数分、あらかじめバッファを取ってる
tasks := make(chan func(), len(requests))

タスクの総数がわからない状態でも利用できるワーカープールの実装をやってみたのが以下。
5つのタスクを runtime.NumCPU() 個のワーカーで手分けして並行処理している

main.go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
	"workerpool5/pool"
)

func main() {
	p := pool.New()
	requests := []string{"1", "2", "3", "4", "5"}
	for _, r := range requests {
		r := r
		p.AddTask(func() (int, error) {
			return strconv.Atoi(r)
		})
	}
	p.Wait()
	// 結果確認
	for {
		select {
		case result := <-p.Results:
			if result.Err != nil {
				fmt.Printf("%v", result.Err)
			} else {
				fmt.Println(result.Value)
			}
			atomic.AddInt64(&p.RemainTaskCount, -1)
		case <-p.End:
			if p.RemainTaskCount == 0 {
				return
			}
		}
	}
}
// 1
// 5
// 3
// 2
// 4
pool/pool.go
package pool

import (
	"runtime"
	"sync"
	"sync/atomic"
)

type Task func() (int, error)

type Result struct {
	Value int
	Err   error
}

type Pool struct {
	taskQueue       chan Task
	End             chan struct{}
	once            sync.Once
	Results         chan Result
	RemainTaskCount int64
}

func New() *Pool {
	p := &Pool{
		taskQueue: make(chan Task),
		End:       make(chan struct{}),
		Results:   make(chan Result),
	}
	go p.dispatch()
	return p
}

func (p *Pool) AddTask(task Task) {
	if task != nil {
		p.taskQueue <- task
		atomic.AddInt64(&p.RemainTaskCount, 1)
	}
}

func (p *Pool) dispatch() {
	for i := 0; i < runtime.NumCPU(); i++ {
		go worker(p.taskQueue, p.Results)
	}
}

func worker(tasks <-chan Task, results chan<- Result) {
	for t := range tasks {
		r := Result{}
		result, err := t()
		if err != nil {
			r.Err = err
		} else {
			r.Value = result
		}
		results <- r
	}
}

func (p *Pool) Wait() {
	p.once.Do(func() {
		close(p.taskQueue)
	})
	close(p.End)
}

AddTask するたびに RemainTaskCount を増やし、タスクが実行されるたびに RemainTaskCount を減らす。最終的に、 RemainTaskCount が 0 になっていればプログラム終了させるイメージ。
RemainTaskCount を用いたタスクの残数確認を行わずに、 p.End の受信だけでプログラムを終了させると、全てのタスクが終了するより前に main が終了してしまう。

4. タスクの総数がわからないパターン その2

上述の 3 の実装パターンはタスクの終了を待つこと(RemainTaskCount の確認など)が利用者側に委ねられていた。
それを改善してみたのが以下。
先ほどの RemainTaskCount 相当のものは sync.WaitGroup での Wait に置き換わっているイメージ。

5つのタスクをタスクごとのワーカーで手分けして並行処理している。

main.go
package main

import (
	"fmt"
	"workerpool/pool"
)

func main() {
	p := pool.New()
	requests := []string{"1", "2", "3", "4", "5"}
	for _, r := range requests {
		r := r
		p.AddTask(func() {
			fmt.Println(r)
		})
	}
	p.Wait()
}
// 1
// 5
// 3
// 2
// 4
pool/pool.go
package pool

import (
	"sync"
)

type Pool struct {
	taskQueue   chan func()
	end         chan struct{}
	once        sync.Once
}

func New() *Pool {
	p := &Pool{
		taskQueue:   make(chan func()),
		end:         make(chan struct{}),
	}
	go p.dispatch()
	return p
}

func worker(task func(), wg *sync.WaitGroup) {
	defer wg.Done()
	task()
}

func (p *Pool) AddTask(task func()) {
	if task != nil {
		p.taskQueue <- task
	}
}

func (p *Pool) dispatch() {
	defer close(p.end) // 6
	var wg sync.WaitGroup
LOOP:
	for {
		select {
		case task, ok := <-p.taskQueue:
			if !ok { // 3
				break LOOP // 4
			}
			wg.Add(1)
			go worker(task, &wg)
		}
	}
	wg.Wait() // 5
}

func (p *Pool) Wait() {
	p.once.Do(func() {
		close(p.taskQueue) // 1
	})
	// 2
	<-p.end
	// 7
}

処理終了直前が、あっちこっちに処理が飛ぶので、コードにコメントで処理の流れの番号を書いている。補足が以下。

  1. close(p.taskQueue)p.taskQueue の受信側に処理が移る
  2. <-p.endWait() の処理がブロックされる。このブロックが解放されるのは、 dispatch()defer 処理が動いたタイミング(後述)。
  3. 1 で p.taskQueueclose したことにより、 task, ok := <-p.taskQueueok が false になる
  4. break LOOPfor {} の無限ループを抜ける
  5. wg.Wait() で、各ワーカーの処理終了を待つ。本来なら無限ループの後にあるこのコードは Unreachable なコードだが、 break LOOP したおかげでここに到達できる。
  6. 5 の 各ワーカーの処理が終了すると、 defer close(p.end) で、 p.end チャネルがクローズされる
  7. 6 で p.end チャネルがクローズされたので 2 でブロックされていた <-p.end のブロックが解除されて、 Wait() は終了、 main() のゴルーチンも終了して処理終了

5. アクターモデル

これまでの実装は各ワーカーが共通のタスクキューからタスクをもらって処理をしていたが、各ワーカーごとにタスクキューを持ってるようなイメージの、アクターモデルというプログラミングモデルがある。
ワーカープールとアクターモデルは全然異なるものだが、イメージとしてはワーカープールでのワーカーはアクターモデルではアクター、ワーカープールでのタスクキューはアクターモデルではメールボックスと呼んだりする。
アクター同士で、メッセージをやりとりし、各アクターは自身のメールボックスにあるメッセージの処理をするような

go で簡易なアクターモデルをやってる記事があったので、貼り付け。サラッと読めるコード量。
https://viscarra.dev/post/go-generics-actor/

アクターモデルといえば akka が有名だが、 golang でもライブラリは存在し、以下などがある。
https://github.com/asynkron/protoactor-go
https://github.com/ergo-services/ergo

Discussion