🦔

【Go言語】Worker Poolは本当に効率的か?

2024/05/30に公開

Worker Poolとは?

複数のtaskを非同期に実行する際に、同時実行数を制御し、taskの処理を効率的に行うための手法です。

一般論として

  • 並行処理数を制限することでメモリの急激な上昇を抑えることが出来る
  • タスクがキューとして溜まることで、効率的に処理を行うことが出来る
    とされています

今日はこれらを検証してみたいと思います

worker poolの仕組みがわからない方は下記の記事を参照ください
Goのworker poolの中で最もgithubのstarが多いantsに概念図が載っています
https://github.com/panjf2000/ants

Worker Poolの実装

下記3種類のWorker Poolライブラリを実装し、nativeなgoroutineとそれぞれ性能を比較します。

エントリー

ライブラリ リンク
gammazero/workerpool https://github.com/gammazero/workerpool
alitto/pond https://github.com/alitto/pond
panjf2000/ants/v2 https://github.com/panjf2000/ants
native goroutine -

ベンチマーク関数

試してみたい方は↓のgithubリポジトリをcloneして実行してみてください
https://github.com/kosuke-oya/golang-worker-pool-bench

2種類用意しました
関数benchCpuBoundはCPUバウンドな処理を行います。
シングルスレッドでの実行されます

func benchCpuBound() {
	for i := 0; i < 100; i++ {
		n, err := rand.Int(rand.Reader, big.NewInt(100))
		if err != nil {
			panic(err)
		}
		// []byteに変換する
		b := n.Bytes()
		s := sha256.Sum256(b)
		// []byteを2進数に変換して足し合わせる
		atomic.AddUint64(&sum, binary.BigEndian.Uint64(s[:]))
	}
}

関数benchCpuBoundMultiGoroutineはCPUバウンドな処理を行います。
内部ではgoroutineを生成し、並列処理を行います

func benchCpuBoundMultiGoroutine() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n, err := rand.Int(rand.Reader, big.NewInt(100))
			if err != nil {
				panic(err)
			}
			// []byteに変換する
			b := n.Bytes()
			s := sha256.Sum256(b)
			// []byteを2進数に変換して足し合わせる
			atomic.AddUint64(&sum, binary.BigEndian.Uint64(s[:]))
		}()
	}
	wg.Wait()
}

ベンチマーク用のコード

// main_test.go
package main

import (
	"sync"
	"testing"

	"github.com/alitto/pond"
	"github.com/gammazero/workerpool"
	"github.com/panjf2000/ants/v2"
)

// poolのworker数
var maxWorkers = 1000

// taskの数
var tasks int = 100000

func BenchmarkGanmaPoolCpu(b *testing.B) {
	var wp = workerpool.New(maxWorkers)
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wp.Submit(benchCpuBound)
		}
		wp.StopWait()
	}
}
func BenchmarkPondPoolCpuBound(b *testing.B) {
	var wp = pond.New(maxWorkers, 0)
	defer wp.StopAndWait()
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wp.Submit(benchCpuBound)
		}
	}
}
func BenchmarkAntsPoolCpu(b *testing.B) {
	var wg sync.WaitGroup
	ff := func() {
		benchCpuBound()
		defer wg.Done()
	}
	antsPool, _ := ants.NewPool(maxWorkers)
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wg.Add(1)
			antsPool.Submit(ff)
		}
		wg.Wait()
	}
}
func BenchmarkGoroutineCpu(b *testing.B) {
	var wg sync.WaitGroup
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				benchCpuBound()
			}()
		}
		wg.Wait()
	}
}
func BenchmarkGanmaPoolCpuMulriGoroutine(b *testing.B) {
	var wp = workerpool.New(maxWorkers)
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wp.Submit(benchCpuBoundMultiGoroutine)
		}
		wp.StopWait()
	}
}
func BenchmarkPondPoolCpuMultiGoroutine(b *testing.B) {
	var wp = pond.New(maxWorkers, 0)
	defer wp.StopAndWait()
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wp.Submit(benchCpuBoundMultiGoroutine)
		}
	}
}
func BenchmarkAntsPoolCpuMultiGoroutine(b *testing.B) {
	var wg sync.WaitGroup
	ff := func() {
		benchCpuBoundMultiGoroutine()
		defer wg.Done()
	}
	antsPool, _ := ants.NewPool(maxWorkers)
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wg.Add(1)
			antsPool.Submit(ff)
		}
		wg.Wait()
	}
}
func BenchmarkGoroutineCpuMultiGoroutine(b *testing.B) {
	var wg sync.WaitGroup
	for i := 0; i < b.N; i++ {
		for i := 0; i < tasks; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				benchCpuBoundMultiGoroutine()
			}()
		}
		wg.Wait()
	}
}

ベンチマーク結果

下記コマンドで実行できます

go test -bench . -benchmem -cpu=16

結果は下記の通りです

BenchmarkGanmaPoolCpu-16                               1        1771358966 ns/op        559399320 B/op  39901342 allocs/op
BenchmarkPondPoolCpuBound-16                           1        2250690715 ns/op        559902976 B/op  39905804 allocs/op
BenchmarkAntsPoolCpu-16                                1        1875148719 ns/op        559988272 B/op  39908900 allocs/op
BenchmarkGoroutineCpu-16                               1        3048518276 ns/op        570033272 B/op  40035626 allocs/op
BenchmarkGanmaPoolCpuMulriGoroutine-16                 1        2744510408 ns/op        721048344 B/op  50002653 allocs/op
BenchmarkPondPoolCpuMultiGoroutine-16                  1        4472468357 ns/op        727328472 B/op  50036263 allocs/op
BenchmarkAntsPoolCpuMultiGoroutine-16                  1        4518372041 ns/op        725127144 B/op  50030168 allocs/op
BenchmarkGoroutineCpuMultiGoroutine-16                 1        5122364796 ns/op        821075032 B/op  50341074 allocs/op

いくつかmaxworkerやtasksの数を変更して実行してみましたが、worker poolを使うことでnativeなgoroutineよりも速く・メモリ効率が良いという結果になりました

まとめ

worker poolを使うことでnativeなgoroutineよりも速く・メモリ効率が良くタスクを処理できます
本番環境でpond poolを使っておりますが、性能が良いため、お気に入りです

ただしエラーハンドリングの処理が少し煩雑になったりするので、シンプルなタスクを投げることをまずはお勧めします。

ありがとうございました。

GitHubで編集を提案

Discussion