⚡️

[FastAPI][SQLAlchemy] BackgroundTasks を使う際は Depends せず新規コネクションにしときなさい

2024/03/06に公開

結論

タイトル通りですが、FastAPI の BackgroundTasks を使っている場合は、ContextManager などを活用して、以下のように新規コネクションを確保しましょう

url = f"mysql+pymysql://{config.mysql_user}:{config.mysql_password}@{config.mysql_host}:{config.mysql_port}/{config.mysql_db}?charset=utf8mb4"


engine = create_engine(url, pool_recycle=40, pool_size=10, max_overflow=15)


def create_new_session():
    maker = sessionmaker(
        autocommit=False, autoflush=False, expire_on_commit=False, bind=engine
    )
    session = maker()

    try:
        yield session
    except Exception as e:
        logger.error(e)
        session.rollback()
    finally:
        session.close()

# Depends などで呼び出す方
def get_db() -> scoped_session:
    yield from create_new_session()

# BackgroundTasks などで新規確立するコネクションを呼び出す方
@contextmanager
def get_db_with_context_manager():
    session = next(create_new_session())
    try:
        yield session
    except Exception as e:
        logger.error(e)
        session.rollback()
    finally:
        session.close()

@router.get("/hello")
def hello_route(background_tasks: BackgroundTasks):
	... # 何がしかの事前チェックなど。 `db_session` を使う処理はここでは書いていない

	background_tasks.add_task(super_heavy_task)
	return HelloRouteResponse(result="ok")


def super_heavy_task():
	with get_db_with_context_manager() as db_session:
		db_session.query(User).all() # クエリなど

		return

発端

FastAPI で BackgroundTasks を使っているエンドポイントを利用されだしてから、QueuePool を使い潰している旨のエラーを検知しました。

sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/20/3o7r)

db_session は下記のように渡しており、各リポジトリ層でこのコネクションを使っている形です。

url = f"mysql+pymysql://{config.mysql_user}:{config.mysql_password}@{config.mysql_host}:{config.mysql_port}/{config.mysql_db}?charset=utf8mb4"

engine = create_engine(url, pool_recycle=40, pool_size=10, max_overflow=15)


def create_new_session():
    maker = sessionmaker(
        autocommit=False, autoflush=False, expire_on_commit=False, bind=engine
    )
    session = maker()

    try:
        yield session
    except Exception as e:
        session.rollback()
    finally:
        session.close()

# Depends(get_db) でFastAPIから呼び出す
def get_db() -> scoped_session:
    yield from create_new_session()

@router.get("/hello")
def hello_route(background_tasks: BackgroundTasks, db_session: Depends(get_db)):
	... # 何がしかの事前チェックなど。 `db_session` を使う処理はここでは書いていない

	background_tasks.add_task(super_heavy_task, db_session)
	return HelloRouteResponse(result="ok")


def super_heavy_task(db_session: ScopedSession):

	db_session.query(User).all() // クエリなど

	return

try-except-finally でコネクションを渡しており、 session.close()をすれば SQLAlchemy にコネクションプールリソースを返却することで、他のリクエストが新しくコネクションを利用したい際に、確保したリソースプールからコネクションを確保できます。

また、 max_overflow が 15 なので、 同時にコネクションを10件使っている場合は、追加で15コネクションまで新規確立でき、用が済んだらpool_sizeの値までコネクションをクローズします。

その認識でいたのですが、BackgroundTasks を使っているエンドポイントのみ、max_overflow も使い潰し、かつそれらが使われたままになっているのが上記のエラーでした。

気づいた違和感

ローカルでも再現させていると、以下2つの点に気になりました。

BackgroundTasks はリクエスト終了後に実行される

try-except-finally 内のコードがいつ、どのように実行されるかデバッガでみてみたところ、以下のような流れでした

コード(再掲)

url = f"mysql+pymysql://{config.mysql_user}:{config.mysql_password}@{config.mysql_host}:{config.mysql_port}/{config.mysql_db}?charset=utf8mb4"

engine = create_engine(url, pool_recycle=40, pool_size=10, max_overflow=15)


def create_new_session():
    maker = sessionmaker(
        autocommit=False, autoflush=False, expire_on_commit=False, bind=engine
    )
    session = maker()

    try:
        yield session
    except Exception as e:
        session.rollback()
    finally:
        session.close()

# Depends(get_db) でFastAPIから呼び出す
def get_db() -> scoped_session:
    yield from create_new_session()

@router.get("/hello")
def hello_route(background_tasks: BackgroundTasks, db_session: Depends(get_db)):
	... # 何がしかの事前チェックなど。 `db_session` を使う処理はここでは書いていない

	background_tasks.add_task(super_heavy_task, db_session)
	return HelloRouteResponse(result="ok")


def super_heavy_task(db_session: ScopedSession):

	db_session.query(User).all() // クエリなど

	return
  1. Depends(get_db) を実行し、yield にきたタイミングで FastAPIのパスルート内の処理に返す
  2. パスルート内で session を扱う処理はない(BackgroundTasks内にある)ので、background_task.add_task() にコネクションを渡し、レスポンスを返す
  3. finallyブロック内が呼び出され、 session.close()される
  4. レスポンス後、add_task()した super_heavy_task() が実行される
  5. super_heavy_task() 内で DBへのクエリが実行される

BackgroundTasks がリクエストの終了後に呼び出される点については違和感なかったですが、その際、Dependsで呼び出していた処理も終了されてしまうため、session.close() の実行タイミングが予想外でした。

また、QueuePool は消費し尽くされるものの、BackgroundTasks内で実行していた処理は(Pool確保さえできれば)正常に動作していたので、後述の違和感も調査するまで気づけませんでした。

コネクションクローズ後もクエリが実行できてしまう

上記実行フローを調査中に頭の中で仮説として組み立てた際、「なぜクローズしたコネクションでクエリが実行できるんだ?」と疑問が浮かびました。

結論言うと、 session.close() 自体はコネクションを閉じているのではなく、SQLAlchemyが保持するコネクションプールにコネクションリソースを返却しているだけで、物理的には閉じていないこと、そして閉じた(返却した)場合でも、新規コネクションリソースが必要であれば自動で確保するのかなということで理解しました。

そういえば、コネクションプールも yield部分で session を渡していますが、この時点ではリソースを確保しておらず、session.query()... が実行されて初めて確保されるみたいな記事を前に読んだのを思い出しました。

SQLAlchemy では、コネクション・オブジェクトを生成したタイミングではなく、そのコネクション・オブジェクトが最初に query を発行したタイミングでプールからコネクションが払い出されている。

SQLAlchemyのコネクション・プーリング | 達人ドヤリストへの道

確かに、それであれば、finally で クローズ(返却)後も、新規コネクションを払い出して使って、そのまま再クローズ(返却)できていないところにつながりますね。

解決策について

上記について納得はしたものの、ではどうするか?という点については以下ディスカッションがあったので、BackgroundTasks 向けに新規コネクションを作成する方向にしました。

今回は try-except-finally を何度も書かなくて良いように、 get_db_with_context_manager() という関数を用意し、 with でラップするだけで新規コネクションを確立できるようにしています。

url = f"mysql+pymysql://{config.mysql_user}:{config.mysql_password}@{config.mysql_host}:{config.mysql_port}/{config.mysql_db}?charset=utf8mb4"

engine = create_engine(url, pool_recycle=40, pool_size=10, max_overflow=15)


def create_new_session():
    maker = sessionmaker(
        autocommit=False, autoflush=False, expire_on_commit=False, bind=engine
    )
    session = maker()

    try:
        yield session
    except Exception as e:
        logger.error(e)
        session.rollback()
    finally:
        session.close()

# Depends などで呼び出す方
def get_db() -> scoped_session:
    yield from create_new_session()

# BackgroundTasks などで新規確立するコネクションを呼び出す方
@contextmanager
def get_db_with_context_manager():
    session = next(create_new_session())
    try:
        yield session
    except Exception as e:
        logger.error(e)
        session.rollback()
    finally:
        session.close()

@router.get("/hello")
def hello_route(background_tasks: BackgroundTasks):
	... # 何がしかの事前チェックなど。 `db_session` を使う処理はここでは書いていない

	background_tasks.add_task(super_heavy_task)
	return HelloRouteResponse(result="ok")


def super_heavy_task():
	with get_db_with_context_manager() as db_session:
		db_session.query(User).all() # クエリなど

		return

また、サンプルコードでは BackgroundTasks の前にクエリを実行するような処理ではなかったため、 Depends経由で取得していたget_db()の呼び出しは削除しました。
エンドポイントによっては BackgroundTasks を使う前にDBへクエリを流したいケースもあると思うので、その際は通常通り Dependsを用いてコネクションを確立し、BackgroundTasksには同じコネクションを使い回さないようにすれば良いでしょう。

まとめ

  • BackgroundTasks はリクエスト終了後に実行される
  • SQLAlchemy のコネクションプールでは、セッションに対してはじめてクエリを実行したタイミングでコネクションを確保する。 session.close()を実行した後でも、その後に同じインスタンスに対して実行したクエリは新規実行時と同じようにコネクションリソースの確保を試みる
  • BackgroundTasks で SQLAlchemy を用いたクエリの実行をしたい場合は、BackgroundTasks 用に新規コネクションを確保し、確実にクローズしましょう。今回は ContextManager でラップしましたが、個人的にはおすすめです。

参考

GitHubで編集を提案

Discussion