DynamoDBのアトミックカウンターを利用した排他制御の実装方法
はじめに
AWSのEventBridgeやSQSの標準キューなどのメッセージを元にLambdaが実行されるようなイベント駆動アプリケーションを開発する際に注意しなければならないのは「少なくとも1回の配信が行われる。」という仕様で、同じリクエストが複数回配信される可能性があるという点です。べき等性(何度実行しても同じ結果になること)が担保されていれば問題はないのですが、複数回実行されてしまうとデータが不整合を起こす場合は、その他のLambdaの実行については排他制御しなければなりません。実行されている処理の実行履歴などを見て、すでに処理が進行しているのか確認する必要があるのですが、その確認行為自体が全く同時刻に実行されると、処理が複数回実行されてしまう可能性があります。そこで、DynamoDBのアトミックカウンターを使うことで処理が重複する可能性をなくすことができます。
アトミックカウンターとは
DynamoDBのアトミックカウンターは、テーブル内の特定の項目の数値属性を同時に更新できる機能です。これにより、複数のクライアントが同時に同じ属性をインクリメントまたはデクリメントする際にデータの一貫性を保つことができます。
※アトミックカウンターはUpdateItem操作でしか使えません。
要件定義
- EventBridge -> Lambdaのアーキテクチャで排他制御されたアプリケーションを実装する
- 処理の実行履歴はDynamoDBテーブルに保存する
- EventBridgeのアーカイブリプレイ機能で、失敗で終わっている処理のみ再度実行されるようにする
- DynamoDBテーブルは基本的にイミュータブルに保存(ただし、同時刻に同じパーティションキーで複数の処理開始が起こった際のみ更新が発生する)
テーブル定義
カラム名 | 説明 |
---|---|
partition_key[S] | パーティションキー。一つのイベント処理中の履歴は同じパーティションキーを使う。Lambdaで生成せずに、ユニークな値をリクエストで受け取る。 |
sequence[N] | ソートキー。同じpartition_keyの中で作成された順に昇順になるようにLambdaで値を生成。 |
atomic_counter[N] | アトミックカウンター。UpdateItemでは「ADD 1」で、PutItemでは1を保存 |
exec_status[S] | 実行開始時はstarted、成功時はsucceeded、失敗時はfailed |
created_at[S] | データ作成時間 |
実際のデータ投入後のイメージ
条件:排他制御なし、重複実行なし
Lambdaが1度だけinvokeし、処理が失敗した場合、以下のようなデータになります。
partition_key | sequence | atomic_counter | exec_status | created_at |
---|---|---|---|---|
job1 | 1 | 1 | started | 2024-08-26T23:02Z |
job1 | 2 | 1 | failed | 2024-08-26T23:13Z |
条件:排他制御なし、重複実行なし
この後、EventBridgeのアーカイブリプレイを2024-08-26T23:02Z〜2024-08-26T23:15Zの時間帯で実行して今度は成功で終了したとします。すると以下のようなデータになります。
partition_key | sequence | atomic_counter | exec_status | created_at |
---|---|---|---|---|
job1 | 1 | 1 | started | 2024-08-26T23:02Z |
job1 | 2 | 1 | failed | 2024-08-26T23:13Z |
job1 | 3 | 1 | started | 2024-08-26T23:22Z |
job1 | 4 | 1 | succeeded | 2024-08-26T23:26Z |
条件:排他制御なし、重複実行あり
ここでもし、排他制御がされていない状態で、上記のリプレイでjob1のイベントが2回配信されていて、かつLambdaが2回とも同じ時間でinvokeしていた場合ですが、PutItemのみで履歴の登録を行った場合、上記データと同じ登録になってしまいます。PutItemは、同じプライマリキーに対しては最後の書き込みが勝つからです。(Last Write Wins)
排他制御を実装する
まずは処理を続行してよいか確認するためにパーティションキーがjob1のものをsequence降順で全て取得します。
response = table.query(
KeyConditionExpression=(
boto3.dynamodb.conditions.Key('partition_key')
.eq('job1')
),
ScanIndexForward=False # Falseにすることで降順になります
)
items = response['Items']
取り出した最初の要素のexec_statusが、startedかsucceededだった場合は、すでに他のLambdaが処理を行っていたことがわかるので、処理を終了します。ここまでが従来の一般的な排他制御と言われるものかなと思います。
アトミックカウンターで最終確認
テーブルにpartition_key=job1, sequence=2まで登録されていた場合で考えます。
前項で重複実行がないことを確認しているので、処理開始の履歴を作成します。前項で取得した履歴から、最新のsequenceが2であることがわかっているため、次に登録するデータのsequenseは+1した値である3になります。
この開始の処理だけはPutItemではなく、UpdateItemを使います。アトミックカウンターを使うためです。
response = table.update_item(
Key={
'partition_key': 'job1',
'sequence': 3
},
UpdateExpression='ADD atomic_counter :incr SET exec_status = :status, created_at = :created_at',
ExpressionAttributeValues={
':incr': 1,
':status': 'started',
':created_at': create_at
},
ReturnValues='ALL_NEW'
)
# 更新後のatomic_counterの値を取得
atomic_counter = response.get('Attributes', {}).get('atomic_counter', 0)
もしこの処理をすでに実行しているLambdaがあり、実行順序的に後になった場合は、取得したatomic_counterの値は1ではなく、2になります。先に実行されたLambdaが+1の更新を行っているためです。
同時に実行しても同じ値には決してならないため、原子性が保たれるわけです。
返ってきた値が1ではなかった場合は、処理を終了すれば、その後の処理が重複して実行されることはありません。
ポイント
Lambdaでパーティションキーを生成しない理由
EventBridgeのアーカイブリプレイで、すでに実行されている処理かどうか判別するためです、毎回パーティションキーをLambdaが生成するロジックにしてしまうと、前回のパーティションキーがなんだったのかイベントの内容をみても判断することができないためです。
イミュータブル(変更不可)なテーブル設計にしている理由
ミュータブル(変更可)にして、1回のイベントで1レコード中のexec_statusを更新していく方式にしてしまうと、EventBridgeのアーカイブリプレイの度にatomic_counterが更新されてしまい、atomic_counterの値がいくつのときが重複したことになるか、判断できないからです。
何回失敗したのかなど、中間の履歴も失われてしまうので、実行履歴はアトミックカウンターの有無に関係なく、イミュータブル設計が妥当なのかなと思います。
イミュータブルといいつつ、重複が起こってatomic_counterが2になったときだけは、更新されているところが気持ち悪いところです・・・・・・・・。
イベントごとの結果一覧が欲しい場合はこの設計では無理
今回の要件ではイベントごとの結果一覧を取得したいシーンは想定されていないため、考慮されていません。そういった要件の際は別途GSIなどの設計が必要になります。
まとめ
完全な排他制御の実装は、意外と難易度が高かったですが、今回の実装で実現できたのではないかと思います。
Discussion