😁

Go言語マニアックス ~ iter.PullもいいけどPushしたいよね

2024/10/05に公開

TL;DR

  • 標準 iter パッケージの iter.Pull でイテレータをpull型に変換すれば、イテレータから少しずつ値を取り出すことができる
  • これは既に終端のあるデータ列の利用(パーサ等)で有用だが、一方で終端のない未完のデータ列(リアルタイムの入力信号等)に対しては不便
  • そこで、少しずつ値を送りつけることのできる Push 関数を実装してみた
  • pull型イテレータでは Merge, Zip 等の複数のイテレータを統合する処理が書けるが、push型イテレータでは逆に複数のイテレータへ分配するような処理が可能になる
  • push型イテレータがあって初めてイテレータでやりたいことが完全に網羅できるのでは?ぜひ試してみてほしい

前置き

この記事ではGoのiter.Pullについて触れます。もしiter.Pullとかコルーチンについて詳しくない方は、Goのイテレータ深堀りNightで私が発表したスライドを参照してください。

https://docs.google.com/presentation/d/1XYwB6nARBhYjwMvCgczB9APC6OlvQVAzQqFRrAT556s/edit#slide=id.p

iter.Pull/コルーチン以外の普通のイテレータの使い方は、同イベントにて他の方が素晴らしい発表をされているので、そちらをご覧ください。

https://findy.connpass.com/event/328543/

課題

上に貼った私のスライドでは、コルーチンの使い所の一つとして、リアルタイムに流れてくる信号をモールス信号としてデコードする処理、および少しずつ値を送りつけるための Pubsub 関数を紹介していました。が、この関数には言語化しづらいもやもやを抱えていました。もっとシンプルで有用な形があるはずではないか、と。(特に、折角イテレータを使っているのにスライスに値を溜め込んでいるあたり)

参考:Pubsub関数の実装
type Subscriber[VIn, VOut any] func(iter.Seq[VIn]) iter.Seq[VOut]

type Publisher[VIn, VOut any] func(iter.Seq[VIn]) ([]VOut, bool)

func Pubsub[VIn, VOut any](subscriber Subscriber[VIn, VOut]) Publisher[VIn, VOut] {
	var in VIn
	var out []VOut

	coro := func(yieldCoro func(more bool) bool) {
		seq := subscriber(func(yieldSeq func(VIn) bool) {
			for {
				more := yieldSeq(in)
				if !yieldCoro(more) {
					break
				}
			}
		})

		for v := range seq {
			out = append(out, v)
		}
	}

	next, stop := iter.Pull(coro)
	return func(incoming iter.Seq[VIn]) ([]VOut, bool) {
		out = nil
		var more bool
		for v := range incoming {
			in = v
			more, _ = next()
			if !more {
				break
			}
		}
		if more {
			return out, true
		}
		stop()
		return out, false
	}
}

C#等のReactive Exensionsでは、少しずつイベント列を送りつけるとか、データ列を分岐させる処理がいい感じに書けます。これがGoのイテレータでもまんま実現できそうで出来なかった。

https://qiita.com/acple@github/items/6cfee916f09632037a6e

いろいろこねくり回しているうちに、ようやく納得の行く Push 関数ができたので、ここで発表したいというわけです。

Push関数の使い方

とりあえずはPush関数の使い方をご覧ください。 fmt.Scanln で値を入力してもらって、それを合計するだけの処理です。

func main() {
	// 1. iter.Seqを使って何かする関数を定義
	sum := func(src iter.Seq[int]) {
		sum := 0
		for v := range src {
			sum += v
			fmt.Printf("- sum: %d\n", sum)
		}
	}
	// 2. push関数に変換
	pushSum := Push(sum)

	for {
		var n int
		fmt.Scanln(&n)
		// 3. 少しずつ値を送りつける
		if !pushSum(n) {
			break
		}
	}
}
$ go run .
1
- sum: 1
2
- sum: 3
3
- sum: 6

というように、データを少しずつ、延々と iter.Seq として送りつける処理を書くことができます。

応用例:分配

iter.Pull を使うと xiter proposal にある通り、ZipやMergeなどの複数のイテレータを統合する処理を書くことができます。ですがその逆、「複数のイテレータへ送出する処理」がどうしても書けなくて、どういうわけなんだろうと長らく頭を捻っていました。

https://go.dev/issues/61898

結局のところ、そのような処理は Push によってスマートに実現できます。以下は sum から fizz, buzz 関数へイテレータの形で値を送りつけるようにしたものです。

 func main() {
+	fizz := func(src iter.Seq[int]) {
+		for v := range src {
+			if v%3 == 0 {
+				fmt.Println("- fizz")
+			}
+		}
+	}
+	pushFizz := Push(fizz)
+
+	buzz := func(src iter.Seq[int]) {
+		for v := range src {
+			if v%5 == 0 {
+				fmt.Println("- buzz")
+			}
+		}
+	}
+	pushBuzz := Push(buzz)
 
 	sum := func(src iter.Seq[int]) {
 		sum := 0
 		for v := range src {
 			sum += v
 			fmt.Printf("- sum: %d\n", sum)
+			// 両者に送信
+			if !pushFizz(sum) || !pushBuzz(sum) {
+				return
+			}
 		}
 	}
 	pushSum := Push(sum)
 
 	for {
 		var n int
 		fmt.Scanln(&n)
 		if !pushSum(n) {
 			break
 		}
 	}
 }
$ go run .
1
- sum: 1
2
- sum: 3
- fizz
3
- sum: 6
- fizz
4
- sum: 10
- buzz
5
- sum: 15
- fizz
- buzz
6
- sum: 21
- fizz

まあ、この例だとわざわざイテレータもPushも挟まず、直接fizz/buzz判定を呼べばいいんですが、イテレータが活きるシンプルな例を思いつかなかったのでこれで勘弁してください。

Push関数の実装

最後にPush関数の実装を貼っておきます。無限に値を送り出すイテレータと、値を一個送るたびに戻って来るコルーチンを組み合わせたイメージです。

func Push[V any](recv func(iter.Seq[V])) (push func(V) bool) {
	var in V

	coro := func(yieldCoro func(struct{}) bool) {
		seq := func(yieldSeq func(V) bool) {
			for yieldSeq(in) {
				yieldCoro(struct{}{})
			}
		}
		recv(seq)
	}

	next, stop := iter.Pull(coro)
	return func(v V) bool {
		in = v
		_, more := next()
		if !more {
			stop()
			return false
		}
		return true
	}
}

モールス信号デモ

冒頭のスライドでも公開していたモールス信号デモも、Push関数を使う形に直しておきました。

https://github.com/eihigh/morse-demo

// トンorツーをかなにデコードし、結果を追記する関数
jp := Push(func(symbols iter.Seq[symbol]) {
	for v := range decodeJP(symbols) {
		textJP += v
		fmt.Println(textJP)
	}
})

// トンorツーをAlphabetにデコードし、結果を追記する関数
en := Push(func(symbols iter.Seq[symbol]) {
	for v := range decode(symbols) {
		text += v
	}
})

// On/Offをトンorツーに変換し、デコーダに渡す関数
push = Push(func(states iter.Seq[bool]) {
	for v := range symbols(pulses(states)) {
		if !jp(v) || !en(v) {
			return
		}
	}
})
// 入力をpush
on := ebiten.IsMouseButtonPressed(ebiten.MouseButtonLeft) || ebiten.IsKeyPressed(ebiten.KeySpace)
push(on)

折角なのでついでに symbols(pulses(states)) って形で、「イテレータを取ってイテレータを返す」関数を何重にも組み合わせられるところも示しています。

おわりです

イテレータを引数にとって加工するような関数を作れば組み合わせ放題なので、Goのイテレータは最高ですね。

自作のPush関数ですが、正直実装に自信ないので間違いあったら教えて下さい。あとそもそもPush関数が本当に有用なのかってところから自信ないので、リアルタイムに流れてくるデータ列を扱うにあたって何か分かったことあればぜひ筆者に教えてください。

間違っていなければReactive Extensions的なことがスマートに実現できたことになるので、その方面でどう実用されているかを参考にしてGoで大活躍させられるんじゃないでしょうか!

Discussion