🫠

FastAPIでサービス層とリポジトリ層を使って一連の流れを作成してみた

2024/05/19に公開

この記事は前回からの内容を前提に記載しているのでご注意ください。
DockerでFastAPI環境構築(HelloWorldまで)
https://zenn.dev/momonga_g/articles/f131ea192b1184
FastAPIのDB接続とマイグレーション(DIコンテナも準備)
https://zenn.dev/momonga_g/articles/a62e2b95c68590
FastAPIのログ設定と全体のエラーハンドリング
https://zenn.dev/momonga_g/articles/f9248614c01859

ではさっそく始めていきます!
FastAPIはエンドポイントがそのままrouteとcontrollerとしての役割を果たします。

ここを分割することも可能ですが、分けるメリットがあまりないのと逆に見通しが悪くなりそうな気がしていて、一緒に定義した方がいいと思っています。(公式も一緒だしね)

ただ、そのままエンドポイントに処理やDBのデータのやり取りのロジックを入れるとかなり見通しと再利用性が悪くなります。

なので、大雑把に分けると

endpoint→service→repository→DBのように責務を分けてみたいと思います。

(Laravelなどは結構このパターンが多いと思います。FastAPIのお作法だともっと違うやり方があるかもしれないので、あくまで筆者の分け方だという認識でいて頂ければと思います。)

  • endpoint
    • route定義とリクエストを受け取り、レスポンスを返すのみに責務を絞る
  • service
    • ロジックやデータの加工を行う。
  • repository
    • DBへのアクセス

実際にかなり簡単な例ですが、ユーザー取得の流れを書いてみます。(記事全般が前回からの流れをくみますので、ご注意ください。)

project_root
├── _docker
│   ├── nginx
│   │   └── nginx.conf
│   └── python
│       └── Dockerfile
├── src
│   └── main.py
│   └── init.py
│   └── core
│        └── dependency.py
│        └── logging.py
│   └── middleware
│        └── exception_handler.py
│   └── schema
│        └── response
│           └── error_response.py
│           └── base_response.py
│           └── user_response.py
│   └── database
│        └── database.py
│   └── endpoint// 追加
│        └── user_endpoint.py// 追加
│   └── service// 追加
│        └── user_service.py// 追加
│   └── repository// 追加
│        └── user_repository.py// 追加
├── .env
├── pyproject.toml
├── poetry.lock
├── makefile
├── makefile.local
├── makefile.container
├── .gitignore
└── docker-compose.yml

エンドポイントの作成

サービスの作成

リポジトリの作成

レスポンスモデルの作成

main.pyへエンドポイントを追加

自動生成されたOpenAPIをブラウザで開き、getしてみる

上記の順番で作業を進めていきます。

エンドポイントの作成

/src/endpoint/user_endpoint.pyを作成します。

APIRouterクラスを定義してあげることでmain.pyのappに個別定義したrouteを追加することができるようになります。

routerデコレータの中身はOpenAPI関連の記載もしていきます。

OpenAPIを自動生成してくれるのがFastAPIの強みのひとつなので、情報として必要な部分は積極的に記載していく方が望ましいです。

デコレータ内のoperation_idについては、フロント側のAPI自動コード生成でメソッド名として、扱われるので、OpenAPIジェネレータを使用する際は分かりやすい命名で記載した方がいいと思います。

from fastapi import APIRouter
from src.schema.response.user_response import UserResponse
from src.core.dependency import di_injector
from src.service.user_service import UserService

router = APIRouter()

@router.get(
    "/me", # パス
    response_model=UserResponse, # レスポンスモデル(OpenAPIにプロパティが使用される)
    tags=["user"], # OpenAPIのタグ
    name="ユーザー取得", # OpenAPIの名称
    description="ユーザー取得", # OpenAPIの説明
    operation_id="get_me", # OpenAPIジェネレータでコード自動生成しなければ不要
)
async def me() -> UserResponse:
    try:
        result = await di_injector.get_class(UserService).get_user()
    except Exception:
        raise
    return UserResponse(data=result)

メソッドの中身ですが、極めて簡単ですね!

処理の途中で例外が発生した場合はcatchで補足して、以前作成したハンドラーが拾ってフロントへ返します。

メインの流れはdi_injectorクラス(DIコンテナ)でUserServiceクラスを取得してget_userメソッドを呼び出しています。

FastAPIのDependenciedを使用しても良いのですが、Serviceには複数のRepositoryの依存が発生する可能性があるので、DIコンテナに任せてしまった方が、見通しが良く扱いやすいため、採用しました。

Dependencies - FastAPI

di_injectorは前回作成したものですが、再度乗せておきます。

from injector import Injector, inject
from src.database import AppConfig

class DependencyInjector:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(DependencyInjector, cls).__new__(cls)
            cls._instance._initialize()
        return cls._instance

    def _initialize(self):
        self.di = Injector([AppConfig()])

    async def update_injector(self, _class):
        self.di = Injector([_class])

    @inject
    def get_class(self, _class):
        return self.di.get(_class)

di_injector = DependencyInjector()

サービスの作成

サービス層を作成していきます。

今回はリポジトリを呼び出す以外は行っていませんが、ここでデータの加工やビジネスロジックを記載したりします。(本来はマジでコアとなるビジネスロジックはサービスドメインなどに切り出した方がいいです。)

ここでもinjectorを使用してコンストラクタインジェクションにて対応しています。(endpointもクラス化した方が設計に一貫性が出るのでそっちの方が良かったかもです。)

from injector import inject

from src.repository.user_repository import UserRepository

class UserService:
    @inject
    def __init__(
        self,
        repository: UserRepository,
    ):
        self.repository = repository

    async def get_user(self):
        return await self.repository.get_user()

リポジトリの作成

データ取得は適当なので参考にしないでください。

ただ、流れとしてサービスから呼び出されたリポジトリはデータのCRUDの責任を守ります。

それ以外はサービスで対応するのがベターだと思います。

DBの接続についてもDatabaseConnectionクラスをDIコンテナからコンストラクタインジェクションで対応しています。

from typing import Optional
from injector import inject
from sqlalchemy.future import select
from src.database.database import DatabaseConnection
from src.model.user import Users

class UserRepository:
    @inject
    def __init__(
        self,
        db: DatabaseConnection,
    ) -> None:
        self.db = db
        
    async def get_user(self) -> Optional[Users]:
        async with self.db.get_db() as session:
            result = await session.exec(select(Users))
            user = result.scalars().first()
            return user

DatabaseConnectionクラスも以前記載した記事にあるので、省略して記載しておきます。

基本的にはDIコンテナに登録する際に設定を渡して、シングルトンクラスで扱います。

DI様様です!

class DatabaseConnection:
    @singleton
    def __init__(self, connection_url: str, migration_url: str, option: dict = {}):
        self.connection_url = connection_url
        self.migration_url = migration_url
        self.option = option
        self.engine = self.get_async_engine()
        self.session = self.get_session(self.engine)

    @asynccontextmanager
    async def get_db(self):
        async with self.session() as session:
            yield session

    async def close_engine(self):
        if self.engine:
            await self.engine.dispose()
            await self.session.close()
            self.engine = None
            self.session = None

    def get_url(self) -> str:
        return self.connection_url

    def get_migration_url(self) -> str:
        return self.migration_url

    def get_async_engine(self) -> AsyncEngine:
        return create_async_engine(self.connection_url, **self.option)

    def get_session(self, engine: AsyncEngine) -> AsyncSession:
        async_session_factory = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=engine,
            class_=AsyncSession,
            expire_on_commit=True,
        )
        # セッションのスコープ設定
        return async_scoped_session(async_session_factory, scopefunc=asyncio.current_task)

レスポンスモデルの作成

最後にレスポンスモデルの定義を行っていきます。

再度、endpointを載せておきます。

/src/endpoint/user_endpoint.py(省略)

@router.get(
    "/me", # パス
    response_model=UserResponse, # レスポンスモデル
    tags=["user"],
    name="ユーザー取得",
    description="ユーザー取得",
    operation_id="get_me",
)
async def me() -> UserResponse:
    try:
        result = await di_injector.get_class(UserService).get_user()
    except Exception:
        raise
    return UserResponse(data=result)

UserResponseを作成したいところですが、レスポンスの形式がAPIによって異なることは少ないと思うので、まずはbase_responseを作成したいと思います。

/src/schema/response/base_response.py

SQLModelを継承して、各プロパティを設定しています。

statusやmessageは正常系を扱う際には繰り返し同じものを記載したくないのでデフォルト値を持たせてあります。

from typing import Any, Union
from sqlmodel import Field, SQLModel

class JsonResponse(SQLModel):
    status: int = Field(200, description="ステータスコード")
    data: Any = Field(None, description="データ")
    message: Union[str, None] = Field("ok", description="メッセージ")

このクラスを継承して、user_responseを作成していきます。

/src/schema/response/user_response.py

JsonResponseを継承してUserResponseを作成しました。

入れ子構造でUserResponseItemクラスを定義して、詳細のプロパティを記載しています。

Configクラスではorm_mode=TrueとすることでDBから取得したUserクラスをマッピングしてくれるようになります。

また、schema_extraを記載することでOpenAPIにexampleが表示されるようになります。

from datetime import datetime
from sqlmodel import Field, SQLModel

from src.schema.response.base_response import JsonResponse

class UserResponse(JsonResponse):
    class UserResponseItem(SQLModel):
        id: int
        uuid: str
        email: str
        created_at: datetime
        updated_at: datetime
        class Config:
            orm_mode = True
            schema_extra = {
                "example": {
                    "id": 1,
                    "uuid": "uuid",
                    "email": "a@a.com",
                    "created_at": "2022-01-01 00:00:00",
                    "updated_at": "2022-01-01 00:00:00",
                }
            }
    data: UserResponseItem = Field(None, description="ユーザー情報")

レスポンスの階層イメージ

{
  "status": 200, // JsonResponse
  "data": { // UserResponse
    "id": 1, // UserResponseItem
    "uuid": "uuid", // UserResponseItem
    "email": "a@a.com", // UserResponseItem
    "created_at": "2022-01-01 00:00:00", // UserResponseItem
    "updated_at": "2022-01-01 00:00:00" // UserResponseItem
  },
  "message": "ok" // JsonResponse
}

main.pyへエンドポイントを追加

/src/main.py

作成したエンドポイントをmainでappインスタンスに追加します。

# 変更後
from src.middleware.exception_handler import EnhancedTracebackMiddleware
from src.init import app
from src.endpoint import user_endpoint # 追加する

app = app
app.add_middleware(EnhancedTracebackMiddleware)

# 追加する
app.include_router(user_endpoint.router)

# 変更前
from fastapi import HTTPException # 不要なので削除
from src.middleware.exception_handler import EnhancedTracebackMiddleware
from src.init import app

app = app
app.add_middleware(EnhancedTracebackMiddleware)

# 不要なので削除
@app.get("/")
async def root():
    try:
        # 何らかの処理
        raise Exception("Internal Server Error")
    except Exception as e:
        raise HTTPException(status_code=500)

ここまでで、基本的には完了です。

DBには適当にデータを入れておいてください。

実際にopenAPIをみてみましょう。

自動生成されたOpenAPIをブラウザで開き、getしてみる

下記にブラウザでアクセスするとOpenAPIが開きます。

http://localhost:8000/docs

実際にTry it outボタンから、リクエストを送ることができます。

(これが神がかって楽なんです。postmanなども使用せずに簡単にデバックやテストが行えます。)

下記のようなデータが返って来ればOKです。

{
  "status": 200,
  "data": {
    "id": 1,
    "uuid": "",
    "email": "a@a.com",
    "created_at": "2024-05-15T00:00:00",
    "updated_at": "2024-05-15T00:00:00"
  },
  "message": "ok"
}

またOpenAPI自体が欲しい場合は

http://127.0.0.1:8000/openapi.json

でブラウザにjsonが表示されますが、大概ファイルで欲しいので

curl http://127.0.0.1:8000/openapi.json -o openapi.json

を叩いた方がいいかもです。

取得できたjson

{
    "openapi": "3.0.2",
    "info": {
        "title": "app_name",
        "description": "API for app_name",
        "version": "0.1.0"
    },
    "servers": [
        {
            "url": "http://localhost:8000",
            "description": "Local server"
        }
    ],
    "paths": {
        "/me": {
            "get": {
                "tags": [
                    "user"
                ],
                "summary": "ユーザー取得",
                "description": "ユーザー取得",
                "operationId": "get_me",
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {
                            "application/json": {
                                "schema": {
                                    "$ref": "#/components/schemas/UserResponse"
                                }
                            }
                        }
                    }
                }
            }
        }
    },
    "components": {
        "schemas": {
            "UserResponse": {
                "title": "UserResponse",
                "type": "object",
                "properties": {
                    "status": {
                        "title": "Status",
                        "type": "integer",
                        "description": "ステータスコード",
                        "default": 200
                    },
                    "data": {
                        "title": "Data",
                        "allOf": [
                            {
                                "$ref": "#/components/schemas/UserResponseItem"
                            }
                        ],
                        "description": "ユーザー情報"
                    },
                    "message": {
                        "title": "Message",
                        "type": "string",
                        "description": "メッセージ",
                        "default": "ok"
                    }
                }
            },
            "UserResponseItem": {
                "title": "UserResponseItem",
                "required": [
                    "id",
                    "uuid",
                    "email",
                    "created_at",
                    "updated_at"
                ],
                "type": "object",
                "properties": {
                    "id": {
                        "title": "Id",
                        "type": "integer"
                    },
                    "uuid": {
                        "title": "Uuid",
                        "type": "string"
                    },
                    "email": {
                        "title": "Email",
                        "type": "string"
                    },
                    "created_at": {
                        "title": "Created At",
                        "type": "string",
                        "format": "date-time"
                    },
                    "updated_at": {
                        "title": "Updated At",
                        "type": "string",
                        "format": "date-time"
                    }
                },
                "example": {
                    "id": 1,
                    "uuid": "uuid",
                    "email": "a@a.com",
                    "created_at": "2022-01-01 00:00:00",
                    "updated_at": "2022-01-01 00:00:00"
                }
            }
        }
    }
}

今回は以上です。

この後はテスト関連について書いていきたいと思います。

テストについてはメインで使っているMySQLではなくSqLiteにDBをテスト時だけ、切り替えて行う方針でいこうと思います。

Discussion