前章では、DB接続の準備と、TODOアプリのためのDBモデルの準備を行いました。本章では、いよいよDBに接続するRead/Writeの処理と、これをAPIに繋げて動作を確認してみましょう。
C: Create
最初はデータが存在しないので、 POST
/tasks
から書いていくことにしましょう。
CRUDs
ルーターはMVCでいうところのコントローラに該当します。RailsなどのMVCフレームワークに慣れている人だとありがちだと思いますが、コントローラはモデルやビューとの接続を行うので肥大化しがちです(Fat Controller)。これを避けるため、DBに対するCRUD操作を行う処理は api/cruds/task.py
に書いていくこととします。
from sqlalchemy.ext.asyncio import AsyncSession
import api.models.task as task_model
import api.schemas.task as task_schema
async def create_task(
db: AsyncSession, task_create: task_schema.TaskCreate
) -> task_model.Task:
task = task_model.Task(**task_create.dict())
db.add(task)
await db.commit()
await db.refresh(task)
return task
やっていることの大まかな流れを箇条書きで書き下してみます。
- 引数としてスキーマ
task_create: task_schema.TaskCreate
を受け取る。 - これをDBモデルである
task_model.Task
に変換する - DBにコミットする
- DB上のデータを元にTaskインスタンス
task
を更新する(この場合、作成したレコードのid
を取得する) - 作成したDBモデルを返却する
これが大まかな流れです。ここで注目したいのは、関数定義が async def
となっていること、それから db.commit()
と db.refresh(task)
に await
が付いていることです。
async def
は関数が非同期処理を行うことができる、 「コルーチン関数」 (以下、コルーチン)であるということを表します。
await
では、非同期処理、ここではDBへの接続(IO処理)が発生するため、「待ち時間が発生するような処理をしますよ」ということを示しています。これによって、pythonはこのコルーチンの処理からいったん離れ、イベントループ内で別のコルーチンの処理を行うことができるようになります。これが非同期・並行処理の肝になります。
ルーター
上記のCRUDs定義を利用するルーターは、以下のように書き直すことができます。
-from fastapi import APIRouter
+from fastapi import APIRouter, Depends
+from sqlalchemy.ext.asyncio import AsyncSession
+import api.cruds.task as task_crud
+from api.db import get_db
-@router.post("/tasks")
-async def create_task():
- pass
+@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
+async def create_task(
+ task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
+):
+ return await task_crud.create_task(db, task_body)
先ほどの create_task()
と同様、ルーターのパスオペレーション関数もコルーチンとして定義されています。ですので、 create_task()
からの返却値も await
を使って返却します。
ここで、 await
の指定を忘れるとどうなるでしょうか?
async def
で定義されるコルーチンは、同期的処理も行うことができると説明しました。そのためPythonは文法エラーとなりません。しかし、 task_crud.create_task(db, task_body)
の応答を待たずにレスポンスを返しますので、以下のようなエラーとなります。
pydantic.error_wrappers.ValidationError: 1 validation error for TaskCreateResponse
response -> id
field required (type=value_error.missing)
DBモデルとレスポンススキーマの変換
リクエストボディの task_schema.TaskCreate
と、レスポンスモデルの task_schema.TaskCreateResponse
については、 9章のレスポンススキーマ で説明したとおり、リクエストに対して id
だけを付与して返却する必要があります。
class TaskBase(BaseModel):
title: Optional[str] = Field(None, example="クリーニングを取りに行く")
class TaskCreate(TaskBase):
pass
class TaskCreateResponse(TaskCreate):
id: int
class Config:
orm_mode = True
ここで、 orm_mode = True
は、このレスポンススキーマ TaskCreateResponse
が、暗黙的にORMを受け取り、レスポンススキーマに変換することを意味します。
その証拠に、 task_crud.create_task(db, task_body)
はDBモデルの task_model.Task
を返却していますが、APIは正しく TaskCreateResponse
に変換しているのがわかります。これは、内部的に TaskCreateResponse
を task_model.Task
の各フィールドを使って初期化することによって実現しています。
DI
ここで、見慣れない db: AsyncSession = Depends(get_db)
にも注目してみましょう。
Depends
は引数に関数を取り、 DI(Dependency Injection、依存性注入) を行う機構です。
DB接続部分にDIを利用することにより、ビジネスロジックとDBが密結合になることを防ぎます。また、DIによってこの db
インスタンスの中身を外部からoverrideすることが可能になるため、例えばテストのときに get_db
と異なるテスト用の接続先に置換するといったことが、プロダクションコードに触れることなく可能になります。
このテストを容易にする仕組みについては、 13章 ユニットテスト で単体テストを作成する際に改めて説明します。
動作確認
ここで、Swagger UIから POST
/tasks
エンドポイントにアクセスしてみましょう。「Execute」 を押す度に、 id
がインクリメントされて結果が返却されることがわかります。
R: Read
これで、 Task
の作成が可能になったので、次に Task
をリストで受け取る Read
エンドポイントを作成しましょう。
ToDoアプリでは Task
に対して Done
モデルが定義されていますが、これらを別々に Read
で取得するのは面倒です。これらをjoinして、ToDoタスクにDoneフラグが付加された状態のリストを取得できるエンドポイントとしましょう。
CRUDs
joinを行うため、CRUDs定義は少し複雑になります。
from typing import List, Tuple
from sqlalchemy import select
from sqlalchemy.engine import Result
async def get_tasks_with_done(db: AsyncSession) -> List[Tuple[int, str, bool]]:
result: Result = await (
db.execute(
select(
task_model.Task.id,
task_model.Task.title,
task_model.Done.id.isnot(None).label("done"),
).outerjoin(task_model.Done)
)
)
return result.all()
まず、 get_tasks_with_done()
も create_task()
と同様コルーチンなので、 async def
で定義され、 await
を使って Result
を取得します。
実は、この Result
インスタンスはこの時点ではまだすべてのDBリクエストの結果を持ちません。DBレコードを処理する際にforループなどで効率的に結果を取得するためにイテレータとして定義されています。今回はループで処理するような重い処理はありませんので、コルーチンの返却値として result.all()
コールによって、初めてすべてのDBレコードを取得します。
select()
で必要なフィールドを指定し、 .outerjoin()
によってメインのDBモデルに対してjoinしたいモデルを指定しています。
また、 dones
テーブルは tasks
テーブルと同じIDを持ち、ToDoタスクが完了しているときだけレコードが存在していると説明しました。
task_model.Done.id.isnot(None).label("done")
によって、 Done.id
が存在するときは done=True
とし、存在しないときは done=False
としてjoinしたレコードを返却します。
ルーター
上記のCRUDs定義を利用するルーターは、 Create
のものとほぼ同等です。
@router.get("/tasks", response_model=List[task_schema.Task])
async def list_tasks(db: AsyncSession = Depends(get_db)):
return await task_crud.get_tasks_with_done(db)
動作確認
Create
を叩いた回数だけTODOタスクが作成されており、すべてがリストで返却されます。
また、純粋な tasks
テーブルの内容だけではなく、各TODOタスクについて完了フラグ done
が付加されているのがわかります。まだ done
リソースに関するエンドポイントを定義していませんので、現時点ではこれらはすべて false
です。
U: Update
Update
も Create
とほぼ同等ですが、最初にリクエストしているのが存在している Task
に対してなのかをチェックし、存在した場合は更新、存在しない場合は404エラーを返却するAPIにします。
CRUDs
以下の2つの関数を定義します。
from typing import List, Tuple, Optional
async def get_task(db: AsyncSession, task_id: int) -> Optional[task_model.Task]:
result: Result = await db.execute(
select(task_model.Task).filter(task_model.Task.id == task_id)
)
task: Optional[Tuple[task_model.Task]] = result.first()
return task[0] if task is not None else None # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す
async def update_task(
db: AsyncSession, task_create: task_schema.TaskCreate, original: task_model.Task
) -> task_model.Task:
original.title = task_create.title
db.add(original)
await db.commit()
await db.refresh(original)
return original
get_task()
では、 .filter()
を使って SELECT ~ WHERE として対象を絞り込んでいます。
また、Result
は select()
で指定する要素が1つであってもtupleで返却されますので、tupleではなく値として取り出すためにはflatten処理が必要になります。そのため少し複雑な書き方になっていることに注意してください。
update_task()
は create_task()
とほとんど見た目が同じです。 original
としてDBモデルを受け取り、これの中身を更新して返却しているのが唯一の差分です。
ルーター
上記のCRUDs定義を利用するルーターは以下のとおりです。
from fastapi import APIRouter, Depends, HTTPException
@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(
task_id: int, task_body: task_schema.TaskCreate, db: AsyncSession = Depends(get_db)
):
task = await task_crud.get_task(db, task_id=task_id)
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
return await task_crud.update_task(db, task_body, original=task)
ここで、 HTTPException
は任意のHTTPステータスコードを引数に取ることができる Exception
クラスです。今回は 404 Not Found
を指定して raise
します。
動作確認
task_id=1
のタイトルを変更してみましょう。
Read
インターフェイスから、変更後の結果が取得できているのが確認できます。
D: Delete
CRUDs
Delete
のインターフェイスも Update
とほぼ同等です。まず get_task
を実行してから、 delete_task
を実行します。
async def delete_task(db: AsyncSession, original: task_model.Task) -> None:
await db.delete(original)
await db.commit()
ルーター
上記のCRUDs定義を利用するルーターは以下のとおりです。
@router.delete("/tasks/{task_id}", response_model=None)
async def delete_task(task_id: int, db: AsyncSession = Depends(get_db)):
task = await task_crud.get_task(db, task_id=task_id)
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
return await task_crud.delete_task(db, original=task)
動作確認
task_id=2
を削除してみましょう。
もう一度実行すると、既に削除が完了しているので 404
エラーが返ります。
Read
インターフェイスからも削除が完了していることが確認できます。
Doneリソース
Taskリソースと同様、Doneリソースも定義していきましょう。
CRUDsとルーターを同時に見ていきます。
from typing import Tuple, Optional
from sqlalchemy import select
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession
import api.models.task as task_model
async def get_done(db: AsyncSession, task_id: int) -> Optional[task_model.Done]:
result: Result = await db.execute(
select(task_model.Done).filter(task_model.Done.id == task_id)
)
done: Optional[Tuple[task_model.Done]] = result.first()
return done[0] if done is not None else None # 要素が一つであってもtupleで返却されるので1つ目の要素を取り出す
async def create_done(db: AsyncSession, task_id: int) -> task_model.Done:
done = task_model.Done(id=task_id)
db.add(done)
await db.commit()
await db.refresh(done)
return done
async def delete_done(db: AsyncSession, original: task_model.Done) -> None:
await db.delete(original)
await db.commit()
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
import api.schemas.done as done_schema
import api.cruds.done as done_crud
from api.db import get_db
router = APIRouter()
@router.put("/tasks/{task_id}/done", response_model=done_schema.DoneResponse)
async def mark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
done = await done_crud.get_done(db, task_id=task_id)
if done is not None:
raise HTTPException(status_code=400, detail="Done already exists")
return await done_crud.create_done(db, task_id)
@router.delete("/tasks/{task_id}/done", response_model=None)
async def unmark_task_as_done(task_id: int, db: AsyncSession = Depends(get_db)):
done = await done_crud.get_done(db, task_id=task_id)
if done is None:
raise HTTPException(status_code=404, detail="Done not found")
return await done_crud.delete_done(db, original=done)
レスポンススキーマが必要なので、 api/schemas/done.py
も同時に作成します。
from pydantic import BaseModel
class DoneResponse(BaseModel):
id: int
class Config:
orm_mode = True
条件に応じて、以下のような挙動になることに注意してください。
- 完了フラグが立っていないとき
- PUT: 完了フラグが立つ
- DELETE: フラグがないので
404
エラーを返す
- 完了フラグが立っているとき
- PUT: 既にフラグが経っているので
400
エラーを返す - DELETE: 完了フラグを消す
- PUT: 既にフラグが経っているので
動作確認
Doneリソースの Create
を行うことで、Taskリソースの Read
インターフェイスで、 done
フラグが操作できることが確認できます。
最終的なディレクトリ構成
おめでとうございます!これでToDoアプリが動くのに必要なファイルはすべて定義できました 🎉
最終的には以下のようなファイル構成になっているはずです。
api
├── __init__.py
├── db.py
├── main.py
├── migrate_db.py
├── cruds
│ ├── __init__.py
│ ├── done.py
│ └── task.py
├── models
│ ├── __init__.py
│ └── task.py
├── routers
│ ├── __init__.py
│ ├── done.py
│ └── task.py
└── schemas
├── __init__.py
├── done.py
└── task.py
ここまでで、すべての挙動がSwagger UIから確認できます。
Swagger UIで確認せずとも変更時のバグ発見を早期にできるように、次章では、ユニットテストを書いていきます。