🔑

ServerlessFrameworkでDynamoDBのソートキーを変更してみる

2021/11/05に公開

結論

Cloudformationの実行でエラーになります。
ソートキーを変更したい場合は、ちゃんとリカバリしましょう。
というわけで以下の手順で検証して、リカバリまでやってみました。

やったこと

DynamoDBのテーブル作成

serverless.ymlのResourcesに以下のDynamoDBの情報を定義してデプロイします。

serverless.yml
service: dynamo-test
frameworkVersion: '2'
provider:
  name: aws
  runtime: nodejs12.x
  lambdaHashingVersion: 20201221
  region: ap-northeast-1
resources:
 Resources:
   Test:
     Type: AWS::DynamoDB::Table
     Properties:
       TableName: Test
       AttributeDefinitions:
         - AttributeName: pKey
           AttributeType: S
         - AttributeName: sKey
           AttributeType: S
       StreamSpecification:
         StreamViewType: NEW_AND_OLD_IMAGES
       KeySchema:
        - AttributeName: pKey
          KeyType: HASH
        - AttributeName: sKey
          KeyType: RANGE
       BillingMode: PAY_PER_REQUEST
npx serverless deploy

testのテーブルが作成されます。

sortKeyの変更

sKeyの名称をsKey2に変更して実行してみます

resources:
 Resources:
   Test:
     Type: AWS::DynamoDB::Table
     Properties:
       TableName: Test
       AttributeDefinitions:
         - AttributeName: pKey
           AttributeType: S
-         - AttributeName: sKey
+         - AttributeName: sKey2
           AttributeType: S
       StreamSpecification:
         StreamViewType: NEW_AND_OLD_IMAGES
       KeySchema:
        - AttributeName: pKey
          KeyType: HASH
-        - AttributeName: sKey
+        - AttributeName: sKey2
          KeyType: RANGE
       BillingMode: PAY_PER_REQUEST
npx serverless deploy

以下のようなエラーが表示されます。

An error occurred: Test - CloudFormation cannot update a stack when a custom-named resource requires replacing. Rename Test and update the stack again..

リンクでも説明がある通り、DynamoDBの場合はTableNameが一緒のものに対して変更をすることができないようになっています。
https://aws.amazon.com/jp/premiumsupport/knowledge-center/cloudformation-custom-name/
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-name.html

ちなみに、tableNameを変更してデプロイすると、元のテーブル(Test)は削除されます。

resources:
 Resources:
   Test:
     Type: AWS::DynamoDB::Table
     Properties:
-       TableName: Test
+       TableName: Test2
       AttributeDefinitions:
         - AttributeName: pKey
           AttributeType: S
         - AttributeName: sKey
           AttributeType: S
       StreamSpecification:
         StreamViewType: NEW_AND_OLD_IMAGES
       KeySchema:
        - AttributeName: pKey
          KeyType: HASH
        - AttributeName: sKey
          KeyType: RANGE
       BillingMode: PAY_PER_REQUEST

他にも色々なパターンを試して見ましたが、単純な移行は難しいことがわかりました。

  • コンソールからテーブルを削除して、Cloudformationを実行する
    • Cloudformationが各リソースの作成前に前の処理との差分をとるので、その時点で、上記と同じエラーになります。
  • リソース名を変更してみる(Test → Test2)
    • すでに別のリソース(Test)でTestのテーブルは利用しているっていうエラーになります

リカバリ

DynamoDBのキーになる部分を変更する場合は以下の手順が必要になります。

  • 新しいテーブル定義でテーブルを作成する(Test2)
  • データの移行をする(Test → Test2)
    • listItemでデータを読みつつ、変換処理をしてからBatchWriteItemで値を書き込む処理になるかと思います。
  • 元のテーブル(Test)をCloudFormationでいったん削除する
  • CloudFormationを使って新しい定義でテーブルを作成しなおす(Test)
  • データを移行する(Test2 → Test)
  • Test2のテーブルを削除する

バックアップ

テーブル定義の作成 → データの移行まではこんな感じの処理でできると思います。

migrate.ts
import DynamoDB, { KeySchema } from 'aws-sdk/clients/dynamodb';

const AWS_INFO = {
  region: 'ap-northeast-1',
  accessKeyId: 'xxx',
  secretAccessKey: 'xxx',
};

const backupInfo = {
  tableName: 'Test',  // backup対象のテーブル名
  oldKeyName: 'sKey',  // 変換対象のkey
  keyName: 'sKey2',  // 変換後のkey
  KeyType: 'RANGE',  // keyのtype
  AttributeType: 'S',  // keyのAttribute
  converter: {
    sKey2: item => `${item.sKey}2`,
  },  // 変換処理。keyの名前でValue側の処理を実行して格納する
};

const client = new DynamoDB.DocumentClient(AWS_INFO);

const dynamoDb = new DynamoDB({
  apiVersion: '2012-08-10',
  region: AWS_INFO.region,
  credentials: {
    accessKeyId: AWS_INFO.accessKeyId,
    secretAccessKey: AWS_INFO.secretAccessKey,
  },
});

/**
 * backup前のテーブルを再帰的に全件取得する
 * 件数が多い場合はOOMになるので注意
 * @param tableName
 * @param pre
 * @param lastEvaluatedKey
 * @returns
 */
const listItems = async <T>(tableName: string, pre: T[], lastEvaluatedKey?: DynamoDB.DocumentClient.Key) => {
  console.log('list item');

  const items = await client
    .scan({
      TableName: tableName,
      ExclusiveStartKey: lastEvaluatedKey,
    })
    .promise();

  const result = [...pre, ...items.Items];
  if (items.LastEvaluatedKey) {
    return await listItems(tableName, result, items.LastEvaluatedKey);
  }

  return result;
};

/**
 * tableにデータを25件ずつ入れる
 * batchWriteの仕様で25件ずつしかいれられない
 * @param tableName
 * @param items
 */
const insertItems = async <T>(tableName: string, items: T[]) => {
  const batch25 = async (items: T[]) => {
    if (items.length === 0) {
      return;
    }
    await client
      .batchWrite({
        RequestItems: {
          [tableName]: items.slice(0, 25).map(item => ({ PutRequest: { Item: item } })),
        },
      })
      .promise();
    return await batch25(items.slice(25));
  };

  await batch25(items);
};

/**
 * backup時にconvertする
 * @param item
 * @returns
 */
const converter = (item: any) => ({
  ...item,
  ...Object.entries(backupInfo.converter).reduce(
    (pre, [key, value]) => ({
      ...pre,
      [key]: value(item),
    }),
    {},
  ),
});

/**
 * backup用のテーブルを作成する。名前は_bakで固定
 * @param tableInfo
 */
const createTable = async (tableInfo: DynamoDB.DescribeTableOutput) => {
  await dynamoDb
    .createTable({
      TableName: `${tableInfo.Table.TableName}_bak`,
      KeySchema: createKeySchema(tableInfo.Table.KeySchema),
      AttributeDefinitions: createAttributeDefinitions(tableInfo.Table.AttributeDefinitions),
      LocalSecondaryIndexes: tableInfo.Table.LocalSecondaryIndexes?.map(idx => ({
        IndexName: idx.IndexName,
        KeySchema: idx.KeySchema,
        Projection: idx.Projection,
      })),
      GlobalSecondaryIndexes: tableInfo.Table.GlobalSecondaryIndexes.map(idx => ({
        IndexName: idx.IndexName,
        KeySchema: idx.KeySchema,
        Projection: idx.Projection,
        // billingModeがPAY_PER_REQUESTの場合、0になるので入れるとエラーになる
        // ProvisionedThroughput: {
        //   ReadCapacityUnits: idx.ProvisionedThroughput.ReadCapacityUnits,
        //   WriteCapacityUnits: idx.ProvisionedThroughput.WriteCapacityUnits,
        // },
      })),
      BillingMode: tableInfo.Table.BillingModeSummary.BillingMode,
      // billingModeがPAY_PER_REQUESTの場合、0になるので入れるとエラーになる
      // ProvisionedThroughput: {
      //   ReadCapacityUnits: tableInfo.Table.ProvisionedThroughput.ReadCapacityUnits,
      //   WriteCapacityUnits: tableInfo.Table.ProvisionedThroughput.WriteCapacityUnits,
      // },
      StreamSpecification: tableInfo.Table.StreamSpecification,
      SSESpecification: tableInfo.Table.SSEDescription,
    })
    .promise();
};

const createKeySchema = (keySchema: KeySchema): KeySchema => {
  return [
    ...keySchema.filter(key => key.AttributeName !== backupInfo.oldKeyName),
    {
      AttributeName: backupInfo.keyName,
      KeyType: backupInfo.KeyType,
    },
  ];
};

const createAttributeDefinitions = (attributeDefinitions: DynamoDB.AttributeDefinitions) => {
  return [
    ...attributeDefinitions.filter(def => def.AttributeName !== backupInfo.oldKeyName),
    {
      AttributeName: backupInfo.keyName,
      AttributeType: backupInfo.AttributeType,
    },
  ];
};

const sleep = async (ms: number) => {
  return new Promise(resolve =>
    setTimeout(() => {
      resolve(null);
    }, ms),
  );
};

const migrate = async () => {
  const tableInfo = await dynamoDb
    .describeTable({
      TableName: backupInfo.tableName,
    })
    .promise();

  await createTable(tableInfo);

  // tableの作成が終わるまで待つ
  while (true) {
    console.log('wait ...');

    await sleep(5000);
    const tableInfo = await dynamoDb
      .describeTable({
        TableName: `${backupInfo.tableName}_bak`,
      })
      .promise();
    if (tableInfo.Table.TableStatus === 'ACTIVE') {
      break;
    }
  }

  const result = await listItems(backupInfo.tableName, []);
  await insertItems(
    `${backupInfo.tableName}_bak`,
    result.map(ret => converter(ret)),
  );
};

migrate();
npx ts-node migrate.ts

あとはcloudformationで一度Testのテーブルを削除した状態で実行したあと、replaceした情報で再度実行します。

resources:
 Resources:
-   Test:
-     Type: AWS::DynamoDB::Table
-     Properties:
-       TableName: Test
-       AttributeDefinitions:
-         - AttributeName: pKey
-           AttributeType: S
-         - AttributeName: sKey
-           AttributeType: S
-       StreamSpecification:
-         StreamViewType: NEW_AND_OLD_IMAGES
-       KeySchema:
-        - AttributeName: pKey
-          KeyType: HASH
-        - AttributeName: sKey
-          KeyType: RANGE
-       BillingMode: PAY_PER_REQUEST
npx serverless deploy
resources:
 Resources:
   Test:
     Type: AWS::DynamoDB::Table
     Properties:
       TableName: Test
       AttributeDefinitions:
         - AttributeName: pKey
           AttributeType: S
-         - AttributeName: sKey
+         - AttributeName: sKey2
           AttributeType: S
       StreamSpecification:
         StreamViewType: NEW_AND_OLD_IMAGES
       KeySchema:
        - AttributeName: pKey
          KeyType: HASH
        - AttributeName: sKey
          KeyType: RANGE
       BillingMode: PAY_PER_REQUEST
npx serverless deploy

作成されたTestのテーブルにたいして、Test2のデータを↑のコードの一部を使って実行すればOKです。

migrate.ts(↑の一部変更版)
import DynamoDB, { KeySchema } from 'aws-sdk/clients/dynamodb';

const AWS_INFO = {
  region: 'ap-northeast-1',
  accessKeyId: 'xxx',
  secretAccessKey: 'xxx',
};

const backupInfo = {
  tableName: 'Test',  // backup対象のテーブル名
};

const client = new DynamoDB.DocumentClient(AWS_INFO);

/**
 * backup前のテーブルを再帰的に全件取得する
 * 件数が多い場合はOOMになるので注意
 * @param tableName
 * @param pre
 * @param lastEvaluatedKey
 * @returns
 */
const listItems = async <T>(tableName: string, pre: T[], lastEvaluatedKey?: DynamoDB.DocumentClient.Key) => {
  console.log('list item');

  const items = await client
    .scan({
      TableName: tableName,
      ExclusiveStartKey: lastEvaluatedKey,
    })
    .promise();

  const result = [...pre, ...items.Items];
  if (items.LastEvaluatedKey) {
    return await listItems(tableName, result, items.LastEvaluatedKey);
  }

  return result;
};

/**
 * tableにデータを25件ずつ入れる
 * batchWriteの仕様で25件ずつしかいれられない
 * @param tableName
 * @param items
 */
const insertItems = async <T>(tableName: string, items: T[]) => {
  const batch25 = async (items: T[]) => {
    if (items.length === 0) {
      return;
    }
    await client
      .batchWrite({
        RequestItems: {
          [tableName]: items.slice(0, 25).map(item => ({ PutRequest: { Item: item } })),
        },
      })
      .promise();
    return await batch25(items.slice(25));
  };

  await batch25(items);
};

const migrate = async () => {
  const result = await listItems(backupInfo.tableName, []);
  await insertItems(
    `${backupInfo.tableName}_bak`,
    result.map(ret => converter(ret)),
  );
};

migrate()
npx ts-node migrate.ts

おわりに。

簡単なことのようですごく面倒だった。
partitionKeyやsortKeyの情報は変わらないような値にして、変わる可能性のあるクエリはGSIにしたほうが無難なのかもしれないですね。

Discussion