データ分析基盤を作る DBのS3エクスポート編
こんにちは、zinです🦑
分析基盤を作るにあたり、まずは弊社サービスLinyのメインデータベース(Aurora MySQL)をAthenaでクエリできるようにしよう!ということでAuroraのS3エクスポート機能を使ってみました。
S3エクスポートを使うための準備や、やってみてわかったことなどをまとめておこうと思います。
やることは大まかに以下の3点、Glue Crawlerについてはハマりどころはなさそうだったので、S3エクスポートと後処理について書きます。
- AuroraのS3エクスポートを使ってデータをS3に吐き出す
- 細かいParquetをまとめなおす
- Glue Crawlerでスキーマ・パーティションを更新する
AuroraのS3エクスポートを使ってデータをS3に吐き出す
本気でパフォーマンスのことを考えるとGlue JobやEMRを使って自力でデータを抜き取るのが良さそうです。
ただしMySQLのエクスポートを自前で作るにはやや凝った実装が必要になるため、とりあえず早めに動かしてみたい現状には合っていません。
サクッとデータを出すのに便利なものはないかなあと思っていたところ、AuroraのS3エクスポートというものがあったのでこちらを使ってみることにしました。
AuroraのデータをApache Parqet形式でS3に吐き出してくれる超便利なやつだぜ!とのことです。
準備
S3エクスポートを使うにあたり、いくつか準備が必要なのでTerraformで作っていきます。
出力先のS3、エクスポート用のIAMロール、暗号化用のKMSを作成します。
# S3
resource "aws_s3_bucket" "example" {
bucket = "example"
}
resource "aws_s3_bucket_public_access_block" "example" {
bucket = aws_s3_bucket.example.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# IAM
resource "aws_iam_role" "rds_export" {
name = "rds-export"
assume_role_policy = data.aws_iam_policy_document.assume_rds_export.json
inline_policy {
name = "s3"
policy = data.aws_iam_policy_document.rds_export.json
}
}
data "aws_iam_policy_document" "rds_export" {
statement {
sid = "S3Export"
effect = "Allow"
actions = [
"s3:DeleteObject*",
"s3:GetBucketLocation",
"s3:GetObject*",
"s3:ListBucket",
"s3:PutObject*",
]
resources = [
aws_s3_bucket.example.arn,
"${aws_s3_bucket.example.arn}/rds_export/*",
]
}
}
data "aws_iam_policy_document" "assume_rds_export" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["export.rds.amazonaws.com"]
}
}
}
# KMS
resource "aws_kms_key" "rds_export" {
description = "rds_exportkey"
key_usage = "ENCRYPT_DECRYPT"
deletion_window_in_days = 7
is_enabled = true
enable_key_rotation = true
}
resource "aws_kms_alias" "rds_export" {
name = "alias/rds-export"
target_key_id = aws_kms_key.rds_export.key_id
}
S3については監査や個人情報保護法対応のためいくつかの追加設定をしています。
本題からは逸れますが情報がまとまっている記事があまりなかったので備忘のため一緒に載せておきます、不要な場合は読み飛ばしてください。
# バージョニング(誤削除からの保護)
resource "aws_s3_bucket_versioning" "example" {
bucket = aws_s3_bucket.example.id
versioning_configuration {
status = "Enabled"
}
}
# オブジェクト操作ログ(監査)
resource "aws_s3_bucket_logging" "example" {
bucket = aws_s3_bucket.example.id
target_bucket = aws_s3_bucket.example_logging_destination.id
target_prefix = "example/"
}
resource "aws_s3_bucket" "example_logging_destination" {
bucket = "example-logging-destination"
}
resource "aws_s3_bucket_public_access_block" "example_logging_destination" {
bucket = aws_s3_bucket.example_logging_destination.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# ライフサイクル(節約ほか)
# https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/userguide/object-lifecycle-mgmt.html
resource "aws_s3_bucket_lifecycle_configuration" "example" {
bucket = aws_s3_bucket.example.id
# Intelligent-Tiering
rule {
id = "mvIntelligentTiering"
status = "Enabled"
transition {
days = 0
storage_class = "INTELLIGENT_TIERING"
}
}
# バージョニングが有効なバケットでの完全削除用
# 個人情報保護法対応でデータ削除が発生するため(削除の仕組みはこの記事の範囲外とします)
rule {
id = "rmRemnants"
status = "Enabled"
noncurrent_version_expiration {
noncurrent_days = 3 # 誤削除からの復旧用で少しバッファ
}
expiration {
expired_object_delete_marker = true
}
abort_incomplete_multipart_upload {
days_after_initiation = 1
}
}
}
動かしてみる
エクスポートを実行すると、裏でクラスターをクローンしてからクローンのデータをS3に吐き出しているようです。
まだ本番環境では数回しか動かしていませんが、途中で失敗するようなことは今のところありません。
task_identifier=example-yyyy-mm-dd
cli_input=$(cat <<EOF
# タスクに一意な名前をつける
ExportTaskIdentifier: ${task_identifier}
# S3バケット(上で作ったもの プレフィックスはロールで許可したものと揃える)
S3BucketName: example
S3Prefix: rds_export
# エクスポート用ロールARN(上で作ったもの)
IamRoleArn: arn:aws:iam::111111111111:role/rds-export
# KMSキーID(上で作ったもの)
KmsKeyId: arn:aws:kms:ap-northeast-1:111111111111:alias/rds-export
# エクスポートするAuroraクラスターARN
SourceArn: arn:aws:rds:ap-northeast-1:111111111111:cluster:target-aurora-cluster
# エクスポート対象のデータベース(省略可)
ExportOnly:
- target_db
EOF
)
aws rds start-export-task --cli-input-yaml "${cli_input}"
##### 完了を待つ場合 #####
function get_export_task_status() {
aws rds describe-export-tasks --export-task-identifier "${task_identifier}" \
| jq -r '.ExportTasks[0].Status'
}
exit_code=0
while :
do
case "$(get_export_task_status)" in
COMPLETE)
break
;;
CANCELED|FAILED)
exit_code=1
break
;;
STARTING|IN_PROGRESS|CANCELING)
echo "$(date +%Y-%m-%dT%H:%M:%S) sleep..."
sleep 300
;;
*)
echo "No export task: '${task_identifier}'"
exit_code=1
;;
esac
done
exit "${exit_code}"
動かしてわかった問題点
ここまでで自前で実装するのに比べると非常に便利というのがわかりましたが、実際に使っていくにはいくつか問題がありました。
- 巨大テーブルではファイルが細切れ(大量)になる
小さいテーブルでは問題なさそうでしたが、数千万や億オーダー行数を抱えるテーブルでは後半に出力されるファイルがかなり小さく、Athenaで扱うには気になるファイル数になってしまいました。
これは対応が必要なのでこの後に続きます。 - そもそものファイル命名規則が扱いにくい
エクスポートのファイル命名規則は以下のようになっており、partition_index以降は変わる可能性があるので当てにならないとのことです。
export_identifier/database_name/schema_name.table_name/partition_index/part-00000-random_uuid.format-based_extension
これでは扱いにくいので、ファイル細切れ問題の対応と一緒にパスの変更します。 - クラスターが大きいとエクスポートに時間がかかることがある
弊社のメインデータベースではエクスポート完了までに10時間以上かかっていました。
現状はお試し環境構築中のためスルーしましたが、もっと大きなクラスターを対象とする場合や毎日エクスポートを動かしたいような場合は別の方法を検討する必要がありそうです。 - 特定の時間のスナップショット のような取り方は(単体では)できない
例えば「毎日00:00時点のデータをエクスポートしたい」など、日時を指定してエクスポートするというのはできないようでした。
これについてはポイントインタイムリカバリ(PITR)との組み合わせで実現できるので別途記事にしようと思います。
スナップショットからのS3エクスポート機能もあるため、うまくはまればそちらを使うという手もあります。
ということで、AthenaでクエリするためにGlue Jobでファイルの出し直しをすることにしました。
ある程度大きなデータを抱えると一筋縄ではいかないということですね。
細かいParquetをまとめなおす
想定より巨大な記事になってしまったので詳細は別の記事にしようと思いますが、重要なポイントだけかいつまんでおきます。
大量の細切れファイルを扱う際、GlueのDynamicFrameを使うと普通にSparkでreadするよりも高速なようです。
時間的な都合で詳細な検証や比較はサボってしまったのですが、一般的にSparkは細切れファイル苦手なのでドキュメントを信じてみました。
ただし、DynamicFrameのwrite(write_dynamic_frame)だと圧縮オプションにgzipを渡しても無視されてしまったので、DataFrameに変換しています。
(エクスポート後のサイズがそれなりに大きかったので、コスト優先でsnappyではなくgzipにしています)
S3エクスポートすると export_identifier/export_tables_info_*.json
という名前でメタデータが一緒に出力されます。
repartition(coalesce)するためにテーブルサイズを使っているのですが、メタデータから取得したテーブルサイズをジョブの起動時に渡して使いました。
shuffle_partitions = 16 # see spark.sql.shuffle.partitions
max_partition_bytes = 134217728 # 128MiB
# ここは引数で渡すなど、テーブルごとに設定
target = {
"src_path": "s3://bucket/export_identifier/database_name/schema_name.table_name/",
"total_bytes": 123456789,
"dst_path": "s3://bucket/database_name/table_name/dt=yyyy-mm-dd/",
}
df = glueContext.create_dynamic_frame_from_options(
connection_type="s3",
connection_options={
"paths": [target["src_path"]],
"groupFiles": "inPartition",
"groupSize": max_partition_bytes,
},
format="parquet",
).toDF()
# ファイルが多すぎたら減らす
# 128MiB付近でまとめたいが、num_partitionsが小さすぎるとファイルが巨大化するのである程度のばらつきは許容する
num_partitions = max(
int(target["total_bytes"] / max_partition_bytes),
shuffle_partitions,
)
if num_partitions < df.rdd.getNumPartitions():
df = df.coalesce(num_partitions)
# 書き込み
write_options = {"compression": "gzip"}
df.write.mode("overwrite").options(**write_options).parquet(target["dst_path"])
まとめなおしにどれくらい効果があったかというと、一番行数が多いテーブルで70000ファイルあったものが3000ファイルまで削減できました。(3000ファイルも大量には変わりないですが...)
今回は時間の都合で検証をスキップしてしまいましたが、S3DistCpでも同様のことが実現できそうだったので自分用メモ。。
おわりに
この記事ではS3エクスポートにフォーカスしてまとめてきました。後半端折りがちになってしまいましたがこれからデータ基盤を作る方の参考になれば幸いです。
プロダクション環境のデータベースには顧客の個人情報が含まれているので、実用にはS3にファイルを吐き出すだけでは不十分です。
Lakeformationでテーブル・カラムのアクセス制御を行っているので、別の記事でまとめたいと思います。
Discussion