🧬

DynamoDB の up/downマイグレーションツールを Python とPynamoDB で自作する

2024/03/29に公開

概要

DynamoDB が永続層のアプリケーションを運用するにあたって、Python でバックエンドを記述しているが、既存のライブラリにマイグレーションツールがなかったため自作した(DynamoDB のためスキーマのマイグレーションではなくデータのマイグレーションを対象としている)。

ORM として PynamoDB を利用しているケースを想定している(が、PynamoDBを利用していない場合でも多少記述を変更すれば使えるようになる)。一般的な up/down処理に加え、シードデータの作成/削除まで対応している。

利用するライブラリ

ライブラリ バージョン
python 3.10
pynamodb 5.5.1
boto3 1.33.2

成果物

成果物は以下となる。 migration.py を実行(このとき、up/downを指定できる)することで、migrations ディレクトリ配下のファイルに記述された up/down定義が実行され、履歴管理される。 seed.py も同じように動作する。詳細は 各ファイルの詳細と解説 の項で説明する。

.
├── migration.py # migration を実行するスクリプト
├── migrations   # migration の up/down定義 を記述したファイルを配置
│   ├── 202403100900_create_xxx.py
│   ├── 202403201700_update_xxx.py
│   ├── ...
│   └── 202403261500_delete_xxx.py
├── seed.py      # seed を実行するスクリプト
└── seeds        # seed の up/down定義 を記述したファイルを配置
    └── dev      # どの環境で利用したいかをディレクトリで分ける
        ├── 202403011600_general.py
        ├── ...
        └── 202403041200_multi_boards.py

ツールの使い方

事前準備

マイグレーション履歴管理テーブルの作成

マイグレーションの履歴を管理するテーブルとして、migrations テーブルを作成する。ここでは AWS CLI を用いる。

$ aws dynamodb create-table \
    --cli-input-json file://migrations.json \
    --no-cli-pager

テーブル定義ファイル(Json)は以下。operation_typeversion を属性に持つ。

属性 説明
operation_type migrate または seed という文字列が入る(マイグレーションとシードの履歴をそれぞれ管理するため)
version up/down定義ファイルにある日付が入る(up/down定義ファイルの新規/既存を判別するため)
{
    "TableName": "migrations",
    "AttributeDefinitions": [
        {
            "AttributeName": "operation_type",
            "AttributeType": "S"
        },
        {
            "AttributeName": "version",
            "AttributeType": "S"
        }
    ],
    "KeySchema": [
        {
            "AttributeName": "operation_type",
            "KeyType": "HASH"
        },
        {
            "AttributeName": "version",
            "KeyType": "RANGE"
        }
    ],
    "BillingMode": "PAY_PER_REQUEST"
}

サンプルテーブルの作成

DynamoDB にマイグレーション対象となるサンプルのテーブル(usersboards)を AWS CLI を用いて作成しておく。

$ aws dynamodb create-table \
    --cli-input-json file://users.json \
    --no-cli-pager

$ aws dynamodb create-table \
    --cli-input-json file://boards.json \
    --no-cli-pager

各テーブル定義ファイル(Json)は以下。

users.json
id, email 属性を持ち、email を GlobalSecondaryIndexes に指定。

{
    "TableName": "users",
    "AttributeDefinitions": [
        {
            "AttributeName": "id",
            "AttributeType": "S"
        },
        {
            "AttributeName": "email",
            "AttributeType": "S"
        }
    ],
    "KeySchema": [
        {
            "AttributeName": "id",
            "KeyType": "HASH"
        }
    ],
    "GlobalSecondaryIndexes": [
        {
            "IndexName": "email_gsi",
            "KeySchema": [
                {
                    "AttributeName": "email",
                    "KeyType": "HASH"
                }
            ],
            "Projection": {
                "ProjectionType": "ALL"
            }
        }
    ],
    "BillingMode": "PAY_PER_REQUEST"
}

boards.json
id を属性に持つ。

{
    "TableName": "boards",
    "AttributeDefinitions": [
        {
            "AttributeName": "id",
            "AttributeType": "S"
        }
    ],
    "KeySchema": [
        {
            "AttributeName": "id",
            "KeyType": "HASH"
        }
    ],
    "BillingMode": "PAY_PER_REQUEST"
}

サンプルテーブルの PynamoDB Model の作成

ここでは users テーブルの PynamoDB Model を作成し、後ほど seed で利用する。この PynamoDB Model はアプリケーションでも利用する。

import os

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute
from pynamodb.indexes import AllProjection, GlobalSecondaryIndex

IS_LOCAL = os.environ.get("IS_LOCAL")
AWS_REGION = os.environ.get("AWS_REGION")
ENDPOINT_URL = None if IS_LOCAL != "true" else "http://host.docker.internal:8000"


class EmailGsi(GlobalSecondaryIndex):
    class Meta:
        index_name = "email_gsi"
        projection = AllProjection()

    email = UnicodeAttribute(hash_key=True)


class User(Model):
    class Meta:
        table_name = "users"
        region = AWS_REGION
        host = ENDPOINT_URL

    id = UnicodeAttribute(hash_key=True)
    name = UnicodeAttribute()
    email = UnicodeAttribute()
    language = UnicodeAttribute()
    email_gsi = EmailGsi()

マイグレーション

1. up/donw定義ファイルの作成

例えば、boards テーブルに初期出荷データを投入するケースでは、以下のようなファイルを作成する。

202403100900_create_boards.py

def up(client) -> list:
    put_sample_board = {
        "Put": {
            "TableName": "boards",
            "Item": {
                "id": {"S": "sample_board"},
                "name": {"S": "サンプルボード"},
                "order": {"N": "0"},
                "charts": {
                    "L": [
                        {"S": "chart1"},
                        {"S": "chart2"},
                        {"S": "chart3"},
                    ]
                }
            },
        }
    }

    return [
        put_sample_board,
    ]


def down(client) -> list:
    delete_sample_board = {
        "Delete": {
            "TableName": "boards",
            "Key": {
                "id": {"S": "sample_board"},
            },
        }
    }

    return [
        delete_sample_board,
    ]

2. upマイグレーションの実行

通常のマイグレーション(upマイグレーション)は以下のコマンドで実行できる。

$ python migration.py

成功すると以下のようなログが出力される。

Succeeded:
[{'Put': {'TableName': 'boards', 'Item': {'id': {'S': 'sample_board'}, 'name': {'S': 'サンプルボード'}, 'order': {'N': '0'}, 'charts': {'L': [{'S': 'chart1'}, {'S': 'chart2'}, {'S': 'chart3'}]}}}}, {'Put': {'TableName': 'migrations', 'Item': {'operation_type': {'S': 'migrate'}, 'version': {'S': '202403100900_create_boards'}}}}]
Applied 202403100900_create_boards

3. downマイグレーションの実行

downマイグレーションは以下のコマンドで実行できる。

$ python migration.py --down

成功すると以下のようなログが出力される。

Succeeded:
[{'Delete': {'TableName': 'boards', 'Key': {'id': {'S': 'sample_board'}}}}, {'Delete': {'TableName': 'migrations', 'Key': {'operation_type': {'S': 'migrate'}, 'version': {'S': '202403100900_create_boards'}}}}]
Reverted 202403100900_create_boards

シード

1. up/donw定義ファイルの作成

例えば、users テーブルに開発用のユーザーを作成するケースでは、以下のようなファイルを作成する。

from user import User


user = User(
    id="sample_user_id",
    name="sample_user_name",
    email="user@sample.com",
    language="ja",
)


def up():
    return [
        user,
    ]


def down():
    return [
        user,
    ]

2. upシードの実行

upシードは以下のコマンドで実行できる。

$ python seed.py dev # dev はディレクトリ名

成功すると以下のようなログが出力される。

Applied 202403011600_general

3. downシードの実行

downシードは以下のコマンドで実行できる。

$ python seed.py dev --down

成功すると以下のようなログが出力される。

Reverted 202403011600_general

各ファイルの詳細と解説

各ファイルのコードに対してコメントで解説している。

migration.py

マイグレーションを実行するスクリプト。主に以下の制御を行う。

  • 引数情報から up/down を判定
  • 新規の up/down定義ファイルを順に実行
  • 既に適用済みの up/down定義ファイルをスキップ
  • up/down定義ファイルごとのトランザクション
  • マイグレーション履歴管理テーブルの更新
import argparse
import importlib
import os

import boto3
from botocore.exceptions import ClientError
from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model

IS_LOCAL = os.environ.get("IS_LOCAL") # ローカル実行用で次のように利用 $IS_LOCAL="true" python migration.py
AWS_REGION = os.environ.get("AWS_REGION")
ENDPOINT_URL = None if IS_LOCAL != "true" else "http://host.docker.internal:8000" # ローカル時はDynamoDB localのエンドポイントを指定
TABLE_NAME = "migrations"
OPERATION_TYPE = "migrate" # マイグレーション履歴管理テーブルは、マイグレーションとシードどちらも管理しているため、migrateに絞るために定義
MAX_TRANSACT_ITEMS = 100

client = boto3.client(
    "dynamodb",
    region_name=AWS_REGION,
    endpoint_url=ENDPOINT_URL,
)


# マイグレーション履歴管理テーブルの PynamoDB Model
class Migration(Model):
    class Meta:
        table_name = TABLE_NAME
        region = AWS_REGION
        host = ENDPOINT_URL

    operation_type = UnicodeAttribute(hash_key=True)
    version = UnicodeAttribute(range_key=True)


def main(down: bool):
    # 適用済みのマイグレーションを取得
    response = Migration.query(OPERATION_TYPE, scan_index_forward=False)
    applied_versions = [item.version for item in response]

    target_dir = "migrations"
    if down:
        __down(applied_versions, target_dir)
    else:
        __up(applied_versions, target_dir)


def __down(applied_versions: list[str], target_dir: str):
    if not applied_versions:
        print("No data to perform down seed")
        return

    # 最新のバージョンを取得し、対象up/down定義ファイルのdownメソッドを実行
    version = applied_versions[0]
    module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
    # up/down定義ファイルでreturnされる単位をトランザクションとして管理
    transact_items = module.down(client)

    # トランザクションアイテムにマイグレーション履歴管理テーブルの最新バージョンの削除定義を追加
    transact_items.append(
        {
            "Delete": {
                "TableName": TABLE_NAME,
                "Key": {
                    "operation_type": {"S": OPERATION_TYPE},
                    "version": {"S": version},
                },
            }
        }
    )
    # トランザクションの限界値を超えた場合はトランザクションを分割して実行
    __transact_chunk_write(transact_items, MAX_TRANSACT_ITEMS, version)
    print(f"Reverted {version}")


def __up(applied_versions: list[str], target_dir: str):
    # ディレクトリ以下のファイルからバージョンを取得
    versions = sorted([f.replace(".py", "") for f in os.listdir(target_dir) if f.endswith(".py")])

    # 新しいバージョンが存在するかどうかをチェック
    new_versions = [v for v in versions if not applied_versions or v > applied_versions[0]]

    if not new_versions:
        print("Already up to date")
        return

    # 新しいバージョンを順次適用
    for version in new_versions:
        module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
        transact_items = module.up(client)

        transact_items.append(
            {
                "Put": {
                    "TableName": TABLE_NAME,
                    "Item": {
                        "operation_type": {"S": OPERATION_TYPE},
                        "version": {"S": version},
                    },
                }
            }
        )

        __transact_chunk_write(transact_items, MAX_TRANSACT_ITEMS, version)
        print(f"Applied {version}")


def __transact_chunk_write(transact_items, max_transact_items, version):
    """MAX_TRANSACT_ITEMS毎に分割してトランザクションの実行"""
    for chunked_transact_items in __chunk(transact_items, max_transact_items):
        try:
            client.transact_write_items(TransactItems=chunked_transact_items)
            print(f"Succeeded:\n{chunked_transact_items}")
        except ClientError as e:
            print(f"Failed:\n{chunked_transact_items}")
            print(f"Error applying {version}: {e}")
            raise


def __chunk(lst: list, n: int):
    for i in range(0, len(lst), n):
        yield lst[i : i + n]


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--down", action="store_true")
    args = parser.parse_args()

    main(args.down)

migrations ディレクトリ内のファイル

このディレクトリにはマイグレーションの up/down定義ファイルを配置する。

202403100900_create_boards.py を例にとると、202403100900 がバージョンとして利用され、その後の _create_boards はどんな文字列でも良い。ファイルを追加するときは、このバージョンをより新しいものにすることで、次回のマイグレーション対象にすることができる。

その他の解説はコメントで記載している。

def up(client) -> list: # client(boto3のdynamodb client)は、scanメソッドなどの機能を利用したい場面で使う
    put_sample_board = { # テーブル操作の定義を変数に格納
        "Put": {
            "TableName": "boards",
            "Item": {
                "id": {"S": "sample_board"},
                "name": {"S": "サンプルボード"},
                "order": {"N": "0"},
                "charts": {
                    "L": [
                        {"S": "chart1"},
                        {"S": "chart2"},
                        {"S": "chart3"},
                    ]
                }
            },
        }
    }

    return [
        put_sample_board, # 適用したい順番で変数を配列に追加
    ]


def down(client) -> list:
    delete_sample_board = {
        "Delete": {
            "TableName": "boards",
            "Key": {
                "id": {"S": "sample_board"},
            },
        }
    }

    return [
        delete_sample_board, # upで複数の場合、downでも基本的に複数になるが、その場合はupと逆順で記述するのが一般的
    ]

seed.py

migration.py とほとんど同じだが、PynamoDB Model を利用する部分が異なる。また、大量のシードデータを作成/削除するケースが少ないのでトランザクションを分割して実行することはしていないが、必要であれば migration.py と同様の処理を加えることで実現できる。

import argparse
import importlib
import os

import boto3
from pynamodb.attributes import UnicodeAttribute
from pynamodb.connection.base import Connection
from pynamodb.models import Model
from pynamodb.transactions import TransactWrite

IS_LOCAL = os.environ.get("IS_LOCAL")
AWS_REGION = os.environ.get("AWS_REGION")
ENDPOINT_URL = None if IS_LOCAL != "true" else "http://host.docker.internal:8000"
TABLE_NAME = "migrations"
OPERATION_TYPE = "seed"

client = boto3.client(
    "dynamodb",
    region_name=AWS_REGION,
    endpoint_url=ENDPOINT_URL,
)


class Migration(Model):
    class Meta:
        table_name = TABLE_NAME
        region = AWS_REGION
        host = ENDPOINT_URL

    operation_type = UnicodeAttribute(hash_key=True)
    version = UnicodeAttribute(range_key=True)


def main(down: bool, target_dir: str):
    response = Migration.query(OPERATION_TYPE, scan_index_forward=False)
    applied_versions = [item.version for item in response]

    target_dir = "seeds/" + target_dir
    conn = Connection(region=AWS_REGION, host=ENDPOINT_URL)
    if down:
        __down(applied_versions, target_dir, conn)
    else:
        __up(applied_versions, target_dir, conn)


def __down(applied_versions: list[str], target_dir: str, conn: Connection):
    if not applied_versions:
        print("No data to perform down seed")
        return

    version = applied_versions[0]
    module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
    transact_items = module.down()

    try:
        with TransactWrite(connection=conn) as transaction:
            for item in transact_items:
                # PynamoDB Model を利用するので deleteメソッドを呼ぶ
                transaction.delete(item)
            transaction.delete(
                Migration(version=version, operation_type=OPERATION_TYPE)
            )

            print(f"Reverted {version}")

    except Exception as e:
        print(f"Error reverting {version}: {e}")


def __up(applied_versions: list[str], target_dir: str, conn: Connection):
    versions = sorted(
        [f.replace(".py", "") for f in os.listdir(target_dir) if f.endswith(".py")]
    )

    new_versions = [
        v for v in versions if not applied_versions or v > applied_versions[0]
    ]

    if not new_versions:
        print("Already up to date")
        return

    for version in new_versions:
        module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
        transact_items = module.up()

        try:
            with TransactWrite(connection=conn) as transaction:
                for item in transact_items:
                    # PynamoDB Model を利用するので saveメソッドを呼ぶ
                    transaction.save(item)
                transaction.save(
                    Migration(version=version, operation_type=OPERATION_TYPE)
                )

                print(f"Applied {version}")

        except Exception as e:
            print(f"Error applying {version}: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("dir")
    parser.add_argument("--down", action="store_true")
    args = parser.parse_args()

    main(args.down, args.dir)

seeds ディレクトリ内のファイル

このディレクトリにはシードの up/down定義ファイルを配置する。

migrations ディレクトリ内のファイル とほとんど同じ使い方をする。PynamoDB Model を利用できるため、マイグレーションよりシンプルに記述できる。

from user import User


user = User( # up/downしたい PynamoDB Model をインスタンス化
    id="sample_user_id",
    name="sample_user_name",
    email="user@sample.com",
    language="ja",
)


def up():
    return [
        user,
    ]


def down():
    return [
        user,
    ]

Discussion