🙄

FastAPIとSQLModelを用いた非同期セッション管理

2024/07/25に公開

このガイドでは、FastAPIとSQLModelを用いて非同期セッションを管理する方法について解説します。async_generatorasynccontextmanagerを利用したセッション管理の実装方法について詳しく見ていきます。

非同期ジェネレータを用いたセッション管理

非同期セッションを管理するために、まずは非同期ジェネレータを使用してセッションを管理する関数を定義します。

database.py
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession

from src.util.config.config_util import get_current_env_config

config = get_current_env_config()

DB_URL = "mysql+aiomysql://%s:%s@%s/%s?charset=utf8" % (
	config.get("MYSQL_USER"),
	config.get("MYSQL_PASSWORD"),
	config.get("MYSQL_HOST"),
	config.get("MYSQL_DATABASE"),
)

engine = create_async_engine(DB_URL, echo=True)


async def get_db_session():
	async with AsyncSession(engine) as session:
		yield session

AsyncSessionと非同期コンテキストマネジャー

SQLModelのAsyncSessionはsqlalchemyの_AsyncSessionを継承しており、非同期コンテキストマネジャーとして実装されています。

class AsyncSession(ReversibleProxy[Session]):
    async def __aenter__(self: _AS) -> _AS:
        return self

    async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
        task = asyncio.create_task(self.close())
        await asyncio.shield(task)

そのため先のようにasync withを用いて非同期コンテキストマネジャーとして扱い__aenter__で自身を返すためyield sessionを用いてAsyncSessionインスタンスを非同期ジェネレータとして扱えるようになります。

# async_generatorを返す関数
async def get_db_session():
	async with AsyncSession(engine) as session:
		yield session

async withを用いることで、AsyncSessionオブジェクトが非同期コンテキスト内で利用可能な状態を維持し、yieldで返されたセッションは__aenter__状態のまま使用されます。

FastAPIでの依存関係としての利用

FastAPIでは、Dependsを用いて依存関係を注入することができます。依存関係が非同期ジェネレータ関数である場合、Dependsはその関数を自動的に非同期コンテキストマネジャーとして扱います。これにより、Dependsが内部的にasynccontextmanagerを利用してコンテキストの管理を行います。
https://fastapi.tiangolo.com/ja/tutorial/dependencies/dependencies-with-yield/

例えば、以下のように設定すると、FastAPIはget_db_session関数を非同期コンテキストマネジャーとして処理します。

from fastapi import FastAPI, Depends

app = FastAPI()

@app.post("/items/")
async def create_item(db: AsyncSession = Depends(get_db_session)):
    await session.exec(statement)

Dependsによる非同期ジェネレータの管理

Dependsが非同期ジェネレータ関数を受け取ると、その関数がasynccontextmanagerでラップされ、非同期コンテキストマネジャーとして機能します。具体的には、以下のような動作が行われます(以下のクラスは概念的なイメージです)。

class AsyncContextManagerWrapper:
    def __init__(self, gen):
        self._gen = gen
        self._session = None
    
    async def __aenter__(self):
        # 非同期ジェネレータから最初の値を取得し、コンテキスト内で使用できるようにする
        self._session = await self._gen.__anext__()
        return self._session
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # コンテキストを抜ける際にジェネレータを適切にクローズ
        if self._gen:
            try:
                await self._gen.aclose()
            except StopAsyncIteration:
                pass

この仕組みにより、get_db_sessionのような非同期ジェネレータ関数がDependsを通じて呼び出されると、その戻り値であるAsyncSessionが自動的にコンテキスト管理されます。具体的には、次のプロセスが行われます

  1. aenter メソッドの呼び出し: コンテキストに入る際に、非同期ジェネレータからyieldされた最初の値(ここではAsyncSession)が取り出されます。
  2. リソースの使用: AsyncSessionを利用してデータベース操作を行います。
  3. aexit メソッドの呼び出し: コンテキストを抜ける際に、acloseを呼び出してジェネレータを適切にクローズし、リソースを解放します。

このプロセスにより、FastAPIが非同期ジェネレータのライフサイクルを管理し、リソースの適切なクリーンアップを保証します。

テストでの利用

テスト環境でDependsを使用しない場合、非同期ジェネレータから非同期コンテキストマネジャーとして明示的に利用する必要があります。asynccontextmanagerを用いて非同期コンテキストマネジャーを生成し、テスト内で利用する方法は以下の通りです。

import pytest
from contextlib import asynccontextmanager

@pytest_asyncio.fixture(scope='class')
async def db_session():
    async with asynccontextmanager(get_db_session)() as session:
        yield session

おまけ

自動コミット、ロールバックも入れるなら以下のような形

async def get_db_session():
	async with AsyncSession(engine, expire_on_commit=False) as session:
		try:
			yield session
			await session.commit()
		except Exception as e:
			await session.rollback()
			raise e

Discussion