Zenn
💬

[Node.js] ループ内でstream.pipeline()を使ってはいけない

2025/03/21に公開

はじめに

仕事で以下のような仕様のコードを書いていました。

  • あるファイルを分割してダウンロード
  • ダウンロードしたファイルを逐次的に一時ファイルに保存
  • 最後に保存した一時ファイルを結合し、1つのファイルにする

これを実現するためにstream/promisespipeline()をループ内で使っていたところ、実行が途中で止まってしまうという問題が発生しました。
備忘録として原因と対策を記載しておきます。

問題のあるコード

import { createWriteStream, createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
import { tmpdir } from 'os';
import { join } from 'path';

const runPoC = async () => {
  const tempDir = tmpdir();
  const outputPath = join(tempDir, 'output.txt');

  // 1️⃣ダミーのチャンクファイルを作成
  const chunkPaths = Array.from({ length: 5 }, (_, i) => join(tempDir, `chunk${i}.txt`));
  chunkPaths.forEach((path, i) => {
    require('fs').writeFileSync(path, `Chunk ${i + 1}\n`);
  });

  // 最終的なファイルのストリーム
  const finalStream = createWriteStream(outputPath, { flags: 'w' });

  console.log('Starting file merging...');

  // 2️⃣チャンクファイルを結合
  for (const chunkPath of chunkPaths) {
    console.log(`Merging ${chunkPath} using pipeline...`);
    const readStream = createReadStream(chunkPath);

    // ループ内で pipeline を使用(これが問題を引き起こす)
    await pipeline(readStream, finalStream);
  }

  console.log('Merging completed!');
  finalStream.end();
};

runPoC().catch(console.error);

上に簡易的に再現した問題のあるコードを載せました。
1️⃣ではダウンロードしたファイルをチャンクごとの一時ファイルに保存する部分を再現しています。ここは重要ではないので解説等は省略します。
2️⃣では各一時ファイルのストリームを作成し、stream/promisespipeline()で書き込みを行っています。このコードを実行してみると理想的には実行結果が

Starting file merging...
Merging /tmp/chunk0.txt using pipeline...
Merging /tmp/chunk1.txt using pipeline...
Merging /tmp/chunk2.txt using pipeline...
Merging /tmp/chunk3.txt using pipeline...
Merging /tmp/chunk4.txt using pipeline...
Merging completed!

となるはずですが、実際には

Starting file merging...
Merging /tmp/chunk0.txt using pipeline...
Merging /tmp/chunk1.txt using pipeline...

で実行が止まってしまいます。
ドキュメントを見てみると、optionsend <boolean> End the destination stream when the source stream ends. Transform streams are always ended, even if this value is false. Default: true.とあります。
どうも第一引数のストリームが完了すると第二引数のストリームも閉じるようです。じゃあこいつをfalseにしてあげればいいのではないか?と思われるかもしれませんが、falseにしたところ MaxListenersExceededWarningが発生しました。すみません、この原因については色々調べたのですがよく分かりませんでした。一応ChatGPTは

コードで pipeline(readStream, finalStream, { end: false }) を ループ内で複数回 実行しているため、finalStream に対して複数のストリームが接続される ことになります。

pipeline は stream.pipeline(src, dest) の dest に error イベントリスナーを追加する 仕様になっています。
そのため、ループを回すたびに finalStream に新しい error リスナーが追加され、 デフォルトのリスナー上限 (10) を超えてしまい警告が出る わけです。

と言っていますが、ちょっとこれが合っているのかドキュメントを読んだ限りでは判然としませんでした。有識者がいればご教示頂きたいです。

修正したコード

以下のようにstream.pipe()end:falseで使ってあげると、メモリリークの警告も発生せずに問題なくループで処理を終えることができます。

import { createWriteStream, createReadStream } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

const runPoC = async () => {
  const tempDir = tmpdir();
  const outputPath = join(tempDir, 'output.txt');

  // 1️⃣ダミーのチャンクファイルを作成
  const chunkPaths = Array.from({ length: 5 }, (_, i) => join(tempDir, `chunk${i}.txt`));
  chunkPaths.forEach((path, i) => {
    require('fs').writeFileSync(path, `Chunk ${i + 1}\n`);
  });

  const finalStream = createWriteStream(outputPath, { flags: 'w' });

  console.log('Starting file merging...');

  // 2️⃣チャンクファイルを結合
  for (const chunkPath of chunkPaths) {
    console.log(`Merging ${chunkPath} using pipe...`);
    const readStream = createReadStream(chunkPath);

    // `pipeline` を使わず、 `pipe` でストリームを繋げる
    await new Promise<void>((resolve, reject) => {
      readStream.pipe(finalStream, { end: false });
      readStream.on('end', resolve);
      readStream.on('error', reject);
    });
  }

  console.log('Merging completed!');
  finalStream.end();
};

runPoC().catch(console.error);

まとめ

とりあえずループ内で1つのWriteStreamを再利用する場合、pipeline()ではなくstream.pipe()を使ってあげるのが良さそうです。
Node.jsのドキュメント、中々読み方に慣れない……

Discussion

ログインするとコメントできます