最近良く書くGoのデータ処理パターン

2020/10/17に公開

概要

データ処理はどうしても for ループと配列の受け渡しになりがちなGoの処理を、最近はStreamふうに書くのが気に入っていますというメモ書き。

背景

シーケンスの処理は map() filter() reduce() で書くと良いよ、というのは古からよく言われる話ですが、Goだと原則 for ループで書く事が多いです。

func multiplyAll(data []int, multiplier int) []int {
	result := make([]int, len(data))
	for i, datum := range data {
		result[i] = datum * multiplier
	}
	return result
}

個人的にはこのGoの基本的なスタンスはとても好きで、とてもわかりやすいと感じています。

しかしこれでは、処理対象のすべてのデータがメモリに載ることになるため、効率を考えなくてはいけないシーンや、大量のデータを処理するシーンでは採用しかねることがあります。

個人的によく使うパターン

ということで、色々やりようはあると思うんですが、ここ数ヶ月よく使うパターンを…要はシンプルなStream処理なんですが、どのくらいシンプルでいいのか、といったところで悩みがちなので、メモ代わりに置いておきます。

肝となるinterfaceと基本オブジェクト定義

type DataType *** // 任意のデータ処理対象

// Sink : データを受け取って任意の処理をするInterface
type Sink interface {
	Pour(DataType) error
}

// ForkSink : データを受け取って複数のSinkにデータを分流させるSink
type ForkSink []Sink
func (s ForkSink) Pour(datum DataType) error {
	for _, downstream := range s {
		if err := downstream.Pour(datum); err != nil {
			return err
		}
	}
	return nil
}

// FilterSink : 条件を満たすデータのみ下流に流すSink
type FilterSink struct {
	Filter func(DataType) bool
	Downstream Sink
}
func (s *FilterSink) Pour(datum DataType) error {
	if s.Filter(datum) {
		return s.Downstream.Pour(datum)
	}
	return nil
}

// ModifySink : データを加工するSink
type ModifySink struct {
	Modify func(*DataType) error
	Downstream Sink
}
func (s *ModifySink) Pour(datum DataType) error {
	if err := s.Modify(&datum); err != nil {
		return err
	}
	return s.Downstream.Pour(datum)
}

基本はこれだけです。

実用上の Sink

とはいえ基本のオブジェクト群だけあっても当然何にもならないので、いろいろな Sink 実装を用意することになります。

例えばこれをデータベースに入れるための Sink を書いてもいいでしょう。

type DBSink struct {
	DB *sql.DB
}
func (s *DBSink) Pour(datum DataType) error {
	return s.DB.Exec("INSERT INTO data (id, value) (?, ?)", datum.ID, datum.Value)
}

途中でログを出したいとか

type LogSink log.Logger
func (s *LogSink) Pour(datum DataType) error {
	s.Printf("accepted a datum: %#v", datum)
	return nil
}

テストなどで使うためにメモリ上に結果をおいておきたいだとか

type SliceSink []DataType
func (s *SliceSink) Pour(datum DataType) error {
	*s = append(*s, datum)
	return nil
}

というように、必要な Sink を作っておけば、あとはこれをつなげるだけです。

sink := ForkSink{
	LogSink(*log.New(...)),
	&FilterSink{
		Filter: func(datum DataType) bool {
			return datum.Value % 2 == 0
		},
		Downstream: ForkSink{
			&SliceSink{},
			&DBSink{DB: ...},
		},
	},
}	

こうしてみるとまァほとんどStreamとかPipelineのそれですね。
データの流れがそのまま構造化されるので、結構わかりやすくかけて便利です。

補足

名前

変に既存の名前を変えすぎかもしれないです。

  • SinkじゃなくてStreamとかPipeとかでいいかもしれません
  • PourじゃなくてPutとかでいいかもしれません
  • DownstreamじゃなくてNextで十分かもしれません

などなど。ただ Sink は短くて気に入ってます。
Sink なんて名前にしたら、 Pour にしたいし Downstream にしたいじゃん!世界観的に!

Q: 処理対象のデータに種類がたくさんあるんだけど?

処理対象のデータの型ごとにこれを用意するのは 手で書いてたら アホみたいに大変です。
スニペットでも用意しとくのが利口でしょう。Generator作っても良いかもしれません。

ジェネリクス?牛刀割鶏ですねェ。

逆のInterfaceを切っても良いかも

これをやってるとたまに「すべてのorある一定のデータが揃った状態で、後処理を書きたい」みたいな本来の要件に矛盾したケースも出てくるんですが、その場合ためたデータを順次吐き出す(よくあるIteratorよりシンプルな)Interfaceを切るというのも手かもしれません。

// Drainage : データを与えられた関数に一個ずつ吐き出すInterface
type Drainage interface {
	Drain(func(DataType) error) error
}

ただこちらは活用イメージが今のところないので使ってません。
コレが求められるようなシーンではfor文を素直に書いてるほうが(たぶん)健全です。

Discussion