Zenn
🌟

fastapiでaiomysqlを利用した非同期のapiを作ってみた

2025/02/28に公開
1

fastapiで非同期処理のapiについて学習したのでまとめていきます
例としてタスク管理アプリ(todoアプリ)のapiを作って進めていこうと思います
以下の本で学習した内容を参考に作っています
https://gihyo.jp/book/2024/978-4-297-14447-0

ディレクトリ・ファイル構造

ディレクトリ、ファイルは以下の構造とします

│  .env
│  .gitignore
│  docker-compose.yml
│  Dockerfile
│  README.md
│  requirement.txt
│
└─api
    │  db.py
    │  init_database.py
    │  main.py
    │
    ├─cruds
    │  │  task.py
    │  │  __init__.py
    |
    ├─models
    │  │  task.py
    │  │  __init__.py
    │
    ├─routers
    │  │  task.py
    │  │  __init__.py
    │
    ├─schemas
    │  │  task.py
    │  │  __init__.py

  • ディレクトリの役割について簡単な説明

schemasはapiの定義(どのようなデータを扱うか)を表します。また、BaseMdelを継承することで定義したデータのバリデーションチェックを行います
modelsはテーブルの設計を行います
routersはapiのエンドポイントを示したファイルです。APIRouter()を用いることでアプリケーションのルーティングを複数のファイルに分割して管理できます
crudsはdbにアクセスし操作します

環境構築

まずはdockerfile

Dockerfile
FROM python:3.12

WORKDIR /task

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

COPY ./requirement.txt /task/requirement.txt
#キャッシュを無効
RUN pip install --no-cache-dir -r requirement.txt

COPY ./api /task

CMD ["uvicorn","main:app","--host","0.0.0.0","--port","8000","--reload"]

必要パッケージのファイル(requirement.txt)は以下とします

fastapi
sqlalchemy
pydantic
uvicorn
aiomysql
httpx
cryptography

次にdocker-compose.ymlの設定。

docker-compose.yml
version: '3.9'
services:
  back:
    build:
      context: .
      dockerfile: ./Dockerfile
    container_name: fastapi_task
    ports:
      - "8000:8000"
    volumes:
      - ./api:/task
    depends_on:
      - db
    env_file:
      - .env
  db:
    image: mysql:8.0
    container_name: fastapi_db
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MYSQL_DATABASE: ${MYSQL_DATABASE}
      MYSQL_USER: ${MYSQL_USER}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD}
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql

volumes:
  mysql_data:

環境変数については以下のようにしてください

env
MYSQL_ROOT_PASSWORD = password
MYSQL_DATABASE = db
MYSQL_USER = user
MYSQL_PASSWORD = password
DATABASE_URL=mysql+aiomysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE}

DATABASE_URLはdbファイルを作成するときに使用します書き方はこんな感じ
データベースの種類+オプション://ユーザー名:パスワード@サービス名:ポート番号/データベース名

モデル・スキーマの作成

apiについて考えます。今回は簡単に5つのエンドポイントに分けて作っていこうと思います

url method 操作 リクエストボディ レスポンスボディ
tasks post 新しくタスクを追加する タスクのタイトルとコンテンツ 処理のメッセージ
tasks get 追加したタスクの一覧表示 なし タスクのid、タイトル、コンテンツをリスト形式
tasks/{task_id} get 特定のタスクを表示 特定のタスクのid タスクのid、タイトル、コンテンツ
tasks/{task_id} put 特定のタスクの書き換え 特定のタスクのid 処理のメッセージ
tasks/{task_id} delete 特定のタスクを削除 特定のタスクのid 処理のメッセージ

上記の受け取るデータとレスポンスするデータ想定してスキーマを作っていきます
中身は以下の通り

schemas/task.py
from pydantic import BaseModel,Field

#タスクを追加するときのスキーマ
class CreateAndUpdateTaskSchema(BaseModel):
    #タスクのタイトル
    title:str = Field(...,
                    description="タスクのタイトル.必須項目、一文字以上最大二十文字まで",
                    example="筋トレ",
                    min_length=1,
                    max_length=20)
    #タスクの内容
    content:str = Field(default='',
                        description='タスクの詳細。任意の項目',
                        example="腹筋トレーニング")

#タスクの情報を渡すときのスキーマ
class TaskSchema(CreateAndUpdateTaskSchema):
    #タスクのid
    id:int = Field(...,
                description="タスクのid。dbから自動でゲットする",
                example="1")

#タスクの追加や更新、削除ができた際にレスポンスするスキーマ
class ResponseSchema(BaseModel):
    message:str = Field(...,
                        description="処理結果のmessage",
                        example="タスクの追加に成功しました")

BaseModelを継承することでバリデーションチェックが行えるようになります。また、クラスの書き方も変数名:型のように型ヒントだけでデータ定義することができます。
Field()はデフォ値や文字数などデータの詳細を定義する関数です。詳しくは以下の公式ドキュメント
https://docs.pydantic.dev/latest/concepts/fields/

  • モデルファイル。
    タスクの固有のid、タイトル、コンテンツ、作成日、更新日の5つのカラムを作ります
    中身は以下の通り
models/task.py
from sqlalchemy import Column,String,Integer,DateTime
from db import Base
from datetime import datetime

#タスクモデル
class Task(Base):
    #テーブル名
    __tablename__ = "tasks"
    #id
    id = Column("id",Integer,primary_key=True,autoincrement=True)
    #タイトル
    title = Column("title",String(20),nullable=False)
    #内容
    content = Column("content",String(255),nullable=True)
    #作成日
    create_at = Column("create_at",DateTime,default=datetime.now())
    #更新日
    update_at = Column("update_at",DateTime)

二行目のBaseを継承することでpythonのクラスをテーブルとして認識することができます
Column("カラム名")はなくてもokです(なかった場合pythonの変数名が使われる)

DBファイルの作成

次にDBを作成します
dbファイルを作るに至って知識が全くなかったため自分なりに解釈してみました

エンジン・・・dbとの接続。
セッション・・・実際にdbに操作を加えるときにdbとの間で確立される接続のこと。ユーザがクエリを実行する際に維持され、commit()やclose()によって終了される
要約・・・dbへの何らかの処理(追加や削除など) => セッションを確立 => エンジン => DB

中身は以下の通り

db.py
import os
from sqlalchemy.ext.asyncio import create_async_engine,AsyncSession
from sqlalchemy.orm import sessionmaker,declarative_base


#ベースを定義
Base = declarative_base()
#データベースの接続先
DATABASE_URL = os.environ.get("DATABASE_URL")
#非同期エンジンの作成
engine = create_async_engine(DATABASE_URL,echo=True)

#非同期セッションの作成
create_session = sessionmaker(
    engine,
    expire_on_commit=False,
    class_=AsyncSession
)

#セッションを呼び出し元に渡す関数
async def get_session():
    async with create_session() as session:
        yield session

create_async_session()で非同期のエンジンを作成することができ、echo=TrueをつけることでSQL文をコンソールで確認することができるのでデバックが容易になります
sessionmaker()でセッションを作成し、class_=AsyncSessionをつけることで非同期のセッションを使用することができます。
expire_on_commitはdbのデータを更新した際にプログラムの情報も自動で更新し、常に最新の情報を扱えるようになります。Trueにするとその分パフォーマンスが落ちてしますため、頻繁にデータの更新がないならFalseにしといたほうがパフォーマンスは上がります。

get_sessionはセッションを開き、呼び出し元(dbにアクセスする関数)に対してセッションを返す関数です。async withにより処理後は自動的にセッションが閉じられます

  • テーブルの初期化

次にtaskテーブルの初期化をします。中身は以下の通り

init_database.py
import os
from sqlalchemy.ext.asyncio import create_async_engine
from models.task import Base
import asyncio

#データベースの接続先
DATABASE_URL = os.environ.get("DATABASE_URL")
#非同期エンジンの作成
engine = create_async_engine(DATABASE_URL,echo=True)

#データベースを初期化する関数
async def init_db():
    async with engine.begin() as conn:
        #既存のテーブルを削除
        await conn.run_sync(Base.metadata.drop_all)
        #テーブルの作成
        await conn.run_sync(Base.metadata.create_all)
    await engine.dispose() 
if __name__ == "__main__":
    asyncio.run(init_db())

Base.metadata.drop_allBase.metadata.create_allで既存のテーブルを削除し、
新しくテーブルを作っています。
上のメソッドは同期処理のため非同期エンジンを使っている場合はrun_sync()メソッドを使用することで非同期的に同期処理をしています。

dockerをビルドした後docker-compose exec back bashで中に入り、
python init_database.pyを実行しときましょう

CRUD操作とルーティング

次にcrudとそのルーティングについてまとめます
crudの中身は以下の通り

cruds/task.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from schemas.task import CreateAndUpdateTaskSchema,TaskSchema
from models.task import Task
from datetime import datetime

#タスク追加
async def create_task(db:AsyncSession,task:CreateAndUpdateTaskSchema) -> Task:
    #入力されたタスクを辞書形式で受け取る
    new_task = Task(**task.model_dump())
    #dbに追加してコミット
    db.add(new_task)
    await db.commit()
    await db.refresh(new_task)
    return new_task

#タスク一覧
async def get_tasks(db:AsyncSession) -> list[Task]:
    result = await db.execute(select(Task))
    tasks = result.scalars().all()
    return tasks

#タスク詳細
async def get_task_detail(db:AsyncSession,task_id:int) -> Task | None:
    result = await db.execute(select(Task).where(Task.id == task_id))
    task = result.scalars().first()
    return task

#タスク更新
async def update_task(db:AsyncSession,
                    task_id:int,task:CreateAndUpdateTaskSchema) -> Task | None:
    #タスク詳細の関数を呼び出して取得(修正前)
    fixed_task = await get_task_detail(db=db,task_id=task_id)
    if fixed_task:
        #タイトル、内容、更新時間を変更
        fixed_task.title = task.title
        fixed_task.content = task.content
        fixed_task.update_at = datetime.now()
        await db.commit()
        await db.refresh(fixed_task)
    return fixed_task

#タスク削除
async def delete_task(db:AsyncSession,task_id:int) -> Task | None:
    task = await get_task_detail(db=db,task_id=task_id)
    if task:
        await db.delete(task)
        await db.commit()
    return task
  • タスク追加について
    model_dumpはpydanticで定義したスキーマのデータを辞書形式にするメソッド
    **でアンパックするのでTask(title='筋トレ',content='腹筋10回')のように変換してます
    add()で追加しcommit()で保存した後refresh()でDBから最新情報を取得しnew_taskに反映しています(idなどはcommitした際に割り振られるのでそのため)

  • その他メソッドについて

executeはクエリを実行します。
scalarsはクエリの実行結果をモデルのインスタンスをリストとして取得することができます
firstは最初の一つ、allはリスト全体を受け取ります

  • ルーティング

中身は以下の通り

routers/task.py
from fastapi import APIRouter,HTTPException,Depends
from sqlalchemy.ext.asyncio import AsyncSession
from schemas.task import CreateAndUpdateTaskSchema,TaskSchema,ResponseSchema
import cruds.task as crud_task
from db import get_session

#ルータの作成
router = APIRouter(tags=["Tasks"],prefix="/tasks")

#タスク追加のエンドポイント
@router.post('/',response_model=ResponseSchema)
async def create_task(task:CreateAndUpdateTaskSchema,db:AsyncSession = Depends(get_session)):
    try:
        await crud_task.create_task(db=db,task=task)
        return ResponseSchema(message="タスクの追加に成功しました")
    except Exception as e:
        raise HTTPException(status_code=400,detail="タスクの追加に失敗しました")
    
#タスク一覧のエンドポイント
@router.get('/',response_model=list[TaskSchema])
async def get_tasks(db:AsyncSession = Depends(get_session)):
    tasks = await crud_task.get_tasks(db=db)
    return tasks

#タスク詳細のエンドポイント
@router.get('/{task_id}',response_model=TaskSchema)
async def get_task_detail(task_id:int,db:AsyncSession = Depends(get_session)): 
    task = await crud_task.get_task_detail(db=db,task_id=task_id)
    #タスクがなかった場合
    if not task:
        raise HTTPException(status_code=404,detail="task not found")
    return task

#タスク更新のエンドポイント
@router.put('/{task_id}',response_model=ResponseSchema)
async def update_task(task_id:int,task:CreateAndUpdateTaskSchema,
                    db:AsyncSession = Depends(get_session),):
    fixed_task = await crud_task.update_task(db=db,task=task,task_id=task_id)
    #タスクがなかった時
    if not fixed_task:
        raise HTTPException(status_code=404,detail="task not found")
    return ResponseSchema(message="タスクの更新に成功しました")
    
#タスク削除
@router.delete('/{task_id}',response_model=ResponseSchema)
async def delete_task(task_id:int,db:AsyncSession = Depends(get_session)):
    task = await crud_task.delete_task(db=db,task_id=task_id)
    #タスクが見つからなかった時
    if not task:
        raise HTTPException(status_code=404,detail='task not found')
    return ResponseSchema(message='タスクの削除に成功しました')

APIRouterのtagsはswaggerUI(http://localhost:8000/docs)上で確認できるグループ名です
prefix="/tasks"をつけることですべてのurlが/tasksから始まります
response_modelはレスポンスするデータをスキーマの形に戻し、jsonでレスポンスします
Dependsは依存関係の注入と呼ばれるものでget_session()を呼び出してdbに渡してくれます
これをcrudの引数のdbに渡すのでデータベースの操作ができるようになります

メインファイル

最後にmainファイル。中身は以下の通り

main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import ValidationError
from routers.task import router as task_router

#fastapiインスタンスの作成
app=FastAPI()

#CORS設定
app.add_middleware(
    CORSMiddleware,
    #許可するオリジンの設定
    allow_origins=["http://127.0.0.1:5500"],
    #認証情報を含むリクエストの許可
    allow_credentials=True,
    #許可するメソッドの設定
    allow_methods=["*"],
    #許可するヘッダー
    allow_headers=["*"]
)

#ルータ
app.include_router(task_router)
#バリデーションエラー
@app.exception_handler(ValidationError)
async def validation_exception_handler(exc:ValidationError):
    #validationエラーが発生した時のjsonレスポンス
    return JSONResponse(
        status_code=422,
        content={
            #pydanticが提供するエラーコード
            "detail":exc.errors(),
            #バリデーションエラーが発生した時の入力エラー
            'body':exc.model,\
        }
    )

ここではフロントエンドがapiにアクセスできるようし、バリデーションエラーが発生した時のjsonレスポンスを決めています
include_router()によりroutersのルーターを組み込んでいます

データの流れのまとめ

  1. エンドポイントにアクセスし入力されたデータがBaseModelで定義したスキーマと正しいかチェック
  • getメソッドの場合
  1. crudでデータベースにアクセスしクエリを実行した後、scalars()でクエリの結果をモデル形式で取得する
  2. 2で取得したモデル形式のデータをresponse_modelによりスキーマの形式に戻し(create_atなどの不要な要素は排除)その後json形式に戻してレスポンス

以下レスポンス

  {
    "title": "テスト1",
    "content": "登録テスト",
    "id": 2
  },
  {
    "title": "テスト2",
    "content": "登録テスト",
    "id": 3
  },
  {
    "title": "テスト3",
    "content": "登録テスト",
    "id": 4
  }
  • postメソッドの場合
  1. 受け取ったスキーマ形式のデータをTask(**task.model_dump())でモデル形式に戻してdbに追加
  2. response_modelによりResponseSchemaをjson形式に戻してレスポンス

以下レスポンス

{
"message": "タスクの追加に成功しました"
}

これにて終了です 動作確認は http://localhost:8000/docs でチェック
説明が冗長であったり、間違って解釈していることもあるかもしれませんが読んでいただきありがとうございました!
gitにサンプルコード上げているので確認したい方はどうぞ(.envは除く)

1

Discussion

ログインするとコメントできます