本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】1 ~CDK編~

2021/12/09に公開

この記事は以下Advent Calendarの記事です。
AWS CDK Advent Calendar 2021 - 12月9日
airCloset Advent Calendar 2021 - 12月9日

はじめに

株式会社エアークローゼットでエンジニアをしている三好 @miyomiyo344_ です。

本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作った話しです。

長編になりそうなので3部作に分けようと思います。
第1部ではAWS CDKで全体の構成を構築したところを書いています。

第2部: 本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】2 ~SDK編前半~

第3部: 本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】3 ~SDK編後半~

目的

なぜ本番環境から同期(リストア)するのか

クックパッドさんの開発者ブログでも書かれていますが、以下が本番環境から同期する理由です。

  • ユーザと同等体験での開発やテスト、オペレーション対応を行いたい
  • パフォーマンス問題の早期発見をしたい
  • データ依存不具合の早期発見をしたい

当初の運用

アーキテクチャ

  • 当初はCircleCIベースで行っていました。

要件

対象となるデータベースは複数

  • DB一覧
    • AmazonAuroraDB - MySQL (Cluster)
    • AmazonAuroraDB - PostgreSQL (Cluster)
    • MySQL (Instance)
  • 今後データベースが増えても大丈夫な設計

各データベースに個人情報が含まれている場合はマスキングする

  • マスキング処理はテーブルごとに並列実行させる
  • ステージング環境に不要なテーブルはTruncateする
  • マスキング対象データが元々NULLの場合はNULLのままにする
    • 本番環境との差分をできるだけ無くす

毎日自動的に実行する

  • スキップすることも可能

その他

  • 処理が失敗した場合はSlack通知

アーキテクチャ

アーキテクチャ

Lambda

Lambda Functionを大きく分けて4種類用意しています。

  1. 本番DBからSnapshotを取得して、新たにステージングDBとなるデータベースを作成
    • 対象となるDB
      • AmazonAuroraDB - MySQL (Cluster)
      • AmazonAuroraDB - PostgreSQL (Cluster)
      • MySQL (Instance)
  2. Masking処理の準備
    1. 新たに作成したDBの接続情報を取得
    2. そのDBのMasking対象となるテーブル一覧を取得しStep Functionへ渡す
  3. 各テーブルのMasking処理を並列実行
  4. 後処理
    1. 新しいDBに旧ステージングDBの接続ユーザー情報を追加
    2. ステージング用の接続ユーザーで接続確認
    3. DNSの付け替え
    4. 旧ステージングDBの削除

StepFunctions

上記ステップの3〜4がStepFunctionsでの実行となります。
また、「各テーブルのMasking処理を並列実行」の実行前に、StepFunctionsにてWait処理を加えています。
理由としては、リストア直後だとMySQL/PostgreSQLが安定しておらず、Masking処理が失敗することがあったためです。
リストア後5分待ち、その後Maskingへ進む設計としています。

StepFunctionsグラフインスペクター

備考

2021/10/01にAWD Step Functionsがアップデートされ200以上のAWSサービスと連携できるようになりました。
開発を始めたタイミングではAWS CDKがこのアップデートの対応をしていなかったため、上記のようなアーキテクチャになりました。
現在(2021/11月)はAWS CDKのプルリクエストがマージされており、GUIと変わらず200以上のサービスと連携できるようになっているみたいなので、いまから設計する場合はStep Functionを軸とした設計にしてもいいかもしれないです。たぶんその方が楽です。

環境構築

AWS上でSandbox環境を作成

開発中は本番データベースへの影響が怖いので社内でSandbox環境を作ってもらいました。

package.json

package.json
  "devDependencies": {
    "@aws-cdk/assert": "1.111.0",
    "@aws-cdk/aws-events-targets": "1.111.0",
    "@aws-cdk/aws-lambda": "1.111.0",
    "@aws-cdk/aws-lambda-event-sources": "1.111.0",
    "@aws-cdk/aws-rds": "1.111.0",
    "@aws-cdk/aws-sns": "1.111.0",
    "@aws-cdk/aws-sns-subscriptions": "1.111.0",
    "@aws-cdk/aws-sqs": "1.111.0",
    "@aws-cdk/aws-stepfunctions": "1.111.0",
    "@aws-cdk/aws-stepfunctions-tasks": "^1.111.0",
    "@aws-cdk/core": "1.111.0",
    "@types/aws-lambda": "^8.10.78",
    "@types/faker": "^5.5.9",
    "@types/jest": "^26.0.10",
    "@types/mysql2": "github:types/mysql2",
    "@types/node": "10.17.27",
    "@types/pg": "^8.6.1",
    "aws-cdk": "1.111.0",
    "jest": "^26.4.2",
    "ts-jest": "^26.2.0",
    "ts-node": "^9.0.0",
    "typescript": "~3.9.7"
  },
  "dependencies": {
    "@slack/web-api": "^6.2.4",
    "aws-sdk": "^2.942.0",
    "faker": "^5.5.3",
    "mysql2": "^2.3.2",
    "pg": "^8.7.1"
  }

AWS CDK

lib/index.ts
import * as cdk from '@aws-cdk/core';
import { settings } from './settings';
import CONFIG from './config/config.json';
import * as awsvpc from './vpc';
import * as awssqs from './sqs';
import * as awssns from './sns';
import * as awsrds from './rds';
import * as awslambda from './lambda';
import * as awssfn from './sfn';
import * as awsevent from './event';
export class CdkRestoreRDS4stgStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const {} = settings;
    const { vpc, lambdaSecurityGroup } = awsvpc.create(this);

    // topicからrestorationイベントを受け取るqueue
    const { queue } = awssqs.create(this);

    // RDSのrestorationイベントをfun outするtopic
    const { topic } = awssns.create(this, { queue });

    // RDSのrestorationイベントをsbscribe
    awsrds.subscribe(this, { topic });

    // Lambda作成準備
    const {
      create: createLambda,
      create4sfn: create4sfnLambda,
    } = awslambda.initialize(this);

    // step functionで実行するLambdaの作成
    const {
      anonymizationTaskHandler,
      anonymizedDatabaseTaskHandler,
    } = create4sfnLambda({
      vpc,
      securityGroup: lambdaSecurityGroup,
    });

    // step function作成
    const { machine } = awssfn.create(this, {
      mapFunction: anonymizationTaskHandler,
      postFunction: anonymizedDatabaseTaskHandler,
    });

    // step functionで実行しないLambdaの作成
    const lambdaFunction = createLambda({
      queue,
      sfnArn: machine.stateMachineArn,
      securityGroup: lambdaSecurityGroup,
    });

    // EventBridgeのスケジュール作成
    awsevent.createEvent(
      this,
      lambdaFunction.scheduledEventHandler,
      CONFIG.CLUSTER_MYSQL_DB.SCHEDULE_TITLE,
      CONFIG.CLUSTER_MYSQL_DB.SCHEDULE_TITLE.DB_NAME,
      CONFIG.CLUSTER_MYSQL_DB.SCHEDULE_TITLE.SECURITY_GROUP_ID,
    );
    awsevent.createEvent(
      this,
      lambdaFunction.scheduledEventHandler,
      CONFIG.CLUSTER_POSTGRES_DB.SCHEDULE_TITLE,
      CONFIG.CLUSTER_POSTGRES_DB.DB_NAME,
      CONFIG.CLUSTER_POSTGRES_DB.SECURITY_GROUP_ID,
    );
    awsevent.createEvent(
      this,
      lambdaFunction.scheduledEventHandler,
      CONFIG.INSTANCE_MYSQL_DB.SCHEDULE_TITLE,
      CONFIG.INSTANCE_MYSQL_DB.DB_NAME,
      CONFIG.INSTANCE_MYSQL_DB.SECURITY_GROUP_ID,
    );
  }
}

VPC

lib/vpc.ts
import { Stack } from '@aws-cdk/core';
import { Vpc, SecurityGroup, Port, IVpc } from '@aws-cdk/aws-ec2';
import { settings } from './settings';
import { parameters } from './parameter';

const securityGroup = (stack: Stack, {
  vpc,
  securityGroupName,
  description,
}: {
    vpc: IVpc,
    securityGroupName: string,
    description: string,
  }) => {
  return new SecurityGroup(stack, securityGroupName, {
    vpc,
    securityGroupName,
    description,
  });
};

export const create = (stack: Stack) => {
  const {
    resourcePrefix,
  } = settings;
  const {
    vpcId,
  } = parameters(stack);
  // リストアを実行するVPCを取得。
  const vpc = Vpc.fromLookup(stack, 'Vpc', { vpcId });
  // Lambdaに対して付与するsecurity group
  const lambdaSecurityGroup = securityGroup(stack, {
    vpc,
    securityGroupName: `${resourcePrefix}-lambda-security-group`,
    description: 'This security group is managed by CDK.',
  });
  // restoreされたDBに対して紐付けるsecurity group
  const restoreRdsSecurityGroup = securityGroup(stack, {
    vpc,
    securityGroupName: `${resourcePrefix}-restored-rds-security-group`,
    description: 'This security group is managed by CDK.',
  });

  // restoreされたDBに対してLambdaから無名化処理を行うためにアクセス許可
  restoreRdsSecurityGroup.connections.allowFrom(lambdaSecurityGroup, Port.tcp(3306), 'Lambda for restoration');

  return {
    vpc,
    lambdaSecurityGroup,
    restoreRdsSecurityGroup,
  }
}

SQS

lib/sqs.ts
import { Stack, Duration } from '@aws-cdk/core';
import { Queue } from '@aws-cdk/aws-sqs';
import { settings } from './settings';

export const create = (stack: Stack) => {
  const { resourcePrefix } = settings;

  // topicからrestorationイベントを受け取るqueue
  const queue = new Queue(stack, `${resourcePrefix}Queue`, {
    visibilityTimeout: Duration.seconds(300)
  });

  return {
    queue,
  }
}

SNS

lib/sns.ts
import { Stack } from '@aws-cdk/core';
import { Queue } from '@aws-cdk/aws-sqs';
import { Topic } from '@aws-cdk/aws-sns';
import { SqsSubscription } from '@aws-cdk/aws-sns-subscriptions';
import { settings } from './settings';

export const create = (stack: Stack, { queue }: { queue: Queue }) => {
  const { resourcePrefix } = settings;
  // RDSのrestorationイベントをfun outするtopic
  const topic = new Topic(stack, `${resourcePrefix}Topic`);
  // SQSがtopicをsubscribe
  topic.addSubscription(new SqsSubscription(queue));

  return {
    topic,
  }
}

RDS

lib/rds.ts
import { Stack } from '@aws-cdk/core';
import { CfnEventSubscription } from '@aws-cdk/aws-rds';
import { Topic } from '@aws-cdk/aws-sns';
import { settings } from './settings';

export const subscribe = (stack: Stack, { topic }: { topic: Topic }) => {
  const { resourcePrefix } = settings;
  // memo: instance型: restoration
  // memo: cluster型: creation (clusterをsnapshotから作成後、instanceをcreateするため。)
  new CfnEventSubscription(stack, `${resourcePrefix}InstanceRestorationEventSubscription`, {
    snsTopicArn: topic.topicArn,
    eventCategories: ['restoration', 'creation'],
    sourceType: 'db-instance',
  });
}

Lambda

lib/lambda.ts
import { Stack, Duration } from '@aws-cdk/core';
import { IVpc, SubnetSelection, SecurityGroup, Subnet } from '@aws-cdk/aws-ec2';
import { Queue } from '@aws-cdk/aws-sqs';
import { Role, ServicePrincipal, ManagedPolicy } from '@aws-cdk/aws-iam';
import {
  LayerVersion,
  AssetCode,
  Runtime,
  Function,
  Code,
} from '@aws-cdk/aws-lambda';
import { SqsEventSource } from '@aws-cdk/aws-lambda-event-sources';
import { settings } from './settings';

const createLambdaFunction = (
  stack: Stack,
  lambdaOptions: {
    stack: Stack;
    role: Role;
    layers: [LayerVersion];
    functionName: string;
    handler: string;
    vpc?: IVpc;
    vpcSubnets?: SubnetSelection;
    securityGroups?: [SecurityGroup];
    environment?: { [key: string]: string };
    timeout?: Duration;
  },
) => {
  const options = {
    ...lambdaOptions,
    runtime: Runtime.NODEJS_14_X, // ランタイムの指定
    code: Code.fromAsset('dist'), // ソースコードのディレクトリ -> npm run build
    memorySize: 1024, // メモリーの指定
    allowPublicSubnet: true,
  };

  return new Function(stack, lambdaOptions.functionName, options);
};

export const initialize = (stack: Stack) => {
  // それぞれのlambdaに渡す環境変数
  const { resourcePrefix, spreadsheetId, worksheetId, slackToken } = settings;

  // node_modulesを登録するlayer作成
  const nodeModulesLayer = new LayerVersion(
    stack,
    `${resourcePrefix}NodeModulesLayer`,
    {
      code: AssetCode.fromAsset('bundle'),
      compatibleRuntimes: [Runtime.NODEJS_14_X],
    },
  );
  // Lambdaに付与するロール作成
  const lambdaRole = new Role(stack, `${resourcePrefix}LambdaRole`, {
    assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
    managedPolicies: [
      ManagedPolicy.fromAwsManagedPolicyName(
        'service-role/AWSLambdaBasicExecutionRole',
      ),
      ManagedPolicy.fromAwsManagedPolicyName(
        'service-role/AWSLambdaVPCAccessExecutionRole',
      ),
      ManagedPolicy.fromAwsManagedPolicyName('AWSStepFunctionsFullAccess'),
      ManagedPolicy.fromAwsManagedPolicyName('AmazonRDSFullAccess'),
      ManagedPolicy.fromAwsManagedPolicyName('AmazonRoute53FullAccess'),
      ManagedPolicy.fromAwsManagedPolicyName('AmazonSSMReadOnlyAccess'),
    ],
  });

  return {
    create: ({
      queue,
      sfnArn,
      securityGroup,
    }: {
      queue: Queue;
      sfnArn: string;
      securityGroup: SecurityGroup;
    }) => {
      // Lambda Function 作成
      const scheduledEventHandler = createLambdaFunction(stack, {
        stack,
        role: lambdaRole,
        layers: [nodeModulesLayer],
        functionName: `${resourcePrefix}ScheduledEventHandler`,
        handler: 'presentation/handler/scheduled-event.handler',
        environment: {
          RESOURCE_PREFIX: resourcePrefix,
          SLACK_TOKEN: slackToken,
        },
        timeout: Duration.seconds(60),
      });
      const rdsRestorationEventHandler = createLambdaFunction(stack, {
        stack,
        role: lambdaRole,
        layers: [nodeModulesLayer],
        functionName: `${resourcePrefix}RdsRestorationEventHandler`,
        handler: 'presentation/handler/rds-restoration-event.handler',
        environment: {
          SFN_ARN: sfnArn,
          LAMBDA_SECURITY_GROUP_ID: securityGroup.securityGroupId,
          SLACK_TOKEN: slackToken,
        },
        timeout: Duration.seconds(60),
      });

      rdsRestorationEventHandler.addEventSource(new SqsEventSource(queue));

      return {
        scheduledEventHandler,
        rdsRestorationEventHandler,
      };
    },
    create4sfn: ({
      vpc,
      securityGroup,
    }: {
      vpc: IVpc;
      securityGroup: SecurityGroup;
    }) => {
      const anonymizationTaskHandler = createLambdaFunction(stack, {
        stack,
        role: lambdaRole,
        layers: [nodeModulesLayer],
        functionName: `${resourcePrefix}AnonymizationTaskHandler`,
        handler: 'presentation/handler/anonymization-task.handler',
        environment: {
          SLACK_TOKEN: slackToken,
        },
        vpc,
        vpcSubnets: { // Systems Managerメソッドの実行に必要
          subnets: [
            Subnet.fromSubnetAttributes(
              stack,
              'subnet-2c3b4c74-anonymizationTask',
              {
                subnetId: 'subnet-2c3b4c74',
              },
            ),
            Subnet.fromSubnetAttributes(
              stack,
              'subnet-81192df7-anonymizationTask',
              {
                subnetId: 'subnet-81192df7',
              },
            ),
          ],
        },
        securityGroups: [securityGroup],
        timeout: Duration.minutes(15), // memo: Masking用のLambdaは今後データが増えた時を見越して最大の15分。
      });
      const anonymizedDatabaseTaskHandler = createLambdaFunction(stack, {
        stack,
        role: lambdaRole,
        layers: [nodeModulesLayer],
        functionName: `${resourcePrefix}AnonymizedDatabaseTaskHandler`,
        handler: 'presentation/handler/anonymized-database-task.handler',
        environment: {
          SPREADSHEET_ID: spreadsheetId,
          WORKSHEET_ID: worksheetId,
          SLACK_TOKEN: slackToken,
        },
        vpc,
        vpcSubnets: { // Systems Managerメソッドの実行に必要
          subnets: [
            Subnet.fromSubnetAttributes(
              stack,
              'subnet-2c3b4c74-anonymizedDatabaseTask',
              {
                subnetId: 'subnet-2c3b4c74',
              },
            ),
            Subnet.fromSubnetAttributes(
              stack,
              'subnet-81192df7-anonymizedDatabaseTask',
              {
                subnetId: 'subnet-81192df7',
              },
            ),
          ],
        },
        securityGroups: [securityGroup],
        timeout: Duration.seconds(60),
      });

      return {
        anonymizationTaskHandler,
        anonymizedDatabaseTaskHandler,
      };
    },
  };
};

StepFunctions

lib/sfn.ts
import { Stack, Duration } from '@aws-cdk/core';
import { Map, StateMachine, Wait, WaitTime } from '@aws-cdk/aws-stepfunctions';
import { LambdaInvoke } from '@aws-cdk/aws-stepfunctions-tasks';
import { Function } from '@aws-cdk/aws-lambda';
import { settings } from './settings';

export const create = (
  stack: Stack,
  {
    mapFunction,
    postFunction,
  }: {
    mapFunction: Function;
    postFunction: Function;
  },
) => {
  const { resourcePrefix } = settings;
  const fiveMinutesLater = new Date();
  fiveMinutesLater.setMinutes(fiveMinutesLater.getMinutes() + 5);

  // memo: リストア直後だとRDSが安定しないため5分待つ処理
  const wait = new Wait(stack, 'restoreCompleteWait', {
    time: WaitTime.duration(Duration.minutes(5)),
  });

  const anonymizationTaskMap = new Map(stack, 'anonymizationTaskMap', {
    maxConcurrency: 0, // memo: 同時実行する処理(Lambda)の最大値。0は無制限。
    itemsPath: '$.tasks',
    inputPath: '$',
    parameters: {
      'task.$': '$$.Map.Item.Value',
      'database.$': '$.database',
    },
    outputPath: '$.database',
    resultPath: '$.null',
  });
  const anonymizationTask = new LambdaInvoke(stack, 'anonymizationtask', {
    lambdaFunction: mapFunction,
    payloadResponseOnly: true,
  });
  const anonymizedDatabaseTask = new LambdaInvoke(
    stack,
    'anonymizedDatabaseTask',
    {
      lambdaFunction: postFunction,
      payloadResponseOnly: true,
    },
  );
  anonymizationTaskMap.iterator(anonymizationTask); // memo: iteratorで並列処理を実行
  wait.next(anonymizationTaskMap).next(anonymizedDatabaseTask);

  const definition = wait;

  const machine = new StateMachine(stack, `${resourcePrefix}StepFunction`, {
    definition,
    timeout: Duration.minutes(15),
  });

  return {
    machine,
  };
};

EventBridge

lib/event.ts
import { Stack } from '@aws-cdk/core';
import { Rule, Schedule, RuleTargetInput } from '@aws-cdk/aws-events';
import { LambdaFunction } from '@aws-cdk/aws-events-targets';
import { IFunction } from '@aws-cdk/aws-lambda';

export const createEvent = (
  stack: Stack,
  lambdaFunction: IFunction,
  scheduleEventTitle: string,
  targetDB: string,
  restoreRdsSecurityGroupId: string,
) => {
  return new Rule(stack, scheduleEventTitle, {
    schedule: Schedule.cron({
      minute: '0',
      hour: '14',
      day: '*',
      month: '*',
      year: '*',
    }),
    targets: [
      new LambdaFunction(lambdaFunction, {
        event: RuleTargetInput.fromObject({
          targetDB,
          restoreRdsSecurityGroupId,
        }),
      }),
    ],
    description: `restore rds for ${targetDB} staging`,
  });
};

Point

リストアの方法

対象となるRDSがCluster型(AWS Aurora)か、Instance型かでリストアの方法が異なります。

  • Cluster型: snapshotからclusterを作成後、そのclusterに紐付くinstanceをcreateする。
  • Instance型: snapshotからinstanceを作成する。

そのためSNSが受け取るRDSイベントは、 sourceType: 'db-instance' の、 eventCategories: ['restoration', 'creation'] になっています。

各Lambda/StepFunctionsのTimeout時間

Masking用のLambdaは今後もデータが増えることを見越して最大の15分にしました。
その他は時間がかかる処理ではないので一旦60秒にしています。
それに合わせてStepFunctionsのTimeoutも15分としています。

System Managerの実行

LambdaからSystem Managerのメソッド実行する場合、VCP Subnetsを指定する必要があります。

AWS CDKの実装は以上

これでEvent Bridgeから各Lambdaが動く処理を構築できました。

次は実際のLambdaの処理を書いていきます。

第2部: 本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】3 ~SDK編前半~に続きます。

採用情報

採用活動も積極的に行っていますのでもし興味あれば以下コーポレートサイト等を覗いてみてください。

https://corp.air-closet.com/recruiting/
https://corp.air-closet.com/recruiting/developers/
エンジニアコーポレートサイト
https://youtu.be/w99lPd-Uea0

Discussion