🌊

【AI_7日目_2回目】FastAPI_1冊目

2024/09/08に公開

こんにちは投資ロウトです。

背景

AI開発ができるためにwebAPIについて理解を深めていきます。
※Tailwind CSSとkeycloakは現場でピンチになれば再開します。

非同期処理

MySQLのクライアントとして、pymsqlがありますが、こちらが非同期処理のasyncioに対応しておらず、並列処理が難しい状態にあったとのことです。aiomysqlを使用すると、非同期処理がDB処理で行えるとのことでした。

下記はコンテナを起動した状態で実施。

# インストール
docker compose exec アプリ名 poetry add aiomysql

非同期処理に合わせて修正していく。

async def・・・コルーチン関数(非同期処理ができる関数)

api/cruds/stock.py

from sqlalchemy.orm import Session
from sqlalchemy import Select
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession

import api.models.stock as stock_model
import api.schemas.stock as stock_schema

async def create_stock(db: AsyncSession, stock: stock_schema.StockCreate) -> stock_model.Stock:
    stock = stock_model.Stock(**stock.dict())
    db.add(stock)
    await db.commit()
    await db.refresh(stock)
    return stock

async def get_stocks(db: AsyncSession) -> list[tuple[int, str]]:
    result: Result = await db.execute(Select(stock_model.Stock.id, stock_model.Stock.stock_name).select_from(stock_model.Stock))  
    return result.all()

# 引数で渡されたstockIdに該当するレコードがないか確認し、その結果を返す
async def get_stock(db: AsyncSession, stock_id: int) -> stock_model.Stock | None:
    result: Result = await db.execute(Select(stock_model.Stock).filter(stock_model.Stock.id == stock_id))
    return result.scalars().first()

# 引数で渡されたstock_nameに書き換えて更新する。
async def update_stock(db: AsyncSession, stock_create: stock_schema.StockCreate, original: stock_model.Stock) -> stock_model.Stock:
    original.stock_name = stock_create.stock_name
    db.add(original)
    await db.commit()
    await db.refresh(original)
    return original

# 削除処理
async def delete_stock(db: AsyncSession, original: stock_model.Stock) -> None:
    await db.delete(original)
    await db.commit()

api/routers/stock.py

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession

import api.cruds.stock as stock_crud
from api.db import get_db

from typing import List

import api.schemas.stock as stock_schema

router = APIRouter()

@router.get("/stocks", response_model=List[stock_schema.Stock])
async def list_stocks(db: AsyncSession = Depends(get_db)):
    return await stock_crud.get_stocks(db)

@router.post("/stocks", response_model=stock_schema.StockCreateResponse)
async def create_stock(stock_body: stock_schema.StockCreate, db: AsyncSession = Depends(get_db)):
    return await stock_crud.create_stock(db, stock_body)

@router.put("/stocks/{stock_id}", response_model=stock_schema.StockCreateResponse)
async def update_stock(stock_id: int, stock_body: stock_schema.StockCreate, db: AsyncSession = Depends(get_db)):
    stock = await stock_crud.get_stock(db, stock_id)
    if stock is None:
        raise HTTPException(status_code=404, detail="Stock not found")
    return await stock_crud.update_stock(db, stock_body, stock)

@router.delete("/stocks/{stock_id}", response_model=None)
async def delete_stock(stock_id: int, db: AsyncSession = Depends(get_db)):
    stock = await stock_crud.get_stock(db, stock_id)
    if stock is None:
        raise HTTPException(status_code=404, detail="Stock not found")
    return await stock_crud.delete_stock(db, stock)

では上記の修正で問題ないか、手動打鍵していきます。

登録処理

取得処理。登録の成功も合わせて確認

更新処理

取得処理で再確認

削除処理

取得処理で再確認

諸々非同期処理を入れても問題なく、動いていることも確認できました。

と短いですが、一旦以上で学習を区切りたいと思います。焦らずコツコツ自分のペースでできればと思います。ご精読ありがとうございました。亀のように直実といきたいと思います。

Discussion