🐍

FastAPI × OpenAI ストリームで出力するAPIを作ってみた

に公開

はじめに

「ピロピロ」と一文字ずつ流れるような UI を実現したいとき、OpenAI API のストリーミング機能を使うと、サーバーからクライアントへリアルタイムにトークンを渡せます。FastAPI ならシンプルに実装できるらしいのでやってみました。

↓ こんな感じです。
stream-demo.gif

依存ライブラリ

簡単な実装例では使っていないものもありますが、以下のライブラリを使用します。

# requirements.txt
fastapi
uvicorn[standard]
dependency-injector
pytest
httpx
openai

OpenAI API を使用するためには、API キーが必要です。
環境変数としてOPENAI_API_KEY に設定してください。

OPENAI_API_KEY=sk-…

かんたんな実装例

まずは、FastAPI を使って OpenAI API のストリーミングレスポンスを受け取る簡単な実装例を紹介します。
OpenAIの stremをtrue にして、FastAPI の StreamingResponse を使うことで、サーバーからクライアントへリアルタイムにトークンを渡すことができます。

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI

client = OpenAI()
app = FastAPI()

def openai_stream_generator(messages: list[dict[str, str]]):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        stream=True,
        messages=messages,
    )
    for chunk in response:
        delta = chunk.choices[0].delta
        if hasattr(delta, "content") and delta.content:
            yield f"data: {delta.content}\n\n"

    yield "event: done\ndata: [DONE]\n\n"

@app.post("/v1/chat/stream", status_code=200)
async def chat_stream(request: Request):
    body = await request.json()
    messages = body.get("messages", [])

    generator = openai_stream_generator(messages)
    return StreamingResponse(generator, media_type="text/event-stream")

curlで動作確認する

例えば、以下のようなリクエストを送信すると、OpenAI API からのストリーミングレスポンスを受け取ることができます。

curl -N \
  -H "Content-Type: application/json" \
  -d '{"messages":[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"やっほ"}]}' \
  http://127.0.0.1:8000/v1/chat/stream
  
data: や

data: っ

data: ほ

data: !

...

event: done
data: [DONE]

シェルスクリプトでそれっぽく出力させる

下記のシェルスクリプトを実行すると、AI からのストリーミングレスポンスを受け取ることができます。

#!/usr/bin/env bash

URL="http://127.0.0.1:8000/v1/chat/stream"
PAYLOAD='{"messages":[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"やっほ"}]}'

curl -sN \
  -H "Content-Type: application/json" \
  -d "${PAYLOAD}" \
  "${URL}" |
while IFS= read -r line; do
  if [[ $line == data:* ]]; then
    printf '%s' "${line#data: }"
  elif [[ $line == event:\ done ]]; then
    break
  fi
done

printf '\n'

まとめ

FastAPI を使って OpenAI API のストリーミングレスポンスを受け取る方法を解説しました。
いまさら感がありますが、次はRAGでもやってみようかなと思います。

おまけ

FastAPIにあまり慣れていないので、練習で少し整理した構成で実装してみました 。以下のような構成にしています。 細かいところは色々ありそうですが、それっぽくできそうです。
作成したコードは githubのリポジトリにあります。

1. プロジェクト構成

以下のようなディレクトリ構成にします。

src/app/
├── containers.py                 # DI コンテナ定義
├── main.py                       # FastAPI アプリ起点
├── domain/adapters/
│   └── openai_client_interface.py  # インターフェース定義(pythonってインターフェース無い?)
├── infrastructure/adapters/
│   └── openai_client.py           # OpenAI SDK をラッパーしてる
├── use_cases/
│   └── chat_stream_action.py      # ユースケース実装
└── routes/v1/
    └── root.py                    # ルーティング

2. DI コンテナ定義 (src/app/containers.py)

dependency_injector を使って DI コンテナを定義します。

from dependency_injector import containers, providers
from app.infrastructure.adapters.openai_client import OpenAIClient
from app.use_cases.chat_stream_action import ChatStreamAction
from openai import OpenAI

class ApplicationContainer(containers.DeclarativeContainer):
    openai_client = providers.Singleton(OpenAI)
    openai_client_adapter = providers.Singleton(OpenAIClient, client=openai_client)
    chat_stream_action = providers.Factory(ChatStreamAction, openai_client=openai_client_adapter)

3. インターフェース定義 (src/app/domain/adapters/openai_client_interface.py)

openai_client.py で実装するインターフェースを定義します。
Protocol と Abstract の違いがよく分かっていないですが、とりあえず Protocol を使ってみました。後で調べます。

from typing import List, Dict, Generator, Protocol

class IOpenAIClient(Protocol):
    def stream_chat(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
        """
        OpenAI からのストリーミング応答を SSE 形式で yield する。
        """
        ...

4. SDK ラッパーするアダプターの実装 (src/app/infrastructure/adapters/openai_client.py)

先ほど定義したインターフェースを実装します。

import json
from typing import List, Dict, Generator
from openai import OpenAI

from app.domain.adapters.openai_client_interface import IOpenAIClient


class OpenAIClient(IOpenAIClient):
    def __init__(self, client: OpenAI, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model

    def stream_chat(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
        """
        OpenAI からのストリーミング応答を SSE 形式で yield する。
        """
        response = self.client.chat.completions.create(
            model=self.model, stream=True, messages=messages
        )
        for chunk in response:
            delta = chunk.choices[0].delta
            if hasattr(delta, "content") and delta.content:
                yield f"data: {delta.content}\n\n"
        yield "event: done\ndata: [DONE]\n\n"

5. ユースケース (src/app/use_cases/chat_stream_action.py)

ユースケースを定義します。ここでは、OpenAIClient を使ってストリーミングレスポンスを取得するアクションを実装します。

from typing import List, Dict, Generator
from app.infrastructure.adapters.openai_client import OpenAIClient

class ChatStreamAction:
    def __init__(self, openai_client: OpenAIClient):
        self.openai_client = openai_client

    def execute(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
        return self.openai_client.stream_chat(messages)

6. ルーティング (src/app/routes/v1/root.py)

FastAPI のルーティングを定義します。ここでは、POST リクエストを受け取って、OpenAI API からのストリーミングレスポンスを返すエンドポイントを実装します。

# app/routes/v1/root.py
from typing import List, Dict

from fastapi import APIRouter, Depends
from dependency_injector.wiring import inject, Provide

from app.use_cases.chat_stream_action import ChatStreamAction
from app.containers import ApplicationContainer
from fastapi import Request
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.post("/chat/stream", status_code=200)
@inject
async def chat_stream(
        request: Request,
        action: ChatStreamAction = Depends(Provide[ApplicationContainer.chat_stream_action]),
):
    body = await request.json()
    messages: List[Dict[str, str]] = body.get("messages", [])
    generator = action.execute(messages)
    return StreamingResponse(generator, media_type="text/event-stream")

7. アプリケーションのmain.py (src/app/main.py)

FastAPI アプリケーションのエントリーポイントを定義します。

# app/main.py
from fastapi import FastAPI
from app.containers import ApplicationContainer
from app.routes.v1.root import router as message_router

def create_app() -> FastAPI:
    container = ApplicationContainer()

    container.wire(
        modules=[
            __name__,
            "app.routes.v1.root",
        ]
    )

    app = FastAPI()
    app.container = container

    app.include_router(message_router, prefix="/v1", tags=["Message"])

    return app

app = create_app()

8. 動作確認

動作は特に変わらないので、さきほどのシェルスクリプトやcurlで確認できます。

curl -N \
  -H "Content-Type: application/json" \
  -d '{"messages":[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"やっほ"}]}' \
  http://127.0.0.1:8000/v1/chat/stream
  
data: や

data: っ

data: ほ

data: !

...

event: done
data: [DONE]
株式会社ソニックムーブ

Discussion