DynamoDB の具体的なユースケースや考慮点を模索

2022/07/17に公開約23,700字

目的

実際のユースケースを参考に、DynamoDB (NoSQL) 特有のDynamoDB のテーブル・インデックスの設計、それに対する効率的なデータの登録、要件に応じたクエリの実行を実践してみる。

イベント履歴管理 - Hash + Range keyの利用例

[1] のユースケース例を参考

やりたいこと

  • デバイスごとのイベント履歴を記録したい。
  • デバイス ID を Hash Key。
  • 時間軸で各デバイス毎のイベントの検索要件があると仮定して、タイムスタンプを Range Key にする。

デモ

  1. テーブル作成。タイムスタンプは特定のISOフォーマットで文字列型として登録すればOK
aws dynamodb delete-table --table-name device
aws dynamodb create-table \
    --table-name device \
    --attribute-definitions \
        AttributeName=deviceId,AttributeType=S \
        AttributeName=timestamp,AttributeType=S \
    --key-schema \
        AttributeName=deviceId,KeyType=HASH \
        AttributeName=timestamp,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST
  1. テストデータを登録
import boto3
import random
from datetime import datetime

tablename = 'device'
dynamodb = boto3.resource('dynamodb')

def put_test_data(device_number, table):
    with table.batch_writer() as writer:
        for i in range(100):
            timestamp = datetime(2022, 1, 1, int(random.uniform(0, 23)), int(random.uniform(0, 59)), int(random.uniform(0, 59)))
            writer.put_item(
                Item={
                    'deviceId': f'device#{device_number}',
                    'timestamp': timestamp.isoformat() + 'Z',
                    'event': 'event test message'
                }
            )

table = dynamodb.Table(tablename)
for device_number in range(100):
    put_test_data(str(device_number).zfill(3), table)

こういうデータが登録される

  1. 時間軸で特定のデバイスのイベントを検索する例
import json
import boto3
from boto3.dynamodb.conditions import Key
import time

dynamodb = boto3.resource('dynamodb')
tablename = 'device'

deviceId='device#021'
start_timestamp = '2022-01-01T16:00:00Z'  
end_timestamp = '2022-01-01T18:00:00Z'  

table = dynamodb.Table(tablename)
option = {
    'KeyConditionExpression':
        Key('deviceId').eq(deviceId) & \
        Key('timestamp').between(start_timestamp, end_timestamp)
}

res = table.query(**option)

for i in res['Items']:
    print(i)

実行結果:

# python3 get_sample1.py 
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T16:11:16Z'}
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T16:11:35Z'}
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T16:27:48Z'}
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T16:30:07Z'}
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T16:53:42Z'}
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T17:15:48Z'}
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T17:20:50Z'}
{'event': 'event test message', 'deviceId': 'device#021', 'timestamp': '2022-01-01T17:43:16Z'}

ソーシャル画像共有アプリ - 複数テーブルによるデータモデル、LSI、GSIの利用例

[1] のユースケース例を参考

やりたいこと

  • 画像共有できるような SNS アプリをイメージ。

  • クエリの実行要件としては以下を想定

    1. ユーザー毎に友人の一覧を取得する
    2. ユーザー毎に投稿した画像の一覧を取得する
    3. ユーザー毎に特定の時間帯に投稿された画像を取得する
    4. 特定の画像にタグづけされたユーザーの一覧を取得する
    5. 特定のユーザーがタグ付けされた画像の一覧を取得する
  • 要件盛りだくさんのため、複数のテーブル、ただなるべくテーブル数は少なくしてこれらの要件を全て満たしたい。

  • 想定するテーブルは以下の3つ。

    • 1 の要件を満たすための Friendsテーブル。
    user(Hash) friend(Range)
    Text Text
    • 2 の要件を満たすための Images テーブル。
    • 3 の要件を満たすために LSI (Range=Date) を付与する
    user(Hash) image(Range) date link
    Text Text Text Text
    • 4 の要件を満たすための ImageTags テーブル。
    • 5 の要件を満たすために GSI (Hash=TagUser/Range=Image) を付与する
    image(Hash) taguser(Range)
    Text Text

デモ

  1. テーブル作成。
aws dynamodb delete-table --table-name Friends
aws dynamodb delete-table --table-name Images
aws dynamodb delete-table --table-name ImageTags

aws dynamodb create-table \
    --table-name Friends \
    --attribute-definitions \
        AttributeName=user,AttributeType=S \
        AttributeName=friend,AttributeType=S \
    --key-schema \
        AttributeName=user,KeyType=HASH \
        AttributeName=friend,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST

aws dynamodb create-table \
    --table-name Images \
    --attribute-definitions \
        AttributeName=user,AttributeType=S \
        AttributeName=image,AttributeType=S \
	AttributeName=date,AttributeType=S \
    --key-schema \
        AttributeName=user,KeyType=HASH \
        AttributeName=image,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST \
    --local-secondary-indexes \
        "[
            {
                \"IndexName\": \"UserDate-LSI\",
                \"KeySchema\": [{\"AttributeName\":\"user\",\"KeyType\":\"HASH\"},
                                {\"AttributeName\":\"date\",\"KeyType\":\"RANGE\"}],
                \"Projection\":{
                    \"ProjectionType\":\"ALL\"
                }
            }
        ]"
	
aws dynamodb create-table \
    --table-name ImageTags \
    --attribute-definitions \
        AttributeName=image,AttributeType=S \
        AttributeName=taguser,AttributeType=S \
    --key-schema \
        AttributeName=image,KeyType=HASH \
        AttributeName=taguser,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST \
    --global-secondary-indexes \
        "[
            {
                \"IndexName\": \"TagUserImage-GSI\",
                \"KeySchema\": [{\"AttributeName\":\"taguser\",\"KeyType\":\"HASH\"},
                                {\"AttributeName\":\"image\",\"KeyType\":\"RANGE\"}],
                \"Projection\":{
                    \"ProjectionType\":\"ALL\"
                }
            }
        ]"
  1. テストデータを登録
import boto3
import random
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
tablename1 = 'Friends'
tablename2 = 'Images'
tablename3 = 'ImageTags'

# Friends
table1 = dynamodb.Table(tablename1)
with table1.batch_writer() as writer:
    writer.put_item(Item={'user': 'Bob','friend': 'Alice'})
    writer.put_item(Item={'user': 'Alice','friend': 'Bob'})
    writer.put_item(Item={'user': 'Alice','friend': 'Carol'})
    writer.put_item(Item={'user': 'Alice','friend': 'Dan'})
    writer.put_item(Item={'user': 'Carol','friend': 'Alice'})
    writer.put_item(Item={'user': 'Dan','friend': 'Alice'})

#Images
table2 = dynamodb.Table(tablename2)
with table2.batch_writer() as writer:
    writer.put_item(Item={'user': 'Bob','image': 'abcde','date':'2020-12-31','link': 's3://xxx'})
    writer.put_item(Item={'user': 'Bob','image': 'vwxyz','date':'2021-04-01','link': 's3://xxx'})
    writer.put_item(Item={'user': 'Bob','image': 'efghj','date':'2021-12-14','link': 's3://xxx'})
    writer.put_item(Item={'user': 'Alice','image': 'lmnop','date':'2022-02-15','link': 's3://xxx'})

#Images
table3 = dynamodb.Table(tablename3)
with table3.batch_writer() as writer:
    writer.put_item(Item={'image': 'abcde','taguser': 'Alice'})
    writer.put_item(Item={'image': 'abcde','taguser': 'Carol'})
    writer.put_item(Item={'image': 'vwxyz','taguser': 'Bob'})
    writer.put_item(Item={'image': 'efghj','taguser': 'Alice'})
    writer.put_item(Item={'image': 'lmnop','taguser': 'Bob'})
  1. 検索クエリ
import boto3
import random
from boto3.dynamodb.conditions import Key
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
tablename1 = 'Friends'
tablename2 = 'Images'
tablename3 = 'ImageTags'
lsi='UserDate-GSI'

print('##### 1.ユーザー毎に友人の一覧を取得する #####')
table = dynamodb.Table(tablename1)
option = {
    'KeyConditionExpression': Key('user').eq('Alice')
}
res = table.query(**option)
for i in res['Items']:
    print(i)
print('\n')

print('##### 2. ユーザー毎に投稿した画像の一覧を取得する #####')
table = dynamodb.Table(tablename2)
option = {
    'KeyConditionExpression': Key('user').eq('Bob')
}
res = table.query(**option)
for i in res['Items']:
    print(i)
print('\n')

print('##### 3. ユーザー毎に特定の時間帯に投稿された画像を取得する #####')
table = dynamodb.Table(tablename2)
option = {
    'IndexName':'UserDate-LSI',
    'KeyConditionExpression':
        Key('user').eq('Bob') & \
        Key('date').between('2021-12-01', '2021-12-31')
}
res = table.query(**option)
for i in res['Items']:
    print(i)
print('\n')

print('##### 4. 特定の画像にタグづけされたユーザーの一覧を取得する #####')
table = dynamodb.Table(tablename3)
option = {
    'KeyConditionExpression': Key('image').eq('abcde')
}
res = table.query(**option)
for i in res['Items']:
    print(i)
print('\n')

print('##### 5. 特定のユーザーがタグ付けされた画像の一覧を取得する #####')
table = dynamodb.Table(tablename3)
option = {
    'IndexName':'TagUserImage-GSI',
    'KeyConditionExpression': Key('taguser').eq('Alice')
}
res = table.query(**option)
for i in res['Items']:
    print(i)
print('\n')

実行結果:

# python3 get_sample2.py 
##### 1.ユーザー毎に友人の一覧を取得する #####
{'user': 'Alice', 'friend': 'Bob'}
{'user': 'Alice', 'friend': 'Carol'}
{'user': 'Alice', 'friend': 'Dan'}


##### 2. ユーザー毎に投稿した画像の一覧を取得する #####
{'user': 'Bob', 'date': '2020-12-31', 'link': 's3://xxx', 'image': 'abcde'}
{'user': 'Bob', 'date': '2021-12-14', 'link': 's3://xxx', 'image': 'efghj'}
{'user': 'Bob', 'date': '2021-04-01', 'link': 's3://xxx', 'image': 'vwxyz'}


##### 3. ユーザー毎に特定の時間帯に投稿された画像を取得する #####
{'user': 'Bob', 'date': '2021-12-14', 'link': 's3://xxx', 'image': 'efghj'}


##### 4. 特定の画像にタグづけされたユーザーの一覧を取得する #####
{'taguser': 'Alice', 'image': 'abcde'}
{'taguser': 'Carol', 'image': 'abcde'}


##### 5. 特定のユーザーがタグ付けされた画像の一覧を取得する #####
{'taguser': 'Alice', 'image': 'abcde'}
{'taguser': 'Alice', 'image': 'efghj'}

マルチプレーヤーゲーム - Conditional Update

[1] のユースケース例を参考

やりたいこと

  • 複数のプレイヤーがリアルタイムで同時に、同じ敵に攻撃してダメージを与えるようなゲームを想定。
  • 同じエンティティ(敵)のデータ(HP)が同時多発的に更新されなければならないので、プレイヤーや攻撃頻度が多ければ多いほど、RDBMS だとロックが同一のレコードに多発するなどして高遅延は発生しそう。
  • その辺り、結果整合性の DynamoDB ならば、高頻度の更新にも低遅延で対処できるのではないか。
  • 敵の HP > 0を条件にしたConditional Writeで実装する。
    • HP > 0 の場合、攻撃力分 HP をマイナス
    • HP < 0 でupdateがエラーで失敗した場合、敵を倒したと判断するロジック。

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.ConditionalUpdate

デモ

  1. テーブル作成。
aws dynamodb delete-table --table-name boss
aws dynamodb create-table \
    --table-name boss \
    --attribute-definitions \
        AttributeName=bossid,AttributeType=S \
    --key-schema \
        AttributeName=bossid,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
aws dynamodb put-item --table-name boss \
--item '{ "bossid": { "S": "boss001" },"hp": { "N": "5000" },"mp": { "N": "1000" },"attack": { "N": "250" },"defence": { "N": "150" }}'
  1. HP5000の敵に対して、並列スレッドで攻撃。Conditional Write で条件に合致しない状態になったことを持って敵を討伐したというロジックにする
import boto3
import threading
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table=dynamodb.Table('boss')
attack_point=-200
hp_killed=0
boss_killed_flag=0

def attack(table):
    global boss_killed_flag
    if boss_killed_flag == 0:
        try:
            option = {
                'Key': {'bossid': 'boss001'},
                'UpdateExpression': 'ADD hp :attack_point',
                'ConditionExpression': 'hp > :hp_killed',
                'ExpressionAttributeValues': {
                    ':attack_point' : attack_point,
                    ':hp_killed' : hp_killed
                },
                'ReturnValues':'UPDATED_NEW'
            }
            res=table.update_item(**option)
            print('残HP : ' + str(res['Attributes']['hp']))
        except Exception as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                boss_killed_flag=1

def run():
    attack(table)

parallel=30
t=[''] * parallel
for i in range(parallel):
    t[i] = threading.Thread(target=run)
    t[i].start()
for i in range(parallel):
    t[i].join()

if boss_killed_flag==1:
    print('boss001 already killed !!!')   

実行結果

# python3 update_sample.py 
残HP : 4800
残HP : 4600
残HP : 4400
残HP : 4200
残HP : 4000
残HP : 3800
残HP : 3600
残HP : 3400
残HP : 3000
残HP : 2800
残HP : 3200
残HP : 2600
残HP : 2400
残HP : 2000
残HP : 1800
残HP : 2200
残HP : 1600
残HP : 1400
残HP : 1200
残HP : 1000
残HP : 800
残HP : 600
残HP : 400
残HP : 200
残HP : 0
boss001 already killed !!!

投票システム - Write Sharding

[1] のユースケース例を参考

やりたいこと

  • 候補者毎に投票数を記録するテーブルを作成する。
  • 候補者を Hash キーにしたいところ、その場合、特定の候補者に票が集まる際に DynamoDB の特定のパーティションに対して書き込みが集中してシャーディングのメリットが得られないため、候補者名に対してSuffixを付与することで書き込みのシャーディングが機能するように工夫する。
  • Hash キーを候補者名にSuffixを付与した場合、候補者毎の標数の集計はその全 Suffix 分のデータレコードの票数を加算する必要がある点には注意。
  • 以下の戦略が参考になる

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/bp-partition-key-sharding.html#bp-partition-key-sharding-random

デモ

  1. テーブル作成。
aws dynamodb delete-table --table-name election
aws dynamodb create-table \
    --table-name election \
    --attribute-definitions \
        AttributeName=candidate,AttributeType=S \
    --key-schema \
        AttributeName=candidate,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
  1. テストデータを登録
    • 候補者は Kevin,Alice,Mark,Joe の 4 名
    • それぞれの候補者名の後に Suffix (この例では 0 ~ 99 の数字)
    • これにより Write Sharding が可能。
import boto3
import random
from datetime import datetime

tablename = 'election'
dynamodb = boto3.resource('dynamodb')

def put_test_data(candidate, table):
    table = dynamodb.Table(tablename)
    candidate = candidate + '_' +  str(random.randint(0, 100))
    votes=int(random.randint(0, 100))
    table.put_item(
        Item={
            'candidate': f'{candidate}',
            'votes': votes
        }
    )

candidate_list=['Kevin','Alice','Mark','Joe']
for n in range(100):
    for candidate in candidate_list:
        put_test_data(candidate,tablename)

こういうデータが登録される

  1. 候補者毎の 全 Suffix を検索して候補者毎の全票数を算出する
import json
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
tablename = 'election'
table = dynamodb.Table(tablename)

def get_votes(candidate_name):
    votes=0
    for n in range(100):
        candidate=candidate_name + '_' + str(n)
        option = {
            'KeyConditionExpression':
                Key('candidate').eq(candidate)
        }
        res = table.query(**option)
        try:
            votes+=res['Items'][0]['votes']
        except:
            pass
    return votes

candidate_list=['Kevin','Alice','Mark','Joe']

for candidate in candidate_list:
    print(candidate + ' : ' + str(get_votes(candidate)))

実行結果

# python3 get_sample4.py 
Kevin : 3064
Alice : 3305
Mark : 3330
Joe : 3200

チケット予約サイト - 読み込み整合性(Read Consistency)の使い分け

やりたいこと

  • チケット予約サイトで予約枠の残数の頻繁な検索があるようなシステムを想定する。
  • 利用者にとって重要なのは、予約枠の残数に余裕がある場合は大凡どの程度の空きがその時点であるか、残数に余裕がない場合は枠が0でないかどうかが懸念事項のためその時点の予約枠の残数の最新の値を把握できることとする。
  • そのため、予約枠の残数に余裕がある場合は通常の結果整合性のある読み込み、残数に余裕がなくなってきた場合は強力な整合性のある読み込みを使って予約枠の残数を検索するようなワークロードを想定。
  • CAP 定理の、A(Availability) と C(Consistency) を適宜切り替えるという例。

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html

デモ

  1. テーブル作成。
    • 予約残初期値=1000
    • 整合性の差による性能を検証したいので負荷が高いほど性能が上がるオンデマンドではなく、プロビジョンドでWCU、RCUは固定値。
aws dynamodb delete-table --table-name ticket
aws dynamodb create-table \
    --table-name ticket \
    --attribute-definitions \
        AttributeName=ticketid,AttributeType=S \
    --key-schema \
        AttributeName=ticketid,KeyType=HASH \
    --provisioned-throughput \
        ReadCapacityUnits=10,WriteCapacityUnits=10
aws dynamodb put-item --table-name ticket \
--item '{ "ticketid": { "S": "livefestival" },"vacant_reservations": { "N": "1000" }}'

  1. 予約兼予約残を確認するスクリプト
    • 予約残枠が rc_threshold 変数以下になったら強力な整合性に変更
import boto3
import threading
from boto3.dynamodb.conditions import Key
import random
import time

dynamodb = boto3.resource('dynamodb')
table=dynamodb.Table('ticket')
strong_rc=0
endflag=0
rc_threshold=200

def ticket_reserve():
    global endflag
    while True:
        reserve=1
        try:
            option = {
                'Key': {'ticketid': 'livefestival'},
                'UpdateExpression': 'ADD vacant_reservations :minus_reserve',
                'ConditionExpression': 'vacant_reservations  > :reserve',
                'ExpressionAttributeValues': {
                    ':reserve' : reserve,
                    ':minus_reserve' : -reserve
                },
                'ReturnValues':'UPDATED_NEW'
            }
            res=table.update_item(**option)
        except Exception as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                print('Ticket の空きが足りず予約できません')
                endflag=1
                break

def rc_control():
    global endflag
    global strong_rc
    while True:
        option = {
            'KeyConditionExpression':
                Key('ticketid').eq('livefestival')
        }
        res = table.query(**option)
        vacant=res['Items'][0]['vacant_reservations']
        if res['Items'][0]['vacant_reservations'] <= rc_threshold:
            strong_rc=1
        print('Strong Read Consistency : ' + str(strong_rc))
        time.sleep(0.1)

        if endflag==0:
            continue
        else:
            break

def run1():
    ticket_reserve()
def run2():
    rc_control()

t=threading.Thread(target=run1)
t.start()
c=threading.Thread(target=run2)
c.start()

rc_count=0
rc_elapsed_time=0
strong_rc_count=0
strong_rc_elapsed_time=0
while True:
    if strong_rc != 1:
        start=time.time()
        option = {
            'KeyConditionExpression': Key('ticketid').eq('livefestival')
        }
        res = table.query(**option)    
        end=time.time()
        elapsed_time = end - start
        rc_count+=1
        rc_elapsed_time += elapsed_time
        print('Ticket Vacant Reservations: ' + str(res['Items'][0]['vacant_reservations']))
    else:
        start=time.time()
        option = {
            'KeyConditionExpression': Key('ticketid').eq('livefestival'),
            'ConsistentRead' : True
        }  
        res = table.query(**option)    
        end=time.time()
        elapsed_time = end - start
        strong_rc_count+=1
        strong_rc_elapsed_time += elapsed_time
        print('Ticket Vacant Reservations: ' + str(res['Items'][0]['vacant_reservations']))

    if endflag==0:
        continue
    else:
        print('rc avg : ' + str((rc_elapsed_time * 1000)/ rc_count) + ' ms')
        print('strong rc avg : ' + str((strong_rc_elapsed_time * 1000)/ strong_rc_count) + ' ms')
        break

実行結果:

  • 強力な整合性の方がレイテンシが明らかに上がった。
# python3 get_ticket.py 
~~~
Ticket Vacant Reservations: 24
Ticket Vacant Reservations: 24
Ticket Vacant Reservations: 24
Ticket の空きが足りず予約できません
Strong Read Consistency : 1
Ticket Vacant Reservations: 1
rc avg : 32.80878495989424 ms
strong rc avg : 112.72108832070994 ms

ランキング付 - アトミックカウンター

やりたいこと

  • 何らかの処理が終了した時間のランキング付を DynamoDB で行いたい。
  • RDBMS のシーケンスのような連番を生成したいが、DynamoDB ではシーケンスや MySQL の Auto Increment の類は機能としてそのまま存在しない。
  • そのため、連番生成用の管理テーブルを作成して別テーブルに対して取得した連番でのランキング付の項目を登録。
  • アトミックカウンターという機能を用いることで、連番をインクリメントしつつその値をレスポンスとして得ることができる。

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.AtomicCounters

この UpdateItem オペレーションを使用して、アトミックカウンター (他の書き込みリクエストに干渉することなく無条件に増分される数値属性) を実装できます。(すべての書き込みリクエストは、受信された順に適用されます)。アトミックカウンターでは、更新はべき等ではありません。つまり、UpdateItem を呼び出すたびに数値はインクリメントされます。

デモ

  1. 連番管理テーブル作成
aws dynamodb delete-table --table-name sequence
aws dynamodb create-table \
    --table-name sequence \
    --attribute-definitions \
        AttributeName=tablename,AttributeType=S \
    --key-schema \
        AttributeName=tablename,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
aws dynamodb put-item --table-name sequence \
--item '{ "tablename": { "S": "ranking" },"seq": { "N": "0" }}'
  1. ランキングテーブル作成
aws dynamodb delete-table --table-name ranking
aws dynamodb create-table \
    --table-name ranking \
    --attribute-definitions \
        AttributeName=rank,AttributeType=N \
    --key-schema \
        AttributeName=rank,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
  1. テストワークロード
    • 並列スレッド(threads)で完了したスレッドから上位の順位付を行う。
    • 連番管理テーブルから連番をレスポンス(ReturnValues='UPDATED_NEW'が肝)で受け取り、ランキングテーブルにスレッド名と共に順位登録
    • 各スレッドはランダムな時間sleepするようにしている。
import sys
import boto3
import threading
import random
import time

dynamodb = boto3.resource('dynamodb')
seqtable=dynamodb.Table('sequence')
ranktable=dynamodb.Table('ranking')
threads=200

body={}
body['tablename'] = 'ranking'
body['seq'] = 0
seqtable.put_item(Item=body)

def next_seq(table,tablename):
    response=table.update_item(
        Key={
            'tablename' : tablename
        },
        UpdateExpression="add seq :val",
        ExpressionAttributeValues={
            ':val' : 1
        },
        ReturnValues='UPDATED_NEW'
    )
    return response['Attributes']['seq']

def worker():
    time.sleep(random.randint(0,30))
    nextseq=next_seq(seqtable,'ranking')
    body={}
    body['rank'] = nextseq
    body['worker'] = threading.currentThread().getName()
    ranktable.put_item(Item=body)

for i in range(threads):
    t = threading.Thread(target=worker)
    t.start()

以下のようなデータがランキングテーブルに結果として登録

チャットアプリのメッセージ保存

やりたいこと

  • チャットアプリのメッセージ保存をDynamoDB で行いたい。
  • 誰が受診したか、誰からのメッセージか、受診時刻、メッセージ本文を保存する。
  • この時、メッセージ本文のサイズがかなり大きくなることが想定されるのでできれば、メッセージ本文は例えばメッセージ一覧機能等で主に表示しそうな前述の情報とは別テーブルに切り出した方が良い。理由は、DynamoDB で消費する RCU は仮に表示するAttributeを絞ってもあくまでItem全体のサイズで計算されるのでメッセージを表示しなくてもメッセージサイズ分RCUを消費してしまい、性能的にもコスト的にもとんでもないデメリットになるから。

デモ

  • チャットアプリのデモというか、表示しないAttributeも含めてRCUが消費される点もデモ。
  1. 256KB のでかいメッセージを含めたItemを登録
import boto3
from boto3.dynamodb.conditions import Key
import json
import sys

startkey=1
item_count=100

dynamodb = boto3.resource('dynamodb',config=config)
table = dynamodb.Table('bigitem')

# 256 kb data
sample_data = '1234567890' * 25600

putlist=[]
for seq in range(startkey,item_count):
    body = {}
    body['id'] = seq
    body['col1'] = 'sample' + str(seq)
    body['col2'] = sample_data
    putlist.append(body)

    if len(putlist) >= 10:
        with table.batch_writer() as batch:
            print('key: ' + str(seq))
            for d in putlist:
                batch.put_item(Item=d)
        putlist=[]

with table.batch_writer() as batch:
    print('key: ' + str(seq))
    for d in putlist:
        batch.put_item(Item=d)
  1. 対象Itemを大規模Attributeを除いて検索
import boto3
from boto3.dynamodb.conditions import Key
import json
import sys
import time

key=1

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('bigitem')

args={}
args['KeyConditionExpression'] = Key('id').eq(key)
args['ProjectionExpression'] = 'id,col1'
args['ReturnConsumedCapacity'] = 'INDEXES'

res = table.query(**args)
print(res)

実行結果を見る限り,大規模Attributeサイズ分も含んだRCUである 'CapacityUnits': 31.5 が消費される。そのため、大規模なItemについては対象の Attribute 部分は別テーブルに切り出す工夫をした方が良いのでは。

{'Items': [{'id': Decimal('1'), 'col1': 'sample1'}], 'Count': 1, 'ScannedCount': 1, 'ConsumedCapacity': {'TableName': 'bigitem', 'CapacityUnits': 31.5, 'Table': {'CapacityUnits': 31.5}}, 'ResponseMetadata': {'RequestId': 'EQ552FRP4PDVOK1SNLURBBJ2DNVV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Sat, 23 Jul 2022 16:00:51 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '173', 'connection': 'keep-alive', 'x-amzn-requestid': 'EQ552FRP4PDVOK1SNLURBBJ2DNVV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '1743303984'}, 'RetryAttempts': 0}}

参考資料

[1] Amazon DynamoDB テーブル設計と実践 Tips https://media.amazonwebservices.com/jp/summit2014/TA-10.pdf

Discussion

ログインするとコメントできます