FastAPIとSQLModelを用いた非同期セッション管理
このガイドでは、FastAPIとSQLModelを用いて非同期セッションを管理する方法について解説します。async_generator
とasynccontextmanager
を利用したセッション管理の実装方法について詳しく見ていきます。
非同期ジェネレータを用いたセッション管理
非同期セッションを管理するために、まずは非同期ジェネレータを使用してセッションを管理する関数を定義します。
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession
from src.util.config.config_util import get_current_env_config
config = get_current_env_config()
DB_URL = "mysql+aiomysql://%s:%s@%s/%s?charset=utf8" % (
config.get("MYSQL_USER"),
config.get("MYSQL_PASSWORD"),
config.get("MYSQL_HOST"),
config.get("MYSQL_DATABASE"),
)
engine = create_async_engine(DB_URL, echo=True)
async def get_db_session():
async with AsyncSession(engine) as session:
yield session
AsyncSessionと非同期コンテキストマネジャー
SQLModelのAsyncSession
はsqlalchemyの_AsyncSession
を継承しており、非同期コンテキストマネジャーとして実装されています。
class AsyncSession(ReversibleProxy[Session]):
async def __aenter__(self: _AS) -> _AS:
return self
async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
task = asyncio.create_task(self.close())
await asyncio.shield(task)
そのため先のようにasync with
を用いて非同期コンテキストマネジャーとして扱い__aenter__
で自身を返すためyield session
を用いてAsyncSession
インスタンスを非同期ジェネレータとして扱えるようになります。
# async_generatorを返す関数
async def get_db_session():
async with AsyncSession(engine) as session:
yield session
async withを用いることで、AsyncSessionオブジェクトが非同期コンテキスト内で利用可能な状態を維持し、yieldで返されたセッションは__aenter__状態のまま使用されます。
FastAPIでの依存関係としての利用
FastAPIでは、Dependsを用いて依存関係を注入することができます。依存関係が非同期ジェネレータ関数である場合、Dependsはその関数を自動的に非同期コンテキストマネジャーとして扱います。これにより、Dependsが内部的にasynccontextmanagerを利用してコンテキストの管理を行います。
例えば、以下のように設定すると、FastAPIはget_db_session関数を非同期コンテキストマネジャーとして処理します。
from fastapi import FastAPI, Depends
app = FastAPI()
@app.post("/items/")
async def create_item(db: AsyncSession = Depends(get_db_session)):
await session.exec(statement)
Dependsによる非同期ジェネレータの管理
Dependsが非同期ジェネレータ関数を受け取ると、その関数がasynccontextmanagerでラップされ、非同期コンテキストマネジャーとして機能します。具体的には、以下のような動作が行われます(以下のクラスは概念的なイメージです)。
class AsyncContextManagerWrapper:
def __init__(self, gen):
self._gen = gen
self._session = None
async def __aenter__(self):
# 非同期ジェネレータから最初の値を取得し、コンテキスト内で使用できるようにする
self._session = await self._gen.__anext__()
return self._session
async def __aexit__(self, exc_type, exc_val, exc_tb):
# コンテキストを抜ける際にジェネレータを適切にクローズ
if self._gen:
try:
await self._gen.aclose()
except StopAsyncIteration:
pass
この仕組みにより、get_db_sessionのような非同期ジェネレータ関数がDependsを通じて呼び出されると、その戻り値であるAsyncSessionが自動的にコンテキスト管理されます。具体的には、次のプロセスが行われます
- aenter メソッドの呼び出し: コンテキストに入る際に、非同期ジェネレータからyieldされた最初の値(ここではAsyncSession)が取り出されます。
- リソースの使用: AsyncSessionを利用してデータベース操作を行います。
- aexit メソッドの呼び出し: コンテキストを抜ける際に、acloseを呼び出してジェネレータを適切にクローズし、リソースを解放します。
このプロセスにより、FastAPIが非同期ジェネレータのライフサイクルを管理し、リソースの適切なクリーンアップを保証します。
テストでの利用
テスト環境でDepends
を使用しない場合、非同期ジェネレータから非同期コンテキストマネジャーとして明示的に利用する必要があります。asynccontextmanager
を用いて非同期コンテキストマネジャーを生成し、テスト内で利用する方法は以下の通りです。
import pytest
from contextlib import asynccontextmanager
@pytest_asyncio.fixture(scope='class')
async def db_session():
async with asynccontextmanager(get_db_session)() as session:
yield session
おまけ
自動コミット、ロールバックも入れるなら以下のような形
async def get_db_session():
async with AsyncSession(engine, expire_on_commit=False) as session:
try:
yield session
await session.commit()
except Exception as e:
await session.rollback()
raise e
Discussion