Zenn
🐥

SQLalchemyにおけるasyncでcreate_engine()ができない

2025/03/17に公開
1

こちらの記事を参考にしてDocker環境でFastAPIの実装をした。

https://zenn.dev/sh0nk/books/537bb028709ab9

その際、参考記事ではDBにMySQLが使用されていたが、PostgresSQLを使用してみた。
このpythonファイルを実行してDBを作成しようとしたときに以下のようなエラーが発生。

migrate_db.py
from sqlalchemy import create_engine

from models.task import Base


DB_URL = "postgresql://postgres:postgres@db:5432/demo"
engine = create_engine(DB_URL, echo=True)


def reset_database():
     Base.metadata.drop_all(bind=engine)
     Base.metadata.create_all(bind=engine)


if __name__ == "__main__":
    reset_database()
ModuleNotFoundError: No module named 'psycopg2'

順当にこちらのモジュールをインストールして再度DB作成を行ったところ、このようなエラーが発生

    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError:
The asyncio extension requires an async driver to be used. The loaded 'psycopg2' is not async.

このエラーについては以下のURLを参考にした
https://stackoverflow.com/questions/69872948/async-sqlalchemy-cant-create-engine

こちらの記事を参考に、以下のように修正

- from sqlalchemy import create_engine
+ from sqlalchemy.ext.asyncio import create_async_engine
  
  
- DB_URL = "postgresql://postgres:postgres@db:5432/demo"
- engine = create_engine(DB_URL, echo=True)
+ DB_URL = "postgresql+asyncpg://postgres:postgres@db:5432/demo"
+ engine = create_async_engine(DB_URL, echo=True)

プロトコルの部分(postgresql://の部分)に非同期通信であることを明示する必要があるみたい

この修正に伴ってその他のコードも修正した
以下が最終的なコード

migrate_db.py
- from sqlalchemy import create_engine
+ import asyncio
+ from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy.ext.asyncio import AsyncSession
  
  from models.task import Base
  
- DB_URL = "postgresql://postgres:postgres@db:5432/demo"
- engine = create_engine(DB_URL, echo=True)
+ # 非同期接続用のURL
+ DB_URL = "postgresql+asyncpg://postgres:postgres@db:5432/demo"
+ engine = create_async_engine(DB_URL, echo=True)
  
- def reset_database():
-     Base.metadata.drop_all(bind=engine)
-     Base.metadata.create_all(bind=engine)
+ async def reset_database():
+     async with engine.begin() as conn:
+         await conn.run_sync(Base.metadata.drop_all)
+         await conn.run_sync(Base.metadata.create_all)
+ 
  if __name__ == "__main__":
-     reset_database()
+     asyncio.run(reset_database())
1

Discussion

ログインするとコメントできます