🐡

AWS SDK: Amazon DynamoDB テーブルのデータを全削除する

2024/02/10に公開

概要

Amazon DynamoDB ではデータを一括削除する API は存在しないため、DeleteItem で 1 件ずつ、または BatchWriteItem で最大 25 件ずつ、指定しながら削除する必要があります。

実装例

AWS SDK for JavaScript v3

以下は Scan で 25 件ずつ取得して BatchWriteItem で削除を繰り返す例です。ProjectionExpression にパーティションキー/ソートキーを指定することで、取得した結果を削除パラメーターとしてそのまま渡すことができます。

以下は TypeScript での例です。

import {
    BatchWriteItemCommand,
    BatchWriteItemCommandInput,
    DynamoDBClient,
    ScanCommand,
    ScanCommandInput
} from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient({});

async function deleteAll(tableName: string, keys: string[]) {
    const scanInput: ScanCommandInput = {
        TableName: tableName,
        ProjectionExpression: keys.join(", "),
        Limit: 25,
    };
    while (true) {
        const scanCommand = new ScanCommand(scanInput);
        const scanResponse = await client.send(scanCommand);
        const items = scanResponse.Items ?? [];
        if (items.length > 0) {
            const batchDeleteInput: BatchWriteItemCommandInput = {
                RequestItems: {
                    [tableName]: items.map((item) => ({
                        DeleteRequest: {Key: item},
                    })),
                },
            };
            const batchDeleteCommand = new BatchWriteItemCommand(batchDeleteInput);
            await client.send(batchDeleteCommand);
        }
        if (!scanResponse.LastEvaluatedKey) {
            break;
        }
        scanInput.ExclusiveStartKey = scanResponse.LastEvaluatedKey;
    }
}

AWS SDK for JavaScript v3 (その 2)

ページングしながらすべてのデータを取得するジェネレーター関数と、取得したデータを 25 件ずつに分割して削除する処理に分ける場合は、以下のようになります。scanAll() は汎用化するためいったん unmarshall() したレコードを返しています。

import {
    BatchWriteItemCommand,
    BatchWriteItemCommandInput,
    DynamoDBClient,
    ScanCommand,
    ScanCommandInput
} from "@aws-sdk/client-dynamodb";
import {marshall, unmarshall} from "@aws-sdk/util-dynamodb";

const client = new DynamoDBClient({});

async function* scanAll(tableName: string, attributeNames: string[]) {
    const scanInput: ScanCommandInput = {
        TableName: tableName,
        ProjectionExpression: attributeNames.join(", "),
    };
    while (true) {
        const scanCommand = new ScanCommand(scanInput);
        const scanResponse = await client.send(scanCommand);
        for (const item of scanResponse.Items ?? []) {
            yield unmarshall(item);
        }
        if (!scanResponse.LastEvaluatedKey) {
            break;
        }
        scanInput.ExclusiveStartKey = scanResponse.LastEvaluatedKey;
    }
}

async function* chunkGenerator<T>(generator: AsyncGenerator<T, void, void>, chunkSize: number) {
    let chunk: T[] = [];
    for await (const value of generator) {
        chunk.push(value);
        if (chunk.length === chunkSize) {
            yield chunk;
            chunk = [];
        }
    }
    if (chunk.length > 0) {
        yield chunk;
    }
}

async function deleteAll(tableName: string, keys: string[]) {
    for await (const items of chunkGenerator(scanAll(tableName, keys), 25)) {
        const batchDeleteInput: BatchWriteItemCommandInput = {
            RequestItems: {
                [tableName]: items.map((item) => ({
                    DeleteRequest: {Key: marshall(item)},
                })),
            },
        };
        const batchDeleteCommand = new BatchWriteItemCommand(batchDeleteInput);
        await client.send(batchDeleteCommand);
    }
}

AWS SDK for Python (boto3)

Python の場合は抽象化されたリソースインタフェースを使うと、少しシンプルに実装することができます。batch_writer() が適切な個数ごとに分割してくれます。

import boto3

resource = boto3.resource("dynamodb")


def scan_all(table_name: str, attribute_names: list[str] = None):
    scan_input = {}
    if attribute_names:
        scan_input["ProjectionExpression"] = ", ".join(attribute_names)
    while True:
        res = resource.Table(table_name).scan(**scan_input)
        for item in res["Items"]:
            yield item
        if "LastEvaluatedKey" not in res:
            break
        scan_input.ExclusiveStartKey = res["LastEvaluatedKey"]


def delete_all(table_name: str, keys: list[str]):
    with resource.Table(table_name).batch_writer() as writer:
        for item in scan_all(table_name, keys):
            writer.delete_item(Key=item)

補足) リソースインタフェースの改修は凍結されています。SDK 全体として抽象化レイヤーは提供せずクライアントインタフェースのみに統一する方針なのかと思います。

Discussion