ストリームAPIを使いこなす

2023/04/07に公開

Web標準にストリームAPIというのがあります。
https://developer.mozilla.org/ja/docs/Web/API/Streams_API

fetchのレスポンスボディなどもストリームAPIが使われていますが、ちゃんと理解して使ってないなーと思ったので、調べてみました。

なお、Node.jsにもストリームAPIがあるのですが、今回はWeb標準のほうのストリームです。一旦。

ストリームAPIとは

データを逐次的に読み込みながら処理したいときに利用するAPIです。
例としては、メチャ大きなCSVファイルがあるときにメモリを節約するために1行ずつ読み込んで処理をする、などで利用できます(ブラウザではあまりないパターンかもしれませんが)。

ストリームAPIの構成要素

ストリームAPIには ReadableStream, TransformStream, WritableStream の三つの要素があります。
それぞれ、データソースからデータを読み出してストリームとして提供する、ストリームから受け取ったデータを変換してストリームとして提供する、ストリームから受け取ったデータをデータソースに書き込む、という役割を持っています。

TransformStreamはReadableStreamとWritableStreamの両方の性質を持っています。型情報は正確ではないと思いますが、ざっくり以下のような定義だと思ってよいと思います。

type TransformStream = ReadableStream & WritableStream

ストリームAPIを組み合わせて、データを流す

データをコピーする例

例えば、ファイルをコピーしたいとき、以下のように読み出しと書き込みを繋げて書くことができます。

// 読み込み用ファイル
const src = await Deno.open("./sample.txt");
// 書き込み用ファイル
const dst = await Deno.open("./copy.txt", { create: true, write: true });
// pipeTo(WritableStream) で書き込みを行う
await src.readable
  .pipeTo(dst.writable);

データを変換しつつ保存する例

間に変換を挟みたいならば、TransformStreamを使いましょう。

await src.readable
  .pipeThrough(someTransformStream) // pipeThrough(TransformStream)
  .pipeTo(dst.writable);            // pipeTo(WritableStream)

たとえば、Shift-JISで書かれたファイルをUTF-8に変換してファイルに保存する、などが考えられます。以下はその例実装です。

await sjisFile.readable // ファイルのReadableStreamはUint8Arrayを返すので・・・
  .pipeThrough(new TextDecoderStream("shift-jis")) // TextDecoderStreamでShift-JISエンコードのデータを文字列に変換し・・・
  .pipeThrough(new TextEncoderStream()) // TextEncoderStreamで文字列をUTF-8でエンコードしたUint8Arrayに変換し・・・
  .pipeTo(utf8File.writable); // ファイルに書き込む

もちろん、ストリームを使わず以下のように書いてもいいですが、データサイズが大きい場合には問題がありそうですね。

const binary = await Deno.readFile("./sjis.txt"); // Uint8Arrayのデータ
const textData = new TextDecoder("shift-jis").decode(binary); // Shift-JISでデコードしたテキスト
const utf8Binary = new TextEncoder().encode(textData); // UTF-8エンコードされたバイナリデータ
await Deno.writeFile("./utf8.txt", utf8Binary);

ファイルサイズが1GBあるとすると、変数 binary のサイズは1GBですし、 textData も1GB近くになります。 utf8Binary も1GB近くあることになります。この3行だけで、理論上メモリを3GB近く圧迫することになります。(ガベージコレクタが賢ければ、1GBで済むかもしれません。)

先の例では、ファイルを逐次読み出し、逐次変換し、逐次書き込む、という流れになります。したがって、必要なメモリはごく少量で済みます。(チャンクのサイズはファイルのReadableStreamの場合64KB)

ストリームAPIから直接データを読み出す

ReadableStreamはデータを読み出す機能を持っています。WritableStreamを使用する以外には以下の2通りの方法でデータを読み出すことができます。

  • for await of
  • readerからread()する

ReadableStreamはAsyncIterableでもあるため、 for await of でデータを読み出すことができます。

for await (const chunk of readableStream) {
  console.log(chunk);
}

あるいは、ストリームからreaderを取得して、 read() でデータを読み出すことができます。読み出したデータはdoneとvalueというプロパティを持つオブジェクトで、done = trueならばデータが尽きたことを示します。

const reader = readableStream.getReader();
while (true) {
  const chunk = await reader.read();
  // chunk: { done: boolean; value: any }
  console.log(chunk);
}

ただし、ここで注意したいのは、readerは必ずなんらかの変数に保持しておく必要があるということです。ReadableStream#getReader 関数を実行すると、内部でロックされます。ロックされると、 getReader() を呼び出すことができなくなります。解除するためには reader.releaseLock() を呼び出す必要があります。

つまり、次のような実装はエラーになるし、二度とreaderを取得することはできなくなります。

const data1 = await stream.getReader().read();
const data2 = await stream.getReader().read();
// error: Uncaught (in promise) TypeError: ReadableStream is locked.
// const data2 = await stream.getReader().read();
//                            ^

ストリームのデータが途中で不要になる場合もありますよね。処理中にエラーが発生したとか。そのような場合は、キャンセルすることも可能です。

for await (const data of stream) {
  // なんか処理
  if (hasError) {
    stream.cancel();
  }
}

ストリームAPIで直接データを書きこむ

WritableStreamはデータを書き込む機能を持っています。ReadableStream以外からデータを書き込む場合には以下のようにします。

const writer = file.writable.getWriter(); // readerと同様、変数に保持しておく
await writer.ready; // readyはPromiseで、書き込み準備が整ったらresolveするので、それを待つ
await writer.write(uInt8ArrayData); // await付きで書き込む場合は書き込みが完了するまで待ってくれる
writer.write(uInt8ArrayData); // awaitなしで書き込む場合は↓
await write.ready;            // 次のデータが書き込めるようになるまで待つ
await writer.close(); // writerを閉じる。

注意点としては、書き込み準備が整うまでは await writer.ready (あるいはwriteを)で待つ必要があるというところでしょうか。(小さなコードで試してみたら、特に問題なく動いてしまいましたが、待っておいた方が無難でしょう)

ReadableStream, WritableStream, TransformStreamを自分で書いてみる

欲しいものがなければ、自分で作ればよいでしょう。そこで、ここではそれぞれ適当なサンプルを作ってみることにします。

ReadableStream

作り方は主に2タイプあります。ReadableStreamコンストラクタに実装を渡すタイプと、ReadableStreamを継承するタイプです。どちらも実装を渡すことには変わりないのですが、前者だとその場限りのストリームが作成され、後者だとコンストラクタを呼び出すことでいくつもストリームを作成することができます。

ReadableStreamをnewするタイプ

まずは、前者の方法で、ランダムな数字が無限に取得できるストリームを作成してみましょう。

const randomNumberStream1 = new ReadableStream({
  // async 関数でもよい
  pull(controller) {
    controller.enqueue(Math.random() * 10000);
  }
});

👆上の実装を見てください。pullという関数を実装したオブジェクトをReadableStreamのコンストラクタに渡しています。
この、pullという関数を実装したストリームは、外からの「データおくれ」という要求に応じてデータを出力することができるストリームです。外から「データおくれ」という要求が来ると、pull関数が呼ばれます。それに応じてデータをcontroller.enqueueに渡すことで、ストリームから取り出すことができるようになります。

外からの要求に関係なく、ストリーム自身がデータを定期的に出力するようなストリームを作ることもできます。

const randomNumberStream2 = new ReadableStream({
  // async 関数でもよい
  start(controller) {
    setInterval(() => {
      controller.enqueue(Math.random() * 10000);
    }, 1000);
  }
});

⏫上の実装を見てください。startという関数を実装したオブジェクトがありますね。startはストリームインスタンスが作成された直後に呼び出されます。内部では1秒おきに数字をcontroller.enqueueに渡しています。

試しに、5秒後からストリームを読み込むようにしてみましょうか。

// 上のコードの続きです
await sleep(5000);

for await (const item of randomNumberStream2) {
  console.log(item);
}

↑上記コードを実行すると、まず最初の5秒は何も出力されませんが、その後、ランダムな数字が5つ出力され、その後は1秒おきにランダムな数字が出力される、という動きになります。

start直後から、キューに積むことで、5秒間で5つのデータが蓄積されています。その後、for await ofでデータを取り出すタイミングで、すでに蓄積されているデータは待ち時間なしで取り出せますが、全て取り出し終えたあとはデータがキューに積まれるまで待たないと新たなデータが取り出せないわけです。

startとpullはこのように使い分けするとよいでしょう。

しかし、コードを見てもわかるとおり、状態を持つことが難しくなっていますね(ローカル変数に置かないといけない)。数字がカウントアップされるだけの簡単なコードでさえ、実現するのは難しいです。そこで登場するのが、ReadableStreamを継承する方法です。

ReadableStreamを継承するタイプ

継承するといっても、結局のところ実装はReadableStreamのコンストラクタに渡すのですが、クラス内に状態を持たせることが容易になりますね。また、同じストリームを複数個作成することもできます。(意味があるかは知りませんが)

class CountUpStream extends ReadableStream<number> {
  count = 0;
  constructor() {
    // superはReadableStreamのコンストラクタ
    super({
      // async 関数でもよい
      pull: (controller) => {
        controller.enqueue(this.count++);
      }
    })
  }
}

const stream = new CountUpStream();

データが尽きたことを知らせる

先ほどまでのストリームは無限にデータが生成されるため、データの終了はありませんでした。しかし、ストリームにも終了を示す方法があります。逆にカウントダウンして、0になったら終了するストリームを作りましょう

class CountDownStream extends ReadableStream<number> {
  constructor(public count = 10) {
    super({
      pull: (controller) => {
	if (this.count === 0) {
	  // controller.close() を呼び出すことでデータが尽きたことを知らせることができる
	  controller.close();
	} else {
          controller.enqueue(this.count--);
	}
      }
    })
  }
}

エラーが発生したことを知らせる

データ生成時にエラーが発生することもあるでしょう。そんな場合は、使用者にエラーを伝えることが可能です。for await ofをtry-catchで囲っておけば、Exceptionをキャッチできます。

class ErrorStream extends ReadableStream<number> {
  constructor() {
    super({
      pull: (controller) => {
	try {
	  // なんか処理
	} catch (e) {
	  controller.error(e);
	}
      }
    })
  }
}

キャンセルされたときにお片づけをする

行儀のよいプログラムを書きたいものです。キャンセルされた際にお片づけするプログラムはとても行儀がよいと思います。キャンセル時にはcancelが呼ばれます。この中で存分に片付けましょう。cancelはasync関数でも大丈夫です。

class CancellableStream extends ReadableStream<number> {
  constructor() {
    super({
      pull: (controller) => {
      },
      cancel: async () => {
        // DBから切断する
	// ファイルを閉じる
	// タイマーをストップする
	// etc...
      }
    })
  }
}

WritableStream

作り方は、ReadableStreamと同様に2パターンあります。WritableStreamのコンストラクタに実装を渡すタイプと、WritableStreamを継承するタイプです。ほぼ同じということがわかっていただけていると思うので、継承するタイプで紹介していきます。

データを書きこむ

ConsoleStreamを用意しました。入力されたデータをconsole.logに出力するというものです。ReadableStreamの例として、👆上でも使ったCountUpStreamを使っています。これらを繋げてみるとどうなるでしょうか。

class ConsoleStream extends WritableStream<any> {
  constructor() {
    super({
      write(chunk) {
        console.log(chunk);
      }
    })
  }
}

const countUpStream = new CountUpStream();
const consoleStream = new ConsoleStream();

countUpStream
  .pipeTo(consoleStream);

実行してみると、コンソール上にひたすらに数字が出力されるようになりました。やりましたね。

WritableStreamその他

write以外に、startとclose、abortという関数を実装することができます。startはストリームが作られたときに一度だけ呼ばれます。書き込み用のファイルを生成したりするのに使うとよいでしょう。closeはReadableStreamからのデータが尽きたときに呼ばれます。お片づけしましょう。abortはReadableStreamなどでエラーが発生したときに呼ばれます。こちらもやはりお片づけしましょう。

サンプルは、、、、めんどうなのでいいですよね。

TransformStream

TransformStreamはWritableStreamの性質(ReadableStreamからデータを受け取る)と、ReadableStreamの性質(WritableStreamにデータを渡す)の二つを併せ持つストリームです。データの変換などに使うとよいでしょう。

データを受け取り、データを出力する

数字を受け取って、文字列を出力するTwiceStreamです。transform関数を実装することで、変換処理を実現できます。引数として、chunkとcontrollerが渡されます。chunk(データ)を受け取り、controller.enqueueすることでデータを出力します。
今回は意味はありませんが、受け取った数値を二つ並べて文字列として出力することにしました。

class TwiceStream extends TransformStream<number, string> {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(`${chunk} ${chunk}`);
      }
    })
  }
}
const twiceStream = new TwiceStream();
countUpStream
  .pipeThrough(twiceStream)
  .pipeTo(consoleStream);

👆の実装により、同じ数字が二つ並び、カウントアップされていく、という処理が実現できました。

まとめ

最後はすこし端折って説明してしまいましたが、これでストリームの使い方がずいぶん理解できたのではないでしょうか。
ストリームを使いこなせるようになると、ドデカいファイルを省メモリで読み込んだりできますね!
がんばってください。

以上、よろしくお願いいたします。

Discussion