Open21

Fast API

tomakotomako
tomakotomako

https://zenn.dev/sh0nk/books/537bb028709ab9

version: '3'
services:
  demo-app:
    build: .
    volumes:
      - .dockervenv:/src/.venv
      - .:/src
    ports:
      - 8000:8000  
 # ホストマシンのポート8000を、docker内のポート8000に接続する
# python3.9のイメージをダウンロード
FROM python:3.9-buster
ENV PYTHONUNBUFFERED=1

WORKDIR /src

# pipを使ってpoetryをインストール
RUN pip install poetry

# poetryの定義ファイルをコピー (存在する場合)
COPY pyproject.toml* poetry.lock* ./

# poetryでライブラリをインストール (pyproject.tomlが既にある場合)
RUN poetry config virtualenvs.in-project true
RUN if [ -f pyproject.toml ]; then poetry install --no-root; fi

# uvicornのサーバーを立ち上げる
ENTRYPOINT ["poetry", "run", "uvicorn", "api.main:app", "--host", "0.0.0.0", "--reload"]

イメージのビルド

docker-compose build
tomakotomako

poetry

  • Pythonのパッケージ管理を行ってくれるツール
  • RubyにおけるBundler
  • poetry を使う理由
    • pip が行わないパッケージ同士の依存関係の解決
    • lockファイルを利用したバージョン固定
    • Pythonの仮想環境管理

poetry を使ってFastAPIをインストールし、 pyproject.toml を作成

docker-compose run \
  --entrypoint "poetry init \
    --name demo-app \
    --dependency fastapi \
    --dependency uvicorn[standard]" \
  demo-app

これはdemo-appの中で

  • poetry init
  • fastapi と、ASGIサーバーである uvicorn をインストール
    この後ダイアログがありAuthorのパートのみ n の入力後はenter
Version [0.1.0]:
Description []:
Author [None, n to skip]:  n
License []:
Compatible Python versions [^3.9]:
:

上記よりFastAPIを依存パッケージに含む、poetryの定義ファイルを作成

FastAPIのインストール

依存パッケージのダウンロードが始まりインストール

docker-compose run --entrypoint "poetry install --no-root" demo-app

poetry.lock ファイルが作成されている
上2つのコマンドよりpyproject.toml poetry.lock ファイルが作成される

新しいPythonパッケージを追加した場合などは次のコマンドでイメージを再ビルドしてpyproject.toml に含まれている全てのパッケージをインストール

docker-compose build --no-cache
tomakotomako

apiディレクトリを作成する

  • __init__.py
    この api ディレクトリがpythonモジュールであることを示す 空ファイル です。
  • main.py
    FastAPIのコードを記述
from fastapi import FastAPI

# FastAPIのインスタンス
app = FastAPI()

# @ で始まるこの部分を、デコレータ と呼ぶ。
# デコレータは、以下の2つの部分に分かれる。
# - パス "/hello"
# - オペレーション  "get" の部分
@app.get("/hello")
async def hello():
    return {"message": "hello world!"}

ディレクトリ構造

.
├── api
│   ├── __init__.py
│   └── main.py
├── Dockerfile
├── docker-compose.yaml
├── poetry.lock
└── pyproject.toml

APIの立ち上げ

docker-compose up

http://localhost:8000/docs
とするとSwagger UIというAPIの仕様を示すドキュメントの画面が出てくる

http://localhost:8000/hello

{
  "message": "hello world!"
}

が表示される

tomakotomako

TODOアプリ

REST API

GET/POST/PUT/DELETE

  • TODOリストを表示する
  • TODOにタスクを追加する
  • TODOのタスクの説明文を変更する
  • TODOのタスク自体を削除する
  • TODOタスクに「完了」フラグを立てる
  • TODOタスクから「完了」フラグを外す

ディレクトリ構成

api
├── __init__.py
├── main.py
├── schemas
│   └── __init__.py
├── routers
│   └── __init__.py
├── models
│   └── __init__.py
└── cruds
    └── __init__.py
  • GET /tasks
  • POST /tasks
  • PUT /tasks/{task_id}
  • DELETE /tasks/{task_id}
  • PUT /tasks/{task_id}/done
  • DELETE /tasks/{task_id}/done
tomakotomako

ルーター(Routers)

https://zenn.dev/sh0nk/books/537bb028709ab9/viewer/86648d

api/routers/task.py

from fastapi import APIRouter

router = APIRouter()


@router.get("/tasks")
async def list_tasks():
    pass


@router.post("/tasks")
async def create_task():
    pass


@router.put("/tasks/{task_id}")
async def update_task():
    pass


@router.delete("/tasks/{task_id}")
async def delete_task():
    pass

routers/done.py

from fastapi import APIRouter

router = APIRouter()


@router.put("/tasks/{task_id}/done")
async def mark_task_as_done():
    pass


@router.delete("/tasks/{task_id}/done")
async def unmark_task_as_done():
    pass

passについて
pass は「何もしない文」を表す。

api/main.py

from fastapi import FastAPI
# ここを追加
from api.routers import task, done

app = FastAPI()

# ここを追加
app.include_router(task.router)
app.include_router(done.router)
tomakotomako

https://zenn.dev/sh0nk/books/537bb028709ab9/viewer/2c02d7

スキーマ(Schemas)レスポンス

api/schemas/task.py

from typing import Optional

from pydantic import BaseModel, Field

# id, title, done の3フィールド
# それぞれ int, Optional[str], bool の型ヒント
# Field はフィールドに関する付加情報
class Task(BaseModel):
    id: int
    title: Optional[str] = Field(None, example="クリーニングを取りに行く")
    done: bool = Field(False, description="完了フラグ")
  • title は None 、 done は False をデフォルト値に取っている
  • example はフィールドの値の例
    api/routers/task.py
from typing import List
 
 from fastapi import APIRouter
# api.schemas.task を task_schema と読み替えてimport
import api.schemas.task as task_schema
 
 router = APIRouter()
 
# スキーマに定義した Task クラスを複数返すので、リストとして定義
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks():
    return [task_schema.Task(id=1, title="1つ目のTODOタスク")]

id と title を任意の内容にし、 done はデフォルトで False のため指定していない。
ダミーデータとして、 [task_schema.Task(id=1, title="1つ目のTODOタスク")] を返却

tomakotomako

https://zenn.dev/sh0nk/books/537bb028709ab9/viewer/a302b1

スキーマ(Schemas) - リクエスト

  • POST関数では id を指定せず、DBで自動的に id を採番する
  • done フィールドは作成時は常に false であるため、 POST /tasks のエンドポイントからは除く
    👉 リクエストボディとして title のフィールドだけ受け取る
class TaskCreate(BaseModel):
    title: Optional[str] = Field(None, example="クリーニングを取りに行く")

Task と TaskCreate の共通するフィールドは title のみ
👉title のみを持つ両方のベースクラスとして、 TaskBase を定義
共通部分を切り出す

api/schemas/task.py
# 共通部分
class TaskBase(BaseModel):
    title: Optional[str] = Field(None, example="クリーニングを取りに行く")

# passにする= 何もしない文
class TaskCreate(TaskBase):
    pass


class TaskCreateResponse(TaskCreate):
    id: int

    class Config:
        orm_mode = True

# 共通部分titleを除く
class Task(TaskBase):
    id: int
    done: bool = Field(False, description="完了フラグ")

# orm_mode はDBと接続する際に使用
# レスポンススキーマ TaskCreateResponse が、暗黙的にORMを受け取り、レスポンススキーマに変換することを意味
    class Config:
        orm_mode = True

ルーター (POST)

以下のことを行なってる

  • APIとして正しい型のデータを受け取り、正しい型でレスポンスを返す
  • 受け取ったリクエストボディに id を付与して、レスポンスデータを返す
api/routers/task.py
@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
#引数にリクエストボディ
async def create_task(task_body: task_schema.TaskCreate):
# dict に変換し、これらのkey/valueおよび id=1 を持つ task_schema.TaskCreateResponse インスタンスを作成
    return task_schema.TaskCreateResponse(id=1, **task_body.dict())
  • dict インスタンスに対して先頭に ** をつけることで、 dict を キーワード引数として展開 し、 task_schema.TaskCreateResponse クラスのコンストラクタに対して dict のkey/valueを渡す。
    👉task_schema.TaskCreateResponse(id=1, title=task_body.title, done=task_body.done) と同じ
tomakotomako
api/routers/task.py
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks():
    return [task_schema.Task(id=1, title="1つ目のTODOタスク")]


@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
async def create_task(task_body: task_schema.TaskCreate):
    return task_schema.TaskCreateResponse(id=1, **task_body.dict())

# putはtaskにすでにidを持つため task_id: intを追加
@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(task_id: int, task_body: task_schema.TaskCreate):
    return task_schema.TaskCreateResponse(id=task_id, **task_body.dict())


@router.delete("/tasks/{task_id}", response_model=None)
async def delete_task(task_id: int):
    return
api/routers/done.py
@router.put("/tasks/{task_id}/done", response_model=None)
async def mark_task_as_done(task_id: int):
    return


@router.delete("/tasks/{task_id}/done", response_model=None)
async def unmark_task_as_done(task_id: int):
    return
  • チェックボックスをON/OFFするだけのためリクエストボディもレスポンスもない
  • いいね機能も同じ?
tomakotomako

MySQLコンテナの立ち上げ

https://zenn.dev/sh0nk/books/537bb028709ab9/viewer/281ee0

docker-compose.yaml
version: '3'
services:
  demo-app:
    build: .
    volumes:
      - .dockervenv:/src/.venv
      - .:/src
    ports:
      - 8000:8000  # ホストマシンのポート8000を、docker内のポート8000に接続する
  db:
    image: mysql:8.0
    platform: linux/x86_64  # M1 Macの場合必要
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: 'yes'  # rootアカウントをパスワードなしで作成
      MYSQL_DATABASE: 'demo'  # 初期データベースとしてdemoを設定
      TZ: 'Asia/Tokyo'  # タイムゾーンを日本時間に設定
    volumes:
      - mysql_data:/var/lib/mysql
    command: --default-authentication-plugin=mysql_native_password  # MySQL8.0ではデフォルトが"caching_sha2_password"で、ドライバが非対応のため変更
    ports:
      - 33306:3306  # ホストマシンのポート33306を、docker内のポート3306に接続する
volumes:
  mysql_data:
docker-compose up
docker-compose exec db mysql demo 

インストール

sqlalchemy というORMライブラリ
aiomysql はMySQL向けに非同期IO処理を提供するライブラリ

# "demo-app" コンテナの中で "poetry add sqlalchemy aiomysql" を実行
docker-compose exec demo-app poetry add sqlalchemy aiomysql

実行後変更点を確認

pyproject.toml
[tool.poetry.dependencies]
python = "^3.9"
fastapi = "^0.65.1"
uvicorn = {extras = ["standard"], version = "^0.13.4"}
SQLAlchemy = "^1.4.20"
aiomysql = "^0.0.21"

DB接続クラス

api/db.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

ASYNC_DB_URL = "mysql+aiomysql://root@db:3306/demo?charset=utf8"

# MySQLのdockerコンテナに対して接続するセッションを作成
async_engine = create_async_engine(ASYNC_DB_URL, echo=True)
async_session = sessionmaker(
    autocommit=False, autoflush=False, bind=async_engine, class_=AsyncSession
)

Base = declarative_base()

#セッションを取得し、DBへのアクセスを可能にする
async def get_db():
    async with async_session() as session:
        yield session

tomakotomako

DBのモデル(Models)の定義

api/models/task.py
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship

from api.db import Base


class Task(Base):
    __tablename__ = "tasks"

    #第一引数にカラムの型を渡します。第2引数以降にカラムの設定
    id = Column(Integer, primary_key=True)
    title = Column(String(1024))

    #テーブル(モデルクラス)同士の関係性を定義
    done = relationship("Done", back_populates="task", cascade="delete")


class Done(Base):
    __tablename__ = "dones"

    id = Column(Integer, ForeignKey("tasks.id"), primary_key=True)

    task = relationship("Task", back_populates="done")

DBマイグレーション

api/migrate_db.py
from sqlalchemy import create_engine

from api.models.task import Base

DB_URL = "mysql+pymysql://root@db:3306/demo?charset=utf8"
engine = create_engine(DB_URL, echo=True)


def reset_database():
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)


if __name__ == "__main__":
    reset_database()
tomakotomako

DBマイグレーション

api/migrate_db.py
from sqlalchemy import create_engine

from api.models.task import Base

DB_URL = "mysql+pymysql://root@db:3306/demo?charset=utf8"
engine = create_engine(DB_URL, echo=True)


def reset_database():
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)


if __name__ == "__main__":
    reset_database()
# api モジュールの migrate_db スクリプトを実行する
docker-compose exec demo-app poetry run python -m api.migrate_db

DB確認する

 docker-compose exec db mysql demo
mysql> SHOW TABLES;
tomakotomako

DB操作(CRUDs)

https://zenn.dev/sh0nk/books/537bb028709ab9/viewer/b92ab0

api/cruds/task.py
from sqlalchemy.ext.asyncio import AsyncSession

import api.models.task as task_model
import api.schemas.task as task_schema

# 引数としてスキーマ task_create: task_schema.TaskCreate を受け取る。
async def create_task(
    db: AsyncSession, task_create: task_schema.TaskCreate
) -> task_model.Task:
    # task_model.Task に変換して、DB に保存する。
    task = task_model.Task(**task_create.dict())
    db.add(task)
    # DBにコミットする
    await db.commit()
    # DB上のデータを元にTaskインスタンス task を更新する。
    await db.refresh(task)
    return task

async def は関数が非同期処理を行うことができる、 「コルーチン関数」 (以下、コルーチン)である
await では、非同期処理、ここではDBへの接続(IO処理)が発生するため、「待ち時間が発生するような処理をしますよ」ということを示しています。これによって、pythonはこのコルーチンの処理からいったん離れ、イベントループ内で別のコルーチンの処理を行うことができるようになります。これが非同期・並行処理の肝になります。

tomakotomako

Create

ルータの変更

routers/task.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

import api.cruds.task as task_crud
from api.db import get_db
api/routers/task.py
@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
async def create_task(
# Depends は引数に関数を取り、 DI(Dependency Injection、依存性注入) を行うためのもの= テストを容易にする仕組み
    task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
):
    return await task_crud.create_task(db, task_body)
tomakotomako

Read

ToDoアプリでは Task に対して Done モデルが定義されている
別々に Read で取得するのは面倒のためjoinして、ToDoタスクにDoneフラグが付加された状態のリストを取得するエンドポイントを作成する

api/cruds/task.py
from typing import List, Tuple

from sqlalchemy import select
from sqlalchemy.engine import Result


async def get_tasks_with_done(db: AsyncSession) -> List[Tuple[int, str, bool]]:
    result: Result = await (
        db.execute(
#select() で必要なフィールドを指定
            select(
                task_model.Task.id,
                task_model.Task.title,
# Done.id が存在するときは done=True とし、存在しないときは done=False としたレコードを返却。
                task_model.Done.id.isnot(None).label("done"),
#メインのDBモデルに対してjoinしたいモデルを指定
            ).outerjoin(task_model.Done)
        )
    )
    return result.all()

ルーター

api/routers/task.py
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks(db: AsyncSession = Depends(get_db)):
    return await task_crud.get_tasks_with_done(db)
tomakotomako

Update

最初にリクエストしているのが存在している Task に対してなのかをチェックし、存在した場合は更新、存在しない場合は404エラーを返却するAPI

api/cruds/task.py
from typing import List, Tuple, Optional


async def get_task(db: AsyncSession, task_id: int) -> Optional[task_model.Task]:
    result: Result = await db.execute(
        select(task_model.Task).filter(task_model.Task.id == task_id)
    )
    task: Optional[Tuple[task_model.Task]] = result.first()
    return task[0] if task is not None else None  # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す


async def update_task(
    db: AsyncSession, task_create: task_schema.TaskCreate, original: task_model.Task
) -> task_model.Task:
    original.title = task_create.title
    db.add(original)
    await db.commit()
    await db.refresh(original)
    return original

ルーター

api/routers/task.py
from fastapi import APIRouter, Depends, HTTPException


@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(
    task_id: int, task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
):
    task = await task_crud.get_task(db, task_id=task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    return await task_crud.update_task(db, task_body, original=task)
tomakotomako

Delete

api/cruds/task.py
async def delete_task(db: AsyncSession, original: task_model.Task) -> None:
    await db.delete(original)
    await db.commit()

ルーター

api/routers/task.py
@router.delete("/tasks/{task_id}", response_model=None)
async def delete_task(task_id: int, db: AsyncSession = Depends(get_db)):
    task = await task_crud.get_task(db, task_id=task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    return await task_crud.delete_task(db, original=task)
tomakotomako

Doneリソース

CRUD

api/cruds/done.py
from typing import Tuple, Optional

from sqlalchemy import select
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession

import api.models.task as task_model


async def get_done(db: AsyncSession, task_id: int) -> Optional[task_model.Done]:
    result: Result = await db.execute(
        select(task_model.Done).filter(task_model.Done.id == task_id)
    )
    done: Optional[Tuple[task_model.Done]] = result.first()
    return done[0] if done is not None else None  # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す


async def create_done(db: AsyncSession, task_id: int) -> task_model.Done:
    done = task_model.Done(id=task_id)
    db.add(done)
    await db.commit()
    await db.refresh(done)
    return done


async def delete_done(db: AsyncSession, original: task_model.Done) -> None:
    await db.delete(original)
    await db.commit()

ルータ

api/routers/done.py
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession

import api.schemas.done as done_schema
import api.cruds.done as done_crud
from api.db import get_db

router = APIRouter()


@router.put("/tasks/{task_id}/done", response_model=done_schema.DoneResponse)
async def mark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
    done = await done_crud.get_done(db, task_id=task_id)
    if done is not None:
        raise HTTPException(status_code=400, detail="Done already exists")

    return await done_crud.create_done(db, task_id)


@router.delete("/tasks/{task_id}/done", response_model=None)
async def unmark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
    done = await done_crud.get_done(db, task_id=task_id)
    if done is None:
        raise HTTPException(status_code=404, detail="Done not found")

    return await done_crud.delete_done(db, original=done)

schema

api/schemas/done.py
from pydantic import BaseModel


class DoneResponse(BaseModel):
    id: int

    class Config:
        orm_mode = True
tomakotomako

ユニットテスト

https://zenn.dev/sh0nk/books/537bb028709ab9/viewer/d3f074

テスト関連ライブラリのインストール

  • Pytestを使う
  • Pytestを非同期用に拡張する、 pytest-asyncio をインストール
  • SQLiteのオンメモリモードを使用するためSQLiteの非同期クライアントとして aiosqlite をインストール
  • 非同期HTTPクライアントの httpx をインストール
docker-compose exec demo-app poetry add -D pytest-asyncio aiosqlite httpx
  • -D はpoetryの「開発用モード」を指定するオプション
(project root)
├── Dockerfile
├── docker-compose.yaml
├── poetry.lock
├── pyproject.toml
├── api
│   ├── __init__.py
│   ├── db.py
│   ├── main.py
│   ├── migrate_db.py
│   ├── cruds
│   ├── models
│   ├── routers
│   └── schemas
└── tests
    ├── __init__.py
    └── test_main.py
  1. Async用のengineとsessionを作成
  2. テスト用にオンメモリのSQLiteテーブルを初期化(関数ごとにリセット)
  3. DIを使ってFastAPIのDBの向き先をテスト用DBに変更
  4. テスト用に非同期HTTPクライアントを返却
tests/test_main.py
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from api.db import get_db, Base
from api.main import app

ASYNC_DB_URL = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
async def async_client() -> AsyncClient:
    # Async用のengineとsessionを作成
    async_engine = create_async_engine(ASYNC_DB_URL, echo=True)
    async_session = sessionmaker(
        autocommit=False, autoflush=False, bind=async_engine, class_=AsyncSession
    )

    # テスト用にオンメモリのSQLiteテーブルを初期化(関数ごとにリセット)
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    # DIを使ってFastAPIのDBの向き先をテスト用DBに変更
    async def get_test_db():
        async with async_session() as session:
            yield session

### オーバーライドget_db の代わりに get_test_db を使う
    app.dependency_overrides[get_db] = get_test_db

    # テスト用に非同期HTTPクライアントを返却
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

テストを書く

最初に POST コールによってTODOタスクを作成し、2つ目の GET コールによって作成したTODOタスクを確認

tests/test_main.py
import starlette.status


@pytest.mark.asyncio
async def test_create_and_read(async_client):
    response = await async_client.post("/tasks", json={"title": "テストタスク"})
    assert response.status_code == starlette.status.HTTP_200_OK
    response_obj = response.json()
    assert response_obj["title"] == "テストタスク"

    response = await async_client.get("/tasks")
    assert response.status_code == starlette.status.HTTP_200_OK
    response_obj = response.json()
    assert len(response_obj) == 1
    assert response_obj[0]["title"] == "テストタスク"
    assert response_obj[0]["done"] is False

完了フラグを使ったテストを書く

tests/test_main.py
@pytest.mark.asyncio
async def test_done_flag(async_client):
    response = await async_client.post("/tasks", json={"title": "テストタスク2"})
    assert response.status_code == starlette.status.HTTP_200_OK
    response_obj = response.json()
    assert response_obj["title"] == "テストタスク2"

    # 完了フラグを立てる
    response = await async_client.put("/tasks/1/done")
    assert response.status_code == starlette.status.HTTP_200_OK

    # 既に完了フラグが立っているので400を返却
    response = await async_client.put("/tasks/1/done")
    assert response.status_code == starlette.status.HTTP_400_BAD_REQUEST

    # 完了フラグを外す
    response = await async_client.delete("/tasks/1/done")
    assert response.status_code == starlette.status.HTTP_200_OK

    # 既に完了フラグが外れているので404を返却
    response = await async_client.delete("/tasks/1/done")
    assert response.status_code == starlette.status.HTTP_404_NOT_FOUND

テストを実行する

docker-compose run --entrypoint "poetry run pytest" demo-app