DynamoDB初めて触ってみたときのメモ(備忘録)
DynamoDBは使ってみたいけど、料金体系やパーティションキーやソートキーが良く分からないので、「まぁとりあえずRDS使おう」みたいにずっと敬遠してました。
基礎中の基礎ではありますが、やっと勉強始めましたので、忘れないようにメモ残しときます。
自分のモヤモヤした疑問が晴れた分かりやすい動画でした。
カプコンのエンジニアの方の登壇。
- RDB脳、DynamoDB脳、考え方を変える必要がある。
- パーティションキー毎に物理的な場所に配置されるので、分散するように工夫する。ゲームとかだと大抵ユーザID(UUID)になる。
- ソートキーは、RDBのテーブル名@おおまかな分類IDみたいなものにして管理。
- よっぽど大規模じゃない限り1個または数個のテーブルで済む。キャパシティユニットはテーブル単位で設定管理するので、少ない方が楽。
- 沢山クエリする場合はBatchを使う。とにかくこれで解決させる。
- オンデマンドでも失敗する。基本はプロビジョニング設定で使えるようにしておくのが安心。
- どうしてもテーブルが爆増しそうなら、そもそもDynamoDB向いてないかもしれない。
パーティションキーとソートキーが決まったら、あとはガンガン突っ込んでいけばOK。ぐらいの気軽さでまずは使っていこうかと思います。もちろん甘く見ているわけじゃないですが、気軽に使えるメリットをまずは享受したい。
DynamoDBの制限
割と制限がある印象。これを超えるようなサイズはBatchでいくつかのクエリに分割してやるんでしょうかね。追々直面すると思います。
項目 | 制限 |
---|---|
テーブルサイズ | 無制限 |
Itemサイズ(RDBのレコードに相当) | 400KB |
パーティションキーの長さ | 2048B |
ソートキーの長さ | 1024B |
属性名の長さ | 255B |
属性値の長さ | 400KB Itemサイズで400KBを超えない |
グローバルセカンダリインデックス | 20個 |
ローカルセカンダリインデックス | 5個 |
API等で読み込みできるアイテム | 1MB |
API等で書き込みできるアイテム | 最大25個のItemまたは16MB |
トランザクションの制限 | 100個のItemまたは4MB |
テーブルの作成
とりあえずテーブルとキー名だけ決めて、あとはほぼ全部デフォルトにします。
キャパシティユニットはReadもWriteもとりあえず1CUとしました。AutoscalingはとりあえずOFFにします。あとで負荷かけてエラーを発生させてみたいので。
セカンダリインデックスはとりあえず無しです。
読み書きしてみる(Boto3)
PythonのBoto3で読み書きしてみます。
# test.py
import json
import decimal
import boto3
import time
from boto3.dynamodb.conditions import Key
# DynamoDBに接続
dynamodb = boto3.resource('dynamodb', region_name="ap-northeast-1")
# テーブル名を指定
table_name = 'test_table1'
table_pk = "pk"
table_sk = "sk"
table = dynamodb.Table(table_name)
#----------------------------------
# Create - アイテムの作成
#----------------------------------
def create_item(pk, sk, additional_attributes):
print("--- create_item ---")
item = {
'pk': pk,
'sk': sk,
}
item.update(additional_attributes)
response = table.put_item(Item=item)
print(response)
#----------------------------------
# Read - アイテムの読み取り
#----------------------------------
def read_item(pk, sk):
print("--- read_item ---")
response = table.get_item(
Key={
'pk': pk,
'sk': sk
}
)
print(response)
item = response.get('Item')
print(item)
return item
def test1():
# float型をそのまま入力するとDynamoDB関数でエラーになる
# Decimal型である必要がある。
# json.loadsで読み込みさせてそのときにfloat => Decimalに変換させる。
json_data = {"x": 0.00, "y": -12.23, "z": -0.00091, "created_at": "20240812T000000.000+0900"}
item = json.loads(json.dumps(json_data), parse_float=decimal.Decimal)
create_item('example_pk', 'example_sk2@1', item)
# Read(get_item)
read_item('example_pk', 'example_sk2@1')
if __name__ == "__main__":
test1()
実行
> python test.py
boto3の各APIのレスポンスの内容です。
put_itemのレスポンス。成功したかどうかはHTTPStatusCodeが200か否かですかね。
{
"ResponseMetadata": {
"RequestId": "K97V9T8OSMJ7UTEKK237BF1SJFVV4KQNSO5AEMVJF66Q9ASUZZZZ",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"server": "Server",
"date": "Mon, 12 Aug 2024 07:09:57 GMT",
"content-type": "application/x-amz-json-1.0",
"content-length": "2",
"connection": "keep-alive",
"x-amzn-requestid": "K97V9T8OSMJ7UTEKK237BF1SJFVV4KQNSO5AEMVJF66Q9ASUZZZZ",
"x-amz-crc32": "2745614147"
},
"RetryAttempts": 0
}
}
get_itemのレスポンス。成否はHTTPStatusCode、get_itemは1個だけしか値を取得できないので、ItemキーにItemのJsonが返ってきます。
{
"ResponseMetadata": {
"RequestId": "EV7SOHC9S0NBPCLP8NFH01HV03VV4KQNSO5AEMVJF66Q9ASUYYYY",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"server": "Server",
"date": "Mon, 12 Aug 2024 07:09:57 GMT",
"content-type": "application/x-amz-json-1.0",
"content-length": "114",
"connection": "keep-alive",
"x-amzn-requestid": "EV7SOHC9S0NBPCLP8NFH01HV03VV4KQNSO5AEMVJF66Q9ASUYYYY",
"x-amz-crc32": "401378920"
},
"RetryAttempts": 0
},
"Item": {
"y": Decimal("-12.23"),
"z": Decimal("-0.00091"),
"sk": "example_sk2@1",
"created_at": "20240812T000000.000+0900",
"pk": "example_pk",
"x": Decimal("0")
}
}
コンソールでも見てみます。
更新はupdate_item, 削除はdelete_itemがありますが、今回は省略。
取得するAPIには、get_item以外にもqueryというのがあって、こちらは結果をN個取得する機能で、条件式とか設定してもう少し高度な取得方法が提供されています。
以下はパーティションキーだけ指定して全件取得。
#expression = Key('pk').eq('example_pk') & Key('sk').between('2024-01-01', '2024-12-31')
#expression = Key('pk').eq(pk) & Key('sk').eq(sk)
expression = Key('pk').eq(pk)
response = table.query( KeyConditionExpression=expression )
print(response)
レスポンスをダンプしたものが下記になります。Itemsという配列にItemが2個返ってきます。
{
"Items": [
{
"y": Decimal("-12.23"),
"z": Decimal("-0.00091"),
"sk": "example_sk2@1",
"created_at": "20240812T000000.000+0900",
"pk": "example_pk2",
"x": Decimal("0")
},
{
"y": Decimal("0.001"),
"z": Decimal("-3.03"),
"sk": "example_sk2@2",
"created_at": "20240812T000001.000+0900",
"pk": "example_pk2",
"x": Decimal("99.123")
}
],
"Count": 2,
"ScannedCount": 2,
"ResponseMetadata": {
(省略)
}
}
キャパシティユニットの検証
まずは、Autoscalingオフで1RCUが設定されている状態で、Readクエリを大量に投げてみます。
データが登録されている状態で、0.1sec間隔でquery実行してみる。
pk = "example_pk2"
expression = Key('pk').eq(pk)
while True:
response = table.query( KeyConditionExpression=expression )
print(response)
time.sleep(0.1)
エラー終了するかと予想してたけど、ならないです。ただしログの流れ方を見ると、一定周期でレスポンスが遅く返ってくる動きをしていました。
レスポンスを見ると、RetryAttempts=5になってます。DynamoDBのスロットルで取得エラーが起きていて、リトライ処理されている感じです。
{
"Items": [
(省略)
],
"Count": 2,
"ScannedCount": 2,
"ResponseMetadata": {
"RequestId": "66H1TOBFLGR2FFAP08R9H69AJRVV4KQNSO5AEMVJF66Q9ASUTTTT",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"server": "Server",
"date": "Mon, 12 Aug 2024 15:21:57 GMT",
"content-type": "application/x-amz-json-1.0",
"content-length": "347",
"connection": "keep-alive",
"x-amzn-requestid": "66H1TOBFLGR2FFAP08R9H69AJRVV4KQNSO5AEMVJF66Q9ASUTTTT",
"x-amz-crc32": "3931018412"
},
"RetryAttempts": 5
}
}
CloudWatchメトリクスを見てみます。ピークが2つありますが、0.1sec間隔でReadしたところは、後半のピークです。4RCUまで跳ね上がってますが、関数エラーにはならずに、調整されて1.5RCUまで落ちてます。その後ずっとこの調子で動いていたので、手動でプログラム停止しました。
4RCUで少しの間処理できているのは、バーストキャパシティというDynamoDBの仕様で、過去300secほどのキャパシティの空きを使って、一時的に上限を超えて処理してくれる機能らしい。急にスパイク状のアクセスが発生したときに吸収してくれる感じです。
その後、1.5RCU程度でリトライしながら調整されて処理しているのは、DynamoDBの機能ではなく、boto3のSDKの内部実装っぽいです。エクスポネンシャルバックオフという概念で、遅延時間を延ばしながら再試行してくれます。
このリトライ回数をゼロに設定してからもう一度負荷かけてみます。
import boto3
from botocore.config import Config
# 現在の設定を確認するためのConfigオブジェクトを作成
my_config = Config(
retries = {
'max_attempts': 0, # 最大リトライ回数
}
)
# DynamoDBに接続
dynamodb = boto3.resource('dynamodb',
config=my_config, region_name="ap-northeast-1")
実行した結果の抜粋です。リクエストのHTTPステータスと、RetryAttemptsをprint出力してます。最初は正常処理されていて、キャパシティ超えたときにProvisionedThroughputExceededExceptionの例外が発生してエラー終了しました。
リトライしていても、データ処理量に対して回数が少ないと、処理が追い付かなくて上記の例外が発生するということです。
status = 200, retry = 0
status = 200, retry = 0
status = 200, retry = 0
Traceback (most recent call last):
:
botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the Query operation (reached max retries: 0): The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.
キャパシティのオートスケールを確認
次にテーブル作成時にオフにしていたキャパシティのオートスケールを試してみます。下図のように最大10CUまで、ターゲットは最大の90%でスケールできるようにします。
リトライオフでやってみたところ、スケールが間に合わないのか例外がスローされるので、max_attempts=5に設定して検証しました。
スケールが必要なタイミングで、下記のようにSDK内部でリトライが断続的に発生していて、スケールが完了すると、リトライが発生しなくなりました。
status = 200, retry = 0
status = 200, retry = 1
status = 200, retry = 0
status = 200, retry = 0
status = 200, retry = 0
status = 200, retry = 2
status = 200, retry = 0
status = 200, retry = 0
status = 200, retry = 0
status = 200, retry = 0
status = 200, retry = 1
status = 200, retry = 0
このタイミングで、テーブルの情報を取得(後述)すると、ReadCapacityUnits: 5になってます。オートスケールされていることを確認できました。
{
"Table": {
"AttributeDefinitions": [
(省略)
],
"TableStatus": "ACTIVE",
"CreationDateTime": (省略),
"ProvisionedThroughput": {
"LastIncreaseDateTime": (省略),
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 1
},
(省略)
}
}
メトリクスでも確認してみます。
オートスケールが遅れて上昇するまでは、リトライしながら処理を続けており、オートスケール後は安定して読み取り処理が続いてます。その後、プログラムを停止して読み取りが亡くなると、遅れて1RUCまでスケールアウトしたのが分かります。
キャパシティユニットのオートスケール自体は課金はないので、基本的にオンしておけばいいということですね。あとは最大値どれくらいにするかですけど。
(参考)テーブルのConfigを取得
テーブルの設定を取得する方法。
import json
import time
import boto3
# DynamoDBに接続
dynamodb = boto3.client('dynamodb', region_name='ap-northeast-1')
# テーブルの設定を取得
response = dynamodb.describe_table(TableName="test_table1")
print(response["Table"])
Discussion