DynamoDB の up/downマイグレーションツールを Python とPynamoDB で自作する
概要
DynamoDB が永続層のアプリケーションを運用するにあたって、Python でバックエンドを記述しているが、既存のライブラリにマイグレーションツールがなかったため自作した(DynamoDB のためスキーマのマイグレーションではなくデータのマイグレーションを対象としている)。
ORM として PynamoDB を利用しているケースを想定している(が、PynamoDBを利用していない場合でも多少記述を変更すれば使えるようになる)。一般的な up/down処理に加え、シードデータの作成/削除まで対応している。
利用するライブラリ
ライブラリ | バージョン |
---|---|
python | 3.10 |
pynamodb | 5.5.1 |
boto3 | 1.33.2 |
成果物
成果物は以下となる。 migration.py
を実行(このとき、up/downを指定できる)することで、migrations
ディレクトリ配下のファイルに記述された up/down定義が実行され、履歴管理される。 seed.py
も同じように動作する。詳細は 各ファイルの詳細と解説 の項で説明する。
.
├── migration.py # migration を実行するスクリプト
├── migrations # migration の up/down定義 を記述したファイルを配置
│ ├── 202403100900_create_xxx.py
│ ├── 202403201700_update_xxx.py
│ ├── ...
│ └── 202403261500_delete_xxx.py
├── seed.py # seed を実行するスクリプト
└── seeds # seed の up/down定義 を記述したファイルを配置
└── dev # どの環境で利用したいかをディレクトリで分ける
├── 202403011600_general.py
├── ...
└── 202403041200_multi_boards.py
ツールの使い方
事前準備
マイグレーション履歴管理テーブルの作成
マイグレーションの履歴を管理するテーブルとして、migrations
テーブルを作成する。ここでは AWS CLI を用いる。
$ aws dynamodb create-table \
--cli-input-json file://migrations.json \
--no-cli-pager
テーブル定義ファイル(Json)は以下。operation_type
と version
を属性に持つ。
属性 | 説明 |
---|---|
operation_type |
migrate または seed という文字列が入る(マイグレーションとシードの履歴をそれぞれ管理するため) |
version |
up/down定義ファイルにある日付が入る(up/down定義ファイルの新規/既存を判別するため) |
{
"TableName": "migrations",
"AttributeDefinitions": [
{
"AttributeName": "operation_type",
"AttributeType": "S"
},
{
"AttributeName": "version",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "operation_type",
"KeyType": "HASH"
},
{
"AttributeName": "version",
"KeyType": "RANGE"
}
],
"BillingMode": "PAY_PER_REQUEST"
}
サンプルテーブルの作成
DynamoDB にマイグレーション対象となるサンプルのテーブル(users
と boards
)を AWS CLI を用いて作成しておく。
$ aws dynamodb create-table \
--cli-input-json file://users.json \
--no-cli-pager
$ aws dynamodb create-table \
--cli-input-json file://boards.json \
--no-cli-pager
各テーブル定義ファイル(Json)は以下。
users.json
id, email 属性を持ち、email を GlobalSecondaryIndexes に指定。
{
"TableName": "users",
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
},
{
"AttributeName": "email",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"GlobalSecondaryIndexes": [
{
"IndexName": "email_gsi",
"KeySchema": [
{
"AttributeName": "email",
"KeyType": "HASH"
}
],
"Projection": {
"ProjectionType": "ALL"
}
}
],
"BillingMode": "PAY_PER_REQUEST"
}
boards.json
id を属性に持つ。
{
"TableName": "boards",
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"BillingMode": "PAY_PER_REQUEST"
}
サンプルテーブルの PynamoDB Model の作成
ここでは users
テーブルの PynamoDB Model を作成し、後ほど seed で利用する。この PynamoDB Model はアプリケーションでも利用する。
import os
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute
from pynamodb.indexes import AllProjection, GlobalSecondaryIndex
IS_LOCAL = os.environ.get("IS_LOCAL")
AWS_REGION = os.environ.get("AWS_REGION")
ENDPOINT_URL = None if IS_LOCAL != "true" else "http://host.docker.internal:8000"
class EmailGsi(GlobalSecondaryIndex):
class Meta:
index_name = "email_gsi"
projection = AllProjection()
email = UnicodeAttribute(hash_key=True)
class User(Model):
class Meta:
table_name = "users"
region = AWS_REGION
host = ENDPOINT_URL
id = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
email = UnicodeAttribute()
language = UnicodeAttribute()
email_gsi = EmailGsi()
マイグレーション
1. up/donw定義ファイルの作成
例えば、boards
テーブルに初期出荷データを投入するケースでは、以下のようなファイルを作成する。
202403100900_create_boards.py
def up(client) -> list:
put_sample_board = {
"Put": {
"TableName": "boards",
"Item": {
"id": {"S": "sample_board"},
"name": {"S": "サンプルボード"},
"order": {"N": "0"},
"charts": {
"L": [
{"S": "chart1"},
{"S": "chart2"},
{"S": "chart3"},
]
}
},
}
}
return [
put_sample_board,
]
def down(client) -> list:
delete_sample_board = {
"Delete": {
"TableName": "boards",
"Key": {
"id": {"S": "sample_board"},
},
}
}
return [
delete_sample_board,
]
2. upマイグレーションの実行
通常のマイグレーション(upマイグレーション)は以下のコマンドで実行できる。
$ python migration.py
成功すると以下のようなログが出力される。
Succeeded:
[{'Put': {'TableName': 'boards', 'Item': {'id': {'S': 'sample_board'}, 'name': {'S': 'サンプルボード'}, 'order': {'N': '0'}, 'charts': {'L': [{'S': 'chart1'}, {'S': 'chart2'}, {'S': 'chart3'}]}}}}, {'Put': {'TableName': 'migrations', 'Item': {'operation_type': {'S': 'migrate'}, 'version': {'S': '202403100900_create_boards'}}}}]
Applied 202403100900_create_boards
3. downマイグレーションの実行
downマイグレーションは以下のコマンドで実行できる。
$ python migration.py --down
成功すると以下のようなログが出力される。
Succeeded:
[{'Delete': {'TableName': 'boards', 'Key': {'id': {'S': 'sample_board'}}}}, {'Delete': {'TableName': 'migrations', 'Key': {'operation_type': {'S': 'migrate'}, 'version': {'S': '202403100900_create_boards'}}}}]
Reverted 202403100900_create_boards
シード
1. up/donw定義ファイルの作成
例えば、users
テーブルに開発用のユーザーを作成するケースでは、以下のようなファイルを作成する。
from user import User
user = User(
id="sample_user_id",
name="sample_user_name",
email="user@sample.com",
language="ja",
)
def up():
return [
user,
]
def down():
return [
user,
]
2. upシードの実行
upシードは以下のコマンドで実行できる。
$ python seed.py dev # dev はディレクトリ名
成功すると以下のようなログが出力される。
Applied 202403011600_general
3. downシードの実行
downシードは以下のコマンドで実行できる。
$ python seed.py dev --down
成功すると以下のようなログが出力される。
Reverted 202403011600_general
各ファイルの詳細と解説
各ファイルのコードに対してコメントで解説している。
migration.py
マイグレーションを実行するスクリプト。主に以下の制御を行う。
- 引数情報から up/down を判定
- 新規の up/down定義ファイルを順に実行
- 既に適用済みの up/down定義ファイルをスキップ
- up/down定義ファイルごとのトランザクション
- マイグレーション履歴管理テーブルの更新
import argparse
import importlib
import os
import boto3
from botocore.exceptions import ClientError
from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model
IS_LOCAL = os.environ.get("IS_LOCAL") # ローカル実行用で次のように利用 $IS_LOCAL="true" python migration.py
AWS_REGION = os.environ.get("AWS_REGION")
ENDPOINT_URL = None if IS_LOCAL != "true" else "http://host.docker.internal:8000" # ローカル時はDynamoDB localのエンドポイントを指定
TABLE_NAME = "migrations"
OPERATION_TYPE = "migrate" # マイグレーション履歴管理テーブルは、マイグレーションとシードどちらも管理しているため、migrateに絞るために定義
MAX_TRANSACT_ITEMS = 100
client = boto3.client(
"dynamodb",
region_name=AWS_REGION,
endpoint_url=ENDPOINT_URL,
)
# マイグレーション履歴管理テーブルの PynamoDB Model
class Migration(Model):
class Meta:
table_name = TABLE_NAME
region = AWS_REGION
host = ENDPOINT_URL
operation_type = UnicodeAttribute(hash_key=True)
version = UnicodeAttribute(range_key=True)
def main(down: bool):
# 適用済みのマイグレーションを取得
response = Migration.query(OPERATION_TYPE, scan_index_forward=False)
applied_versions = [item.version for item in response]
target_dir = "migrations"
if down:
__down(applied_versions, target_dir)
else:
__up(applied_versions, target_dir)
def __down(applied_versions: list[str], target_dir: str):
if not applied_versions:
print("No data to perform down seed")
return
# 最新のバージョンを取得し、対象up/down定義ファイルのdownメソッドを実行
version = applied_versions[0]
module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
# up/down定義ファイルでreturnされる単位をトランザクションとして管理
transact_items = module.down(client)
# トランザクションアイテムにマイグレーション履歴管理テーブルの最新バージョンの削除定義を追加
transact_items.append(
{
"Delete": {
"TableName": TABLE_NAME,
"Key": {
"operation_type": {"S": OPERATION_TYPE},
"version": {"S": version},
},
}
}
)
# トランザクションの限界値を超えた場合はトランザクションを分割して実行
__transact_chunk_write(transact_items, MAX_TRANSACT_ITEMS, version)
print(f"Reverted {version}")
def __up(applied_versions: list[str], target_dir: str):
# ディレクトリ以下のファイルからバージョンを取得
versions = sorted([f.replace(".py", "") for f in os.listdir(target_dir) if f.endswith(".py")])
# 新しいバージョンが存在するかどうかをチェック
new_versions = [v for v in versions if not applied_versions or v > applied_versions[0]]
if not new_versions:
print("Already up to date")
return
# 新しいバージョンを順次適用
for version in new_versions:
module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
transact_items = module.up(client)
transact_items.append(
{
"Put": {
"TableName": TABLE_NAME,
"Item": {
"operation_type": {"S": OPERATION_TYPE},
"version": {"S": version},
},
}
}
)
__transact_chunk_write(transact_items, MAX_TRANSACT_ITEMS, version)
print(f"Applied {version}")
def __transact_chunk_write(transact_items, max_transact_items, version):
"""MAX_TRANSACT_ITEMS毎に分割してトランザクションの実行"""
for chunked_transact_items in __chunk(transact_items, max_transact_items):
try:
client.transact_write_items(TransactItems=chunked_transact_items)
print(f"Succeeded:\n{chunked_transact_items}")
except ClientError as e:
print(f"Failed:\n{chunked_transact_items}")
print(f"Error applying {version}: {e}")
raise
def __chunk(lst: list, n: int):
for i in range(0, len(lst), n):
yield lst[i : i + n]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--down", action="store_true")
args = parser.parse_args()
main(args.down)
migrations ディレクトリ内のファイル
このディレクトリにはマイグレーションの up/down定義ファイルを配置する。
202403100900_create_boards.py
を例にとると、202403100900
がバージョンとして利用され、その後の _create_boards
はどんな文字列でも良い。ファイルを追加するときは、このバージョンをより新しいものにすることで、次回のマイグレーション対象にすることができる。
その他の解説はコメントで記載している。
def up(client) -> list: # client(boto3のdynamodb client)は、scanメソッドなどの機能を利用したい場面で使う
put_sample_board = { # テーブル操作の定義を変数に格納
"Put": {
"TableName": "boards",
"Item": {
"id": {"S": "sample_board"},
"name": {"S": "サンプルボード"},
"order": {"N": "0"},
"charts": {
"L": [
{"S": "chart1"},
{"S": "chart2"},
{"S": "chart3"},
]
}
},
}
}
return [
put_sample_board, # 適用したい順番で変数を配列に追加
]
def down(client) -> list:
delete_sample_board = {
"Delete": {
"TableName": "boards",
"Key": {
"id": {"S": "sample_board"},
},
}
}
return [
delete_sample_board, # upで複数の場合、downでも基本的に複数になるが、その場合はupと逆順で記述するのが一般的
]
seed.py
migration.py
とほとんど同じだが、PynamoDB Model を利用する部分が異なる。また、大量のシードデータを作成/削除するケースが少ないのでトランザクションを分割して実行することはしていないが、必要であれば migration.py
と同様の処理を加えることで実現できる。
import argparse
import importlib
import os
import boto3
from pynamodb.attributes import UnicodeAttribute
from pynamodb.connection.base import Connection
from pynamodb.models import Model
from pynamodb.transactions import TransactWrite
IS_LOCAL = os.environ.get("IS_LOCAL")
AWS_REGION = os.environ.get("AWS_REGION")
ENDPOINT_URL = None if IS_LOCAL != "true" else "http://host.docker.internal:8000"
TABLE_NAME = "migrations"
OPERATION_TYPE = "seed"
client = boto3.client(
"dynamodb",
region_name=AWS_REGION,
endpoint_url=ENDPOINT_URL,
)
class Migration(Model):
class Meta:
table_name = TABLE_NAME
region = AWS_REGION
host = ENDPOINT_URL
operation_type = UnicodeAttribute(hash_key=True)
version = UnicodeAttribute(range_key=True)
def main(down: bool, target_dir: str):
response = Migration.query(OPERATION_TYPE, scan_index_forward=False)
applied_versions = [item.version for item in response]
target_dir = "seeds/" + target_dir
conn = Connection(region=AWS_REGION, host=ENDPOINT_URL)
if down:
__down(applied_versions, target_dir, conn)
else:
__up(applied_versions, target_dir, conn)
def __down(applied_versions: list[str], target_dir: str, conn: Connection):
if not applied_versions:
print("No data to perform down seed")
return
version = applied_versions[0]
module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
transact_items = module.down()
try:
with TransactWrite(connection=conn) as transaction:
for item in transact_items:
# PynamoDB Model を利用するので deleteメソッドを呼ぶ
transaction.delete(item)
transaction.delete(
Migration(version=version, operation_type=OPERATION_TYPE)
)
print(f"Reverted {version}")
except Exception as e:
print(f"Error reverting {version}: {e}")
def __up(applied_versions: list[str], target_dir: str, conn: Connection):
versions = sorted(
[f.replace(".py", "") for f in os.listdir(target_dir) if f.endswith(".py")]
)
new_versions = [
v for v in versions if not applied_versions or v > applied_versions[0]
]
if not new_versions:
print("Already up to date")
return
for version in new_versions:
module = importlib.import_module(f"{target_dir}.{version}".replace("/", "."))
transact_items = module.up()
try:
with TransactWrite(connection=conn) as transaction:
for item in transact_items:
# PynamoDB Model を利用するので saveメソッドを呼ぶ
transaction.save(item)
transaction.save(
Migration(version=version, operation_type=OPERATION_TYPE)
)
print(f"Applied {version}")
except Exception as e:
print(f"Error applying {version}: {e}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("dir")
parser.add_argument("--down", action="store_true")
args = parser.parse_args()
main(args.down, args.dir)
seeds ディレクトリ内のファイル
このディレクトリにはシードの up/down定義ファイルを配置する。
migrations ディレクトリ内のファイル
とほとんど同じ使い方をする。PynamoDB Model を利用できるため、マイグレーションよりシンプルに記述できる。
from user import User
user = User( # up/downしたい PynamoDB Model をインスタンス化
id="sample_user_id",
name="sample_user_name",
email="user@sample.com",
language="ja",
)
def up():
return [
user,
]
def down():
return [
user,
]
Discussion