DynamoDBに保存されているデータをTTLでS3にバックアップ
はじめに
こんにちは。今日はDynamoDBに保存されているデータをTTLでS3にバックアップする構成をガイダンスを参照しながら構築した際の話です。
東京リージョンでは、以下のように料金体系が組まれているDynamoDB。
・DynamoDB Standard テーブルクラスを使用すると、
最初に保存される 25 GB (1 か月あたり) は無料です
・それ以降、0.285USD/GB-月
アクセス頻度が低いログデータ等のデータを蓄積するテーブルで、データライフサイクルを考慮せずにテーブルを作成した後、どんどんデータが溜まっていき、結果、毎月その分のコストを支払う...みたいなことはもったいないです。とはいえ、アプリケーション側の既存構成はあまりいじりたくないし、TTLで消去するにもログデータをいつ参照するかも分からないし、TTLで削除するのではなく、これをトリガーにS3に保存する方法はないか?という考えになりました。
実際に、ガイダンスがまとめられていましたので、こちらを参照しながら環境を構築していこうと思います。こちらのガイダンスではCLIを用いていますが、自分がCLIをあまり理解できていないこともあり、訳のわからないまま作業するのは今後のためにならないと思ったので、今回はCLIのコマンドと照らし合わせながら、コンソール画面での作業中心に進めていこうと思います。
DynamoDBテーブルの作成
とりあえずお試しなので、最低限の設定とします。TTLに設定するAttributeをtimestamp
にしたかったので、ソートキーにtimestamp
を設定しました。
テーブル詳細の「エクスポートおよびストリーム」>「DynamoDBストリームの詳細」からDynamoDBストリームをオンにします。表示タイプは新旧イメージ
にします。ガイダンスにあるLambdaをそのまま使用する関係上、新旧イメージにしていますが、削除だけ捌きたいのであれば、後述のLambdaを編集して、古いイメージを選択するのでも問題ないと思います。
TTLを有効化します。ここで指定するのはtimestamp
とします。
S3バケットの作成
とりあえずデフォルト設定です。
ここで、ライフサイクルポリシーを作成して、S3 Glacier等の長期保管用のストレージクラスに移行する仕組みを作るのもアリですが、S3→S3 Glacierに転送する際にも料金が発生します。[1]
DynamoDBからS3へTTLで転送してくるデータが小さくてもファイル数が大量になるならば、思ったよりコストがかかる懸念があるので、見積ツールなどを使用して、どちらが望ましいか選択することが必要だと思います。今回は通常のS3で構築したいと思います。
Kinesis Data Firehoseの作成
IAMポリシーを作成します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "1",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"{your s3_bucket ARN}/*",
"{Your s3 bucket ARN}"
]
}
]
}
IAMロールを作成します。ポリシーには先ほど作成したIAMポリシーを紐づけ、信頼されたエンティティにはFirehoseを指定します。以下の信頼ポリシーが作成されます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sts:AssumeRole"
],
"Principal": {
"Service": [
"firehose.amazonaws.com"
]
}
}
]
}
firehoseを作成します。
Lambda関数の作成
IAMロールを作成します。信頼されたエンティティにはLambdaを指定します。ロールに充てるポリシーはガイダンスのものをそのまま適用しています。
Lambda関数を作成します。ランタイムにはPython3.8を指定します。
Lambda関数の環境変数を設定します。
Lambda関数のトリガーを設定します。
Lambda関数のコードは、ガイダンスにもある下記をそのまま引用します。zipインポートでもコピペでも大丈夫だと思います。ただし、ReservationID
やReservationDate
はDynamoDBの構成次第で変える必要があります。今回は、ReservationID
のところを構成したDynamoDBのPKであるid
としました。また、ハンドラもlambda_handler
にしました。Lambda作成はCLI経由の方が圧倒的に楽です。
import json
import logging
import time
import boto3
import os
from botocore.exceptions import ClientError
def lambda_handler(event, context):
# Set up logging
logging.basicConfig(level=logging.DEBUG,format='%(levelname)s: %(asctime)s: %(message)s')
oldRecords = []
#1. Iterate over each record , in this example we are only focused on REMOVE deleted items by DynamoDB
try:
for record in event['Records']:
#2. Handle event by type
if record['eventName'] == 'INSERT':
handle_insert(record)
elif record['eventName'] == 'MODIFY':
handle_modify(record)
elif record['eventName'] == 'REMOVE':
if (record['userIdentity']['principalId'] == 'dynamodb.amazonaws.com'):
oldRecords.append(record)
handle_remove(oldRecords)
except Exception as e:
logging.error(e)
return "Error"
def handle_insert(record):
logging.info("Handling INSERT Event")
#3a. Get newImage content
newImage = record['dynamodb']['NewImage']
#3b. Parse values
newId = newImage['id']['S']
#3c. log it
logging.info ('New row added with ID=' + newId)
logging.info("Done handling INSERT Event")
def handle_modify(record):
logging.info("Handling MODIFY Event")
#3a. Parse oldImage and score
oldImage = record['dynamodb']['OldImage']
oldReservationDate = oldImage['ReservationDate']['N']
#3b. Parse oldImage
newImage = record['dynamodb']['NewImage']
newReservationDate = newImage['ReservationDate']['N']
#3c. Check for change
if oldReservationDate != newReservationDate:
logging.info('Reservation Date Changed - oldReservationDate=' + str(oldReservationDate) + ', newReservationDate=' + str(newReservationDate))
logging.info("Done handling MODIFY Event")
def handle_remove(oldRecords):
logging.info("Handling REMOVE Event")
firehose_client = boto3.client('firehose')
# Assign these values before running the program
firehose_name = os.environ['firehose_name']
bucket_arn = os.environ['bucket_arn']
iam_role_name = os.environ['iam_role_name']
batch_size = int(os.environ['batch_size'])
oldImages=[]
#3a. Parse oldImage
for record in oldRecords:
oldImage = record['dynamodb']['OldImage']
oldImages.append(oldImage)
#3b. Parse values
oldId = oldImage['id']['S']
#3c. log it
logging.info ('Row removed with ID=' + oldId)
#3d. Determine the size of the archived data and process in a batch of up to 400 records
oldImagesSize = len(oldImages)
result=None
if oldImagesSize > 0 and oldImagesSize < batch_size :
batch = [{'Data': json.dumps(oldImages)}]
try:
result=firehose_client.put_record_batch(DeliveryStreamName=firehose_name,Records=batch)
except ClientError as e :
logging.error(e)
exit(1)
elif oldImagesSize > batch_size :
# Break the list to a batch size of 400 and put record batch
chunckedOldImagesList = [oldImages[i * batch_size:(i + 1) * batch_size] for i in range((len(oldImages) + batch_size - 1) // batch_size )]
for list in chunckedOldImagesList :
batch = [{'Data': json.dumps(list)}]
try:
result=firehose_client.put_record_batch(DeliveryStreamName=firehose_name,Records=batch)
except ClientError as e :
logging.error(e)
exit(1)
# Check for records in the batch did not get processed
if result :
num_failures = result['FailedPutCount']
if num_failures:
# Resend failed records
logging.info('Resending {num_failures} failed records')
rec_index = 0
for record in result['RequestResponses']:
if 'ErrorCode' in record:
# Resend the record
firehose_client.put_record(DeliveryStreamName=firehose_name,Record=batch[rec_index])
# Stop if all failed records have been resent
num_failures -= 1
if not num_failures:
break
rec_index += 1
logging.info('Data sent to Firehose stream')
確認
過去日のtimestamp
のデータを登録します。いったんは登録されますが、その後、TTLによって削除されます。
しばらくしてS3に格納されていることが確認できました。
ファイルの中身はこんな感じで、レコードのデータが格納されています。
おわりに
今回はコンソールを通じて構築しましたが、CLIを使ってやった方が圧倒的に早いし楽です。ただ、自分自身CLIを触ったことが少なかったこともあり、『コンソールでやるこの作業がCLIでいうとこのコマンドになるのか』といったように相互に理解が進んだので、その点は良かったかなと思います。ここまでご覧いただきありがとうございました。
Discussion