Fetch Upload Streaming でチャットアプリを作ってみる
Fetch Upload Streaming とは
Fetch Upload Streaming とは、ブラウザの JavaScript の POST リクエストで HTTP のストリーミングを行える機能です。より具体的には、Fetch API の body
に ReadableStream や WritableStream のような Streams API を渡せるようになります。
const stream = new ReadableStream({})
fetch(`/send?room=${roomId}`, {
method: "POST",
body: stream,
duplex: "half",
});
かねてより Fetch API ではレスポンスを ReadableStream
で取得できたのですが、反対にリクエスト時に body
に ReadableStream
を渡すことはできませんでした。Chrome 105 から Fetch Upload Streaming が有効となります。
Fetch Upload Streaming を使用すれば、WebSocket のようなリアルタイムなデータのやり取りも Fetch API
だけで可能となります。今回は Fetch API
を利用してリアルタイムなチャットアプリを作成してみます。
コードの内容は以下を参照してください。
クライアントサイド
まずはクライアント側のコードから見ていきましょう。クライアント側のコードは static
フォルダ配下に作成します。はじめに input
の入力値やフォームの submit
イベントを捕捉するために document.querySelector
で DOM 要素を取得しています。チャットアプリなので roomId
も必要です。roomId
はクエリパラメータから取得し、存在しない場合にはアラートを出して処理を終了しています。
const input = document.querySelector("#input");
const form = document.querySelector("#form");
const messages = document.querySelector("#messages");
const quit = document.querySelector("#quit");
const roomId = new URLSearchParams(window.location.search).get("roomId");
if (!roomId) {
alert("No roomId given");
return;
}
ストリーム をアップロード
ストリームをアップロードするために ReadableStream
インスタンスを作成します。ReadableStream() コンストラクター の start
メソッドを利用してストリームを生成します。
const stream = new ReadableStream({
start(controller) {
// フォームがサブミットされたとき
form.addEventListener("submit", (event) => {
event.preventDefault();
const message = input.value;
input.value = "";
controller.enqueue(message); // ストリームに文字列を追加
});
quit.addEventListener("click", () => controller.close());
},
}).pipeThrough(new TextEncoderStream());
form
が submit されたとき、input
の入力値を取得します。controller.enqueue
メソッドでストリームに文字列を追加します。ストリームの生成を停止するためには controller.close()
を呼び出します。これは「quit」ボタンをクリックしたときに呼び出すようにしています。
stream は最終的に ReadableStream.pipeThrough() により別の形式に変換されます。ここでは TextEncoderStream() により文字列のストリームを UTF=8 エンコーディングでバイトに変換されます。
作成した ReadableStream
インスタンスは fetch
の body
パラメータとして設定されます。
fetch(`/send?room=${roomId}`, {
method: "POST",
headers: { "Content-Type": "text/plain" },
body: stream,
duplex: "half",
});
ストリームを使用するためには duplex: "half"
を設定する必要があります。HTTP の機能では「リクエストを送信している間にレスポンスを受信し始めることができるかどうか」というものがあります。duplex: "half"
はリクエストボディが完全に送信されるまでレスポンスは受信できません。これはブラウザのデフォルトのパターンです。
しかし、例えば Deno の Fetch などの実装はリクエストが完了する間にレスポンスが利用可能となる deplex: "full"
がデフォルトとなっています。
このような互換性の問題を回避するために duplex: "half"
をリクエストに必ず設定する必要があるのです。
ストリームを読み込む
Fetch API で res.body
を ReadableStream
として取得します。
fetch(`/receive?room=${roomId}`).then(async (res) => {
// ストリームを読むためにリーダーを取り付ける
const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
// ストリームが閉じられるまでチャンクを取得する
while (true) {
const { done, value } = await reader.read();
if (done) return;
const newMessage = document.createElement("li");
messages.appendChild(newMessage);
newMessage.innerText = value
}
});
ストリームを読むためにはリーダーを取り付ける必要があります。はじめに res.body.pipeThrough(new TextDecoderStream())
で文字列の stream を UTF=8 エンコーディングでバイトに変換します。変換後のデータは getReader()
を呼び出すことでリーダーが作成され、ストリームがロックされます。このリーダーが開放されるまで、他のリーダーはこのストリームを読み取ることができません。
リーダーを取り付けたら ReadableStreamDefaultReader.read() メソッドを使用してストリームからチャンクを読み取ることができます。チャンクは value
プロパティから取得できます。done
が true
を返した場合には、ストリームが閉じられたことを意味します。
取得した chunk を元に li
要素を作成して新たに DOM に追加することで取得したメッセージを表示できます。
Fetch Upload Streaming が有効かどうかの判定
Fetch Upload Streamig をブラウザがサポートしているかどうか判定するために、ちょっぴり奇妙なコードを使用します。
const supportsRequestStreams = (() => {
let duplexAccessed = false;
const hasContentType = new Request('', {
body: new ReadableStream(),
method: 'POST',
get duplex() {
duplexAccessed = true;
return 'half';
},
}).headers.has('Content-Type');
return duplexAccessed && !hasContentType;
})();
if (!supportsRequestStreams) {
// …
}
もしブラウザが Fetch Upload Streaming をサポートしていない場合、body
に ReadableStream
のインスタンスを渡した際に ReadableStream
のインスタンスの toString()
メソッドが呼ばれます。その結果 body
には '[object ReadableStream]'
という string
型が指定されることになります。body
に string
型が渡された場合には、Content-Type
ヘッダーに text/plain;charset=UTF-8
という値が自動的に設定されます。つまりは、Content-Type
ヘッダーが設定されている(headers.has('Content-Type')
が true
)ときにはブラウザが Fetch Upload Streaming をサポートしていないと判定できます。
Safari は少々やっかいで、body
にストリームを指定することをサポートしているものの、Fetch API において使用することは許可されていません。そのため、現在 Safari がサポートしていない duplex
オプションが有効かどうかで判定しています。
HTML
HTML コードはシンプルなので、特に説明する必要もないでしょう。
<h1>Chat via fetch upload streaming</h1>
<form id="form">
<input type="text" id="input" placeholder="Message">
<button type="submit" id="send">Send</button>
</form>
<ul id="messages"></ul>
<button id="quit">Quit</button>
サーバーサイド
Fetch Upload Streaming には制限があり HTTP/1.x で利用しようとすると ERR_H2_OR_QUIC_REQUIRED
というエラーで失敗します。これは HTTP/1.1 の規則ではリクエストとレスポンスのボディは Content-Length
ヘッダーを送信して相手側が受け取るデータの量を知るか、メッセージのフォーマットを変更して chunked エンコードを使用する必要があるためです。
つまりはサーバーのコードは HTTP/2 または HTTP/3 で実装する必要があります。
プロジェクトのセットアップ
サーバー側のコードは Express で実装します。spdy は Node.js で HTTP/2 サーバーを実装するためのモジュールです。
npm init -y
npm install express spdy
spdy
を使用するためにはサーバー証明書が必要なので openssl
コマンドで作成します。
openssl req -x509 -sha256 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt
package.json
にサーバーを起動するコマンドを追加します。
{
"scripts": {
"start": "node server.js"
},
}
サーバーコードの実装
server.js
ファイルを作成して次のようにコードを実装します。
import express from "express";
import spdy from "spdy";
import fs from "fs";
const app = express();
const receivers = new Map();
app.post("/send", (req, res) => {
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
res.status(200);
req.on("data", (chunk) => {
const set = receivers.get(room);
if (!set) return;
for (const res of set) res.write(chunk);
});
req.on("end", (chunk) => {
if (res.writableEnded) return;
res.send("Ended");
});
});
app.get("/receive", (req, res) => {
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
if (!receivers.has(room)) {
receivers.set(room, new Set());
}
receivers.get(room).add(res);
res.on("close", () => {
const set = receivers.get(room);
set.delete(res);
if (set.size === 0) receivers.delete(room);
});
res.status(200);
res.set("Content-Type", "text/plain");
});
app.use(express.static("static"));
const port = process.env.PORT || 3000;
spdy
.createServer(
{
key: fs.readFileSync("./server.key"),
cert: fs.readFileSync("./server.crt"),
},
app
)
.listen(port, (err) => {
if (err) {
console.error(err);
return;
}
console.log(`Listening on port ${port}`);
});
リクエストの送信
まずはリクエストを送信する /send
パスのコードを見ていきます。クエリパラメータから room
ID を取得します。
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
res.status(200);
req.on("data")
で stream data イベントを購読します。クライアントから controller.close()
で接続が閉じられた場合には req.on("end")
イベントがコールされます。
stream data を受信したとき、receivers
Map オブジェクトから room
ID を使用して対応する /receive
パスの res
オブジェクトに対して受信したチャンクデータを送信します。
req.on("data", (chunk) => {
const set = receivers.get(room);
if (!set) return;
for (const res of set) res.write(chunk);
});
req.on("end", (chunk) => {
if (res.writableEnded) return;
res.send("Ended");
});
リクエストの受信
/receive
パスにクライアントから接続してリクエストの受信を実施します。/send
パスと同様にクエリパラメータから room
ID を取得します。
const room = req.query.room;
if (!room) {
res.status(400).send("No room given");
return;
}
receivers
Map オブジェクトの対応する room
ID に res
オブジェクトをセットします。res
オブジェクトは /send
パスからチャンクデータを送信するために使用されます。
if (!receivers.has(room)) {
receivers.set(room, new Set());
}
receivers.get(room).add(res);
HTTP/2 サーバーの作成
最後に spdy
モジュールを使用して HTTP/2 サーバーを作成します。createServer
メソッドにはプロジェクトのセットアップ時に作成したサーバー証明書と Express
の app
オブジェクトを設定します。後は listen
メソッドで 3000 ポートを指定しています。
spdy
.createServer(
{
key: fs.readFileSync("./server.key"),
cert: fs.readFileSync("./server.crt"),
},
app
)
.listen(port, (err) => {
if (err) {
console.error(err);
return;
}
console.log(`Listening on port ${port}`);
});
感想
やっていること自体は WebSocket で実現できることとほとんど変わらないのですが、HTTP の技術のみで実装できるのが嬉しいですね。WebSocket 用のサーバー建てるのって結構面倒だったりしますしね。
Discussion