ハンズオンNode.jsを進める【第3章 EventEmitterとストリーム】

EventEmitterとストリーム
- 非同期プログラミングで扱った非同期処理は、処理の要求とその結果が常に一対一であった。
- では、1回の要求に対して結果が複数回発生するような非同期処理はどのように実装すれば良いか。
- Promiseインスタンスは一度settled状態になったらそれ以降状態が変化しないため、そのような処理を実装するのには向いていない。
- この章ではこの種の非同期処理を実現するためにNode.jsでどのような仕組みが用意されているのかを付加ぼる

3.1 ObserverパターンとEventEmitter
- イベントループで起動するWebサーバー(Nginx)が接続を受け付ける処理は、1回の要求に対して結果が複数発生する非同期処理と考えられる
- 要求とはWebサーバーの起動
- 結果とは、起動したサーバーに対するHTTPリクエスト

Node.jsではhttpという標準モジュールを使うと、Webサーバーを移動できる。
このhttpモジュールはObserverパターンを使ってこの要件を実現している。
Observerパターン
Observerパターンはデザインパターンの一つで、このパターンでは監視対象(Subject)に対して発生した何らかのイベントが、監視役(Observer)に逐一通知される。
ObserverはあらかじめSubjectに対して監視を行うための登録処理を行い、Subjectはイベント発生のタイミングで登録済みのObserverに対して通知処理を実行する。それぞれのSubjectには複数のObserverを登録できる。
Node.jsではObserverパターンはEventEmitterによって実装される。
EventEmitter自体はObserverパターンにおけるSubjectとして機能する。EventEmitterに対するObserverは一般的にリスナと呼ばれるため、以降はリスナという表現を用いる。

Nodeの世界では、EventEmitterインスタンスがSubject(監視対象)であって、リスナがObserver(監視役)なのか

httpモジュールで実際にWebサーバーを起動するコード
EventEmitterインスタンスのon()というメソッドにイベント名とコールバックを渡すことで、リスナの登録を行っている(onがSubjectmov対して監視を行うためのメソッドであって、このコールバックがリスナである)。リスナが受け取る引数はイベントによって異なる。
const http = require("http")
// サーバオブジェクト(EventEmitterインスタンス)の生成
// こいつが要はSubject(監視対象)。リスナがObserver(監視役)
// ObserverはあらかじめSubject(NodeではEventEmitterのインスタンス)に対して監視を行うための登録処理を行う
// Subjectはイベント発生のタイミングで登録済みのObserver(リスナ)に対して通知処理を実行する
// Subjectには複数のObserver(リスナ)を登録できる
const server = http.createServer()
// requestイベントのリスナ登録
server.on("request", (request, response) => {
// レスポンスを返す
response.writeHead(200, { "Content-Type": "text/plain"})
response.write("Hello, World!")
response.end()
})
// listening(リクエストの受付開始)イベントのリスナ登録
server.on("listening", () => {
// ...
})
// errorイベントのリスナ登録
server.on("error", err => {
// ...
})
// close(リクエストの受付終了)イベントのリスナ登録
server.on("close", () => {
// ...
})
// サーバーの起動
server.listen(8000)

processオブジェクトもprocess.onでハンドリングできるので、つまり、processオブジェクトもEventEmitterインスタンスである。

アプリケーションコードでもEventEmitterを使い、任意のイベントを定義してObserverパターンを実装できる。EventEmitterはeventsモジュールに定義されている。

emitは「発行する」という意味の他動詞

このコードだとstartイベントに登録したリスナが発火されない。
おそらくasync関数の中で実行した同期的なemit("start")の処理の方が、イベントの登録より早く実行されているせいである。
(リスナを登録する前にイベントを発火させているのが問題)
const events = require("events")
// この実装には一部問題がある
function createFizzBuzzEventEmitter(until) {
// これがサブジェクト
const eventEmitter = new events.EventEmitter()
// これはreturnが実行された後に実行される
// async関数自体は非同期処理
_emitFizzBuzz(eventEmitter, until)
return eventEmitter
}
// async/await構文が使えるよう、イベントを発行する部分を別の関数に切り離す
// startイベントが条件にかかわらず発行されないのは、createFizzBuzzEventEmitter()の中で
// emit("start")が常に同期的に実行されるためである
// onメソッドで登録したリスナは、登録前に発行されたイベントに対しては呼ばれない
async function _emitFizzBuzz(eventEmitter, until) {
// emitは指定したイベント(第一引数)を指定した引数(第二引数以降、任意)で発行する
eventEmitter.emit("start")
let count = 1
while (count <= until) {
// 0.1秒刻み
await new Promise(resolve => setTimeout(resolve, 100))
if (count % 15 === 0) {
eventEmitter.emit("FizzBuzz", count)
} else if (count % 3 === 0) {
eventEmitter.emit("Fizz", count)
} else if (count % 5 === 0) {
eventEmitter.emit("Buzz", count)
}
count += 1
}
eventEmitter.emit("end")
}
function startListener() {
console.log("start");
}
function fizzListener(count) {
console.log("Fizz", count);
}
function buzzListener(count) {
console.log("FizzBuzz", count);
}
function fizzBuzzLister(count) {
console.log("FizzBuzz", count);
}
function endListener() {
console.log("end");
// EventEmitterの主要なインスタンスメソッドの中でemit以外のメソッドはそのメソッドを持つ
// EventEmitterインスタンスを返すため、メソッドチェーンが可能になっている
// リスナに渡す関数の中ではthisによってEventEmitterインスタンスにアクセスできる
// emit()メソッドの戻り値はそのイベントに対するリスナが登録されているかどうかを示すbooleanです
// offは指定したイベントに登録されたリスナを削除する
// 以下の処理では全てのイベントからリスナを削除している
this
.off("start", startListener)
.off("Fizz", fizzListener)
.off("FizzBuzz", fizzBuzzLister)
.off("end", endListener)
}
// Buzzイベントだけonceで登録
// onceはonと同様にイベントに対するリスナを登録するが、このリスナはイベントが1回発行されたら、自動的に削除され
// 2回目以降のイベントでは実行されない
// createFizzBuzzEventEmitter(40)
// .on("start", startListener)
// .on("Fizz", fizzListener)
// .once("Buzz", buzzListener)
// .on("FizzBuzz", fizzBuzzLister)
// .on("end", endListener)
// startイベントで登録したリスナが実行されない
createFizzBuzzEventEmitter(40)
.on("start", startListener)
.on("end", endListener)
ハンドリング不可能なイベントを発行しても意味がないので、イベントの発行が非同期的になるように実装を修正する
// この実装には一部問題がある
function createFizzBuzzEventEmitter(until) {
// これがサブジェクト
const eventEmitter = new events.EventEmitter()
// これはreturnが実行された後に実行される
// async関数自体は非同期処理
// この関数の下にconsole.logを書けばわかる
// ただ、同期的に呼び出しているから、イベント自体は関数が終了した関数が終了した後に同期的に発行されると思われる
// _emitFizzBuzz(eventEmitter, until)
// イベントの発行を常に非同期にするため、process.nextTick()を使用
// process.nextTickがスケジュールしたコールバックは、それを実行するための特定のフェーズを持たない
// 代わりにこのコールバックは特定のフェーズに属さないnextTickQueueと呼ばれるキューにつまれ、現在実行中の処理の完了後
// すぐ(イベントループが次のフェーズへ進む前に)実行される。このため、フェーズを跨がなければ実行されないsetTimeoutのコールバックより
// 早く実行される
process.nextTick(() => _emitFizzBuzz(eventEmitter, until))
return eventEmitter
}

ここで覚えておくべき原則は、EventEmitterインスタンスの生成処理の中で同期的にイベントを発行してはならないということ。
あと、EventEmitterの同期、非同期に関してもう一つ注意すべきことは、リスナが常に同期的に実行されるということ。
const events = require("events")
const fooEventEmitter = new events.EventEmitter()
fooEventEmitter.on("foo", () => {
console.log("fooイベントリスナの実行");
})
console.log("fooイベント発行", fooEventEmitter.emit("foo"));
// =>
// fooイベントリスナの実行
// fooイベント発行 true
コールバックパターンを見慣れていると、この結果は不自然に感じる。
直感的には、console.logが同期的に実行された後に、リスナが非同期的に実行されそうである。しかし、実際にはEventEmitterインスタンスは登録されたリスナを同期的に実行するため、後者は前者よりも先に実行される。

EventEmitterとメモリリーク
1つのEventEmitterインスタンスに11個以上のリスナを登録すると警告が出力される。
これは使い終わったリスナの削除漏れを原因としたメモリリークが発生しがちだからである。
Node.jsはGCを備えているので、使われなくなった変数は自動的にメモリから削除される。しかし、EventEmitterインスタンスに登録したリスナは、EventEmitterインスタンスの内部にリスナの参照が残り続ける(つまりリスナが消えない)ので、注意が必要である。
const events = require("events")
const fooEventEmitter = new events.EventEmitter()
for (let i = 0; i <= 11; i++) {
fooEventEmitter.on("foo", () => {
console.log("fooイベントリスナの実行");
})
}
console.log("fooイベント発行", fooEventEmitter.emit("foo"));
// =>
// node main.js
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベントリスナの実行
// fooイベント発行 true
// (node:23632) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 foo listeners added to [EventEmitter]. Use emitter.setMaxListeners() to increase limit
// (Use `node --trace-warnings ...` to show where the warning was created)
↓ 使われなくなった変数が自動的にGCによって消されている
node
Welcome to Node.js v18.9.1.
Type ".help" for more information.
> {
... const a = 1
... }
undefined
> a
Uncaught ReferenceError: a is not defined
> const a = 1
undefined
> a
1
>
↓ EventEmitterインスタンスに登録したリスナは、EventEmitterインスタンスの内部にリスナの参照が残り続ける(つまりリスナが消えない)
node
Welcome to Node.js v18.9.1.
Type ".help" for more information.
> const emitter = new events.EventEmitter();
undefined
> emitter;
EventEmitter {
_events: [Object: null prototype] {},
_eventsCount: 0,
_maxListeners: undefined,
[Symbol(kCapture)]: false
}
> emitter.on("foo", () => console.log("fooイベントリスナの実行"));
EventEmitter {
_events: [Object: null prototype] { foo: [Function (anonymous)] },
_eventsCount: 1,
_maxListeners: undefined,
[Symbol(kCapture)]: false
}
> emitter.listeners("foo");
[ [Function (anonymous)] ]
>
> emitter
EventEmitter {
_events: [Object: null prototype] { foo: [Function (anonymous)] },
_eventsCount: 1,
_maxListeners: undefined,
[Symbol(kCapture)]: false
}
> .editor
// Entering editor mode (Ctrl+D to finish, Ctrl+C to cancel)
{
emitter.on("message", () => console.log("hello"))
}
EventEmitter {
_events: [Object: null prototype] {
foo: [Function (anonymous)],
message: [Function (anonymous)]
},
_eventsCount: 2,
_maxListeners: undefined,
[Symbol(kCapture)]: false
}
> emitter.listeners("message")
[ [Function (anonymous)] ]
> emitter
EventEmitter {
_events: [Object: null prototype] {
foo: [Function (anonymous)],
message: [Function (anonymous)]
},
_eventsCount: 2,
_maxListeners: undefined,
[Symbol(kCapture)]: false
}
EventEmitterインスタンスに登録された特定のリスナを取得するには、listeners(eventName)メソッドを使って確認できる。

このようなケースでリスナをGCの対象とするには、EventEmitterインスタンス自体がGCの対象となるか(つまり、どこからも参照されなくなる)、明示的にリスナを削除する必要がある。
これをうっかりしていると、リスナが削除されることなく蓄積していって、メモリを不必要に圧迫してしまう恐れがある。

デフォルトでは登録されたリスナが10を超えるとこの警告が出る。リスナの数はsetMaxListenersで増やせる

そうか、EventEmittterってNode.jsでオブサーバーパターンを実現するためのものなのか。

オブザーバパターンのメリットは、
- SubjectがObserverで行われる詳しい実装を知らなくて良い
- Subjectはどんな具体的なObserverを登録しているのかを知らなくて良い(抽象的なObserverだけ知っていれば良い。ゆえにSubjectと複数のObserverは疎結合になり、拡張性がある
- 新しいObserverを追加したり、既存のObserverを変更しても、Subjectを変更する必要がない(SubjectはObserverの具体を知らなくても良いから。Subjectは複数のObserverの共通の性質を取り出して抽象化した抽象のインターフェースさえ知っていれば良い)。

エラーハンドリング
コールパックパターンによる非同期関数の実装では、コールバックの第一引数にエラーを渡すという規約があったが、EventEmitterにはそのような規約はない。
それぞれのイベントのリスナは、そのイベントに関する情報だけをEventEmitterインスタンスから受け取る。代わりにEventEmitterではerrorという名前のイベントによってエラーを伝搬するという規約がある。errorイベントはNode.jsに特別なイベントとして扱われ、リスナが存在しない状態でこのイベントをemit()するとエラーが投げられる。
const events = require("events")
try {
// リスナが存在しない状態でこのイベントをemitすると、エラーが投げられる
new events.EventEmitter().emit("error", new Error())
} catch (err) {
console.log("catch"); // => catch
}
errorいけたを登録するコードのコメントアウトを解除してから再度実行すると、errorイベントとして表示されてcatchは表示されなくなる。errorイベントにリスナを登録したことで、errorイベントのemit()がエラーを投げなくなったためである
const events = require("events")
try {
// リスナが存在しない状態でこのイベントをemitすると、エラーが投げられる
const emitter = new events.EventEmitter()
emitter.on("error", (err) => console.log("エラーです", err))
emitter.emit("error", new Error("オエラ"))
} catch (err) {
console.log("catch"); // => catch
}
// =>
// エラーです Error: オエラ
// at Object.<anonymous> (/Users/yuuki_haga/repos/node/node-practice/chapter_3/src/main.js:7:25)
// at Module._compile (node:internal/modules/cjs/loader:1119:14)
// at Module._extensions..js (node:internal/modules/cjs/loader:1173:10)
// at Module.load (node:internal/modules/cjs/loader:997:32)
// at Module._load (node:internal/modules/cjs/loader:838:12)
// at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:81:12)
// at node:internal/main/run_main_module:18:47

uncaughtExceptionイベントによるプロセスの停止を防ぎたければ、errorイベントをemit()しうるEventEmitterインスタンスに常にerrorイベントリスナを登録すべきである。

EventEmitterの継承
EventEmitterの利用方法は大きく2パターンに分類できる。
- createFizzBuzzEventEmitter()の例のように直接newして使うパターン
- EventEmitterを継承したclassを作成するパターン
createFizzBuzzEventEmitter()はEventEmitterを継承したclassで次のように実装できる。
const events = require("events")
function startListener() {
console.log("start");
}
function fizzListener(count) {
console.log("Fizz", count);
}
function buzzListener(count) {
console.log("FizzBuzz", count);
}
function fizzBuzzListener(count) {
console.log("FizzBuzz", count);
}
function endListener() {
console.log("end");
// EventEmitterの主要なインスタンスメソッドの中でemit以外のメソッドはそのメソッドを持つ
// EventEmitterインスタンスを返すため、メソッドチェーンが可能になっている
// リスナに渡す関数の中ではthisによってEventEmitterインスタンスにアクセスできる
// emit()メソッドの戻り値はそのイベントに対するリスナが登録されているかどうかを示すbooleanです
// offは指定したイベントに登録されたリスナを削除する
// 以下の処理では全てのイベントからリスナを削除している
this
.off("start", startListener)
.off("Fizz", fizzListener)
.off("FizzBuzz", fizzBuzzListener)
.off("end", endListener)
}
class FizzBuzzEventEmitter extends events.EventEmitter {
async start(until) {
this.emit("start")
let count = 1
while (true) {
if (count % 15 === 0) {
this.emit("FizzBuzz", count)
} else if (count % 3 === 0) {
this.emit("Fizz", count)
} else if (count % 5 === 0) {
this.emit("Buzz", count)
}
count += 1
if (count >= until) {
break
}
await new Promise(resolve => setTimeout(resolve, 100))
}
this.emit("end")
}
}
const emitter = new FizzBuzzEventEmitter()
emitter
.on("start", startListener)
.on("Fizz", fizzListener)
.on("Buzz", buzzListener)
.on("FizzBuzz", fizzBuzzListener)
.on("end", endListener)
.start(20)
このケースでは、EventEmitterインスタンスの生成がコンストラクタで行われ、start()メソッドとは分離しているため、process.nextTick()のような処理がなくても問題ない。
httpモジュールがそうであるように、Node.jsコアのAPIではEventEmitterを継承するパターンが多用されている。

eventsモジュールのonceメソッドはPromiseインスタンスが得られる。
EventEmitterのonメソッドとは違うので注意。

ストリーム
Node.jsのノンブロッキングI/Oはイベントループで高いパフォーマンスを出すためには必須である。
例えばファイルの内容を読み込む際には、バックグラウンドでファイルを読み込んでいる間Node.js自体は次のタスクを実行でき(別リクエストとか)、時間のかかるI/Oのタスクによる影響を受けない。しかし、ファイルが非常に大きくなると、それだけでは不十分な場合がある。

くっそでかい5GBくらいのファイルを用意。
ll
total 10582032
drwxr-xr-x 7 yuuki_haga staff 224 9 18 16:13 .
drwxr-xr-x 4 yuuki_haga staff 128 9 18 16:11 ..
-rw------- 1 yuuki_haga staff 5417992192 5 8 2021 Centos.vdi
drwxr-xr-x 3 yuuki_haga staff 96 9 18 13:58 listener
-rw-r--r-- 1 yuuki_haga staff 302 9 18 16:08 main.js
-rw-r--r-- 1 yuuki_haga staff 1150 9 18 16:09 sample.txt
drwxr-xr-x 3 yuuki_haga staff 96 9 16 20:44 server
const fs = require("fs")
function copyFiles(target, dest, callback) {
// ファイルの読み込み
fs.readFile(target, (err, data) => {
if (err) {
return callback(err)
}
// 読み込んだ内容を別のファイルに書き出す
fs.writeFile(dest, data, callback)
})
}
// このsample.txtはなく処理が終了する(sample.txtは1150バイト)
copyFiles("./sample.txt", "copy_files/sample.txt", (err, result) => {
console.log("コピーが完了しました");
})
// => コピーが完了しました
// Centos.vdiは5GB以上あるので、エラーで終了する。
copyFiles("./Centos.vdi", "copy_files/Centos.vdi", (err, result) => {
if (err) {
console.error("エラーが発生しました", err)
return
}
console.log("コピーが完了しました");
})
// =>
// node main.js
// エラーが発生しました RangeError [ERR_FS_FILE_TOO_LARGE]: File size (5417992192) is greater than 2 GiB
// at new NodeError (node:internal/errors:393:5)
// at FSReqCallback.readFileAfterStat [as oncomplete] (node:fs:349:11) {
// code: 'ERR_FS_FILE_TOO_LARGE'
// }
Node.jsはファイルを読み込む時に、バッファメモリ(メモリ内にあるデータを一時的に保存する場所)にファイルのデータを展開する。デフォルトでは2GBがNode.jsで指定されたバッファメモリのサイズなので、そのメモリサイズを超えたファイルを読み込もうとしたからこのエラーが出た。問題はこれだけではなくて、扱えるバッファサイズの範囲内であっても、大きなファイルの読み込み結果全体がメモリ上に展開されれば、メモリが圧迫されてアプリケーションのパフォーマンスに影響が出る。最悪の場合、利用可能なメモリが枯渇して、Node.jsのプロセス自体が停止してしまう恐れがある。

上のコードは、あるファイルを別のファイルとしてコピーする関数。
上のエラーで、ファイルの全ての読み込みを完了させる(メモリ上にファイルデータを展開する)のは不可能であることが分かった(不可能ではなくても、メモリが圧迫される問題がある)。
これを防ぐ別のやり方としては、読み込みが完了した部分から順次ファイルに書き込んでいくような処理を実現することである。(部分的に小さくメモリに展開して、その部分的に展開したデータをファイルに書き込む)
これを可能にするのがストリームである。

fs.ReadFileを使う際の問題点
- バッファの制約
- 仮にバッファの制約を通ってもメモリを圧迫させる危険性がある
ストリームとは、データの流れを扱うためのインターフェースである。 効率的な処理を実現できることに加え、このインターフェースを介して複数の機能を容易に結合できることも強みである。
このため、ストリームはNode.jsにおける標準的なインターフェースとして広く用いられている。

上のcopyFile関数をストリームを使って書き直す。
// ストリームを使って、データを送信してようが関数を利用している人にとってはどうでも良い。
// 関数を利用する人の一番の目的(ファイルのコピー)できてればよいから。この関数を使う人は関心がないs
// 具体的なやり方はどうでも良いから。抽象に関心がある。インターフェースってそういうもん。
// 車を動かしたい人からしたら、車のキーを回してエンジンがつけば良いって話。エンジンが電気エンジンンなのかガソリンで動いているのかどうでも良い。
// 使う人に具体的な実装を意識させない抽象的なインターフェースが良い。車のエンジンと一緒。
function copyFiles(target, dest, callback) {
// // ファイルの読み込み
// fs.readFile(target, (err, data) => {
// if (err) {
// return callback(err)
// }
// // 読み込んだ内容を別のファイルに書き出す
// fs.writeFile(dest, data, callback)
// })
// ストリームでやる
// ファイルから読み込みストリームを生成
fs.createReadStream(target)
.pipe(fs.createWriteStream(dest)) // ファイルから書き込みストリームを生成して、pipe()で繋ぐ
.on("finish", callback) // 完了時にコールバックを呼び出す
}
fs.createReadStream()は、引数のファイルから少しずつ内容を読み込み、「読み込みストリーム」を生成する。
fs.createWriteStream()は、読み込みストリームの内容を引数のファイルに書き出す「書き込みストリーム」を生成する。
両者は読み込みストリームのpipe()メソッドで連結できる。
ストリームでやったら、5GBのファイルもコピーできた!
node main.js
コピーが完了しました
コピーが完了しました

読み込みストリームや書き込みストリームはEventEmitterのインスタンスである。
pipe()で繋げられたストリーム同士が各種のイベントを介して連携し、データが受け渡されている。書き込みストリームは、書き込み完了時にfinishというイベントを発行する。

ストリームを使えば、いろんな処理(暗号化処理や圧縮処理)をpipeで繋げられる。これがストリームの魅力。
Node.jsには次のようなストリームが存在する
- 読み込みストリーム
- 書き込みストリーム
- 二重ストリーム
- 変換ストリーム(二重ストリームの一種)

メモリリーク
メモリリークとは、メモリ領域に不要なデータが残り続けてしまうことです。
(メモリ領域が不要になっても解放されないこと)
メモリリークが発生すると、時間経過とともにメモリ使用量が徐々に増加していき、以下の問題が起こる可能性があります。
-
アプリケーションの速度低下・レイテンシの増大
-
アプリケーションのクラッシュ
-
メモリ利用量が必要以上に増加し、クラウドの利用コストの増大

バッファメモリとは
バッファメモリとは、データが 1 つの場所から別の場所に転送する際に、データを一時的に保持するメモリストレージ領域のことをいいます。
バッファメモリの目的は、「遅い装置と早い装置の間に入って速度差を吸収すること」 である。
例えばプリンタの印刷速度はコンピュータの通信速度より遅いため、コンピュータからデータを受信しながら印刷を進めると、印刷が間に合わず、取りこぼすデータが出てしまう。これを防ぐためにプリンタ内部にバッファメモリを用意しておいて、受信したデータを溜めておき、順番に印刷するという制御が行われる。
ここまでで参考にした記事

読み込みストリーム
読み込みストリームは、pipe()によって接続する書き込みストリーム、二重ストリーム、変換ストリームで読み込み可能なデータを生成する。
読み込みストリームは、読み込みが可能になると、「readable」イベントを発行します。
そのリスナの中で「read」メソッドによるデータの読み込みが可能である。また、読み込みが完了した際にはendというイベントを発行する。
const fs = require("fs")
const readStream = fs.createReadStream("./sample.txt")
function handleReadStream() {
readStream
.on("readable", () => {
// readableイベントを発行する
console.log("readable");
let chunk;
// 現在読み込み可能なデータを全て読み込む
// チャンクは、送信や処理中のデータの一部のデータのこと
while((chunk = readStream.read()) !== null) {
console.log(`chunk: ${chunk.toString()}`);
}
})
.on("end", () => console.log("end")) // endイベントリスナの登録
}
module.exports = {
handleReadStream,
}
// =>
// node main.js
// readable
// chunk: The standard chunk of Lorem Ipsum used since the 1500s is reproduced below for those interested. Sections 1.10.32 and 1.10.33 from "de Finibus Bonorum et Malorum" by Cicero are also reproduced in their exact original form, accompanied by English versions from the 1914 translation by H. Rackham.
// Where can I get some?
// There are many variations of passages of Lorem Ipsum available, but the majority have suffered alteration in some form, by injected humour, or randomised words which don't look even slightly believable. If you are going to use a passage of Lorem Ipsum, you need to be sure there isn't anything embarrassing hidden in the middle of text. All the Lorem Ipsum generators on the Internet tend to repeat predefined chunks as necessary, making this the first true generator on the Internet. It uses a dictionary of over 200 Latin words, combined with a handful of model sentence structures, to generate Lorem Ipsum which looks reasonable. The generated Lorem Ipsum is therefore always free from repetition, injected humour, or non-characteristic words etc.
// 5
// paragraphs
// words
// bytes
// lists
// Start with 'Lorem
// ipsum dolor sit amet...'
// readable
// end
ファイルサイズが大きくなると、ファイルの内容は複数回のreaableイベントに分かれて入ってくる。結果として、fs.readFile()ではエラーを引き起こしてしまう2GB以上の大きさのファイルでも、ストリームを使った実装では問題なく読み込める。

自前の読み込みストリームを実装するには、stream.Readableを継承して、_read()メソッドを実装します。
const stream = require("stream")
class HenaReadableStream extends stream.Readable {
constructor(options) {
// スーパークラスのコンストラクタ呼び出している
super(options)
// 読み込みストリームに対して、あらかじめ流されているデータ
this.languages = ["JavaScript", "Python", "Java", "C#"]
}
// _read()はNode.jsのストリームの内部実装の中で実行されることを想定したものであり、
// 決して外部から直接実行してはならない。readableイベントリスなの中でそうしているように、外部からのストリームの読み込みを実行したい場合は、read()を使う
_read(size) {
console.log("_read()");
let language
while ((language = this.languages.shift())) {
// Array.prototype.pushではなく、readable.push
// push()でこのストリームからのデータを流す
// ただし、push()がfalseを返したら、読み込みストリームはそれ以上データを流さない
if (!this.push(`Hello, ${language}!\n`)) {
console.log("読み込み中断");
return
}
}
// データを全て流し終わったら、最後にnullを流してストリームの終了を通知する
console.log("読み込み完了");
this.push(null)
}
}
module.exports = {
handleReadStream,
HenaReadableStream,
}
const read = require("./stream/read")
const stream = new read.HenaReadableStream()
stream
.on("readable", () => {
console.log("readable");
let chunk
while((chunk = stream.read()) !== null) {
console.log(`chunk: ${chunk.toString()}`);
}
})
.on("end", () => console.log("end"))
_read()はNode.jsのストリームの内部実装の中で実行されることを想定したものであり、決して外部から直接実行してはならない。readableイベントリスナの中でそうしているように、外部からのストリームの読み込みを実行したい場合は、read()を使う

いつか見る

書き込みストリーム
書き込みストリームは、読み込みストリームから流れてきたデータを受け取るストリームである。書き込みストリームにはwrite()でデータを流し、全てのデータを流し終えたらend()を実行する。
これらのメソッドを使って、ファイルへの書き込みを試してみる
const fs = require("fs")
function buildWriteStream() {
return fs.createWriteStream("dest.txt")
}
function handleWriteStream() {
writeStream = buildWriteStream()
// 書き込みストリームにはwrite()でデータを流し
writeStream.write("Hello\n")
writeStream.write("World\n")
// 全てのデータを流し終えたらend()を実行する。
writeStream.end()
}
module.exports = {
handleWriteStream,
}
const fs = require("fs")
function buildWriteStream() {
return fs.createWriteStream("dest.txt")
}
function handleWriteStream() {
writeStream = buildWriteStream()
// 書き込みストリームにはwrite()でデータを流し
writeStream.write("Hello\n")
writeStream.write("World\n")
// 全てのデータを流し終えたらend()を実行する。
writeStream.end()
}
module.exports = {
handleWriteStream,
}
読み込みストリームのpushメソッドと書き込みストリームのwrite()メソッドは、どちらもその戻り値で「それ以上のデータを流せるかどうか」を返します。 これはストリームの持つバックプレッシャという仕組みと関係している。

二重ストリームと変換ストリーム
「二重ストリーム」とは、読み込みと書き込みの両方が可能なストリームである
Node.jsコアの中では、netモジュールのSocketクラスというTCPソケットを表現するクラスが二重ストリームの例である。外部とデータのやり取りを行うような機能が、読み込みストリームと書き込みストリームのインターフェースを合わせ持つのは分かりやすい。
二重ストリームの中でも、読み込んだデータを変換して下流に流す(書き込む)ストリームが「変換ストリーム」である。

pipe
pipeは読み込みストリームのメソッドである
(読み込みストリームのメソッドってことは書き込みストリームのメソッドでもある)

エラーハンドリングとstream.pipeline()
各種ストリームはEventEmitterのインスタンスなので、errorイベントによりエラーを通知する。
ストリームを使用する際にはこのエラーを適切にハンドリングすべきである。
ストリームのエラーハンドリングで注意すべきなのは、pipe()がエラーを伝播しないことである。
const fs = require("fs")
fs.createReadStream("no-such-file.txt")
.pipe(fs.createWriteStream("dest.txt"))
.on("error", err => console.log("エラーイベント", err.message))
// =>
// node main.js
// node:events:491
// throw er; // Unhandled 'error' event
// ^
// Error: ENOENT: no such file or directory, open 'no-such-file.txt'
// Emitted 'error' event on ReadStream instance at:
// at emitErrorNT (node:internal/streams/destroy:151:8)
// at emitErrorCloseNT (node:internal/streams/destroy:116:3)
// at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
// errno: -2,
// code: 'ENOENT',
// syscall: 'open',
// path: 'no-such-file.txt'
// }
// Node.js v18.9.1
エラーがerrorイベントリスナによって処理された場合は「エラーイベント」という文言が出力されるはずなので、この出力はエラーがerrorイベントリスナをすり抜けたことを示している。
fs.createReadStream("no-such-file.txt")が投げるエラーを処理するには、この読み込みストリームに直接errorイベントリスナを登録する必要がある。
const fs = require("fs")
fs.createReadStream("no-such-file.txt")
.on("error", err => console.log("エラーイベント", err.message))
.pipe(fs.createWriteStream("dest.txt"))
.on("error", err => console.log("エラーイベント", err.message))
// =>
// node main.js
// エラーイベント ENOENT: no such file or directory, open 'no-such-file.txt'
今度は登録したイベントリスナで期待通りのエラーを処理できている。
それぞれのストリームに対してerrorイベントリスナを登録するのは、pipe()により連結するストリームが増えるほど苦痛に感じられることはいうまでもない。
Promiseチェーンの場合は最後のcatch()にエラーハンドリングをまとめられるのと比べると、とても冗長。
あとは、下流のストリームがエラー終了しても、上流のストリームが生き続けるので、メモリリークのデメリットがある。これを回避するには、下流のストリームのerrorイベントリスナの中で明示的に上流のストリームを破棄しなければならないが、それはエラーハンドリングがより冗長になることを意味する。
こうした問題に対して、Node.js v10から、読み込みストリームのpipe()メソッドの代わりにstream.pipeline()を使ってストリームを連結するのを推奨したそう
const fs = require("fs")
const stream = require("stream")
// stream.pipeline()は2つ以上のストリームを引数に取り、それらをpipe()で連結する
// 連結したストリームのどこかでエラーが発生した場合、最後の引数として渡したコールバックがそのエラーを引数に実行される
// エラーの発生なくストリームが終了した場合、コールバックが引数なしで実行される
// いずれの場合でも、引数に渡したストリームは全て自動的に破棄される(コンピュータリソースであるメモリのメモリリークの心配がない)
stream.pipeline(
// pipe()したい2つ以上のストリーム
fs.createReadStream("no-such-file.txt"),
fs.createWriteStream("dest.txt"),
// コールバック
err => err
? console.error("エラー発生", err.message)
: console.log("正常終了")
)
// =>
// node main.js
// エラー発生 ENOENT: no such file or directory, open 'no-such-file.txt'

ストリームの異常終了とstream.finished()
読み込みストリームは、読み込み完了時にendイベント
書き込みストリームは、書き込み完了時にfinishイベントを発行する。
何らかの理由でこれらのイベントやerrorイベントが発行されないままストリームが終了することがある。そうした状況を考慮したいなら、ストリームの終了をハンドリングするstream.finishedを使う