🔥

ChatGPTのように文字がリアルタイムで流れるストリーミング通信の仕組みを、実際のTypeScriptソースコードを使って説明する

に公開

TypeScriptでサーバーとクライアント間の文字列ストリーミング通信を実現する簡単なサンプルを、fetch APIのReadableStreamを使って説明します。

demo.gif

仕組みの概要

サーバー側: 特定のエンドポイントにリクエストが来ると、HTTPレスポンスをすぐには完了させません。代わりに、res.write()を使って文字列の断片(チャンク)を少しずつ送信します。全てのデータを送り終えたら、res.end()でストリームを閉じます。
2. クライアント側: fetch APIでサーバーにリクエストを送り、レスポンスのbodyReadableStream)を取得します。このストリームからデータをチャンクごとに非同期で読み取り、受け取るたびに画面に表示します。


## サーバー側の実装 (Node.js + Express)

まず、文字列を定期的に送信するHTTPサーバーをExpressで構築します。

1. サーバーのコード (src/server.ts)

import express from 'express';

const app = express();
const port = 3000;

// 文字列をストリーミングで返すエンドポイント
app.get('/streaming', (req, res) => {
  // 接続が切断された場合も考慮
  req.on('close', () => {
    console.log('Client disconnected.');
  });

  // ヘッダーを設定
  res.setHeader('Content-Type', 'text/plain; charset=utf-8');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  let count = 0;
  const intervalId = setInterval(() => {
    count++;
    const message = `メッセージ ${count}: ${new Date().toLocaleTimeString()}\n`;
    
    // データをチャンクとしてクライアントに送信
    res.write(message);
    console.log(`Sent: ${message.trim()}`);

    // 5回送信したらストリームを終了
    if (count >= 5) {
      clearInterval(intervalId);
      res.end('ストリームの終わりです。'); // 最後のメッセージを送り、接続を閉じる
      console.log('Stream ended.');
    }
  }, 1000); // 1秒ごとに送信
});

app.listen(port, () => {
  console.log(`Server listening at http://localhost:${port}`);
});

ポイント

  • res.write(data): この関数で、レスポンスを完了させずにデータの断片を送信します。
  • res.end(): 全てのデータを送信し終えたら、この関数を呼び出してレスポンスを完了させます。
  • ヘッダー: Content-TypeCache-Controlを設定して、ブラウザがレスポンスを正しく解釈できるようにします。

## クライアント側の実装 (ブラウザ)

次に、サーバーからのストリーミングデータを受け取って表示するクライアント側のコードです。

1. HTMLファイル (public/index.html)
(プロジェクトルートにpublicフォルダを作成して配置してください)

<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    <title>ストリーミング受信</title>
</head>
<body>
    <h1>サーバーからのストリーミングメッセージ</h1>
    <pre id="output"></pre>
    <script src="client.js"></script>
</body>
</html>

2. クライアントのTypeScriptコード (public/client.ts)
tscコマンドでJavaScriptにコンパイルして使います)

const outputElement = document.getElementById('output');

async function startStreaming() {
  try {
    const response = await fetch('/streaming'); // サーバーのエンドポイントにリクエスト

    if (!response.body) {
      throw new Error('Response body is null');
    }

    // レスポンスボディをReadableStreamとして取得
    const reader = response.body.getReader();
    // UTF-8のテキストとしてデコードするためのデコーダー
    const decoder = new TextDecoder('utf-8');

    // ストリームからデータを読み続ける
    while (true) {
      const { done, value } = await reader.read();

      // ストリームが終了したらループを抜ける
      if (done) {
        console.log('Stream finished.');
        break;
      }
      
      // 受け取ったデータ (Uint8Array) を文字列に変換
      const chunk = decoder.decode(value, { stream: true });
      if (outputElement) {
        outputElement.textContent += chunk;
      }
      console.log('Received chunk:', chunk);
    }
  } catch (error) {
    console.error('Streaming failed:', error);
    if (outputElement) {
        outputElement.textContent += '\nエラーが発生しました。';
    }
  }
}

startStreaming();

ポイント

  • fetch('/streaming'): サーバーのストリーミングエンドポイントを呼び出します。
  • response.body.getReader(): レスポンスボディからReadableStreamDefaultReaderを取得します。
  • await reader.read(): ストリームから次のデータチャンクを非同期で読み取ります。donetrueになるまで繰り返します。
  • TextDecoder: reader.read()が返すvalueUint8Arrayなので、これを文字列に変換するために使います。

## ソースコード

https://github.com/softjapan/nodejs-streaming

Discussion