Chapter 12

DB操作(CRUDs)

smithonisan
smithonisan
2021.08.08に更新

前章では、DB接続の準備と、TODOアプリのためのDBモデルの準備を行いました。本章では、いよいよDBに接続するRead/Writeの処理と、これをAPIに繋げて動作を確認してみましょう。

C: Create

最初はデータが存在しないので、 POST /tasks から書いていくことにしましょう。

CRUDs

ルーターはMVCでいうところのコントローラに該当します。RailsなどのMVCフレームワークに慣れている人だとありがちだと思いますが、コントローラはモデルやビューとの接続を行うので肥大化しがちです(Fat Controller)。これを避けるため、DBに対するCRUD操作を行う処理は api/cruds/task.py に書いていくこととします。

api/cruds/task.py
from sqlalchemy.ext.asyncio import AsyncSession

import api.models.task as task_model
import api.schemas.task as task_schema


async def create_task(
    db: AsyncSession, task_create: task_schema.TaskCreate
) -> task_model.Task:
    task = task_model.Task(**task_create.dict())
    db.add(task)
    await db.commit()
    await db.refresh(task)
    return task

やっていることの大まかな流れを箇条書きで書き下してみます。

  1. 引数としてスキーマ task_create: task_schema.TaskCreate を受け取る。
  2. これをDBモデルである task_model.Task に変換する
  3. DBにコミットする
  4. DB上のデータを元にTaskインスタンス task を更新する(この場合、作成したレコードの id を取得する)
  5. 作成したDBモデルを返却する

これが大まかな流れです。ここで注目したいのは、関数定義が async def となっていること、それから db.commit()db.refresh(task)await が付いていることです。

async def は関数が非同期処理を行うことができる、 「コルーチン」 であるということを表します。
await では、非同期処理、ここではDBへの接続(IO処理)が発生するため、「待ち時間が発生するような処理をしますよ」ということを示しています。これによって、pythonはこのコルーチンの処理からいったん離れ、イベントループ内で別のコルーチンの処理を行うことができるようになります。これが非同期・並行処理の肝になります。

コルーチンとは
 
コルーチンはサブルーチン(関数)の一般形です。 def に対して async def なのでむしろ特殊形なのでは?と思うかもしれません。通常の関数は必ず同期処理しかできないのに対し、コルーチンでは同期処理も非同期処理も行うことができるため、一般形なのです。

ルーター

上記のCRUDs定義を利用するルーターは、以下のように書き直すことができます。

api/routers/task.py
+from fastapi import APIRouter
-from fastapi import APIRouter, Depends
 
+import api.cruds.task as task_crud
+from api.db import get_db
api/routers/task.py
-@router.post("/tasks")
-async def create_task():
-    pass
+@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
+async def create_task(
+    task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
+):
+    return await task_crud.create_task(db, task_body)

先ほどの create_task() と同様、ルーターのパスオペレーション関数もコルーチンとして定義されています。ですので、 create_task() からの返却値も await を使って返却します。

ここで、 await の指定を忘れるとどうなるでしょうか?

async def で定義されるコルーチンは、同期的処理も行うことができると説明しました。そのためPythonは文法エラーとなりません。しかし、 task_crud.create_task(db, task_body) の応答を待たずにレスポンスを返しますので、以下のようなエラーとなります。

pydantic.error_wrappers.ValidationError: 1 validation error for TaskCreateResponse
response -> id
  field required (type=value_error.missing)

DBモデルとレスポンススキーマの変換

リクエストボディの task_schema.TaskCreate と、レスポンスモデルの task_schema.TaskCreateResponse については、 9章のレスポンススキーマ で説明したとおり、リクエストに対して id だけを付与して返却する必要があります。

api/schemas/task.py
class TaskBase(BaseModel):
    title: Optional[str] = Field(None, example="クリーニングを取りに行く")


class TaskCreate(TaskBase):
    pass


class TaskCreateResponse(TaskCreate):
    id: int

    class Config:
        orm_mode = True

ここで、 orm_mode = True は、このレスポンススキーマ TaskCreateResponse が、暗黙的にORMを受け取り、レスポンススキーマに変換することを意味します。

その証拠に、 task_crud.create_task(db, task_body) はDBモデルの task_model.Task を返却していますが、APIは正しく TaskCreateResponse に変換しているのがわかります。これは、内部的に TaskCreateResponsetask_model.Task の各フィールドを使って初期化することによって実現しています。

DI

ここで、見慣れない db: AsyncSession = Depends(get_db) にも注目してみましょう。

Depends は引数に関数またはコルーチンを取り、 DI(Dependency Injection、依存性注入) を行う機構です。

DB接続部分にDIを利用することにより、ビジネスロジックとDBが密結合になることを防ぎます。また、DIによってこの db インスタンスの中身を外部からoverrideすることが可能になるため、例えばテストのときに get_db と異なるテスト用の接続先に置換するといったことが、プロダクションコードに触れることなく可能になります。

このテストを容易にする仕組みについては、 13章 ユニットテスト で単体テストを作成する際に改めて説明します。

動作確認

ここで、Swagger UIから POST /tasks エンドポイントにアクセスしてみましょう。「Execute」 を押す度に、 id がインクリメントされて結果が返却されることがわかります。

R: Read

これで、 Task の作成が可能になったので、次に Task をリストで受け取る Read エンドポイントを作成しましょう。
ToDoアプリでは Task に対して Done モデルが定義されていますが、これらを別々に Read で取得するのは面倒です。これらをjoinして、ToDoタスクにDoneフラグが付加された状態のリストを取得できるエンドポイントとしましょう。

CRUDs

joinを行うため、CRUDs定義は少し複雑になります。

api/cruds/task.py
from typing import List, Tuple

from sqlalchemy import select
from sqlalchemy.engine import Result


async def get_tasks_with_done(db: AsyncSession) -> List[Tuple[int, str, bool]]:
    result: Result = await (
        db.execute(
            select(
                task_model.Task.id,
                task_model.Task.title,
                task_model.Done.id.isnot(None).label("done"),
            ).outerjoin(task_model.Done)
        )
    )
    return result.all()

まず、 get_tasks_with_done()create_task() と同様コルーチンなので、 async def で定義され、 await を使って Result を取得します。

実は、この Result インスタンスはこの時点ではまだすべてのDBリクエストの結果を持ちません。DBレコードを処理する際にforループなどで効率的に結果を取得するためにイテレータとして定義されています。今回はループで処理するような重い処理はありませんので、コルーチンの返却値として result.all() コールによって、初めてすべてのDBレコードを取得します。

select() で必要なフィールドを指定し、 .outerjoin() によってメインのDBモデルに対してjoinしたいモデルを指定しています。

また、 dones テーブルは tasks テーブルと同じIDを持ち、ToDoタスクが完了しているときだけレコードが存在していると説明しました。
task_model.Done.id.isnot(None).label("done") によって、 Done.id が存在するときは done=True とし、存在しないときは done=False としてjoinしたレコードを返却します。

ルーター

上記のCRUDs定義を利用するルーターは、 Create のものとほぼ同等です。

api/routers/task.py
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks(db: AsyncSession = Depends(get_db)):
    return await task_crud.get_tasks_with_done(db)

動作確認

Create を叩いた回数だけTODOタスクが作成されており、すべてがリストで返却されます。

また、純粋な tasks テーブルの内容だけではなく、各TODOタスクについて完了フラグ done が付加されているのがわかります。まだ done リソースに関するエンドポイントを定義していませんので、現時点ではこれらはすべて false です。

U: Update

UpdateCreate とほぼ同等ですが、最初にリクエストしているのが存在している Task に対してなのかをチェックし、存在した場合は更新、存在しない場合は404エラーを返却するAPIにします。

CRUDs

以下の2つのコルーチンを定義します。

api/cruds/task.py
from typing import List, Tuple, Optional


async def get_task(db: AsyncSession, task_id: int) -> Optional[task_model.Task]:
    result: Result = await db.execute(
        select(task_model.Task).filter(task_model.Task.id == task_id)
    )
    task: Optional[Tuple[task_model.Task]] = result.first()
    return task[0] if task is not None else None  # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す


async def update_task(
    db: AsyncSession, task_create: task_schema.TaskCreate, original: task_model.Task
) -> task_model.Task:
    original.title = task_create.title
    db.add(original)
    await db.commit()
    await db.refresh(original)
    return original

get_task() では、 .filter() を使って SELECT ~ WHERE として対象を絞り込んでいます。
また、Resultselect() で指定する要素が1つであってもtupleで返却されますので、tupleではなく値として取り出すためにはflatten処理が必要になります。そのため少し複雑な書き方になっていることに注意してください。

update_task()create_task() とほとんど見た目が同じです。 original としてDBモデルを受け取り、これの中身を更新して返却しているのが唯一の差分です。

ルーター

上記のCRUDs定義を利用するルーターは以下のとおりです。

api/routers/task.py
from fastapi import APIRouter, Depends, HTTPException


@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(
    task_id: int, task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
):
    task = await task_crud.get_task(db, task_id=task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    return await task_crud.update_task(db, task_body, original=task)

ここで、 HTTPException は任意のHTTPステータスコードを引数に取ることができる Exception クラスです。今回は 404 Not Found を指定して raise します。

動作確認

task_id=1 のタイトルを変更してみましょう。

Read インターフェイスから、変更後の結果が取得できているのが確認できます。

D: Delete

CRUDs

Delete のインターフェイスも Update とほぼ同等です。まず get_task を実行してから、 delete_task を実行します。

api/cruds/task.py
async def delete_task(db: AsyncSession, original: task_model.Task) -> None:
    await db.delete(original)
    await db.commit()

ルーター

上記のCRUDs定義を利用するルーターは以下のとおりです。

api/routers/task.py
@router.delete("/tasks/{task_id}", response_model=None)
async def delete_task(task_id: int, db: AsyncSession = Depends(get_db)):
    task = await task_crud.get_task(db, task_id=task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    return await task_crud.delete_task(db, original=task)

動作確認

task_id=2 を削除してみましょう。

もう一度実行すると、既に削除が完了しているので 404 エラーが返ります。

Read インターフェイスからも削除が完了していることが確認できます。

Doneリソース

Taskリソースと同様、Doneリソースも定義していきましょう。

CRUDsとルーターを同時に見ていきます。

api/cruds/done.py
from typing import Tuple, Optional

from sqlalchemy import select
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession

import api.models.task as task_model


async def get_done(db: AsyncSession, task_id: int) -> Optional[task_model.Done]:
    result: Result = await db.execute(
        select(task_model.Done).filter(task_model.Done.id == task_id)
    )
    done: Optional[Tuple[task_model.Done]] = result.first()
    return done[0] if done is not None else None  # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す


async def create_done(db: AsyncSession, task_id: int) -> task_model.Done:
    done = task_model.Done(id=task_id)
    db.add(done)
    await db.commit()
    await db.refresh(done)
    return done


async def delete_done(db: AsyncSession, original: task_model.Done) -> None:
    await db.delete(original)
    await db.commit()
api/routers/done.py
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession

import api.schemas.done as done_schema
import api.cruds.done as done_crud
from api.db import get_db

router = APIRouter()


@router.put("/tasks/{task_id}/done", response_model=done_schema.DoneResponse)
async def mark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
    done = await done_crud.get_done(db, task_id=task_id)
    if done is not None:
        raise HTTPException(status_code=400, detail="Done already exists")

    return await done_crud.create_done(db, task_id)


@router.delete("/tasks/{task_id}/done", response_model=None)
async def unmark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
    done = await done_crud.get_done(db, task_id=task_id)
    if done is None:
        raise HTTPException(status_code=404, detail="Done not found")

    return await done_crud.delete_done(db, original=done)

レスポンススキーマが必要なので、 api/schemas/done.py も同時に作成します。

api/schemas/done.py
from pydantic import BaseModel


class DoneResponse(BaseModel):
    id: int

    class Config:
        orm_mode = True

条件に応じて、以下のような挙動になることに注意してください。

  • 完了フラグが立っていないとき
    • PUT: 完了フラグが立つ
    • DELETE: フラグがないので 404 エラーを返す
  • 完了フラグが立っているとき
    • PUT: 既にフラグが経っているので 400 エラーを返す
    • DELETE: 完了フラグを消す

動作確認

Doneリソースの Create を行うことで、Taskリソースの Read インターフェイスで、 done フラグが操作できることが確認できます。

最終的なディレクトリ構成

おめでとうございます!これでToDoアプリが動くのに必要なファイルはすべて定義できました 🎉
最終的には以下のようなファイル構成になっているはずです。

api
├── __init__.py
├── db.py
├── main.py
├── migrate_db.py
├── cruds
│   ├── __init__.py
│   ├── done.py
│   └── task.py
├── models
│   ├── __init__.py
│   └── task.py
├── routers
│   ├── __init__.py
│   ├── done.py
│   └── task.py
└── schemas
    ├── __init__.py
    ├── done.py
    └── task.py

ここまでで、すべての挙動がSwagger UIから確認できます。

Swagger UIで確認せずとも変更時のバグ発見を早期にできるように、次章では、ユニットテストを書いていきます。