💭

Goで実装する周期的な並行処理

2021/12/15に公開約9,000字

概要

Momentum株式会社 の井上です。
これは Supershipグループ Advent Calendar 2021 の 21 日目の記事です。

本記事では、ある何らかの処理を一定周期間隔で複数回実行するようなプログラムを Go で実装します。実装方法はいくつかあるかと思いますが、今回は Go の標準ライブラリである time パッケージ SleepTickerを利用した2つのパターンで実装しました。

弊社採用情報に興味がある方はこちら

Supershipグループではプロダクト開発やサービス開発に関わる人を絶賛募集しております。
ご興味がある方は以下リンクよりご確認ください。
Momeuntum株式会社 採用サイト
Supershipグループ 採用サイト

背景

想定

本内容の応用例として HTTP リクエストで何らかの API からレスポンスを受け取るような処理を効率的に実行したいシーンがあります。API には色々な制限が設けられていることがあり、例えば rps(request per second) 1秒間にn回までリクエストを受け付ける、といった制限は珍しくありません。もしたくさんのレスポンスが必要な時は、規模感にもよりますが可能なら効率的に処理を実行したいと思います。効率的とは、このシーンでは制限ギリギリの回数までリクエストを投げ、周期内でレスポンスを処理することを指します。今回はこのようなケースを想定しながら周期的な並行処理を Go で実装したいと思います。

前提

  • 前述の想定を意識し、今回は対象の処理が完了するまでにランダムで数(ミリ)秒間かかることにします。
  • 一定周期内では並行処理します。
  • 一定周期内で対象の処理が完了しなかった場合についても考慮します。今回は周期内で処理が完了しなかった場合、全ての処理が完了してから次の周期を開始することにします。
    • 全ての処理が完了していなくても次の周期を開始することについても少しだけ言及します。
  • Goのバージョンは1.17.3です。

実装例

main関数

この後実装する doWithSleep 関数と doWithTicker 関数にかかった時間を計測します。
対象の処理にかかる時間はランダムなので Sleep と Ticker で比較はしませんが、全体でどれだけの時間がかかったかログを取ります。

main.go
func main() {
	log.SetFlags(log.Lmicroseconds)
	rand.Seed(time.Now().UnixNano()) // 対象の処理で使用する

	// これから実装する関数
	// この関数内で doSomething 関数を実行する
	fns := []func(){doWithSleep, doWithTicker}

	log.Println("Start.")
	for _, fn := range fns {
		start := time.Now()
		fn()
		log.Println(time.Since(start))
	}
	log.Println("Complete.")
}

対象の処理

一定周期で複数回実行される関数です。この後実装する doWithSleep 関数と doWithTicker 関数から呼び出します。

main.go
const (
	execTime  = 3000 // 対象の処理にかかる最大時間(millisecond)
)

// 対象の処理
// 引数の値は doWithSleep, doWithTicker 関数から渡される
func doSomething(idx int, num string) {
	r := rand.Intn(execTime)
	time.Sleep(time.Duration(r) * time.Millisecond)
	log.Printf("    [%d]: %s, ms=%d\n", idx, num, r)
}

Sleepで制御する

無限ループとSleep関数で一定周期を作っています。無限ループの中ではnumsの値を順番に参照しつつ一定回数だけループするように実装しています。
goroutine で doSomething関数を実行しています。今回は以下の条件を前提としているので、Sleepが完了した後も Wait する必要があります。

今回は周期内で処理が完了しなかった場合、全ての処理が完了してから次の周期を開始することにします。

もし全ての処理が完了していなくても次の周期を開始したい場合は、Sleep後の Wait は不要です。

main.go
const (
	interval  = 2    // interval(second)
	concCount = 5    // 一回分の並行処理数
)

var nums = []string{"Zero", "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", "Ten", "Eleven", "Twelve"}

func doWithSleep() {
	wg := sync.WaitGroup{}
	c := 0

	log.Println("  Sleep Start.")
LOOP:
	for {
		log.Println("    Doing...")
		for i := c; i < c+concCount; i++ {
			if len(nums) <= i {
				wg.Wait()
				break LOOP
			}
			wg.Add(1)
			go func(idx int, num string) {
				doSomething(idx, num)
				wg.Done()
			}(i, nums[i])
		}
		time.Sleep(interval * time.Second)
		wg.Wait()
		c += concCount
	}
	log.Println("  Sleep Complete.")
}

周期時間よりも実行時間が必ず短い時

execTime = 1000, interval = 2 として実行しました。

22:39:17.458558   Sleep Start.
22:39:17.458563     Doing...
22:39:17.721207     [2]: Two, ms=262
22:39:17.724449     [0]: Zero, ms=265
22:39:17.734973     [1]: One, ms=274
22:39:17.881865     [3]: Three, ms=418
22:39:18.415850     [4]: Four, ms=952
22:39:19.463267     Doing...
22:39:19.471228     [8]: Eight, ms=6
22:39:19.655713     [6]: Six, ms=187
22:39:19.916762     [7]: Seven, ms=452
22:39:20.103715     [9]: Nine, ms=635
22:39:20.437669     [5]: Five, ms=969
22:39:21.468715     Doing...
22:39:22.056174     [11]: Eleven, ms=587
22:39:22.079707     [12]: Twelve, ms=606
22:39:22.229673     [10]: Ten, ms=756
22:39:22.229785   Sleep Complete.
22:39:22.229796 4.771235000s

確かに2秒周期で5個の並行処理が実行・完了していて無駄な時間がほとんどないように見えます。

周期時間よりも実行時間が長くなる可能性がある時

execTime = 3000, interval = 2 として実行しました。

22:39:34.511079   Sleep Start.
22:39:34.511080     Doing...
22:39:35.159610     [1]: One, ms=645
22:39:35.299133     [3]: Three, ms=784
22:39:36.677408     [4]: Four, ms=2161
22:39:37.198802     [2]: Two, ms=2684
22:39:37.262978     [0]: Zero, ms=2749
22:39:37.263100     Doing...
22:39:37.782674     [8]: Eight, ms=514
22:39:38.684054     [5]: Five, ms=1418
22:39:38.797380     [7]: Seven, ms=1529
22:39:38.892188     [6]: Six, ms=1626
22:39:38.943325     [9]: Nine, ms=1675
22:39:39.268332     Doing...
22:39:39.847792     [11]: Eleven, ms=578
22:39:40.040714     [12]: Twelve, ms=767
22:39:42.217720     [10]: Ten, ms=2944
22:39:42.217814   Sleep Complete.
22:39:42.217831 7.706749000s

一回目の周期の中には2000msより長い処理がありますが、それらの完了を待ってからすぐ次の周期を実行していることが分かります。

Tickerで制御する

Sleep の代わりに Ticker で実装している点以外は基本的に doWithSleep関数 と同じです。
周期的な処理を終えた後は、 Ticker を Stop してあげます。

main.go
const (
	interval  = 2    // interval(second)
	concCount = 5    // 一回分の並行処理数
)

func doWithTicker() {
	ticker := time.NewTicker(interval * time.Second)
	wg := sync.WaitGroup{}
	c := 0

	log.Println("  Ticker Start.")
LOOP:
	for {
		log.Println("    Doing...")
		for i := c; i < c+concCount; i++ {
			if len(nums) <= i {
				wg.Wait()
				ticker.Stop()
				break LOOP
			}
			wg.Add(1)
			go func(idx int, num string) {
				doSomething(idx, num)
				wg.Done()
			}(i, nums[i])
		}
		<-ticker.C
		wg.Wait()
		c += concCount
	}
	log.Println("  Ticker Done.")
}

周期時間よりも実行時間が必ず短い時

Sleep同様 execTime = 1000, interval = 2 として実行しました。

22:39:22.229863   Ticker Start.
22:39:22.229871     Doing...
22:39:22.511171     [2]: Two, ms=276
22:39:22.631505     [3]: Three, ms=401
22:39:23.006909     [0]: Zero, ms=774
22:39:23.101523     [4]: Four, ms=867
22:39:23.123445     [1]: One, ms=891
22:39:24.234311     Doing...
22:39:24.235848     [6]: Six, ms=1
22:39:24.359675     [9]: Nine, ms=120
22:39:24.528674     [7]: Seven, ms=289
22:39:25.080663     [8]: Eight, ms=841
22:39:25.120700     [5]: Five, ms=886
22:39:26.230150     Doing...
22:39:26.482438     [10]: Ten, ms=247
22:39:26.884068     [11]: Eleven, ms=653
22:39:26.919218     [12]: Twelve, ms=685
22:39:26.919360   Ticker Done.
22:39:26.919378 4.689572000s

Sleepと同様に、無駄な時間がほとんどないように見えます。

周期時間よりも実行時間が長くなる可能性がある時

こちらもSleep同様 execTime = 3000, interval = 2 として実行しました。

22:39:42.217917   Ticker Start.
22:39:42.217927     Doing...
22:39:42.443213     [0]: Zero, ms=220
22:39:42.746245     [4]: Four, ms=523
22:39:42.889950     [1]: One, ms=670
22:39:44.057904     [3]: Three, ms=1836
22:39:44.689186     [2]: Two, ms=2466
22:39:44.689291     Doing...
22:39:46.082718     [6]: Six, ms=1388
22:39:46.685593     [8]: Eight, ms=1991
22:39:46.962899     [5]: Five, ms=2268
22:39:47.323677     [9]: Nine, ms=2633
22:39:47.609304     [7]: Seven, ms=2919
22:39:47.609441     Doing...
22:39:47.932430     [10]: Ten, ms=321
22:39:48.905557     [11]: Eleven, ms=1292
22:39:50.167863     [12]: Twelve, ms=2553
22:39:50.167953   Ticker Done.
22:39:50.167964 7.950120000s

こちらもSleepと同様に、周期の中には2000msより長い処理がありますが、それらの完了を待ってから次の周期を実行していることが分かります。

周期内で処理の完了を待たなかった場合

本記事の「Sleepで制御する」で以下について言及しました。

もし全ての処理が完了していなくても次の周期を開始したい場合は、Sleep後の Wait は不要です。

せっかくなのでこちらも Sleep と Ticker で実行してみたいと思います。
なお、ここでもexecTime = 3000, interval = 2 として実行しました。

22:56:13.216061 Start.
22:56:13.216292   Sleep Start.
22:56:13.216296     Doing...
22:56:14.233273     [2]: Two, ms=1015
22:56:14.421615     [3]: Three, ms=1200
22:56:14.456636     [4]: Four, ms=1235
22:56:14.823638     [1]: One, ms=1604
22:56:15.037597     [0]: Zero, ms=1816
22:56:15.220253     Doing...
22:56:16.497930     [7]: Seven, ms=1272
22:56:16.967889     [5]: Five, ms=1742
22:56:17.028498     [6]: Six, ms=1807
22:56:17.225950     Doing...
22:56:17.754940     [9]: Nine, ms=2530
22:56:17.966027     [8]: Eight, ms=2740
22:56:18.811830     [12]: Twelve, ms=1581
22:56:19.158357     [10]: Ten, ms=1927
22:56:19.252360     [11]: Eleven, ms=2021
22:56:19.252487   Sleep Complete.
22:56:19.252500 6.36207000s
22:56:19.252562   Ticker Start.
22:56:19.252570     Doing...
22:56:19.420980     [1]: One, ms=163
22:56:19.960843     [3]: Three, ms=703
22:56:19.978539     [4]: Four, ms=724
22:56:20.073644     [2]: Two, ms=818
22:56:21.253767     Doing...
22:56:21.426848     [0]: Zero, ms=2169
22:56:21.554089     [9]: Nine, ms=295
22:56:21.627936     [7]: Seven, ms=371
22:56:22.154153     [5]: Five, ms=895
22:56:22.776410     [6]: Six, ms=1520
22:56:23.254963     Doing...
22:56:23.510286     [12]: Twelve, ms=250
22:56:23.980348     [10]: Ten, ms=720
22:56:23.999980     [8]: Eight, ms=2742
22:56:24.018696     [11]: Eleven, ms=762
22:56:24.018765   Ticker Done.
22:56:24.018775 4.766265000s
22:56:24.018781 Complete.

周期内で完了しなかった処理が次の周期中に完了しているようです。
ちなみに一応補足ですが、この実行例では Sleep と Ticker で全体の処理にかかった時間に差がありますが、これは3回目の処理でたまたま Ticker の方が速く処理を完了したことによる差です。

まとめ

  • 本記事では周期的な並行処理をGoで実装しました。
  • Sleep と Ticker の2つのパターンで実装し、処理時間がランダムな場合でも一定周期内で対応できるようにしました。
  • 実行結果から期待通りの動作を確認しました。
  • 実行速度に関してはあまり差がありませんでした。

今回触れることができませんでしたが、今後は時間の差分やCPU利用率などについて考察していこうと思います。

Discussion

ログインするとコメントできます