🌊

Fetch Upload Streaming でチャットアプリを作ってみる

2022/08/23に公開

Fetch Upload Streaming とは

Fetch Upload Streaming とは、ブラウザの JavaScript の POST リクエストで HTTP のストリーミングを行える機能です。より具体的には、Fetch APIbodyReadableStreamWritableStream のような Streams API を渡せるようになります。

const stream = new ReadableStream({})

fetch(`/send?room=${roomId}`, {
  method: "POST",
  body: stream,
  duplex: "half",
});

かねてより Fetch API ではレスポンスを ReadableStream で取得できたのですが、反対にリクエスト時に bodyReadableStream を渡すことはできませんでした。Chrome 105 から Fetch Upload Streaming が有効となります。

Fetch Upload Streaming を使用すれば、WebSocket のようなリアルタイムなデータのやり取りも Fetch API だけで可能となります。今回は Fetch API を利用してリアルタイムなチャットアプリを作成してみます。

fetch-chat

コードの内容は以下を参照してください。

https://github.com/azukiazusa1/fetch-upload-streaming-chat-app

クライアントサイド

まずはクライアント側のコードから見ていきましょう。クライアント側のコードは static フォルダ配下に作成します。はじめに input の入力値やフォームの submit イベントを捕捉するために document.querySelector で DOM 要素を取得しています。チャットアプリなので roomId も必要です。roomId はクエリパラメータから取得し、存在しない場合にはアラートを出して処理を終了しています。

static/main.js
const input = document.querySelector("#input");
const form = document.querySelector("#form");
const messages = document.querySelector("#messages");
const quit = document.querySelector("#quit");

const roomId = new URLSearchParams(window.location.search).get("roomId");
if (!roomId) {
  alert("No roomId given");
  return;
}

ストリーム をアップロード

ストリームをアップロードするために ReadableStream インスタンスを作成します。ReadableStream() コンストラクターstart メソッドを利用してストリームを生成します。

static/main.js
const stream = new ReadableStream({
  start(controller) {
    // フォームがサブミットされたとき
    form.addEventListener("submit", (event) => {
      event.preventDefault();
      const message = input.value;
      input.value = "";

      controller.enqueue(message); // ストリームに文字列を追加
    });

    quit.addEventListener("click", () => controller.close());
  },
}).pipeThrough(new TextEncoderStream());

form が submit されたとき、input の入力値を取得します。controller.enqueue メソッドでストリームに文字列を追加します。ストリームの生成を停止するためには controller.close() を呼び出します。これは「quit」ボタンをクリックしたときに呼び出すようにしています。

stream は最終的に ReadableStream.pipeThrough() により別の形式に変換されます。ここでは TextEncoderStream() により文字列のストリームを UTF=8 エンコーディングでバイトに変換されます。

作成した ReadableStream インスタンスは fetchbody パラメータとして設定されます。

static/main.js
fetch(`/send?room=${roomId}`, {
  method: "POST",
  headers: { "Content-Type": "text/plain" },
  body: stream,
  duplex: "half",
});

ストリームを使用するためには duplex: "half" を設定する必要があります。HTTP の機能では「リクエストを送信している間にレスポンスを受信し始めることができるかどうか」というものがあります。duplex: "half" はリクエストボディが完全に送信されるまでレスポンスは受信できません。これはブラウザのデフォルトのパターンです。

しかし、例えば Deno の Fetch などの実装はリクエストが完了する間にレスポンスが利用可能となる deplex: "full" がデフォルトとなっています。

このような互換性の問題を回避するために duplex: "half" をリクエストに必ず設定する必要があるのです。

ストリームを読み込む

Fetch API で res.bodyReadableStream として取得します。

static/main.js
fetch(`/receive?room=${roomId}`).then(async (res) => {
  // ストリームを読むためにリーダーを取り付ける
  const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
  // ストリームが閉じられるまでチャンクを取得する
  while (true) {
    const { done, value } = await reader.read();
    if (done) return;
    const newMessage = document.createElement("li");
    messages.appendChild(newMessage);
    newMessage.innerText = value
  }
});

ストリームを読むためにはリーダーを取り付ける必要があります。はじめに res.body.pipeThrough(new TextDecoderStream()) で文字列の stream を UTF=8 エンコーディングでバイトに変換します。変換後のデータは getReader() を呼び出すことでリーダーが作成され、ストリームがロックされます。このリーダーが開放されるまで、他のリーダーはこのストリームを読み取ることができません。

リーダーを取り付けたら ReadableStreamDefaultReader.read() メソッドを使用してストリームからチャンクを読み取ることができます。チャンクは value プロパティから取得できます。donetrue を返した場合には、ストリームが閉じられたことを意味します。

取得した chunk を元に li 要素を作成して新たに DOM に追加することで取得したメッセージを表示できます。

Fetch Upload Streaming が有効かどうかの判定

Fetch Upload Streamig をブラウザがサポートしているかどうか判定するために、ちょっぴり奇妙なコードを使用します。

static/main.js
const supportsRequestStreams = (() => {
  let duplexAccessed = false;

  const hasContentType = new Request('', {
    body: new ReadableStream(),
    method: 'POST',
    get duplex() {
      duplexAccessed = true;
      return 'half';
    },
  }).headers.has('Content-Type');

  return duplexAccessed && !hasContentType;
})();

if (!supportsRequestStreams) {
  // …
}

もしブラウザが Fetch Upload Streaming をサポートしていない場合、bodyReadableStream のインスタンスを渡した際に ReadableStream のインスタンスの toString() メソッドが呼ばれます。その結果 body には '[object ReadableStream]' という string 型が指定されることになります。bodystring 型が渡された場合には、Content-Type ヘッダーに text/plain;charset=UTF-8 という値が自動的に設定されます。つまりは、Content-Type ヘッダーが設定されている(headers.has('Content-Type')true)ときにはブラウザが Fetch Upload Streaming をサポートしていないと判定できます。

Safari は少々やっかいで、body にストリームを指定することをサポートしているものの、Fetch API において使用することは許可されていません。そのため、現在 Safari がサポートしていない duplex オプションが有効かどうかで判定しています。

HTML

HTML コードはシンプルなので、特に説明する必要もないでしょう。

static/index.html
<h1>Chat via fetch upload streaming</h1>
<form id="form">
  <input type="text" id="input" placeholder="Message">
  <button type="submit" id="send">Send</button>
</form>
<ul id="messages"></ul>
<button id="quit">Quit</button>

サーバーサイド

Fetch Upload Streaming には制限があり HTTP/1.x で利用しようとすると ERR_H2_OR_QUIC_REQUIRED というエラーで失敗します。これは HTTP/1.1 の規則ではリクエストとレスポンスのボディは Content-Length ヘッダーを送信して相手側が受け取るデータの量を知るか、メッセージのフォーマットを変更して chunked エンコードを使用する必要があるためです。

https://web.dev/i18n/zh/fetch-upload-streaming/#默认仅限-http2

つまりはサーバーのコードは HTTP/2 または HTTP/3 で実装する必要があります。

プロジェクトのセットアップ

サーバー側のコードは Express で実装します。spdy は Node.js で HTTP/2 サーバーを実装するためのモジュールです。

npm init -y
npm install express spdy

spdy を使用するためにはサーバー証明書が必要なので openssl コマンドで作成します。

openssl req -x509 -sha256 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt

package.json にサーバーを起動するコマンドを追加します。

package.json
{
  "scripts": {
    "start": "node server.js"
  },
}

サーバーコードの実装

server.js ファイルを作成して次のようにコードを実装します。

server.js
import express from "express";
import spdy from "spdy";
import fs from "fs";

const app = express();
const receivers = new Map();

app.post("/send", (req, res) => {
  const room = req.query.room;
  if (!room) {
    res.status(400).send("No room given");
    return;
  }

  res.status(200);

  req.on("data", (chunk) => {
    const set = receivers.get(room);
    if (!set) return;
    for (const res of set) res.write(chunk);
  });

  req.on("end", (chunk) => {
    if (res.writableEnded) return;
    res.send("Ended");
  });
});

app.get("/receive", (req, res) => {
  const room = req.query.room;
  if (!room) {
    res.status(400).send("No room given");
    return;
  }

  if (!receivers.has(room)) {
    receivers.set(room, new Set());
  }

  receivers.get(room).add(res);

  res.on("close", () => {
    const set = receivers.get(room);
    set.delete(res);
    if (set.size === 0) receivers.delete(room);
  });

  res.status(200);
  res.set("Content-Type", "text/plain");
});

app.use(express.static("static"));

const port = process.env.PORT || 3000;
spdy
  .createServer(
    {
      key: fs.readFileSync("./server.key"),
      cert: fs.readFileSync("./server.crt"),
    },
    app
  )
  .listen(port, (err) => {
    if (err) {
      console.error(err);
      return;
    }
    console.log(`Listening on port ${port}`);
  });

リクエストの送信

まずはリクエストを送信する /send パスのコードを見ていきます。クエリパラメータから room ID を取得します。

server.js
const room = req.query.room;
if (!room) {
  res.status(400).send("No room given");
  return;
}

res.status(200);

req.on("data") で stream data イベントを購読します。クライアントから controller.close() で接続が閉じられた場合には req.on("end") イベントがコールされます。

stream data を受信したとき、receivers Map オブジェクトから room ID を使用して対応する /receive パスの res オブジェクトに対して受信したチャンクデータを送信します。

server.js
req.on("data", (chunk) => {
  const set = receivers.get(room);
  if (!set) return;
  for (const res of set) res.write(chunk);
});

req.on("end", (chunk) => {
  if (res.writableEnded) return;
  res.send("Ended");
});

リクエストの受信

/receive パスにクライアントから接続してリクエストの受信を実施します。/send パスと同様にクエリパラメータから room ID を取得します。

server.js
const room = req.query.room;
if (!room) {
  res.status(400).send("No room given");
  return;
}

receivers Map オブジェクトの対応する room ID に res オブジェクトをセットします。res オブジェクトは /send パスからチャンクデータを送信するために使用されます。

server.js
if (!receivers.has(room)) {
  receivers.set(room, new Set());
}

receivers.get(room).add(res);

HTTP/2 サーバーの作成

最後に spdy モジュールを使用して HTTP/2 サーバーを作成します。createServer メソッドにはプロジェクトのセットアップ時に作成したサーバー証明書と Expressapp オブジェクトを設定します。後は listen メソッドで 3000 ポートを指定しています。

server.js
spdy
  .createServer(
    {
      key: fs.readFileSync("./server.key"),
      cert: fs.readFileSync("./server.crt"),
    },
    app
  )
 .listen(port, (err) => {
    if (err) {
      console.error(err);
      return;
    }
    console.log(`Listening on port ${port}`);
  });

感想

やっていること自体は WebSocket で実現できることとほとんど変わらないのですが、HTTP の技術のみで実装できるのが嬉しいですね。WebSocket 用のサーバー建てるのって結構面倒だったりしますしね。

参考

GitHubで編集を提案

Discussion