fastapiでaiomysqlを利用した非同期のapiを作ってみた
fastapiで非同期処理のapiについて学習したのでまとめていきます
例としてタスク管理アプリ(todoアプリ)のapiを作って進めていこうと思います
以下の本で学習した内容を参考に作っています
ディレクトリ・ファイル構造
ディレクトリ、ファイルは以下の構造とします
│ .env
│ .gitignore
│ docker-compose.yml
│ Dockerfile
│ README.md
│ requirement.txt
│
└─api
│ db.py
│ init_database.py
│ main.py
│
├─cruds
│ │ task.py
│ │ __init__.py
|
├─models
│ │ task.py
│ │ __init__.py
│
├─routers
│ │ task.py
│ │ __init__.py
│
├─schemas
│ │ task.py
│ │ __init__.py
- ディレクトリの役割について簡単な説明
schemas
はapiの定義(どのようなデータを扱うか)を表します。また、BaseMdel
を継承することで定義したデータのバリデーションチェックを行います
models
はテーブルの設計を行います
routers
はapiのエンドポイントを示したファイルです。APIRouter()
を用いることでアプリケーションのルーティングを複数のファイルに分割して管理できます
cruds
はdbにアクセスし操作します
環境構築
まずはdockerfile
FROM python:3.12
WORKDIR /task
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
COPY ./requirement.txt /task/requirement.txt
#キャッシュを無効
RUN pip install --no-cache-dir -r requirement.txt
COPY ./api /task
CMD ["uvicorn","main:app","--host","0.0.0.0","--port","8000","--reload"]
必要パッケージのファイル(requirement.txt)は以下とします
fastapi
sqlalchemy
pydantic
uvicorn
aiomysql
httpx
cryptography
次にdocker-compose.ymlの設定。
version: '3.9'
services:
back:
build:
context: .
dockerfile: ./Dockerfile
container_name: fastapi_task
ports:
- "8000:8000"
volumes:
- ./api:/task
depends_on:
- db
env_file:
- .env
db:
image: mysql:8.0
container_name: fastapi_db
environment:
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
MYSQL_DATABASE: ${MYSQL_DATABASE}
MYSQL_USER: ${MYSQL_USER}
MYSQL_PASSWORD: ${MYSQL_PASSWORD}
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
volumes:
mysql_data:
環境変数については以下のようにしてください
MYSQL_ROOT_PASSWORD = password
MYSQL_DATABASE = db
MYSQL_USER = user
MYSQL_PASSWORD = password
DATABASE_URL=mysql+aiomysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE}
DATABASE_URL
はdbファイルを作成するときに使用します書き方はこんな感じ
データベースの種類+オプション://ユーザー名:パスワード@サービス名:ポート番号/データベース名
モデル・スキーマの作成
apiについて考えます。今回は簡単に5つのエンドポイントに分けて作っていこうと思います
url | method | 操作 | リクエストボディ | レスポンスボディ |
---|---|---|---|---|
tasks | post | 新しくタスクを追加する | タスクのタイトルとコンテンツ | 処理のメッセージ |
tasks | get | 追加したタスクの一覧表示 | なし | タスクのid、タイトル、コンテンツをリスト形式 |
tasks/{task_id} | get | 特定のタスクを表示 | 特定のタスクのid | タスクのid、タイトル、コンテンツ |
tasks/{task_id} | put | 特定のタスクの書き換え | 特定のタスクのid | 処理のメッセージ |
tasks/{task_id} | delete | 特定のタスクを削除 | 特定のタスクのid | 処理のメッセージ |
上記の受け取るデータとレスポンスするデータ想定してスキーマを作っていきます
中身は以下の通り
from pydantic import BaseModel,Field
#タスクを追加するときのスキーマ
class CreateAndUpdateTaskSchema(BaseModel):
#タスクのタイトル
title:str = Field(...,
description="タスクのタイトル.必須項目、一文字以上最大二十文字まで",
example="筋トレ",
min_length=1,
max_length=20)
#タスクの内容
content:str = Field(default='',
description='タスクの詳細。任意の項目',
example="腹筋トレーニング")
#タスクの情報を渡すときのスキーマ
class TaskSchema(CreateAndUpdateTaskSchema):
#タスクのid
id:int = Field(...,
description="タスクのid。dbから自動でゲットする",
example="1")
#タスクの追加や更新、削除ができた際にレスポンスするスキーマ
class ResponseSchema(BaseModel):
message:str = Field(...,
description="処理結果のmessage",
example="タスクの追加に成功しました")
BaseModel
を継承することでバリデーションチェックが行えるようになります。また、クラスの書き方も変数名:型
のように型ヒントだけでデータ定義することができます。
Field()
はデフォ値や文字数などデータの詳細を定義する関数です。詳しくは以下の公式ドキュメント
- モデルファイル。
タスクの固有のid、タイトル、コンテンツ、作成日、更新日の5つのカラムを作ります
中身は以下の通り
from sqlalchemy import Column,String,Integer,DateTime
from db import Base
from datetime import datetime
#タスクモデル
class Task(Base):
#テーブル名
__tablename__ = "tasks"
#id
id = Column("id",Integer,primary_key=True,autoincrement=True)
#タイトル
title = Column("title",String(20),nullable=False)
#内容
content = Column("content",String(255),nullable=True)
#作成日
create_at = Column("create_at",DateTime,default=datetime.now())
#更新日
update_at = Column("update_at",DateTime)
二行目のBase
を継承することでpythonのクラスをテーブルとして認識することができます
Column("カラム名")はなくてもokです(なかった場合pythonの変数名が使われる)
DBファイルの作成
次にDBを作成します
dbファイルを作るに至って知識が全くなかったため自分なりに解釈してみました
エンジン・・・dbとの接続。
セッション・・・実際にdbに操作を加えるときにdbとの間で確立される接続のこと。ユーザがクエリを実行する際に維持され、commit()やclose()によって終了される
要約・・・dbへの何らかの処理(追加や削除など) => セッションを確立 => エンジン => DB
中身は以下の通り
import os
from sqlalchemy.ext.asyncio import create_async_engine,AsyncSession
from sqlalchemy.orm import sessionmaker,declarative_base
#ベースを定義
Base = declarative_base()
#データベースの接続先
DATABASE_URL = os.environ.get("DATABASE_URL")
#非同期エンジンの作成
engine = create_async_engine(DATABASE_URL,echo=True)
#非同期セッションの作成
create_session = sessionmaker(
engine,
expire_on_commit=False,
class_=AsyncSession
)
#セッションを呼び出し元に渡す関数
async def get_session():
async with create_session() as session:
yield session
create_async_session()
で非同期のエンジンを作成することができ、echo=True
をつけることでSQL文をコンソールで確認することができるのでデバックが容易になります
sessionmaker()
でセッションを作成し、class_=AsyncSession
をつけることで非同期のセッションを使用することができます。
expire_on_commit
はdbのデータを更新した際にプログラムの情報も自動で更新し、常に最新の情報を扱えるようになります。Trueにするとその分パフォーマンスが落ちてしますため、頻繁にデータの更新がないならFalseにしといたほうがパフォーマンスは上がります。
get_session
はセッションを開き、呼び出し元(dbにアクセスする関数)に対してセッションを返す関数です。async with
により処理後は自動的にセッションが閉じられます
- テーブルの初期化
次にtaskテーブルの初期化をします。中身は以下の通り
import os
from sqlalchemy.ext.asyncio import create_async_engine
from models.task import Base
import asyncio
#データベースの接続先
DATABASE_URL = os.environ.get("DATABASE_URL")
#非同期エンジンの作成
engine = create_async_engine(DATABASE_URL,echo=True)
#データベースを初期化する関数
async def init_db():
async with engine.begin() as conn:
#既存のテーブルを削除
await conn.run_sync(Base.metadata.drop_all)
#テーブルの作成
await conn.run_sync(Base.metadata.create_all)
await engine.dispose()
if __name__ == "__main__":
asyncio.run(init_db())
Base.metadata.drop_all
とBase.metadata.create_all
で既存のテーブルを削除し、
新しくテーブルを作っています。
上のメソッドは同期処理のため非同期エンジンを使っている場合はrun_sync()
メソッドを使用することで非同期的に同期処理をしています。
dockerをビルドした後docker-compose exec back bash
で中に入り、
python init_database.py
を実行しときましょう
CRUD操作とルーティング
次にcrudとそのルーティングについてまとめます
crudの中身は以下の通り
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from schemas.task import CreateAndUpdateTaskSchema,TaskSchema
from models.task import Task
from datetime import datetime
#タスク追加
async def create_task(db:AsyncSession,task:CreateAndUpdateTaskSchema) -> Task:
#入力されたタスクを辞書形式で受け取る
new_task = Task(**task.model_dump())
#dbに追加してコミット
db.add(new_task)
await db.commit()
await db.refresh(new_task)
return new_task
#タスク一覧
async def get_tasks(db:AsyncSession) -> list[Task]:
result = await db.execute(select(Task))
tasks = result.scalars().all()
return tasks
#タスク詳細
async def get_task_detail(db:AsyncSession,task_id:int) -> Task | None:
result = await db.execute(select(Task).where(Task.id == task_id))
task = result.scalars().first()
return task
#タスク更新
async def update_task(db:AsyncSession,
task_id:int,task:CreateAndUpdateTaskSchema) -> Task | None:
#タスク詳細の関数を呼び出して取得(修正前)
fixed_task = await get_task_detail(db=db,task_id=task_id)
if fixed_task:
#タイトル、内容、更新時間を変更
fixed_task.title = task.title
fixed_task.content = task.content
fixed_task.update_at = datetime.now()
await db.commit()
await db.refresh(fixed_task)
return fixed_task
#タスク削除
async def delete_task(db:AsyncSession,task_id:int) -> Task | None:
task = await get_task_detail(db=db,task_id=task_id)
if task:
await db.delete(task)
await db.commit()
return task
-
タスク追加について
model_dump
はpydanticで定義したスキーマのデータを辞書形式にするメソッド
**
でアンパックするのでTask(title='筋トレ',content='腹筋10回')
のように変換してます
add()
で追加しcommit()
で保存した後refresh()
でDBから最新情報を取得しnew_taskに反映しています(idなどはcommitした際に割り振られるのでそのため) -
その他メソッドについて
execute
はクエリを実行します。
scalars
はクエリの実行結果をモデルのインスタンスをリストとして取得することができます
first
は最初の一つ、all
はリスト全体を受け取ります
- ルーティング
中身は以下の通り
from fastapi import APIRouter,HTTPException,Depends
from sqlalchemy.ext.asyncio import AsyncSession
from schemas.task import CreateAndUpdateTaskSchema,TaskSchema,ResponseSchema
import cruds.task as crud_task
from db import get_session
#ルータの作成
router = APIRouter(tags=["Tasks"],prefix="/tasks")
#タスク追加のエンドポイント
@router.post('/',response_model=ResponseSchema)
async def create_task(task:CreateAndUpdateTaskSchema,db:AsyncSession = Depends(get_session)):
try:
await crud_task.create_task(db=db,task=task)
return ResponseSchema(message="タスクの追加に成功しました")
except Exception as e:
raise HTTPException(status_code=400,detail="タスクの追加に失敗しました")
#タスク一覧のエンドポイント
@router.get('/',response_model=list[TaskSchema])
async def get_tasks(db:AsyncSession = Depends(get_session)):
tasks = await crud_task.get_tasks(db=db)
return tasks
#タスク詳細のエンドポイント
@router.get('/{task_id}',response_model=TaskSchema)
async def get_task_detail(task_id:int,db:AsyncSession = Depends(get_session)):
task = await crud_task.get_task_detail(db=db,task_id=task_id)
#タスクがなかった場合
if not task:
raise HTTPException(status_code=404,detail="task not found")
return task
#タスク更新のエンドポイント
@router.put('/{task_id}',response_model=ResponseSchema)
async def update_task(task_id:int,task:CreateAndUpdateTaskSchema,
db:AsyncSession = Depends(get_session),):
fixed_task = await crud_task.update_task(db=db,task=task,task_id=task_id)
#タスクがなかった時
if not fixed_task:
raise HTTPException(status_code=404,detail="task not found")
return ResponseSchema(message="タスクの更新に成功しました")
#タスク削除
@router.delete('/{task_id}',response_model=ResponseSchema)
async def delete_task(task_id:int,db:AsyncSession = Depends(get_session)):
task = await crud_task.delete_task(db=db,task_id=task_id)
#タスクが見つからなかった時
if not task:
raise HTTPException(status_code=404,detail='task not found')
return ResponseSchema(message='タスクの削除に成功しました')
APIRouterのtags
はswaggerUI(http://localhost:8000/docs
)上で確認できるグループ名です
prefix="/tasks"
をつけることですべてのurlが/tasksから始まります
response_model
はレスポンスするデータをスキーマの形に戻し、jsonでレスポンスします
Depends
は依存関係の注入と呼ばれるものでget_session()
を呼び出してdbに渡してくれます
これをcrudの引数のdbに渡すのでデータベースの操作ができるようになります
メインファイル
最後にmainファイル。中身は以下の通り
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import ValidationError
from routers.task import router as task_router
#fastapiインスタンスの作成
app=FastAPI()
#CORS設定
app.add_middleware(
CORSMiddleware,
#許可するオリジンの設定
allow_origins=["http://127.0.0.1:5500"],
#認証情報を含むリクエストの許可
allow_credentials=True,
#許可するメソッドの設定
allow_methods=["*"],
#許可するヘッダー
allow_headers=["*"]
)
#ルータ
app.include_router(task_router)
#バリデーションエラー
@app.exception_handler(ValidationError)
async def validation_exception_handler(exc:ValidationError):
#validationエラーが発生した時のjsonレスポンス
return JSONResponse(
status_code=422,
content={
#pydanticが提供するエラーコード
"detail":exc.errors(),
#バリデーションエラーが発生した時の入力エラー
'body':exc.model,\
}
)
ここではフロントエンドがapiにアクセスできるようし、バリデーションエラーが発生した時のjsonレスポンスを決めています
include_router()
によりroutersのルーターを組み込んでいます
データの流れのまとめ
- エンドポイントにアクセスし入力されたデータが
BaseModel
で定義したスキーマと正しいかチェック
- getメソッドの場合
- crudでデータベースにアクセスしクエリを実行した後、
scalars()
でクエリの結果をモデル形式で取得する - 2で取得したモデル形式のデータを
response_model
によりスキーマの形式に戻し(create_atなどの不要な要素は排除)その後json形式に戻してレスポンス
以下レスポンス
{
"title": "テスト1",
"content": "登録テスト",
"id": 2
},
{
"title": "テスト2",
"content": "登録テスト",
"id": 3
},
{
"title": "テスト3",
"content": "登録テスト",
"id": 4
}
- postメソッドの場合
- 受け取ったスキーマ形式のデータを
Task(**task.model_dump())
でモデル形式に戻してdbに追加 -
response_model
によりResponseSchemaをjson形式に戻してレスポンス
以下レスポンス
{
"message": "タスクの追加に成功しました"
}
これにて終了です 動作確認は http://localhost:8000/docs でチェック
説明が冗長であったり、間違って解釈していることもあるかもしれませんが読んでいただきありがとうございました!
gitにサンプルコード上げているので確認したい方はどうぞ(.envは除く)
Discussion