🌊
【AI_7日目_2回目】FastAPI_1冊目
こんにちは投資ロウトです。
背景
AI開発ができるためにwebAPIについて理解を深めていきます。
※Tailwind CSSとkeycloakは現場でピンチになれば再開します。
非同期処理
MySQLのクライアントとして、pymsqlがありますが、こちらが非同期処理のasyncioに対応しておらず、並列処理が難しい状態にあったとのことです。aiomysqlを使用すると、非同期処理がDB処理で行えるとのことでした。
下記はコンテナを起動した状態で実施。
# インストール
docker compose exec アプリ名 poetry add aiomysql
非同期処理に合わせて修正していく。
async def・・・コルーチン関数(非同期処理ができる関数)
api/cruds/stock.py
from sqlalchemy.orm import Session
from sqlalchemy import Select
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession
import api.models.stock as stock_model
import api.schemas.stock as stock_schema
async def create_stock(db: AsyncSession, stock: stock_schema.StockCreate) -> stock_model.Stock:
stock = stock_model.Stock(**stock.dict())
db.add(stock)
await db.commit()
await db.refresh(stock)
return stock
async def get_stocks(db: AsyncSession) -> list[tuple[int, str]]:
result: Result = await db.execute(Select(stock_model.Stock.id, stock_model.Stock.stock_name).select_from(stock_model.Stock))
return result.all()
# 引数で渡されたstockIdに該当するレコードがないか確認し、その結果を返す
async def get_stock(db: AsyncSession, stock_id: int) -> stock_model.Stock | None:
result: Result = await db.execute(Select(stock_model.Stock).filter(stock_model.Stock.id == stock_id))
return result.scalars().first()
# 引数で渡されたstock_nameに書き換えて更新する。
async def update_stock(db: AsyncSession, stock_create: stock_schema.StockCreate, original: stock_model.Stock) -> stock_model.Stock:
original.stock_name = stock_create.stock_name
db.add(original)
await db.commit()
await db.refresh(original)
return original
# 削除処理
async def delete_stock(db: AsyncSession, original: stock_model.Stock) -> None:
await db.delete(original)
await db.commit()
api/routers/stock.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
import api.cruds.stock as stock_crud
from api.db import get_db
from typing import List
import api.schemas.stock as stock_schema
router = APIRouter()
@router.get("/stocks", response_model=List[stock_schema.Stock])
async def list_stocks(db: AsyncSession = Depends(get_db)):
return await stock_crud.get_stocks(db)
@router.post("/stocks", response_model=stock_schema.StockCreateResponse)
async def create_stock(stock_body: stock_schema.StockCreate, db: AsyncSession = Depends(get_db)):
return await stock_crud.create_stock(db, stock_body)
@router.put("/stocks/{stock_id}", response_model=stock_schema.StockCreateResponse)
async def update_stock(stock_id: int, stock_body: stock_schema.StockCreate, db: AsyncSession = Depends(get_db)):
stock = await stock_crud.get_stock(db, stock_id)
if stock is None:
raise HTTPException(status_code=404, detail="Stock not found")
return await stock_crud.update_stock(db, stock_body, stock)
@router.delete("/stocks/{stock_id}", response_model=None)
async def delete_stock(stock_id: int, db: AsyncSession = Depends(get_db)):
stock = await stock_crud.get_stock(db, stock_id)
if stock is None:
raise HTTPException(status_code=404, detail="Stock not found")
return await stock_crud.delete_stock(db, stock)
では上記の修正で問題ないか、手動打鍵していきます。
登録処理
取得処理。登録の成功も合わせて確認
更新処理
取得処理で再確認
削除処理
取得処理で再確認
諸々非同期処理を入れても問題なく、動いていることも確認できました。
と短いですが、一旦以上で学習を区切りたいと思います。焦らずコツコツ自分のペースでできればと思います。ご精読ありがとうございました。亀のように直実といきたいと思います。
Discussion