Alembicでマイグレーション(FastAPI+SQLModel)
前回、FastAPIとSQLmoelを使用して、webアプリケーションを作成しました。
このアプリケーションに、別のモデルを追加して、さらに、二つのデータベースに関連性を持たせます。UserSchema設定変更
前回作成しUserScehmaを変更します。
コードはこちら
class User(BaseUser, table = True):
id: Optional[int] = Field(default = None, primary_key = True)
+ uuid: str = Field(default_factory = uuid.uuid4, nullable = False)
hashed_password: str = Field(nullable = False)
created_at: datetime = Field(default = datetime.now(), nullable = False)
updated_at: datetime = Field(default_factory = datetime.now, nullable = False, sa_column_kwargs = {'onupdate': datetime.now})
refresh_token: str = Field(nullable = True)
〜中略〜
+class UserRead(BaseUser):
+ id: int
+ uuid: str
+ created_at: datetime
+ updated_at: datetime
+ press_releases: list["PressReleaseReadWithUser"] = []
これから作成するスキーマ(今回はPress Release schema)と関係性を持たせるため、foreign_keyに設定するキーには外部キー制約(foreign key constraints)の設定が必要です。
外部キー制約として、foreign_keyに使用される値が一意に決まることが必要です。
よって、primary_key=Trueまたはunique=Trueを付与することが必要になります。
ここでは、foreign_keyにidを使用することにします。
既にidにprimary_keyを設定しているので、これでOKです。
UserReadは、この後PressReleaseReadWithUserを設定して、ユーザーの検索結果がreturnされるときに、ユーザーに紐づくPress Releaseデータも一緒にreturnするように設定していきます。
変更したUserSchemaをマイグレートする
それでは変更したUserSchemaをマイグレートします。
(poetry run) alembic revision --autogenerate -m 'Change BaseUseModel'
作成されたマイグレーションファイルを実行し、マイグレートします。
(poetry run) alembic upgrade head
これで、User Schemaの更新は終了です。
Press Release Schemaの定義
次に、User Schemaと関係性を持たせるPress Release Schemaを定義していきます。
コードはこちら
from sqlmodel import SQLModel, Field, Relationship, AutoString
from typing import Optional, TYPE_CHECKING
import uuid
from datetime import datetime
if TYPE_CHECKING:
from .user import User
class BasePressRelease(SQLModel):
# 必須ではない項目には「Optional[型]」と「default = None」を設定
year: str = Field(nullable=False) # 年度
title: str = Field(nullable = False, unique = False)
description: Optional[str] = Field(nullable=True, default=None) # 説明、リード文
content: Optional[str] = Field(nullable=True, default=None) # 内容
etc: Optional[str] = Field(nullable=True, default=None) # その他
release_date: Optional[datetime] = Field(default=None)
author_name: Optional[str] = Field(nullable=True, default=None)
author_department: Optional[str] = Field(nullable=True, default=None) # 選択式
author_post_name: Optional[str] = Field(nullable=True, default=None) # 選択式
author_address_number: Optional[str] = Field(nullable=True, default=None) # 選択式
author_address: Optional[str] = Field(nullable=True, default=None) # 選択式
author_tel_num: Optional[str] = Field(nullable=True, default=None) # 選択式
author_fax_num: Optional[str] = Field(nullable=True, default=None) # 選択式
author_email: Optional[str] =Field(sa_type=AutoString, nullable=True, default=None)
thumbnail_1: Optional[str] = Field(nullable=True, default=None)
thumbnail_2: Optional[str] = Field(nullable=True, default=None)
thumbnail_3: Optional[str] = Field(nullable=True, default=None)
file_1: Optional[str] = Field(nullable=True, default=None)
file_2: Optional[str] = Field(nullable=True, default=None)
file_3: Optional[str] = Field(nullable=True, default=None)
is_draft: bool = Field(nullable=True, default=True)
class PressRelease(BasePressRelease, table = True):
id: Optional[int] = Field(default = None, primary_key = True)
uuid: str = Field(default_factory = uuid.uuid4, nullable = False)
created_at: datetime = Field(default = datetime.now(), nullable = False)
updated_at: datetime = Field(default_factory = datetime.now, nullable = False, sa_column_kwargs = {'onupdate': datetime.now})
user_id: Optional[int] = Field(default=None, foreign_key="user.id", ondelete="CASCADE")
user: Optional["User"] = Relationship(back_populates="press_releases")
class PressReleaseList(SQLModel):
uuid: str
title: str
description: str
created_at: datetime
author_department: str
is_draft: bool
class PressReleaseRead(BasePressRelease):
user_id: int
uuid: str
created_at: datetime
user: "UserOutWithPress | None" = None
class PressReleaseReadWithUser(SQLModel):
id: int
uuid: str
created_at: datetime
class PressReleaseCreate(BasePressRelease):
user_id: int
class PressReleaseUpdate(BasePressRelease):
user_id: int
delete_thumbnails_flags: dict
delete_files_flags: dict
# UserをPressReleaseDataに含めてreturnするためのschema
class UserOutWithPress(SQLModel):
id: int
uuid: str
username: str
email: str
is_active: bool
is_superuser: bool
created_at: datetime
updated_at: datetime
foreign_keyの設定
Press Release Schemaにはforeign_keyを設定しています(以下の部分です)。
user_id: Optional[int] = Field(default=None, foreign_key="user.id", ondelete="CASCADE")
書き方は、
「キー名: 型 = Field(設定内容, foreign_key="外部テーブル名.外部テーブルのキー名")」
です。
今回は「ondelete="CASCADE"」を設定しています。
これは、ユーザーが削除された場合、Press Releaseのデータも一緒に削除する仕組みにするための設定です。
Relationshipの設定
次は、Relationshipの設定をしていきます(以下の部分です)。
user: Optional["User"] = Relationship(back_populates="press_releases")
データ結合時のテーブル同士の関連性を明確にするためにRelationshipクラスを使用します。
Relationshipの引数であるback_populatesに設定するキーは、接続するデータベースでも設定が必要になります。
コードはこちら
class BaseUser(SQLModel):
username: str = Field(nullable = False, default = None, unique = True)
email: EmailStr =Field(unique = True, index = True, sa_type=AutoString)
is_active: bool = Field(default = True)
is_superuser: bool = Field(default = False)
class User(BaseUser, table = True):
id: Optional[int] = Field(default = None, primary_key = True)
uuid: str = Field(default_factory = uuid.uuid4, nullable = False)
hashed_password: str = Field(nullable = False)
created_at: datetime = Field(default = datetime.now(), nullable = False)
updated_at: datetime = Field(default_factory = datetime.now, nullable = False, sa_column_kwargs = {'onupdate': datetime.now})
refresh_token: str = Field(nullable = True)
press_releases: list["PressRelease"] | None = Relationship(back_populates="user", cascade_delete=True)
これでScehmaの設定は完了です。
TYPE_CHECKINGについて
上記で修正したUser Schemaや追加で設定したPress Release Schemaには、「TYPE_CHECKING」という書き方を追記しています(例えば以下のように書いています)。
if TYPE_CHECKING:
from .user import User
これは、今回のようにファイルをUserとPress Releaseで分けて書く場合に、それぞれのファイルで互いのモデルをインポートしなければならなくなります。
このようなときに、循環インポートが生じてしまうことを防ぐための設定です。
そして、Schemaをインポートして使用するのは、型チェック時のみです。
そのため、「TYPE_CHECKING」とあるように、型チェック時のみ必要なコードとして認識させるため、このような書き方をしています。
同じファイル内にモデルを定義するのであれば、問題は生じませんが、通常、モデルが多くなると、ファイル分割は必要になりますので、このような手法が取られているようです。
複数のSchemaのマイグレーション(SQLModel)
それでは設定したSchemaをベースにマイグレーションしていきます。
env.pyファイルに設定を追記
まずはコードの全体像を示します。
コードはこちら
import os
import sys
sys.path.append(os.getcwd())
from logging.config import fileConfig
from sqlalchemy import engine_from_config
+ from sqlalchemy import MetaData, pool
from alembic import context
from sqlmodel import SQLModel
from v1.model.user import User
+ from v1.schema.press import PressRelease
from dotenv import load_dotenv
load_dotenv()
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# add
target_metadata = SQLModel.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
config.set_section_option('alembic', 'POSTGRES_USER', os.environ['POSTGRES_USER']) # add
config.set_section_option('alembic', 'POSTGRES_PASSWORD', os.environ['POSTGRES_PASSWORD']) # add
config.set_section_option('alembic', 'POSTGRES_SERVER', os.environ['POSTGRES_SERVER']) # add
config.set_section_option('alembic', 'POSTGRES_PORT', os.environ['POSTGRES_PORT']) # add
config.set_section_option('alembic', 'POSTGRES_DB', os.environ['POSTGRES_DB']) # add
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
参考にした記事はこちらです。
通常のAlembicの使用方法はこちらを参考にしました。
マイグレートしていく
それでは先ほどと同様にマイグレートします。
(poetry run) alembic revision --autogenerate -m 'Create new table'
作成されたマイグレーションファイルを実行し、マイグレートします。
(poetry run) alembic upgrade head
以上で、新しいモデル、テーブルの設定ができました。
Discussion