DynamoDBに保存されているデータをTTLでS3にバックアップ

2023/06/21に公開

はじめに

こんにちは。今日はDynamoDBに保存されているデータをTTLでS3にバックアップする構成をガイダンスを参照しながら構築した際の話です。

東京リージョンでは、以下のように料金体系が組まれているDynamoDB。

・DynamoDB Standard テーブルクラスを使用すると、
 最初に保存される 25 GB (1 か月あたり) は無料です
・それ以降、0.285USD/GB-月

アクセス頻度が低いログデータ等のデータを蓄積するテーブルで、データライフサイクルを考慮せずにテーブルを作成した後、どんどんデータが溜まっていき、結果、毎月その分のコストを支払う...みたいなことはもったいないです。とはいえ、アプリケーション側の既存構成はあまりいじりたくないし、TTLで消去するにもログデータをいつ参照するかも分からないし、TTLで削除するのではなく、これをトリガーにS3に保存する方法はないか?という考えになりました。

実際に、ガイダンスがまとめられていましたので、こちらを参照しながら環境を構築していこうと思います。こちらのガイダンスではCLIを用いていますが、自分がCLIをあまり理解できていないこともあり、訳のわからないまま作業するのは今後のためにならないと思ったので、今回はCLIのコマンドと照らし合わせながら、コンソール画面での作業中心に進めていこうと思います。
https://docs.aws.amazon.com/ja_jp/prescriptive-guidance/latest/patterns/automatically-archive-items-to-amazon-s3-using-dynamodb-ttl.html

DynamoDBテーブルの作成

とりあえずお試しなので、最低限の設定とします。TTLに設定するAttributeをtimestampにしたかったので、ソートキーにtimestampを設定しました。

テーブル詳細の「エクスポートおよびストリーム」>「DynamoDBストリームの詳細」からDynamoDBストリームをオンにします。表示タイプは新旧イメージにします。ガイダンスにあるLambdaをそのまま使用する関係上、新旧イメージにしていますが、削除だけ捌きたいのであれば、後述のLambdaを編集して、古いイメージを選択するのでも問題ないと思います。

TTLを有効化します。ここで指定するのはtimestampとします。

S3バケットの作成

とりあえずデフォルト設定です。

ここで、ライフサイクルポリシーを作成して、S3 Glacier等の長期保管用のストレージクラスに移行する仕組みを作るのもアリですが、S3→S3 Glacierに転送する際にも料金が発生します。[1]
DynamoDBからS3へTTLで転送してくるデータが小さくてもファイル数が大量になるならば、思ったよりコストがかかる懸念があるので、見積ツールなどを使用して、どちらが望ましいか選択することが必要だと思います。今回は通常のS3で構築したいと思います。

Kinesis Data Firehoseの作成

IAMポリシーを作成します。

{
	"Version": "2012-10-17",
	"Statement": [
            {
                "Sid": "1",
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    "{your s3_bucket ARN}/*",
                    "{Your s3 bucket ARN}"
                ]
            }
	]
}

IAMロールを作成します。ポリシーには先ほど作成したIAMポリシーを紐づけ、信頼されたエンティティにはFirehoseを指定します。以下の信頼ポリシーが作成されます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Principal": {
                "Service": [
                    "firehose.amazonaws.com"
                ]
            }
        }
    ]
}

firehoseを作成します。



Lambda関数の作成

IAMロールを作成します。信頼されたエンティティにはLambdaを指定します。ロールに充てるポリシーはガイダンスのものをそのまま適用しています。

Lambda関数を作成します。ランタイムにはPython3.8を指定します。

Lambda関数の環境変数を設定します。

Lambda関数のトリガーを設定します。

Lambda関数のコードは、ガイダンスにもある下記をそのまま引用します。zipインポートでもコピペでも大丈夫だと思います。ただし、ReservationIDReservationDateはDynamoDBの構成次第で変える必要があります。今回は、ReservationIDのところを構成したDynamoDBのPKであるidとしました。また、ハンドラもlambda_handlerにしました。Lambda作成はCLI経由の方が圧倒的に楽です。
https://github.com/aws-samples/automatically-archive-items-to-s3-using-dynamodb-ttl/blob/main/LambdaStreamProcessor.py

import json
import logging
import time
import boto3
import os
from botocore.exceptions import ClientError

def lambda_handler(event, context):
    # Set up logging
    logging.basicConfig(level=logging.DEBUG,format='%(levelname)s: %(asctime)s: %(message)s')
    
    oldRecords = []
	#1. Iterate over each record , in this example we are only focused on REMOVE deleted items by DynamoDB
    try:
        for record in event['Records']:
            #2. Handle event by type
            if record['eventName'] == 'INSERT':
                handle_insert(record)
            elif record['eventName'] == 'MODIFY':
                handle_modify(record)
            elif record['eventName'] == 'REMOVE':
            	if (record['userIdentity']['principalId'] == 'dynamodb.amazonaws.com'):
            		oldRecords.append(record)
        handle_remove(oldRecords)
    except Exception as e:
        logging.error(e)
        return "Error"


def handle_insert(record):
	logging.info("Handling INSERT Event")
	
	#3a. Get newImage content
	newImage = record['dynamodb']['NewImage']
	
	#3b. Parse values
	newId = newImage['id']['S']

	#3c. log it
	logging.info ('New row added with ID=' + newId)
	logging.info("Done handling INSERT Event")

def handle_modify(record):
	logging.info("Handling MODIFY Event")

	#3a. Parse oldImage and score
	oldImage = record['dynamodb']['OldImage']
	oldReservationDate = oldImage['ReservationDate']['N']
	
	#3b. Parse oldImage
	newImage = record['dynamodb']['NewImage']
	newReservationDate = newImage['ReservationDate']['N']

	#3c. Check for change
	if oldReservationDate != newReservationDate:
		logging.info('Reservation Date Changed  - oldReservationDate=' + str(oldReservationDate) + ', newReservationDate=' + str(newReservationDate))

	logging.info("Done handling MODIFY Event")

def handle_remove(oldRecords):
	logging.info("Handling REMOVE Event")
    
	firehose_client = boto3.client('firehose')
    
	# Assign these values before running the program
	firehose_name = os.environ['firehose_name']
	bucket_arn = os.environ['bucket_arn']
	iam_role_name = os.environ['iam_role_name']
	batch_size = int(os.environ['batch_size'])
	
	oldImages=[]
	#3a. Parse oldImage
	for record in oldRecords:
	    oldImage = record['dynamodb']['OldImage']
	    oldImages.append(oldImage)
	    #3b. Parse values
	    oldId = oldImage['id']['S']

	    #3c. log it
	    logging.info ('Row removed with ID=' + oldId)
	    
	#3d. Determine the size of the archived data and process in a batch of up to 400 records

	oldImagesSize = len(oldImages)
	result=None

	if oldImagesSize > 0 and oldImagesSize < batch_size :
		batch = [{'Data': json.dumps(oldImages)}]
		try:
			result=firehose_client.put_record_batch(DeliveryStreamName=firehose_name,Records=batch)
		except ClientError as e :
			logging.error(e)
			exit(1)
	elif oldImagesSize > batch_size :
		# Break the list to a batch size of 400 and put record batch
		chunckedOldImagesList = [oldImages[i * batch_size:(i + 1) * batch_size] for i in range((len(oldImages) + batch_size - 1) // batch_size )] 
		for list in chunckedOldImagesList :
			batch = [{'Data': json.dumps(list)}]
			try:
				result=firehose_client.put_record_batch(DeliveryStreamName=firehose_name,Records=batch)
			except ClientError as e :
				logging.error(e)
				exit(1)
		
	# Check for records in the batch did not get processed
	if result :
		num_failures = result['FailedPutCount']
		if num_failures:
			# Resend failed records
			logging.info('Resending {num_failures} failed records')
			rec_index = 0
			for record in result['RequestResponses']:
				if 'ErrorCode' in record:
					# Resend the record
					firehose_client.put_record(DeliveryStreamName=firehose_name,Record=batch[rec_index])
					# Stop if all failed records have been resent
					num_failures -= 1
					if not num_failures:
						break
				rec_index += 1
	logging.info('Data sent to Firehose stream')

確認

過去日のtimestampのデータを登録します。いったんは登録されますが、その後、TTLによって削除されます。

しばらくしてS3に格納されていることが確認できました。

ファイルの中身はこんな感じで、レコードのデータが格納されています。

おわりに

今回はコンソールを通じて構築しましたが、CLIを使ってやった方が圧倒的に早いし楽です。ただ、自分自身CLIを触ったことが少なかったこともあり、『コンソールでやるこの作業がCLIでいうとこのコマンドになるのか』といったように相互に理解が進んだので、その点は良かったかなと思います。ここまでご覧いただきありがとうございました。

脚注
  1. https://aws.amazon.com/jp/s3/pricing/ ↩︎

Discussion