AWS DMSを作成する(with CDK)
前提
- RDS MySQL 8.0から Aurora MySQL 8.0へのデータの移行と同期を行いたい
- どちらのDBもAWSの同一アカウント上にある
概要
DMSは現状CDKのL2がサポートされていないのでL1で記述する。
以下を作成し、コンソールかCLIからタスクを実行する。
- レプリケーションインスタンス
- ソースエンドポイント(送信元のRDS)
- ターゲットエンドポイント(送信先のAurora)
- レプリケーションタスク
- DMSで必要なRole作成
- タスク実行前に行う事前評価用のRoleとS3の記述
- 各エンドポイントのDBにセキュリティグループを追加。
こちらのコード例がとても参考になりました。
料金
- レプリケーションのインスタンスタイプとストレージでかかる
- データ転送は基本無料だが、AWS外のDBに移行する場合や、アベイラビリティーゾーンおよびリージョンの異なるターゲットDBの場合は料金がかかる
レプリケーションインスタンスの作成
const cfnInstanceProfile = new dms.CfnReplicationInstance(this, 'ReplicationInstance', {
replicationInstanceIdentifier: "test-dms-replication-instance",
vpcSecurityGroupIds: vpcSecurityGroupIds,
replicationSubnetGroupIdentifier: subnetGroup.ref,
replicationInstanceClass: 'dms.t3.large',
allocatedStorage: 100,
engineVersion: '3.5.2',
});
- セキュリティグループを設定する場合はvpcSecurityGroupIdsに指定。
- サブネットグループは作っておいて、指定しないと動かないので注意。
- エンジンバージョンは最新を指定。
- インスタンスタイプやストレージは、どんなサイズのどういうデータをどうやって送るかによって設定する。特にJSONとかLONGTEXTとかのLOB(ラージオブジェクト)の送信するときに重くなるので注意。送り方は後述のタスクの設定で並列にしたり色々できる。
ターゲットエンドポイント, ソースエンドポイント
const cfnSourceEndpoint = new dms.CfnEndpoint(this, 'SourceEndpoint', {
endpointType: 'source',
endpointIdentifier: `test-dms-source-endpoint`,
engineName: 'mysql',
databaseName: props.sourceDatabaseName,
serverName: props.sourceDatabaseEndpoint,
port: props.sourceDatabasePort,
username: props.sourceDatabaseUserName,
password: props.sourceDatabasePassword,
});
const cfnTargetEndpoint = new dms.CfnEndpoint(this, 'TargetEndpoint', {
endpointType: 'target',
endpointIdentifier: `test-dms-target-endpoint`,
engineName: 'aurora',
databaseName: props.targetDatabaseName,
serverName: props.targetDatabaseEndpoint,
port: props.targetDatabasePort,
username: props.targetDatabaseUserName,
password: props.targetDatabasePassword,
extraConnectionAttributes: "initstmt=SET FOREIGN_KEY_CHECKS=0;",
});
- DBのプロパティをそれぞれ記述。
- serverNameはDBのエンドポイントを指定。
- extraConnectionAttributesは、外部キーの制約があるとエラーになってテーブルが作られないので、外部キー制約を無効化するように指定。デフォルトではアルファベット順にテーブルが作成される。
レプリケーションタスク
const cfnTask = new dms.CfnReplicationTask(this, 'ReplicationTask', {
migrationType: 'full-load-and-cdc',
replicationInstanceArn: cfnInstanceProfile.ref,
sourceEndpointArn: cfnSourceEndpoint.ref,
targetEndpointArn: cfnTargetEndpoint.ref,
replicationTaskSettings: JSON.stringify(TaskSettings),
tableMappings: JSON.stringify(
{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "test_production",
"table-name": "%"
},
"rule-action": "include"
}
]
}
),
});
- データ移行と同期の両方を行いたいのでfull-load-and-cdcを指定。
- replicationTaskSettingsではCloudWatchLogsの設定やLOBの設定やデータ検証の設定を記述。詳細は後述。
- tableMappingsではデータ移行時に除外するテーブルなど指定できる。%にして全てのschemaを読み込むようにすると、performance_schemaなどもターゲットのDBに挿入されるので注意。
レプリケーションタスクの設定
- 長いのでトグルに記載。
TaskSettings
{
"TaskSettings": {
"TargetMetadata": {
"TargetSchema": "",
"SupportLobs": true,
"FullLobMode": true,
"LobChunkSize": 1024,
"LimitedSizeLobMode": false,
"LobMaxSize": 0,
"InlineLobMaxSize": 2048,
"LoadMaxFileSize": 0,
"ParallelLoadThreads": 0,
"ParallelLoadBufferSize": 0,
"BatchApplyEnabled": false,
"TaskRecoveryTableEnabled": false,
"ParallelLoadQueuesPerThread": 0,
"ParallelApplyThreads": 0,
"ParallelApplyBufferSize": 0,
"ParallelApplyQueuesPerThread": 0
},
"FullLoadSettings": {
"CreatePkAfterFullLoad": false,
"StopTaskCachedChangesApplied": false,
"StopTaskCachedChangesNotApplied": false,
"MaxFullLoadSubTasks": 8,
"TransactionConsistencyTimeout": 900,
"CommitRate": 10000
},
"Logging": {
"EnableLogging": true,
"LogComponents": [
{
"Id": "SOURCE_UNLOAD",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},
{
"Id": "SOURCE_CAPTURE",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},
{
"Id": "TARGET_LOAD",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},
{
"Id": "TARGET_APPLY",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},
{
"Id": "TASK_MANAGER",
"Severity": "LOGGER_SEVERITY_DEFAULT"
}
]
},
"ControlTablesSettings": {
"ControlSchema": "",
"HistoryTimeslotInMinutes": 5,
"HistoryTableEnabled": false,
"SuspendedTablesTableEnabled": false,
"StatusTableEnabled": false
},
"StreamBufferSettings": {
"StreamBufferCount": 4,
"StreamBufferSizeInMB": 16,
"CtrlStreamBufferSizeInMB": 5
},
"ChangeProcessingDdlHandlingPolicy": {
"HandleSourceTableDropped": true,
"HandleSourceTableTruncated": true,
"HandleSourceTableAltered": true
},
"ErrorBehavior": {
"DataErrorPolicy": "LOG_ERROR",
"DataTruncationErrorPolicy": "LOG_ERROR",
"DataErrorEscalationPolicy": "SUSPEND_TABLE",
"DataErrorEscalationCount": 0,
"TableErrorPolicy": "SUSPEND_TABLE",
"TableErrorEscalationPolicy": "STOP_TASK",
"TableErrorEscalationCount": 0,
"RecoverableErrorCount": -1,
"RecoverableErrorInterval": 5,
"RecoverableErrorThrottling": true,
"RecoverableErrorThrottlingMax": 1800,
"RecoverableErrorStopRetryAfterThrottlingMax": false,
"ApplyErrorDeletePolicy": "IGNORE_RECORD",
"ApplyErrorInsertPolicy": "LOG_ERROR",
"ApplyErrorUpdatePolicy": "LOG_ERROR",
"ApplyErrorEscalationPolicy": "LOG_ERROR",
"ApplyErrorEscalationCount": 0,
"ApplyErrorFailOnTruncationDdl": false,
"FullLoadIgnoreConflicts": true,
"FailOnTransactionConsistencyBreached": false,
"FailOnNoTablesCaptured": false
},
"ChangeProcessingTuning": {
"BatchApplyPreserveTransaction": true,
"BatchApplyTimeoutMin": 1,
"BatchApplyTimeoutMax": 30,
"BatchApplyMemoryLimit": 500,
"BatchSplitSize": 0,
"MinTransactionSize": 1000,
"CommitTimeout": 1,
"MemoryLimitTotal": 1024,
"MemoryKeepTime": 60,
"StatementCacheSize": 50
},
"ValidationSettings": {
"EnableValidation": true,
"ValidationMode": "ROW_LEVEL",
"ThreadCount": 5,
"FailureMaxCount": 10000,
"TableFailureMaxCount": 1000,
"HandleCollationDiff": false,
"ValidationOnly": false,
"RecordFailureDelayLimitInMinutes": 0,
"SkipLobColumns": false,
"ValidationPartialLobSize": 0,
"ValidationQueryCdcDelaySeconds": 0,
"PartitionSize": 10000
},
"PostProcessingRules": null,
"CharacterSetSettings": null,
"LoopbackPreventionSettings": null,
"BeforeImageSettings": null
}
}
```
- 大きいサイズのLOBがある場合は完全LOBモードだとかなりおそくなるので注意
- しかし、制限付きLOBモードだと大きいサイズのLOBが送信されないので、インラインLOBモードに設定。
- ちなみに完全LOBだと永遠に終わりませんでしたが、インラインLOBで20分ほどで完了しました。
- 7GBのテーブルです。平均して1レコードに2MBぐらいありました。
- ちなみに完全LOBだと永遠に終わりませんでしたが、インラインLOBで20分ほどで完了しました。
- CloudWatchLogsの設定
- データ検証を行うように設定
- その他プロパティの設定の仕方などの詳細はこちらを参照。
Role
dmsを動かすのに必要なロールがある。
AWS CLI または AWS DMS API をデータベース移行に使用する場合は、AWS DMS の機能を使用する前に 3 つの IAM ロールを AWS アカウントに追加する必要があります。これらのロールのうち 2 つは dms-vpc-role と dms-cloudwatch-logs-role です。Amazon Redshift をターゲットデータベースとして使用している場合、IAM ロールdms-access-for-endpoint も AWS アカウントに追加する必要があります。
https://docs.aws.amazon.com/ja_jp/dms/latest/userguide/security-iam.html#CHAP_Security.APIRole
- 一度手動でインスタンスを作成すると上記Roleが作られる。
- 手動で作成してアタッチもできる。
移行前評価で使うS3とロール
const s3Bucket = new s3.Bucket(this, 'DmsS3PremigrationAssessmentBucket', {
bucketName: 'test-dms-s3-premigration-assessment-bucket',
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
});
const premigrationAssessmentPolicy = new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
's3:PutObject',
's3:DeleteObject',
's3:GetObject',
's3:PutObjectTagging',
's3:ListBucket',
's3:GetBucketLocation',
],
resources: [
s3Bucket.bucketArn,
`${s3Bucket.bucketArn}/*`
]
});
const premigrationAssessmentRole = new iam.Role(this, 'PremigrationAssessmentRole', {
roleName: 'test-dms-premigration-assessment-role',
assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
});
premigrationAssessmentRole.addToPolicy(premigrationAssessmentPolicy);
- 移行前評価自体はCDKでは作れなさそうなのでコンソールから作るように設定。ここで作ったs3とロールを指定する。
- CLIで作成から実行までいけるので、SDK使えば一応作れそう
セキュリティグループ
ソースDBとターゲットDBのSGにDMSからの接続を許可するSGを追加。
dmsSubnetCidrs.forEach(cidr => {
rdsSG.addIngressRule(ec2.Peer.ipv4(cidr), ec2.Port.tcp(3306), `Allow access from DMS subnet ${cidr}`);
});
タスクを動かして起きたエラーと解決
Test Endpoint failed: Application-Status: 1020912, Application-Message: Cannot connect to ODBC provider ODBC general error., Application-Detailed-Message: RetCode: SQL_ERROR SqlState: HY000 NativeError: 2003 Message: [MySQL][ODBC 8.0(w) Driver]Can't connect to MySQL server on
- レプリケーションインスタンスが各DBにアクセスできるように、サブネットのCIDRを各DBのセキュリティグループに設定する必要があった。
Last Error Failed in resolving configuration. Stop Reason FATAL_ERROR Error Level FATAL Errors in MySQL server validation. Follow all prerequisites for 'MySQL as a source in DMS' from https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.MySQL.html or'MySQL as a target in DMS' from https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.MySQL.html [1020418] (mysql_endpoint_imp.c:767)
- ソースDBのパラメーターグループを修正して解決
const parameterGroup = new rds.ParameterGroup(this, 'ParameterGroup', {
engine: rds.DatabaseInstanceEngine.mysql({
version: rds.MysqlEngineVersion.VER_8_0,
}),
parameters: {
['binlog_format']: 'ROW',
['binlog_row_image']: 'FULL'
},
});
[TARGET_LOAD ]E: RetCode: SQL_ERROR SqlState: 42000 NativeError: 1064 Message: [MySQL][ODBC 8.0(w) Driver][mysqld-8.0.28]You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'COLLATE "utf8mb3_general_ci" ) COLLATE utf8mb3_general_ci' at line 1 [1022502] (ar_odbc_stmt.c:5035)
- utf8mb3になっていたテーブルをutf8mb4に変更
[TARGET_LOAD]E: RetCode: SQL_ERROR SqlState: HY000 NativeError: 3730 Message: [MySQL][ODBC 8.0(w) Driver][mysqld - 8.0.28]Cannot drop table 'items' referenced by a foreign key constraint 'fk_rails_63d3da128b' on table 'shops'. [1022502](ar_odbc_stmt.c: 5035)
-
デフォルトだとアルファベット順にテーブル移行を開始するので、ターゲットDBの外部キーを一時的に無効にする設定をCfnEndpointに追加。
extraConnectionAttributes: "initstmt=SET FOREIGN_KEY_CHECKS=0;",
[TARGET_LOAD ]E: RetCode: SQL_ERROR SqlState: 42000 NativeError: 1064 Message: [MySQL][ODBC 8.0(w) Driver][mysqld-8.0.28]You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'COLLATE "utf8mb3_general_ci" ) COLLATE utf8mb3_general_ci' at line 1 [1022502] (ar_odbc_stmt.c:5035)
-
おかしいカラムがあったので修正。
hoge_id varchar(0) COLLATE utf8mb4_general_ci DEFAULT NULL,
ありがとうございました!!
Discussion