SSE(Server-Sent Events)をFastAPIで行いChatGPTの回答をストリーミングで返すAPIサーバーの実装
今回はFastAPIとSSE(Server-Sent Events)を使って、ChatGPTのようなストリーミングによるレスポンスを実装してみたいと思います!
SSE(Server-Sent Events)とは
SSEとは、サーバーからクライアントへのイベント・ストリームによる非同期通信をHTTP経由で提供する技術です。
Content-Typeヘッダーにtext/event-stream
を指定しデータを送信すると、クライアント側でイベントを受け取ることができます。
WebSocketとSSEの主な違いは、WebSocketが双方向通信であるのに対し、SSEは単一方向の通信です。
SSEは、リアルタイムでサーバーにメッセージを送り返す必要がないため、クライアントに通知をプッシュする際に使用されています。
参考
チャットやオンラインゲームなどのアプリでは、リアルタイム性を重視し、双方向の通信を行うことができるWebSocketを使うことが多いと思われます。
しかし、例えばChatGPTのようなモデルを活用したアプリケーションでは、ユーザーからの質問リクエストを受け取り、サーバーから回答を返すという単一方向の通信パターンが中心と考えます。
そのため要件によっては、WebSocketよりシンプルなSSEを使うことで、比較的簡単に実装することができるのではと考えます。(これはAPIサーバーだけでなくクライアント側の実装も含めての話です。)
FastAPIによるストリーミング
FastAPIではWebSocket及びSSEどちらもサポートしています。
WebSocket
WebSocketに関して、WebSocketsのページがあるのでそちらを参照してください。
導入事例として、LLM領域ではlanarkyがLangchainをベースにWebSocketを使ってAPIサーバーを実装しています。
SSE
SSEについてはStreamingResponseやsse-starletteのEventSourceResponseを使うことで、SSEを実装することができます。
StreamingResponseについては、今回のサンプルでは使用しませんがFastAPI StreamingResponse の使い方にて詳しく記載されています。
今回はEventSourceResponseを使ってSSEを実装してみたいと思います。
SSEとFastAPIを使ったChatGPTのAPIサーバーの実装
コードサンプル
コードは以下の通りです。
import os
import openai
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import uvicorn
openai.api_key = os.getenv("OPENAI_API_KEY")
class AskRequest(BaseModel):
query: str
async def ask_llm_stream(query: str):
# LangChainのstreamはコールバック周りが複雑な印象なので一旦openaiをそのまま使う
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
stream=True, # SSEを使うための設定
messages=[
{
"role": "user",
"content": f"{query}"
}
],
)
for item in response:
try:
content = item['choices'][0]['delta']['content']
except:
content = ""
# dict型で返すことでよしなに変換してくれる
yield {"data": content}
yield {"data": "[DONE]"}
app = FastAPI()
# getにしてqueryに渡すようにするでも可
@app.post("/streaming/ask")
async def ask_stream(ask_req: AskRequest) -> EventSourceResponse:
# イテラブルオブジェクトを引数に渡す
# https://github.com/sysid/sse-starlette/blob/main/sse_starlette/sse.py#L161
return EventSourceResponse(ask_llm_stream(ask_req.query))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)
-
openai.ChatCompletion.create
のstream=True
を指定することで、SSEを使ったストリーミングを行うことができます。-
stream=True
の具体的な説明はHow to stream completionsを参照してください。
-
-
EventSourceResponse
を利用しています。-
StremingResponse
では、json構造をyield
する際json.dumps
を使う必要がありますが、EventSourceResponseではdict
型で返すことでよしなに変換してくれます。 -
StreamingResponse
ではmedia_typeを指定する必要がありますが、EventSourceResponse
ではtext/event-stream
がデフォルトで指定されています。 - Creating Real-Time Charts with FastAPIでは上記についての比較がされています。
-
動作確認の方法
curl
-N
(--no-buffer
の略)をつけ、データをバッファリングせずに、直ちに表示や出力するように指示します。
curl -N -X POST \
-H "Content-Type: application/json" \
-d '{"query":"FastAPIとは何ですか?"}' \
http://localhost:8080/streaming/ask
Python
sseclient-py
を利用し、受け取ったデータを表示します。
import sseclient
import requests
url = 'http://0.0.0.0:8080/streaming/ask'
query = {"query": "FastAPIとは何ですか?"}
headers = {'Accept': 'text/event-stream'}
# https://github.com/mpetazzoni/sseclient
response = requests.post(url, stream=True, headers=headers, json=query)
client = sseclient.SSEClient(response)
for event in client.events():
print(event.data)
上記のようなコードを用いてStreamlit等で表示することも可能です。
Discussion