Open4

Node.jsのReadableStreamを複製する方法

oosawyoosawy

graphql-uploadからgraphql-upload-minimalにライブラリの乗り換えで、ファイルがディスクに保存されずメモリ上に配置されるように変わりReadableStreamを複数回取得することができなくなったので、複製する方法を考える

oosawyoosawy

軽く調べたりドキュメントを流し読みしてみても、Node.js標準にはそのようなAPIが見つからなかったので、Node.jsのissuesから関連しそうなものを探してみると、このようなコードを見つけた

// https://github.com/nodejs/readable-stream/issues/202#issuecomment-211244715
var fs = require('fs')
var stream = require('stream')
var contents = fs.createReadStream('./bigfile') // greater than 65KB
var stream1 = contents.pipe(new stream.PassThrough())
var stream2 = contents.pipe(new stream.PassThrough())

stream1.on('data', function (data) { console.log('s1', data.length) })
stream1.on('end', function () {
  stream2.on('data', function (data) { console.log('s2', data.length) })
})

ただよく読んでみるとReadableStreamからpipeで複製された2つのstreamのうち1つだけが完了してから、2つめのstreamを読みだしていて、メモリ上にファイルの内容全てのバッファーがないとこれは動作しないように感じたので、少し動作検証してみる

まず適当なファイルを読み込ませて実行するとs1 2990 s2 2990が同時に出てきた。これは渡したファイルサイズと一致している。次に数MBあるファイルを入力してみるとs1 65536だけ出力されて終了した。65536はstreamが処理したバイト数なので計算してみるとちょうど64KBだったので、おそらくコメントで書かれているgreater than 65KBはこれより大きいファイルということ分かった

ファイルが64KBより大きいときに片方だけ読みだすと処理が止まるということは、内部バッファーのサイズがちょうど64KBで、複数回pipeした場合は全てのpipe先に配り終えてから内部バッファーをクリアして、また上流から読みだしている動作だと考えた

ついでにこの64KBがどこから来ているかも調べてみるとfs.createReadStreamのhighWaterMarkというオプションの初期値だったので、ReadableStream自体にバッファーサイズが決まっているのではなくてstreamごとに決めることができそうだということも分かった

oosawyoosawy

Node.jsのstreamが複数pipeされたときに全てのpipe先を考慮してくれることを踏まえて考えると、データが処理され始めてから新たにpipeされたとしても(それまでの全データを保持してない限りは)完全なデータを渡すことができないのでpipeするタイミングも大事だと考えて、検証してみると2つめのpipeをsetTimeoutで3msを指定して遅延させるとs1のログが実行ごとに出たり出なかったりするようになることが分かった

そしてs2が出る場合は直前のs1と同じサイズになっていて、そこからs1と同じログが最後まで出ているので、おそらく動作としてはpipeされた段階で処理された部分から始まっていることが観察できた

var fs = require("fs");
var stream = require("stream");
var contents = fs.createReadStream("./file"); // greater than 65KB
var stream1 = contents.pipe(new stream.PassThrough());

stream1.on("data", function (data) {
  console.log("s1", data.length);
});

setTimeout(() => {
  var stream2 = contents.pipe(new stream.PassThrough());
  stream2.on("data", function (data) {
    console.log("s2", data.length);
  });
}, 3);
log
> node stream.js
s1 65536
s1 65536
s1 5902

> node stream.js
s1 65536
s1 65536
s2 65536
s1 5902
s2 5902