🔄

分析基盤の DMS をやめて Glue バッチETL にしたら月額 $310 → $12 になった

に公開

はじめに

複数の AWS アカウントで運用されている RDS / Aurora のプロダクトデータを、分析専用アカウントに集約して Amazon Athena でクエリできるようにするデータ基盤を構築しています。

当初は AWS DMS (Database Migration Service) を使ってリアルタイム同期していました。しかし、これは 分析チームへの要件確認が不十分なまま「リアルタイム同期が必要だろう」と思い込んで設計した結果 でした。実際に運用を始めてから分析チームにヒアリングしたところ、週次更新で十分だと分かり、以下の課題が浮き彫りになりました。

  • DMS の Replication Instance を常時稼働させるコスト: 月額 $310(週次バッチなら不要)
  • CDC (Change Data Capture) のオペレーション負荷(レプリケーション遅延の監視、タスク再起動など)
  • そもそも 分析用途にリアルタイム同期は不要 だった

結果、DMS を撤廃して Glue バッチ ETL + RDS Snapshot Export に移行し、月額コストを $12 まで削減しました。本記事では設計・実装・つまづきポイントをまとめます。

対象読者

  • マルチアカウント環境でデータ分析基盤を構築したい方
  • DMS のコストに悩んでいる方
  • Glue ETL の実践的なノウハウを知りたい方

規模感

項目
データソース数 6(MySQL × 4、PostgreSQL × 1、MariaDB × 1)
テーブル数 64
最大テーブル規模 約 8 億件 / 1.2 TB(Parquet 圧縮後)
スケジュール 毎週日曜 2:00 JST

アーキテクチャ概要

全体構成図

ポイント: Glue Job は VPC の Private Subnet 内で起動し、VPC ピアリング経由で各ソース RDS に JDBC 接続します。NAT Instance(NAT Gateway ではなくコスト最適化のため t3.nano を使用)経由で S3 / Secrets Manager などの AWS サービスにアクセスします。

Snapshot Export フロー(初回全量ロード)

初回全量ロードと差分取り込みのデータが 同一 S3 パスに統合 されるのがポイントです。Athena からは 1 つのテーブルとしてシームレスにクエリできます。

週次 ETL 実行フロー

なぜ DMS をやめたか

DMS のコスト構造

DMS は Replication Instance を常時稼働 させる必要があります。データ転送がない時間帯もインスタンス料金が発生するため、週次バッチのような「たまに動かす」ユースケースではコスト効率が悪くなります。

項目 コスト/月
Replication Instance (dms.t3.medium) ~$70
ストレージ + データ転送 ~$10
複数ソースで合計 ~$310

CDC のオペレーション負荷

CDC (Change Data Capture) はリアルタイム同期には強力ですが、運用負荷があります。

  • レプリケーション遅延の監視
  • ソース DB のスキーマ変更時にタスクの再作成が必要
  • binlog の保持期間管理
  • レプリケーションタスクのエラー時の手動復旧

「週次で十分」だった

正直に言うと、これは最初の要件定義の段階で確認すべきことでした。「分析基盤だからリアルタイム同期が必要だろう」と思い込み、DMS + CDC という構成を選択してしまいました。

運用開始後に分析チームに改めてヒアリングしたところ、週次更新で十分 ということが分かりました。リアルタイム性が不要なら、CDC のコストと複雑さを負う必要はありません。DMS の月額 $310 は、要件確認不足のコストでもありました。

Glue バッチ ETL の設計

増分取得ロジック

S3 上のメタデータファイルで前回実行時刻を管理し、WHERE timestamp >= last_run で増分データを取得するシンプルな設計です。

S3: glue-metadata/{db_name}/{table_name}/last_run_timestamp.txt
→ "2026-02-16 02:00:00"
def get_last_run_timestamp(table_name):
    """S3から前回実行時刻を取得"""
    metadata_key = f"glue-metadata/{db_name}/{table_name}/last_run_timestamp.txt"
    try:
        response = s3.get_object(Bucket=metadata_bucket, Key=metadata_key)
        return response['Body'].read().decode('utf-8').strip()
    except s3.exceptions.NoSuchKey:
        # 初回実行: 1週間前から取得
        return (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')

DynamoDB やパラメータストアではなく S3 のテキストファイル にしたのは、追加コストゼロでシンプルだからです。Glue Job のサマリー情報(レコード件数など)も同様に S3 に JSON で保存し、Lambda の Slack 通知から参照しています。

単一スクリプトで複数ソース対応

62 テーブルに対して 62 個の Glue Job を作るのではなく、1 つの Python スクリプト で MySQL / PostgreSQL 両方に対応しています。

# DB_ENGINE に応じて JDBC URL とドライバーを切り替え
if db_engine in ('postgresql', 'postgres'):
    jdbc_url = f"jdbc:postgresql://{host}:{port}/{db_name}"
    connection_properties = {
        "user": user, "password": password,
        "driver": "org.postgresql.Driver"
    }
else:
    jdbc_url = f"jdbc:mysql://{host}:{port}/{db_name}"
    connection_properties = {
        "user": user, "password": password,
        "driver": "com.mysql.jdbc.Driver",
        "useSSL": "false", "fetchsize": "10000"
    }

テーブルの指定は 単一テーブルモード--TABLE_NAME)と 複数テーブルモード--TABLE_LIST)の 2 パターンをサポートしています。プロダクト A は 1 テーブル 1 Job、プロダクト B は DB ごとに 1 Job で複数テーブルを順次処理する構成です。

Job テーブル数 Worker 数 実行時間
プロダクトA (テーブル1) 1 10 ~2 分
プロダクトA (テーブル2) 1 10 ~2 分
プロダクトB-1 (PostgreSQL) 26 5 ~7 分
プロダクトB-2 28 5 ~2 分
プロダクトB-3 6 3 ~1 時間
プロダクトB-4 2 2 ~1 分

Spark パーティション最適化(coalesce)

Spark はデフォルトで多数のパーティションにデータを分割しますが、少量データではこれが 致命的なオーバーヘッド になります(後述「つまづきポイント」参照)。

if record_count < 100000:
    # 少量データ: 単一ファイルに統合
    optimized = datasource.toDF().coalesce(1)
else:
    # 大量データ: 10万件あたり1パーティション
    num_partitions = max(1, record_count // 100000)
    optimized = datasource.toDF().coalesce(num_partitions)

JDBC パーティション分割(OOM 対策)

数千万件規模のテーブルでは、JDBC の単一タスクで全データを読み込むと OOM が発生します。Spark の numPartitions を使って読み込みを分散させます。

# MIN/MAX を取得してパーティション範囲を決定
bounds_query = f"""(SELECT MIN({partition_col}) AS min_val,
                           MAX({partition_col}) AS max_val
                    FROM {table_name}
                    WHERE {ts_col} >= '{last_run}') AS bounds"""

df = spark.read.jdbc(
    url=jdbc_url, table=query,
    column=partition_col,
    lowerBound=int(lower_bound),
    upperBound=int(upper_bound),
    numPartitions=num_partitions,  # 例: 100分割
    properties=connection_properties
)

パーティション分割時は count() を呼ぶと全データをメモリに展開してしまうため、count() をスキップして直接 S3 に書き込む ようにしています。

Snapshot Export による初回全量ロード

なぜ Glue ETL で初回ロードしないのか

Glue ETL は JDBC 経由でソース DB に接続するため、数億件規模のデータを一括読み込みすると以下の問題が発生します。

  • OOM: Worker のメモリを超過
  • ソース DB への負荷: 本番 RDS に長時間の全テーブルスキャンをかける
  • 実行時間: DMS の Full Load で 22.5 時間かかっていた規模

RDS Snapshot Export to S3 なら スナップショットから読み取るため本番 DB への負荷がゼロ で、日中作業が可能です。

クロスアカウント構成

ソースアカウントから分析環境の S3 バケットに直接 Export するため、クロスアカウントの IAM / KMS / S3 ポリシーの設定が必要です。

【ソースアカウント側】
├── KMS キー(Export 暗号化用)
│   └── キーポリシーで分析環境の Glue ロールに kms:Decrypt を許可
└── IAM ロール(Export 実行用)
    ├── 信頼ポリシー: export.rds.amazonaws.com
    └── 権限: S3 書き込み + KMS 暗号化

【分析環境アカウント側】
└── S3 バケットポリシー
    └── ソースアカウントの Export ロールに書き込みを許可

Export → フラッテン → ETL 統合の流れ

Snapshot Export の出力は rds-snapshot-export/<source>/<export-id>/<db>/<db>.<table>/ という深いパス構造になります。これを Glue ETL の incremental パスにフラッテン移動して統合します。

# Export データを incremental パスに移動(_SUCCESS ファイルは除外)
aws s3 mv \
  "s3://${BUCKET}/rds-snapshot-export/${SOURCE}/${EXPORT_ID}/${DB}/${DB}.${TABLE}/" \
  "s3://${BUCKET}/${SOURCE}_incremental/${TABLE}/" \
  --recursive --exclude "_SUCCESS"

移動後、Glue Crawler を実行してカタログに登録し、Athena で MAX(timestamp) を確認して last_run_timestamp.txt に設定すれば、以降は Glue ETL が差分を同じパスに書き込みます。

実績値

対象 データ量 所要時間
プロダクトA(最大テーブル) ~8 億件 / 1.2 TB Export ~1 時間
プロダクトB-4 3,992 GB Export 数時間
プロダクトB-2/B-3 835 GB Export 数時間
プロダクトB-1 (PostgreSQL) 468 GB Export 数時間

つまづきポイントと解決策

1. PostgreSQL Parquet スキーマ不一致

症状: PostgreSQL ソースで Snapshot Export データと Glue ETL の差分データが同一テーブルに混在すると、Athena で HIVE_CURSOR_ERROR が発生。

原因: Aurora Snapshot Export は PostgreSQL の timestamp 型を Parquet の string として出力しますが、PySpark の JDBC は timestamp[ns] として出力します。同一パスに異なるスキーマの Parquet ファイルが混在するため、Athena がスキーマ不整合でエラーになります。

解決策: PostgreSQL の場合のみ、ETL スクリプト内で timestamp カラムを string にキャストして Snapshot Export と型を統一。

def cast_timestamps_to_string(df, table_name):
    """PostgreSQL の timestamp カラムを string にキャスト"""
    from pyspark.sql.types import TimestampType, StringType
    from pyspark.sql.functions import col

    ts_columns = [f.name for f in df.schema.fields
                  if isinstance(f.dataType, TimestampType)]
    if ts_columns:
        for c in ts_columns:
            df = df.withColumn(c, col(c).cast(StringType()))
    return df

# メイン処理内で PostgreSQL の場合のみ適用
if db_engine in ('postgresql', 'postgres'):
    df = cast_timestamps_to_string(df, table_name)

MySQL ソースでは Snapshot Export と JDBC の Parquet スキーマが一致するため、この問題は発生しません。

2. Spark の過剰パーティション化(6 件で 7 時間)

症状: 週次の増分データが 6 件しかないのに、Glue Job の実行に 7 時間 かかる。

原因: Spark がデフォルトで大量のパーティション(空のパーティション含む)を作成し、各パーティションの S3 書き込みのオーバーヘッドが積み重なる。

解決策: coalesce() でパーティション数を明示的に制御。

# 改善前: Spark デフォルトの多数パーティションで書き込み
glueContext.write_dynamic_frame.from_options(frame=datasource, ...)

# 改善後: 少量データは 1 ファイルに統合
optimized = DynamicFrame.fromDF(
    datasource.toDF().coalesce(1), glueContext, "optimized"
)
glueContext.write_dynamic_frame.from_options(frame=optimized, ...)

3. Worker 数を増やしたら逆に遅くなった

症状: あるソースの実行時間が 15 時間かかっていたため、Worker 数を 2 → 10 に増やしたが、逆に 13 時間に悪化。

原因: WHERE updated_at >= 'xxx' のカラムにインデックスがなく、全テーブルフルスキャン が発生していた。Worker を増やすと並列でフルスキャンするため、ソース RDS の負荷が増大して逆効果。

解決策: ソース DB に適切なインデックスを追加。

-- 1.4億行のテーブルに created_at 専用インデックスを追加
CREATE INDEX large_table_created_at_index ON large_table (created_at);

結果: 15 時間 → 1 分(約 900 倍改善)。

4. メタデータパスの DB_NAME 誤り

症状: Glue ETL がデフォルトの「1 週間前」として大量データを取得してしまう。

原因: last_run_timestamp.txt のパスに使う DB_NAME は Secrets Manager の dbname 値(例: myapp_core)であって、データソースの通称(myapp_log 等)ではなかった。パスを間違えると NoSuchKey になり、初回実行と同じ挙動になる。

解決策: パスの命名規則をドキュメントに明記。

✅ glue-metadata/{Secrets Manager の dbname}/{table_name}/last_run_timestamp.txt
❌ glue-metadata/{データソース名}/{table_name}/last_run_timestamp.txt

コスト比較・成果

Before / After

項目 DMS (Before) Glue バッチ ETL (After)
月額コスト $310 $12
削減率 - 96%
アーキテクチャ Replication Instance 常時稼働 週 1 回バッチ実行
同期頻度 リアルタイム (CDC) 週次
初回ロード DMS Full Load(22.5 時間) Snapshot Export(~1 時間)
ソース DB への負荷 常時(binlog 読み取り) 週 1 回(JDBC クエリ)
運用負荷 高(CDC 監視・復旧) 低(Slack 通知で完結)

コスト内訳

リソース 月額
Glue Job 実行(6 Job × 週 1 回) ~$10-15
Lambda (Slack 通知) < $1
合計 ~$12

データソース規模

ソース DB エンジン テーブル数
プロダクトA MySQL (RDS) 2
プロダクトB-1 PostgreSQL (Aurora) 26
プロダクトB-2 MySQL (Aurora) 28
プロダクトB-3 MySQL (Aurora) 6
プロダクトB-4 MySQL (Aurora) 2
合計 64

まとめ

DMS から Glue バッチ ETL への移行で得られた知見をまとめます。

設計判断

  1. 「リアルタイムが本当に必要か」を最初に確認する — 分析用途では週次バッチで十分なケースが多い。「たぶん必要だろう」で CDC を選ぶと、不要なコストと複雑さを抱えることになる
  2. 初回全量ロードは Snapshot Export — JDBC で数億件を読むのは危険。スナップショット経由なら本番 DB への負荷ゼロ
  3. Snapshot と差分を同一パスに統合 — 別テーブルにせず同じ S3 プレフィックスに書き込むことで、Athena からシームレスにクエリ可能

実装 Tips

  1. coalesce() でパーティション数を制御 — Spark のデフォルトは少量データでは過剰。明示的に制御しないと書き込みに何時間もかかる
  2. JDBC パーティション分割で OOM 回避 — 大量データは numPartitions で読み込みを分散、count() をスキップして直接書き込む
  3. Worker 数より先にインデックス — DB 側のインデックスがなければ、Glue の Worker をいくら増やしても効果なし

運用上の注意

  1. PostgreSQL × Snapshot Export は型に注意timestampstring になるため、ETL 側でキャスト処理が必要
  2. メタデータのパス命名を統一 — DB 名の指し間違いで意図しない全量取得が発生する

参考リンク

株式会社スプリックス IT戦略部・SPRIX Enginieering Lab

Discussion