BigQuery Data Transfer Serviceを使ってAmazon AuroraからBigQueryにデータを転送する
やりたいこと
- Google AnalyticsやサービスのデータをもとにBigQueryで分析したい
- そのためにAmazon AuroraにあるデータをBigQueryに転送したい
- 分析用途なので転送頻度は1回/日以下で良い
- Auroraには追加で負荷があまりかからないようにしたい
- なるべくコードを書きたくない
方針
Auroraにあまり負荷をかけないようにしようとすると、Auroraが自動で取っている日次のスナップショットが活用できるんじゃないか?と考えた。ラッキーなことにこのスナップショットは、S3にExportすることができる(2020年1月ごろにできるようになった模様)。この機能ではスナップショットをPARQUET形式でExportされる。あまり詳しくないので軽く調べた感じだと、PARQUET形式はデータ容量的にかなり効率の良い形式だそうだ。
S3においたPARQUET形式のデータをBigQueryに読み込めればよくなったわけだが、これまた丁度いい機能がGCPには存在した。BigQuery Data Transfer Serviceを使えば、S3にあるPARQUETファイルを読み込んでBigQueryに送ることができる。しかも、日次など24時間以上の間隔であれば定期的に実行させることができる。データ転送量などはかかるが、このサービス自体は無料。
ということでこの2つを組みあせてて以下のような流れで実施する。
- 日次の任意のタイミングでLambda等を起動する
- Amazon Auroraが自動で取っている日次のスナップショットをS3にExportする
- GCPのBigQuery Data Transfer Serviceでそのファイルを読み込む
実装をする前に、この流れを一度コンソール上からやってみることをオススメする。S3にはどのように吐かれるのか、Exportする際に必要なものはなんなのか、Transfer Serviceはどういう感じで動くのかなどが見えてくる。基本的にはドキュメントのとおりにやればよいが、いくつかハマったことがある。
コンソールで試す
コンソールからポチポチやってみた。AuroraのスナップショットをS3に吐くのは簡単だった。ただめちゃくちゃ時間がかかり、30分くらいかかった。ファイル形式の変換に時間がかかるのだろうか。詳細は不明。
👆で吐かれたPARQUETファイルの1つをTransfer Serviceで読み込もうとすると PERMISSION_DENIED: [AccessDenied: Access Denied]
のようなエラーが出て転送ができない。
-
S3ReadOnlyAccess
をつけているので足りるはずだった(ドキュメントにもそう書いてる) - AWS CloudTrailを見ると
kms:Decrypt
が足りないというエラーが出ている - BigQuery Data Transfer Serviceに設定する AWSユーザー の権限に
kms:Decrypt
を追加する- 対象はあとの方(👇)で出てくる
StartExportTask
で指定するKmsKeyId
- 対象はあとの方(👇)で出てくる
この対応をすると転送ができた。ちなみに、事前にBigQueryにデータセットとテーブルを用意する必要がある。必ず WRITE_APPEND
で取り込まれるようなので、取り込み時間でのパーティションをテーブルに設定しておくと良い。ひとまず空テーブルでスキーマも指定なしで作成した。
BigQueryに転送されたデータは、スキーマの型はボチボチあっていてすべてNULLABLEになっていた。
これを見る限り、転送時のS3 URIの設定でワイルドカードが使えるようになっているが結局1つのテーブルに突っ込まれるので、Auroraの1テーブルにつき1つの転送設定が必要そうだと判断した。
ちなみに、試しにワイルドカードを使って複数のRDBテーブルを1つのBigQueryのテーブルに突っ込んでみようとしたが、S3からGCPへのファイル転送はうまくいくが読み込みがうまくいかなかった(0 jobs succeededになった)。
自動化する
Aurora Snapshot to S3
AuroraのスナップショットをS3にExportする部分を自動化するにはコードを書く必要がある。TypeScriptで実装したらこんな感じになる。
import { DescribeDBClusterSnapshotsCommand, RDSClient, StartExportTaskCommand } from '@aws-sdk/client-rds'
export const requestToExportSnapshotToS3 = async () => {
const client = new RDSClient({ region: 'Auroraのリージョン' })
// 最新のSnapshotを取得
const describeCommand = new DescribeDBClusterSnapshotsCommand({
DBClusterIdentifier: 'AuroraのDBクラスターのID',
})
const describeResult = await this.client.send(describeCommand)
if (!describeResult.DBClusterSnapshots) return
const latestSnapshot = describeResult.DBClusterSnapshots.sort(
// 降順に並び替えたい
(a, b) => b.SnapshotCreateTime?.getTime() ?? 0 - (a.SnapshotCreateTime?.getTime() ?? 0)
)[0]
const snapshotArn = latestSnapshot?.DBClusterSnapshotArn
if (!snapshotArn) return
// Exportをリクエスト
// 詳しくは👇のドキュメント
// https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_StartExportTask.html
// taskIDには日付をいれる。Transfer Serviceで使える変数が日時系だけなため。
const taskID = `export-id-YYYYMMDD`
const exportCommand = new StartExportTaskCommand({
ExportTaskIdentifier: taskID,
// 👇コンソールからS3 Exportするときに作れるのでそれを使うのが楽
IamRoleArn: 'S3Exportのためのロール',
// database とか table とか指定できる
ExportOnly: ['database_name'],
S3BucketName: 'Export先のバケット',
// カスタムキーなら何でもよさそうだったけどDBの暗号化に使ってるキーにした
KmsKeyId: '暗号化キーのID',
SourceArn: snapshotArn,
})
return this.client.send(exportCommand)
}
これを適当に日次で発火させればよい。CloudWatchとLambdaとか使えばできる。大事なのは👆の処理を実行するロールの権限。
- DBの暗号化に使っているKMSのキーユーザーに、Export処理を実行するロールを追加(SDKをたたくユーザー/ロール)
- KMSKeyNotAccessibleFaultが出るため
- 👆のExport処理を実行するロールに次を許可
-
kms:DescribeKey
(上のキーが対象リソース) -
kms:CreateGrant
(上のキーが対象リソース) -
iam:PassRole
(StartExportTask実行時に必要) rds:StartExportTask
-
rds:DescribeDBClusterSnapshots
(対象のSnapshotのArnを取得するため)
-
これらを付与すれば実行できた。
BigQuery Data Transfer Serviceの設定
S3のURIだけちょっと考える必要がある。👆の実装だとこんな感じの指定になる。気をつけたいのは run_time
変数はUTCなので、Transfer Serviceを動かすタイミングによっては👆で設定していたtaskIDの YYYYMMDD
とうまく合わせる必要がある。
s3://your_bucket_name/export-id-{run_time|"%Y%m%d"}/database_name/schema_name.table_name/*.parquet
あとは動かせば指定したBigQueryのテーブルにデータが入っている。
※ 追記(2022/11/09)
なにやら2022/10/21頃からエクスポートされたS3ファイルのURIが変わったようだ。1というディレクトリが増えている。手元のデータを確認した限りではすべて1のみだったので一旦以下で対応した。
s3://your_bucket_name/export-id-{run_time|"%Y%m%d"}/database_name/schema_name.table_name/1/*.parquet
おわりに
CloudTrailに助けられまくった。できるだけ最小権限をつけるために、Access DeniedになるたびにCloudTrailを見て、どのアクションがどのリソースにアクセスしてエラーになるのかを逐一確認した。CloudTrailはデバッグにも使えるすばらしいサービス。
AuroraからのExportだったが、DynamoDBでもできるかなと思って軽く見てみたところ、一応DynamoDBからS3へのExportは存在いるが、DYNAMO_JSONという少し特殊な形式で出力されるようだった。データ形式の変換レイヤーが必要になりそうで今回のケースよりは面倒になりそうだ。
{"Item":{"id":{"S":"83369704-ccef-4163-8c20-24d240c10806"},"name":{"S":"Leopoldine Blümel-Mude"}}}
{"Item":{"id":{"S":"1f187e30-b279-4bfa-85dc-e347092f364f"},"name":{"S":"Theo Trommler"}}}
{"Item":{"id":{"S":"88942fa5-e8be-4b29-8869-6abdc662eaea"},"name":{"S":"Herr Franz-Peter Thanel B.A."}}}
Discussion
参考にできて凄く助かる神記事でした!
ご存知かも&記事の本質とは全然関係ないところなのですが、
b.SnapshotCreateTime?.getTime() ?? 0 - (a.SnapshotCreateTime?.getTime() ?? 0)
のところ、前半もカッコで囲っておかないとソートが昇順になってしまうかもです。