🏎

FastAPIでREST(その2)

2023/04/18に公開

前回まで

https://zenn.dev/robon/articles/aa7ba513b3bdb3

FastAPIも導入すれば、とりあえず動くところまでは簡単でした。

設計と実装

プロジェクトの構造はどうする?

NestJSはCLIでひな型の自動生成をしてくれたので、それに乗っかることにしました。スキーマ単位の構造は、スキーマ駆動でマイクロサービスな感じで、これからはこういう構造になっていくのかなぁと思いました。

で、FastAPIです。公式では、どうやらレイヤ構造を推奨しているようです。

https://fastapi.tiangolo.com/tutorial/bigger-applications/

多くの記事で上記に近い構造が紹介されていましたが、ここはNestJSのようにスキーマ単位の構造を採用してみようと思います。ディレクトリがスキーマ名になり、機能名がファイル名になるだけなので、縦と横をひっくり返したようになるだけです。

$ tree src -I __pycache__
src
├── config.py
├── customers
│   ├── __init__.py
│   ├── models.py
│   ├── router.py
│   ├── schemas.py
│   └── service.py
├── database.py
├── __init__.py
├── main.py
├── orders
└── products

最初に実装するcustomersと同様にordersとproductsにも実装していきます。

SQLAlchemy & Postgres

折角なので、データベースアクセスも非同期にしてみます。

(venv) $ pip install sqlalchemy
(venv) $ pip install asyncpg

共通部

pydanticのBeseSettingsを使って.envの設定を取り込みます。

src/config.py
from pydantic import BaseSettings


class Settings(BaseSettings):
    DB_HOST: str
    DB_USER: str
    DB_PASS: str

    class Config:
        env_file = ".env.dev"

上記を使用して、SQLAlchemyの共通設定をします。SQLAlchemy1.4相当の実装例が多かったので、本家のドキュメントを読んで、できるだけ2.0相当で、かつ、非同期を使うように実装しました。

src/database.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

from . import config


settings = config.Settings()
DB_URL = "postgresql+asyncpg://" \
    f"{settings.DB_USER}:{settings.DB_PASS}@{settings.DB_HOST}:5432/postgres"

engine = create_async_engine(DB_URL, echo=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
    pass


async def get_session():
    async with async_session() as session:
        yield session

あとは、main.pyですが、これは個別部の後にします。

customers

まず、データベースのモデルを定義します。

src/customers/models.py
from sqlalchemy.orm import Mapped, mapped_column

from src.database import Base


class Customer(Base):
    __tablename__ = "customer"

    customerId: Mapped[int] = mapped_column(
        name="customer_id", primary_key=True)
    name: Mapped[str]
    address: Mapped[str]

    def __repr__(self) -> str:
        return f"Customer(customerId={self.customerId!r}, " \
            f"name={self.name!r}, address={self.address!r})"

次に、リソース表現としてのリクエスト、レスポンスのスキーマをpydanticのBaseModelで定義します。

src/customers/schemas.py
from pydantic import BaseModel


class Customer(BaseModel):
    customerId: int
    name: str
    address: str

    class Config:
        orm_mode = True

orm_mode = True については、以下の意味になります。

Pydantic's orm_mode will tell the Pydantic model to read the data even if it is not a dict, but an ORM model (or any other arbitrary object with attributes).

https://fastapi.tiangolo.com/ja/tutorial/sql-databases/

モデルとスキーマの準備ができたら、この両者の変換とSQLAlchemyへの命令を実装します。

src/customers/service.py
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from . import models, schemas


async def get_customer(db: AsyncSession, customerId: int):
    result = await (db.execute(select(models.Customer).filter(
        models.Customer.customerId == customerId)))
    customer = result.first()
    if customer is None:
        raise HTTPException(status_code=404, detail="Customer not found")
    return customer[0]


async def create_customer(db: AsyncSession, customer: schemas.Customer):
    db_customer = models.Customer(
        customerId=customer.customerId,
        name=customer.name,
        address=customer.address)
    db.add(db_customer)
    await db.commit()
    await db.refresh(db_customer)
    return db_customer


async def update_customer(
        db: AsyncSession, customerId: int, customer: schemas.Customer
):
    original = await get_customer(db, customerId)
    original.name = customer.name
    original.address = customer.address
    db.add(original)
    await db.commit()
    await db.refresh(original)
    return original


async def delete_customer(db: AsyncSession, customerId: int):
    original = await get_customer(db, customerId)
    await db.delete(original)
    await db.commit()

そして、最後にルーターを定義します。

src/customers/router.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.database import get_session
from . import schemas, service


router = APIRouter()


@router.post("/customers", response_model=schemas.Customer)
async def create_customer(
    customer: schemas.Customer, db: AsyncSession = Depends(get_session)
):
    return await service.create_customer(db, customer)


@router.get("/customers/{customerId}", response_model=schemas.Customer)
async def get_customer(
    customerId: int, db: AsyncSession = Depends(get_session)
):
    return await service.get_customer(db, customerId)


@router.put("/customers/{customerId}", response_model=schemas.Customer)
async def update_customer(
    customerId: int,
    customer: schemas.Customer,
    db: AsyncSession = Depends(get_session)
):
    return await service.update_customer(db, customerId, customer)


@router.delete("/customers/{customerId}", response_model=None)
async def delete_customer(
    customerId: int, db: AsyncSession = Depends(get_session)
):
    await service.delete_customer(db, customerId)

もう一度、共通部

リソース毎にまとめたルーターをアプリケーションに組み込みます。

src/main.py
from fastapi import FastAPI

from src.customers.router import router as customers_router

app = FastAPI()
app.include_router(customers_router)

次回は

残りのリソースを実装していきます。

https://zenn.dev/robon/articles/c943cc4740bffb

GitHubで編集を提案
株式会社ROBONの技術ブログ

Discussion