Fast API
初めに
プロジェクトの作成
Dockerfile
フォルダ構成
version: '3'
services:
demo-app:
build: .
volumes:
- .dockervenv:/src/.venv
- .:/src
ports:
- 8000:8000
# ホストマシンのポート8000を、docker内のポート8000に接続する
# python3.9のイメージをダウンロード
FROM python:3.9-buster
ENV PYTHONUNBUFFERED=1
WORKDIR /src
# pipを使ってpoetryをインストール
RUN pip install poetry
# poetryの定義ファイルをコピー (存在する場合)
COPY pyproject.toml* poetry.lock* ./
# poetryでライブラリをインストール (pyproject.tomlが既にある場合)
RUN poetry config virtualenvs.in-project true
RUN if [ -f pyproject.toml ]; then poetry install --no-root; fi
# uvicornのサーバーを立ち上げる
ENTRYPOINT ["poetry", "run", "uvicorn", "api.main:app", "--host", "0.0.0.0", "--reload"]
イメージのビルド
docker-compose build
poetry
- Pythonのパッケージ管理を行ってくれるツール
- RubyにおけるBundler
- poetry を使う理由
- pip が行わないパッケージ同士の依存関係の解決
- lockファイルを利用したバージョン固定
- Pythonの仮想環境管理
poetry を使ってFastAPIをインストールし、 pyproject.toml を作成
docker-compose run \
--entrypoint "poetry init \
--name demo-app \
--dependency fastapi \
--dependency uvicorn[standard]" \
demo-app
これはdemo-appの中で
- poetry init
- fastapi と、ASGIサーバーである uvicorn をインストール
この後ダイアログがありAuthorのパートのみ n の入力後はenter
Version [0.1.0]:
Description []:
Author [None, n to skip]: n
License []:
Compatible Python versions [^3.9]:
:
上記よりFastAPIを依存パッケージに含む、poetryの定義ファイルを作成
FastAPIのインストール
依存パッケージのダウンロードが始まりインストール
docker-compose run --entrypoint "poetry install --no-root" demo-app
poetry.lock ファイルが作成されている
上2つのコマンドよりpyproject.toml
と poetry.lock
ファイルが作成される
新しいPythonパッケージを追加した場合などは次のコマンドでイメージを再ビルドしてpyproject.toml に含まれている全てのパッケージをインストール
docker-compose build --no-cache
apiディレクトリを作成する
-
__init__.py
この api ディレクトリがpythonモジュールであることを示す 空ファイル です。 -
main.py
FastAPIのコードを記述
from fastapi import FastAPI
# FastAPIのインスタンス
app = FastAPI()
# @ で始まるこの部分を、デコレータ と呼ぶ。
# デコレータは、以下の2つの部分に分かれる。
# - パス "/hello"
# - オペレーション "get" の部分
@app.get("/hello")
async def hello():
return {"message": "hello world!"}
ディレクトリ構造
.
├── api
│ ├── __init__.py
│ └── main.py
├── Dockerfile
├── docker-compose.yaml
├── poetry.lock
└── pyproject.toml
APIの立ち上げ
docker-compose up
とするとSwagger UIというAPIの仕様を示すドキュメントの画面が出てくる
{
"message": "hello world!"
}
が表示される
TODOアプリ
REST API
GET/POST/PUT/DELETE
- TODOリストを表示する
- TODOにタスクを追加する
- TODOのタスクの説明文を変更する
- TODOのタスク自体を削除する
- TODOタスクに「完了」フラグを立てる
- TODOタスクから「完了」フラグを外す
ディレクトリ構成
api
├── __init__.py
├── main.py
├── schemas
│ └── __init__.py
├── routers
│ └── __init__.py
├── models
│ └── __init__.py
└── cruds
└── __init__.py
- GET /tasks
- POST /tasks
- PUT /tasks/{task_id}
- DELETE /tasks/{task_id}
- PUT /tasks/{task_id}/done
- DELETE /tasks/{task_id}/done
ルーター(Routers)
api/routers/task.py
from fastapi import APIRouter
router = APIRouter()
@router.get("/tasks")
async def list_tasks():
pass
@router.post("/tasks")
async def create_task():
pass
@router.put("/tasks/{task_id}")
async def update_task():
pass
@router.delete("/tasks/{task_id}")
async def delete_task():
pass
routers/done.py
from fastapi import APIRouter
router = APIRouter()
@router.put("/tasks/{task_id}/done")
async def mark_task_as_done():
pass
@router.delete("/tasks/{task_id}/done")
async def unmark_task_as_done():
pass
passについて
pass は「何もしない文」を表す。
api/main.py
from fastapi import FastAPI
# ここを追加
from api.routers import task, done
app = FastAPI()
# ここを追加
app.include_router(task.router)
app.include_router(done.router)
スキーマ(Schemas)レスポンス
api/schemas/task.py
from typing import Optional
from pydantic import BaseModel, Field
# id, title, done の3フィールド
# それぞれ int, Optional[str], bool の型ヒント
# Field はフィールドに関する付加情報
class Task(BaseModel):
id: int
title: Optional[str] = Field(None, example="クリーニングを取りに行く")
done: bool = Field(False, description="完了フラグ")
- title は None 、 done は False をデフォルト値に取っている
- example はフィールドの値の例
api/routers/task.py
from typing import List
from fastapi import APIRouter
# api.schemas.task を task_schema と読み替えてimport
import api.schemas.task as task_schema
router = APIRouter()
# スキーマに定義した Task クラスを複数返すので、リストとして定義
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks():
return [task_schema.Task(id=1, title="1つ目のTODOタスク")]
id と title を任意の内容にし、 done はデフォルトで False のため指定していない。
ダミーデータとして、 [task_schema.Task(id=1, title="1つ目のTODOタスク")] を返却
スキーマ(Schemas) - リクエスト
- POST関数では id を指定せず、DBで自動的に id を採番する
- done フィールドは作成時は常に false であるため、 POST /tasks のエンドポイントからは除く
👉 リクエストボディとして title のフィールドだけ受け取る
class TaskCreate(BaseModel):
title: Optional[str] = Field(None, example="クリーニングを取りに行く")
Task と TaskCreate の共通するフィールドは title のみ
👉title のみを持つ両方のベースクラスとして、 TaskBase を定義
共通部分を切り出す
# 共通部分
class TaskBase(BaseModel):
title: Optional[str] = Field(None, example="クリーニングを取りに行く")
# passにする= 何もしない文
class TaskCreate(TaskBase):
pass
class TaskCreateResponse(TaskCreate):
id: int
class Config:
orm_mode = True
# 共通部分titleを除く
class Task(TaskBase):
id: int
done: bool = Field(False, description="完了フラグ")
# orm_mode はDBと接続する際に使用
# レスポンススキーマ TaskCreateResponse が、暗黙的にORMを受け取り、レスポンススキーマに変換することを意味
class Config:
orm_mode = True
ルーター (POST)
以下のことを行なってる
- APIとして正しい型のデータを受け取り、正しい型でレスポンスを返す
- 受け取ったリクエストボディに id を付与して、レスポンスデータを返す
@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
#引数にリクエストボディ
async def create_task(task_body: task_schema.TaskCreate):
# dict に変換し、これらのkey/valueおよび id=1 を持つ task_schema.TaskCreateResponse インスタンスを作成
return task_schema.TaskCreateResponse(id=1, **task_body.dict())
- dict インスタンスに対して先頭に ** をつけることで、 dict を キーワード引数として展開 し、 task_schema.TaskCreateResponse クラスのコンストラクタに対して dict のkey/valueを渡す。
👉task_schema.TaskCreateResponse(id=1, title=task_body.title, done=task_body.done)
と同じ
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks():
return [task_schema.Task(id=1, title="1つ目のTODOタスク")]
@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
async def create_task(task_body: task_schema.TaskCreate):
return task_schema.TaskCreateResponse(id=1, **task_body.dict())
# putはtaskにすでにidを持つため task_id: intを追加
@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(task_id: int, task_body: task_schema.TaskCreate):
return task_schema.TaskCreateResponse(id=task_id, **task_body.dict())
@router.delete("/tasks/{task_id}", response_model=None)
async def delete_task(task_id: int):
return
@router.put("/tasks/{task_id}/done", response_model=None)
async def mark_task_as_done(task_id: int):
return
@router.delete("/tasks/{task_id}/done", response_model=None)
async def unmark_task_as_done(task_id: int):
return
- チェックボックスをON/OFFするだけのためリクエストボディもレスポンスもない
- いいね機能も同じ?
MySQLコンテナの立ち上げ
version: '3'
services:
demo-app:
build: .
volumes:
- .dockervenv:/src/.venv
- .:/src
ports:
- 8000:8000 # ホストマシンのポート8000を、docker内のポート8000に接続する
db:
image: mysql:8.0
platform: linux/x86_64 # M1 Macの場合必要
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: 'yes' # rootアカウントをパスワードなしで作成
MYSQL_DATABASE: 'demo' # 初期データベースとしてdemoを設定
TZ: 'Asia/Tokyo' # タイムゾーンを日本時間に設定
volumes:
- mysql_data:/var/lib/mysql
command: --default-authentication-plugin=mysql_native_password # MySQL8.0ではデフォルトが"caching_sha2_password"で、ドライバが非対応のため変更
ports:
- 33306:3306 # ホストマシンのポート33306を、docker内のポート3306に接続する
volumes:
mysql_data:
docker-compose up
docker-compose exec db mysql demo
インストール
sqlalchemy というORMライブラリ
aiomysql はMySQL向けに非同期IO処理を提供するライブラリ
# "demo-app" コンテナの中で "poetry add sqlalchemy aiomysql" を実行
docker-compose exec demo-app poetry add sqlalchemy aiomysql
実行後変更点を確認
[tool.poetry.dependencies]
python = "^3.9"
fastapi = "^0.65.1"
uvicorn = {extras = ["standard"], version = "^0.13.4"}
SQLAlchemy = "^1.4.20"
aiomysql = "^0.0.21"
DB接続クラス
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
ASYNC_DB_URL = "mysql+aiomysql://root@db:3306/demo?charset=utf8"
# MySQLのdockerコンテナに対して接続するセッションを作成
async_engine = create_async_engine(ASYNC_DB_URL, echo=True)
async_session = sessionmaker(
autocommit=False, autoflush=False, bind=async_engine, class_=AsyncSession
)
Base = declarative_base()
#セッションを取得し、DBへのアクセスを可能にする
async def get_db():
async with async_session() as session:
yield session
DBのモデル(Models)の定義
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from api.db import Base
class Task(Base):
__tablename__ = "tasks"
#第一引数にカラムの型を渡します。第2引数以降にカラムの設定
id = Column(Integer, primary_key=True)
title = Column(String(1024))
#テーブル(モデルクラス)同士の関係性を定義
done = relationship("Done", back_populates="task", cascade="delete")
class Done(Base):
__tablename__ = "dones"
id = Column(Integer, ForeignKey("tasks.id"), primary_key=True)
task = relationship("Task", back_populates="done")
DBマイグレーション
from sqlalchemy import create_engine
from api.models.task import Base
DB_URL = "mysql+pymysql://root@db:3306/demo?charset=utf8"
engine = create_engine(DB_URL, echo=True)
def reset_database():
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
if __name__ == "__main__":
reset_database()
DBマイグレーション
from sqlalchemy import create_engine
from api.models.task import Base
DB_URL = "mysql+pymysql://root@db:3306/demo?charset=utf8"
engine = create_engine(DB_URL, echo=True)
def reset_database():
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
if __name__ == "__main__":
reset_database()
# api モジュールの migrate_db スクリプトを実行する
docker-compose exec demo-app poetry run python -m api.migrate_db
DB確認する
docker-compose exec db mysql demo
mysql> SHOW TABLES;
DB操作(CRUDs)
from sqlalchemy.ext.asyncio import AsyncSession
import api.models.task as task_model
import api.schemas.task as task_schema
# 引数としてスキーマ task_create: task_schema.TaskCreate を受け取る。
async def create_task(
db: AsyncSession, task_create: task_schema.TaskCreate
) -> task_model.Task:
# task_model.Task に変換して、DB に保存する。
task = task_model.Task(**task_create.dict())
db.add(task)
# DBにコミットする
await db.commit()
# DB上のデータを元にTaskインスタンス task を更新する。
await db.refresh(task)
return task
async def は関数が非同期処理を行うことができる、 「コルーチン関数」 (以下、コルーチン)である
await では、非同期処理、ここではDBへの接続(IO処理)が発生するため、「待ち時間が発生するような処理をしますよ」ということを示しています。これによって、pythonはこのコルーチンの処理からいったん離れ、イベントループ内で別のコルーチンの処理を行うことができるようになります。これが非同期・並行処理の肝になります。
Create
ルータの変更
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
import api.cruds.task as task_crud
from api.db import get_db
@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
async def create_task(
# Depends は引数に関数を取り、 DI(Dependency Injection、依存性注入) を行うためのもの= テストを容易にする仕組み
task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
):
return await task_crud.create_task(db, task_body)
Read
ToDoアプリでは Task に対して Done モデルが定義されている
別々に Read で取得するのは面倒のためjoinして、ToDoタスクにDoneフラグが付加された状態のリストを取得するエンドポイントを作成する
from typing import List, Tuple
from sqlalchemy import select
from sqlalchemy.engine import Result
async def get_tasks_with_done(db: AsyncSession) -> List[Tuple[int, str, bool]]:
result: Result = await (
db.execute(
#select() で必要なフィールドを指定
select(
task_model.Task.id,
task_model.Task.title,
# Done.id が存在するときは done=True とし、存在しないときは done=False としたレコードを返却。
task_model.Done.id.isnot(None).label("done"),
#メインのDBモデルに対してjoinしたいモデルを指定
).outerjoin(task_model.Done)
)
)
return result.all()
ルーター
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks(db: AsyncSession = Depends(get_db)):
return await task_crud.get_tasks_with_done(db)
Update
最初にリクエストしているのが存在している Task に対してなのかをチェックし、存在した場合は更新、存在しない場合は404エラーを返却するAPI
from typing import List, Tuple, Optional
async def get_task(db: AsyncSession, task_id: int) -> Optional[task_model.Task]:
result: Result = await db.execute(
select(task_model.Task).filter(task_model.Task.id == task_id)
)
task: Optional[Tuple[task_model.Task]] = result.first()
return task[0] if task is not None else None # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す
async def update_task(
db: AsyncSession, task_create: task_schema.TaskCreate, original: task_model.Task
) -> task_model.Task:
original.title = task_create.title
db.add(original)
await db.commit()
await db.refresh(original)
return original
ルーター
from fastapi import APIRouter, Depends, HTTPException
@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(
task_id: int, task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
):
task = await task_crud.get_task(db, task_id=task_id)
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
return await task_crud.update_task(db, task_body, original=task)
Delete
async def delete_task(db: AsyncSession, original: task_model.Task) -> None:
await db.delete(original)
await db.commit()
ルーター
@router.delete("/tasks/{task_id}", response_model=None)
async def delete_task(task_id: int, db: AsyncSession = Depends(get_db)):
task = await task_crud.get_task(db, task_id=task_id)
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
return await task_crud.delete_task(db, original=task)
Doneリソース
CRUD
from typing import Tuple, Optional
from sqlalchemy import select
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession
import api.models.task as task_model
async def get_done(db: AsyncSession, task_id: int) -> Optional[task_model.Done]:
result: Result = await db.execute(
select(task_model.Done).filter(task_model.Done.id == task_id)
)
done: Optional[Tuple[task_model.Done]] = result.first()
return done[0] if done is not None else None # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す
async def create_done(db: AsyncSession, task_id: int) -> task_model.Done:
done = task_model.Done(id=task_id)
db.add(done)
await db.commit()
await db.refresh(done)
return done
async def delete_done(db: AsyncSession, original: task_model.Done) -> None:
await db.delete(original)
await db.commit()
ルータ
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
import api.schemas.done as done_schema
import api.cruds.done as done_crud
from api.db import get_db
router = APIRouter()
@router.put("/tasks/{task_id}/done", response_model=done_schema.DoneResponse)
async def mark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
done = await done_crud.get_done(db, task_id=task_id)
if done is not None:
raise HTTPException(status_code=400, detail="Done already exists")
return await done_crud.create_done(db, task_id)
@router.delete("/tasks/{task_id}/done", response_model=None)
async def unmark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
done = await done_crud.get_done(db, task_id=task_id)
if done is None:
raise HTTPException(status_code=404, detail="Done not found")
return await done_crud.delete_done(db, original=done)
schema
from pydantic import BaseModel
class DoneResponse(BaseModel):
id: int
class Config:
orm_mode = True
ユニットテスト
テスト関連ライブラリのインストール
- Pytestを使う
- Pytestを非同期用に拡張する、 pytest-asyncio をインストール
- SQLiteのオンメモリモードを使用するためSQLiteの非同期クライアントとして aiosqlite をインストール
- 非同期HTTPクライアントの httpx をインストール
docker-compose exec demo-app poetry add -D pytest-asyncio aiosqlite httpx
-
-D
はpoetryの「開発用モード」を指定するオプション
(project root)
├── Dockerfile
├── docker-compose.yaml
├── poetry.lock
├── pyproject.toml
├── api
│ ├── __init__.py
│ ├── db.py
│ ├── main.py
│ ├── migrate_db.py
│ ├── cruds
│ ├── models
│ ├── routers
│ └── schemas
└── tests
├── __init__.py
└── test_main.py
- Async用のengineとsessionを作成
- テスト用にオンメモリのSQLiteテーブルを初期化(関数ごとにリセット)
- DIを使ってFastAPIのDBの向き先をテスト用DBに変更
- テスト用に非同期HTTPクライアントを返却
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from api.db import get_db, Base
from api.main import app
ASYNC_DB_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture
async def async_client() -> AsyncClient:
# Async用のengineとsessionを作成
async_engine = create_async_engine(ASYNC_DB_URL, echo=True)
async_session = sessionmaker(
autocommit=False, autoflush=False, bind=async_engine, class_=AsyncSession
)
# テスト用にオンメモリのSQLiteテーブルを初期化(関数ごとにリセット)
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
# DIを使ってFastAPIのDBの向き先をテスト用DBに変更
async def get_test_db():
async with async_session() as session:
yield session
### オーバーライドget_db の代わりに get_test_db を使う
app.dependency_overrides[get_db] = get_test_db
# テスト用に非同期HTTPクライアントを返却
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
テストを書く
最初に POST コールによってTODOタスクを作成し、2つ目の GET コールによって作成したTODOタスクを確認
import starlette.status
@pytest.mark.asyncio
async def test_create_and_read(async_client):
response = await async_client.post("/tasks", json={"title": "テストタスク"})
assert response.status_code == starlette.status.HTTP_200_OK
response_obj = response.json()
assert response_obj["title"] == "テストタスク"
response = await async_client.get("/tasks")
assert response.status_code == starlette.status.HTTP_200_OK
response_obj = response.json()
assert len(response_obj) == 1
assert response_obj[0]["title"] == "テストタスク"
assert response_obj[0]["done"] is False
完了フラグを使ったテストを書く
@pytest.mark.asyncio
async def test_done_flag(async_client):
response = await async_client.post("/tasks", json={"title": "テストタスク2"})
assert response.status_code == starlette.status.HTTP_200_OK
response_obj = response.json()
assert response_obj["title"] == "テストタスク2"
# 完了フラグを立てる
response = await async_client.put("/tasks/1/done")
assert response.status_code == starlette.status.HTTP_200_OK
# 既に完了フラグが立っているので400を返却
response = await async_client.put("/tasks/1/done")
assert response.status_code == starlette.status.HTTP_400_BAD_REQUEST
# 完了フラグを外す
response = await async_client.delete("/tasks/1/done")
assert response.status_code == starlette.status.HTTP_200_OK
# 既に完了フラグが外れているので404を返却
response = await async_client.delete("/tasks/1/done")
assert response.status_code == starlette.status.HTTP_404_NOT_FOUND
テストを実行する
docker-compose run --entrypoint "poetry run pytest" demo-app