Closed3
FastAPIでストリーミングを使ってみる
とにかくやってみる
main.py
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.get("/stream-json")
async def stream_json_example():
async def generate_json_data():
for i in range(10):
data = {"index": i, "message": f"Hello from index {i}"}
await asyncio.sleep(1.0)
yield f"data:{json.dumps(data)}\n\n"
return StreamingResponse(content=generate_json_data(), media_type="text/event-stream")
できた! (postman便利)

実は1時間ぐらい沼りました...
沼ったところ
-
media_typeをtext/event-streamにする -
yieldで送るデータは先頭にdata:とつけて末尾に改行を2回(\n\n)つける <-これ重要
data:の部分は最初何も知らなくてなんで動かないんだろう?ってなってました(馬鹿)
エラーハンドリングは全くやってないですがあしからず。
これでopenai互換のapiが作れるぞ...!
このスクラップは2024/03/25にクローズされました