🐥

EventEmitterとストリーム

2023/02/22に公開

概要

筆者は、Node.jsを使用して開発しているのですが、まだまだ知識不足や知らないことばかりだと日々の業務で痛感させられます。そこで学習した内容をメモ変わりとして書いていこうと思います

EventEmitterとストリームとは?

有名な非同期プログラミングでは、同期的なプログラミングと比べ処理を効率することができる場合がありますが、処理の要求とその結果が常に1対1でした。
この場合、複数回のリクエスト結果で完結する処理には非同期処理は使えません。
そこでNode.jsでは、1回の要求に対して結果が複数回発生するような非同期処理を実現するためにEventEmitterとストリームという仕組みが用意されています。
このページでは、以降EventEmitterとストリームについて書いていきます。

EventEmitterの概要

JavaScriptに限らず幅広くいろいろなプログラミング言語で使用されているデザインパターンの一つで、Observerパターンとゆうものがあります。
Observerパターンとは、監視対象(Subject)に対して発生したなんらかのイベントが発生した際に、監視役(Observer)に逐一通知されるパターンを指します。
Observerは、あらかじめSubjectに対して監視を行うための登録処理を行い、Subjectはイベント発生のタイミングで登録済みのObserverに対して通知処理を実行します。
Node.jsではObserverパターンを使用しEventEmitteを実装している。
EventEmitter自体はSubjectとして機能し、Observerは一般的にリスナと呼ばれる。

//サーバオブジェクト(EventEmitterのインスタンス)の作成
const http = require('http')
const server = http.createServer()

// Requestイベントのリスナ登録 
server.on('request',(req,res)=>{
  // レスポンスを返す処理
})s

// listening(リクエスト受付開始)イベントのリスナ登録
server.on('listening',()=>{
  // ...
})

// サーバー起動
server.listen(8000)

ここでは、EventEmitterインスタンスのon()というメソッドにイベント名とコールバックを渡すことで、リスナの登録を行なっている。イベントにより受け取る引数の内容が異なる。
EventEmitterを直接インスタンスを作成することもあるが、httpモジュールに限らず、Node.jsコアのAPIではEventEmitterのインターフェースが頻繁に用いられている。

EventEmitterの基礎

EventEmitterの主要なメソッド

  • on(event, listener)
    • 指定したイベントに対する新しいリスナを登録する
  • once(event, listener)
    • イベントが1回発行されると自動的に削除され、2回目以降のイベント発行では実行されない
  • off(event, listener)
    • 指定したイベントに登録されたリスナを削除する、
  • emit(event[...args])
    • 指定したイベントを指定した引数で発行する。

emit()以外のメソッドは、EventEmitterインスタンスを返すため、メソッドチェーンが可能となっている。
またリスナに渡す関数の中では、thisによってEventEmitterインスタンスにアクセスできる。emit()イベントの戻り値は、そのイベントが登録されているかのどうかを示すbooleanです。

EventEmitterとメモリリーク
1つのEventEmitterインスタンスに11個以上のイベントリスナを登録すると警告が出力される。

const barEventEmitter = new events.EventEmitter()
for(let i = 0; i < 11; i++){
  barEventEmitter.on('bar',() => console.log('bar')
}

EventEmitterを利用する際に使用しなくなったリスナの削除漏れを原因としたメモリーリークが発生しがちなためデフォルトで警告が出力されるようになっている。EventEmitterインスタンスにリスナを登録した場合、処理を終えても削除されないのでEventインスタンス自体を削除するか、明治的にリスナを削除する必要がある。
11個以上登録したい場合、作成しいたインスタンスメソッドのsetMaxListenersに登録したい数を引数にすると警告が起きずに登録することができる。

EventEmitterのエラーハンドリング
EventEmitterでは、「error」という名前のイベントによってエラーを伝播するという規約が存在する。

const events = require('events')
try{
  new events.EventEmitter()
    //'error'イベントリスナの登録をコメントアウト
    // .on('error',err => console.log('errorイベント')
    .emit('error',new Error('エラー'))
}catch(err){
  console.log('catch')
}

このファイルを実行すると「ctach」と表示され、errorイベントのコメントを消すと「error」イベントと表示される。

EventEmitterの使用方法
EventEmitterの利用方法は大きく2パターンに分類できる。
直接newしてインスタンスを利用するパターンとEventEmitterを継承したclassを作成するパターンが存在する。

ストリームの概要

ストリームは、node.jsのノンブロッキングI/Oのイベントループで高いパフォーマンスを出すために必須のものです。例えばファイルを読み込む処理を例にすると..

function copyFile(src,dest,cd){
  // ファイルの読み込み
  fs.readFile(src,(err,data) => {
    if(err){
      return cd(err)
    }
    // 読み込んだファイルを別ファイルに書き出す
  })
}

上記の処理では、一度にファイルの中身を全て読み込んだ後に別のファイルに書き出す処理を行なっています。
読み込んだファイルの容量が大きくなければ問題になりませんが、ファイルサイズが大きくなるとメモリが圧迫されパフォーマンスに影響が出てしまいまいす。
読み込みが完了した部分から順次書き込んでいくような処理が行えれば、メモリの圧迫もなく処理を進めることができます。ストリームでは、このような処理を行うことができ役に立ちます。

ストリームの基本

上記copyFileをストリームで書き直すと次のようになる。

function copyFileWithStream(src,dest,cd){
  // ファイルから読み込みストリームの作成し、少しづつ読み込んでいく
  fs.createReadStream(src);
    // ファイルから書き込みストリームを生成し、pipe()で繋ぐ
    .pipe(fs.createWriteStream(dest))
    // 完了時にコールバックを呼び出す
    .on('finish',cd)
}

読み込んだ分を書き込み処理を行い、全ての処理が終わった際に'finish'のログを出力している。
またストリームを使用していると読み込み処理にファイル内容を暗号化するといった機能追加が非常に用意となります。

fs.createReadStream('src.txt')
 // 暗号化処理の追加
 .pipe(crypto.createHash('sha256'))
 .pipe(fs.createWriteStream('dest.txt'))
 .on("finish",() => console.log('コピー完了'))

各ストリームについて

Node.jsには以下のストリームが存在する。

  • 読み込みストリーム
  • 書き込みストリーム
  • 二重ストリーム
    • 変換ストリーム(二重ストリームの一種)

各ストリームを実装する際は、基底ストリームクラスメソッドを使用するパターンと、基底ストリームクラスを継承する方法が存在する

読み込みストリーム
読み込みストリームを実装する際は、stream.Reableを継承してクラスを実装する方法と基底クラスの使用する方法が存在する。今回は継承する方法を記載する。

class HelloReadablesStream extends stream.readable {
  constructor(options){
    super(options)
    this.languages = ['JavaScript','Python','Java','C#']
  }
  
  _read(size){
    console.log('_read()')
    let langugae;
    while((languge = this.languagues.shift())){
      // push()データを流す
      // ただしpush()がfalseを返したらそれ以上流さない
      // _readの中ではpush()メソッドでこのストリームからデータを流す役割をもていて、
      // それ以上のデータを流すことができるかbooleanで返す
      if(!this.push(`Hello, ${languge}!\n`)){
        console.log('読み込み中断')
	return
      }
    }
    // 最後にnullを流してストリームの終了を通知する
    console.log('読み込み完了')
    this.push(null)
  }

  // インスタンスクラスを作成し、実際にデータを読み込む
  const helloReadableStream = new HelloReadableStream()
  helloReadableStream.on('readble',()=>{
    console.log('readable')
    let chunk;
    while((chunk = helloReadableStream.read()) !== null){
      console.log(`chunk: ${chunk.toString()}`)
    }
  })
  .on('end', () => console.log('end'))
}

// 実行結果
// 読み込み完了
// Hello, chunk, JavaScript
// Hello, chunk, Python
// ...
// end

書き込みストリーム
読み込みストリームから流れてきたデータを受け取るストリーム。
writeでデータを流し込み、終えたらendを実行する。

ファイルへの書き込み
const fileWriteStream = fs.createWriteStream('dest.txt')
fileWriteStream.write('Hello\n')
fileWriteStream.write('Wolrd\n')
fileWriteStream.end()
fs.readFileSync('dest.txt','utf8')

wrirteメソッドは、戻り値で「それ以上値を流せるのかどうか」を返している。これはバックプレッシャという仕組みと関連している。(読み込みストリームではpushが該当)。
書き込みストリームも読み込みストリーム同様に継承して実装する方法が存在する。

書き込みストリームを継承して実装パターン
class DelayLogStream extends stream.Writable {
  // objectMode: trueを指定するとオブジェクトをデータとして流せる。
  super({objectMode: true,...options})
  
  _write(chunk,encodeing,callback){
    console.log('_write()')
    // messageプロパティ(文字列),delayプロパティ(数値)を含むオブジェクトが
    // データとして流れてくることを期待
    const { message, delay } = chunk
    // delayで指定した時間(ミリ秒)だけ遅れてmessageをログに出す
    setTimeout(() => {
      console.log(message)
      callback()
    },delay)
  }
} 


const delayLogStream = new DelayLogStream()
delayLogStream.write({message: 'Hi', delay:0})
// > Hi

二重ストリームと変換ストリーム
二重ストリームとは、読み込みと書き込みの両方が可能なストリーム。
書き込みストリームにあたるのが変換ストリームです。
実装方法は、継承する方法が存在する。

二重ストリームを継承して実行
class LineTransforrmStream extends stream.Transform {
  // 上流かっら受け取ったデータのうち、下流に流していない分を保持するフィールド
  remaining = ''
  constructor(options){
    // push()にオブジェクトを渡せるようにする
    super({ readableObjectMode: true, ...options })
  }
  
  _transform(chunk, encoding, callback){
    console.log('_transform')
    const lines = (chunk + this.rremaining).split(/\n/)
    // 最後の行は次に入ってくるデータの先頭と同じ行になるため、変数に保持
    this.reamaining = lines.pop()
    for(const line of lines){
      // ここではpush()の戻り値はきにしない
      this.push({ message: line, delay: line.length * 100 })
    }
    callback()
  }
  
  _flush(callback){
    console.log('_flush()')
    // 残っているデータを流し切る
    this.push({
      message: this.remaining,
      delay: this.remaining.length * 100
    })
    callback()
  }
}

const lineTransformStream = new LineTransformStream()
lineTransformStream.on('readable', () => {
  let chunk;
  while(chunk = lineTransformStream.read() !== null){
    console.log(chunk)
  }
})

lineTransformStream.write('foo\nbar');
// > { message:'foo', delay: 300 }

pipe()による連結
上記に記載してきたプログラムでは、読み込みストリームでは「readable」、書き込みストリームでは「write」を使ってデータを書いていたが、実際のアプリでは、これらは直接使わずになるべくpipe()またはpipeline()を使用してストリームを連結することが推奨されている。

new HelloReadableStream()
  .pipe(new LineTransformStream())
  .pipe(new DelayLogStream())
  .on("finish", () => console.log('完了'))

エラーハンドリング
各種ストリームはEventEmitterのインスタンスなので、errorイベントによりエラーを通知しますが、pipe()はエラーを伝播しません。ここは注意をすべきところです。さらに下流のストリームでエラーが発生した場合、そのストリームは上流のストリームから自動的に切り離されますが、上流のストリームは生きつづけメモリリークにつながってしまうことがあります。
こうした問題を解決するためにNode.jsのv10からpipe()の代わりにpipeline()を使用するメソッドが提供されました。

stream.pipeline(
  fs.createReadStream('no-such-file.txt'),
  fs.createWriteStream('dest.txt'),
  err => err ? console.err('エラー発生 ') : console.log('正常終了')
)

pipeline()は2つ以上のストリームを引数にとり、それらをpipe()で連結しまうす。エラーやストリームが以上終了した場合、最後の引数のコールバック関数が実行される。
この形はPromiseのcatchと似ていて実装しやすいと思う。

まとめ

ここまでEventEmitterとストリームについて学習した内容を記載して行ったが、まだまだ内容が足りない箇所や誤っている箇所があるかもしれない。なので、こちらの記事は勉強した内容を常にブラッシュアップしていこうと思います。

参考資料

Discussion