[chalice] dynamodbの変更をトリガーにAPI Gatewayのwebsocketで通知する
背景
APIとは別系統のシステム(機械学習処理など)がDynamoDBの特定のItemを更新したときに、その更新をリアルタイムにユーザーへ通知したいという要件がありました。
既存のREST APIはAPI Gateway x Lambda x DynamoDBのサーバーレスな構成になっていて、インフラとアプリケーションコードの管理にはchaliceというAWS謹製のサーバーレス管理フレームワークを利用していました。
chaliceを使ったwebsocketの実装記事が少なく苦労したので共有します。
システムの全体像
DynamoDB Streamを使った変更検知
DynamoDB Streamという機能を利用すると、DynamoDBへの変更をリアルタイムに検知することができます。
dynamodb側の実装
dynamodbはCDKで構成管理しています。
# Define DynamoDB Table
table = dynamodb.Table(self,
f"sample-table-{stage}",
table_name=f"sample-table-{stage}",
partition_key=dynamodb.Attribute(
name="PK",
type=dynamodb.AttributeType.STRING
),
sort_key=dynamodb.Attribute(
name="SK",
type=dynamodb.AttributeType.STRING
),
removal_policy=RemovalPolicy.RETAIN,
stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
)
StreamViewTypeは4種類あり、変更を検知したときに欲しい情報を指定できます。
- KEYS_ONLY
- 変更があったitemのkeyだけ取得
- NEW_IMAGE
- 変更後の値を取得
- OLD_IMAGE
- 変更前の値を取得
- NEW_AND_OLD_IMAGES
- 変更前後の値を取得
chalice側の実装
.chalice/policy-dev.json
LambdaからDynamoDBへのアクセス権を付与します.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:BatchGetItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev",
"arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev/*"
]
},
.chalice/config.json
{
"version": "2.0",
"app_name": "sample",
"stages": {
"dev": {
"environment_variables": {
"TABLE_STREAM_ARN": "arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev/stream/2024-12-03T08:06:15.779"
}
},
},
}
app.py
from chalice.app import DynamoDBEvent
@app.on_dynamodb_record(stream_arn=os.environ['TABLE_STREAM_ARN'])
def handle_dynamodb_stream(event: DynamoDBEvent):
try:
for record in event:
if record.event_name != 'MODIFY':
return
pk = record.keys.get("PK", {}).get("S")
sk = record.keys.get("SK", {}).get("S")
# 該当するアイテム以外の場合は早期returnする
if 'target_pk' not in pk or 'target_sk' not in sk:
return
new_value = record.new_image.get("TargetAttribute", {}).get("N")
old_value = record.old_image.get("TargetAttribute", {}).get("N")
if new_value and new_value != old_value:
app.log.info('変更が検出されました!')
except Exception as e:
app.log.error(str(e))
Deployして動作確認する
chaliceをデプロイします
chalice deploy --profile <AWSのプロファイル名> --stage dev
chaliceのログを標準出力します
chalice logs --stage dev --profile <プロファイル名> --name handle_dynamodb_stream --follow
NoSQL Workbenchなどを使って任意のItemを変更してみましょう
chaliceのログに変更されたItemのbefore/afterが出力されたら成功です!
{
"Records": [
{
"eventID": "e5af4678419f06907d93c3c431a9ebce",
"eventName": "MODIFY",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1733215301.0,
"Keys": {
"SK": { "S": "sample_sk#ae6679e0-2bca-4515-a944-314c6c481a08" },
"PK": { "S": "sample_pk#a5958171-9ba7-4fa4-bc9b-8645afc32be6" }
},
"NewImage": {
...
},
"OldImage": {
...
},
"SequenceNumber": "317099400000000078937565720",
"SizeBytes": 635,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev/stream/2024-12-03T08:06:15.779"
}
]
}
chaliceでWebsocketエンドポイントの実装
connect用のLambda関数、disconnect用のLambda関数を実装していきます。
準備
app.py
from boto3.session import Session
from chalice import Chalice
from chalice.app import DynamoDBEvent, WebsocketDisconnectedError, WebsocketEvent
stage = os.getenv('STAGE', 'dev')
app = Chalice(app_name=f'sample-{stage}', debug=True)
app.websocket_api.session = Session()
app.experimental_feature_flags.update(['WEBSOCKETS'])
chaliceにおいてwebsocketはexperimentalな機能なので、feature_flagをONにする必要があります。
Lambdaにwebsocket関係の権限を付与します。
policy-dev.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "execute-api:ManageConnections",
"Resource": [
"arn:aws:execute-api:*:*:*/@connections/*"
]
}
]
}
connectした時にConnectionをDynamoDBに保存する
queryStringParameters
を使うことで任意のデータをクエリパラメータとして送信できます。
app.py
@app.on_ws_connect()
def connect(event: WebsocketEvent):
try:
query_params = event.to_dict().get('queryStringParameters', {})
sort_key = query_params.get('SK', 'undefined')
connection_id = event.connection_id
connection = Connection(
connection_id=connection_id,
sk=sort_key,
created_at=datetime.datetime.now().isoformat()
)
connection_use_cases = get_connection_use_cases()
connection_use_cases.create_connection(connection=connection)
except Exception as e:
app.log.error(str(e))
disconnectした時にConnectionをDynamoDBから削除する
論理削除にすると履歴がすごいことになりそうだったので物理削除しています。
disconnectの時はqueryStringParametersは使えません。
@app.on_ws_disconnect()
def disconnect(event: WebsocketEvent):
try:
connection_use_cases = get_connection_use_cases()
connection_use_cases.delete_connection(connection_id=event.connection_id)
except Exception as e:
app.log.error(str(e))
動作確認
APIをデプロイします。
chalice deploy
websocketのClientにはwscatを利用します。
brew install wscat
クエリパラメータにSK=hogeを渡しています。
wscat -c 'wss://<WebSocketID>.execute-api.ap-northeast-1.amazonaws.com/api/?SK=hoge'
Connectedと表示されたら成功です!
以下のコマンドでログを見れます。
uv run chalice logs --stage dev --profile xxx --name websocket_connect --follow
uv run chalice logs --stage dev --profile xxx --name websocket_disconnect --follow
DynamoDB Streamをトリガーにして接続中のConnectionに通知する
app.py
@app.on_dynamodb_record(stream_arn=os.environ['TABLE_STREAM_ARN'])
def handle_dynamodb_stream(event: DynamoDBEvent):
try:
for record in event:
if record.event_name != 'MODIFY':
return
pk = record.keys.get("PK", {}).get("S")
sk = record.keys.get("SK", {}).get("S")
# 該当するアイテム以外の場合は早期returnする
if 'target_pk' not in pk or 'target_sk' not in sk:
return
new_value = record.new_image.get("TargetAttribute", {}).get("N")
old_value = record.old_image.get("TargetAttribute", {}).get("N")
if new_value and new_value != old_value:
app.log.info('変更が検出されました!')
pk = pk.split('#')[1]
sk = sk.split('#')[1]
connection_use_cases = get_connection_use_cases()
connections: list[Connection] = connection_use_cases.get_connections(sk=sk)
# TODO: for文よくない
for connection in connections:
try:
# WebSocket APIを設定
# TODO: ドメイン名を直書きしてるのはよくない
app.websocket_api.configure(domain_name='xxx.execute-api.ap-northeast-1.amazonaws.com', stage='api')
message = {
"pk": pk,
"sk": sk,
"value": new_value,
}
app.websocket_api.send(connection.connection_id, json.dumps(message))
except WebsocketDisconnectedError:
app.log(f'Connection {connection.connection_id} disconnected')
except Exception as e:
app.log.error(str(e))
動作確認
デプロイします
chalice deploy
NoSQL Workbenchで適当なItemを更新します
websocketに繋いでいるターミナルに通知がきます!!
wscat -c 'wss://xxxxx.execute-api.ap-northeast-1.amazonaws.com/api/?sk=hogehoge'
Connected (press CTRL+C to quit)
< {"sk": "hogehoge", "pk": "fugafuga", "new_value": "90"}
Discussion