🌐

Cloudflare Workers で Server Sent Event (Durable Objects 使用)

に公開

最終コード: https://github.com/ykrods/minimal-cloudflare-workers/blob/main/src/sse.ts

背景

SSE (Server Sent Event) を使い、リアルタイムで通知を受け取るデモアプリを作ろうとしたのですが、Cloudflare Workers でもどうやらできるらしいということで、実際にやってみました。あまり情報がなかったのでほぼ多くの場合で必要となるであろう Durable Objects も含めてまとめることにしました。

SSE 実装(1): シンプルな例

まず、Cloudflare Workers で SSE を実装する場合、ResponseReadableStream を渡せるため、ストリームAPI を利用するのが基本になります。コードとしては以下のようになります。

export default {
  fetch(request: Request, env: Env) {
    const url = new URL(request.url);

    if (url.pathname === "/events") {
      const encoder = new TextEncoder();
      const { readable, writable } = new TransformStream();
      const writer = writable.getWriter();

      let n = 0;
      setInterval(() => {
        writer.write(encoder.encode(`data: ${n}\n\n`));
        n++;
      }, 1000)

      return new Response(readable, {
        headers: {
          Connection: "keep-alive",
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
        },
      });
    }
    return new Response(null, { status: 404 });
  },
}

このコード自体は問題なく動作しますが、ここから実際にアプリケーションを作っていこうとすると、問題が起きます。

問題点

Cloudflare Workers の実行環境は「異なるコンテキスト(リクエスト)間で Promise を受け渡すことができない」という制限があるため、別のリクエストなどからストリームに書き込むことができません。実際のアプリケーションでは他のユーザからの入力で新しいイベントを発生させたいことは多々あると思いますが、そのような場合には一手間加える必要があります。

(参考: 実際にやってみて出たエラー)

✘ [ERROR] A hanging Promise was canceled. This happens when the worker runtime is waiting for a Promise from JavaScript to resolve, but has detected that the Promise cannot possibly ever resolve because all code and events related to the Promise's I/O context have already finished.

SSE 実装(2): メッセージボックス+ポーリング

ではまぁ仕方ないということで、SSE接続ごとにメッセージボックス的なものを作り、ポーリングでメッセージが来ていたら書き込んでイベントを流そう、という方針にすると以下のような実装になります。

// 接続ごとに id を振り、メッセージボックスを作成する
const connections: { id: string, inbox: number[] } = []
let counter = 0

export default {
  fetch(request: Request, env: Env) {
    const url = new URL(request.url);

    if (url.pathname === "/events") {
      const encoder = new TextEncoder();
      const { readable, writable } = new TransformStream();
      const writer = writable.getWriter();

      const id = counter
      counter++

      const inbox = []
      connections.push({ id, inbox })

      // 新しい接続が発生するたびに現在の接続数を送信
      // メッセージボックスに要素を追加してイベントを送信
      connections.forEach(({ inbox }) => inbox.push(connections.length))

      const intervalId = setInterval(async () => {
        try {
          // メッセージボックスに要素があれば取り出して書き込む
          while (0 < inbox.length) {
            const n = inbox.shift()
            await writer.write(encoder.encode(`data: ${n}\n\n`));
          }
        } catch (e) {
          writer.close()
          clearInterval(intervalId);
        }
      }, 1000)

      return new Response(readable, {
        headers: {
          Connection: "keep-alive",
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
        },
      });
    }
    return new Response(null, { status: 404 });
  },
}

このコードはローカル上ではそれらしく動作します。このコードでは新たな接続が発生するたびに現在の接続数を全ての接続元に対して送信していますが、実際にデプロイして接続してみると「接続数が常に1」という結果になります。これは Cloudflare Workers の挙動によって起こります。

Cloudflare Workers の実行環境

How Workers works - Cloudflare Workers doc に以下の解説があります。

  • 単一の Workers インスタンスは、あるリクエストの処理中に他のリクエストが到着した場合にそのリクエストを処理する 可能性 がある
  • 同時に発生した 2つのユーザーリクエストが Worker の同じインスタンスにルーティングされるか、または異なるインスタンスにルーティングされるかは保証されない
  • Cloudflare はグローバル状態の使用や変更を推奨しない

同じ地点からのリクエストなのでエッジは距離的に一番近い場所になり、SSE接続がある間は worker インスタンスが残っているので同じインスタンスで処理されそうな気もしますが、現状の挙動としては 1 リクエストごとに worker インスタンスが起動されているので、接続数がそれぞれ 1 になったのかと考えられます。まぁ、そこの挙動に関しては未保証ですし、実際にアプリを作る場合はそもそも違うエッジ上で worker が起動することも全然あり得るので、ドキュメントで言われているように worker がグローバルな状態を持たない方が良いというわけですね。

では一体どうするかというと、Durable Objects を使います。

Durable Objects

Durable Objects は特殊な Worker で、ストレージを持ち、通常の Worker から RPC でやり取りができます。状態を Durable Objects に持たせることで、 Worker 間で状態を共有することができます。また Durable Objects は以下の特徴を持ちます。

  • Durable Objects はグローバルに一意な ID を持ち、オブジェクトは最初に要求された場所の近くのエッジに分散配置される。

    • 具体的な例では、Durable Object ID = 1 が初めてアメリカで要求されたらアメリカに ID = 1 のオブジェクトが配置され、 ID = 2 が初めて日本で要求されたら日本に ID = 2 のオブジェクトが配置される。その後日本から ID = 1 のオブジェクトを要求した場合はアメリカまで通信することになる。
  • Durable Objects の ID をユーザID に対応させればユーザごとに分割してストレージを作成できるし、チャットルームのID を対応させればそのルームのメンバーのみストレージを共有するなど、柔軟なアクセス管理が可能

そのほか、ストレージに sqlite を乗せることで分散型のデータベースを構築できたりバッチ処理とかもできそうですが、今回は不使用なので触れません。

SSE 実装(3): Durable Objects を使用

Durable Objects を使用する場合 wrangler.jsoncdurable_objects フィールドを追記する必要があります。

{
  "name": "minimal-cloudflare-workers",
  "compatibility_date": "2025-04-03",
  "main": "./src/index.ts",
  "durable_objects": {
    "bindings": [
      {
        "name": "SSE_HUB",
        "class_name": "SSEHub"
      }
    ]
  }
}

次に、記述したクラス名で main ファイル ( src/index.ts ) でエクスポートします。

import { DurableObject } from "cloudflare:workers";

type Subscriber = {
  stream: WritableStream
  timeout: ReturnType<typeof setTimeout>
  start: Date
}

export interface Env {
  SSE_HUB: DurableObjectNamespace<SSEHub>;
}

export class SSEHub extends DurableObject {
  #subscribers: Subscriber[] = []

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
  }

  subscribe(stream: WritableStream) {
    // 接続にタイムアウトを設ける(後述)
    const timeout = setTimeout(() => {
      const index = this.#subscribers.findIndex(s => s.stream === stream);
      if (index !== -1) {
        const writer = stream.getWriter();
        writer.close(); // force close
        this.#subscribers.splice(index, 1);
        this.publish("disconnected", { subscribers: this.#subscribers.length });
      }
    }, 1000 * 30);

    this.#subscribers.push({
      stream,
      timeout,
      start: new Date(),
    })

    this.publish("connected", { subscribers: this.#subscribers.length })
  }

  async publish(name: string, data: Record<string, any>) {
    const encoder = new TextEncoder();

    const survivors = [];

    for (const { stream, timeout, start } of this.#subscribers) {
      const writer = stream.getWriter();
      try {
        await writer.write(encoder.encode(`event: ${name}\ndata: ${JSON.stringify(data)}\n\n`));
        survivors.push({ stream, timeout, start });
      } catch (e) {
        console.error(e)
        writer.close();
        clearTimeout(timeout);
      } finally {
        writer.releaseLock();
      }
    }

    this.#subscribers = survivors;
  }
}

リクエストハンドラ側では env.SSE_HUB.idFromName("shared"); で固定 ID を使うことで、どのリクエストからも 共通の Durable Objects にアクセスし、 subscribe メソッドで writable を Durable Object に渡すことでイベントをブロードキャストできるようにします。

export default {
  async fetch(request: Request, env: Env) {
    const id = env.SSE_HUB.idFromName("shared");
    const stub = env.SSE_HUB.get(id);
    const url = new URL(request.url);

    if (url.pathname === "/events") {
      const { readable, writable } = new TransformStream();

      await stub.subscribe(writable)

      return new Response(readable, {
        headers: {
          Connection: "keep-alive",
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
        },
      });
    }
    return new Response(null, { status: 404 });
  },
} satisfies ExportedHandler<Env>

これで、期待した挙動になったわけですが、実装について補足事項が少しあります。

補足事項

WritableStream の仕様なので今までの全ての実装に言えることですが、 WritableStream はクライアント側からの切断を検知できないという点に注意が必要です。

WritableStream の API では切断を検知するAPIが存在せず、書き込みでエラーが出て初めてストリームが閉じていることがわかります。そのため、メモリ上に不要になったストリームが残り続ける懸念があります。また、通常のブラウザからの接続では問題ないと思われますが、curl でデバッグする場合、curl を終了しても TCP 接続が残り続けることがあるらしく( half-open というらしいがよくわかっていない) これに対応するにはサーバ側で明示的にタイムアウトを設けたり、定期的な接続確認を行う必要があります。

今回はクライアント側の EventSource は生きていれば勝手に再接続を行うので、一定時間経過したストリームを強制的にタイムアウトすることで対応しました。

感想

接続状態の確認で ping などを行うのであれば WebSocket にした方がやりやすいだろうし、Durable Objects に WebSocket Hibernation という機能もあるので WebSocket 使っておいた方が無難なのではという雰囲気が多少漂っています。とはいえ、いい勉強になったのと Cloudflare Workers が早くていい感じだったのでよかったかなと。

参考

Discussion