🐍

SSE(Server-Sent Events)をFastAPIで行いChatGPTの回答をストリーミングで返すAPIサーバーの実装

2023/08/27に公開

今回はFastAPIとSSE(Server-Sent Events)を使って、ChatGPTのようなストリーミングによるレスポンスを実装してみたいと思います!

SSE(Server-Sent Events)とは

SSEとは、サーバーからクライアントへのイベント・ストリームによる非同期通信をHTTP経由で提供する技術です。

Content-Typeヘッダーにtext/event-streamを指定しデータを送信すると、クライアント側でイベントを受け取ることができます。

WebSocketとSSEの主な違いは、WebSocketが双方向通信であるのに対し、SSEは単一方向の通信です。

SSEは、リアルタイムでサーバーにメッセージを送り返す必要がないため、クライアントに通知をプッシュする際に使用されています。

参考

チャットやオンラインゲームなどのアプリでは、リアルタイム性を重視し、双方向の通信を行うことができるWebSocketを使うことが多いと思われます。

しかし、例えばChatGPTのようなモデルを活用したアプリケーションでは、ユーザーからの質問リクエストを受け取り、サーバーから回答を返すという単一方向の通信パターンが中心と考えます。

そのため要件によっては、WebSocketよりシンプルなSSEを使うことで、比較的簡単に実装することができるのではと考えます。(これはAPIサーバーだけでなくクライアント側の実装も含めての話です。)

FastAPIによるストリーミング

FastAPIではWebSocket及びSSEどちらもサポートしています。

WebSocket

WebSocketに関して、WebSocketsのページがあるのでそちらを参照してください。

導入事例として、LLM領域ではlanarkyがLangchainをベースにWebSocketを使ってAPIサーバーを実装しています。

SSE

SSEについてはStreamingResponsesse-starletteのEventSourceResponseを使うことで、SSEを実装することができます。

StreamingResponseについては、今回のサンプルでは使用しませんがFastAPI StreamingResponse の使い方にて詳しく記載されています。

今回はEventSourceResponseを使ってSSEを実装してみたいと思います。

SSEとFastAPIを使ったChatGPTのAPIサーバーの実装

コードサンプル

※ 今回掲載するコードはgistにて公開しています

コードは以下の通りです。

import os
import openai
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import uvicorn

openai.api_key = os.getenv("OPENAI_API_KEY")

class AskRequest(BaseModel):
    query: str


async def ask_llm_stream(query: str):
    # LangChainのstreamはコールバック周りが複雑な印象なので一旦openaiをそのまま使う
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        stream=True, # SSEを使うための設定
        messages=[
            {
                "role": "user",
                "content": f"{query}"
            }
        ],
    )

    for item in response:
        try:
            content = item['choices'][0]['delta']['content']
        except:
            content = ""
        # dict型で返すことでよしなに変換してくれる
        yield {"data": content}
    yield {"data": "[DONE]"}


app = FastAPI()

# getにしてqueryに渡すようにするでも可
@app.post("/streaming/ask")
async def ask_stream(ask_req: AskRequest) -> EventSourceResponse:
    # イテラブルオブジェクトを引数に渡す
    # https://github.com/sysid/sse-starlette/blob/main/sse_starlette/sse.py#L161
    return EventSourceResponse(ask_llm_stream(ask_req.query))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
  • openai.ChatCompletion.createstream=Trueを指定することで、SSEを使ったストリーミングを行うことができます。

  • EventSourceResponseを利用しています。

    • StremingResponseでは、json構造をyieldする際json.dumpsを使う必要がありますが、EventSourceResponseではdict型で返すことでよしなに変換してくれます。
    • StreamingResponseではmedia_typeを指定する必要がありますが、EventSourceResponseではtext/event-streamがデフォルトで指定されています。
    • Creating Real-Time Charts with FastAPIでは上記についての比較がされています。

動作確認の方法

curl

-N(--no-bufferの略)をつけ、データをバッファリングせずに、直ちに表示や出力するように指示します。

curl -N -X POST  \
 -H "Content-Type: application/json" \
 -d '{"query":"FastAPIとは何ですか?"}' \
 http://localhost:8080/streaming/ask
Python

sseclient-pyを利用し、受け取ったデータを表示します。

import sseclient
import requests

url = 'http://0.0.0.0:8080/streaming/ask'
query = {"query": "FastAPIとは何ですか?"}
headers = {'Accept': 'text/event-stream'}
# https://github.com/mpetazzoni/sseclient
response = requests.post(url, stream=True, headers=headers, json=query)
client = sseclient.SSEClient(response)

for event in client.events():
    print(event.data)

上記のようなコードを用いてStreamlit等で表示することも可能です。

Discussion