分析基盤の DMS をやめて Glue バッチETL にしたら月額 $310 → $12 になった
はじめに
複数の AWS アカウントで運用されている RDS / Aurora のプロダクトデータを、分析専用アカウントに集約して Amazon Athena でクエリできるようにするデータ基盤を構築しています。
当初は AWS DMS (Database Migration Service) を使ってリアルタイム同期していました。しかし、これは 分析チームへの要件確認が不十分なまま「リアルタイム同期が必要だろう」と思い込んで設計した結果 でした。実際に運用を始めてから分析チームにヒアリングしたところ、週次更新で十分だと分かり、以下の課題が浮き彫りになりました。
- DMS の Replication Instance を常時稼働させるコスト: 月額 $310(週次バッチなら不要)
- CDC (Change Data Capture) のオペレーション負荷(レプリケーション遅延の監視、タスク再起動など)
- そもそも 分析用途にリアルタイム同期は不要 だった
結果、DMS を撤廃して Glue バッチ ETL + RDS Snapshot Export に移行し、月額コストを $12 まで削減しました。本記事では設計・実装・つまづきポイントをまとめます。
対象読者
- マルチアカウント環境でデータ分析基盤を構築したい方
- DMS のコストに悩んでいる方
- Glue ETL の実践的なノウハウを知りたい方
規模感
| 項目 | 値 |
|---|---|
| データソース数 | 6(MySQL × 4、PostgreSQL × 1、MariaDB × 1) |
| テーブル数 | 64 |
| 最大テーブル規模 | 約 8 億件 / 1.2 TB(Parquet 圧縮後) |
| スケジュール | 毎週日曜 2:00 JST |
アーキテクチャ概要
全体構成図
ポイント: Glue Job は VPC の Private Subnet 内で起動し、VPC ピアリング経由で各ソース RDS に JDBC 接続します。NAT Instance(NAT Gateway ではなくコスト最適化のため t3.nano を使用)経由で S3 / Secrets Manager などの AWS サービスにアクセスします。
Snapshot Export フロー(初回全量ロード)
初回全量ロードと差分取り込みのデータが 同一 S3 パスに統合 されるのがポイントです。Athena からは 1 つのテーブルとしてシームレスにクエリできます。
週次 ETL 実行フロー
なぜ DMS をやめたか
DMS のコスト構造
DMS は Replication Instance を常時稼働 させる必要があります。データ転送がない時間帯もインスタンス料金が発生するため、週次バッチのような「たまに動かす」ユースケースではコスト効率が悪くなります。
| 項目 | コスト/月 |
|---|---|
| Replication Instance (dms.t3.medium) | ~$70 |
| ストレージ + データ転送 | ~$10 |
| 複数ソースで合計 | ~$310 |
CDC のオペレーション負荷
CDC (Change Data Capture) はリアルタイム同期には強力ですが、運用負荷があります。
- レプリケーション遅延の監視
- ソース DB のスキーマ変更時にタスクの再作成が必要
- binlog の保持期間管理
- レプリケーションタスクのエラー時の手動復旧
「週次で十分」だった
正直に言うと、これは最初の要件定義の段階で確認すべきことでした。「分析基盤だからリアルタイム同期が必要だろう」と思い込み、DMS + CDC という構成を選択してしまいました。
運用開始後に分析チームに改めてヒアリングしたところ、週次更新で十分 ということが分かりました。リアルタイム性が不要なら、CDC のコストと複雑さを負う必要はありません。DMS の月額 $310 は、要件確認不足のコストでもありました。
Glue バッチ ETL の設計
増分取得ロジック
S3 上のメタデータファイルで前回実行時刻を管理し、WHERE timestamp >= last_run で増分データを取得するシンプルな設計です。
S3: glue-metadata/{db_name}/{table_name}/last_run_timestamp.txt
→ "2026-02-16 02:00:00"
def get_last_run_timestamp(table_name):
"""S3から前回実行時刻を取得"""
metadata_key = f"glue-metadata/{db_name}/{table_name}/last_run_timestamp.txt"
try:
response = s3.get_object(Bucket=metadata_bucket, Key=metadata_key)
return response['Body'].read().decode('utf-8').strip()
except s3.exceptions.NoSuchKey:
# 初回実行: 1週間前から取得
return (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
DynamoDB やパラメータストアではなく S3 のテキストファイル にしたのは、追加コストゼロでシンプルだからです。Glue Job のサマリー情報(レコード件数など)も同様に S3 に JSON で保存し、Lambda の Slack 通知から参照しています。
単一スクリプトで複数ソース対応
62 テーブルに対して 62 個の Glue Job を作るのではなく、1 つの Python スクリプト で MySQL / PostgreSQL 両方に対応しています。
# DB_ENGINE に応じて JDBC URL とドライバーを切り替え
if db_engine in ('postgresql', 'postgres'):
jdbc_url = f"jdbc:postgresql://{host}:{port}/{db_name}"
connection_properties = {
"user": user, "password": password,
"driver": "org.postgresql.Driver"
}
else:
jdbc_url = f"jdbc:mysql://{host}:{port}/{db_name}"
connection_properties = {
"user": user, "password": password,
"driver": "com.mysql.jdbc.Driver",
"useSSL": "false", "fetchsize": "10000"
}
テーブルの指定は 単一テーブルモード(--TABLE_NAME)と 複数テーブルモード(--TABLE_LIST)の 2 パターンをサポートしています。プロダクト A は 1 テーブル 1 Job、プロダクト B は DB ごとに 1 Job で複数テーブルを順次処理する構成です。
| Job | テーブル数 | Worker 数 | 実行時間 |
|---|---|---|---|
| プロダクトA (テーブル1) | 1 | 10 | ~2 分 |
| プロダクトA (テーブル2) | 1 | 10 | ~2 分 |
| プロダクトB-1 (PostgreSQL) | 26 | 5 | ~7 分 |
| プロダクトB-2 | 28 | 5 | ~2 分 |
| プロダクトB-3 | 6 | 3 | ~1 時間 |
| プロダクトB-4 | 2 | 2 | ~1 分 |
Spark パーティション最適化(coalesce)
Spark はデフォルトで多数のパーティションにデータを分割しますが、少量データではこれが 致命的なオーバーヘッド になります(後述「つまづきポイント」参照)。
if record_count < 100000:
# 少量データ: 単一ファイルに統合
optimized = datasource.toDF().coalesce(1)
else:
# 大量データ: 10万件あたり1パーティション
num_partitions = max(1, record_count // 100000)
optimized = datasource.toDF().coalesce(num_partitions)
JDBC パーティション分割(OOM 対策)
数千万件規模のテーブルでは、JDBC の単一タスクで全データを読み込むと OOM が発生します。Spark の numPartitions を使って読み込みを分散させます。
# MIN/MAX を取得してパーティション範囲を決定
bounds_query = f"""(SELECT MIN({partition_col}) AS min_val,
MAX({partition_col}) AS max_val
FROM {table_name}
WHERE {ts_col} >= '{last_run}') AS bounds"""
df = spark.read.jdbc(
url=jdbc_url, table=query,
column=partition_col,
lowerBound=int(lower_bound),
upperBound=int(upper_bound),
numPartitions=num_partitions, # 例: 100分割
properties=connection_properties
)
パーティション分割時は count() を呼ぶと全データをメモリに展開してしまうため、count() をスキップして直接 S3 に書き込む ようにしています。
Snapshot Export による初回全量ロード
なぜ Glue ETL で初回ロードしないのか
Glue ETL は JDBC 経由でソース DB に接続するため、数億件規模のデータを一括読み込みすると以下の問題が発生します。
- OOM: Worker のメモリを超過
- ソース DB への負荷: 本番 RDS に長時間の全テーブルスキャンをかける
- 実行時間: DMS の Full Load で 22.5 時間かかっていた規模
RDS Snapshot Export to S3 なら スナップショットから読み取るため本番 DB への負荷がゼロ で、日中作業が可能です。
クロスアカウント構成
ソースアカウントから分析環境の S3 バケットに直接 Export するため、クロスアカウントの IAM / KMS / S3 ポリシーの設定が必要です。
【ソースアカウント側】
├── KMS キー(Export 暗号化用)
│ └── キーポリシーで分析環境の Glue ロールに kms:Decrypt を許可
└── IAM ロール(Export 実行用)
├── 信頼ポリシー: export.rds.amazonaws.com
└── 権限: S3 書き込み + KMS 暗号化
【分析環境アカウント側】
└── S3 バケットポリシー
└── ソースアカウントの Export ロールに書き込みを許可
Export → フラッテン → ETL 統合の流れ
Snapshot Export の出力は rds-snapshot-export/<source>/<export-id>/<db>/<db>.<table>/ という深いパス構造になります。これを Glue ETL の incremental パスにフラッテン移動して統合します。
# Export データを incremental パスに移動(_SUCCESS ファイルは除外)
aws s3 mv \
"s3://${BUCKET}/rds-snapshot-export/${SOURCE}/${EXPORT_ID}/${DB}/${DB}.${TABLE}/" \
"s3://${BUCKET}/${SOURCE}_incremental/${TABLE}/" \
--recursive --exclude "_SUCCESS"
移動後、Glue Crawler を実行してカタログに登録し、Athena で MAX(timestamp) を確認して last_run_timestamp.txt に設定すれば、以降は Glue ETL が差分を同じパスに書き込みます。
実績値
| 対象 | データ量 | 所要時間 |
|---|---|---|
| プロダクトA(最大テーブル) | ~8 億件 / 1.2 TB | Export ~1 時間 |
| プロダクトB-4 | 3,992 GB | Export 数時間 |
| プロダクトB-2/B-3 | 835 GB | Export 数時間 |
| プロダクトB-1 (PostgreSQL) | 468 GB | Export 数時間 |
つまづきポイントと解決策
1. PostgreSQL Parquet スキーマ不一致
症状: PostgreSQL ソースで Snapshot Export データと Glue ETL の差分データが同一テーブルに混在すると、Athena で HIVE_CURSOR_ERROR が発生。
原因: Aurora Snapshot Export は PostgreSQL の timestamp 型を Parquet の string として出力しますが、PySpark の JDBC は timestamp[ns] として出力します。同一パスに異なるスキーマの Parquet ファイルが混在するため、Athena がスキーマ不整合でエラーになります。
解決策: PostgreSQL の場合のみ、ETL スクリプト内で timestamp カラムを string にキャストして Snapshot Export と型を統一。
def cast_timestamps_to_string(df, table_name):
"""PostgreSQL の timestamp カラムを string にキャスト"""
from pyspark.sql.types import TimestampType, StringType
from pyspark.sql.functions import col
ts_columns = [f.name for f in df.schema.fields
if isinstance(f.dataType, TimestampType)]
if ts_columns:
for c in ts_columns:
df = df.withColumn(c, col(c).cast(StringType()))
return df
# メイン処理内で PostgreSQL の場合のみ適用
if db_engine in ('postgresql', 'postgres'):
df = cast_timestamps_to_string(df, table_name)
MySQL ソースでは Snapshot Export と JDBC の Parquet スキーマが一致するため、この問題は発生しません。
2. Spark の過剰パーティション化(6 件で 7 時間)
症状: 週次の増分データが 6 件しかないのに、Glue Job の実行に 7 時間 かかる。
原因: Spark がデフォルトで大量のパーティション(空のパーティション含む)を作成し、各パーティションの S3 書き込みのオーバーヘッドが積み重なる。
解決策: coalesce() でパーティション数を明示的に制御。
# 改善前: Spark デフォルトの多数パーティションで書き込み
glueContext.write_dynamic_frame.from_options(frame=datasource, ...)
# 改善後: 少量データは 1 ファイルに統合
optimized = DynamicFrame.fromDF(
datasource.toDF().coalesce(1), glueContext, "optimized"
)
glueContext.write_dynamic_frame.from_options(frame=optimized, ...)
3. Worker 数を増やしたら逆に遅くなった
症状: あるソースの実行時間が 15 時間かかっていたため、Worker 数を 2 → 10 に増やしたが、逆に 13 時間に悪化。
原因: WHERE updated_at >= 'xxx' のカラムにインデックスがなく、全テーブルフルスキャン が発生していた。Worker を増やすと並列でフルスキャンするため、ソース RDS の負荷が増大して逆効果。
解決策: ソース DB に適切なインデックスを追加。
-- 1.4億行のテーブルに created_at 専用インデックスを追加
CREATE INDEX large_table_created_at_index ON large_table (created_at);
結果: 15 時間 → 1 分(約 900 倍改善)。
4. メタデータパスの DB_NAME 誤り
症状: Glue ETL がデフォルトの「1 週間前」として大量データを取得してしまう。
原因: last_run_timestamp.txt のパスに使う DB_NAME は Secrets Manager の dbname 値(例: myapp_core)であって、データソースの通称(myapp_log 等)ではなかった。パスを間違えると NoSuchKey になり、初回実行と同じ挙動になる。
解決策: パスの命名規則をドキュメントに明記。
✅ glue-metadata/{Secrets Manager の dbname}/{table_name}/last_run_timestamp.txt
❌ glue-metadata/{データソース名}/{table_name}/last_run_timestamp.txt
コスト比較・成果
Before / After
| 項目 | DMS (Before) | Glue バッチ ETL (After) |
|---|---|---|
| 月額コスト | $310 | $12 |
| 削減率 | - | 96% |
| アーキテクチャ | Replication Instance 常時稼働 | 週 1 回バッチ実行 |
| 同期頻度 | リアルタイム (CDC) | 週次 |
| 初回ロード | DMS Full Load(22.5 時間) | Snapshot Export(~1 時間) |
| ソース DB への負荷 | 常時(binlog 読み取り) | 週 1 回(JDBC クエリ) |
| 運用負荷 | 高(CDC 監視・復旧) | 低(Slack 通知で完結) |
コスト内訳
| リソース | 月額 |
|---|---|
| Glue Job 実行(6 Job × 週 1 回) | ~$10-15 |
| Lambda (Slack 通知) | < $1 |
| 合計 | ~$12 |
データソース規模
| ソース | DB エンジン | テーブル数 |
|---|---|---|
| プロダクトA | MySQL (RDS) | 2 |
| プロダクトB-1 | PostgreSQL (Aurora) | 26 |
| プロダクトB-2 | MySQL (Aurora) | 28 |
| プロダクトB-3 | MySQL (Aurora) | 6 |
| プロダクトB-4 | MySQL (Aurora) | 2 |
| 合計 | 64 |
まとめ
DMS から Glue バッチ ETL への移行で得られた知見をまとめます。
設計判断
- 「リアルタイムが本当に必要か」を最初に確認する — 分析用途では週次バッチで十分なケースが多い。「たぶん必要だろう」で CDC を選ぶと、不要なコストと複雑さを抱えることになる
- 初回全量ロードは Snapshot Export — JDBC で数億件を読むのは危険。スナップショット経由なら本番 DB への負荷ゼロ
- Snapshot と差分を同一パスに統合 — 別テーブルにせず同じ S3 プレフィックスに書き込むことで、Athena からシームレスにクエリ可能
実装 Tips
-
coalesce()でパーティション数を制御 — Spark のデフォルトは少量データでは過剰。明示的に制御しないと書き込みに何時間もかかる -
JDBC パーティション分割で OOM 回避 — 大量データは
numPartitionsで読み込みを分散、count()をスキップして直接書き込む - Worker 数より先にインデックス — DB 側のインデックスがなければ、Glue の Worker をいくら増やしても効果なし
運用上の注意
-
PostgreSQL × Snapshot Export は型に注意 —
timestampがstringになるため、ETL 側でキャスト処理が必要 - メタデータのパス命名を統一 — DB 名の指し間違いで意図しない全量取得が発生する
Discussion