🧐

Node.jsのstreamのしくみまとめ

2021/12/20に公開

streamっていろんな場面で出てきますが(HTTPリクエスト読むとか)、実務で出会ったのは大きなファイルを読む時にメモリに一度に読めないので、ちょっとずつ読みたい…というときでした。.NETで見た気がするけど、これはどう使うんじゃ…と思ってまとめたメモです。特にstreamをcloseしないの?とか、どういう時に使えるの?みたいな使い方の概念的な部分にややつまったのでそのあたりを中心に。

いつ使うのか

JavaScriptでreadfileをするとファイル全体をメモリにのせて、オブジェクトに書き込む形になり、メモリリークをするかパフォーマンスが激遅になるかが問題になるんです…。ということで

  • メモリに乗り切らないとき
  • 時間的にすぐにデータを処理しはじめたいとき

に使います。データを小分けに順番に扱うので、同じようなデータをどっかに流したいだけ(順次読みこんで、順次書き込む)とか、配列のような順序の決まったものに対して逐次計算をしていくとかの時に使います。オブジェクトが1つでメモリに乗り切らんみたいなとき(そんなことあるか?)は使えないので注意。

streamについての説明

streamの種類

  • Readable:読み込み可能なデータストリーム
  • Writable:書き込み可能なデータストリーム
  • Duplex:読み込み可能かつ書き込み可能なデータストリーム(net.Socketとか)
  • Transform:読み書きされたデータの変更や変換をするストリーム

Readのしくみ

  1. ストリームのしくみ
    ストリームは内部バッファにデータを格納します。Readableストリームだとstream.push(chunk)が呼ばれるとデータがバッファリングされて、stream.read()を呼ぶと読み出せます。バッファがいっぱいになるとデータの読み込みが止まります。

Readableストリームはデータが読めるようになった時に、EventEmitterAPIを使っています。下記のようにdataイベントが発生するとchunkを読めます。

hogestream.on('data', (chunk) => {
    huga += chunk;
 });
  1. readのモード
    例えばReadableストリームはreadable._readableState.flowingという状態をもっていて、trueのときはデータを生成してイベントを発行します。falseのときはデータはバッファに充填します。dataイベント、pipeイベント、resumeメソッドなどでこの値がtrueに変わります。pauseメソッド、unpipeメソッドでfalseになります。つまり、
  • flowing mode:勝手にデータを読んでくれるモード
    • 勝手に読まれているデータをlistenして、`on('data')'で拾っていくイメージ
  • paused mode:stream.read()で読んでいく
    • readableStream.read()を自力で読んでいく。他の言語のstream系でやる、一行ずつreadしてnullならcloseに近い

ちなみに、flowing modeの場合、Readableストリームで読み込むデータがなくなるとendイベントが発行され、ストリームは自動で閉じられます。閉じられるタイミングはon('end')で拾えますし、自力でstream.closeみたいなことはする必要がないようです(エラーの場合はこの限りではない)。

  1. モードの切り替え
    最初はpaused modeではじまります。
  • on('data')をよぶ
  • stream.resume()をよぶ
  • stream.pipe()をよぶ

のどれかをすると、readable._readableState.flowingがfalseからtrueに変わって、flowing modeになります。

逆に、paused modeに切り替えるには

  • stream.pause()をする
  • stream.unpipe()ですべてのパイプを削除する

とデータが止まります。

覚えるべきこと

先ほど書いたイベント類はnode.jsのドキュメントにあります。どんなイベントがあるかどうかと、基本的な使い方を覚えればstreamを使うことはできると思います。基本的な使い方は以下がよさそう。
https://qiita.com/masakura/items/5683e8e3e655bfda6756

また、ストリームを読むときのイディオムとして

let moji = '';
for await (const chunk of readable) {
	moji += chunk;
}

こうすると巨大なファイルから読んで文字列に格納していく…ということができます。

とりあえず全体像を知るには以下のチートシートを眺めるとよさそうです。
https://devhints.io/nodejs-stream

参考文献

ざっと流れを知りたい
https://qiita.com/masakura/items/5683e8e3e655bfda6756

細かいことを知りたい
https://nodejs.org/api/stream.html

https://nodesource.com/blog/understanding-streams-in-nodejs

実例をみたい
https://zenn.dev/tyaahan/articles/b36cf74c971d29

Discussion