🌊

【Streams API】メモリ効率と低遅延なデータ処理

に公開

はじめに

今回は、Web標準技術であるStreams APIについて、その基本と実装サンプルまでを解説します。
TypeScriptを用いた具体的なコード例を交えながら、Streams APIがなぜ必要で、どのように活用すべきかを理解していきたいと思います。

Streams API とは

まず、Streams APIとは何かを説明します。
Streams APIは、データの流れ(ストリーム)を効率的に処理するためのWeb標準技術です。
従来のアプローチでは、大容量のデータの場合、
全データが揃うまで待ってから処理を開始していました。
一方、Streams APIでは、データをチャンクという小さな単位に分けて、
到着したものから順次処理を開始することができます。
https://developer.mozilla.org/ja/docs/Web/API/Streams_API

Streams APIの3つの構成要素

Streams APIは3つのストリームから構成されています。

1. ReadableStream(読み取り)

  • Fetch APIのレスポンスボディはReadableStream
  • データのソースとなるストリーム
const response = await fetch('https://example.com/data.json');

if (!response.body) {
  throw new Error('レスポンスボディが存在しません');
}

const stream: ReadableStream<Uint8Array> = response.body;

2. TransformStream(変換)

  • データを変換・加工するストリーム
  • ReadableStreamから出てくるのはUint8Array(バイナリデータ)なので、変換するためにTextDecoderStream()を通す必要がある
// テキストデコーダー(バイト列 → 文字列)
const decoder = new TextDecoderStream();
const textStream: ReadableStream<string> =
  stream.pipeThrough(decoder);

3. WritableStream(書き込み)

  • データの出力先となるストリーム
  • ファイル保存、DOM要素への出力などを行う
// ファイルへの書き込み
const fileHandle = await window.showSaveFilePicker();
const writable: WritableStream = await fileHandle.createWritable();

主要なメソッド

実装で頻繁に使うメソッドを簡単に紹介します。

ストリームの接続メソッド

pipeThrough(transformStream)

  • データを変換しながら次のストリームに渡すメソッド
  • TransformStreamを引数に取り、変換後のReadableStreamを返す
  • 複数回連結して、データ変換のパイプラインを構築することができる
stream
  .pipeThrough(transform1)  // 1つ目の変換
  .pipeThrough(transform2)  // 2つ目の変換
  // さらに連結可能

pipeTo(writableStream)

  • データを最終的な出力先に送るメソッド
  • WritableStreamを引数に取り、処理完了を示すPromiseを返す
  • パイプラインの最後に必ず1回だけ使用する
await stream
  .pipeThrough(transform)
  .pipeTo(destination);  // 最後はpipeTo()

ReadableStreamのメソッド

getReader()

ストリームから直接データを読み取るための リーダー(Reader) を取得するメソッドです。

使い分け

  • pipeThrough / pipeTo → 自動パイプ処理向け(推奨)
  • getReader()手動でチャンク単位の処理 をしたい場合

使用時の注意点

  • 読み取り中は ストリームがロックされる ため、他の操作ができない
  • 処理終了後は releaseLock() でロックを解放する必要がある
  • 中断する場合は cancel(reason) でリソースを解放

基本的な使い方

const reader = stream.getReader();
try {
  while (true) {
    // チャンクを1つ読み取る
    const { done, value } = await reader.read();

    // done: true → 全データ読み取り完了
    // done: false → まだデータが残っている
    if (done) {
      console.log('読み取り完了');
      break;
    }

    // value: 読み取ったチャンクのデータ
    console.log('受信:', value);
  }
} finally {
  // 必ずロックを解放(エラー時も実行される)
  reader.releaseLock();
}

cancel(reason)

  • ストリームの読み取りを 途中で中止 するメソッドです。
  • 不要になったストリームのリソースを解放できます。
  • reason は省略可能ですが、エラー原因などを渡せます。
  • pipeThroughpipeTo で接続している場合、キャンセルすると 下流の WritableStream も影響を受けます
await stream.cancel("不要になったため中止");

TransformStreamDefaultControllerのプロパティとメソッド

enqueue(chunk)

  • 変換したデータを次のストリームに送出するメソッド
  • transform()メソッド内で使用
transform(chunk, controller) {
  const processed = processData(chunk);
  controller.enqueue(processed);
}

desiredSize(プロパティ)

  • キューの残り容量を示す数値
  • 正の値 → まだデータを受け入れ可能
  • 0以下 → キューが満杯(後で説明する背圧が発生している状態)
console.log(controller.desiredSize);  // 結果が3 → あと3つ入る
controller.enqueue(data);
console.log(controller.desiredSize);  // 結果が2 → ひとつ減った

Transformerインターフェースのメソッド

transform(chunk, controller)

  • 各チャンクに対して実行される変換処理
  • 上流から送られてきたデータ(chunk)を受け取る
  • 必要に応じて加工・変換する
  • 加工したデータを controller.enqueue() で下流に送る
transform(chunk: string, controller: TransformStreamDefaultController<number>) {
  const number = parseInt(chunk, 10);

  if (!isNaN(number)) {
    controller.enqueue(number);  // 有効な数値だけ下流に送る
  }
}

flush(controller)

  • flush(controller) は、TransformStream が終了するときに呼ばれる関数
  • 変換中に残ったバッファのデータを最後に下流に送る
  • ストリームの終端処理を行う
flush(controller: TransformStreamDefaultController<string>) {
  if (this.buffer.length > 0) {
    controller.enqueue(this.buffer);  // 残りのデータを送出
  }
}

start(controller)(オプション)

  • Transformerの初期化処理(バッファの初期化やログ出力など)
  • 必要に応じて実装
start(controller: TransformStreamDefaultController<string>) {
  console.log('変換処理を開始します');
}

パイプラインの構築

これらのストリームをpipeThrough()pipeTo()で連結することで、データ処理のパイプラインを構築できます。

await stream
  .pipeThrough(decoder)      // データを文字列やバイナリに変換
  .pipeThrough(jsonParser)   // JSON オブジェクトに変換
  .pipeTo(writer);           // 最終的な出力先に書き込む

なぜ Streams API が必要なのか

従来の処理方法が抱える課題

ここで、従来の方法が抱える2つの問題についてみてみます。

メモリ負荷の増大

  • 従来の方法では データ全体を一度にメモリに読み込む
  • データサイズが大きいと、メモリを大量に消費してしまう
  • 最悪の場合、アプリがクラッシュすることもある
const response = await fetch('large-file.json');
const data = await response.json(); // ここで全データを一気に読み込む

初回応答の遅延

  • データのダウンロードや読み込みが完了するまで、画面に何も表示されない
  • ユーザーは数秒~数十秒間、待たされることになる
  • 特に大量データの表示やリアルタイム処理には不向き
const users = await fetchAllUsers(); // 全ユーザーを取得するまで待機
displayUsers(users);                 // データ取得完了後に表示開始

Streams APIによる解決

Streams APIは、チャンク単位でデータを処理することで、これらの課題を解決します。

メモリ効率の最適化

  • データを チャンクごとに処理
  • メモリに残るのは、チャンクサイズ × キューサイズだけ
  • 大容量ファイルでも安定して動作
if (!response.body) {
  throw new Error('レスポンスボディが存在しません');
}

await response.body                      // ReadableStream: サーバーからのデータ
  .pipeThrough(new TextDecoderStream())  // バイナリを文字列に変換(チャンク単位)
  .pipeThrough(new TransformStream({
    transform(chunk, controller) {
      const processed = processChunk(chunk); // チャンクを処理
      controller.enqueue(processed);          // 下流に送る
      // 処理済みチャンクはメモリからすぐ解放される
    }
  }))
  .pipeTo(writer);                         // WritableStream: 最終出力先に書き込む

低遅延の実現

  • 最初のチャンクが届いた瞬間から処理・表示開始
  • ユーザー体験の大幅な向上
  • プログレッシブレンダリングが可能
if (!response.body) {
  throw new Error('レスポンスボディが存在しません');
}

await response.body                        // ReadableStream: サーバーからのデータ
  .pipeThrough(decoder)                    // バイナリを文字列に変換(チャンク単位)
  .pipeTo(new WritableStream({
    write(chunk) {
      displayChunk(chunk);                 // 受信したチャンクを即座に表示
    }
  }));

内部キューと背圧制御

ここから、Streams APIの重要な機能である背圧制御について説明します。

背圧制御とは

背圧制御とは、データの生産速度と消費速度の差を自動的に調整する仕組みです。

例えば、データの生成が速すぎてメモリが溢れないように、また、処理が遅すぎて待機時間が発生しないように、自動的にバランスを取ります。

内部キューの仕組み

各ストリームは、内部に**キュー(待ち行列)**を持っています。
このキューには、highWaterMarkという最大容量が設定されています。

例えば、highWaterMark: 5の場合、最大5個のチャンクを保持できます。

データを生成するReadableStreamと、データを消費するWritableStreamの速度が異なる場合、
このキューが満杯になったり、空になったりします。

背圧制御は、この状態を自動的に検知して、データフローを調整しています。

new ReadableStream({
  // ...
}, {
  highWaterMark: 5  // キューの最大サイズ(ReadableStream/WritableStreamのデフォルト: 1、TransformStreamのデフォルト: 0)
});

背圧制御の動作パターン

具体的な動作を3つのケースで見てみます。

通常の状態

  • 処理がスムーズ
  • キューに余裕がある
  • 背圧なし
ReadableStream → [キュー: □□□__] → WritableStream
                   (3/5個使用)        ↓
                                   高速処理

処理が遅い

  • キューが満杯になる
  • 背圧発生 → ReadableStreamに停止信号
  • ReadableStreamは一時停止
ReadableStream → [キュー: ■■■■■] → WritableStream
    ↑              (5/5個満杯)        ↓
  停止信号                         低速処理

処理が追いつく

  • キューに空きができる
  • 背圧解除 → ReadableStreamに再開信号
  • ReadableStreamが再開
ReadableStream → [キュー: ■■___] → WritableStream
    ↑              (2/5個使用)        ↓
  再開信号                         処理完了

このように、自動的にペースを調整することで、メモリに溜まるデータ量が制限されます。

desiredSizeプロパティの役割

desiredSizeプロパティは、キューの残り容量を示す重要な指標です。

desiredSizeの計算式

// 基本的な考え方
desiredSize = highWaterMark - queueTotalSize

// queueTotalSizeは、キュー内の全チャンクのサイズの合計
// チャンクのサイズが指定されていない場合は、各チャンクのサイズは1としてカウント

// 例:highWaterMark = 5 の場合
desiredSize =  5  // キューは空(5個入る)
desiredSize =  2  // 3個使用中(あと2個入る)
desiredSize =  0  // キューが満杯に近く、背圧がかかり始めている
desiredSize = -1  // 容量オーバー(背圧発生中)

この値によって、次のセクションで説明するpull()メソッドの呼び出しが制御されます。

pull()とenqueue()による自動調整

背圧制御の核心は、pull()メソッドとdesiredSizeの連携にあります。

仕組みの詳細

  • ReadableStreampull() は 下流のキューに余裕があるとき (desiredSize > 0) に自動的に呼ばれる
  • controller.enqueue(chunk) でチャンクを追加すると、desiredSize が減少
  • desiredSize <= 0 になると、pull() は自動的に停止
  • 上流はこれ以上チャンクを作らないので、メモリ使用量が抑えられる
  • 下流の処理が進み、キューに余裕が出るとpull() が再び呼ばれる

つまり、手動でペース調整する必要がなく、
システムが自動的にメモリ使用量を制限してくれます。

実装例

const stream = new ReadableStream({
  // start(): ストリーム開始時に1度だけ呼ばれる
  start(controller) {
    console.log('ストリーム開始');
  },

  // pull(): desiredSize > 0 のときに自動的に呼ばれる
  pull(controller) {
    const chunk = generateData();

    // チャンクを送出
    controller.enqueue(chunk);

    // この時点でdesiredSizeが減少
    // desiredSize <= 0 になると、pull()は呼ばれなくなる(背圧発生)
    // 下流の処理が進んでdesiredSize > 0 になると、再びpull()が呼ばれる
  }
}, {
  highWaterMark: 3  // キューの最大サイズを3に設定
});

試してみる

Metropolitan Museum of Art APIから約50万件のオブジェクトID(約5MB)を取得する例で、
実際に試してみました。

従来の方法の場合

type ApiResponse = {
  total: number;
  objectIDs: number[];
};

const fetchAllIds = async (): Promise<number[]> => {
  const response = await fetch('https://collectionapi.metmuseum.org/public/collection/v1/objects');

  // 全データ(約5MB)をメモリに読み込む
  const data: ApiResponse = await response.json();

  return data.objectIDs; // 約50万件を返す
};

// 使用例
const ids = await fetchAllIds(); // 待機が発生
console.log(`取得完了: ${ids.length}`);

問題点:

  • ダウンロード完了まで待機時間が約20秒間程度
  • 全データ(約5MB)がメモリに保持される
  • 初回表示までの時間が長い

Streams APIを使用した場合

Streams APIでは、データをチャンクごとに処理します。

CustomReadableStream

Fetch APIのレスポンスをReadableStreamとして扱うカスタムクラス。

class CustomReadableStream extends ReadableStream<Uint8Array> {
  constructor(url: string) {
    let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;

    super({
      // ストリーム開始時に1度だけ実行
      async start(controller) {
        try {
          // URLからデータを取得
          const response = await fetch(url);
          if (!response.ok) {
            throw new Error(`HTTP Error: ${response.status}`);
          }
          if (!response.body) {
            throw new Error("No response body");
          }
          // レスポンスボディからリーダーを取得
          reader = response.body.getReader();
        } catch (error) {
          controller.error(error);
        }
      },

      // 下流にデータを送る準備ができたときに自動的に呼ばれる
      async pull(controller) {
        if (!reader) {
          controller.error(new Error("Reader not initialized"));
          return;
        }

        try {
          // チャンクを読み取る
          const { done, value } = await reader.read();
          if (done) {
            // 全データ読み取り完了
            controller.close();
          } else {
            // チャンクを下流に送出
            controller.enqueue(value);
          }
        } catch (error) {
          controller.error(error);
        }
      },

      // ストリームがキャンセルされたときの処理
      async cancel() {
        if (reader) {
          await reader.cancel();
          reader = null;
        }
      },
    });
  }
}

CustomTransformStream

受け取った文字列データを加工して、必要なIDだけを取り出す変換用ストリームのクラス。


class CustomTransformStream extends TransformStream<string, number> {
  constructor() {
    let buffer = "";
    let isInsideArray = false;

    super({
      /**
       * 初期処理(オプション)
       */
      start() {
        console.log('Transform開始');
      },

      /**
       * チャンクを変換
       * 各チャンクに対して実行される
       */
      transform(chunk, controller) {
        // 受信したチャンクをバッファに追加
        buffer += chunk;

        // JSON配列の開始位置を探す
        if (!isInsideArray) {
          const arrayStartIndex = findArrayStart(buffer);
          if (arrayStartIndex !== null) {
            isInsideArray = true;
            // 配列開始位置以降のデータのみを保持
            buffer = buffer.substring(arrayStartIndex);
          }
        }

        // 配列内のデータを処理
        if (isInsideArray) {
          // バッファから数値IDを抽出
          const ids = extractIds(buffer);

          // 抽出したIDを1つずつ下流に送出
          for (const id of ids) {
            controller.enqueue(id);
          }

          // 処理済みデータをバッファから削除
          buffer = cleanBuffer(buffer);
        }
      },

      /**
       * 終了時の処理
       * バッファに残ったデータを処理
       */
      flush(controller) {
        const ids = extractIds(buffer);
        for (const id of ids) {
          controller.enqueue(id);
        }
        console.log('Transform完了');
      },
    });
  }
}

CustomWritableStream

変換されたIDを受け取り、UIにテキストとして表示する書き込み用ストリームのクラス。

class CustomWritableStream extends WritableStream<number> {
  constructor(
    onWrite?: (id: number) => void,
    onComplete?: () => void,
    onError?: (error: any) => void,
  ) {
    super({
      /**
       * 各チャンクの書き込み処理
       */
      async write(id) {
        // カスタム処理を実行(オプション)
        if (onWrite) {
          onWrite(id);
        }
      },

      /**
       * ストリーム正常終了時の処理
       */
      close() {
        console.log("処理完了");
        if (onComplete) {
          onComplete();
        }
      },

      /**
       * エラー発生時の処理
       */
      abort(reason) {
        console.error("エラー:", reason);
        if (onError) {
          onError(reason);
        }
      },
    });
  }
}

パイプラインでの使用例

// カスタムクラスを組み合わせてパイプラインを構築
const readable = new CustomReadableStream(MET_API_URL);  // データソース
const decoder = new TextDecoderStream();                 // バイナリ→文字列変換
const transformer = new CustomTransformStream();         // JSON→ID抽出
const writable = new CustomWritableStream(
  (id) => console.log(`ID受信: ${id}`),         // IDを1つ受信するたびに実行
  () => console.log('処理完了'),                 // 全ID処理完了時に実行
  (error) => console.error('エラー:', error)    // エラー時に実行
);

// パイプラインの実行(エラーハンドリング付き)
try {
  await readable
    .pipeThrough(decoder)      // バイト列を文字列に変換
    .pipeThrough(transformer)  // JSONからIDを抽出
    .pipeTo(writable);         // IDを処理(UI更新など)
} catch (error) {
  console.error('ストリーム処理エラー:', error);
  // 必要に応じてリソースのクリーンアップやユーザーへの通知を行う
}

測定結果

※ 環境によって結果は異なります

項目 従来の方法 Streams API
初回表示 約20秒待機 即座(数十ms)
表示方法 一括表示 逐次表示
メモリ使用量 約5MB増加 一定(チャンクサイズ程度)

Streams APIが適している場面

大容量データの処理リアルタイム性が求められる処理 の2つで利用されることが多いようです。
例:リアルタイムデータ処理、大容量ファイルのダウンロード・アップロードなど

まとめ

本日は、Streams API の基本について解説しました。
今後は、実装の選択肢のひとつとして適した場面があれば、積極的に活用していきたいと思います。

特に、今回紹介した背圧制御の仕組みを理解することで、なぜメモリ効率が良いのか、なぜ安定して動作するのかが理解できました。

GitHubで編集を提案

Discussion