LambdaのイベントソースマッピングでDynamoDBStreamイベントをフィルタする

2021/12/17に公開

Lambdaのイベントソースマッピングに対してイベントのフィルタが可能になりました。

https://dev.classmethod.jp/articles/event-source-mapping-support-filter-expression/

このうちフィルタ可能なイベントソースとしてはKinesis、DynamoDB Stream、SQSの3つです。このうちの1つ
DynamoDB Streamにて、Lambdaで処理したいイベントよりもたくさんのイベントが来ることが往々にあってフィルタしたいなーと常々思っていましたので、いいものが来たな!と思い導入しようと思いました。
ただ公式のドキュメントではdynamodbの例があまり記述されてなかったので実際に動かしてみたメモです。

DynamoDBのイベント

例えばDynamoDBStreamから以下のようなイベントが飛んでくるとします。

{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Partition": {
            "S": "user|5630fe5a-3cb2-b8fa-eab4-1df520b628f9"
          }
        },
        "NewImage": {
          "UserName": {
            "S": "John Doe"
          },
          "Partition": {
            "S": "user|5630fe5a-3cb2-b8fa-eab4-1df520b628f9"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:...",
      "eventSource": "aws:dynamodb"
    },

INSERTされたitemはPartitionKey "Partition"を持ち、その値として"user|5630fe5a-3cb2-b8fa-eab4-1df520b628f9"が含まれています。
ここでこのitemはUserに関する情報を保持しており、主キー("Partiion")は"user|"をprefixとして持つこととします。

ここでこれらのイベントを受けてLambda側で処理する場合、Userに関するItem以外のイベントは受信しないようにフィルターを設定します。

EventSourceMappingのCfnテンプレート

たとえば以下のようなEventSourceMappingをCfnテンプレートに定義します。yamlで書きます。
(Lambda, DynamoDBの定義は別にあるとします)

EventSourceMapping:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    Enabled: true
    EventSourceArn: !GetAtt DynamoDBTable.StreamArn
    FunctionName: !GetAtt Lambda.Arn
    BatchSize: 1
    FilterCriteria:
      Filters:
        - Pattern: "{\"dynamodb\": {\"Keys\": {\"Partition\": {\"S\": [{\"prefix\": \"user|\"}] }}}}"

解説

  • イベントソースに対するフィルタは"FilterCriteria"というpropertyに記述します。型はJSONです。
  • "Filters"に対して"Pattern"を配列で複数指定できます。ひとつのEventSourceMappingに対して5つの条件を指定できます。
  • "Pattern"にはフィルタの条件をjsonの文字列にして指定します。従ってダブルクォートの前にはエスケープが必要です。
  • DynamoDBStreamのItem属性のようにネストしているJsonオブジェクトに対しては上記の例のようにPatternに対してネストしたJsonオブジェクトを記述して、最後に条件式([{\"prefix\": \"user|\"}])を記述します。
  • Patternの中身をJsonとして抜き出すと以下のようになります。
    {
      "dynamodb" : {
        "Keys": {
          "Partition": {
            "S": [{"prefix": "user|"}]
          }
        }
      }
    }
    
    DynamoDBStreamから来る想定のイベントをそのまま抜き出して、フィルタしたい値の所だけ条件式に変換していることが分かります。

他にもいろいろな条件式を記述できますので、書き方は公式のドキュメントを参照してください。

https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax

https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html

追記

  • Cfn書くときはFn::ToJsonString 使うとyamlで書きやすい。( AWS::LanguageExtensions transformが必要)

まとめ

DynamoDBStreamイベントをフィルタする例を記述しました。
これ以外にKinesis, Sqsでもフィルタを設定できます。その場合、Patternで"dynamodb"とした箇所はそれぞれdatabodyになります。
基本的な書き方は前述の記述の仕方を踏襲すれば応用が効くと思うのでぜひお試しください。

Discussion