🍰

[chalice] dynamodbの変更をトリガーにAPI Gatewayのwebsocketで通知する

2024/12/12に公開

背景

APIとは別系統のシステム(機械学習処理など)がDynamoDBの特定のItemを更新したときに、その更新をリアルタイムにユーザーへ通知したいという要件がありました。

既存のREST APIはAPI Gateway x Lambda x DynamoDBのサーバーレスな構成になっていて、インフラとアプリケーションコードの管理にはchaliceというAWS謹製のサーバーレス管理フレームワークを利用していました。

chaliceを使ったwebsocketの実装記事が少なく苦労したので共有します。

システムの全体像

DynamoDB Streamを使った変更検知

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.html
https://aws.amazon.com/jp/blogs/developer/aws-chalice-now-supports-amazon-kinesis-and-amazon-dynamodb-streams/

DynamoDB Streamという機能を利用すると、DynamoDBへの変更をリアルタイムに検知することができます。

dynamodb側の実装

https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_StreamSpecification.html

dynamodbはCDKで構成管理しています。

# Define DynamoDB Table
table = dynamodb.Table(self,
    f"sample-table-{stage}",
    table_name=f"sample-table-{stage}",
    partition_key=dynamodb.Attribute(
        name="PK",
        type=dynamodb.AttributeType.STRING
    ),
    sort_key=dynamodb.Attribute(
        name="SK",
        type=dynamodb.AttributeType.STRING
    ),
    removal_policy=RemovalPolicy.RETAIN,
    stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
)

StreamViewTypeは4種類あり、変更を検知したときに欲しい情報を指定できます。

  • KEYS_ONLY
    • 変更があったitemのkeyだけ取得
  • NEW_IMAGE
    • 変更後の値を取得
  • OLD_IMAGE
    • 変更前の値を取得
  • NEW_AND_OLD_IMAGES
    • 変更前後の値を取得

chalice側の実装

https://aws.github.io/chalice/api.html#Chalice.on_dynamodb_record

.chalice/policy-dev.json

LambdaからDynamoDBへのアクセス権を付与します.

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "dynamodb:GetItem",
          "dynamodb:PutItem",
          "dynamodb:UpdateItem",
          "dynamodb:DeleteItem",
          "dynamodb:BatchGetItem",
          "dynamodb:Query",
          "dynamodb:Scan",
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
          "dynamodb:DescribeStream",
          "dynamodb:ListStreams"
        ],
        "Resource": [
          "arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev",
          "arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev/*"
        ]
      },

.chalice/config.json

{
  "version": "2.0",
  "app_name": "sample",
  "stages": {
    "dev": {
      "environment_variables": {
        "TABLE_STREAM_ARN": "arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev/stream/2024-12-03T08:06:15.779"
      }
    },
  },
}

app.py

from chalice.app import DynamoDBEvent


@app.on_dynamodb_record(stream_arn=os.environ['TABLE_STREAM_ARN'])
def handle_dynamodb_stream(event: DynamoDBEvent):
    try:
        for record in event:
            if record.event_name != 'MODIFY':
                return 
            
            pk = record.keys.get("PK", {}).get("S")
            sk = record.keys.get("SK", {}).get("S")

            # 該当するアイテム以外の場合は早期returnする
            if 'target_pk' not in pk or 'target_sk' not in sk:
                return
            
            new_value = record.new_image.get("TargetAttribute", {}).get("N")
            old_value = record.old_image.get("TargetAttribute", {}).get("N")

            if new_value and new_value != old_value:
                app.log.info('変更が検出されました!')
    except Exception as e:
        app.log.error(str(e))

Deployして動作確認する

chaliceをデプロイします

chalice deploy --profile <AWSのプロファイル名> --stage dev

chaliceのログを標準出力します

chalice logs --stage dev --profile <プロファイル名> --name handle_dynamodb_stream --follow

NoSQL Workbenchなどを使って任意のItemを変更してみましょう

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/workbench.html

chaliceのログに変更されたItemのbefore/afterが出力されたら成功です!

{
  "Records": [
    {
      "eventID": "e5af4678419f06907d93c3c431a9ebce",
      "eventName": "MODIFY",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1733215301.0,
        "Keys": {
          "SK": { "S": "sample_sk#ae6679e0-2bca-4515-a944-314c6c481a08" },
          "PK": { "S": "sample_pk#a5958171-9ba7-4fa4-bc9b-8645afc32be6" }
        },
        "NewImage": {
          ...
        },
        "OldImage": {
          ...
        },
        "SequenceNumber": "317099400000000078937565720",
        "SizeBytes": 635,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:123456789:table/sample-table-dev/stream/2024-12-03T08:06:15.779"
    }
  ]
}

chaliceでWebsocketエンドポイントの実装

https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/websocket-api-chat-app.html

connect用のLambda関数、disconnect用のLambda関数を実装していきます。

準備

https://aws.github.io/chalice/topics/websockets.html

app.py

from boto3.session import Session
from chalice import Chalice
from chalice.app import DynamoDBEvent, WebsocketDisconnectedError, WebsocketEvent

stage = os.getenv('STAGE', 'dev')

app = Chalice(app_name=f'sample-{stage}', debug=True)
app.websocket_api.session = Session()
app.experimental_feature_flags.update(['WEBSOCKETS'])

chaliceにおいてwebsocketはexperimentalな機能なので、feature_flagをONにする必要があります。

Lambdaにwebsocket関係の権限を付与します。
policy-dev.json

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": "execute-api:ManageConnections",
        "Resource": [
            "arn:aws:execute-api:*:*:*/@connections/*"
        ]
      }
    ]
  }
  

connectした時にConnectionをDynamoDBに保存する

https://aws.github.io/chalice/tutorials/index.html#websocket-tutorial
queryStringParametersを使うことで任意のデータをクエリパラメータとして送信できます。

app.py

@app.on_ws_connect()
def connect(event: WebsocketEvent):
    try:
        query_params = event.to_dict().get('queryStringParameters', {})
        sort_key = query_params.get('SK', 'undefined')
        connection_id = event.connection_id
        connection = Connection(
            connection_id=connection_id,
            sk=sort_key,
            created_at=datetime.datetime.now().isoformat()
        )
        connection_use_cases = get_connection_use_cases()
        connection_use_cases.create_connection(connection=connection)
    except Exception as e:
        app.log.error(str(e))

disconnectした時にConnectionをDynamoDBから削除する

論理削除にすると履歴がすごいことになりそうだったので物理削除しています。
disconnectの時はqueryStringParametersは使えません。

@app.on_ws_disconnect()
def disconnect(event: WebsocketEvent):
    try:
        connection_use_cases = get_connection_use_cases()
        connection_use_cases.delete_connection(connection_id=event.connection_id)
    except Exception as e:
        app.log.error(str(e))

動作確認

APIをデプロイします。

chalice deploy

websocketのClientにはwscatを利用します。

brew install wscat

クエリパラメータにSK=hogeを渡しています。

wscat -c 'wss://<WebSocketID>.execute-api.ap-northeast-1.amazonaws.com/api/?SK=hoge'

Connectedと表示されたら成功です!

以下のコマンドでログを見れます。

uv run chalice logs --stage dev --profile xxx --name websocket_connect --follow

uv run chalice logs --stage dev --profile xxx --name websocket_disconnect --follow

DynamoDB Streamをトリガーにして接続中のConnectionに通知する

app.py

@app.on_dynamodb_record(stream_arn=os.environ['TABLE_STREAM_ARN'])
def handle_dynamodb_stream(event: DynamoDBEvent):
    try:
        for record in event:
            if record.event_name != 'MODIFY':
                return 
            
            pk = record.keys.get("PK", {}).get("S")
            sk = record.keys.get("SK", {}).get("S")

            # 該当するアイテム以外の場合は早期returnする
            if 'target_pk' not in pk or 'target_sk' not in sk:
                return
            
            new_value = record.new_image.get("TargetAttribute", {}).get("N")
            old_value = record.old_image.get("TargetAttribute", {}).get("N")

            if new_value and new_value != old_value:
                app.log.info('変更が検出されました!')
                pk = pk.split('#')[1]
                sk = sk.split('#')[1]
                connection_use_cases = get_connection_use_cases()
                connections: list[Connection] = connection_use_cases.get_connections(sk=sk)
                # TODO: for文よくない
                for connection in connections:
                    try:
                        # WebSocket APIを設定
                        # TODO: ドメイン名を直書きしてるのはよくない
                        app.websocket_api.configure(domain_name='xxx.execute-api.ap-northeast-1.amazonaws.com', stage='api')
                        message = {
                            "pk": pk,
                            "sk": sk,
                            "value": new_value,
                        }
                        app.websocket_api.send(connection.connection_id, json.dumps(message))
                    except WebsocketDisconnectedError:
                        app.log(f'Connection {connection.connection_id} disconnected')
    except Exception as e:
        app.log.error(str(e))

動作確認

デプロイします

chalice deploy

NoSQL Workbenchで適当なItemを更新します

websocketに繋いでいるターミナルに通知がきます!!

wscat -c 'wss://xxxxx.execute-api.ap-northeast-1.amazonaws.com/api/?sk=hogehoge'
Connected (press CTRL+C to quit)
< {"sk": "hogehoge", "pk": "fugafuga", "new_value": "90"}

Discussion