🚿

streamは明示的に閉じておこう [Node.js]

2024/05/22に公開1

はじめに

こんにちは、Buzo(@buzou_muzou)です。

最近Node.jsで画像処理を行う処理を書きました。その際に、streamとpipeを使いました。
改めてドキュメントを読んでみると、autoCloseが有効な場合でもstreamを閉じる処理を明示的に書かないとリソースリークを起こしてしまう場合があることを知ったのでまとめてみました。

リソースリークが起きる可能性があるコード

以下のようなコードには問題があります。

import fs from "node:fs";

const readStream = fs.createReadStream('hoge.png');
const writeStream = fs.createWriteStream('fuga.png');
readStream
  .on("error",(err) => {
     console.log(err)
   })
  .pipe(writeStream)
  .on("error",(err) => {
     readStream.destroy()
     console.log(err)
   })

こちらのコードではhoge.pngという画像をストリームで読み込んで、fuga.pngにストリームで書き込みを行っています。また、writeStreamでエラーが起きたらreadStreamを閉じています。
このコードの問題は、readStreamでエラーが起きた場合にwriteStreamはcloseされずに残ってしまう点です。

Node.jsの公式ドキュメントを見てみると、下記のような記述が見つかりました。[1]

One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.

(↓DeepLで翻訳)
重要な注意点として、Readableストリームが処理中にエラーを発した場合、Writable宛先は自動的にクローズされない。エラーが発生した場合、メモリ・リークを防ぐために、各ストリームを手動で閉じる必要がある。

ドキュメントによると、readStreamでエラーが発生したときは、明示的にwriteStreamを閉じない限り、writeStreamは閉じられないとあります。

fs.createWriteStreamにはautoCloseというパラメータがあり、デフォルトでtrueになっています。そのことを知っていたので、明示的にwriteStreamを閉じなくてもstream元のreadStreamでエラーが起きればwriteStreamを閉じられると思っていました。
しかし、実際には全ての場合において空気を読んで自動的にwriteStreamを閉じてくれるわけではありませんでした。

error時にwriteStreamを明示的に閉じよう

上記を踏まえてエラー時にwriteStreamを閉じる処理を書くと以下のようになります。

import fs from "node:fs";
import sharp from "sharp";

const readStream = fs.createReadStream('hoge.png');
const writeStream = fs.createWriteStream('fuga.png');
readStream
  .on("error", (err) => {
    // writeStreamを明示的に閉じる
    writeStream.end();
    console.log(err)
  })
  .pipe(sharp().resize(50))
  .pipe(writeStream)
  .on("error", () => {
   readStream.destroy();
   console.log(err)
  })

実際にローカルで実験してみる

それぞれのパターンをローカルで実行し、それぞれstreamが閉じられているかを確認してみます。

1.readでエラーが起きるパターン

a.エラー時に明示的にwriteStreamを閉じる場合

コード
import fs from "node:fs";
// 空文字を指定
const readStream = fs.createReadStream("");
const writeStream = fs.createWriteStream("fuga.png");
readStream
	.on("error", (err) => {
        // writeStreamを明示的に閉じる
		writeStream.end();
		console.log(err);
	})
	.on("close", () => {
		console.log("📚Read stream closed🫡");
	})
	.pipe(writeStream)
	.on("error", (err) => {
		readStream.destroy();
		console.log(err);
	})
	.on("close", () => {
		console.log("📝Write stream closed🫡");
	});

実行結果
[Error: ENOENT: no such file or directory, open ''] {
  errno: -2,
  code: 'ENOENT',
  syscall: 'open',
  path: ''
}
📝Write stream closed🫡
📚Read stream closed🫡

readStreamwriteStreamともに閉じられている。

b.エラー時に明示的にwriteStreamを閉じない場合

コード
import fs from "node:fs";
// 空文字を指定
const readStream = fs.createReadStream("");
const writeStream = fs.createWriteStream("fuga.png");
readStream
	.on("error", (err) => {
        // writeStreamを明示的に閉じない
		// writeStream.end();
		console.log(err);
	})
	.on("close", () => {
		console.log("📚Read stream closed🫡");
	})
	.pipe(writeStream)
	.on("error", (err) => {
		readStream.destroy();
		console.log(err);
	})
	.on("close", () => {
		console.log("📝Write stream closed🫡");
	});


実行結果
[Error: ENOENT: no such file or directory, open ''] {
  errno: -2,
  code: 'ENOENT',
  syscall: 'open',
  path: ''
}
📚Read stream closed🫡

writeStreamは閉じられない。readStreamは閉じられている。

2.writeでエラーが起きるパターン

a.エラー時に明示的にreadStreamを閉じない場合

コード
import fs from "node:fs";

const readStream = fs.createReadStream("hoge.png");
// 空文字を指定
const writeStream = fs.createWriteStream("");
readStream
	.on("error", (err) => {
		writeStream.end();
		console.log(err);
	})
	.on("close", () => {
		console.log("📚Read stream closed🫡");
	})
	.pipe(writeStream)
	.on("error", (err) => {
        // readStreamを明示的に閉じる
		readStream.destroy();
		console.log(err);
	})
	.on("close", () => {
		console.log("📝Write stream closed🫡");
	});
実行結果
[Error: ENOENT: no such file or directory, open ''] {
  errno: -2,
  code: 'ENOENT',
  syscall: 'open',
  path: ''
}
📝Write stream closed🫡
📚Read stream closed🫡

readStreamwriteStreamともに閉じられている。

b.エラー時に明示的にreadStreamを閉じない場合

コード
import fs from "node:fs";

const readStream = fs.createReadStream("hoge.png");
// 空文字を指定
const writeStream = fs.createWriteStream("");
readStream
	.on("error", (err) => {
		writeStream.end();
		console.log(err);
	})
	.on("close", () => {
		console.log("📚Read stream closed🫡");
	})
	.pipe(writeStream)
	.on("error", (err) => {
        // readStreamを明示的に閉じない
		// readStream.destroy();
		console.log(err);
	})
	.on("close", () => {
		console.log("📝Write stream closed🫡");
	});
実行結果
[Error: ENOENT: no such file or directory, open ''] {
  errno: -2,
  code: 'ENOENT',
  syscall: 'open',
  path: ''
}
📝Write stream closed🫡

readStreamは閉じられない。writeStreamは閉じられている。

まとめ

readStream,writeStreamはお互いにエラー時に明示的にstreamを閉じる必要があることが分かりました。
長時間稼働するようなアプリケーションではリソースリークが重大な問題になってくるので注意が必要です。
当たり前ではありますが、日頃からちゃんとドキュメントを読んで実装していこうと思いました。

脚注
  1. https://nodejs.org/api/stream.html#choose-one-api-style ↩︎

コミューン株式会社

Discussion

ngicksngicks

Node.jsのstream回りって非常にややこしくてきちんと処理しようとすると大変ですよね。
私が理解する限り、stream.promises.pipelineはエラー時のリソース解放の面倒も見てくれるので大変便利です。

// node --version
// v20.5.0
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

const src = fs.createReadStream('example');
const dst = fs.createWriteStream('example.gz');

(async () => {
    src.on("close", () => { console.log("src closed") })
    dst.on("close", () => { console.log("dst closed") })
    let i = 0;
    await pipeline(
        src,
        zlib.createGzip(),
        async function* (source) {
            for await (const chunk of source) {
                i++
                if (i > 2) {
                    throw new Error("woow")
                }
                yield await chunk;
            }
        },
        dst,
    )
        .catch(console.error)
    /*
        Error: woow
            at /foo.js:18:27
            at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
            at async pumpToNode (node:internal/streams/pipeline:135:22)
    */


    console.log(`src destroyed = ${src.destroyed}, dst destroyed = ${dst.destroyed}`) // src destroyed = true, dst destroyed = true
    console.log(`src closed = ${src.closed}, dst closed = ${dst.closed}`) // src closed = false, dst closed = false

    await new Promise(r => setTimeout(r, 0)) // let them emit close event

    /*
        src closed
        dst closed
    */
    console.log(`src closed = ${src.closed}, dst closed = ${dst.closed}`) // src closed = true, dst closed = true
    src.removeAllListeners()
    dst.removeAllListeners()
})();

Node.jsはドキュメントの重複を避けるためか、昔からあったcallback版にしか詳細なドキュメントがないことがあって困ります。上記スニペットではpromises版を使っていますが以下はcallback版のドキュメントです。

https://nodejs.org/docs/latest/api/stream.html#streampipelinestreams-callback

stream.pipeline() will call stream.destroy(err) on all streams except:

  • Readable streams which have emitted 'end' or 'close'.
  • Writable streams which have emitted 'finish' or 'close'.

ソースを掘ってみましたが、destroyを呼ばれたらcloseされるので、前述のfd leakは起きません。

open/closeはPromiseで呼ばれるのでclosedになるのは少なくともmicrotask queueまで遅延します。

ここ以降は追いきれなかったですが(多分ここ以降はNode.jsランタイムかlibuv実装です)、closeのエラーはハンドルしてないかもですから、errorイベントは拾ったほうがいいかもしれません。
src.destroy(err)されるので、await stream.finished(src)したり、await events.once(src, "close")したりするとthrowしてしまうのでそこに注意が必要です。