🌟

Server Sent Eventsの色んな実装パターンを考える

2024/12/27に公開

こんにちは、クラウドエースの三原と申します。
今回は Server Sent Events に関して紹介させていただければと思います。

Server Sent Eventsとは

Server-Sent Events は簡潔に言うとサーバーからクライアントへ一方通行且つリアルタイムにイベントデータを配信する技術です。

アプリケーションのリアルタイム性を求められた際の選択肢の一つとして挙げられます。
(他に要求を満たす技術として WebSocket、ポーリング等があります。)

例として株価の監視やチャットツール等の他、代表例で言うと ChatGPT でモデルからの生成結果をユーザーに逐次通知する技術として使われています。
開発者コンソールでリクエストをキャプチャすると以下のように回答が終わるまでストリーミングでデータが流れているのがわかります。

event: delta_encoding
data: "v1"

event: delta
data: {"p": "", "o": "add", "v": {"message": {"id": "058d008f-5ff9-4c6b-9cc7-af7d64dd264d", "author": {"role": "system", "name": null, "metadata": {}}, "create_time": null, "update_time": null, "content": {"content_type": "text", "parts": [""]}, "status": "finished_successfully", "end_turn": true, "weight": 0.0, "metadata": {"is_visually_hidden_from_conversation": true}, "recipient": "all", "channel": null}, "conversation_id": "676a57a7-8f90-800b-9620-4d4f62ad85a5", "error": null}, "c": 0}
 
event: delta
data: {"p": "/message/content/parts/0", "o": "append", "v": "「タ"}   

event: delta
data: {"v": "トゥ"}   

event: delta
data: {"v": "ー」と「刺青("}     

event: delta
data: {"v": "いれずみ)」は、どちらも皮膚"}   

...(中略)

event: delta
data: {"p": "", "o": "patch", "v": [{"p": "/message/content/parts/0", "o": "append", "v": "注意が必要です。"}, {"p": "/message/status", "o": "replace", "v": "finished_successfully"}, {"p": "/message/end_turn", "o": "replace", "v": true}, {"p": "/message/metadata", "o": "append", "v": {"finish_details": {"type": "stop"}, "is_complete": true}}]}     

data: {"type": "conversation_detail_metadata", "banner_info": null, "blocked_features": [], "model_limits": [], "default_model_slug": "auto", "conversation_id": "676a57a7-8f90-800b-9620-4d4f62ad85a5"}

data: [DONE]

個人の所感として実装の容易さや各ブラウザのサポートのカバー率と比べて
活用事例の共有やプラクティスが少ないように思い、なかなか認知されていないように感じます。
概念として登場したのは2004年頃なので、分類でいえばかなりレガシーなのですが全く枯れている印象が無いので不思議な気がします。

私自身も生成AIが人口に膾炙するに至ったタイミングで周辺技術としての位置づけでようやく知った次第です。(そして同じような事書いてる人多いですね...)

というわけで Server Sent Events の実装やってきます。

バックエンド実装編

やりたい事


Server Sent Events (以下、SSE)のユースケースとしてユーザーに特定のデータの更新をリアルタイムに通知したいがまず挙げられるかと思います。
実装のパターンは色々あるとは思いますが SSE 接続を確立した上で、 RDB(PostgreSQL等) や MemoryStore(Redis) 等のデータストア内のデータ更新を検知し、その内容をクライアントに逐次通知していくような仕組みの実装を目指していきます

今回は PostgreSQL と Redis を使った以下の4つの SSE 実装パターンを紹介していきます。
notify/listenがPostgreSQLのみ利用可能な機能なのでRDBはPostgreSQLを利用します。

  • RDB notify/listen パターン(PostgreSQL のみ)
  • RDB ポーリングパターン
  • Redis Pub/Sub パターン
  • Redis Streams パターン

全て FastAPI(python) で実装しています。

RDB notify/listen パターン(PostgreSQL のみ)


以下の テーブル構成 を前提としています。

CREATE TYPE "EventType" AS ENUM ('ITEM_CREATE','ITEM_UPDATE');

CREATE TABLE "item" (
    "id" SERIAL NOT NULL,
    "name" TEXT NOT NULL,
    "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT "item_pkey" PRIMARY KEY ("id")
);

FastAPI のコードは以下の通りです。ドライバは psycopg です。

import psycopg


@api_router.post(
    "/pg",
    status_code=status.HTTP_201_CREATED,
)
async def insert_item(name: str = Body(...)):
    async with await psycopg.AsyncConnection.connect(
        dbname="event_db",
        user="*******",
        password="*******",
        host="*******",
        port="5432",
    ) as async_connection:
        async with async_connection.cursor() as async_cursor:
            await async_cursor.execute(
                "INSERT INTO public.item (name) VALUES (%s) RETURNING id", (name,)
            )
  
            await async_connection.commit()
        return JSONResponse(
            content={"status": "ok"}, status_code=status.HTTP_201_CREATED
        )

@api_router.get(
    "/pg/stream",
    status_code=status.HTTP_200_OK,
)
async def event_stream_with_pg():
    async def generate():
        try:
            async with await psycopg.AsyncConnection.connect(
                dbname="event_db",
                user="*******",
                password="*******",
                host="*******",
                port="5432",
                autocommit=True,
            ) as async_connection:
                await async_connection.execute("LISTEN items_notification;")

                async for notify in async_connection.notifies():
                    print(notify)
                    yield f"data: {jsonable_encoder(notify.payload)}\n\n"
        except asyncio.CancelledError:
            print("Stream connection cancelled for event")
            raise
        finally:
            yield "data: Connection closed\n\n"
            print("Stream connection closed for event")

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )

PostgreSQL Notify/Listen とは

まずこの実装の前提として、 PostgreSQL の Notify / Listen なる機能を利用します。(CloudSQL でも利用可能です。)
本機能とトリガー等を組み合わせる事で、特定のテーブルに対する更新にトリガーして、その通知 Payload を Listen しているクライアントに通知し何かしらの処理を実行するイベントドリブンな実装が可能です。

例えば以下の例で言うと、トリガ、トリガ関数を以下のように構成する事で item テーブルに対する INSERT 文を実行すると、items_notification チャネルに対してidを含む json 文字列の Payload が通知されます。

CREATE OR REPLACE FUNCTION notify_item_create() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify(
    'items_notification',
    json_build_object(
      'id', NEW.id,
      'name', NEW.name,
      'event_type', 'ITEM_CREATE'
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER items_notify_create_trigger
AFTER INSERT ON public.item
FOR EACH ROW EXECUTE PROCEDURE notify_item_create();

items_notification チャネルに listen した上で INSERT 文を実行すると、トリガ関数で指定した Payload が通知されます。

event_db=> LISTEN items_notification;

event_db=> INSERT INTO public.item (name) VALUES ('aaaaaa') RETURNING id;
INSERT 0 1
Asynchronous notification "items_notification" with payload "{"id" : 18, "name" : "aaaaaa", "event_type" : "ITEM_CREATE"}" received from server process with PID 3396.

この Notify/Listen と SSE を組み合わせてitemテーブルの作成にトリガーしてクライアントに通知していきます。

実例

①アプリケーションサーバー起動後に以下の curl を投げて SSE 接続を確立します。

curl -N \
     -H "Accept:text/event-stream" \
     http://{host}/api/v1/pg/stream

③以下の curl を投げてitemテーブルに対する INSERT 文を実行する POST メソッド API を実行します。

curl -X POST "http://{host}/api/v1/pg" \
    -H "Content-Type: application/json" \
    -d '"test item name"'

{"status":"ok"}

③Insert されたデータが逐次通知されます。

data: {"id" : 19, "name" : "test item name", "event_type" : "ITEM_CREATE"}

data: {"id" : 20, "name" : "test item name", "event_type" : "ITEM_CREATE"}

制限事項

この実装の問題点として、listen 中はコネクションを占有する事になりクライアントが複数接続していた場合にコネクションが枯渇する可能性が高いです。

それぞれ /pg/stream エンドポイントに対して複数のターミナルから SSE 接続を確立した上で、
pg_stat_activity を確認すると以下のように LISTEN 中のコネクションが確認できます。

event_db=> SELECT pid,
state,
query,
state_change FROM pg_stat_activity
where state is not null;;
pid state query state_change
3735 idle LISTEN items_notification; 2024-12-21 07:39:33.817749+00
3573 idle LISTEN items_notification; 2024-12-21 07:33:13.776233+00
3738 idle LISTEN items_notification; 2024-12-21 07:39:40.540963+00
3740 idle LISTEN items_notification; 2024-12-21 07:39:45.923769+00
3743 idle LISTEN items_notification; 2024-12-21 07:39:55.741775+00
3751 idle LISTEN items_notification; 2024-12-21 07:40:06.90044+00
3769 idle LISTEN items_notification; 2024-12-21 07:40:52.295469+00
3753 idle LISTEN items_notification; 2024-12-21 07:40:12.580201+00
3771 idle LISTEN items_notification; 2024-12-21 07:40:57.758905+00
3781 idle LISTEN items_notification; 2024-12-21 07:41:09.411387+00
3786 idle LISTEN items_notification; 2024-12-21 07:41:27.983722+00
3796 idle LISTEN items_notification; 2024-12-21 07:41:37.45392+00
3797 idle LISTEN items_notification; 2024-12-21 07:41:45.018817+00
3829 idle LISTEN items_notification; 2024-12-21 07:42:58.623+00
3917 idle LISTEN items_notification; 2024-12-21 07:46:01.232221+00
3926 idle LISTEN items_notification; 2024-12-21 07:46:06.277419+00


うわ~~~~~~~~~~~~無駄使い無駄使い!
この状態ではいずれコネクション数がmax_connectionsを超える事になります。

psycopg.OperationalError: connection failed: connection to server at "******", port 5432 failed: FATAL:  remaining connection slots are reserved for roles with the SUPERUSER attribute

その意味ではアプリケーションがユーザーの同接が多く見込まれる場合にはこの実装は向かないと言えます。
また、そのほかにも再通知ができない等の制限もあったりします。(つまりは SSE 接続を確立する前に発生した通知を取得する事はできません。)
逆説的に言えば限定的なユーザーで且つイベントが再通知されなくても許容されるようなシナリオであれば、適している可能性があると言えます。

RDB ポーリングパターン


ちょっとトラディショナルなやり方ですが...。ポーリングと Server Side Events を組み合わせるパターンを実装していきます。

テーブル構成は上記の notify / listen パターンに加えて event テーブルも加えます。

CREATE TABLE "event" (
    "id" SERIAL NOT NULL,
    "item_id" INTEGER NOT NULL,
    "event_type" "EventType" NOT NULL,
    "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT "event_pkey" PRIMARY KEY ("id")
);

ALTER TABLE "event" ADD CONSTRAINT "event_item_id_fkey" FOREIGN KEY ("item_id") REFERENCES "item" ("id") ON DELETE CASCADE;

コードは以下の通りで、notify/listenパターンと比較して、コネクションプールを作成している他、post 時に event テーブルに INSERT 文を投げるようにしています。

3秒おきに event テーブルに対して select 文を投げてポーリングしてます。

import psycopg
from psycopg_pool import AsyncConnectionPool

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool

    pool = AsyncConnectionPool(
        min_size=2,
        max_size=20,
        conninfo="dbname=event_db user=postgres password=******* host=******* ",
    )
    await pool.open()
    yield
    await pool.close()

app = FastAPI(
    title="test-sse",
    lifespan=lifespan,
)

@api_router.post(
    "/pg",
    status_code=status.HTTP_201_CREATED,
)
async def insert_item(name: str = Body(...)):
    async with await psycopg.AsyncConnection.connect(
        dbname="event_db",
        user="postgres",
        password="*******",
        host="*******",
        port="5432",
    ) as async_connection:
        async with async_connection.cursor() as async_cursor:
            await async_cursor.execute(
                "INSERT INTO public.item (name) VALUES (%s) RETURNING id", (name,)
            )
            item_id = (await async_cursor.fetchone())[0]

            await async_cursor.execute(
                "INSERT INTO public.event (event_type, item_id, created_at) VALUES (%s, %s, %s)",
                ("ITEM_CREATE", item_id, datetime.now()),
            )

            await async_connection.commit()
        return JSONResponse(
            content={"status": "ok"}, status_code=status.HTTP_201_CREATED
        )


@api_router.get(
    "/pg/stream/polling",
    status_code=status.HTTP_200_OK,
)
async def event_stream_with_pg_polling(
    last_event_id: int = Header(default=0, alias="last-event-id"),
):
    async def generate():
        current_last_event_id = last_event_id
        try:
            while True:
                async with pool.connection() as async_connection:
                    async with async_connection.cursor() as cursor:
                        await cursor.execute(
                            "SELECT id, event_type, created_at FROM event WHERE id > %s",
                            (current_last_event_id,),
                        )
                        events = await cursor.fetchall()
                        if events:
                            for event in events:
                                yield f"data: {jsonable_encoder(event)}\n\n"
                                current_last_event_id = event[0]

                await asyncio.sleep(3)
        except asyncio.CancelledError:
            print("Stream connection cancelled for event")
            raise
        finally:
            yield "data: Connection closed\n\n"
            print("Stream connection closed for event")

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )

実演

①アプリケーションサーバー起動後に以下の curl を投げて SSE 接続を確立します。

curl -N \
     -H "Accept:text/event-stream" \
     -H "LAST-EVENT-ID: 0" \
     http://{host}/api/v1/pg/stream/polling

③以下の curl を投げてitemテーブルに対する INSERT 文を実行する POST メソッド API を実行します。

curl -X POST "http://{host}/api/v1/pg" \
    -H "Content-Type: application/json" \
    -d '"test item name"'

{"status":"ok"}

③時間差はあれど Insert されたデータが SSE から逐次通知されます。

data: [11, 'ITEM_CREATE', '2024-12-25T17:58:57.098000']

notify/listen パターンと比較して、3秒に一度コネクションを開放しているので、コネクションが枯渇しないような実装になってます。
検証の為 SSE 接続リクエストを同時に300回バックグラウンドで実施した上で挙動を確認しました。

#!/bin/bash

CONNECTIONS=300

count=1

while [ $count -le $CONNECTIONS ]
do
    curl -N \
        -H "Accept:text/event-stream" \
        -H "LAST-EVENT-ID: 10" \
        http://{host}/api/v1/pg/stream/polling &
    
    echo "接続 $count 開始"
    count=$((count + 1))
    
    sleep 0.1
done

鬼のようにデータが流れてきますが特にコネクションが枯れる事はありません。

data: [21, 'ITEM_CREATE', '2024-12-21T13:52:32.087000']

data: [21, 'ITEM_CREATE', '2024-12-21T13:52:32.087000']

data: [21, 'ITEM_CREATE', '2024-12-21T13:52:32.087000']

data: [21, 'ITEM_CREATE', '2024-12-21T13:52:32.087000']

data: [21, 'ITEM_CREATE', '2024-12-21T13:52:32.087000']
...
...

また、 last-event-id を指定可能にする事でクライアントが SSE 接続を確立する前に既に発生したイベントを取得する事が可能です。
なのでイベントの配信が担保されている意味でトラディショナルながら堅い実装といえます。
ただし、今回の event に相当するテーブルは実際にはデータが肥大化する可能性が高いと思われるので、定期的な物理削除が必要に思います。

Redis Pub/Subパターン


イベントドリブンな実装でリアルタイム性を維持しつつコネクション問題を解決したいという命題がある中で、Redis の Pub/Sub 機能を使ってきます。

import redis.asyncio as redis

pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=1000,
    decode_responses=True,
)
async_redis_client = redis.Redis(connection_pool=pool)


@api_router.post(
    "/redis/pubsub/publish",
    status_code=status.HTTP_200_OK,
)
async def publish(message: str = Body(...)):
    await async_redis_client.publish("test-pubsub", message)
    return {"status": "ok"}


@api_router.get(
    "/redis/pubsub/stream",
    status_code=status.HTTP_200_OK,
)
async def event_stream_with_redis_pubsub():
    async def generate():
        try:
            pubsub = async_redis_client.pubsub()
            await pubsub.subscribe("test-pubsub")

            while True:
                async for message in pubsub.listen():
                    if message and message["type"] == "message":
                        yield f"data: {jsonable_encoder(message['data'])}\n\n"

        except asyncio.CancelledError:
            print("Stream connection cancelled for event")
            raise
        finally:
            await pubsub.unsubscribe("test-pubsub")
            yield "data: Connection closed\n\n"
            print("Stream connection closed for event")

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )

Redis Pub/Subとは

Redis の Pub/Subメッセージブローカーとしての位置づけの機能で、基本的には以下のサイクルで動作します。

①パブリッシャーが特定のチャネルにメッセージを配信(Publish)
②サブスクライバーが特定のチャネルを購読(Subscribe)し、メッセージを受信

今回は②に相当する部分が SSE 接続後に Redis の Pub/Sub を購読してメッセージを待機する実装になってます。

実演

①以下の curl を投げて SSE 接続を確立します。

curl -N \
     -H "Accept: text/event-stream" \
     "http://{host}/api/v1/redis/pubsub/stream"

②以下の curl を投げて Redis に対してメッセージを publish します。

curl -X POST "http://{host}/api/v1/redis/pubsub/publish" \
     -H "Content-Type: application/json" \
     -d '"Item1"'

③Redis に publish されたメッセージが逐次通知されます。

data: Item1

data: Item2

制限事項

Redis' Pub/Sub exhibits at-most-once message delivery semantics. As the name suggests, it means that a message will be delivered once if at all. Once the message is sent by the Redis server, there's no chance of it being sent again. If the subscriber is unable to handle the message (for example, due to an error or a network disconnect) the message is forever lost.

引用元: Redis Pub/Sub
Redis の Pub/Sub 機能は PostgreSQL の Notify/Listen と同様に再通知ができない制限があり、メッセージを publish してもどのクライアントもサブスクライブしていない状況などの場合はメッセージが消失します。なので Pub/Sub を利用する場合で過去のイベントを遡及して取得する場合は以下の記事のように、メッセージ保存の為にRedis Listsを併用する等のハックが必要そうです。
同時接続数30万超のチャットサービスのメッセージ配信基盤をRedis Pub/SubからRedis Streamsにした話

というわけで公式ドキュメントでも配信保証を求めるなら Streams を採用せい、と書いてるので Streams の実装パターン書いてきます。

Redis Streamsパターン

@api_router.post(
    "/redis/stream/xadd",
    status_code=status.HTTP_200_OK,
)
async def stream_xadd(message: str = Body(...)):
    await async_redis_client.xadd("test-stream", {"message": message})
    return {"status": "ok"}


@api_router.get(
    "/redis/stream/stream",
    status_code=status.HTTP_200_OK,
)
async def event_stream_with_redis_stream(
    last_event_id: str = Header(default="0", alias="last-event-id"),
):
    async def generate():
        try:
            current_last_event_id = last_event_id
            while True:
                stream = await async_redis_client.xread(
                    count=5, block=10000, streams={"test-stream": current_last_event_id}
                )
                if stream:
                    for id, value in stream[0][1]:
                        yield f"data: id: {id}, {jsonable_encoder(value)}\n\n"
                    current_last_event_id = stream[0][1][-1][0]

        except asyncio.CancelledError:
            print("Stream connection cancelled for event")
            raise
        finally:
            yield "data: Connection closed\n\n"
            print("Stream connection closed for event")

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )

Redis Streamsとは

Redis のStreamsは Pub/Sub と同様にメッセージブローカーに位置づけられる機能ですが、
Pub/Sub との差異として、過去のメッセージを遡及して取得できる点があります。

redis-cli で以下のようにメッセージを追加してみます。
それぞれユニークなIDが付与されている事がわかると思います。

127.0.0.1:6379> XADD test-zenn * test article1
"1735264101845-0"
127.0.0.1:6379> XADD test-zenn * test article2
"1735264106580-0"
127.0.0.1:6379> XADD test-zenn * test article3
"1735264109435-0"

全て取得するには、以下のようになり時系列別に取得できるのがわかると思います。

127.0.0.1:6379> XRANGE test-zenn - +
1) 1) "1735264101845-0"
   2) 1) "test"
      2) "article1"
2) 1) "1735264106580-0"
   2) 1) "test"
      2) "article2"
3) 1) "1735264109435-0"
   2) 1) "test"
      2) "article3"

上記の-は過去のメッセージを全て取得する為の文字列で、+は現在までメッセージを取得する為の文字列です。
この例で言うと1735264101845-0から1735264106580-0までのメッセージを取得するには、以下のようになります。

127.0.0.1:6379> XRANGE test-zenn 1735264101845-0 1735264106580-0
1) 1) "1735264101845-0"
   2) 1) "test"
      2) "article1"
2) 1) "1735264106580-0"
   2) 1) "test"
      2) "article2"

実演

①以下の curl を投げて SSE 接続を確立します。

curl -N \
     -H "Accept: text/event-stream" \
     "http://{host}/api/v1/redis/stream/stream"

②以下の curl を投げて Redis の Streams に対してメッセージを追加します。

curl -X POST "http://{host}/api/v1/redis/stream/xadd" \
     -H "Content-Type: application/json" \
     -d '"Item1"'

③ Redis の Streams に追加されたメッセージが逐次通知されます。

data: id: 1735256659132-0, {'message': 'Item1'}

data: id: 1735256668613-0, {'message': 'Item1'}

data: id: 1735256669323-0, {'message': 'Item1'}

data: id: 1735256669922-0, {'message': 'Item1'}

バックエンド側まとめ

表にしてまとめると以下になります。

コネクション 再通知 リアルタイム性
Notify/Listen(PostgreSQL) × ×
ポーリング(PostgreSQL)
Pub/Sub(Redis) ×
Streams(Redis)

RDB ならポーリング、Redisを採用するなら Streams が最も堅い選択といえるのではないでしょうか。
そもそもとして、 SSE がイベントストリームの接続と切断を繰り返す前提の設計思想になっており、 Last-Event-ID ヘッダによるイベントの再取得を規格としている為、再通知が不可能で特定の ID から遡及してイベントを取得できない実装パターンは不適格といえると思います。

フロントエンド編(おまけ)

バックエンド編が長すぎたので色々かいつまんで書きます。

クライアント側の SSE 接続処理で肝要である部分は以下の3点です。

①サーバー側でイベントストリームが切断されていた場合には再接続を行う
②イベントのエイリアス毎に処理を分ける
③ユーザーがブラウザから離脱した際に last-event-id を指定できるように任意のブラウザデータストレージを利用する

①サーバー側でイベントストリームが切断されていた場合には再接続を行う
②イベントのエイリアス毎に処理を分ける

上記の要件は event-source-plus を利用すると簡単に実現できます。
標準の EventSource クラスではできないカスタムヘッダの指定やエラー時の再接続の制御をサポートしている点が便利です。

①サーバー側でイベントストリームが切断されていた場合には再接続を行う

EventSourcePlus 初期化時の retryStrategyalways(デフォルト)にするとサーバー側でイベントストリームが切断されていた場合には再接続を行うようになります。

export const fetchEventSource = async (url: string, options: RequestInit = {}) => {
  const eventSource = new EventSourcePlus(`${BASE_API_URI}${url}`, {
    retryStrategy: "always",
    headers: {
      ...(options.headers as Record<string, string>)
    }
  });

  return eventSource;
};

②イベントのエイリアス毎に処理を分ける

SSEの仕様として、イベントストリームデータは event , data, id, retry の4つのフィールドを改行区切りで解釈します。
(これまでの実装コードでは便宜的にdataのみを返却していました)

"event: ITEM_CREATE\ndata: {"item_name": "Item1"}\nid: 1735264101845-0\nretry: 10000\n\n"
↓
event: ITEM_CREATE
data: {"item_name": "Item1"}
id: 1735264101845-0
retry: 10000

なので、サーバー側で event フィールドを含めてレスポンスを返してもらう事で、以下のように SSEMessageevent プロパティに代入されるので event のエイリアス毎に処理を制御する事ができます。

export enum EventType {
    ITEM_CREATE = 'ITEM_CREATE',
    CONNECTION_CLOSED = 'CONNECTION_CLOSED',
}
export type StreamEventResponse = {
    item_name: string | null;
}
subscribeToEventStream: async () => {

        const eventSource: EventSourcePlus = await fetchEventSource('/event/stream');
        const latest_event_id = await getLatestEventId();
        if(latest_event_id){
            eventSource.lastEventId = latest_event_id?.toString() ?? '';
        }

        const listener = eventSource.listen({
            onRequest: (request) => {
                console.log(request);
            },
            onResponse: (response:OnResponseContext) => {
                console.log(response);
            },
            onRequestError(context) {
                console.log(context);
            },
            onMessage: (sse_message:SseMessage) => {
                try {
                    const jsonString = sse_message.data.replace(/'/g, '"').replace(/None/g, 'null');
                    const newEvent: StreamEventResponse = JSON.parse(jsonString);
    
                    if (!sse_message.id) {
                        return;
                    }
                    switch(sse_message.event){
                        case EventType.CONNECTION_CLOSED:
                            console.log("イベントストリームが切断されました");
                            return;
                        case EventType.ITEM_CREATE:
                            console.log(`アイテム${newEvent.name}が作成されました`);
                            break;
                    }

                } catch (error) {
                    console.error('SSEメッセージのパースに失敗しました:', error);
                    console.error('受信したデータ:', sse_message.data);
                }
            },
            onResponseError: ({ request, response, options }) => {
                console.log(request, response, options);
            }
        });
}

③ユーザーがブラウザから離脱した際にlast-event-idを指定できるように任意のブラウザデータストレージを利用する

基本的にlast-event-idevent-source-plusがストリームデータを受け取った際に設定してくれるので、再接続時には問題ないのですが、ユーザーがブラウザから離脱した際にはlast-event-idを指定できるように任意のブラウザデータストアを利用する必要があります。

関わっている案件の実装では Indexed DB を利用して、ユーザーがイベント通知を閲覧したら最新のイベント以外を削除して、ユーザーが離脱したら最新のイベントのidをlast-event-idとして設定するようにしています。

どのデータストレージを利用するかは要件次第だと思います。

おしまい

想定以上に長くなってしまいました。よくよく考えたらなんで Redis の説明とかしているんだ...ってなりました。

アプリケーションの要件にリアルタイム性が要求された際には是非 SSE を一考してみてはいかがでしょうか。
主に実装面に重きを置いて説明しましたが、Cloud Run にのっけた際のリクエストタイムアウト等、WebSocket の実装に通じる問題が発生するように予想してるので(まだ本格検証はしていない)、実運用ベースのプラクティスは需要ありそうなら書きます。
この記事の諸々が参考になれば幸いに存じます。

Discussion