本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】1 ~CDK編~
この記事は以下Advent Calendarの記事です。
AWS CDK Advent Calendar 2021 - 12月9日
airCloset Advent Calendar 2021 - 12月9日
はじめに
株式会社エアークローゼットでエンジニアをしている三好 @miyomiyo344_ です。
本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作った話しです。
長編になりそうなので3部作に分けようと思います。
第1部ではAWS CDKで全体の構成を構築したところを書いています。
第2部: 本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】2 ~SDK編前半~
第3部: 本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】3 ~SDK編後半~
目的
なぜ本番環境から同期(リストア)するのか
クックパッドさんの開発者ブログでも書かれていますが、以下が本番環境から同期する理由です。
- ユーザと同等体験での開発やテスト、オペレーション対応を行いたい
- パフォーマンス問題の早期発見をしたい
- データ依存不具合の早期発見をしたい
当初の運用
- 当初はCircleCIベースで行っていました。
要件
対象となるデータベースは複数
- DB一覧
- AmazonAuroraDB - MySQL (Cluster)
- AmazonAuroraDB - PostgreSQL (Cluster)
- MySQL (Instance)
- 今後データベースが増えても大丈夫な設計
各データベースに個人情報が含まれている場合はマスキングする
- マスキング処理はテーブルごとに並列実行させる
- ステージング環境に不要なテーブルはTruncateする
- マスキング対象データが元々NULLの場合はNULLのままにする
- 本番環境との差分をできるだけ無くす
毎日自動的に実行する
- スキップすることも可能
その他
- 処理が失敗した場合はSlack通知
アーキテクチャ
Lambda
Lambda Functionを大きく分けて4種類用意しています。
- 本番DBからSnapshotを取得して、新たにステージングDBとなるデータベースを作成
- 対象となるDB
- AmazonAuroraDB - MySQL (Cluster)
- AmazonAuroraDB - PostgreSQL (Cluster)
- MySQL (Instance)
- 対象となるDB
- Masking処理の準備
- 新たに作成したDBの接続情報を取得
- そのDBのMasking対象となるテーブル一覧を取得しStep Functionへ渡す
- 各テーブルのMasking処理を並列実行
- 後処理
- 新しいDBに旧ステージングDBの接続ユーザー情報を追加
- ステージング用の接続ユーザーで接続確認
- DNSの付け替え
- 旧ステージングDBの削除
StepFunctions
上記ステップの3〜4がStepFunctionsでの実行となります。
また、「各テーブルのMasking処理を並列実行」の実行前に、StepFunctionsにてWait処理を加えています。
理由としては、リストア直後だとMySQL/PostgreSQLが安定しておらず、Masking処理が失敗することがあったためです。
リストア後5分待ち、その後Maskingへ進む設計としています。
備考
2021/10/01にAWD Step Functionsがアップデートされ200以上のAWSサービスと連携できるようになりました。
開発を始めたタイミングではAWS CDKがこのアップデートの対応をしていなかったため、上記のようなアーキテクチャになりました。
現在(2021/11月)はAWS CDKのプルリクエストがマージされており、GUIと変わらず200以上のサービスと連携できるようになっているみたいなので、いまから設計する場合はStep Functionを軸とした設計にしてもいいかもしれないです。たぶんその方が楽です。
環境構築
AWS上でSandbox環境を作成
開発中は本番データベースへの影響が怖いので社内でSandbox環境を作ってもらいました。
package.json
- aws-cdkのパッケージのバージョンは一律にする必要があります。
- Lambda Layersを使用しています。(参考:AWS CDK を使って node_modules を AWS Lambda Layers にデプロイするサンプル)
- 今回の場合はNode Versionを12以上にする必要がありました。(参考:cdk deploy may fail with cache directory error)
"devDependencies": {
"@aws-cdk/assert": "1.111.0",
"@aws-cdk/aws-events-targets": "1.111.0",
"@aws-cdk/aws-lambda": "1.111.0",
"@aws-cdk/aws-lambda-event-sources": "1.111.0",
"@aws-cdk/aws-rds": "1.111.0",
"@aws-cdk/aws-sns": "1.111.0",
"@aws-cdk/aws-sns-subscriptions": "1.111.0",
"@aws-cdk/aws-sqs": "1.111.0",
"@aws-cdk/aws-stepfunctions": "1.111.0",
"@aws-cdk/aws-stepfunctions-tasks": "^1.111.0",
"@aws-cdk/core": "1.111.0",
"@types/aws-lambda": "^8.10.78",
"@types/faker": "^5.5.9",
"@types/jest": "^26.0.10",
"@types/mysql2": "github:types/mysql2",
"@types/node": "10.17.27",
"@types/pg": "^8.6.1",
"aws-cdk": "1.111.0",
"jest": "^26.4.2",
"ts-jest": "^26.2.0",
"ts-node": "^9.0.0",
"typescript": "~3.9.7"
},
"dependencies": {
"@slack/web-api": "^6.2.4",
"aws-sdk": "^2.942.0",
"faker": "^5.5.3",
"mysql2": "^2.3.2",
"pg": "^8.7.1"
}
AWS CDK
import * as cdk from '@aws-cdk/core';
import { settings } from './settings';
import CONFIG from './config/config.json';
import * as awsvpc from './vpc';
import * as awssqs from './sqs';
import * as awssns from './sns';
import * as awsrds from './rds';
import * as awslambda from './lambda';
import * as awssfn from './sfn';
import * as awsevent from './event';
export class CdkRestoreRDS4stgStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const {} = settings;
const { vpc, lambdaSecurityGroup } = awsvpc.create(this);
// topicからrestorationイベントを受け取るqueue
const { queue } = awssqs.create(this);
// RDSのrestorationイベントをfun outするtopic
const { topic } = awssns.create(this, { queue });
// RDSのrestorationイベントをsbscribe
awsrds.subscribe(this, { topic });
// Lambda作成準備
const {
create: createLambda,
create4sfn: create4sfnLambda,
} = awslambda.initialize(this);
// step functionで実行するLambdaの作成
const {
anonymizationTaskHandler,
anonymizedDatabaseTaskHandler,
} = create4sfnLambda({
vpc,
securityGroup: lambdaSecurityGroup,
});
// step function作成
const { machine } = awssfn.create(this, {
mapFunction: anonymizationTaskHandler,
postFunction: anonymizedDatabaseTaskHandler,
});
// step functionで実行しないLambdaの作成
const lambdaFunction = createLambda({
queue,
sfnArn: machine.stateMachineArn,
securityGroup: lambdaSecurityGroup,
});
// EventBridgeのスケジュール作成
awsevent.createEvent(
this,
lambdaFunction.scheduledEventHandler,
CONFIG.CLUSTER_MYSQL_DB.SCHEDULE_TITLE,
CONFIG.CLUSTER_MYSQL_DB.SCHEDULE_TITLE.DB_NAME,
CONFIG.CLUSTER_MYSQL_DB.SCHEDULE_TITLE.SECURITY_GROUP_ID,
);
awsevent.createEvent(
this,
lambdaFunction.scheduledEventHandler,
CONFIG.CLUSTER_POSTGRES_DB.SCHEDULE_TITLE,
CONFIG.CLUSTER_POSTGRES_DB.DB_NAME,
CONFIG.CLUSTER_POSTGRES_DB.SECURITY_GROUP_ID,
);
awsevent.createEvent(
this,
lambdaFunction.scheduledEventHandler,
CONFIG.INSTANCE_MYSQL_DB.SCHEDULE_TITLE,
CONFIG.INSTANCE_MYSQL_DB.DB_NAME,
CONFIG.INSTANCE_MYSQL_DB.SECURITY_GROUP_ID,
);
}
}
VPC
import { Stack } from '@aws-cdk/core';
import { Vpc, SecurityGroup, Port, IVpc } from '@aws-cdk/aws-ec2';
import { settings } from './settings';
import { parameters } from './parameter';
const securityGroup = (stack: Stack, {
vpc,
securityGroupName,
description,
}: {
vpc: IVpc,
securityGroupName: string,
description: string,
}) => {
return new SecurityGroup(stack, securityGroupName, {
vpc,
securityGroupName,
description,
});
};
export const create = (stack: Stack) => {
const {
resourcePrefix,
} = settings;
const {
vpcId,
} = parameters(stack);
// リストアを実行するVPCを取得。
const vpc = Vpc.fromLookup(stack, 'Vpc', { vpcId });
// Lambdaに対して付与するsecurity group
const lambdaSecurityGroup = securityGroup(stack, {
vpc,
securityGroupName: `${resourcePrefix}-lambda-security-group`,
description: 'This security group is managed by CDK.',
});
// restoreされたDBに対して紐付けるsecurity group
const restoreRdsSecurityGroup = securityGroup(stack, {
vpc,
securityGroupName: `${resourcePrefix}-restored-rds-security-group`,
description: 'This security group is managed by CDK.',
});
// restoreされたDBに対してLambdaから無名化処理を行うためにアクセス許可
restoreRdsSecurityGroup.connections.allowFrom(lambdaSecurityGroup, Port.tcp(3306), 'Lambda for restoration');
return {
vpc,
lambdaSecurityGroup,
restoreRdsSecurityGroup,
}
}
SQS
import { Stack, Duration } from '@aws-cdk/core';
import { Queue } from '@aws-cdk/aws-sqs';
import { settings } from './settings';
export const create = (stack: Stack) => {
const { resourcePrefix } = settings;
// topicからrestorationイベントを受け取るqueue
const queue = new Queue(stack, `${resourcePrefix}Queue`, {
visibilityTimeout: Duration.seconds(300)
});
return {
queue,
}
}
SNS
import { Stack } from '@aws-cdk/core';
import { Queue } from '@aws-cdk/aws-sqs';
import { Topic } from '@aws-cdk/aws-sns';
import { SqsSubscription } from '@aws-cdk/aws-sns-subscriptions';
import { settings } from './settings';
export const create = (stack: Stack, { queue }: { queue: Queue }) => {
const { resourcePrefix } = settings;
// RDSのrestorationイベントをfun outするtopic
const topic = new Topic(stack, `${resourcePrefix}Topic`);
// SQSがtopicをsubscribe
topic.addSubscription(new SqsSubscription(queue));
return {
topic,
}
}
RDS
import { Stack } from '@aws-cdk/core';
import { CfnEventSubscription } from '@aws-cdk/aws-rds';
import { Topic } from '@aws-cdk/aws-sns';
import { settings } from './settings';
export const subscribe = (stack: Stack, { topic }: { topic: Topic }) => {
const { resourcePrefix } = settings;
// memo: instance型: restoration
// memo: cluster型: creation (clusterをsnapshotから作成後、instanceをcreateするため。)
new CfnEventSubscription(stack, `${resourcePrefix}InstanceRestorationEventSubscription`, {
snsTopicArn: topic.topicArn,
eventCategories: ['restoration', 'creation'],
sourceType: 'db-instance',
});
}
Lambda
import { Stack, Duration } from '@aws-cdk/core';
import { IVpc, SubnetSelection, SecurityGroup, Subnet } from '@aws-cdk/aws-ec2';
import { Queue } from '@aws-cdk/aws-sqs';
import { Role, ServicePrincipal, ManagedPolicy } from '@aws-cdk/aws-iam';
import {
LayerVersion,
AssetCode,
Runtime,
Function,
Code,
} from '@aws-cdk/aws-lambda';
import { SqsEventSource } from '@aws-cdk/aws-lambda-event-sources';
import { settings } from './settings';
const createLambdaFunction = (
stack: Stack,
lambdaOptions: {
stack: Stack;
role: Role;
layers: [LayerVersion];
functionName: string;
handler: string;
vpc?: IVpc;
vpcSubnets?: SubnetSelection;
securityGroups?: [SecurityGroup];
environment?: { [key: string]: string };
timeout?: Duration;
},
) => {
const options = {
...lambdaOptions,
runtime: Runtime.NODEJS_14_X, // ランタイムの指定
code: Code.fromAsset('dist'), // ソースコードのディレクトリ -> npm run build
memorySize: 1024, // メモリーの指定
allowPublicSubnet: true,
};
return new Function(stack, lambdaOptions.functionName, options);
};
export const initialize = (stack: Stack) => {
// それぞれのlambdaに渡す環境変数
const { resourcePrefix, spreadsheetId, worksheetId, slackToken } = settings;
// node_modulesを登録するlayer作成
const nodeModulesLayer = new LayerVersion(
stack,
`${resourcePrefix}NodeModulesLayer`,
{
code: AssetCode.fromAsset('bundle'),
compatibleRuntimes: [Runtime.NODEJS_14_X],
},
);
// Lambdaに付与するロール作成
const lambdaRole = new Role(stack, `${resourcePrefix}LambdaRole`, {
assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
managedPolicies: [
ManagedPolicy.fromAwsManagedPolicyName(
'service-role/AWSLambdaBasicExecutionRole',
),
ManagedPolicy.fromAwsManagedPolicyName(
'service-role/AWSLambdaVPCAccessExecutionRole',
),
ManagedPolicy.fromAwsManagedPolicyName('AWSStepFunctionsFullAccess'),
ManagedPolicy.fromAwsManagedPolicyName('AmazonRDSFullAccess'),
ManagedPolicy.fromAwsManagedPolicyName('AmazonRoute53FullAccess'),
ManagedPolicy.fromAwsManagedPolicyName('AmazonSSMReadOnlyAccess'),
],
});
return {
create: ({
queue,
sfnArn,
securityGroup,
}: {
queue: Queue;
sfnArn: string;
securityGroup: SecurityGroup;
}) => {
// Lambda Function 作成
const scheduledEventHandler = createLambdaFunction(stack, {
stack,
role: lambdaRole,
layers: [nodeModulesLayer],
functionName: `${resourcePrefix}ScheduledEventHandler`,
handler: 'presentation/handler/scheduled-event.handler',
environment: {
RESOURCE_PREFIX: resourcePrefix,
SLACK_TOKEN: slackToken,
},
timeout: Duration.seconds(60),
});
const rdsRestorationEventHandler = createLambdaFunction(stack, {
stack,
role: lambdaRole,
layers: [nodeModulesLayer],
functionName: `${resourcePrefix}RdsRestorationEventHandler`,
handler: 'presentation/handler/rds-restoration-event.handler',
environment: {
SFN_ARN: sfnArn,
LAMBDA_SECURITY_GROUP_ID: securityGroup.securityGroupId,
SLACK_TOKEN: slackToken,
},
timeout: Duration.seconds(60),
});
rdsRestorationEventHandler.addEventSource(new SqsEventSource(queue));
return {
scheduledEventHandler,
rdsRestorationEventHandler,
};
},
create4sfn: ({
vpc,
securityGroup,
}: {
vpc: IVpc;
securityGroup: SecurityGroup;
}) => {
const anonymizationTaskHandler = createLambdaFunction(stack, {
stack,
role: lambdaRole,
layers: [nodeModulesLayer],
functionName: `${resourcePrefix}AnonymizationTaskHandler`,
handler: 'presentation/handler/anonymization-task.handler',
environment: {
SLACK_TOKEN: slackToken,
},
vpc,
vpcSubnets: { // Systems Managerメソッドの実行に必要
subnets: [
Subnet.fromSubnetAttributes(
stack,
'subnet-2c3b4c74-anonymizationTask',
{
subnetId: 'subnet-2c3b4c74',
},
),
Subnet.fromSubnetAttributes(
stack,
'subnet-81192df7-anonymizationTask',
{
subnetId: 'subnet-81192df7',
},
),
],
},
securityGroups: [securityGroup],
timeout: Duration.minutes(15), // memo: Masking用のLambdaは今後データが増えた時を見越して最大の15分。
});
const anonymizedDatabaseTaskHandler = createLambdaFunction(stack, {
stack,
role: lambdaRole,
layers: [nodeModulesLayer],
functionName: `${resourcePrefix}AnonymizedDatabaseTaskHandler`,
handler: 'presentation/handler/anonymized-database-task.handler',
environment: {
SPREADSHEET_ID: spreadsheetId,
WORKSHEET_ID: worksheetId,
SLACK_TOKEN: slackToken,
},
vpc,
vpcSubnets: { // Systems Managerメソッドの実行に必要
subnets: [
Subnet.fromSubnetAttributes(
stack,
'subnet-2c3b4c74-anonymizedDatabaseTask',
{
subnetId: 'subnet-2c3b4c74',
},
),
Subnet.fromSubnetAttributes(
stack,
'subnet-81192df7-anonymizedDatabaseTask',
{
subnetId: 'subnet-81192df7',
},
),
],
},
securityGroups: [securityGroup],
timeout: Duration.seconds(60),
});
return {
anonymizationTaskHandler,
anonymizedDatabaseTaskHandler,
};
},
};
};
StepFunctions
import { Stack, Duration } from '@aws-cdk/core';
import { Map, StateMachine, Wait, WaitTime } from '@aws-cdk/aws-stepfunctions';
import { LambdaInvoke } from '@aws-cdk/aws-stepfunctions-tasks';
import { Function } from '@aws-cdk/aws-lambda';
import { settings } from './settings';
export const create = (
stack: Stack,
{
mapFunction,
postFunction,
}: {
mapFunction: Function;
postFunction: Function;
},
) => {
const { resourcePrefix } = settings;
const fiveMinutesLater = new Date();
fiveMinutesLater.setMinutes(fiveMinutesLater.getMinutes() + 5);
// memo: リストア直後だとRDSが安定しないため5分待つ処理
const wait = new Wait(stack, 'restoreCompleteWait', {
time: WaitTime.duration(Duration.minutes(5)),
});
const anonymizationTaskMap = new Map(stack, 'anonymizationTaskMap', {
maxConcurrency: 0, // memo: 同時実行する処理(Lambda)の最大値。0は無制限。
itemsPath: '$.tasks',
inputPath: '$',
parameters: {
'task.$': '$$.Map.Item.Value',
'database.$': '$.database',
},
outputPath: '$.database',
resultPath: '$.null',
});
const anonymizationTask = new LambdaInvoke(stack, 'anonymizationtask', {
lambdaFunction: mapFunction,
payloadResponseOnly: true,
});
const anonymizedDatabaseTask = new LambdaInvoke(
stack,
'anonymizedDatabaseTask',
{
lambdaFunction: postFunction,
payloadResponseOnly: true,
},
);
anonymizationTaskMap.iterator(anonymizationTask); // memo: iteratorで並列処理を実行
wait.next(anonymizationTaskMap).next(anonymizedDatabaseTask);
const definition = wait;
const machine = new StateMachine(stack, `${resourcePrefix}StepFunction`, {
definition,
timeout: Duration.minutes(15),
});
return {
machine,
};
};
EventBridge
import { Stack } from '@aws-cdk/core';
import { Rule, Schedule, RuleTargetInput } from '@aws-cdk/aws-events';
import { LambdaFunction } from '@aws-cdk/aws-events-targets';
import { IFunction } from '@aws-cdk/aws-lambda';
export const createEvent = (
stack: Stack,
lambdaFunction: IFunction,
scheduleEventTitle: string,
targetDB: string,
restoreRdsSecurityGroupId: string,
) => {
return new Rule(stack, scheduleEventTitle, {
schedule: Schedule.cron({
minute: '0',
hour: '14',
day: '*',
month: '*',
year: '*',
}),
targets: [
new LambdaFunction(lambdaFunction, {
event: RuleTargetInput.fromObject({
targetDB,
restoreRdsSecurityGroupId,
}),
}),
],
description: `restore rds for ${targetDB} staging`,
});
};
Point
リストアの方法
対象となるRDSがCluster型(AWS Aurora)か、Instance型かでリストアの方法が異なります。
- Cluster型: snapshotからclusterを作成後、そのclusterに紐付くinstanceをcreateする。
- Instance型: snapshotからinstanceを作成する。
そのためSNSが受け取るRDSイベントは、 sourceType: 'db-instance'
の、 eventCategories: ['restoration', 'creation']
になっています。
各Lambda/StepFunctionsのTimeout時間
Masking用のLambdaは今後もデータが増えることを見越して最大の15分にしました。
その他は時間がかかる処理ではないので一旦60秒にしています。
それに合わせてStepFunctionsのTimeoutも15分としています。
System Managerの実行
LambdaからSystem Managerのメソッド実行する場合、VCP Subnetsを指定する必要があります。
AWS CDKの実装は以上
これでEvent Bridgeから各Lambdaが動く処理を構築できました。
次は実際のLambdaの処理を書いていきます。
第2部: 本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】3 ~SDK編前半~に続きます。
採用情報
採用活動も積極的に行っていますのでもし興味あれば以下コーポレートサイト等を覗いてみてください。
Discussion