【Streams API】メモリ効率と低遅延なデータ処理
はじめに
今回は、Web標準技術であるStreams APIについて、その基本と実装サンプルまでを解説します。
TypeScriptを用いた具体的なコード例を交えながら、Streams APIがなぜ必要で、どのように活用すべきかを理解していきたいと思います。
Streams API とは
まず、Streams APIとは何かを説明します。
Streams APIは、データの流れ(ストリーム)を効率的に処理するためのWeb標準技術です。
従来のアプローチでは、大容量のデータの場合、
全データが揃うまで待ってから処理を開始していました。
一方、Streams APIでは、データをチャンクという小さな単位に分けて、
到着したものから順次処理を開始することができます。
Streams APIの3つの構成要素
Streams APIは3つのストリームから構成されています。
1. ReadableStream(読み取り)
- Fetch APIのレスポンスボディはReadableStream
- データのソースとなるストリーム
const response = await fetch('https://example.com/data.json');
if (!response.body) {
throw new Error('レスポンスボディが存在しません');
}
const stream: ReadableStream<Uint8Array> = response.body;
2. TransformStream(変換)
- データを変換・加工するストリーム
- ReadableStreamから出てくるのはUint8Array(バイナリデータ)なので、変換するためにTextDecoderStream()を通す必要がある
// テキストデコーダー(バイト列 → 文字列)
const decoder = new TextDecoderStream();
const textStream: ReadableStream<string> =
stream.pipeThrough(decoder);
3. WritableStream(書き込み)
- データの出力先となるストリーム
- ファイル保存、DOM要素への出力などを行う
// ファイルへの書き込み
const fileHandle = await window.showSaveFilePicker();
const writable: WritableStream = await fileHandle.createWritable();
主要なメソッド
実装で頻繁に使うメソッドを簡単に紹介します。
ストリームの接続メソッド
pipeThrough(transformStream)
- データを変換しながら次のストリームに渡すメソッド
- TransformStreamを引数に取り、変換後のReadableStreamを返す
- 複数回連結して、データ変換のパイプラインを構築することができる
stream
.pipeThrough(transform1) // 1つ目の変換
.pipeThrough(transform2) // 2つ目の変換
// さらに連結可能
pipeTo(writableStream)
- データを最終的な出力先に送るメソッド
- WritableStreamを引数に取り、処理完了を示すPromiseを返す
- パイプラインの最後に必ず1回だけ使用する
await stream
.pipeThrough(transform)
.pipeTo(destination); // 最後はpipeTo()
ReadableStreamのメソッド
getReader()
ストリームから直接データを読み取るための リーダー(Reader) を取得するメソッドです。
使い分け
-
pipeThrough/pipeTo→ 自動パイプ処理向け(推奨) -
getReader()→ 手動でチャンク単位の処理 をしたい場合
使用時の注意点
- 読み取り中は ストリームがロックされる ため、他の操作ができない
- 処理終了後は
releaseLock()でロックを解放する必要がある - 中断する場合は
cancel(reason)でリソースを解放
基本的な使い方
const reader = stream.getReader();
try {
while (true) {
// チャンクを1つ読み取る
const { done, value } = await reader.read();
// done: true → 全データ読み取り完了
// done: false → まだデータが残っている
if (done) {
console.log('読み取り完了');
break;
}
// value: 読み取ったチャンクのデータ
console.log('受信:', value);
}
} finally {
// 必ずロックを解放(エラー時も実行される)
reader.releaseLock();
}
cancel(reason)
- ストリームの読み取りを 途中で中止 するメソッドです。
- 不要になったストリームのリソースを解放できます。
-
reasonは省略可能ですが、エラー原因などを渡せます。 -
pipeThroughやpipeToで接続している場合、キャンセルすると 下流の WritableStream も影響を受けます。
await stream.cancel("不要になったため中止");
TransformStreamDefaultControllerのプロパティとメソッド
enqueue(chunk)
- 変換したデータを次のストリームに送出するメソッド
-
transform()メソッド内で使用
transform(chunk, controller) {
const processed = processData(chunk);
controller.enqueue(processed);
}
desiredSize(プロパティ)
- キューの残り容量を示す数値
- 正の値 → まだデータを受け入れ可能
- 0以下 → キューが満杯(後で説明する背圧が発生している状態)
console.log(controller.desiredSize); // 結果が3 → あと3つ入る
controller.enqueue(data);
console.log(controller.desiredSize); // 結果が2 → ひとつ減った
Transformerインターフェースのメソッド
transform(chunk, controller)
- 各チャンクに対して実行される変換処理
- 上流から送られてきたデータ(
chunk)を受け取る - 必要に応じて加工・変換する
- 加工したデータを
controller.enqueue()で下流に送る
transform(chunk: string, controller: TransformStreamDefaultController<number>) {
const number = parseInt(chunk, 10);
if (!isNaN(number)) {
controller.enqueue(number); // 有効な数値だけ下流に送る
}
}
flush(controller)
-
flush(controller)は、TransformStream が終了するときに呼ばれる関数 - 変換中に残ったバッファのデータを最後に下流に送る
- ストリームの終端処理を行う
flush(controller: TransformStreamDefaultController<string>) {
if (this.buffer.length > 0) {
controller.enqueue(this.buffer); // 残りのデータを送出
}
}
start(controller)(オプション)
- Transformerの初期化処理(バッファの初期化やログ出力など)
- 必要に応じて実装
start(controller: TransformStreamDefaultController<string>) {
console.log('変換処理を開始します');
}
パイプラインの構築
これらのストリームをpipeThrough()とpipeTo()で連結することで、データ処理のパイプラインを構築できます。
await stream
.pipeThrough(decoder) // データを文字列やバイナリに変換
.pipeThrough(jsonParser) // JSON オブジェクトに変換
.pipeTo(writer); // 最終的な出力先に書き込む
なぜ Streams API が必要なのか
従来の処理方法が抱える課題
ここで、従来の方法が抱える2つの問題についてみてみます。
メモリ負荷の増大
- 従来の方法では データ全体を一度にメモリに読み込む
- データサイズが大きいと、メモリを大量に消費してしまう
- 最悪の場合、アプリがクラッシュすることもある
const response = await fetch('large-file.json');
const data = await response.json(); // ここで全データを一気に読み込む
初回応答の遅延
- データのダウンロードや読み込みが完了するまで、画面に何も表示されない
- ユーザーは数秒~数十秒間、待たされることになる
- 特に大量データの表示やリアルタイム処理には不向き
const users = await fetchAllUsers(); // 全ユーザーを取得するまで待機
displayUsers(users); // データ取得完了後に表示開始
Streams APIによる解決
Streams APIは、チャンク単位でデータを処理することで、これらの課題を解決します。
メモリ効率の最適化
- データを チャンクごとに処理
- メモリに残るのは、チャンクサイズ × キューサイズだけ
- 大容量ファイルでも安定して動作
if (!response.body) {
throw new Error('レスポンスボディが存在しません');
}
await response.body // ReadableStream: サーバーからのデータ
.pipeThrough(new TextDecoderStream()) // バイナリを文字列に変換(チャンク単位)
.pipeThrough(new TransformStream({
transform(chunk, controller) {
const processed = processChunk(chunk); // チャンクを処理
controller.enqueue(processed); // 下流に送る
// 処理済みチャンクはメモリからすぐ解放される
}
}))
.pipeTo(writer); // WritableStream: 最終出力先に書き込む
低遅延の実現
- 最初のチャンクが届いた瞬間から処理・表示開始
- ユーザー体験の大幅な向上
- プログレッシブレンダリングが可能
if (!response.body) {
throw new Error('レスポンスボディが存在しません');
}
await response.body // ReadableStream: サーバーからのデータ
.pipeThrough(decoder) // バイナリを文字列に変換(チャンク単位)
.pipeTo(new WritableStream({
write(chunk) {
displayChunk(chunk); // 受信したチャンクを即座に表示
}
}));
内部キューと背圧制御
ここから、Streams APIの重要な機能である背圧制御について説明します。
背圧制御とは
背圧制御とは、データの生産速度と消費速度の差を自動的に調整する仕組みです。
例えば、データの生成が速すぎてメモリが溢れないように、また、処理が遅すぎて待機時間が発生しないように、自動的にバランスを取ります。
内部キューの仕組み
各ストリームは、内部に**キュー(待ち行列)**を持っています。
このキューには、highWaterMarkという最大容量が設定されています。
例えば、highWaterMark: 5の場合、最大5個のチャンクを保持できます。
データを生成するReadableStreamと、データを消費するWritableStreamの速度が異なる場合、
このキューが満杯になったり、空になったりします。
背圧制御は、この状態を自動的に検知して、データフローを調整しています。
new ReadableStream({
// ...
}, {
highWaterMark: 5 // キューの最大サイズ(ReadableStream/WritableStreamのデフォルト: 1、TransformStreamのデフォルト: 0)
});
背圧制御の動作パターン
具体的な動作を3つのケースで見てみます。
通常の状態
- 処理がスムーズ
- キューに余裕がある
- 背圧なし
ReadableStream → [キュー: □□□__] → WritableStream
(3/5個使用) ↓
高速処理
処理が遅い
- キューが満杯になる
- 背圧発生 → ReadableStreamに停止信号
- ReadableStreamは一時停止
ReadableStream → [キュー: ■■■■■] → WritableStream
↑ (5/5個満杯) ↓
停止信号 低速処理
処理が追いつく
- キューに空きができる
- 背圧解除 → ReadableStreamに再開信号
- ReadableStreamが再開
ReadableStream → [キュー: ■■___] → WritableStream
↑ (2/5個使用) ↓
再開信号 処理完了
このように、自動的にペースを調整することで、メモリに溜まるデータ量が制限されます。
desiredSizeプロパティの役割
desiredSizeプロパティは、キューの残り容量を示す重要な指標です。
desiredSizeの計算式
// 基本的な考え方
desiredSize = highWaterMark - queueTotalSize
// queueTotalSizeは、キュー内の全チャンクのサイズの合計
// チャンクのサイズが指定されていない場合は、各チャンクのサイズは1としてカウント
// 例:highWaterMark = 5 の場合
desiredSize = 5 // キューは空(5個入る)
desiredSize = 2 // 3個使用中(あと2個入る)
desiredSize = 0 // キューが満杯に近く、背圧がかかり始めている
desiredSize = -1 // 容量オーバー(背圧発生中)
この値によって、次のセクションで説明するpull()メソッドの呼び出しが制御されます。
pull()とenqueue()による自動調整
背圧制御の核心は、pull()メソッドとdesiredSizeの連携にあります。
仕組みの詳細
-
ReadableStreamのpull()は 下流のキューに余裕があるとき (desiredSize > 0) に自動的に呼ばれる -
controller.enqueue(chunk)でチャンクを追加すると、desiredSize が減少 -
desiredSize <= 0になると、pull() は自動的に停止 - 上流はこれ以上チャンクを作らないので、メモリ使用量が抑えられる
- 下流の処理が進み、キューに余裕が出るとpull() が再び呼ばれる
つまり、手動でペース調整する必要がなく、
システムが自動的にメモリ使用量を制限してくれます。
実装例
const stream = new ReadableStream({
// start(): ストリーム開始時に1度だけ呼ばれる
start(controller) {
console.log('ストリーム開始');
},
// pull(): desiredSize > 0 のときに自動的に呼ばれる
pull(controller) {
const chunk = generateData();
// チャンクを送出
controller.enqueue(chunk);
// この時点でdesiredSizeが減少
// desiredSize <= 0 になると、pull()は呼ばれなくなる(背圧発生)
// 下流の処理が進んでdesiredSize > 0 になると、再びpull()が呼ばれる
}
}, {
highWaterMark: 3 // キューの最大サイズを3に設定
});
試してみる
Metropolitan Museum of Art APIから約50万件のオブジェクトID(約5MB)を取得する例で、
実際に試してみました。
従来の方法の場合
type ApiResponse = {
total: number;
objectIDs: number[];
};
const fetchAllIds = async (): Promise<number[]> => {
const response = await fetch('https://collectionapi.metmuseum.org/public/collection/v1/objects');
// 全データ(約5MB)をメモリに読み込む
const data: ApiResponse = await response.json();
return data.objectIDs; // 約50万件を返す
};
// 使用例
const ids = await fetchAllIds(); // 待機が発生
console.log(`取得完了: ${ids.length}件`);
問題点:
- ダウンロード完了まで待機時間が約20秒間程度
- 全データ(約5MB)がメモリに保持される
- 初回表示までの時間が長い
Streams APIを使用した場合
Streams APIでは、データをチャンクごとに処理します。
CustomReadableStream
Fetch APIのレスポンスをReadableStreamとして扱うカスタムクラス。
class CustomReadableStream extends ReadableStream<Uint8Array> {
constructor(url: string) {
let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
super({
// ストリーム開始時に1度だけ実行
async start(controller) {
try {
// URLからデータを取得
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP Error: ${response.status}`);
}
if (!response.body) {
throw new Error("No response body");
}
// レスポンスボディからリーダーを取得
reader = response.body.getReader();
} catch (error) {
controller.error(error);
}
},
// 下流にデータを送る準備ができたときに自動的に呼ばれる
async pull(controller) {
if (!reader) {
controller.error(new Error("Reader not initialized"));
return;
}
try {
// チャンクを読み取る
const { done, value } = await reader.read();
if (done) {
// 全データ読み取り完了
controller.close();
} else {
// チャンクを下流に送出
controller.enqueue(value);
}
} catch (error) {
controller.error(error);
}
},
// ストリームがキャンセルされたときの処理
async cancel() {
if (reader) {
await reader.cancel();
reader = null;
}
},
});
}
}
CustomTransformStream
受け取った文字列データを加工して、必要なIDだけを取り出す変換用ストリームのクラス。
class CustomTransformStream extends TransformStream<string, number> {
constructor() {
let buffer = "";
let isInsideArray = false;
super({
/**
* 初期処理(オプション)
*/
start() {
console.log('Transform開始');
},
/**
* チャンクを変換
* 各チャンクに対して実行される
*/
transform(chunk, controller) {
// 受信したチャンクをバッファに追加
buffer += chunk;
// JSON配列の開始位置を探す
if (!isInsideArray) {
const arrayStartIndex = findArrayStart(buffer);
if (arrayStartIndex !== null) {
isInsideArray = true;
// 配列開始位置以降のデータのみを保持
buffer = buffer.substring(arrayStartIndex);
}
}
// 配列内のデータを処理
if (isInsideArray) {
// バッファから数値IDを抽出
const ids = extractIds(buffer);
// 抽出したIDを1つずつ下流に送出
for (const id of ids) {
controller.enqueue(id);
}
// 処理済みデータをバッファから削除
buffer = cleanBuffer(buffer);
}
},
/**
* 終了時の処理
* バッファに残ったデータを処理
*/
flush(controller) {
const ids = extractIds(buffer);
for (const id of ids) {
controller.enqueue(id);
}
console.log('Transform完了');
},
});
}
}
CustomWritableStream
変換されたIDを受け取り、UIにテキストとして表示する書き込み用ストリームのクラス。
class CustomWritableStream extends WritableStream<number> {
constructor(
onWrite?: (id: number) => void,
onComplete?: () => void,
onError?: (error: any) => void,
) {
super({
/**
* 各チャンクの書き込み処理
*/
async write(id) {
// カスタム処理を実行(オプション)
if (onWrite) {
onWrite(id);
}
},
/**
* ストリーム正常終了時の処理
*/
close() {
console.log("処理完了");
if (onComplete) {
onComplete();
}
},
/**
* エラー発生時の処理
*/
abort(reason) {
console.error("エラー:", reason);
if (onError) {
onError(reason);
}
},
});
}
}
パイプラインでの使用例
// カスタムクラスを組み合わせてパイプラインを構築
const readable = new CustomReadableStream(MET_API_URL); // データソース
const decoder = new TextDecoderStream(); // バイナリ→文字列変換
const transformer = new CustomTransformStream(); // JSON→ID抽出
const writable = new CustomWritableStream(
(id) => console.log(`ID受信: ${id}`), // IDを1つ受信するたびに実行
() => console.log('処理完了'), // 全ID処理完了時に実行
(error) => console.error('エラー:', error) // エラー時に実行
);
// パイプラインの実行(エラーハンドリング付き)
try {
await readable
.pipeThrough(decoder) // バイト列を文字列に変換
.pipeThrough(transformer) // JSONからIDを抽出
.pipeTo(writable); // IDを処理(UI更新など)
} catch (error) {
console.error('ストリーム処理エラー:', error);
// 必要に応じてリソースのクリーンアップやユーザーへの通知を行う
}
測定結果
※ 環境によって結果は異なります
| 項目 | 従来の方法 | Streams API |
|---|---|---|
| 初回表示 | 約20秒待機 | 即座(数十ms) |
| 表示方法 | 一括表示 | 逐次表示 |
| メモリ使用量 | 約5MB増加 | 一定(チャンクサイズ程度) |
Streams APIが適している場面
大容量データの処理 と リアルタイム性が求められる処理 の2つで利用されることが多いようです。
例:リアルタイムデータ処理、大容量ファイルのダウンロード・アップロードなど
まとめ
本日は、Streams API の基本について解説しました。
今後は、実装の選択肢のひとつとして適した場面があれば、積極的に活用していきたいと思います。
特に、今回紹介した背圧制御の仕組みを理解することで、なぜメモリ効率が良いのか、なぜ安定して動作するのかが理解できました。
Discussion