データ分析のためにAWS AuroraからParquet形式でエクスポートしてみた
はじめに
おはようございます、加藤です。データ分析の為にAuroraに格納されたデータをParquet形式でエクスポートする方法を調査しました。AWS CDKで実装したので、その方法を汎用化したサンプルコードを共有します。
背景
データ分析の為に複数のAWSアカウント上のAuroraに格納されたデータをParquet形式かつHive形式のパスをでエクスポートする必要がありました。エクスポート先はデータ分析の為のGoogle Cloud Storageです。
それぞれのAWSアカウントの管理者の負担を減らす為に、Google Cloud Storageに直接エクスポートするのではなく、データ分析側でAWSアカウントとS3バケットを用意し、そこにエクスポートしてもらうことにしました。
合わせてエクスポートの手段を提供するために、AWS CDKで実装し配布することにしました。この配布物にLambda関数が含まれていると、関数ランタイムのやAWS SDKなどのライブラリのバージョン管理が必要になるため、それを避けるために、Lambda関数を使わずに実装することにしました。
全体の流れ
- 各AWSアカウントのAuroraは日時でシステムスナップショットを取得する
- スナップショット作成完了イベントをトリガーに
rds:StartExportTask
APIを呼び出すStep Functionsを起動しエクスポートバケットへエクスポートする - エクスポートが完了したらエクスポートバケットから宛先バケットへコピーするStep Functionsを起動する
サンプルコード紹介
AWS→Google Cloud Storageの転送は対象外としAWSの部分のみ紹介します。
ここまで説明した流れはクロスアカウントですが、シングルアカウントの場合にしたサンプルコードを使って説明します。
今回のコード全体はこちらのリポジトリにあります。
https://github.com/intercept6/cdk-rds-startexporttask
依存リソースの作成
エクスポート対象となるAurora Clusterとエクスポート先およびコピー先のS3バケットとKMSキーを作成します。
import { CfnOutput } from "aws-cdk-lib";
import { Vpc } from "aws-cdk-lib/aws-ec2";
import { Key } from "aws-cdk-lib/aws-kms";
import {
AuroraPostgresEngineVersion,
ClusterInstance,
DatabaseCluster,
DatabaseClusterEngine,
} from "aws-cdk-lib/aws-rds";
import { Bucket } from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";
export class DemoResources extends Construct {
public readonly vpc: Vpc;
public readonly db: DatabaseCluster;
public readonly exportKey: Key;
public readonly exportBucket: Bucket;
public readonly destinationKey: Key;
public readonly destinationBucket: Bucket;
constructor(scope: Construct, id: string) {
super(scope, id);
const vpc = new Vpc(this, "Vpc", {
maxAzs: 2,
natGateways: 1,
});
this.vpc = vpc;
const db = new DatabaseCluster(this, "Cluster", {
engine: DatabaseClusterEngine.auroraPostgres({
version: AuroraPostgresEngineVersion.VER_16_3,
}),
vpc,
writer: ClusterInstance.serverlessV2("Writer"),
enableDataApi: true,
serverlessV2MaxCapacity: 16,
defaultDatabaseName: "postgres",
});
if (db.secret?.secretFullArn != null) {
new CfnOutput(this, "DBSecretFullArn", {
value: db.secret.secretFullArn,
});
}
this.db = db;
const exportKey = new Key(this, "ExportKey");
this.exportKey = exportKey;
const exportBucket = new Bucket(this, "ExportBucket", {
encryptionKey: exportKey,
});
this.exportBucket = exportBucket;
const destinationKey = new Key(this, "DestinationKey");
this.destinationKey = destinationKey;
const destinationBucket = new Bucket(this, "DestinationBucket", {
encryptionKey: destinationKey,
});
this.destinationBucket = destinationBucket;
}
}
ExportTask
StartExportTaskを実行するためのStep Functionsを作成します。
import { Aws, Duration } from "aws-cdk-lib";
import { Rule } from "aws-cdk-lib/aws-events";
import { SfnStateMachine } from "aws-cdk-lib/aws-events-targets";
import {
PolicyDocument,
PolicyStatement,
Role,
ServicePrincipal,
} from "aws-cdk-lib/aws-iam";
import { IKey } from "aws-cdk-lib/aws-kms";
import { LogGroup } from "aws-cdk-lib/aws-logs";
import { IDatabaseCluster } from "aws-cdk-lib/aws-rds";
import { IBucket } from "aws-cdk-lib/aws-s3";
import {
Choice,
Condition,
DefinitionBody,
Fail,
IStateMachine,
JsonPath,
LogLevel,
Pass,
StateMachine,
Succeed,
TaskInput,
Wait,
WaitTime,
} from "aws-cdk-lib/aws-stepfunctions";
import {
CallAwsService,
StepFunctionsStartExecution,
} from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Construct } from "constructs";
export interface RdsExportProps {
db: IDatabaseCluster;
export: {
bucket: IBucket;
key: IKey;
};
copyStateMachine: IStateMachine;
}
export class RdsExport extends Construct {
constructor(scope: Construct, id: string, props: RdsExportProps) {
super(scope, id);
// `rds:StartExportTask`APIを実行する為のIAMロールを作成します。
// こちらのドキュメントを確認し、必要な権限を付与します。
// https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/USER_ExportSnapshot.html
const exportRole = new Role(this, "ExportRole", {
assumedBy: new ServicePrincipal("export.rds.amazonaws.com"),
inlinePolicies: {
bucket: new PolicyDocument({
statements: [
new PolicyStatement({
actions: [
"s3:PutObject*",
"s3:ListBucket",
"s3:GetObject*",
"s3:DeleteObject*",
"s3:GetBucketLocation",
],
resources: [
props.export.bucket.bucketArn,
props.export.bucket.arnForObjects("*"),
],
}),
],
}),
kms: new PolicyDocument({
statements: [
new PolicyStatement({
actions: [
"kms:Encrypt",
"kms:Decrypt",
"kms:ReEncrypt*",
"kms:GenerateDataKey*",
"kms:CreateGrant",
"kms:DescribeKey",
"kms:RetireGrant",
],
resources: [props.export.key.keyArn],
}),
],
}),
},
});
// StartExportTaskを実行するTaskを作成します。
// 今回はクラスタのスナップショットに対してエクスポートを行うのでcluster-snapshotに対しての権限を付与します。
// また、iam:PassRoleの権限も付与します。
// https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonrds.html
const startTask = new CallAwsService(this, "StartExport", {
service: "rds",
action: "startExportTask",
parameters: {
ExportTaskIdentifier: JsonPath.format("snapshot-{}", JsonPath.uuid()),
SourceArn: JsonPath.stringAt("$.detail.SourceArn"),
S3BucketName: props.export.bucket.bucketName,
IamRoleArn: exportRole.roleArn,
KmsKeyId: props.export.key.keyArn,
},
additionalIamStatements: [
new PolicyStatement({
actions: ["iam:PassRole"],
resources: [exportRole.roleArn],
}),
],
iamResources: [
`arn:${Aws.PARTITION}:rds:${Aws.REGION}:${Aws.ACCOUNT_ID}:cluster-snapshot:*`,
],
});
// エクスポートの完了をポーリングするために、DescribeExportTaskを実行するTaskを作成します。
// ドキュメントによると、クラスターのスナップショットに対しての権限があれば良いようですが、権限不足になってしまいました。`arn:${Aws.PARTITION}:rds:${Aws.REGION}:${Aws.ACCOUNT_ID}:*`でも権限不足だったので、`*`にしています。
// https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonrds.html
const describeTask = new CallAwsService(this, "DescribeExportTask", {
service: "rds",
action: "describeExportTasks",
parameters: {
ExportTaskIdentifier: JsonPath.stringAt("$.ExportTaskIdentifier"),
},
iamResources: ["*"],
});
// DescribeExportTaskの結果を整形するためのTaskを作成します。
const processOutput = new Pass(this, "ProcessOutput", {
parameters: {
ExportTaskIdentifier: JsonPath.stringAt(
"$.ExportTasks[0].ExportTaskIdentifier"
),
Status: JsonPath.stringAt("$.ExportTasks[0].Status"),
SnapshotTime: JsonPath.stringAt("$.ExportTasks[0].SnapshotTime"),
S3Bucket: JsonPath.stringAt("$.ExportTasks[0].S3Bucket"),
S3Prefix: JsonPath.stringAt("$.ExportTasks[0].S3Prefix"),
SourceType: JsonPath.stringAt("$.ExportTasks[0].SourceType"),
},
});
// ポーリングの間隔を5分に設定するためのWaitを作成します。
const wait5min = new Wait(this, "Wait5min", {
time: WaitTime.duration(Duration.minutes(5)),
});
const jobFailed = new Fail(this, "Export Failed", {
cause: "RDS Export Failed",
error: "DescribeExportTasks returned FAILED",
});
const jobSucceeded = new Succeed(this, "Export Succeeded");
// エクスポート完了後にコピーするためのStep Functionsを呼び出すTaskを作成します。
const copyObjects = new StepFunctionsStartExecution(this, "CopyObjects", {
stateMachine: props.copyStateMachine,
input: TaskInput.fromObject({
s3Prefix: JsonPath.stringAt("$.ExportTaskIdentifier"),
snapshotTime: JsonPath.stringAt("$.SnapshotTime"),
}),
});
const exportComplete = new Choice(this, "Export Complete?");
// エクスポートに関するTaskを連結します。エクスポートステータスがCOMPLETEの場合はコピー処理を行います。
// ステータスがFAILED、CANCELEDの場合は処理を失敗で終了します。
const exportJob = startTask
.next(describeTask)
.next(processOutput)
.next(
exportComplete
.when(
Condition.stringEquals("$.Status", "COMPLETE"),
copyObjects.next(jobSucceeded)
)
.when(Condition.stringEquals("$.Status", "FAILED"), jobFailed)
.when(Condition.stringEquals("$.Status", "CANCELED"), jobFailed)
.otherwise(wait5min.next(describeTask))
);
// スナップショット作成完了イベントは全てのクラスターに対して発生し、クラスタ識別子でフィルタリングができません。そのため、Step Functionsの最初でスナップショット識別子からクラスタ識別子を取得し対象外の場合は終了します。
const describeDbClusterSnapshots = new CallAwsService(
this,
"DescribeDBClusterSnapshots",
{
service: "rds",
action: "describeDBClusterSnapshots",
parameters: {
DbClusterSnapshotIdentifier: JsonPath.stringAt("$.detail.SourceArn"),
},
iamAction: "rds:DescribeDBClusterSnapshots",
iamResources: [
`arn:${Aws.PARTITION}:rds:${Aws.REGION}:${Aws.ACCOUNT_ID}:cluster-snapshot:*`,
],
resultPath: "$.describeDbClusterSnapshots",
}
);
const choise = new Choice(this, "CheckDBClusterIdentifier");
const skip = new Succeed(this, "NotExpectedDBCluster", {
comment: "期待するDBクラスターのスナップショットではありませんでした",
});
choise.when(
Condition.not(
Condition.stringEquals(
"$.describeDbClusterSnapshots.DbClusterSnapshots[0].DbClusterIdentifier",
props.db.clusterIdentifier
)
),
skip
);
const definition = describeDbClusterSnapshots.next(
choise.otherwise(exportJob)
);
const stateMachine = new StateMachine(this, "StateMachine", {
definitionBody: DefinitionBody.fromChainable(definition),
timeout: Duration.hours(1),
tracingEnabled: true,
logs: {
destination: new LogGroup(this, "LogGroup"),
level: LogLevel.ALL,
includeExecutionData: true,
},
});
// StartExportTaskを呼び出す側にもKMSキーへの権限が必要なので、こちらにも権限を付与します。
// 呼び出し元にはここまでのActionsは不要な気がしますがドキュメントに記載されている通りに付与します。
// https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/APIReference/API_StartExportTask.html
stateMachine.addToRolePolicy(
new PolicyStatement({
actions: [
"kms:Encrypt",
"kms:Decrypt",
"kms:ReEncrypt*",
"kms:GenerateDataKey*",
"kms:CreateGrant",
"kms:DescribeKey",
"kms:RetireGrant",
],
resources: [props.export.key.keyArn],
})
);
// スナップショット作成完了のイベントをトリガーにStep Functionsを起動するためのRuleを作成します。
// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/USER_Events.Messages.html
new Rule(this, "RdsSnapshot", {
eventPattern: {
source: ["aws.rds"],
detailType: ["RDS DB Cluster Snapshot Event"],
detail: {
EventID: [
"RDS-EVENT-0075" /*手動スナップショット作成完了 */,
"RDS-EVENT-0169" /*自動スナップショット作成完了 */,
],
},
},
targets: [new SfnStateMachine(stateMachine)],
});
}
}
CopyTask
エクスポートバケットから宛先バケットへコピーするStep Functionsを作成します。
import { Duration } from "aws-cdk-lib";
import { PolicyStatement } from "aws-cdk-lib/aws-iam";
import { IKey } from "aws-cdk-lib/aws-kms";
import { LogGroup } from "aws-cdk-lib/aws-logs";
import { IBucket } from "aws-cdk-lib/aws-s3";
import {
Choice,
Condition,
DefinitionBody,
JsonPath,
LogLevel,
Map,
Pass,
StateMachine,
Succeed,
} from "aws-cdk-lib/aws-stepfunctions";
import {
CallAwsService,
CallAwsServiceProps,
} from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Construct } from "constructs";
export interface S3CopyProps {
maxConcurrency?: number;
source: {
bucket: IBucket;
key: IKey;
};
destination: {
bucket: IBucket;
key: IKey;
};
}
export class S3Copy extends Construct {
public readonly stateMachine: StateMachine;
constructor(scope: Construct, id: string, props: S3CopyProps) {
super(scope, id);
const jobSucceeded = new Succeed(this, "Succeeded");
// StartExportTaskで出力されるオブジェクトの内で必要なのはparquetファイルと_SUCCESSファイルのみです。不要なファイルをフィルタリングするためのTaskを作成します。
const filterParquet = new Pass(this, "FilterParquetAndSuccess", {
inputPath: JsonPath.stringAt(
"$.ListObjects.Contents[?(@.Key =~ /.*_SUCCESS/i || @.Key =~ /.*parquet/i)]"
),
resultPath: "$.ListObjects.FilterdContents",
});
// ListObjectsV2を実行しオブジェクト一覧を取得するTaskを作成します。
const listObjectsProps: CallAwsServiceProps = {
service: "s3",
action: "listObjectsV2",
parameters: {
Bucket: props.source.bucket.bucketName,
Prefix: JsonPath.stringAt("$.s3Prefix"),
},
iamResources: [`arn:aws:s3:::${props.source.bucket.bucketName}`],
iamAction: "s3:ListBucket",
additionalIamStatements: [
new PolicyStatement({
actions: ["s3:GetObject"],
resources: [`arn:aws:s3:::${props.source.bucket.bucketName}/*`],
}),
],
resultPath: "$.ListObjects",
};
const listObjects = new CallAwsService(
this,
"ListObjects",
listObjectsProps
);
// ListObjectsV2の結果がトークンを持っている場合は次のリクエストを行うためのTaskを作成します。
const listObjectsWithToken = new CallAwsService(
this,
"ListObjectsWithToken",
{
...listObjectsProps,
parameters: {
...listObjectsProps.parameters,
ContinuationToken: JsonPath.stringAt(
"$.ListObjects.NextContinuationToken"
),
},
}
);
// 今回は /${DATABASE}/${TABLE}/partition_datetime=${SNAPSHOT_TIME}/${NUM}/${OBJ} の形式でコピー先のキーを作成します。
// プログラミング言語を使えば簡単に出来ますが、今回はLambda関数を縛っているのでStep Functionsの組み込み関数でがんばります。
const split = JsonPath.stringSplit(JsonPath.stringAt("$.source.key"), "/");
const db = JsonPath.arrayGetItem(split, 1);
const table = JsonPath.arrayGetItem(split, 2);
const hive = JsonPath.format(
"partition_datetime={}",
JsonPath.stringAt("$.snapshotTime")
);
const num = JsonPath.arrayGetItem(split, 3);
const obj = JsonPath.arrayGetItem(split, 4);
const formatDestinationKey = new Pass(this, "FormatDestinationKey", {
parameters: {
key: JsonPath.format("{}/{}/{}/{}/{}", db, table, hive, num, obj),
},
resultPath: "$.destination",
});
// S3バケットへコピーするTaskを作成します。
const copyS3Object = new CallAwsService(this, "CopyS3Object", {
service: "s3",
action: "copyObject",
parameters: {
Bucket: props.destination.bucket.bucketName,
CopySource: JsonPath.format(
`${props.source.bucket.bucketName}/{}`,
JsonPath.stringAt("$.source.key")
),
Key: JsonPath.stringAt("$.destination.key"),
},
iamResources: [`arn:aws:s3:::${props.source.bucket.bucketName}`],
iamAction: "s3:ListBucket",
additionalIamStatements: [
// Read permission for source bucket
new PolicyStatement({
actions: ["s3:GetObject"],
resources: [`arn:aws:s3:::${props.source.bucket.bucketName}/*`],
}),
new PolicyStatement({
actions: ["kms:Decrypt"],
resources: [props.source.key.keyArn],
}),
// Write permission for destination bucket
new PolicyStatement({
actions: ["s3:PutObject"],
resources: [`arn:aws:s3:::${props.destination.bucket.bucketName}/*`],
}),
new PolicyStatement({
actions: ["kms:GenerateDataKey"],
resources: [props.destination.key.keyArn],
}),
],
});
// ListObjectsV2の結果がトークンを持っている場合は次のリクエストを行うためのChoiceを作成します。
const checkIfAllListed = new Choice(this, "CheckIfAllListed");
// Mapを使ってListObjectsV2の結果を並列でコピーするTaskを作成します。
const maxConcurrency = props.maxConcurrency || 50;
const map = new Map(this, "Map", {
maxConcurrency,
itemsPath: "$.ListObjects.FilterdContents",
itemSelector: {
snapshotTime: JsonPath.stringAt("$.snapshotTime"),
source: {
key: JsonPath.stringAt("$$.Map.Item.Value.Key"),
},
},
resultPath: JsonPath.DISCARD,
}).itemProcessor(formatDestinationKey.next(copyS3Object));
// 処理を連結します。
const s3Copy = listObjects
.next(filterParquet)
.next(map)
.next(
checkIfAllListed
.when(
Condition.booleanEquals("$.ListObjects.IsTruncated", false),
jobSucceeded
)
.otherwise(listObjectsWithToken.next(filterParquet))
);
const stateMachine = new StateMachine(this, "StateMachine", {
definitionBody: DefinitionBody.fromChainable(s3Copy),
timeout: Duration.hours(1),
tracingEnabled: true,
logs: {
destination: new LogGroup(this, "LogGroup"),
level: LogLevel.ALL,
includeExecutionData: true,
},
});
this.stateMachine = stateMachine;
}
}
スタックの作成
これらのConstructを使ってスタックを作成します。
import { Stack, StackProps } from "aws-cdk-lib";
import { Construct } from "constructs";
import { DemoResources } from "./demo-resources";
import { RdsExport } from "./rds-export";
import { S3Copy } from "./s3-copy";
export class RdsS3ExportCopyStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
const { db, exportBucket, exportKey, destinationBucket, destinationKey } =
new DemoResources(this, "DemoResources");
const { stateMachine } = new S3Copy(this, "S3Copy", {
source: {
bucket: exportBucket,
key: exportKey,
},
destination: {
bucket: destinationBucket,
key: destinationKey,
},
});
new RdsExport(this, "RdsExport", {
db,
export: {
bucket: exportBucket,
key: exportKey,
},
copyStateMachine: stateMachine,
});
}
}
デプロイとデータの投入
cdk deploy
後にマネジメントコンソールのクエリエディタから適当にデータを投入しスナップショットを作成します。トリガーを手動スナップショットにも対応させているので一覧の処理が動きます。
検証データは以下のようにしました。
CREATE TABLE city (
id integer NOT NULL,
name text NOT NULL,
countrycode character(3) NOT NULL,
district text NOT NULL,
population integer NOT NULL
);
CREATE TABLE country (
code character(3) NOT NULL,
name text NOT NULL,
continent text NOT NULL,
region text NOT NULL,
surfacearea real NOT NULL,
indepyear smallint,
population integer NOT NULL,
lifeexpectancy real,
gnp numeric(10,2),
gnpold numeric(10,2),
localname text NOT NULL,
governmentform text NOT NULL,
headofstate text,
capital integer,
code2 character(2) NOT NULL,
CONSTRAINT country_continent_check CHECK ((((((((continent = 'Asia'::text) OR (continent = 'Europe'::text)) OR (continent = 'North America'::text)) OR (continent = 'Africa'::text)) OR (continent = 'Oceania'::text)) OR (continent = 'Antarctica'::text)) OR (continent = 'South America'::text)))
);
CREATE TABLE countrylanguage (
countrycode character(3) NOT NULL,
"language" text NOT NULL,
isofficial boolean NOT NULL,
percentage real NOT NULL
);
INSERT INTO city (id, name, countrycode, district, population) VALUES
(1, 'Tokyo', 'JPN', 'Kanto', 37400068),
(2, 'New York', 'USA', 'New York', 8175133),
(3, 'Los Angeles', 'USA', 'California', 3792621),
(4, 'Paris', 'FRA', 'Ile-de-France', 2140526),
(5, 'London', 'GBR', 'England', 8982000);
INSERT INTO country (code, name, continent, region, surfacearea, indepyear, population, lifeexpectancy, gnp, gnpold, localname, governmentform, headofstate, capital, code2) VALUES
('JPN', 'Japan', 'Asia', 'Eastern Asia', 377930.0, 660, 126476461, 84.6, 5064870.00, NULL, 'Nihon', 'Constitutional Monarchy', 'Naruhito', 1, 'JP'),
('USA', 'United States', 'North America', 'Northern America', 9833517.0, 1776, 331002651, 78.9, 21433226.00, NULL, 'United States', 'Federal Republic', 'Joe Biden', 2, 'US'),
('FRA', 'France', 'Europe', 'Western Europe', 551695.0, 843, 65273511, 82.4, 2715518.00, NULL, 'France', 'Republic', 'Emmanuel Macron', 4, 'FR'),
('GBR', 'United Kingdom', 'Europe', 'British Isles', 243610.0, 1066, 67886011, 81.2, 2825208.00, NULL, 'United Kingdom', 'Constitutional Monarchy', 'Elizabeth II', 5, 'GB');
INSERT INTO countrylanguage (countrycode, "language", isofficial, percentage) VALUES
('JPN', 'Japanese', TRUE, 99.2),
('USA', 'English', TRUE, 80.0),
('USA', 'Spanish', FALSE, 13.0),
('FRA', 'French', TRUE, 100.0),
('GBR', 'English', TRUE, 98.0);
あとがき
Step Functionsだけで出来るか不安でしたが無事に実装できました。ただし、プロジェクトメンバーのスキルセットや組織体制によってはDockerイメージを各AWSアカウントから参照可能にして何かしらのコンテナ実行サービスで実行する方がマッチする場合もあると思います。
また、データの取り込みとして様々なデータベースやサービスが具体的に見えている場合はデータ取り込みSaaSの利用も検討すると良いと思います。
Discussion