🔥

【Langgraph+FastAPI】LanggraphのAPI化の技術選定で苦戦した話

2025/01/25に公開

はじめに

MLエンジニアのふるです。今日は珍しく、MLエンジニアらしいAPI化の技術選定(笑)について話していこうと思います。

今回はLanggraphの説明の詳細は省き、LanggraphアプリをAPI提供する場合の技術選定について、困ったことを書いていこうと思います。


背景

Langgraphでは、LLMのマルチエージェントもしくはマルチワークフローを構築する際に、forループで出力する箇所があります。

for s in graph.stream(
    {
        "user_input": user_input,
        "messages": input_messages,
        "user_profile": user_profile,
        "question": question,
        "image_url": image_url
    },
    {
        "recursion_limit": 30,
    }
):
    # (エージェントの出力処理など)

このforループを出力する際に、API設計として以下のような選択肢がぱっと思いつくでしょう。

  1. 全てエージェントの出力が終了してから、REST APIでリターンする
  2. forループを1つずつ出力する

1番はパッと実装が思いつくと思いますが、例えばhuman in the loopを入れたい場合やstreamingなどで間の応答も返してUX体験を向上させるなど、2番で実装したいケースも出てきます。今回は2番のケースに着目した上で実装を見ていきたいと思います。

以下実装は簡単のためFastAPIでするものとします。


最初の実装: WebSocket

私の方で最初に試したのはWebSocket型で設計する方法でした。コードとしては以下のようなものです。

# WebSocket エンドポイントの実装
@router.websocket("/ws/{token}")
async def websocket_endpoint(websocket: WebSocket, token: str):
    # APIキーの定義(例: 環境変数から取得)
    API_KEY = "...."

    # トークンの検証
    if token != API_KEY:
        await websocket.close(code=1008)  # 1008はポリシー違反を示すコード
        logger.warning(f"Unauthorized access attempt")
        return

    # WebSocket 接続の確立
    await websocket.accept()
    try:
        while True:
            # クライアントからのメッセージを受信
            data = await websocket.receive_text()
            # JSONデータを一度パースして変数に格納
            json_data = json.loads(data)

            # Pingメッセージの処理
            if json_data.get("type") == "ping":
                await websocket.send_text(json.dumps({"type": "pong"}))
                continue

            # 必要なデータを取得
            user_input = json_data.get("user_input")
            input_messages = json_data.get("messages")
            user_profile = json_data.get("user_profile")
            question = json_data.get("question")
            image_url = json_data.get("image_url")
            if len(input_messages) > 0:
                input_messages = inv_transform_json(input_messages)

            # LangGraph の処理を非同期で実行
            async for s in graph.astream(
                {
                    "user_input": user_input,
                    "messages": input_messages,
                    "user_profile": user_profile,
                    "question": question,
                    "image_url": image_url
                },
                {
                    "recursion_limit": 30,
                }
            ):  
                s = transform_json(s)
                logger.info(f"WebSocket message: {s}")
                # ストリーミング結果をクライアントに送信
                await websocket.send_text(json.dumps(s, ensure_ascii=False))
                # 適切な間隔を空ける(必要に応じて調整)
                await asyncio.sleep(0.1)
    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected")
    except Exception as e:
        logger.error(f"Error occurred: {e}")
        await websocket.close()

これで当初 CloudRunでFastAPIを運用していましたが、以下のような課題に当たりました。

  • CloudRun側で、30秒のタイムアウトが発生する [1]
  • 接続開始から切断まで課金し続けられるため、CloudRunの費用が高い [2]
  • 何らかの理由で切断された場合に再接続するまでチャット送信ができなくなるためUX体験が悪化する
  • WebSocket接続時に負荷がかかる
  • WebSocketにおける認証の設計の難易度が高い [3]

本当にWebSocket必要なんだっけ?と再考したところでServer-Sent Events(SSE)という技術に出会いました。これはOpenAI APIなどにおいて、Streamingなどで使われている技術です。これで十分なのでは??と思い、SSEについて調査してみました。


SSEとは?

WebSocketとの違いは、WebSocketが双方向通信なのに対して、SSEはサーバーからクライアントへの一方向通信であることを見ておくとわかりやすいです。特に着目すべきは、SSEの場合は独立したプロトコルが不要な点です。表にまとめると以下のようになります。

比較項目 サーバー送信イベント (SSE) WebSocket
通信方向 サーバー → クライアントへの一方向通信 双方向通信
プロトコル HTTPベース 独立したプロトコル
接続 長時間HTTP接続を利用 専用のTCP接続を利用
データ形式 テキストデータのみ バイナリデータもサポート
実装の複雑さ 簡単な実装 プロトコルが複雑で実装も難しい
ブラウザ対応 主要ブラウザで対応済み (IEやEdgeはpolyfillで対応) 新しいブラウザで対応 (IE10以降)
用途 サーバーからのプッシュ通知や一方向通信に向いている 双方向通信が必要な場面に向いている

SSEを使った実装例

なのでWebSocketではなく、SSEを使って実装をしてみることにしました。以下、実装し直した例です。

Server-Sent Events エンドポイントの実装

@router.post("/stream/{token}")
async def stream_endpoint(request: Request, token: str):
    # APIキーの定義(例: 環境変数から取得)
    API_KEY = "...."

    # トークンの検証
    if token != API_KEY:
        logger.warning(f"Unauthorized access attempt")
        return {"message": "Invalid token"}

    # リクエストボディの取得
    body = await request.json()

    # 必要なデータを取得
    user_input = body.get("user_input")
    input_messages = body.get("messages")
    user_profile = body.get("user_profile")
    question = body.get("question")
    image_url = body.get("image_url")
    if len(input_messages) > 0:
        input_messages = inv_transform_json(input_messages)

    async def event_generator():
        try:
            # LangGraph の処理を非同期で実行
            async for s in graph.astream(
                {
                    "user_input": user_input,
                    "messages": input_messages,
                    "user_profile": user_profile,
                    "question": question,
                    "image_url": image_url
                },
                {
                    "recursion_limit": 30,
                }
            ):
                s = transform_json(s)
                logger.info(f"Streaming message: {s}")
                # ストリーミング結果をクライアントに送信
                yield json.dumps(s, ensure_ascii=False)
                # 適切な間隔を空ける(必要に応じて調整)
                await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"Error occurred: {e}")
            yield json.dumps({"error": str(e)})

    return StreamingResponse(event_generator(), media_type="text/event-stream")

変わっている箇所としては、

  • 出力をyieldで指定している yield json.dumps(s, ensure_ascii=False)
  • StreamingResponseクラスを呼び出している

この2つの箇所になります。Langgraphのforループにシームレスに組み込まれ、シンプルな実装ができています。

フロントエンド側の実装例 (Next.js)

const processStream = async () => {
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      console.log('Stream ended');
      break;
    }

    buffer += decoder.decode(value, { stream: true });
    console.log('Buffer:', buffer);

    // JSONオブジェクトが完全に形成されているかを判定
    try {
      try {
        const parsedData = JSON.parse(buffer);
        const newMessage = parseMessage(parsedData);
        if (newMessage) {
          setChatPage(prevState => ({
            ...prevState,
            messages: [
              ...prevState.messages,
              { ...newMessage, type: 'add' },
            ],
          }));
        }
        buffer = '';
      } catch (error) {
        // JSONが不完全な場合はエラーを無視して次のチャンクを待つ
      }
    } catch (error) {
      // JSONが不完全な場合はエラーを無視して次のチャンクを待つ
    }
  }
};

かなり荒い実装ですが、Langgraphで送信したものをJSONに限定させることで、JSONのvalidationが通るまでbufferを繋げる動作で実装しています。これは、チャンクで送られてくる事象があるためです。


SSEにおける改善ポイント

SSEの実装に乗り換えたことで、コード周りもスッキリし、受け取る側もfetchを使って簡潔に書けることが魅力です。まず最初にWebSocketとSSEの技術選定をしっかりしないとなという反省がありました。

CloudRunのインスタンス稼働時間のモニタリングが以下の図です。WebSocketでデプロイ後が1/21 23:00頃ですが、明らかにインスタンス稼働時間が減っており、断続的な消費となっているため、サーバレスである利点を大きく活用できています。

インスタンス稼働時間モニタリング

また、接続の同時判定などを気にする必要もなくなり、REST APIとほぼ同じ設計でできるようになったため、考慮する点が解消できたのが大きな利点でした。


終わりに

今回は、Langgraphにおけるストリーミング出力において、技術選定に苦労した話を紹介しました。結論として、WebSocketではなくSSEで十分なケースは多々あるということです。WebSocketを使う際はまず「SSEで十分か?」を検討してみると良いかと思います。


参考文献

<a id="reference1"></a>
[1] CloudRun での WebSocket 接続のタイムアウトに対処するためには

<a id="reference2"></a>
[2] CloudRunにおけるWebsocketの課金について

<a id="reference3"></a>
[3] Websocket の認証 (Authentication) について考える


補足1 (WebSocketにおけるping処理のTypeScriptコード)

newSocket.onopen = () => {
  console.log('WebSocket connection opened successfully');

  pingInterval = setInterval(() => {
    if (newSocket.readyState === WebSocket.OPEN) {
      newSocket.send(JSON.stringify({ type: 'ping' }));
    }
  }, 10000);
};

補足2 (今回検証を実施したサイト DiaryGraphについて)

Discussion