Closed3

FastAPIでストリーミングを使ってみる

須藤ナノ須藤ナノ

とにかくやってみる

main.py
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()


@app.get("/stream-json")
async def stream_json_example():
    async def generate_json_data():
        for i in range(10):
            data = {"index": i, "message": f"Hello from index {i}"}
            await asyncio.sleep(1.0)
            yield f"data:{json.dumps(data)}\n\n"


    return StreamingResponse(content=generate_json_data(), media_type="text/event-stream")
須藤ナノ須藤ナノ

実は1時間ぐらい沼りました...
沼ったところ

  • media_typetext/event-streamにする
  • yieldで送るデータは先頭にdata:とつけて末尾に改行を2回(\n\n)つける <-これ重要

data:の部分は最初何も知らなくてなんで動かないんだろう?ってなってました(馬鹿)
エラーハンドリングは全くやってないですがあしからず。

これでopenai互換のapiが作れるぞ...!

このスクラップは2024/03/25にクローズされました