I Built a System That Masks Production Data and Automatically Syncs It to Staging [AWS CDK/SDK] 3 ~SDK, Second Half~

Published 2022/01/16, approx. 36,300 characters

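Previous parts

Part 1: I Built a System That Masks Production Data and Automatically Syncs It to Staging [AWS CDK/SDK] 1 ~Building with CDK~

Part 2: I Built a System That Masks Production Data and Automatically Syncs It to Staging [AWS CDK/SDK] 2 ~SDK, First Half~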

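Introduction

I'm Miyoshi (@miyomiyo344_), an engineer at airCloset, Inc.

This is the continuation of my write-up on building a system that masks production data and automatically syncs it to the staging environment.

Part 1 covered the goals and requirements and built the overall architecture with AWS CDK.
Part 2 covered the first half of the actual Lambda processing, written with the AWS SDK for JavaScript.
This article, Part 3 and the last in the series, covers the second half of the Lambda processing.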

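3. Run the masking process for each table

Points

  • ORM
    • I considered introducing an ORM, but masking itself is not complex, so the processing uses raw MySQL/PostgreSQL queries.
  • Random values for masking
    • Some tables needed random masked values, which made a few of the implementations somewhat complicated.
  • Chunk
    • The database connection sometimes dropped when a single run was too heavy, so some tables are processed in chunks. Chunking naturally lengthens the execution time, so it is best avoided where possible.
  • NULL columns
    • Columns that were originally NULL stay NULL after masking (to stay close to the production data).
  • query vs execute
    • The connection's query method leaves room for SQL injection, but the execute method sometimes stopped partway through (with large tables, on the order of tens of thousands of records), so I use query.
  • postgres role
    • I tried to dump the roles from the old staging DB via a query (pg_dumpall), but lacked the privileges to run it, so the connection information is fetched from a spreadsheet and the users are created from that. A library I considered using: pgdump-aws-lambda
    • ALL PRIVILEGES does not grant the CREATEROLE privilege.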
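
The Chunk point can be illustrated with a small helper. This is a minimal sketch, not the code used in this series: `runInChunks` and the `QueryFn` type are hypothetical names, and the only assumption is that the connection offers a promise-returning function that runs one SQL statement.

```typescript
// Hypothetical type: any function that runs one SQL statement and returns a promise.
type QueryFn = (sql: string) => Promise<unknown>;

// Run statements in groups of `chunkSize`. Statements inside a group run
// concurrently, and the next group starts only after the previous one finished.
// Returns the number of groups that were executed.
async function runInChunks(
  sqlList: string[],
  chunkSize: number,
  query: QueryFn,
): Promise<number> {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new Error(`chunkSize must be a positive integer: ${chunkSize}`);
  }
  let executedChunks = 0;
  for (let start = 0; start < sqlList.length; start += chunkSize) {
    const chunk = sqlList.slice(start, start + chunkSize);
    await Promise.all(chunk.map((sql) => query(sql)));
    executedChunks++;
  }
  return executedChunks;
}
```

With a real connection this would be called as something like `runInChunks(updateSqlList, 100, (sql) => connection.query(sql))`. A smaller chunk size lowers the load per round trip at the cost of a longer total run time, which matches the trade-off described above.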

presentation layer

presentation/handler/anonymization-task.ts
import { Context } from 'aws-lambda';
import { AnonymizeDatabaseUsecase } from '../../usecase/anonymize-database';
import { send }  from '../../infrastructure/slack';

exports.handler = async (event: any, context: Context) => {
  const anonymizeDatabaseUsecase = new AnonymizeDatabaseUsecase();

  try {
    await anonymizeDatabaseUsecase.execute(event);
  } catch (error) {
    await send('#developers', `Failed to anonymize database for staging environment. error: ${error}`);
    console.log({ status: 500, event, error });
    return { status: 500, event, error };
  }

  console.log({ status: 200, event });
  return { status: 200, event };
};

usecase layer

usecase/anonymize-database.ts
import { IAnonymization } from '../@types';

export class AnonymizeDatabaseUsecase {
  async execute({ database, task }: {
    database: {
      name: string,
      instanceId: string,
      host: string,
      username: string,
      password: string,
    },
    task: string,
  }) {
    const { anonymize }: IAnonymization = await import(
      `../domain/repository/database/anonymization/${database.name}/${task}`
    );
    const databaseConnectInfo = {
      name: database.name,
      instanceId: database.instanceId,
      host: database.host,
      username: database.username,
      password: database.password,
    };

    return await anonymize(databaseConnectInfo);
  }
}
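
Because the module path passed to `import()` is assembled from the event payload, one option is to validate both path segments first. The following is a sketch under that assumption only: `buildAnonymizerModulePath` and `SAFE_SEGMENT` are hypothetical names and are not part of the code in this article.

```typescript
// Hypothetical guard: only letters, digits, underscores, and hyphens are accepted,
// so values such as "../secrets" cannot point outside the anonymization directory.
const SAFE_SEGMENT = /^[A-Za-z0-9_-]+$/;

function buildAnonymizerModulePath(databaseName: string, task: string): string {
  for (const segment of [databaseName, task]) {
    if (!SAFE_SEGMENT.test(segment)) {
      throw new Error(`Unexpected path segment: ${segment}`);
    }
  }
  // Same directory layout as the import() call in AnonymizeDatabaseUsecase.
  return `../domain/repository/database/anonymization/${databaseName}/${task}`;
}
```

For example, `buildAnonymizerModulePath('database1', 'users')` yields the path of the `users.ts` masking module for `database1`.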

repository layer

Masking examples

The files below contain the masking logic; picture one file per database and per table.
The masking method differs from table to table, so here are a few examples.

domain/repository/database/anonymization/database1/users.ts
import { DatabaseRepository } from '../../../database';
import { QueryRepository } from '../query';
import { IDatabase } from '../../../../../@types';

export const anonymize = async (database: IDatabase) => {
  try {
    const databaseRepository: DatabaseRepository = new DatabaseRepository();
    const queryRepository: QueryRepository = new QueryRepository();
    const connection = await databaseRepository.connectionDatabase(database);
    if (!connection) {
      throw new Error('Failed to connect to the database');
    }

    const tableName = 'users';

    const updateSql = `
      UPDATE ${tableName}
      SET ${tableName}.updated_at = NOW(),
          ${tableName}.${queryRepository.getLoginEmail(tableName, 'email')},
          ${tableName}.${queryRepository.getPassword(
            'password',
          )}
      ;
    `;

    await connection.query(updateSql);
    await databaseRepository.disConnectionDatabase(connection, database.name);
    return;
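  } catch (error) {
    // memo: rethrow the original error so its stack trace is preserved
    throw error instanceof Error ? error : new Error(String(error));
  }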
};
domain/repository/database/anonymization/database1/user_information.ts
import { DatabaseRepository } from '../../../database';
import { AnonymizationRepository } from '..';
import { QueryRepository } from '../query';
import { IDatabase } from '../../../../../@types';

export const anonymize = async (database: IDatabase) => {
  try {
    const anonymizationRepository: AnonymizationRepository = new AnonymizationRepository();
    const queryRepository: QueryRepository = new QueryRepository();
    const databaseRepository: DatabaseRepository = new DatabaseRepository();

    const connection = await databaseRepository.connectionDatabase(database);
    if (!connection) {
      throw new Error('Failed to connect to the database');
    }

    const tableName = 'user_information';
    const findAllSql = `
      SELECT id, nick_name, fist_name, last_name, fist_name_kana, last_name_kana, address, tel, profile_picture, face_picture
      FROM ${tableName}
      ;
    `;

    const [rows] = await connection.query(findAllSql);

    // memo: heavy runs sometimes failed, so chunk the work when there are many records
    const isChunkPromise = anonymizationRepository.getIsChunkPromise(rows);

    // memo: execute in chunks of a fixed size
    const chunkPromiseCount = anonymizationRepository.getChunkPromiseCount();

    const updateGroup: any = {};
    let updateSqlList = [];

    // memo: collect every combination of the randomized values
    for (const userInformation of rows) {
      const lastName = anonymizationRepository.getFakeUserLastName(
        userInformation.last_name,
      );
      // memo: the key must match the column name used in the SELECT above
      const firstName = anonymizationRepository.getFakeUserFirstName(
        userInformation.fist_name,
      );
      const facePicture = anonymizationRepository.getDummyFacePicture(
        userInformation.face_picture,
      );
      const profilePicture = anonymizationRepository.getDummySystemicPicture(
        userInformation.profile_picture,
      );

      updateGroup[lastName || 'null'] = updateGroup[lastName || 'null'] || {};
      updateGroup[lastName || 'null'][firstName || 'null'] =
        updateGroup[lastName || 'null'][firstName || 'null'] || {};
      updateGroup[lastName || 'null'][firstName || 'null'][
        facePicture || 'null'
      ] =
        updateGroup[lastName || 'null'][firstName || 'null'][
          facePicture || 'null'
        ] || {};
      updateGroup[lastName || 'null'][firstName || 'null'][
        facePicture || 'null'
      ][profilePicture || 'null'] =
        updateGroup[lastName || 'null'][firstName || 'null'][
          facePicture || 'null'
        ][profilePicture || 'null'] || [];
      updateGroup[lastName || 'null'][firstName || 'null'][
        facePicture || 'null'
      ][profilePicture || 'null'].push(userInformation.id);
    }
    let idx = 0;
    const lastNameKeys = Object.keys(updateGroup);

    for (let i = 0, length = lastNameKeys.length; i < length; i++) {
      const lastName = lastNameKeys[i];
      const firstNameKeys = Object.keys(updateGroup[lastName]);

      for (let k = 0, length = firstNameKeys.length; k < length; k++) {
        const firstName = firstNameKeys[k];
        const facePictureKeys = Object.keys(updateGroup[lastName][firstName]);

        for (let m = 0, length = facePictureKeys.length; m < length; m++) {
          const facePicture = facePictureKeys[m];
          const profilePictureKeys = Object.keys(
            updateGroup[lastName][firstName][facePicture],
          );

          for (let j = 0, length = profilePictureKeys.length; j < length; j++) {
            const profilePicture = profilePictureKeys[j];
            const updateIds =
              updateGroup[lastName][firstName][facePicture][profilePicture];

            const updateSql = `
              UPDATE ${tableName}
              SET ${tableName}.updated_at = NOW(),
                  ${tableName}.last_name = ${lastName},
                  ${tableName}.fist_name = ${firstName},
                  ${tableName}.nick_name = ${firstName},
                  ${tableName}.face_picture = ${facePicture},
                  ${tableName}.profile_picture = ${profilePicture},
                  ${tableName}.${queryRepository.getLastNameKana(
              'last_name_kana',
            )},
                  ${tableName}.${queryRepository.getFirstNameKana(
              'fist_name_kana',
            )},
                  ${tableName}.${queryRepository.getStreet('address')},
                  ${tableName}.${queryRepository.getTel('tel')}
              WHERE ${tableName}.id IN(${updateIds.join()});
            `;

            updateSqlList.push(updateSql);
            idx++;

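            if (isChunkPromise) {
              // memo: flush every chunkPromiseCount statements
              if (idx % chunkPromiseCount === 0) {
                await Promise.all(
                  updateSqlList.map((sql) => connection.query(sql)),
                );
                updateSqlList = [];
              }
            } else {
              await Promise.all(
                updateSqlList.map((sql) => connection.query(sql)),
              );
              // memo: clear the list so the same statements are not executed again
              updateSqlList = [];
            }
          }
        }
      }
    }
    // memo: run the statements left over from the last chunk
    if (updateSqlList.length > 0) {
      await Promise.all(updateSqlList.map((sql) => connection.query(sql)));
    }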

    await databaseRepository.disConnectionDatabase(connection, database.name);
    return;
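  } catch (error) {
    // memo: rethrow the original error so its stack trace is preserved
    throw error instanceof Error ? error : new Error(String(error));
  }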
};
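
The nested `updateGroup` object above groups row IDs by every combination of replacement values so that each combination needs only one UPDATE. The same idea can be sketched with a flat `Map` and a composite key. This is an illustrative alternative, not the author's implementation: `MaskedRow` and `groupIdsByValues` are hypothetical names.

```typescript
// Hypothetical row shape: the ID plus the already-generated replacement values
// (SQL literals such as "'Sato'", or null when the original column was NULL).
interface MaskedRow {
  id: number;
  values: (string | null)[];
}

// Group row IDs by the full combination of replacement values, so that each
// distinct combination maps to a single UPDATE ... WHERE id IN (...) statement.
function groupIdsByValues(rows: MaskedRow[]): Map<string, number[]> {
  const groups = new Map<string, number[]>();
  for (const row of rows) {
    // JSON.stringify keeps null and the string 'null' distinct and avoids
    // collisions between values that happen to contain a separator character.
    const key = JSON.stringify(row.values);
    const ids = groups.get(key);
    if (ids) {
      ids.push(row.id);
    } else {
      groups.set(key, [row.id]);
    }
  }
  return groups;
}
```

Each key can be turned back into its values with `JSON.parse(key)` when the UPDATE statement is built, which removes the need for four levels of nested loops.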

Example masking methods

domain/repository/database/anonymization/index.ts
import {
  getFakeFirstName,
  getFakeLastName,
} from '../../../../infrastructure/faker';
import { getRandomYmd } from '../../../../utils';

export class AnonymizationRepository {
  public getChunkPromiseCount(): number {
    return 100; // memo: fixed value
  }

  public getIsChunkPromise(records: object[]): boolean {
    return records.length > 500; // memo: fixed value; some tables stopped working at 1000
  }

  // memo: get a random name
  public getFakeUserLastName(lastNameData: string | null) {
    let lastName = null;
    if (lastNameData) {
      lastName = `'${getFakeLastName()}'`;
    }
    return lastName;
  }

  public getFakeUserFirstName(firstNameData: string | null) {
    let firstName = null;
    if (firstNameData) {
      firstName = `'${getFakeFirstName()}'`;
    }
    return firstName;
  }

  public getDummyUserLastNameKana(lastNameKanaData: string | null) {
    let lastNameKana = null;
    if (lastNameKanaData) {
      lastNameKana = `'ミョウジ'`;
    }
    return lastNameKana;
  }

  public getDummyUserFirstNameKana(firstNameKanaData: string | null) {
    let firstNameKana = null;
    if (firstNameKanaData) {
      firstNameKana = `'ナマエ'`;
    }
    return firstNameKana;
  }

  public getDummyBirthday(birthdayData: string | null) {
    let birthday = null;
    if (birthdayData) {
      birthday = `'${getRandomYmd('1950/01/01', '2000/12/31')}'`;
    }
    return birthday;
  }

  public getDummyStreet(streetData: string | null) {
    let street = null;
    if (streetData) {
      street = `'1-1-1'`;
    }
    return street;
  }
}
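
The `getRandomYmd` helper imported from `utils` is not shown in this article. As a rough idea of what such a helper could look like, here is a sketch under stated assumptions: the inputs are `'YYYY/MM/DD'` strings as in `getDummyBirthday`, the output format `'YYYY-MM-DD'` is a guess, and `randomYmdBetween` and `parseYmd` are hypothetical names.

```typescript
// Hypothetical parser for 'YYYY/MM/DD' strings; returns a UTC timestamp in ms.
function parseYmd(ymd: string): number {
  const [year, month, day] = ymd.split('/').map(Number);
  return Date.UTC(year, month - 1, day);
}

// Pick a random day in the inclusive range [from, to].
// `random` can be injected to make the result reproducible in tests.
function randomYmdBetween(
  from: string,
  to: string,
  random: () => number = Math.random,
): string {
  const start = parseYmd(from);
  const end = parseYmd(to);
  if (Number.isNaN(start) || Number.isNaN(end) || start > end) {
    throw new Error(`Invalid range: ${from} - ${to}`);
  }
  const dayMs = 24 * 60 * 60 * 1000;
  const days = Math.floor((end - start) / dayMs) + 1; // inclusive range
  const picked = new Date(start + Math.floor(random() * days) * dayMs);
  return picked.toISOString().slice(0, 10);
}
```

Working in UTC keeps the result independent of the time zone the Lambda runs in.
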
domain/repository/database/anonymization/query.ts
export class QueryRepository {
  public getDummyEmail(tableName: string, columnName: string) {
    return `
      ${columnName} =
        CASE WHEN ${columnName} IS NOT NULL THEN (
          CONCAT('test_' , CAST(${tableName}.id AS CHAR) , '@hoge.com')
        ) ELSE NULL END
    `;
  }

  public getStreet(columnName: string) {
    return ` ${columnName} = CASE WHEN ${columnName} IS NOT NULL THEN '1-1-1' ELSE NULL END `;
  }

  public getFirstName(columnName: string) {
    return ` ${columnName} = CASE WHEN ${columnName} IS NOT NULL THEN '太郎' ELSE NULL END `;
  }

  public getFirstNameKana(columnName: string) {
    return ` ${columnName} = CASE WHEN ${columnName} IS NOT NULL THEN 'ナマエ' ELSE NULL END `;
  }

  public getBirthday(columnName: string) {
    return ` ${columnName} = CASE WHEN ${columnName} IS NOT NULL THEN '2000-12-25 00:00:00' ELSE NULL END `;
  }
}
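
Most methods in `QueryRepository` repeat the same NULL-preserving `CASE` expression with a different literal. A generic builder could look like the sketch below. `maskIfNotNull` is a hypothetical name, and the helper inserts its arguments verbatim, so it assumes hard-coded, trusted column names and literals only.

```typescript
// Hypothetical helper: builds
//   <column> = CASE WHEN <column> IS NOT NULL THEN <literal> ELSE NULL END
// `sqlLiteral` must already be a valid SQL expression (for example "'1-1-1'").
// Both arguments are inserted verbatim, so never pass user-supplied input.
function maskIfNotNull(columnName: string, sqlLiteral: string): string {
  return `${columnName} = CASE WHEN ${columnName} IS NOT NULL THEN ${sqlLiteral} ELSE NULL END`;
}
```

With it, `getStreet('address')` would reduce to `maskIfNotNull('address', "'1-1-1'")`, and rows whose column was NULL in production keep NULL after masking.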

Common processing

domain/repository/database/index.ts
import { MySqlRepository } from './mysql';
import { PostgresRepository } from './postgres';
import { DATABASE_CONFIG } from '../../../config/database';
import { IDatabase } from '../../../@types';

export class DatabaseRepository {
  public async connectionDatabase(database: IDatabase) {
    const mySqlRepository: MySqlRepository = new MySqlRepository();
    const postgresRepository: PostgresRepository = new PostgresRepository();
    let connection: any;

    switch (database.name) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME:
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME:
        connection = await mySqlRepository.createPool(database);
        break;
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DATABASE_NAME:
        connection = await postgresRepository.connection(database);
        break;
      default:
        throw new Error(
          `DatabaseName: ${database.name} is not a database that can be connected to`,
        );
    }
    if (!connection) {
      throw new Error(
        `Failed to connect to DB identifier: ${database.instanceId}`,
      );
    }
    return connection;
  }

  public async disConnectionDatabase(connection: any, databaseName: string) {
    const mySqlRepository: MySqlRepository = new MySqlRepository();
    const postgreRepository: PostgresRepository = new PostgresRepository();

    switch (databaseName) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME:
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME:
        return await mySqlRepository.disConnection(connection);
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DATABASE_NAME:
        return await postgreRepository.disConnection(connection);
      default:
        throw new Error('This database cannot be disconnected');
    }
  }
}
domain/repository/database/mysql/index.ts
import { createPool, disConnection, } from '../../../../infrastructure/mysql';
import { IDatabase } from '../../../../@types';

export class MySqlRepository {
  public async createPool(database: IDatabase) {
    return await createPool(database);
  }

  public async disConnection(connection: any) {
    return await disConnection(connection);
  }
};
domain/repository/database/postgres/index.ts
import { IDatabase } from '../../../../@types';
import { connection, disConnection } from '../../../../infrastructure/postgres';

export class PostgresRepository {
  public async connection(database: IDatabase) {
    return await connection(database);
  };

  public async disConnection(connection: any) {
    return await disConnection(connection);
  };
};

infrastructure layer

infrastructure/mysql.ts
import * as mysql from 'mysql2/promise';
import { IDatabase } from '../@types';

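export const createPool = async (database: IDatabase) => {
  // memo: createPool itself is synchronous; connections are opened lazily
  const pool: mysql.Pool = mysql.createPool({
    host: database.host,
    user: database.username,
    password: database.password,
    database: database.name,
    connectionLimit: 100,
    connectTimeout: 1500000,
  });
  // memo: open one connection up front to surface connection errors, then return it to the pool
  const connection = await pool.getConnection();
  connection.release();
  return pool;
};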

export const disConnection = async (connection: mysql.Pool | mysql.Connection) => {
  return await connection.end();
};
infrastructure/postgres.ts
import pg from 'pg';
import { IDatabase } from '../@types';

export const connection = async (database: IDatabase) => {
  const config = {
    host: database.host,
    user: database.username,
    password: database.password,
    database: database.name,
    port: 5432,
    idleTimeoutMillis: 6000000,
  };

  const pool: pg.Pool = new pg.Pool(config);
  // memo: check out one client to verify connectivity, then return it to the pool
  const client = await pool.connect();
  client.release();
  return pool;
};

export const disConnection = async (connection: pg.Pool) => {
  // memo: end() closes every client held by the pool
  return await connection.end();
};
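
Both infrastructure modules hand back a pool that the caller has to close. One way to make sure that happens even when the masking step throws is a small wrapper around the open, work, close sequence. This is only a sketch: `withConnection` and `Closable` are hypothetical names, and it assumes the pool exposes a promise-returning `end()`, as the `mysql2/promise` and `pg` pools do.

```typescript
// Hypothetical minimal interface: anything that can be closed asynchronously.
interface Closable {
  end(): Promise<void>;
}

// Open a connection, run `work`, and always close the connection afterwards,
// even when `work` throws.
async function withConnection<C extends Closable, T>(
  open: () => Promise<C>,
  work: (connection: C) => Promise<T>,
): Promise<T> {
  const connection = await open();
  try {
    return await work(connection);
  } finally {
    await connection.end();
  }
}
```

A masking module could then wrap its queries as `withConnection(() => createPool(database), (pool) => pool.query(updateSql))` instead of calling the disconnect function by hand on every code path.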

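4. Post-processing

What it does

  1. Add the connection users of the old staging DB to the new DB
  2. Verify the connection with the staging connection user
  3. Switch the DNS record
  4. Delete the old staging DB

Points

  • In MySQL, running FLUSH PRIVILEGES; after inserting the connection users makes the new accounts take effect
  • MySQL connection users
    • rdsadmin, rdsrepladmin: cannot be deleted
    • prod-restore-rds4staging: this is the user the Lambda itself connects as, so it is not deleted.
  • How Step Functions and Lambda timeouts interact
    • When the Step Functions timeout is reached before the Lambda timeout
      • The Step Functions execution itself is treated as timed out, and the Lambda task inside it is treated as canceled.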
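
The protected accounts listed above can be kept out of the cleanup with a parameterized statement. The following is a minimal sketch, assuming a driver that accepts `?` placeholders the way mysql2 does. `buildDeleteUsersStatement` and `PROTECTED_MYSQL_USERS` are hypothetical names and are not part of the code in this article.

```typescript
// Accounts that must survive the cleanup (taken from the list above).
const PROTECTED_MYSQL_USERS = ['rdsadmin', 'rdsrepladmin', 'prod-restore-rds4staging'];

// Hypothetical helper: returns the SQL text and its parameters separately, so the
// user names are passed as bound values instead of being concatenated into the SQL.
function buildDeleteUsersStatement(protectedUsers: string[]): {
  sql: string;
  params: string[];
} {
  if (protectedUsers.length === 0) {
    // An empty NOT IN list would be invalid SQL, and deleting every user is never intended.
    throw new Error('At least one protected user is required');
  }
  const placeholders = protectedUsers.map(() => '?').join(', ');
  return {
    sql: `DELETE FROM mysql.user WHERE User NOT IN (${placeholders});`,
    params: [...protectedUsers],
  };
}
```

The result would be passed to the driver as `connection.query(sql, params)`, which keeps the list of protected accounts in one place.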

presentation layer

presentation/handler/anonymized-database-task.ts
import { Context } from 'aws-lambda';
import { ConfigureDatabaseUsecase } from '../../usecase/configure-database';
import { send }  from '../../infrastructure/slack';
import { IDatabase } from '../../@types';

exports.handler = async (event: any, context: Context) => {
  const configureDatabaseUsecase = new ConfigureDatabaseUsecase();
  const databaseConnectInfo: IDatabase = {
    name: event.name,
    instanceId: event.instanceId,
    host: typeof event.host === 'string' ? event.host : event.host?.Address, // memo: cluster and instance events carry the host in different shapes
    username: event.username,
    password: event.password,
  };

  try {
    await configureDatabaseUsecase.execute(databaseConnectInfo);
  } catch (error) {
    await send('#developers', `Failed to configure database for staging environment. error: ${error}`);
    console.log({ status: 500, event, error });
    return { status: 500, event, error };
  }

  console.log({ status: 200, event });
  return { status: 200, event };
};
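
The host handling in the handler depends on whether the event comes from a cluster or an instance. A small, separately testable function makes that rule explicit. This is a sketch under one assumption drawn from the memo in the handler: a cluster passes the endpoint as a plain string, while an instance passes an object with an `Address` property. `resolveHost` and `EndpointLike` are hypothetical names.

```typescript
// Hypothetical input type: a cluster passes its endpoint as a string, while an
// instance passes an object that carries the host name in `Address`.
type EndpointLike = string | { Address?: string } | null | undefined;

function resolveHost(endpoint: EndpointLike): string {
  const host = typeof endpoint === 'string' ? endpoint : endpoint?.Address;
  if (!host) {
    throw new Error('Could not resolve the database host from the event');
  }
  return host;
}
```

Failing early here gives a clearer error than a connection timeout later in the usecase.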

usecase layer

usecase/configure-database.ts
import { RdsRepository } from '../domain/repository/aws/rds';
import { DatabaseRepository } from '../domain/repository/database';
import { IDatabase } from '../@types';
import { Route53Repository } from '../domain/repository/aws/route53';
import { SsmRepository } from '../domain/repository/aws/ssm';
import { CONFIG } from '../config';
import { SpreadsheetRepository } from '../domain/repository/spreadsheet';

export class ConfigureDatabaseUsecase {
  async execute(afterDBConnectionInfo: IDatabase) {
    const rdsRepository: RdsRepository = new RdsRepository();
    const route53Repository: Route53Repository = new Route53Repository();
    const databaseRepository: DatabaseRepository = new DatabaseRepository();
    const ssmRepository: SsmRepository = new SsmRepository();
    const spreadsheetRepository: SpreadsheetRepository = new SpreadsheetRepository();

    const targetDBName: string = afterDBConnectionInfo.name;
    const dbIdentifier: string = afterDBConnectionInfo.instanceId;
    const hostedZone = await route53Repository.getTargetHostedZone(
      CONFIG.HOSTED_ZONE.NAME,
    );
    const beforeHostName = await route53Repository.getHostNameFromRoute53(
      targetDBName,
      hostedZone,
    );
    const mysqlEngines = ['mysql', 'aurora-mysql'];
    const postgresqlEngines = ['postgres', 'aurora-postgresql'];

    let matchedIdentifier: RegExpMatchArray | null;
    matchedIdentifier = beforeHostName.match(/(.*)-\d{12}/);

    if (!matchedIdentifier) {
      throw new Error(
        `Could not get the DB identifier for DatabaseName: ${afterDBConnectionInfo.name}`,
      );
    }

    const beforeDBDescribe = await rdsRepository.postDescribedDatabaseFromEndpoint(
      matchedIdentifier,
    );
    const {
      username,
      password,
    } = await ssmRepository.postUsernameAndPassword4Staging(targetDBName);
    const beforeDBConnectionInfo = {
      name: targetDBName,
      instanceId: matchedIdentifier[0],
      host: beforeHostName,
      username: username,
      password: password,
    };

    const afterDBConnection = await databaseRepository.connectionDatabase(
      afterDBConnectionInfo,
    );
    const beforeDBConnection = await databaseRepository.connectionDatabase(
      beforeDBConnectionInfo,
    );

    const isMysqlEngine = mysqlEngines.includes(beforeDBDescribe.Engine);
    const isPostgresqlEngine = postgresqlEngines.includes(
      beforeDBDescribe.Engine,
    );

    if (isMysqlEngine) {
      await databaseRepository.deleteConnectionUsers(
        afterDBConnection,
        beforeDBDescribe.Engine,
      );
      const connectionUsers = await databaseRepository.selectConnectionUsers(
        beforeDBConnection,
        beforeDBDescribe.Engine,
      );
      await databaseRepository.insertConnectionUsers(
        targetDBName,
        afterDBConnection,
        beforeDBDescribe.Engine,
        connectionUsers,
      );
    } else if (isPostgresqlEngine) {
      const createRoleSqls = await spreadsheetRepository.getCreateRoleSqls(
        process.env.SPREADSHEET_ID || '',
        process.env.WORKSHEET_ID || '',
      );
      await databaseRepository.insertConnectionUsers(
        targetDBName,
        afterDBConnection,
        beforeDBDescribe.Engine,
        createRoleSqls,
      );
    } else {
      throw new Error('Failed to add connection users');
    }

    // memo: disconnect from both the old and new DBs once, to verify that the inserted user can connect
    await databaseRepository.disConnectionDatabase(
      beforeDBConnection,
      beforeDBConnectionInfo.name,
    );
    await databaseRepository.disConnectionDatabase(
      afterDBConnection,
      afterDBConnectionInfo.name,
    );

    // memo: check that the staging user can connect to the new database
    await databaseRepository.checkDatabaseConnection(
      afterDBConnectionInfo,
      beforeDBDescribe.Engine,
      username,
      password,
    );

    // memo: DNS setting (repoint the CNAME of the target stg environment to the newly restored DB).
    const dbDescribe = await rdsRepository.postDescribedDatabaseFromDBName(
      targetDBName,
      dbIdentifier,
    );
    // memo: a cluster exposes Endpoint as a string, an instance as an object with Address
    const endpoint: string = dbDescribe?.Endpoint?.Address
      ? dbDescribe.Endpoint.Address
      : dbDescribe?.Endpoint;
    if (!endpoint) {
      throw new Error('Could not get the RDS endpoint');
    }
    await route53Repository.route53ChangeCurrentDBToRestoredDB(
      targetDBName,
      endpoint,
      hostedZone.Id,
    );

    // memo: delete the old DB
    await rdsRepository.deleteDatabase(matchedIdentifier);
    return;
  }
}
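
The usecase derives the old DB identifier from the host name with the regular expression `/(.*)-\d{12}/`. Here is the same match as a standalone function, to show what the two capture positions contain. It is a sketch: `extractDbIdentifier` and `DbIdentifier` are hypothetical names, and the host name in the usage note is made up for illustration.

```typescript
interface DbIdentifier {
  target: string; // full identifier: "<common>-<12 digits>"
  common: string; // identifier without the 12-digit suffix
}

// Mirrors the /(.*)-\d{12}/ match in ConfigureDatabaseUsecase:
// index 0 is the whole match, index 1 is the part before the numeric suffix.
function extractDbIdentifier(hostName: string): DbIdentifier {
  const matched = hostName.match(/(.*)-\d{12}/);
  if (!matched) {
    throw new Error(`Could not find a DB identifier in: ${hostName}`);
  }
  return { target: matched[0], common: matched[1] };
}
```

For a made-up host such as `stg-database1-202201160300.abc123.ap-northeast-1.rds.amazonaws.com`, `target` is `stg-database1-202201160300` and `common` is `stg-database1`. `RdsRepository` uses the first value for describe and delete calls and the second to decide between cluster and instance.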

repository layer

domain/repository/aws/rds.ts
import {
  describeDBClusters,
  describeDBInstances,
  deleteDBCluster,
  deleteDBInstance,
} from '../../../infrastructure/aws/rds';
import { DATABASE_CONFIG } from '../../../config/database';

export class RdsRepository {
  public async postDescribedDatabaseFromDBName(
    dbName: string,
    identifier: string,
  ) {
    let describeDatabase: any;
    switch (dbName) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME:
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DATABASE_NAME:
        const describeDatabaseCluster = await describeDBClusters(identifier);
        describeDatabase = describeDatabaseCluster?.DBClusters?.[0];
        break;
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME:
        const describeDatabaseInstance = await describeDBInstances(identifier);
        describeDatabase = describeDatabaseInstance?.DBInstances?.[0];
        break;
      default:
        throw new Error(
          'Stopping because a describe was requested for an unexpected database',
        );
    }
    return describeDatabase;
  }

  public async postDescribedDatabaseFromEndpoint(
    matchedIdentifier: RegExpMatchArray,
  ) {
    const targetIdentifier = matchedIdentifier[0];
    const commonIdentifier = matchedIdentifier[1];

    let describeDatabase: any;
    switch (commonIdentifier) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING:
        const describeDatabaseCluster = await describeDBClusters(
          targetIdentifier,
        );
        describeDatabase = describeDatabaseCluster?.DBClusters?.[0];
        break;
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING:
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.STAGING:
        const describeDatabaseInstance = await describeDBInstances(
          targetIdentifier,
        );
        describeDatabase = describeDatabaseInstance?.DBInstances?.[0];
        break;
      default:
        throw new Error('Cannot get DB information for an unexpected identifier');
    }
    if (!describeDatabase) {
      throw new Error('Failed to get DB information');
    }
    return describeDatabase;
  }

  public async deleteDatabase(matchedIdentifier: RegExpMatchArray) {
    const targetIdentifier = matchedIdentifier[0];
    const commonIdentifier = matchedIdentifier[1];

    switch (commonIdentifier) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING:
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING:
        return await deleteDBCluster(targetIdentifier);
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.STAGING:
        return await deleteDBInstance(targetIdentifier);
      default:
        throw new Error('Databases outside the target list cannot be deleted');
    }
  }
}
domain/repository/database/index.ts
import { DATABASE_CONFIG } from '../../../config/database';
import { IDatabase } from '../../../@types';

export class DatabaseRepository {
  public async deleteConnectionUsers(
    connection: any,
    engine: string,
    users?: any[],
  ) {
    switch (engine) {
      case 'mysql':
      case 'aurora-mysql':
        const deleteUsers4MySql = `DELETE FROM mysql.user WHERE User NOT IN ('rdsadmin', 'rdsrepladmin', 'prod-restore-rds4staging') ;`;
        return await connection.execute(deleteUsers4MySql);
      default:
        throw new Error(
          `Deleting database connection users is not supported for engineType: ${engine}`,
        );
    }
  }

  public async selectConnectionUsers(connection: any, engine: string) {
    switch (engine) {
      case 'mysql':
      case 'aurora-mysql':
        const findAllUsers4MySql = `SELECT * FROM mysql.user ;`;
        const mysqlConnectionUsers = await connection.execute(
          findAllUsers4MySql,
        );
        return mysqlConnectionUsers[0];
      default:
        throw new Error(
          `Fetching database connection users is not supported for engineType: ${engine}`,
        );
    }
  }

  public async insertConnectionUsers(
    targetDBName: string,
    connection: any,
    engine: string,
    connectionUsers: any[],
  ) {
    switch (engine) {
      case 'mysql':
      case 'aurora-mysql':
        let insertSql4MySql: string;
        let accessUserData: any[] = [];

        switch (targetDBName) {
          case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME:
            insertSql4MySql =
              'INSERT INTO mysql.user (Host, User, Select_priv, Insert_priv, Update_priv, Delete_priv, Create_priv, Drop_priv, Reload_priv, Shutdown_priv, Process_priv, File_priv, Grant_priv, References_priv, Index_priv, Alter_priv, Show_db_priv, Super_priv, Create_tmp_table_priv, Lock_tables_priv, Execute_priv, Create_view_priv, Show_view_priv, Create_routine_priv, Alter_routine_priv, Create_user_priv, Event_priv, Trigger_priv, Create_tablespace_priv, ssl_type, ssl_cipher, x509_issuer, x509_subject, max_questions, max_updates, max_connections, max_user_connections, plugin, authentication_string, password_expired, password_last_changed, password_lifetime, account_locked, Load_from_S3_priv, Select_into_S3_priv, Invoke_lambda_priv) VALUES ?';

            let deletedPermissionUser: any[] = [];
            connectionUsers.map((user) => {
              // memo: on the cluster only, the insert fails with these properties, so remove them
              delete user.Repl_slave_priv;
              delete user.Repl_client_priv;
              return deletedPermissionUser.push(user);
            });
            accessUserData = deletedPermissionUser.map((user) => {
              return Object.values(user);
            });
            break;
          case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME:
            insertSql4MySql =
              'INSERT INTO mysql.user (Host, User, Select_priv, Insert_priv, Update_priv, Delete_priv, Create_priv, Drop_priv, Reload_priv, Shutdown_priv, Process_priv, File_priv, Grant_priv, References_priv, Index_priv, Alter_priv, Show_db_priv, Super_priv, Create_tmp_table_priv, Lock_tables_priv, Execute_priv, Repl_slave_priv, Repl_client_priv, Create_view_priv, Show_view_priv, Create_routine_priv, Alter_routine_priv, Create_user_priv, Event_priv, Trigger_priv, Create_tablespace_priv, ssl_type, ssl_cipher, x509_issuer, x509_subject, max_questions, max_updates, max_connections, max_user_connections, plugin, authentication_string, password_expired, password_last_changed, password_lifetime, account_locked) VALUES ?';

            accessUserData = connectionUsers.map((user: any) => {
              return Object.values(user);
            });
            break;
          default:
            throw new Error(
              `Cannot generate the access-user INSERT statement for targetDBName: ${targetDBName}`,
            );
        }

        const filteredAccessUserData = accessUserData.filter((user: any) => {
          if (user[1] === 'rdsadmin' || user[1] === 'rdsrepladmin') {
            return false;
          }
          return true;
        });
        await connection.query(insertSql4MySql, [filteredAccessUserData]); // memo: note that execute fails here https://github.com/sidorares/node-mysql2/issues/1239
        return await connection.query('FLUSH PRIVILEGES;'); // memo: running FLUSH PRIVILEGES; requires the RELOAD privilege
      case 'postgres':
      case 'aurora-postgresql':
        const createRoleSql = connectionUsers.join(' ').replace(/[\"]/g, '');
        return await connection.query(createRoleSql); // memo: ALL PRIVILEGES does not grant the CREATEROLE privilege
      default:
        throw new Error(
          `Adding database connection users is not supported for engineType: ${engine}`,
        );
    }
  }

  public async checkDatabaseConnection(
    databaseConnectInfo: IDatabase,
    engine: string,
    username: string,
    password: string,
  ) {
    const databaseRepository: DatabaseRepository = new DatabaseRepository();
    const checkConnectionInfo = {
      name: databaseConnectInfo.name,
      instanceId: databaseConnectInfo.instanceId,
      host: databaseConnectInfo.host,
      username: username,
      password: password,
    };

    const connection = await databaseRepository.connectionDatabase(
      checkConnectionInfo,
    );
    if (!connection) {
      throw new Error('Failed to connect to the database');
    }

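    let rows: any;
    const checkConnectionSql = `SELECT id FROM users LIMIT 1 ;`;
    switch (engine) {
      case 'mysql':
      case 'aurora-mysql':
        // memo: mysql2 resolves to [rows, fields]
        const mysqlData = await connection.execute(checkConnectionSql);
        rows = mysqlData[0];
        break;
      case 'postgres':
      case 'aurora-postgresql':
        // memo: pg resolves to a result object with a rows property
        const postgresData = await connection.query(checkConnectionSql);
        rows = postgresData.rows;
        break;
      default:
        throw new Error('Unexpected engine type');
    }
    // memo: close the connection that was opened only for this check
    await databaseRepository.disConnectionDatabase(
      connection,
      databaseConnectInfo.name,
    );

    if (!rows) {
      throw new Error('Failed to fetch data with the connection-check user');
    }
    return;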
  }

  public checkTargetDatabase(databaseName: string) {
    const targetDatabase = [
      DATABASE_CONFIG.CLUSTER_MYSQL_DB.DB_IDENTIFIER.STAGING,
      DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DB_IDENTIFIER.STAGING,
      DATABASE_CONFIG.INSTANCE_MYSQL_DB.DB_IDENTIFIER.STAGING,
    ];
    return targetDatabase.includes(databaseName);
  }
}
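
`insertConnectionUsers` relies on `Object.values(user)` returning the values in the same order as the column list of the INSERT statement. A variant that makes the order explicit is sketched below. It is an illustration only: `toInsertRows` and `UserRow` are hypothetical names, and the column list is expected to be the same one used to build the INSERT statement.

```typescript
// Hypothetical row type: one record of mysql.user as returned by the driver.
type UserRow = Record<string, unknown>;

// Build the VALUES rows from an explicit column list, so the value order always
// matches the INSERT column list. Columns that are not listed (for example
// Repl_slave_priv on the cluster) are dropped without mutating the input rows.
function toInsertRows(
  users: UserRow[],
  columns: string[],
  excludedUsers: string[],
): unknown[][] {
  return users
    .filter((user) => !excludedUsers.includes(String(user.User)))
    .map((user) => columns.map((column) => user[column] ?? null));
}
```

Keeping the column list as a single array also allows the INSERT text itself to be generated from it with `columns.join(', ')`, so the two cannot drift apart.
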
domain/repository/aws/route53.ts
import {
  changeResourceRecordSets,
  postListResourceRecordSets,
  postListHostedZones,
} from '../../../infrastructure/aws/route53';
import { HostedZone } from 'aws-sdk/clients/route53';

export class Route53Repository {
  public async route53ChangeCurrentDBToRestoredDB(
    dbName: string,
    endpoint: string,
    hostedZoneId: string,
  ) {
    const resourceRecordName = this.getResourceRecordName(dbName);
    return await changeResourceRecordSets(
      hostedZoneId,
      resourceRecordName,
      endpoint,
    );
  }

  public async getHostNameFromRoute53(dbName: string, hostedZone: HostedZone) {
    const resourceRecordName = this.getResourceRecordName(dbName);
    const listResourceRecord = await postListResourceRecordSets(
      hostedZone.Id,
      resourceRecordName,
    );

    const targetRecord = listResourceRecord.ResourceRecordSets[0];
    const hostName = targetRecord?.ResourceRecords?.[0]?.Value;
    if (!hostName) {
      throw new Error('Failed to get the host name');
    }
    return hostName;
  }

  public async getTargetHostedZone(hostedZoneName: string) {
    const hostedZones = await postListHostedZones();
    const targetHostedZone = hostedZones.HostedZones.find((hostedZone) => {
      return hostedZone.Name === hostedZoneName;
    });
    if (!targetHostedZone) {
      throw new Error('Could not get the target hosted zone');
    }
    return targetHostedZone;
  }
}
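
`getHostNameFromRoute53` takes the first entry of the listing. Route 53 treats `StartRecordName` as the position where the listing begins, not as an exact filter, so the first entry can be a different record when the requested one does not exist. A stricter lookup could look like this sketch. `RecordSetLike`, `normalizeRecordName`, and `findCnameValue` are hypothetical names, and the interface is a minimal stand-in for the SDK type.

```typescript
// Hypothetical minimal shape of the entries in ResourceRecordSets.
interface RecordSetLike {
  Name: string;
  Type: string;
  ResourceRecords?: { Value: string }[];
}

// Route 53 returns fully qualified names with a trailing dot, so normalize
// both sides before comparing.
function normalizeRecordName(name: string): string {
  return name.toLowerCase().replace(/\.$/, '');
}

// Return the value of the CNAME record whose name matches exactly, if any.
function findCnameValue(
  recordSets: RecordSetLike[],
  recordName: string,
): string | undefined {
  const wanted = normalizeRecordName(recordName);
  const found = recordSets.find(
    (recordSet) =>
      recordSet.Type === 'CNAME' &&
      normalizeRecordName(recordSet.Name) === wanted,
  );
  return found?.ResourceRecords?.[0]?.Value;
}
```

Returning `undefined` for a missing record lets the existing "failed to get the host name" check in the repository handle the error case.
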
domain/repository/aws/ssm.ts
import { getParameter } from '../../../infrastructure/aws/ssm';
import { DATABASE_CONFIG } from '../../../config/database';

export class SsmRepository {
  public async postUsernameAndPassword4Staging(databaseName: string) {
    let usernameParameterKey: string = '';
    let passwordParameterKey: string = '';

    switch (databaseName) {
      case DATABASE_CONFIG.CLUSTER_MYSQL_DB.DATABASE_NAME:
        usernameParameterKey =
          DATABASE_CONFIG.CLUSTER_MYSQL_DB.PARAMETER_STORE.STAGING.USERNAME;
        passwordParameterKey =
          DATABASE_CONFIG.CLUSTER_MYSQL_DB.PARAMETER_STORE.STAGING.PASSWORD;
        break;
      case DATABASE_CONFIG.CLUSTER_POSTGRES_DB.DATABASE_NAME:
        usernameParameterKey =
          DATABASE_CONFIG.CLUSTER_POSTGRES_DB.PARAMETER_STORE.STAGING.USERNAME;
        passwordParameterKey =
          DATABASE_CONFIG.CLUSTER_POSTGRES_DB.PARAMETER_STORE.STAGING.PASSWORD;
        break;
      case DATABASE_CONFIG.INSTANCE_MYSQL_DB.DATABASE_NAME:
        usernameParameterKey =
          DATABASE_CONFIG.INSTANCE_MYSQL_DB.PARAMETER_STORE.STAGING.USERNAME;
        passwordParameterKey =
          DATABASE_CONFIG.INSTANCE_MYSQL_DB.PARAMETER_STORE.STAGING.PASSWORD;
        break;
      default:
        throw new Error('Cannot get the Parameter Store information');
    }

    const usernameParameter = await getParameter(usernameParameterKey);
    const username = usernameParameter?.Parameter?.Value;
    if (!username) {
      throw new Error('Failed to get the connection-check username');
    }

    const passwordParameter = await getParameter(passwordParameterKey);
    const password = passwordParameter?.Parameter?.Value;
    if (!password) {
      throw new Error('Failed to get the connection-check password');
    }

    return { username, password };
  }
}
domain/repository/spreadsheet/index.ts
import { getRows } from '../../../infrastructure/spreadsheet';

export class SpreadsheetRepository {
  public async getCreateRoleSqls(spreadsheetId: string, worksheetId: string) {
    let sqls: any[];
    const rows = await getRows(spreadsheetId, worksheetId);
    sqls = rows.filter((row) => row.nick_name).map((row) => row.SQL.replace(/\n/g, ' '));
    return sqls;
  }
}

infrastructure layer

infrastructure/aws/ssm.ts
import * as aws from 'aws-sdk';
const ssm = new aws.SSM({ region: 'ap-northeast-1' });

export const getParameter = async (parameterName: string) => {
  return await ssm
    .getParameter({
      Name: parameterName,
      WithDecryption: true,
    })
    .promise();
};
infrastructure/aws/route53.ts
import aws = require('aws-sdk');
const route53 = new aws.Route53({ region: 'ap-northeast-1' });

export const postListHostedZones = async () => {
  return await route53
    .listHostedZones()
    .promise();
};

export const changeResourceRecordSets = async (hostedZoneId: string, resourceRecordName: string, endpoint: string) => {
  return await route53
    .changeResourceRecordSets({
      HostedZoneId: hostedZoneId,
      ChangeBatch: {
        Changes: [
          {
            Action: 'UPSERT',
            ResourceRecordSet: {
              Name: resourceRecordName,
              ResourceRecords: [{ Value: endpoint }],
              Type: 'CNAME',
              TTL: 60,
            },
          },
        ],
      },
    })
    .promise();
};

export const postListResourceRecordSets = async (hostedZoneId: string, resourceRecordName: string) => {
  return await route53
    .listResourceRecordSets({
      HostedZoneId: hostedZoneId,
      StartRecordName: resourceRecordName,
    })
    .promise();
};
infrastructure/spreadsheet.ts
import { GoogleSpreadsheet } from 'google-spreadsheet';
import * as credentials from '../credentials.json';

export const getRows = async (spreadsheetId: string, worksheetId: string) => {
  const doc = new GoogleSpreadsheet(spreadsheetId);

  await doc.useServiceAccountAuth(credentials);
  await doc.loadInfo();

  const sheet = await doc.sheetsById[worksheetId];
  return await sheet.getRows();
};

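That's all.

This turned out longer than I expected.
We have only just started running it in production, so I expect plenty of room for improvement to surface, but for now it works.
Thank you for reading this far.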

Hiring

We are actively hiring, so if you are interested, please take a look at our corporate site and the links below.

https://corp.air-closet.com/recruiting/
https://corp.air-closet.com/recruiting/developers/
Engineering corporate site
https://youtu.be/w99lPd-Uea0
