🥎

本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】2 ~SDK編前半~

2021/12/19に公開

この記事は以下Advent Calendarの記事です。
AWS CDK Advent Calendar 2021 - 12月9日
airCloset Advent Calendar 2021 - 12月9日

前回

本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】1~CDK構築編~

はじめに

株式会社エアークローゼットでエンジニアをしている三好 @miyomiyo344_ です。

本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作った話しで、前回の続きです。

第1部では今回の目的や要件から、AWS CDKで全体の構成を構築したところを書きました。
第2部の本記事ではAWS SDK for JavaScript にて、実際のLambdaの処理を書いていきます。

アーキテクチャ

アーキテクチャ

ざっくりやること

  1. 本番DBからSnapshotを取得して、新たにステージングDBとなるデータベースを作成
    • 対象となるDB
      • AmazonAuroraDB - MySQL (Cluster)
      • AmazonAuroraDB - PostgreSQL (Cluster)
      • MySQL (Instance)
  2. Masking処理の準備
    1. 新たに作成したDBの接続情報を取得
    2. 対象となるDBのMasking対象となるテーブル一覧を取得しStep Functionへ渡す
  3. 各テーブルのMasking処理を並列実行
  4. 後処理
    1. 新しいDBに旧ステージングDBの接続ユーザー情報を追加
    2. ステージング用の接続ユーザーで接続確認
    3. DNSの付け替え
    4. 旧ステージングDBの削除

設計

  • ドメイン駆動設計(DDD)で構築しています。
  • AWS SDK for JavaScript Version1で書いていますが、Version3の方がいろいろ便利だったなと感じています。

実装 → テスト

LocalStackの導入も検討しましたが、当時LocalStackがM1 Macに対応していなかったため導入しませんでした。
そのため確認する際はSDKをデプロイしてから、AWS GUI上でテストしていました。

注意点としては、Lambdaを更新するときに同時にCDKにも差分があった場合、Lambdaは一度削除され、その後新たなソースで作成となっているようで、GUI上のテストデータが都度消えます。(テストデータもCDKで管理可能だと思いますが。)
CDKに差分がなかった場合はLambdaの差分がそのまま更新されて、テストデータが消えません。

現在はLocalStackがM1に対応したみたいで、デプロイも地味に時間がかかるので、LocalStackを導入するのがいいかなと思います。(AWS SAMでもいいかも?)

ディレクトリ構成

src/
├─ @types/
├─ config/
├─ domain/
│  └─ repository/
│     └─ aws/
│     │  ├─ rds.ts
│     │  ├─ route53.ts
│     │  ├─ security-group.ts
│     │  └─ ssm.ts
│     └─ database/
│        ├─ anonymization/
│        │    ├─ db1/
│        │    │  ├─ table1.ts
│        │    │  ├─ table2.ts
│        │    │  ├─ table3.ts
│        │    │   ︙
│        │    ├─ db2/
│        │    ├─ db3/
│        │    └─ index.ts
│        ├─ mysql/
│        │  └─ index.ts
│        ├─ postgres/
│        │  └─ index.ts
│        └─ index.ts
│        └─ query.ts
├─ infrastructure/
│  └─ aws/
│  │  ├─ rds.ts
│  │  ├─ route53.ts
│  │  └─ ssm.ts
│  ├─ faker.ts
│  ├─ mysql.ts
│  ├─ postgres.ts
│  └─ slack.ts
├─ presentation/
│  └─ handler/
│     ├─ anonymization-task.ts
│     ├─ anonymized-database-task.ts
│     ├─ rds-restoration-event.ts
│     └─ scheduled-event.ts
├─ usecase/
│  ├─ anonymize-database.ts
│  ├─ configure-database.ts
│  ├─ kick-restore.ts
│  └─ prepare-anonymization.ts
└─ utils/
   └─ index.ts

1. 本番DBからSnapshotを取得して、新たにステージングDBとなるデータベースを作成

Point

presentation層

presentation/handler/scheduled-event.ts
import { Context } from 'aws-lambda';
import { KickRestoreUsecase } from '../../usecase/kick-restore';
import { send } from '../../infrastructure/slack';

exports.handler = async (event: any, context: Context) => {
  const kickRestoreUsecase = new KickRestoreUsecase();
  try {
    const targetDB = event.targetDB;
    if (!targetDB) {
      throw new Error('対象となるDatabaseが指定されていません');
    }
    const restoreRdsSecurityGroupId = event.restoreRdsSecurityGroupId;
    if (!restoreRdsSecurityGroupId) {
      throw new Error('セキュリティグループIDが渡されていません');
    }
    await kickRestoreUsecase.execute(targetDB, restoreRdsSecurityGroupId);
  } catch (error) {
    await send(
      '#developers',
      `Failed to kick database restoration for staging environment. error: ${error}`,
    );
    console.log({ status: 500, error });
    return { status: 500, error };
  }

  console.log({ status: 200, event });
  return { status: 200, event };
};

usecase層

usecase/kick-restore.ts
import { RdsRepository } from '../domain/repository/aws/rds';
import { DatabaseRepository } from '../domain/repository/database';

export class KickRestoreUsecase {
  async execute(targetDB: string, restoreRdsSecurityGroupId: string) {
    const rdsRepository: RdsRepository = new RdsRepository();
    const databaseRepository: DatabaseRepository = new DatabaseRepository();

    // ターゲットとなるDatabaseの情報を取得
    const targetDatabaseProperty = databaseRepository.getTargetDatabaseProperty(
      targetDB,
    );

    // 最新のSnapshotを取得
    const latestSnapshot: any = await rdsRepository.getLatestDBSnapshot(
      targetDB,
      targetDatabaseProperty,
    );

    const securityGroupIds = [
      restoreRdsSecurityGroupId,
      targetDatabaseProperty.securityGroupId,
    ];
    const parameterGroupNameForCluster = targetDatabaseProperty.parameterGroupNameForCluster
      ? targetDatabaseProperty.parameterGroupNameForCluster
      : null;
    const parameterGroupNameForInstance =
      targetDatabaseProperty.parameterGroupNameForInstance;

    // 取得したSnapshotからRestoreを実行
    return await rdsRepository.restoreDBFromDBSnapshot(
      targetDatabaseProperty.dbIdentifier4RestoredStg,
      latestSnapshot.identifier,
      latestSnapshot.engine,
      securityGroupIds,
      targetDatabaseProperty.dbIdentifier4RestoredStgInstanceInCluster,
      parameterGroupNameForCluster,
      parameterGroupNameForInstance,
      targetDB,
    );
  };
};

repository層

repository/database/index.ts
import { DATABASE_CONFIG } from '../../../config/database';
import { ITargetDBProperty } from '../../../@types';
import { getNowYmdHm } from '../../../utils';

export class DatabaseRepository {
  public getTargetDatabaseProperty(databaseName: string) {
    let targetDBProperty: ITargetDBProperty = {
      dbiResourceId4Prod: '',
      dbIdentifier4Prod: '',
      dbIdentifier4RestoredStg: '',
      dbIdentifier4RestoredStgInstanceInCluster: '',
      securityGroupId: '',
      parameterGroupNameForCluster: '',
      parameterGroupNameForInstance: '',
    };

    switch (databaseName) {
      case 'clusterMysqlDB':
        targetDBProperty = {
          dbiResourceId4Prod: DATABASE_CONFIG.CLUSTER_MYSQL_DB.DBI_RESOURCE_ID.PRODUCTION,
          dbIdentifier4Prod: DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.PRODUCTION,
          dbIdentifier4RestoredStg: `${DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING}-${getNowYmdHm()}`,
          dbIdentifier4RestoredStgInstanceInCluster: `${
            DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING
          }-${getNowYmdHm()}`,
          securityGroupId: DATABASE_CONFIG.CLUSTER_MYSQL_DB.SECURITY_GROUP_ID,
          parameterGroupNameForCluster: DATABASE_CONFIG.CLUSTER_MYSQL_DB.PARAMETER_GROUP_NAME.CLUSTER,
          parameterGroupNameForInstance: DATABASE_CONFIG.CLUSTER_MYSQL_DB.PARAMETER_GROUP_NAME.INSTANCE,
        };
        break;
      case 'clusterPostgresDB':
        targetDBProperty = {
          dbiResourceId4Prod: DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DBI_RESOURCE_ID.PRODUCTION,
          dbIdentifier4Prod: DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.PRODUCTION,
          dbIdentifier4RestoredStg: `${DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING}-${getNowYmdHm()}`,
          dbIdentifier4RestoredStgInstanceInCluster: `${DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING}-${getNowYmdHm()}`,
          securityGroupId: DATABASE_CONFIG.CLUSTER_POSTGRES_DB.SECURITY_GROUP_ID,
          parameterGroupNameForCluster: DATABASE_CONFIG.CLUSTER_POSTGRES_DB.PARAMETER_GROUP_NAME.CLUSTER,
          parameterGroupNameForInstance: DATABASE_CONFIG.CLUSTER_POSTGRES_DB.PARAMETER_GROUP_NAME.INSTANCE,
        };
        break;
      case 'instaneMysqlDB':
        targetDBProperty = {
          dbiResourceId4Prod: DATABASE_CONFIG.INSTANCE_MYSQL_DB.DBI_RESOURCE_ID.PRODUCTION,
          dbIdentifier4Prod: DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.PRODUCTION,
          dbIdentifier4RestoredStg: `${DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.STAGING}-${getNowYmdHm()}`,
          dbIdentifier4RestoredStgInstanceInCluster: '',
          securityGroupId: DATABASE_CONFIG.INSTANCE_MYSQL_DB.SECURITY_GROUP_ID,
          parameterGroupNameForInstance: DATABASE_CONFIG.INSTANCE_MYSQL_DB.PARAMETER_GROUP_NAME.INSTANCE,
        };
        break;
      default:
        throw new Error(`DatabaseName: ${databaseName} が対象になっているため処理を中止しました`);
    }

    return targetDBProperty;
  }
};
repository/aws/rds.ts
import {
  describeDBClusterSnapshots,
  describeDBInstanceSnapshots,
  restoreDBInstanceFromDBSnapshot,
  restoreDBClusterFromSnapshot,
  createDBInstance,
} from '../../../infrastructure/aws/rds';
import { ILatestSnapshot, ITargetDBProperty } from '../../../@types';
import { PromiseResult } from 'aws-sdk/lib/request';
import { RestoreDBInstanceFromDBSnapshotResult } from 'aws-sdk/clients/rds';
import { AWSError } from 'aws-sdk';

export class RdsRepository {
  public async getLatestDBSnapshot(
    databaseName: string,
    targetDatabaseProperty: ITargetDBProperty
  ) {
    let latestSnapshot: any;
    let latestSnapshotProperty: ILatestSnapshot = {
      identifier: '',
      engine: '',
    };

    switch (databaseName) {
      case 'clusterMysqlDB':
      case 'clusterPostgresDB':
        const dbClusterSnapshots = await describeDBClusterSnapshots(targetDatabaseProperty.dbIdentifier4Prod);
        if (!dbClusterSnapshots) {
          throw new Error(`DB識別子: ${targetDatabaseProperty.dbIdentifier4Prod} のSnapshot一覧が取得できませんでした`);
        }
        latestSnapshot = this.getLatestSnapshot(dbClusterSnapshots.DBClusterSnapshots);
        latestSnapshotProperty = {
          identifier: latestSnapshot.DBClusterSnapshotIdentifier,
          engine: latestSnapshot.Engine,
        };
        break;
      case 'instanceMysqlDB':
        const dbInstanceSnapshots = await describeDBInstanceSnapshots(targetDatabaseProperty.dbiResourceId4Prod);
        if (!dbInstanceSnapshots) {
          throw new Error(
            `DB Resource ID: ${targetDatabaseProperty.dbiResourceId4Prod} のSnapshot一覧が取得できませんでした`
          );
        }
        latestSnapshot = this.getLatestSnapshot(dbInstanceSnapshots.DBSnapshots);
        latestSnapshotProperty = {
          identifier: latestSnapshot.DBSnapshotIdentifier,
          engine: latestSnapshot.Engine,
        };
        break;
      default:
        throw new Error(`DatabaseName: ${databaseName} は対象外のためSnapshotが取得できませんでした`);
    }

    if (!latestSnapshotProperty.engine || !latestSnapshotProperty.identifier) {
      throw new Error(`DatabaseName: ${databaseName} の最新Snapshotの情報が取得できませんでした`);
    }
    return latestSnapshotProperty;
  }

  public async restoreDBFromDBSnapshot(
    restoreDBIdentifier: string,
    baseDBidentifier: string,
    engineName: string,
    securityGroupIds: string[],
    instanceIdentifierInRestoreCluster: string,
    parameterGroupNameForCluster: string | null,
    parameterGroupNameForInstance: string,
  ) {
    const rdsTags = [{ Key: 'databaseType', Value: 'autoRestoredDB' }];
    const instanceClass = 'db.t3.medium'; // memo: 現在は固定値

    let createdDatabase!: PromiseResult<RestoreDBInstanceFromDBSnapshotResult, AWSError>;

    switch (engineName) {
      case 'mysql':
        createdDatabase = await restoreDBInstanceFromDBSnapshot(
          restoreDBIdentifier,
          baseDBidentifier,
          securityGroupIds,
          rdsTags,
          parameterGroupNameForInstance,
          instanceClass
        );
        break;
      case 'aurora-mysql':
      case 'aurora-postgresql':
        if (!parameterGroupNameForCluster) {
          throw new Error('Cluster用のParameterGroupが取得できませんでした');
        }
        await restoreDBClusterFromSnapshot(
          restoreDBIdentifier,
          baseDBidentifier,
          securityGroupIds,
          engineName,
          rdsTags,
          parameterGroupNameForCluster
        )
          .then(async () => {
            createdDatabase = await createDBInstance(
              instanceIdentifierInRestoreCluster,
              restoreDBIdentifier,
              engineName,
              instanceClass,
              parameterGroupNameForInstance,
              rdsTags
            );
          })
          .catch((error) => {
            throw new Error(error);
          });
        break;
      default:
        throw new Error(`EngineName: ${engineName} が対象になっているためリストア処理が実行できませんでした`);
    }
    return createdDatabase;
  }
}

infrastrusture層

infrastructure/aws/rds.ts
import aws = require('aws-sdk');
import { IRdsTags } from '../../@types';
const rds = new aws.RDS({ region: 'ap-northeast-1' });

export const describeDBClusterSnapshots = async (dbIdentifier: string) => {
  return await rds
    .describeDBClusterSnapshots({
      DBClusterIdentifier: dbIdentifier,
    })
    .promise();
};

export const describeDBInstanceSnapshots = async (dbResourceId: string) => {
  return await rds
    .describeDBSnapshots({
      DbiResourceId: dbResourceId,
    })
    .promise();
};

export const restoreDBInstanceFromDBSnapshot = async (restoreDBIdentifier: string, baseDBidentifier: string, rdsSecurityGroupId: string, rdsTags: IRdsTags[]) => {
  return await rds
    .restoreDBInstanceFromDBSnapshot({
      DBInstanceIdentifier: restoreDBIdentifier,
      DBSnapshotIdentifier: baseDBidentifier,
      Tags: rdsTags,
      VpcSecurityGroupIds: [rdsSecurityGroupId],
    })
    .promise();
};

export const restoreDBClusterFromSnapshot = async (restoreDBIdentifier: string, baseDBidentifier: string, rdsSecurityGroupId: string, engineName: string, rdsTags: IRdsTags[]) => {
  return await rds
    .restoreDBClusterFromSnapshot({
      DBClusterIdentifier: restoreDBIdentifier,
      SnapshotIdentifier: baseDBidentifier,
      Engine: engineName,
      Tags: rdsTags,
      VpcSecurityGroupIds: [rdsSecurityGroupId],
    })
    .promise();
};

export const createDBInstance = async(instanceIdentifierInRestoreCluster: string, restoreDBIdentifier: string, engineName: string, instanceClass: string) => {
  return await rds
    .createDBInstance({
      DBInstanceIdentifier: instanceIdentifierInRestoreCluster,
      DBInstanceClass: instanceClass,
      Engine: engineName,
      DBClusterIdentifier: restoreDBIdentifier,
      CopyTagsToSnapshot: true,
    })
    .promise();
};

ここまでできると単体でLambdaを動かしたときに、最新のSnapshotからリストアされたデータベースが生成されると思います。

2. Masking処理の準備

すること

  • 新たに作成したDBの接続情報を取得
  • 対象となるDBのMasking対象となるテーブル一覧を取得しStep Functionへ渡す

Point

  • リストア対象外のDatabaseだった場合はFaileとして扱いたくないので、純粋に処理を終了させるようにしました。

presentation層

presentation/handler/rds-restoration-event.ts
import * as aws from 'aws-sdk';
import { Context } from 'aws-lambda';
import { PrepareAnonymizationUsecase } from '../../usecase/prepare-anonymization';
import { send } from '../../infrastructure/slack';
import { IDatabase } from '../../@types';
import { DatabaseRepository } from '../../domain/repository/database';

exports.handler = async (event: any, context: Context) => {
  const prepareAnonymizationUsecase = new PrepareAnonymizationUsecase();
  try {
    const body = JSON.parse(event.Records[0].body);
    const message = JSON.parse(body.Message);
    const databaseRepository: DatabaseRepository = new DatabaseRepository();

    const {
      extractedDBIdentifier,
      commonDBIdentifier,
    } = await prepareAnonymizationUsecase.getDBIdentifierFromMessage(message);

    const isTargetDatabase = databaseRepository.checkTargetDatabase(
      commonDBIdentifier,
    );
    // memo: リストア対象外のDatabaseだった場合、処理を終了。
    if (!isTargetDatabase) {
      return false;
    }

    const restoredDBConnectInfo: IDatabase = await prepareAnonymizationUsecase.getRestoredDBConnectInfo(
      extractedDBIdentifier,
      commonDBIdentifier,
    );

    const {
      name,
      instanceId,
      host,
      username,
      password,
    } = restoredDBConnectInfo;
    const stateMachineArn: string = process.env.SFN_ARN || '';
    const { anonymizationTasks } = await prepareAnonymizationUsecase.execute(
      name,
    );

    const params = {
      stateMachineArn,
      input: JSON.stringify({
        database: {
          name,
          instanceId,
          host,
          username,
          password,
        },
        tasks: anonymizationTasks,
      }),
    };
    const stepfunctions = new aws.StepFunctions();
    await stepfunctions.startExecution(params).promise(); // memo: awaitしないとStepFunctionの処理開始されない
  } catch (error) {
    await send(
      '#developers',
      `Failed to prepare database anonymization for staging environment. error: ${error}`,
    );
    console.log({ status: 500, event, error });
    return { status: 500, event, error };
  }

  console.log({ status: 200, event });
  return { status: 200, event };
};

usecase層

usecase/prepare-anonymization.ts
import { RdsRepository } from '../domain/repository/aws/rds';
import { SsmRepository } from '../domain/repository/aws/ssm';
import { DatabaseRepository } from '../domain/repository/database';
import { IDatabase, IMessage } from '../@types';

export class PrepareAnonymizationUsecase {
  async getDBIdentifierFromMessage(message: IMessage) {
    const databaseRepository: DatabaseRepository = new DatabaseRepository();
    const extractedDBIdentifier = await databaseRepository.getExtractDBIdentifier(message);
    if (!extractedDBIdentifier) {
      throw new Error('');
    }
    const commonDBIdentifier = extractedDBIdentifier.split(/-\d+/)[0]; // memo: DB識別子の"-YYYYMMDD"以降を削除
    if (!commonDBIdentifier) {
      throw new Error('');
    }
    return { extractedDBIdentifier, commonDBIdentifier };
  }

  async getRestoredDBConnectInfo(extractedDBIdentifier: string, commonDBIdentifier: string) {
    const databaseRepository: DatabaseRepository = new DatabaseRepository();
    const rdsRepository: RdsRepository = new RdsRepository();
    const ssmRepository: SsmRepository = new SsmRepository();

    const describeDatabase = await rdsRepository.postDescribedDatabase(commonDBIdentifier, extractedDBIdentifier);
    if (!describeDatabase) {
      throw new Error('Databaseの情報が取得できませんでした');
    }

    const databaseType = describeDatabase?.TagList?.[0]?.Value;
    const isAutoRestoredDB: boolean = databaseType === 'autoRestoredDB';
    if (!isAutoRestoredDB) {
      throw new Error('自動実行のDatabaseでなかったため処理を終了します。');
    }

    const name = databaseRepository.getDatabaseNameFromDBIdentifier(commonDBIdentifier);
    if (!name) {
      throw new Error('DatabaseNameが取得できませんでした');
    }
    const host = databaseRepository.getDatabaseHostFromDBName(name, describeDatabase);
    const { username, password } = await ssmRepository.postUsernameAndPassword4Production(name);

    const database: IDatabase = {
      name,
      instanceId: extractedDBIdentifier,
      host,
      username,
      password,
    };

    return database;
  }

  async execute(databaseName: string) {
    const databaseRepository: DatabaseRepository = new DatabaseRepository();
    const anonymizationTasks = databaseRepository.getAnonymizationTasks(databaseName);
    return { anonymizationTasks };
  }
}

repository層

domain/repository/aws/rds.ts
import {
  describeDBClusters,
  describeDBInstances,
} from '../../../infrastructure/aws/rds';
import { DATABASE_CONFIG } from '../../../config/database';

export class RdsRepository {
  public async postDescribedDatabase(
    commonDBIdentifier: string,
    extractedDBIdentifier: string,
  ) {
    let describeDatabase: any;
    switch (commonDBIdentifier) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING:
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING:
        const describeDatabaseCluster = await describeDBClusters(
          extractedDBIdentifier,
        );
        describeDatabase = describeDatabaseCluster?.DBClusters?.[0];
        break;
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.STAGING:
        const describeDatabaseInstance = await describeDBInstances(
          extractedDBIdentifier,
        );
        describeDatabase = describeDatabaseInstance?.DBInstances?.[0];
        break;
      default:
        throw new Error(
          'リストア対象のDatabaseでなかったため処理を終了します。',
        );
    }
    return describeDatabase;
  }
}
domain/repository/aws/ssm.ts
import { getParameter } from '../../../infrastructure/aws/ssm';
import { DATABASE_CONFIG } from '../../../config/database';

export class SsmRepository {
  public async postUsernameAndPassword4Production(databaseName: string) {
    let usernameParameterKey: string = '';
    let passwordParameterKey: string = '';

    switch (databaseName) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME:
        usernameParameterKey =
          DATABASE_CONFIG.CLUSTER_MYSQL_DB.PARAMETER_STORE.PRODUCTION.USERNAME;
        passwordParameterKey =
          DATABASE_CONFIG.CLUSTER_MYSQL_DB.PARAMETER_STORE.PRODUCTION.PASSWORD;
        break;
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DATABASE_NAME:
        usernameParameterKey =
          DATABASE_CONFIG.CLUSTER_POSTGRES_DB.PARAMETER_STORE.PRODUCTION.USERNAME;
        passwordParameterKey =
          DATABASE_CONFIG.CLUSTER_POSTGRES_DB.PARAMETER_STORE.PRODUCTION.PASSWORD;
        break;
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME:
        usernameParameterKey =
          DATABASE_CONFIG.INSTANCE_MYSQL_DB.PARAMETER_STORE.PRODUCTION.USERNAME;
        passwordParameterKey =
          DATABASE_CONFIG.INSTANCE_MYSQL_DB.PARAMETER_STORE.PRODUCTION.PASSWORD;
        break;
      default:
        throw new Error('パラメータストアの情報を取得できません');
    }

    const usernameParameter = await getParameter(usernameParameterKey);
    const username = usernameParameter?.Parameter?.Value;
    if (!username) {
      throw new Error('接続確認用ユーザーの取得に失敗しました');
    }

    const passwordParameter = await getParameter(passwordParameterKey);
    const password = passwordParameter?.Parameter?.Value;
    if (!password) {
      throw new Error('接続確認用パスワードの取得に失敗しました');
    }

    return { username, password };
  }
}
domain/repository/database.ts
import { DATABASE_CONFIG } from '../../../config/database';
import { IMessage } from '../../../@types';

export class DatabaseRepository {
  public getDatabaseNameFromDBIdentifier(dbIdentifier: string) {
    let databaseName: string = '';
    switch (dbIdentifier) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING:
        databaseName = DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME;
        break;
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING:
        databaseName = DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DATABASE_NAME;
        break;
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.STAGING:
        databaseName = DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME;
        break;
      default:
        throw new Error(
          'リストア対象のDatabaseでなかったため処理を終了します。',
        );
    }
    return databaseName;
  }
  
  public getDatabaseHostFromDBName(databaseName: string, describeDatabase: any) {
    let host: string;
    switch (databaseName) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME:
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DATABASE_NAME:
        host = describeDatabase?.Endpoint;
        break;
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME:
        host = describeDatabase?.Endpoint?.Address;
        break;
      default:
        throw new Error(`DatabaseName: ${databaseName} が対象になっているためHost取得の処理を中止しました`);
    }
    if (!host) {
      throw new Error(`DatabaseName: ${databaseName} のHostが取得できませんでした`);
    }
    return host;
  }

  public async getExtractDBIdentifier(message: IMessage) {
    let dbIdentifier: string;
    const matchedDBClusterId = message['Identifier Link'].match(
      /dbclusters:id=(.*)/,
    );
    const matchedDBInstanceId = message['Identifier Link'].match(
      /dbinstance:id=(.*)/,
    );

    if (matchedDBClusterId) {
      dbIdentifier = matchedDBClusterId && matchedDBClusterId[1];
    } else if (matchedDBInstanceId) {
      dbIdentifier = matchedDBInstanceId && matchedDBInstanceId[1];
    } else {
      throw new Error('対象DatabaseのIdentifierIDが取得できませんでした');
    }

    if (!dbIdentifier) {
      throw new Error('Database識別子の取得に失敗しました');
    }

    return dbIdentifier;
  }
  
  public getAnonymizationTasks(databaseName: string) {
    const fileNames = readdirSync(`./domain/repository/database/anonymization/${databaseName}`);
    const tasks = fileNames
      .filter((fileName) => /\.js$/.test(fileName))
      .map((fileName) => fileName.replace(/\.js$/, ''));
    return tasks;
  }
  
  public checkTargetDatabase(databaseName: string) {
    const targetDatabase = [
      DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING,
      DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING,
      DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.STAGING,
    ];
    return targetDatabase.includes(databaseName);
  }
}

infrastrusture層

infrastructure/aws/rds.ts
import aws = require('aws-sdk');
import { IRdsTags } from '../../@types';
const rds = new aws.RDS({ region: 'ap-northeast-1' });

export const describeDBClusters = async (dbIdentifier: string) => {
  return await rds
    .describeDBClusters({
      DBClusterIdentifier: dbIdentifier,
    })
    .promise();
};

export const describeDBInstances = async (dbIdentifier: string) => {
  return await rds
    .describeDBInstances({
      DBInstanceIdentifier: dbIdentifier,
    })
    .promise();
};
infrastructure/aws/ssm.ts
import * as aws from 'aws-sdk';
const ssm = new aws.SSM({ region: 'ap-northeast-1' });

export const getParameter = async (parameterName: string) => {
  return await ssm
    .getParameter({
      Name: parameterName,
      WithDecryption: true,
    })
    .promise();
};

SDK前半終了

本番DBからSnapshotを取得して、新たにステージングDBとなるデータベースを作成し、Masking処理の準備までができました。

ここまででリストアされたStagingのデータベースに接続できるようになっていると思います。

続いてSDK後半、まずは各テーブルのMasking処理を書いていきます。

第3部: 本番環境のデータをマスクしてステージング環境に自動同期する仕組みを作りました【AWS CDK/SDK】3 ~SDK編後半~に続きます。

採用情報

採用活動も積極的に行っていますのでもし興味あれば以下コーポレートサイト等を覗いてみてください。

https://corp.air-closet.com/recruiting/
https://corp.air-closet.com/recruiting/developers/
エンジニアコーポレートサイト
https://youtu.be/w99lPd-Uea0

Discussion