📡

ReadableStreamオブジェクトを理解する

2024/12/20に公開

こんにちは、株式会社イルシルでEMをしているmizukiです。
今回はタスクでストリーミング配信を実装する必要があり「ReadableStream」というものに初めて触れたので、概要や使い方についてまとめていきたいと思います。

ReadableStreamとは

まずはReadableStreamについてのMDNの説明を引用したいと思います。

ReadableStream はストリーム API のインターフェイスで、バイトデータの読み取り可能なストリームを表します。 Fetch API は、 ReadableStream の具体的なインスタンスをResponse オブジェクトの body プロパティを介して提供します。

簡単に言うと、「ストリーミングで随時データを受け取ってくれるオブジェクト」です。

ReadableStreamには getReader() というメソッドがあり、これを実行することでリーダーを作成することができます。
リーダーを作成すると、ストリーミングでデータを受け取ることができます。
また、ReadableStreamには locked というプロパティがあり、リーダーを作成するとストリーミングがロックされるため、1箇所でしかストリーミングを受け取ることができない作りとなっています。

その他でいくと cancel() というメソッドも用意されており、名前の通りストリーミングをキャンセルすることも可能です。

どういう時に出てくる?

ReadableStreamってどういう時に出てくるの?という点ですが、今回実装で必要になったのは
・chatGPTをAPIとして使用する際に stream: true オプションを指定した際のレスポンス
・AWSのLambdaで「呼び出しモード」を RESPONSE_STREAM にした際のレスポンス
です。
chatGPTなんかはチャットをすると基本的にはストリーミングで文章が表示されていくので馴染みがあるのではないかと思います。

そのほかにもRailsにActiveStorage::Streamingというクラスもあるらしいです。
(全く触ったことないのですが・・)

実装方法

今回はchatGPTからストリーミングでデータを受け取る前提でコードを書いていきます。
chatGPTのリクエストに stream: true を指定すると、レスポンスにこのReadableStreamが含まれています。
それ以降は、ReadableStreamを介してストリーミングでデータを受け取っていく実装となります。
chatGPTからのレスポンスはReadableStreamを受け取る1度だけで、それ以降ストリーミングでデータを受け取るのはReadableStreamを介して」という点がポイントです。
(僕は初めこれを理解できておらずトンチンカンな実装をしてました😭)

ReadableStreamは「非同期反復可能プロトコル」であるため for await ... of構文が使えるようです。
それを使うと以下のような形で実装ができます。

コード全文はこちら
const response = await fetch('~chatGPT_URL~', {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "api-key": "~chatGPT_key~",
    },
    body: JSON.stringify({
      messages: ["~チャットの履歴データ~"],
      stream: true, // ここでストリーミング指定をします
      stream_options: { include_usage: true }, // token数を取得したい場合は明示的に指定する必要があります
    }),
});

// bodyにあるReadableStreamを取り出します
const readableStream = response.body

// ストリーミングでデータを受け取る
for await (const chunk of readableStream) {
  // ストリーミングで受け取ったデータをchunkと呼ぶことが多いため、ここでもchunkという変数で定義していきます
  // ここでchunkに対して何かしらの処理を書いていくことで、ストリーミングでデータを受け取り次第処理をすることが可能です。
  console.log(chunk);
}

まずリクエストする際に、bodyに stream: true を含めます。

const response = await fetch('~chatGPT_URL~', {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "api-key": "~chatGPT_key~",
    },
    body: JSON.stringify({
      messages: ["~チャットの履歴データ~"],
      stream: true, // ここでストリーミング指定をします
      stream_options: { include_usage: true }, // token数を取得したい場合は明示的に指定する必要があります
    }),
});

こうすることでresponseではReadableStreamを受け取ることができます。
responseの中のbodyに格納されているので、取り出していきます。

// bodyにあるReadableStreamを取り出します
const readableStream = response.body

受け取ったReadableStreamを for await ... ofで処理します。

// ストリーミングでデータを受け取る
for await (const chunk of readableStream) {
  // ストリーミングで受け取ったデータをchunkと呼ぶことが多いため、ここでもchunkという変数で定義していきます
  // ここでchunkに対して何かしらの処理を書いていくことで、ストリーミングでデータを受け取り次第処理をすることが可能です。
  console.log(chunk);
}

こうすることでストリーミングで受け取ったデータを逐次処理していくことが可能です。

補足
また、別の書き方として、 for await ... ofは使用せずにreaderを取り出して処理する書き方もあります。
できることとしては同じため、参考までにどうぞ。

コード全文はこちら
const response = await fetch('~chatGPT_URL~', {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "api-key": "~chatGPT_key~",
    },
    body: JSON.stringify({
      messages: ["~チャットの履歴データ~"],
      stream: true, // ここでストリーミング指定をします
      stream_options: { include_usage: true }, // token数を取得したい場合は明示的に指定する必要があります
    }),
});

// readerを取り出す
const reader = response.body.getReader();

// while文を使用してストリーミングでデータを受け取る
while (true) {
  // 完了状況(done)とストリーミングデータ(value)を受け取ることができます
  const { done, value } = await reader.read();
  if (done) break;

  // このvalueが上記の例のchunkに該当します
  // ここで上記同様ストリーミングの処理を書いていきます
  console.log(value);
}

おわりに

今回はReadableStreamについてまとめていきました。
この記事では基礎的な概要部分を書きましたが、イルシルに実装したときの例や実装時にハマったポイントなども今後記事にできればと思っています。

また、イルシルではプロダクトを成長させてくれるエンジニアを絶賛大募集しております!
興味を持ってくださった方は以下の固定コメントにあるURLからご応募ください!
話だけ聞いてみたい、という方も大歓迎です!

GitHubで編集を提案
株式会社イルシル

Discussion