FastAPIでサービス層とリポジトリ層を使って一連の流れを作成してみた
この記事は前回からの内容を前提に記載しているのでご注意ください。
DockerでFastAPI環境構築(HelloWorldまで)
FastAPIのDB接続とマイグレーション(DIコンテナも準備)
FastAPIのログ設定と全体のエラーハンドリング
ではさっそく始めていきます!
FastAPIはエンドポイントがそのままrouteとcontrollerとしての役割を果たします。
ここを分割することも可能ですが、分けるメリットがあまりないのと逆に見通しが悪くなりそうな気がしていて、一緒に定義した方がいいと思っています。(公式も一緒だしね)
ただ、そのままエンドポイントに処理やDBのデータのやり取りのロジックを入れるとかなり見通しと再利用性が悪くなります。
なので、大雑把に分けると
endpoint→service→repository→DBのように責務を分けてみたいと思います。
(Laravelなどは結構このパターンが多いと思います。FastAPIのお作法だともっと違うやり方があるかもしれないので、あくまで筆者の分け方だという認識でいて頂ければと思います。)
- endpoint
- route定義とリクエストを受け取り、レスポンスを返すのみに責務を絞る
- service
- ロジックやデータの加工を行う。
- repository
- DBへのアクセス
実際にかなり簡単な例ですが、ユーザー取得の流れを書いてみます。(記事全般が前回からの流れをくみますので、ご注意ください。)
project_root
├── _docker
│ ├── nginx
│ │ └── nginx.conf
│ └── python
│ └── Dockerfile
├── src
│ └── main.py
│ └── init.py
│ └── core
│ └── dependency.py
│ └── logging.py
│ └── middleware
│ └── exception_handler.py
│ └── schema
│ └── response
│ └── error_response.py
│ └── base_response.py
│ └── user_response.py
│ └── database
│ └── database.py
│ └── endpoint// 追加
│ └── user_endpoint.py// 追加
│ └── service// 追加
│ └── user_service.py// 追加
│ └── repository// 追加
│ └── user_repository.py// 追加
├── .env
├── pyproject.toml
├── poetry.lock
├── makefile
├── makefile.local
├── makefile.container
├── .gitignore
└── docker-compose.yml
エンドポイントの作成
↓
サービスの作成
↓
リポジトリの作成
↓
レスポンスモデルの作成
↓
main.pyへエンドポイントを追加
↓
自動生成されたOpenAPIをブラウザで開き、getしてみる
上記の順番で作業を進めていきます。
エンドポイントの作成
/src/endpoint/user_endpoint.pyを作成します。
APIRouterクラスを定義してあげることでmain.pyのappに個別定義したrouteを追加することができるようになります。
routerデコレータの中身はOpenAPI関連の記載もしていきます。
OpenAPIを自動生成してくれるのがFastAPIの強みのひとつなので、情報として必要な部分は積極的に記載していく方が望ましいです。
デコレータ内のoperation_idについては、フロント側のAPI自動コード生成でメソッド名として、扱われるので、OpenAPIジェネレータを使用する際は分かりやすい命名で記載した方がいいと思います。
from fastapi import APIRouter
from src.schema.response.user_response import UserResponse
from src.core.dependency import di_injector
from src.service.user_service import UserService
router = APIRouter()
@router.get(
"/me", # パス
response_model=UserResponse, # レスポンスモデル(OpenAPIにプロパティが使用される)
tags=["user"], # OpenAPIのタグ
name="ユーザー取得", # OpenAPIの名称
description="ユーザー取得", # OpenAPIの説明
operation_id="get_me", # OpenAPIジェネレータでコード自動生成しなければ不要
)
async def me() -> UserResponse:
try:
result = await di_injector.get_class(UserService).get_user()
except Exception:
raise
return UserResponse(data=result)
メソッドの中身ですが、極めて簡単ですね!
処理の途中で例外が発生した場合はcatchで補足して、以前作成したハンドラーが拾ってフロントへ返します。
メインの流れはdi_injectorクラス(DIコンテナ)でUserServiceクラスを取得してget_userメソッドを呼び出しています。
FastAPIのDependenciedを使用しても良いのですが、Serviceには複数のRepositoryの依存が発生する可能性があるので、DIコンテナに任せてしまった方が、見通しが良く扱いやすいため、採用しました。
di_injectorは前回作成したものですが、再度乗せておきます。
from injector import Injector, inject
from src.database import AppConfig
class DependencyInjector:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(DependencyInjector, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
self.di = Injector([AppConfig()])
async def update_injector(self, _class):
self.di = Injector([_class])
@inject
def get_class(self, _class):
return self.di.get(_class)
di_injector = DependencyInjector()
サービスの作成
サービス層を作成していきます。
今回はリポジトリを呼び出す以外は行っていませんが、ここでデータの加工やビジネスロジックを記載したりします。(本来はマジでコアとなるビジネスロジックはサービスドメインなどに切り出した方がいいです。)
ここでもinjectorを使用してコンストラクタインジェクションにて対応しています。(endpointもクラス化した方が設計に一貫性が出るのでそっちの方が良かったかもです。)
from injector import inject
from src.repository.user_repository import UserRepository
class UserService:
@inject
def __init__(
self,
repository: UserRepository,
):
self.repository = repository
async def get_user(self):
return await self.repository.get_user()
リポジトリの作成
データ取得は適当なので参考にしないでください。
ただ、流れとしてサービスから呼び出されたリポジトリはデータのCRUDの責任を守ります。
それ以外はサービスで対応するのがベターだと思います。
DBの接続についてもDatabaseConnectionクラスをDIコンテナからコンストラクタインジェクションで対応しています。
from typing import Optional
from injector import inject
from sqlalchemy.future import select
from src.database.database import DatabaseConnection
from src.model.user import Users
class UserRepository:
@inject
def __init__(
self,
db: DatabaseConnection,
) -> None:
self.db = db
async def get_user(self) -> Optional[Users]:
async with self.db.get_db() as session:
result = await session.exec(select(Users))
user = result.scalars().first()
return user
DatabaseConnectionクラスも以前記載した記事にあるので、省略して記載しておきます。
基本的にはDIコンテナに登録する際に設定を渡して、シングルトンクラスで扱います。
DI様様です!
class DatabaseConnection:
@singleton
def __init__(self, connection_url: str, migration_url: str, option: dict = {}):
self.connection_url = connection_url
self.migration_url = migration_url
self.option = option
self.engine = self.get_async_engine()
self.session = self.get_session(self.engine)
@asynccontextmanager
async def get_db(self):
async with self.session() as session:
yield session
async def close_engine(self):
if self.engine:
await self.engine.dispose()
await self.session.close()
self.engine = None
self.session = None
def get_url(self) -> str:
return self.connection_url
def get_migration_url(self) -> str:
return self.migration_url
def get_async_engine(self) -> AsyncEngine:
return create_async_engine(self.connection_url, **self.option)
def get_session(self, engine: AsyncEngine) -> AsyncSession:
async_session_factory = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
class_=AsyncSession,
expire_on_commit=True,
)
# セッションのスコープ設定
return async_scoped_session(async_session_factory, scopefunc=asyncio.current_task)
レスポンスモデルの作成
最後にレスポンスモデルの定義を行っていきます。
再度、endpointを載せておきます。
/src/endpoint/user_endpoint.py(省略)
@router.get(
"/me", # パス
response_model=UserResponse, # レスポンスモデル
tags=["user"],
name="ユーザー取得",
description="ユーザー取得",
operation_id="get_me",
)
async def me() -> UserResponse:
try:
result = await di_injector.get_class(UserService).get_user()
except Exception:
raise
return UserResponse(data=result)
UserResponseを作成したいところですが、レスポンスの形式がAPIによって異なることは少ないと思うので、まずはbase_responseを作成したいと思います。
/src/schema/response/base_response.py
SQLModelを継承して、各プロパティを設定しています。
statusやmessageは正常系を扱う際には繰り返し同じものを記載したくないのでデフォルト値を持たせてあります。
from typing import Any, Union
from sqlmodel import Field, SQLModel
class JsonResponse(SQLModel):
status: int = Field(200, description="ステータスコード")
data: Any = Field(None, description="データ")
message: Union[str, None] = Field("ok", description="メッセージ")
このクラスを継承して、user_responseを作成していきます。
/src/schema/response/user_response.py
JsonResponseを継承してUserResponseを作成しました。
入れ子構造でUserResponseItemクラスを定義して、詳細のプロパティを記載しています。
Configクラスではorm_mode=TrueとすることでDBから取得したUserクラスをマッピングしてくれるようになります。
また、schema_extraを記載することでOpenAPIにexampleが表示されるようになります。
from datetime import datetime
from sqlmodel import Field, SQLModel
from src.schema.response.base_response import JsonResponse
class UserResponse(JsonResponse):
class UserResponseItem(SQLModel):
id: int
uuid: str
email: str
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
schema_extra = {
"example": {
"id": 1,
"uuid": "uuid",
"email": "a@a.com",
"created_at": "2022-01-01 00:00:00",
"updated_at": "2022-01-01 00:00:00",
}
}
data: UserResponseItem = Field(None, description="ユーザー情報")
レスポンスの階層イメージ
{
"status": 200, // JsonResponse
"data": { // UserResponse
"id": 1, // UserResponseItem
"uuid": "uuid", // UserResponseItem
"email": "a@a.com", // UserResponseItem
"created_at": "2022-01-01 00:00:00", // UserResponseItem
"updated_at": "2022-01-01 00:00:00" // UserResponseItem
},
"message": "ok" // JsonResponse
}
main.pyへエンドポイントを追加
/src/main.py
作成したエンドポイントをmainでappインスタンスに追加します。
# 変更後
from src.middleware.exception_handler import EnhancedTracebackMiddleware
from src.init import app
from src.endpoint import user_endpoint # 追加する
app = app
app.add_middleware(EnhancedTracebackMiddleware)
# 追加する
app.include_router(user_endpoint.router)
# 変更前
from fastapi import HTTPException # 不要なので削除
from src.middleware.exception_handler import EnhancedTracebackMiddleware
from src.init import app
app = app
app.add_middleware(EnhancedTracebackMiddleware)
# 不要なので削除
@app.get("/")
async def root():
try:
# 何らかの処理
raise Exception("Internal Server Error")
except Exception as e:
raise HTTPException(status_code=500)
ここまでで、基本的には完了です。
DBには適当にデータを入れておいてください。
実際にopenAPIをみてみましょう。
自動生成されたOpenAPIをブラウザで開き、getしてみる
下記にブラウザでアクセスするとOpenAPIが開きます。
実際にTry it outボタンから、リクエストを送ることができます。
(これが神がかって楽なんです。postmanなども使用せずに簡単にデバックやテストが行えます。)
下記のようなデータが返って来ればOKです。
{
"status": 200,
"data": {
"id": 1,
"uuid": "",
"email": "a@a.com",
"created_at": "2024-05-15T00:00:00",
"updated_at": "2024-05-15T00:00:00"
},
"message": "ok"
}
またOpenAPI自体が欲しい場合は
でブラウザにjsonが表示されますが、大概ファイルで欲しいので
curl http://127.0.0.1:8000/openapi.json -o openapi.json
を叩いた方がいいかもです。
取得できたjson
{
"openapi": "3.0.2",
"info": {
"title": "app_name",
"description": "API for app_name",
"version": "0.1.0"
},
"servers": [
{
"url": "http://localhost:8000",
"description": "Local server"
}
],
"paths": {
"/me": {
"get": {
"tags": [
"user"
],
"summary": "ユーザー取得",
"description": "ユーザー取得",
"operationId": "get_me",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UserResponse"
}
}
}
}
}
}
}
},
"components": {
"schemas": {
"UserResponse": {
"title": "UserResponse",
"type": "object",
"properties": {
"status": {
"title": "Status",
"type": "integer",
"description": "ステータスコード",
"default": 200
},
"data": {
"title": "Data",
"allOf": [
{
"$ref": "#/components/schemas/UserResponseItem"
}
],
"description": "ユーザー情報"
},
"message": {
"title": "Message",
"type": "string",
"description": "メッセージ",
"default": "ok"
}
}
},
"UserResponseItem": {
"title": "UserResponseItem",
"required": [
"id",
"uuid",
"email",
"created_at",
"updated_at"
],
"type": "object",
"properties": {
"id": {
"title": "Id",
"type": "integer"
},
"uuid": {
"title": "Uuid",
"type": "string"
},
"email": {
"title": "Email",
"type": "string"
},
"created_at": {
"title": "Created At",
"type": "string",
"format": "date-time"
},
"updated_at": {
"title": "Updated At",
"type": "string",
"format": "date-time"
}
},
"example": {
"id": 1,
"uuid": "uuid",
"email": "a@a.com",
"created_at": "2022-01-01 00:00:00",
"updated_at": "2022-01-01 00:00:00"
}
}
}
}
}
今回は以上です。
この後はテスト関連について書いていきたいと思います。
テストについてはメインで使っているMySQLではなくSqLiteにDBをテスト時だけ、切り替えて行う方針でいこうと思います。
Discussion