🐈

DynamoDB へ 大量データを 一括投入する

2024/07/24に公開

はじめに

DynamoDB テーブルへの CSV ファイルのデータ1万件ほどを一括取り込みのため、AWSが提供しているCloudFormation テンプレートを利用した際に、つまづいた点をお話します。

作業する上で注意する点などは、下記のサイトもご参考ください。
https://kimyas.link/article/405

発生した事象

1. LambdaのRuntimeが古かった。

Templateでいうと下記のRuntime:部分になります。
対応しているバージョンへ変更しましょう。

"CsvToDDBLambdaFunction": {
            "Type": "AWS::Lambda::Function",
            "Properties": {
                "Handler": "index.lambda_handler",
                # 略...
                "Runtime": "python3.7", # この部分★
                # 略...
2. インデントがずれてしまっていた。

CloudWatchのログを確認してみると、Lambda関数のインデントが一部ずれているとのエラーが。
修正したものが下記になります。

import json
import boto3
import os
import csv
import codecs
import sys

s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')

bucket = os.environ['bucket']
key = os.environ['key']
tableName = os.environ['table']

def lambda_handler(event, context):
    try:
        obj = s3.Object(bucket, key).get()['Body']
    except Exception as error:
        print(error)
        print("S3 Object could not be opened. Check environment variable. ")
    try:
        table = dynamodb.Table(tableName)
    except Exception as error:
        print(error)
        print("Error loading DynamoDB table. Check if table was created correctly and environment variable.")

    batch_size = 100
    batch = []

    for row in csv.DictReader(codecs.getreader('utf-8-sig')(obj)):
        if len(batch) >= batch_size:
            write_to_dynamo(batch)
            batch.clear()

        batch.append(row)

    if batch:
        write_to_dynamo(batch)

    return {
        'statusCode': 200,
        'body': json.dumps('Uploaded to DynamoDB Table')
    }


def write_to_dynamo(rows):
    try:
        table = dynamodb.Table(tableName)
    except Exception as error:
        print(error)
        print("Error loading DynamoDB table. Check if table was created correctly and environment variable.")

    try:
        with table.batch_writer() as batch:
            for i in range(len(rows)):
                batch.put_item(
                    Item=rows[i]
                )
    except Exception as error:
        print(error)
        print("Error executing batch_writer")
3. その他:私の勘違い

このTemplateでCSVファイルのデータをDynamoDBに格納する流れは下記になります。

  1. CSVファイルがS3バケットにアップロードされる。
  2. S3バケットへのファイル追加をトリガーにして、Lambda関数が起動する。
  3. Lambda関数がCSVファイルのデータをDynamoDBに書き込む。

そしてTemplateのパラメータでは下記を指定します。

  • BucketName
  • DynamoDBTableName
  • FileName

私はS3(BucketName)に入れておいたファイル(FileName)のデータをDynamoDB(DynamoDBTableName)に格納してくれると勘違いし、S3をあらかじめ準備していた為「そのS3は既に存在するよ!」とエラーで怒られました。

結果

1万件程のデータ取込でしたが、あっという間に完了しました。すごい!
公式サイトによると、100,000 行のファイルでの実行でも約 80 秒ほどで完了するそうです。

翌日AWS CLIで DynamoDB テーブルのアイテム件数を取得して、問題がないことを確認しました。

参考:【小ネタ】AWS CLI で Amazon DynamoDB テーブルのアイテム件数を取得する

以上、どなたかの参考になれば幸いです。
えみり〜でした|ωΦ)ฅ

Discussion