SODA Engineering Blog
📲

AIアプリを陰で支える通信プロトコル「SSE」を学びましょう

に公開

What?

サーバ送信イベント(SSE)という通信プロトコルがあります。このSSEが、MCPやLLMチャットなどのAIアプリの通信プロトコルとして採用されています。

本記事ではSSEとは何か? AIアプリにおいてどのように使われているのか? を概説します。

ゴール

  • サーバ->クライアント方向への通信の世界に触れる
  • SSEの概念と基本的な使い方を理解する(Go, JS, Flutter)
  • SSEの利用例としてのAIチャットアプリとMCPサーバを学ぶ

デモ

AIチャットアプリで、AIからの回答をストリーム表示するのにSSEを使っています。サーバがLLMからの回答をストリームとして受け取っているので、それをSSEでそのままクライアントにpushすることで実現しています。

サーバー送信イベント(SSE)とは何か?

サーバー送信イベント(Server-Sent Events、SSE)は、HTTPプロトコルを使用してサーバーからクライアント(ウェブブラウザなど)へ一方的にデータをストリーム配信するための標準技術です。現代のウェブアプリケーションにおいて、SNSの通知、株価のリアルタイム表示、ニュース速報、処理状況のプログレスバーなど、サーバー側で発生したイベントを即座にクライアントに通知する「リアルタイム性」が求められる場合に利用されます。

従来のリアルタイム通信手法である「ポーリング」や「ロングポーリング」が抱えていた、無駄な通信によるサーバー・ネットワーク負荷やタイムラグ、実装の複雑さといった課題を解決するために導入されました。SSEを利用することで、ユーザーエージェントはネットワークリソースをより有効活用でき、特に携帯機器でのバッテリー寿命の節約にもつながるとされています。

WebSocketとの比較

SSEとWebSocketはどちらも接続を維持しながらサーバからクライアントへのイベント送信ができるプロトコルで、用途は似ています。

大きな違いは2点です。

WebSocketは、サーバーとクライアント間の双方向通信を提供しますが、SSEは、サーバーからクライアントへの単方向通信を提供します。

  • WebSocket: サーバーとクライアント間で双方向のリアルタイム通信が必要な場合に利用されます。例えば、チャットアプリケーション、オンラインゲーム、リアルタイムコラボレーションツール、インタラクティブなダッシュボードなどです。
  • SSE: 主にサーバーからクライアントへの単方向のデータプッシュに利用されます。例えば、ニュースフィード、株価ティッカー、通知、ライブログなど、サーバーからの情報が一方的に更新され続けるようなシナリオに適しています。

サンプルのAIチャットアプリは、LLMが生成した回答チャンクをサーバからリアルタイムで送信するという片方向通信のためSSEを採用しました(ただしWebSocketを採用しても大きな問題があるわけではありません)。

WebSocketは専用プロトコルですが、SSEはHTTPの上に作られたプロトコルです。

サーバがGoの場合、WebSocketを使う場合は外部ライブラリを使う必要があります。SSEはhttpパッケージで実装できます。

クライアントはWebSocketの場合は専用のWebSocketクライアントを使うことになりますが、SSEの場合はEventSourceクライアントという専用クライアント、または通常のHTTPクライアントでも利用可能です。

その他の差分

SSEはテキストのみ送信可能ですが、WebSocketはバイナリも送信可能です。

WebSocketは実際のデータ送信時にヘッダを送らないので、大量のデータを送信する場合はデータ量を節約できます。

AIチャットアプリの実装例

サーバサイド

Content-Type: text/event-stream を送ると、そこからSSEが始まります。

// SSEを開始
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")

// SSE
err = ConverseStream(prompt, func(chunk string) {
    fmt.Fprintf(c.Writer, "event: chunk\n")
    fmt.Fprintf(c.Writer, "data: %s\n", chunk)
    fmt.Fprint(c.Writer, "\n") // イベント区切り
    c.Writer.Flush()
})

ConverseStreamはLLMを呼び出している関数です(実体はAmazon BedrockのConverseStream APIのラッパ)。
第2引数はLLMからチャンク(会話の断片↓)が返されるたびに呼び出されるコールバックで、そのチャンクをそのままSSEのイベントとしてクライアントに送信しています。

実際に送信しているデータ

event: chunk
data: 日本

event: chunk
data: の

event: chunk
data: 首都

event: chunk
data: は

event: chunk
data: 東京

クライアントサイド(JS)

EventSourceインターフェイスを作成してコールバックを定義します。

form.addEventListener("submit", async (e) => {
const prompt = encodeURIComponent(promptInput.value);
const sseUrl = `/v2/ai-search/stream?prompt=${prompt}`;
const evtSource = new EventSource(sseUrl);

// chunkイベントのハンドリング
evtSource.addEventListener("chunk", (event) => {
    resultDiv.innerHTML += event.data.replace(/\n/g, "<br>");
});

// 異常系
evtSource.onerror = (error) => {
    console.log("end: ", error);
    setTimeout(() => {
        if (error.target.readyState != EventSource.CLOSED) {
            evtSource.close();
        }
    }, 1000);
});

表示

<div id="result">日本の首都は東京</div>

エラーの実装例

EventSourceはHTTPのステータスコードを扱えないので、errorイベントとして送信する。

event: error
data: {"code": "unauthorized", "message": "ログインしてください。"}
...
// New
// エラーレスポンスのハンドリング
evtSource.addEventListener("error", (event) => {
    try {
    const errorData = JSON.parse(event.data);
    let errorMessage = "エラーが発生しました: ";

    // エラーコードに応じてメッセージをカスタマイズ
    switch (errorData.code) {
        case "unauthorized":
        errorMessage = "ログインしてください。";
        break;
        case "too_many_requests":
        errorMessage =
            "リクエスト制限に達しました。しばらくしてから再度お試しください。";
        break;
        default:
        errorMessage += errorData.message || "不明なエラー";
    }

    evtSource.close();
});

SSEの主な特徴と仕組み

改めてSSEの説明をしてみます。SSEの主要な特徴は以下の通りです。

  • 単方向通信: SSEはサーバーからクライアントへのデータ配信に特化した単方向の通信モデルです。クライアントからサーバーへデータを送信する必要がない場合に適しています。
  • 標準HTTPプロトコルの利用: 特別なプロトコルを必要とせず、既存の標準的なHTTPプロトコル(HTTP/1.1またはHTTP/2)を利用します。これにより、プロキシやファイアウォールなどの既存のウェブインフラと相性が良く、導入の障壁が低いというメリットがあります。サーバ側実装はシンプルになります。例えばWebSocketを使う場合はwsプロトコルを受信する設定をwebサーバの設定ファイルに記述する必要があります。
  • 永続的なコネクション: 一度確立されたHTTPコネクションを維持し、その上でサーバーがデータを継続的に送信します。コネクションは、EventSource.close()メソッドが呼び出されるまで開いたまま維持されます。
  • シンプルな実装: クライアント側の実装はEventSource APIを使用することで非常に簡単であるとされています。EventSource APIを使うと、ネットワークの切断など、何らかの理由で接続が切れても、ブラウザが自動的にサーバーへの再接続を試みます。
  • 接続数の制限: HTTP/1.1を使用している場合、ブラウザは同一ドメインに対して同時に開ける接続数に上限(通常6つ程度)を設けていることがあります。SSEが1つの接続を占有するため、この制限が複数のタブを開く際に問題となる可能性があります。ただし、HTTP/2を使用している場合は、多重化によりこの問題は緩和されます。

EventSourceインターフェイス

JSクライアント側でSSEを扱うためのインターフェイスは、EventSourceインターフェイスです(XHRやfetchを使うようにEventSourceを使う)。このインターフェイスを利用することで、HTTPサーバーとの永続的な接続を開き、イベントを受信することができます。

EventSourceオブジェクトを作成し、SSEで送られてきた各イベントに対してイベントリスナーを登録することでAPIを利用します。

Flutterでも、EventSourceを使ってSSEを実装できます。

EventSourceは制約があり、GETリクエストしかできず、ステータスコードを受け取ったり、エラー時のレスポンスを受け取ることもできません。

fetchなど他のクライアントでSSEを受け取ることもできますが、低いレイヤーで最適化されているEventSourceを使うのが推奨されます。

EventSourceはWeb Workerで起動する、ということでWeb Workerの復習

EventSourceは、内部的にWeb Workerで起動します。

Web WorkerとはUIスレッドとは別のスレッドでJavaScriptを実行するための仕組みです。

なぜウェブワーカーで起動するのか?

JavaScriptのメインスレッドは、UIのレンダリング、ユーザー入力の処理、DOM操作など、ブラウザの多くの重要なタスクを担当しています。メインスレッドで時間のかかる処理や継続的な処理を行うと、UIがフリーズしたり、ユーザー操作にラグが生じたりして、ユーザーエクスペリエンスが低下します。

EventSourceは、サーバーとの永続的な接続を維持し、継続的にデータを受信します。特にデータが頻繁に送られてくる場合、その受信と処理はメインスレッドにとって負担になる可能性があります。

ウェブワーカーは、JavaScriptのコードをバックグラウンドスレッドで実行するためのメカニズムです。これにより、メインスレッドがブロックされることなく、重い計算や継続的なネットワーク処理(EventSourceによるデータ受信など)を行うことができます。

したがって、EventSourceをウェブワーカーで起動する主な理由は、メインスレッドの応答性を保ち、UIのパフォーマンスを向上させるためです。

大抵の場合専用ワーカー(呼び出し元のJSと一対一で通信する)が使われますが、サービスワーカー(URLを持ち複数プロセスからアクセスできる、Web Pushなどで使われる)など他のワーカーが使われることもあります。

MCPの通信プロトコルとしてのSSE

かつてMCPクライアント・サーバ間の通信プロトコルは「標準出力」「HTTP SSE」の二択でした。

SSEが採用されていた理由はMCPサーバがMCPクライアントにリアルタイム・継続的にデータを送信するためでした。

ですが、現在はSSEはStreamable HTTPという、独自のプロトコルに置き換えられています。

これによりMCPサーバはSSEか従来のHTTPを使うかを選択できるようになりました(SSEはStreamable HTTPで引き続き内部的に利用されています)。

入門 Streamable HTTP

https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http

「Streamable HTTP」は、以前の「HTTP+SSE」に代わる新しい通信プロトコルです。これはHTTPのPOSTとGETリクエストを利用し、必要に応じて**Server-Sent Events (SSE)**を使って複数のサーバーメッセージをストリーミングできます。これにより、基本的なサーバーだけでなく、ストリーミングやサーバーからクライアントへの通知・リクエストに対応した高機能なサーバーも構築できます。

主要な特徴と仕組み

単一のMCPエンドポイント: サーバーは、POSTとGETの両方をサポートする単一のHTTPエンドポイントを提供する必要があります。

クライアントからサーバーへのメッセージ送信(HTTP+α)

  • クライアントはJSON-RPCメッセージをHTTP POSTリクエストとしてMCPエンドポイントに送信します。Acceptヘッダーにapplication/jsonとtext/event-streamの両方を指定する必要があります。リクエストボディには、単一のJSON-RPCリクエスト/通知/レスポンス、またはそれらのバッチを含めることができます。
  • サーバーは、リクエストにJSON-RPCリクエストが含まれている場合、SSEストリームを開始するか、JSONオブジェクトを返す必要があります。

サーバーからクライアントへのメッセージ受信(HTTP SSE)

  • クライアントはHTTP GETリクエストをMCPエンドポイントに発行してSSEストリームを開き、サーバーからの通知やリクエストを受信できます。
  • サーバーはこのGETリクエストに対してContent-Type: text/event-streamを返すか、405エラーを返します。

セキュリティと堅牢性

  • セキュリティ警告: DNSリバインディング攻撃を防ぐため、サーバーはOriginヘッダーの検証、localhostへのバインド、および適切な認証の実装が必須または推奨されます。
  • 再開可能性と再配信: 接続が切断された場合、サーバーはSSEイベントにIDを付与することで、クライアントがLast-Event-IDヘッダーを使用してストリームを再開し、未送信のメッセージを再配信できるようになります。
  • サーバーは初期化時にMcp-Session-IdヘッダーでセッションIDを割り当てることができます。クライアントは以降のリクエストでこのセッションIDを含める必要があります。クライアントは、不要になったセッションをHTTP DELETEリクエストで明示的に終了できます。

後方互換性

以前のHTTP+SSEトランスポートとの後方互換性も考慮されており、サーバーは古いエンドポイントも並行して提供でき、クライアントは両方のトランスポートに対応できるよう、初期化リクエストの動作で判断する仕組みが示されています。

まとめ

  • このStreamable HTTPプロトコルは、より柔軟で堅牢なクライアント・サーバー間通信を実現するための設計となっています。
  • SSEはサーバ→クライアントの片方向通信でしたが、Streamable HTTPはサーバ←→クライアントの双方向通信が擬似的にできます。text/event-streamをクライアントから送信できたり、POSTリクエストに対して制約付きのストリームレスポンスを返せたりと独自仕様がいろいろ入っているようです(実際触ってみないと使用感が分からなさそうですが)

[補講]Flutterアプリでの実装

FlutterでServer-Sent Events(SSE)を実装するには、いくつかの方法があります。ここでは、主要な3つのアプローチを比較し、それぞれの特徴と最適なユースケースを探ります。

実装方法の全体比較

まずは、これから紹介する3つの方法の特徴を一覧で見てみましょう。

項目 プリミティブ実装 (http) flutter_client_sse eventflux
手軽さ 中〜高
機能性 低 (自前で実装)
安定性 低 (自前実装に依存)
カスタマイズ性
おすすめ度
ユースケース 学習目的、依存を極限まで減らしたい場合 シンプルな機能を手早く実装したい場合 高度な接続管理や安定性が求められる本番環境

方法1: プリミティブな実装 (httpパッケージ)

Dart標準のhttpパッケージを使い、SSEの仕様に沿って手動で実装する方法です。

メリット

✅ 依存が少ない: httpパッケージ以外の追加ライブラリが不要です。

✅ 学習に最適: SSEの仕組み(ストリーム、イベント形式)を深く理解できます。

✅ 高い柔軟性: 細かい挙動を完全にコントロールできます。

デメリット

❌ 実装が複雑: レスポンスストリームの分割、data:event:形式のパース、UTF-8デコードなどを自前で行う必要があります。

❌ 再接続ロジックが必須: 接続が切れた際の再接続処理(retry:フィールドの解釈など)を自分で実装する必要があり、非常に手間がかかります。

❌ コードが冗長: 定型的な処理が多く、コードが長くなりがちです。

コード例(概念)

import 'package:http/http.dart' as http;
import 'dart:convert';

void connectSSE() async {
  final client = http.Client();
  final request = http.Request("GET", Uri.parse("YOUR_SSE_ENDPOINT"));
  
  // SSE接続には必須のヘッダー
  request.headers['Cache-Control'] = 'no-cache';
  request.headers['Accept'] = 'text/event-stream';

  final response = await client.send(request);

  // レスポンスのストリームを購読
  response.stream
      .transform(utf8.decoder)       // バイトをUTF-8文字列に変換
      .transform(const LineSplitter()) // 改行で分割
      .listen((line) {
    // SSEの仕様に基づきデータをパース
    if (line.startsWith('data:')) {
      final data = line.substring(5).trim();
      print('Received data: $data');
      // ここでさらにJSONパースなどを行う
    }
  }, onError: (error) {
    print('Error: $error');
    // ここに再接続ロジックを自前で実装する必要がある
  }, onDone: () {
    print('Stream closed');
    // ここに再接続ロジックを自前で実装する必要がある
  });
}

ポイント: この方法はSSEの仕組みを学ぶには最適ですが、本番環境での利用は多くの落とし穴があるため推奨されません。

方法2: flutter_client_sse パッケージ

SSEクライアントを手軽に実装するために作られた、シンプルなパッケージです。

メリット

✅ 非常に手軽: SSEClient.subscribeToSSE を呼び出すだけで、簡単に購読を開始できます。

✅ シンプル: APIが直感的で分かりやすく、学習コストが低いです。

デメリット

❌ 機能が限定的: 自動再接続は行いますが、その間隔の調整や、詳細なエラーハンドリング、接続状態の管理といった高度な機能は限定的です。

❌ メンテナンス状況: 他のパッケージと比べて更新が活発でない可能性があります(利用前にpub.devで確認しましょう)。

コード例

import 'package:flutter_client_sse/flutter_client_sse.dart';

void connectWithFlutterClientSSE() {
  SSEClient.subscribeToSSE(
    url: 'YOUR_SSE_ENDPOINT',
    header: {
      "Accept": "text/event-stream",
      "Cache-Control": "no-cache",
    }
  ).listen(
    (event) {
      // パース済みのSSEModelオブジェクトが受け取れる
      print('Id: ${event.id}');
      print('Event: ${event.event}');
      print('Data: ${event.data}');
    },
    onError: (error) {
       print('Error: $error');
    }
  );
}

ポイント: とにかく手早くSSEを試したい、または非常にシンプルな機能で十分な場合に適しています。

方法3: eventflux パッケージ

より高機能で堅牢なSSEクライアントを実装するために設計されたパッケージです。

メリット

✅ 高機能で安定: 洗練された自動再接続ロジック(指数バックオフなど)、接続状態(接続中、切断など)の詳細な管理機能を提供します。

✅ カスタマイズ性が高い: Dioを内部で利用しており、コネクションタイムアウトやカスタムヘッダーなど、HTTPリクエストに関する詳細な設定が可能です。

✅ モダンなAPI: StreamベースのAPIが使いやすく、FlutterのStreamBuilderなどとスムーズに連携できます。

デメリット

❌ やや学習コストが高い: flutter_client_sseに比べると機能が多い分、APIの選択肢が多く、少し学習が必要です。

コード例

import 'package:eventflux/eventflux.dart';

void connectWithEventFlux() {
  _eventFlux = EventFlux.spawn();
  // EventFluxで接続
    _eventFlux!.connect(
      EventFluxConnectionType.get,
      url,
      header: <String, String>{
        'Accept': 'text/event-stream',
        'Cache-Control': 'no-cache',
      },
      onSuccessCallback: (EventFluxResponse? response) {
        if (response?.stream != null) {
          _eventSubscription = response!.stream!.listen(
            (eventData) {
              _handleChunkEvent(eventdata.data);
            },
            onError: (Object error) {
              _handleError('ストリーム処理エラーが発生しました: $error');
            },
          );
        }
      },
      onError: (EventFluxException error) {
        _handleError('接続エラーが発生しました: ${error.message}');
      },
    );
}

ポイント: 本番環境で安定したリアルタイム通信を実装するなら、現状これが最もおすすめです。

FlutterでSSEやってみるまとめ

Flutterの場合EventSourceが標準ライブラリとして提供されていません。

よって選択肢は以下のようになります。

  • http, dioなどの標準クライアントを使って実装:SSEの機能をいい感じにに使う場合は追加実装が必要
  • 外部のEventSourceライブラリを導入:HTTPとしての機能には乏しいが、SSEの機能をすぐに利用できる
  • flutter_client_sse, eventfluxなどのモダンなライブラリを導入:HTTP, SSEの機能の両立ができる

様々な理由で標準クライアントを使いたい場合はあるでしょう。
通常のHTTPクライアントでも、レスポンスボディをストリームとして扱えるクライアントであればSSEクライアントとして利用できます。JSのfetchやdartのdioやhttpなど。逆にそれができないXHRなどは難しいです。
新しいrhttpのようなクライアントでもstreamの節が丁寧に説明されているので利用できるでしょう。
実装方法としては、どのクライアントでも、bodyをstream型として受け取るオプションやメソッドがあるはずなのでそれを使うと、chunkを受信するたびに発生するコールバックを定義してSSEクライアントとして利用できるんじゃないかと思います。
ただし、イベントのパースやネットワークエラー時の再接続はスクラッチで記述する必要があります。

SODA Engineering Blog
SODA Engineering Blog

Discussion