🦜

Goのbyteストリーミング処理

2021/03/31に公開
1

Goにはioパッケージにまつわるbyte単位のストリーミング処理機能が素敵なので、その良さを紹介してみようと思います。

io.Reader/io.Writerについて

ioパッケージにてReaderとWriterのインターフェースは以下のようになっています。

type Reader interface{
	Read(b []byte) (sz int, err error)
}
type Writer interface{
	Write(b []byte) (sz int, err error)
}

読める書けるだけのシンプルな定義です。このシンプルさが素敵なんです。

派生インターフェース

  • io.Seeker
  • io.Closer
  • io.ReadSeeker
  • io.ReadSeekCloser
  • io.WriterSeeker
  • io.ReadWriteSeeker
  • io.ReadCloser
  • io.WriteCloser
  • io.ReadWriteCloser

これらは実装側に必要なパラメータの最小インターフェースを規定するのに使います。
例えばあるテキストパーサー処理Parserが引数にリードとシーク機能が必要なら。
以下のように引数の型を規定します。

func Parser(r io.ReadSeeker) error {...}

このとき、欲張って機能を内包しすぎないことが肝心です。
io.ReadSeekCloserまで要求すると渡せる引数の範囲が狭まります。
もっと言えばio.Readerで受け取って内部でbytes.NewReaderでラップしてあげればSeekableになります。メモリ効率の問題でこうはできない場合もありますが、必要になるまでは境界に現れるインターフェースは極力シンプルなものに留める方が良いでしょう。

つまり、io.Reader,io.Writerで済むようなインターフェースを使うようにしましょう。

バッファ付き

バッファ付きのReaderやWriter実装があります。
バッファサイズはデフォルトで4KiBです。

import "bufio"
reader := bufio.NewReader(r)
writer := bufio.NewWriter(w)

バッファ付きにすることでバッファなしにくらべ、システムコール回数を抑えることができます。さらにbufio.ReaderPeekUnreadなどの追加機能により、デリミタ探しなどスキャン系の処理が行いやすくなっていて、スキャン用のラッパーbufio.Scannerの定義もあります。

標準入力の読み取りなどは以下のように書くと良いでしょう。

s  := bufio.NewScanner(os.Stdin)
lines := []string{}
for s.Scan() {
	lines = append(lines, s.Text())
}

この方法は標準入力から大量のデータを読み込むときもバッファサイズ単位で読み込むので逐一読む方法に比べシステムコールの頻度を抑えることができ、効率的に動作します。

Readerラッパー

  • io.LimitReader
  • io.MultiReader
  • io.TeeReader

Writerラッパー

  • io.MultiWriter

その他のラッパー

  • io.NopCloser

io.Copyの挙動

定義は以下のようになっています。

io.Copy(dst io.Writer, src io.Reader) (written int64, err error)

通常の使い方としては呼び出すとdstsrcのどちらかがEOFに到達するまでブロック(ループでsrcからReadしてdstにWriteを繰り返す)します。ただし、以下の条件を満たすと宣言的に論理接続する機能があります。

  • srcがio.WriteToインターフェースをサポートする場合、src.WriteTo(dst)が呼ばれた後、io.Copy呼び出しは即時完了する。
  • dstがio.ReadFromインターフェースをサポートする場合、dst.ReadFrom(src)が呼ばれた後、io.Copy呼び出しは即時完了する。

前者はsrcに書き込みが行われるとき、同時にdstにも書き込みを行います。後者はdstから読み出そうとするとき、srcから読み出した内容を返します。

条件を満たす満たさないに関係なく以下のように書くとEOFまでのコピーが走ります。

go io.Copy(dst, src)

これもgoroutineの仕組みのおかげですね!

io.Pipeの挙動

r, w := io.Pipe()

wに書いた内容がrから読めます。アンバッファなので、rとwは必ず異なるgoroutineで扱う必要があります。同じgoroutineからの場合、書き込みも読み出しも完了せずデッドロックします。

leftR, rightW := io.Pipe()
rightR, leftW := io.Pipe()
l := &struct{io.Reader;io.WriteCloser;}{leftR, leftW}
r := &struct{io.Reader;io.WriteCloser;}{rightR, rightW}

以上のようにするとlとrは双方向接続された擬似Conn相当になります。例えばnet.Connインターフェースを満たすものを作るのに使えます。

ほぼ似た用途として使えるos.Pipe()というものもありますが、これはOSの提供機能を利用するものです。こちらはほとんどの環境でバッファ付きです。閉じずに同時に利用できる数には限界があります。(OSの1プロセスに許容されるファイルオープン数制限)

テクニック集

標準エラー出力とファイル双方にログを出力

fp, err := os.OpenFile("access.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err  != nil {...}
log.SetOutput(io.Multiwriter(os.Stderr, fp))
log.Print("Hello!") // -> os.Stderrと"access.log"に出力される

ReaderとWriterからReadWriteCloserを合成

ReadWriteCloserが要求されているところにreader,writerを渡す方法。

&struct {
	io.ReadCloser
	io.Writer
}{
	io.NopCloser(reader),
	writer,
}

巨大イメージのメタ情報の事前参照と読み込みの両立

image.DecodeConfigは画像ストリームのヘッダを解釈して画像のサイズやカラーフォーマット、画像コンテナフォーマットなどを取得できます。

しかし、一旦image.DecodeConfigすると「画像ストリームのヘッダ」部分を読み出してしまうのでrをそのままimage.Decodeに渡してもヘッダが欠損したストリームしか読めません。

ストリームがio.Seekerを満たすものであればシークで先頭まで巻き戻してやることで目的を果たすことはできます。しかし、ローカルファイルとかであれば可能ですが、圧縮ストリームやリモートで受け取るようなストリームの場合はシーク出来ないものがほとんどです。メモリに余裕があればストリーム全体を一旦bytesバッファに読み込むことでシーカブルなストリームは簡単に用意できますが、画像のストリームが巨大であればメモリの無駄遣いになっていしまいます。

そこで「画像ストリームのヘッダ」の読み込み時に「画像ストリームのヘッダ」だけを複製するのに「io.TeeReader」を使います。これは読み出された内容を第二引数のWriterに書き出すという動作を行います。そうして得られた「画像ストリームのヘッダ」のコピー「header」と残りのストリーム「r」をまとめて一つのストリームとして扱うために「io.MultiReader」を使います。

サンプル: https://play.golang.org/p/jdUNNqQ7uvc

import (
	_ "image/png"
	_ "image/jpeg"
	_ "image/gif"
	"bytes"
	"image"
	"io"
)

func Load(r io.Reader) (image.Image, error) {
	header := bytes.NewBuffer(nil)
	conf, format, err := image.DecodeConfig(io.TeeReader(r, header))
	if err != nil { ... }
	... // confやformatを参照した処理
	return image.Decode(io.MultiReader(header, r))
}

Writer経由でチャンク採取

ストリーミング処理などでチャンクを参照したいときなどはio.Writerを自作すると簡単。

type ChanWriter chan []byte
func (cw ChanWriter) Write(b []byte) (int, error) {
	cw <- b
	return len(b), nil
}

ch := make(chan []byte)
go func() {
	io.Copy(ChanWriter(ch), src)
	close(ch)
}()
for b := range ch {
	// チャンク受け取りごとの処理
}

io.Copyはデフォルトで最大32Kバイトのチャンクになる。
チャンクサイズをコントロールしたいなら「io.Copy」の代わりに「io.CopyBuffer」を使おう。
「ChanWriterへの書き込み処理」と「chの読みだし」を並行処理するのがキモ。
必要に応じてどっちをgoroutineに載せるか両方載せるかは任意に選べるのがGo言語の良いところ。

標準入出力でJSON-RPCサービス

jsonrpc.ServeConnのパラメータはio.ReadWriteCloserなので先述のテクニックを利用して以下のように書けます。

package main

import (
	"fmt"
	"io"
	"log"
	"net/rpc"
	"net/rpc/jsonrpc"
	"os"
)

type Server struct{}

func (s *Server) Greet(req string, res *string) error {
	*res = fmt.Sprintf("hello %q", req)
	return nil
}

func main() {
	if err := rpc.Register(&Server{}); err != nil {
		log.Fatal(err)
	}
	jsonrpc.ServeConn(&struct {
		io.ReadCloser
		io.Writer
	}{
		io.NopCloser(os.Stdin),
		os.Stdout,
	})
}
% echo '{"id":1, "method":"Server.Greet", "params":["foo"]}' | go run .
{"id":1,"result":"hello \"foo\"","error":null}

こういったプロセスはChrome拡張などでシステム側との疎通につかわれていたりします。

まとめ

  • Goではほとんどのストリーム処理がシンプルで共通なインターフェースで実装されています。
  • なのでストリーム処理は簡単に結合できます。
  • 標準入出力をWebSocketに中継なんてのもgoroutine併用で容易に実装できてしまいます。
  • 圧縮や解凍をしつつ何かの処理やストリームにフィルタなど応用範囲はとにかく広い。
  • いろいろ試してみてね。

Discussion