FastAPI × OpenAI ストリームで出力するAPIを作ってみた
はじめに
「ピロピロ」と一文字ずつ流れるような UI を実現したいとき、OpenAI API のストリーミング機能を使うと、サーバーからクライアントへリアルタイムにトークンを渡せます。FastAPI ならシンプルに実装できるらしいのでやってみました。
↓ こんな感じです。
依存ライブラリ
簡単な実装例では使っていないものもありますが、以下のライブラリを使用します。
# requirements.txt
fastapi
uvicorn[standard]
dependency-injector
pytest
httpx
openai
OpenAI API を使用するためには、API キーが必要です。
環境変数としてOPENAI_API_KEY
に設定してください。
OPENAI_API_KEY=sk-…
かんたんな実装例
まずは、FastAPI を使って OpenAI API のストリーミングレスポンスを受け取る簡単な実装例を紹介します。
OpenAIの stremをtrue にして、FastAPI の StreamingResponse を使うことで、サーバーからクライアントへリアルタイムにトークンを渡すことができます。
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI
client = OpenAI()
app = FastAPI()
def openai_stream_generator(messages: list[dict[str, str]]):
response = client.chat.completions.create(
model="gpt-4o-mini",
stream=True,
messages=messages,
)
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, "content") and delta.content:
yield f"data: {delta.content}\n\n"
yield "event: done\ndata: [DONE]\n\n"
@app.post("/v1/chat/stream", status_code=200)
async def chat_stream(request: Request):
body = await request.json()
messages = body.get("messages", [])
generator = openai_stream_generator(messages)
return StreamingResponse(generator, media_type="text/event-stream")
curlで動作確認する
例えば、以下のようなリクエストを送信すると、OpenAI API からのストリーミングレスポンスを受け取ることができます。
curl -N \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"やっほ"}]}' \
http://127.0.0.1:8000/v1/chat/stream
data: や
data: っ
data: ほ
data: !
...
event: done
data: [DONE]
シェルスクリプトでそれっぽく出力させる
下記のシェルスクリプトを実行すると、AI からのストリーミングレスポンスを受け取ることができます。
#!/usr/bin/env bash
URL="http://127.0.0.1:8000/v1/chat/stream"
PAYLOAD='{"messages":[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"やっほ"}]}'
curl -sN \
-H "Content-Type: application/json" \
-d "${PAYLOAD}" \
"${URL}" |
while IFS= read -r line; do
if [[ $line == data:* ]]; then
printf '%s' "${line#data: }"
elif [[ $line == event:\ done ]]; then
break
fi
done
printf '\n'
まとめ
FastAPI を使って OpenAI API のストリーミングレスポンスを受け取る方法を解説しました。
いまさら感がありますが、次はRAGでもやってみようかなと思います。
おまけ
FastAPIにあまり慣れていないので、練習で少し整理した構成で実装してみました 。以下のような構成にしています。 細かいところは色々ありそうですが、それっぽくできそうです。
作成したコードは githubのリポジトリにあります。
1. プロジェクト構成
以下のようなディレクトリ構成にします。
src/app/
├── containers.py # DI コンテナ定義
├── main.py # FastAPI アプリ起点
├── domain/adapters/
│ └── openai_client_interface.py # インターフェース定義(pythonってインターフェース無い?)
├── infrastructure/adapters/
│ └── openai_client.py # OpenAI SDK をラッパーしてる
├── use_cases/
│ └── chat_stream_action.py # ユースケース実装
└── routes/v1/
└── root.py # ルーティング
src/app/containers.py
)
2. DI コンテナ定義 (dependency_injector を使って DI コンテナを定義します。
from dependency_injector import containers, providers
from app.infrastructure.adapters.openai_client import OpenAIClient
from app.use_cases.chat_stream_action import ChatStreamAction
from openai import OpenAI
class ApplicationContainer(containers.DeclarativeContainer):
openai_client = providers.Singleton(OpenAI)
openai_client_adapter = providers.Singleton(OpenAIClient, client=openai_client)
chat_stream_action = providers.Factory(ChatStreamAction, openai_client=openai_client_adapter)
src/app/domain/adapters/openai_client_interface.py
)
3. インターフェース定義 (openai_client.py で実装するインターフェースを定義します。
Protocol と Abstract の違いがよく分かっていないですが、とりあえず Protocol を使ってみました。後で調べます。
from typing import List, Dict, Generator, Protocol
class IOpenAIClient(Protocol):
def stream_chat(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
"""
OpenAI からのストリーミング応答を SSE 形式で yield する。
"""
...
src/app/infrastructure/adapters/openai_client.py
)
4. SDK ラッパーするアダプターの実装 (先ほど定義したインターフェースを実装します。
import json
from typing import List, Dict, Generator
from openai import OpenAI
from app.domain.adapters.openai_client_interface import IOpenAIClient
class OpenAIClient(IOpenAIClient):
def __init__(self, client: OpenAI, model: str = "gpt-4o-mini"):
self.client = client
self.model = model
def stream_chat(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
"""
OpenAI からのストリーミング応答を SSE 形式で yield する。
"""
response = self.client.chat.completions.create(
model=self.model, stream=True, messages=messages
)
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, "content") and delta.content:
yield f"data: {delta.content}\n\n"
yield "event: done\ndata: [DONE]\n\n"
src/app/use_cases/chat_stream_action.py
)
5. ユースケース (ユースケースを定義します。ここでは、OpenAIClient を使ってストリーミングレスポンスを取得するアクションを実装します。
from typing import List, Dict, Generator
from app.infrastructure.adapters.openai_client import OpenAIClient
class ChatStreamAction:
def __init__(self, openai_client: OpenAIClient):
self.openai_client = openai_client
def execute(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
return self.openai_client.stream_chat(messages)
src/app/routes/v1/root.py
)
6. ルーティング (FastAPI のルーティングを定義します。ここでは、POST リクエストを受け取って、OpenAI API からのストリーミングレスポンスを返すエンドポイントを実装します。
# app/routes/v1/root.py
from typing import List, Dict
from fastapi import APIRouter, Depends
from dependency_injector.wiring import inject, Provide
from app.use_cases.chat_stream_action import ChatStreamAction
from app.containers import ApplicationContainer
from fastapi import Request
from fastapi.responses import StreamingResponse
router = APIRouter()
@router.post("/chat/stream", status_code=200)
@inject
async def chat_stream(
request: Request,
action: ChatStreamAction = Depends(Provide[ApplicationContainer.chat_stream_action]),
):
body = await request.json()
messages: List[Dict[str, str]] = body.get("messages", [])
generator = action.execute(messages)
return StreamingResponse(generator, media_type="text/event-stream")
src/app/main.py
)
7. アプリケーションのmain.py (FastAPI アプリケーションのエントリーポイントを定義します。
# app/main.py
from fastapi import FastAPI
from app.containers import ApplicationContainer
from app.routes.v1.root import router as message_router
def create_app() -> FastAPI:
container = ApplicationContainer()
container.wire(
modules=[
__name__,
"app.routes.v1.root",
]
)
app = FastAPI()
app.container = container
app.include_router(message_router, prefix="/v1", tags=["Message"])
return app
app = create_app()
8. 動作確認
動作は特に変わらないので、さきほどのシェルスクリプトやcurlで確認できます。
curl -N \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"やっほ"}]}' \
http://127.0.0.1:8000/v1/chat/stream
data: や
data: っ
data: ほ
data: !
...
event: done
data: [DONE]
Discussion