📖
FastAPIでMessagePackを使う
FastAPIにて、モデルにpydanticを使っている場合に、JSON以外の形式を使うにはどうすればよいか? ここではMessagePackを例にとっています。
以下が参考になりました。
これをapplication/xml
からapplication/x-msgpack
へと書き換えてみたのが以下のコードです。POSTでMessagePack形式のリクエストを受け取り、MessagePackのレスポンスを返します。例外処理は適当です。
from fastapi import Depends, FastAPI, Response
from typing import TypeVar, Generic, Type, Any
from pydantic import BaseModel
from starlette.requests import Request
import msgpack
T = TypeVar("T", bound=BaseModel)
app = FastAPI()
class MsgPackBody(Generic[T]):
def __init__(self, model_class: Type[T]) -> None:
self.model_class = model_class
async def __call__(self, request: Request) -> T:
if request.headers.get("Content-Type") != "application/x-msgpack":
raise
body = await request.body()
dict_data = msgpack.unpackb(body, use_list=False)
return self.model_class.parse_obj(dict_data)
class MyRequest(BaseModel):
name: str
age: int
class MyResponse(BaseModel):
message: str
@app.post("/msgpack")
async def post_msgpack(request: MyRequest = Depends(MsgPackBody(MyRequest))) -> Response:
response = MyResponse(message=f"{request.name} ({request.age}) さん、こんにちは!")
response_msgpack = msgpack.packb(response.dict(), use_bin_type=True)
return Response(content=response_msgpack, media_type="application/x-msgpack", status_code=200)
普段のお手軽さゆえの宿命ですが、FastAPIは薄いラッパーなので、少し凝ったことをしようとするとたちまちstarletteなど裏の実装の知識を求められるのが大変です。
Discussion