✏️

FastAPI統合テストで学ぶSQLAlchemyトランザクションの仕組みと実践的なテストデータ管理

に公開

はじめに

こんにちは、オークンの孫です。
FastAPI + pytest + SQLAlchemyの組み合わせで統合テストを実装しました。今回、その際に遭遇した問題に直面し、その解決過程で学んだことを共有したいと思います。

TL;DR

本記事では、非同期pytestの統合テストにおいて、追加テストデータが全体に影響してしまう問題を扱っています。
前半は挙動確認の検証のため、解決方法は「検証2(解決)」以降からご確認ください。

  1. 同一トランザクションを使うmock方式は、APIテストとして成立しない
  2. contextmanagerを使って一時データをcommit、確実に削除する方法
    • CSV管理 + withによる自動クリーンアップで、テストの独立性/pytestのランダム実行耐性/DB初期化1回の高速実行/全APIの一括テストを実現

システムの背景

今開発しているシステムは、事務側と会員側を分離した、定期購読契約を管理するシステムです。
主な機能として、会員情報の一覧取得および更新、契約対象となる商品の管理、ならびに購読商品の売上管理を行います。

目標

私たちのプロジェクトでは、以下の観点を満たすテストを整備したいと考えていました。

  1. 全APIを網羅する統合テスト
  2. 全体を一括実行できる
  3. 保守しやすいテスト設計

まず、保守しやすい設計の目標を達成するために、テストDBの初期化を@pytest.fixture(scope="session")で1回だけ行い、可能な限り高速にテストが実行されることを目指しました。

しかし、この後すぐ壁にぶつかります。

問題: データの永続化とテストの独立性

遭遇した問題

会員データに依存する複雑なビジネスロジックを持つ売上APIのテストを書いている時に、準備されていた初期データは不足していることに気づきました。

そこで、追加のテストデータを用意しようとしたところ、2つの問題に直面しました。

1. データの永続化問題

会員一覧の取得APIのテストが既に多数完成しており、データを追加すると、他のGET一覧取得APIのテストに影響が出ました。

原因

  • DB初期化はscope="session"で1回のみ実行
  • データを追加するとテストDB全体に永続化される

2. テスト実行の順序問題

「それなら、テストの実行順序を固定して、一覧のget系APIを先にテストをすればいいのではないか?」と考えました。

しかし、pytestはデフォルトでテストをランダムな順序で実行します。また、一般的に順序を固定することでテストの独立性が失われ、保守性も低下します。

検討した2つのアプローチ

この問題を解決するため、2つのアプローチを検討しました:

1. 特定のAPIだけに対してmockデータを挿入する:

commitせずに同じトランザクション内でデータを検索で、データの永続化問題を解決

2. 一時データを確実に挿入と削除:

実際にcommitしてから、テスト実行後確実に削除する

検証1: 特定のAPIだけに対してmockデータを挿入する

アプローチ1: 同一トランザクション内での検証&挙動を理解する

FastAPIの依存性注入をmockして、同じDB sessionを使えば同じトランザクションになるのではないか?と考え、検証してみました。
テストDBで使用する関数は下記になります。

@pytest.fixture(scope="function")
def db_session():
    """
    テスト用のDBセッションを作成する
    """
    DB_URL = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{
        settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
    engine = create_engine(DB_URL)
    SessionLocal = sessionmaker(autocommit=False, bind=engine)
    db: Session = SessionLocal()
    with patch.object(db, 'commit'):
        yield db
        db.rollback()
        
@pytest.fixture(scope="function")
def mock_get_writer_db(db_session):
    def override_get_writer_db():
        yield db_session
    app.dependency_overrides[get_writer_db] = override_get_writer_db
    yield
    del app.dependency_overrides[get_writer_db]

検証のテストは既存テストデータと売上に関するpatchのapiを使用しました。

# 検証1: mock_get_writer_dbあり
def test_with_mock(client, mock_get_writer_db, db_session):
    response = client.patch("/api/sales/10001", json={"price": 1200})
    assert response.status_code == 200

    # db_sessionで確認
    sales = db_session.query(Sales).filter(Sales.id == 10001).first()
    assert sales.price == 1200  # ✅ 通る 更新後の値が見える!

# 検証2: mock_get_writer_dbなし
def test_without_mock(client, db_session):
    response = client.patch("/api/sales/10001", json={"price": 1200})
    assert response.status_code == 200

    # db_sessionで確認
    sales = db_session.query(Sales).filter(Sales.id == 10001).first()
    assert sales.price == 1200  # ❌ 失敗(古い値1300のまま)

検証結果の考察

条件 API実行後のDB状態 db_sessionで見える値 テスト結果
mockあり 1300(未更新) 1200 ✅ 通る
mockなし 1200(更新済) 1300 ❌ 失敗

なぜmockなしだと失敗するのか?

SQLAlchemyのドキュメントを確認したことで理解できました。
上記の例では、db_sessionmock_get_writer_dbが呼び出される際に使用されるmock用のセッションであり、要するに同一のセッションが保持する同一コネクションリソース、つまり同じトランザクションに属しているため、db_sessionから更新後の値を見えます。
一方、mock を使用しない場合は、FastAPI の依存性注入によって新しいコネクションが取得され、新たなトランザクションが開始されます。この場合、commitが行われない限り、その更新内容が他のセッションから見えないのは当然の挙動です。

SQLAlchemyの関連内容

一度クエリが発行されたり、オブジェクトが永続化されたりすると、Session は関連付けられた Engine からコネクションリソースを要求し、そのコネクション上にトランザクションを確立します。commit または rollback を指示するまで該当トラザクションが有効です。トランザクションが終了すると、そのコネクションリソースは返却されます。新しいトランザクションは、新たなコネクションのチェックアウトによって開始されます。

https://docs.sqlalchemy.org/en/20/orm/session_basics.html

アプローチ1の結論

特定の API に対して一時的にモックデータを挿入することは可能ですが、その場合、API テストとしては有効に活用できないものになってしまいますので、採用できませんでした

検証2(解決): 一時データを確実に挿入と削除

アプローチ2: contextmanagerで一時データを管理

設計方針

  1. テストデータをCSVで管理: 可読性が高く、APIごとにデータを分離可能で、管理しやすい
  2. contextmanagerで自動クリーンアップ: with文でデータ挿入→削除を保証

実装

初期データを挿入する部分は下記になります。

from contextlib import contextmanager
import pandas as pd
from sqlalchemy.orm import Session, sessionmaker

@contextmanager
def init_data_db_session():
    """
    初期データ投入用のDBセッションを作成する
    """
    DB_URL = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{
        settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
    engine = create_engine(DB_URL)
    SessionLocal = sessionmaker(bind=engine)
    db: Session = SessionLocal()

    # 外部キー制約を一時的に無効化
    db.execute(text("SET foreign_key_checks=0;"))
    yield db
    db.execute(text("SET foreign_key_checks=1;"))
    db.close()

既存のテストデータを挿入するseederのコードを参照に、一時的なテストデータを投入用の関数を追加しました。下記になります。


@contextmanager
def insert_data_from_csv_temporarily(model, csv_path: str):
    PROJECT_ROOT = os.path.dirname(os.path.dirname(__file__))
    csv_file_abs_path = os.path.join(PROJECT_ROOT, csv_path)
    df = pd.read_csv(csv_file_abs_path)
    # 一時データを追加
    with init_data_db_session() as db:
        df.to_sql(model.__tablename__, db.connection(),
                  if_exists="append", index=False)
        db.commit() # 確実にcommitする
    try:
        yield
    finally:
        # テスト終了後、挿入したデータを削除
        with init_data_db_session() as db:
            for _, row in df.iterrows():
                # 主キーで削除(テーブルごとに対応、ここではidで処理する)
                pk_id = getattr(row, "id", None)
                if pk_id is not None:
                    db.query(model).filter_by(id=str(pk_id)).delete()
                    continue
            db.commit()

使用前提

使用例

パターン1: 単一テーブルの一時データ

def test_get_user_detail(client):
    """ユーザー詳細取得のテスト"""
    with insert_data_from_csv_temporarily(User, "test_data/user.csv"):
        response = client.get("/api/users/10001")
        assert response.status_code == 200
    # with を抜けると自動的にUserデータが削除される

パターン2: 複数テーブルの一時データ(module scope)

複雑なAPIにおいては、まとめてさらに効率で複数のテーブルに一気に挿入することも!

@pytest.fixture(scope="module")
def seed_test_data():
    with insert_data_from_csv_temporarily(User, "test_data/user.csv"), \
            insert_data_from_csv_temporarily(Order, "test_data/order.csv"), \
            insert_data_from_csv_temporarily(Product, "test_data/product.csv"):
        yield

def test_create_order(client, seed_test_data):
    """注文作成のテスト"""
    response = client.post("/api/orders", json={"user_id": 10001, "product_id": 20001})
    assert response.status_code == 200

テスト実行フロー

上記の実装で、パターン2を例として、実行イメージはこちらです。

1. scope="session" fixtures DB初期化(1回のみ)
-  DB初期化、基本マスターデータ投入、TestClient作成
2. 他のテストファイルの実行
3. test_orders.pyに到達
- scope="module" fixtureが起動  contextmanagerのネスト実行
  - User CSV → DB挿入 → commit
  - Order CSV → DB挿入 → commit
  - Product CSV → DB挿入 → commit
  - yield (一時停止)
4. test_create_order 関数実行 APIのテスト開始
- 依存関係の解決:
  - client (session scopeから取得)
  - seed_test_data (準備完了)
  - セットアップ (scope="function"):
    - get_db → 依存性注入オーバーライド
- テスト本体:
 - ★ APIリクエスト実行
 - ★ アサーション
5. 同じモジュール内の他のテスト(あれば)
- module scopeのseed_test_dataは再実行されない
- 一時データは保持されたまま
6. test_orders.py完了時
- module fixtureのティアダウン (逆順):
 - Product データ削除 (finally節) → commit
 - Order データ削除 (finally節) → commit
 - User データ削除 (finally節) → commit
7. 他のテストファイル実行 
 - 一時挿入データの影響がなし
..

参考資料:

https://note.com/yukikkoaimanabi/n/n4bd7bec46e9f

https://docs.pytest.org/en/stable/how-to/fixtures.html#yield-fixtures-recommended

まとめ・学び

  1. SQLAlchemyトランザクション、コネクションの理解をさらに深めたこと
  2. 「なぜこうなるのか?」を検証で実際に検証することで、検証の価値を感じたこと
  3. テスト設計とデータ管理について学べたこと

今回の設計により:

  • 全APIテストが独立して実行できる
  • pytestのランダム実行に耐えられる
  • テストデータの保守が容易
  • 高速なテスト実行(DB初期化1回のみ)

おわりに

最初は「なぜこうなるのか?」と悩み始んだが、実際に自分の手で検証をすることで深く理解することができました。テストDBの使用とcontextmanagerのところに落とし穴があると思いました。今回の機会にその部分をチームに共有できたらと思います。

同じ問題に悩んでいる方の参考になれば幸いです!

O-KUN Tech Blog

Discussion