🐙

AWS EventBridgeでDLQからの再送

4 min read

S3へのPutObjectイベントをCloudTrail経由でEventBridgeに流し、EventBridgeからLambdaを起動し、Lambdaが失敗するとSQS DeadLetter Queue(以下DLQ)に流れるようにしています。

DLQに流れたメッセージをもう一度再実行する方法をまとめました。

LambdaからSQS Deadletter

Lambdaが失敗すると、そのメッセージが指定したDLQに渡されます。この処理自体はMangedです。

SQS DLQ の中身

DLQに渡されてきたメッセージはこのような感じです(長いので適当に削っています)。これは通常のCloudwatch Eventと同じ内容です。

{
  "version": "0",
  "id": "aaaaaaa-dddd-fddd3-a18f-bbbbbbbb",
  "detail-type": "AWS API Call via CloudTrail",
  "source": "aws.s3",
  "account": "123456789000",
  "time": "2021-04-07T19:16:15Z",
  "region": "ap-northeast-1",
  "resources": [],
  "detail": {
    "eventTime": "2021-04-07T19:16:15Z",
    "eventSource": "s3.amazonaws.com",
    "eventName": "PutObject",
    "requestParameters": {
      "bucketName": "foo-bar-bucket",
      "Host": "foo-bar-bucket.s3.ap-northeast-1.amazonaws.com",
      "key": "some/file/path/foo.txt"
    },
    "eventID": "aaacd189-e580-aaaa-bbbbb-ddddd",
    "resources": [
      {
        "type": "AWS::S3::Object",
        "ARN": "arn:aws:s3:::foo-bar-bucket/some/file/path/foo.txt"
      },
      {
        "accountId": "123456789000",
        "type": "AWS::S3::Bucket",
        "ARN": "arn:aws:s3:::foo-bar-bucket"
      }
    ],
    "eventType": "AwsApiCall",
  }
}

SQS DLQから取り出して、再びEventBridgeに流す

リトライをするには、このメッセージを再びEventBridgeに流してあげればよいわけです。

DLQ内のメッセージからtimeやdetail等を取り出します。Detail内以下のメッセージはエスケープして一つの文字列としてDetailに入れます。

ここで重要なのは source です。元々の sourceaws.s3 でしたが、このsourceはawsのみが指定できる値であり、普通に指定してしまうと NotAuthorizedForSourceException Not authorized for the source. というエラーが出ます。

そのため、 sourceretry に変更してentryを組み立てます。もしも受信のイベントルールにこの source を指定していたら、sourceが retry という別のルールを作成する必要があります。しかし、ルールの指定が "eventSource": ["s3.amazonaws.com"] だけの指定であれば、ルールに変更は必要ありません。ちなみに、EventBridge スキーマレジストリの自動検出を使うならsourceには retry-s3 などとDetailの中身をもう少し細かく指定しておいたほうが安全かもしれません。

結果として以下のようなentryになります。

[
  {
    "Time": "2021-04-07T19:16:15Z",
    "Source": "retry",
    "Resources": [],
    "DetailType": "AWS API Call via CloudTrail",
    "Detail": "{\"eventVersion\": \"1.08\", \"eventTime\": \"2021-04-07T19:16:15Z\", \"eventSource\": \"s3.amazonaws.com\", \"eventName\": \"PutObject\", \"requestParameters\": {\"bucketName\": \"foo-bar-bucket\", \"Host\": \"foo-bar-bucket.s3.ap-northeast-1.amazonaws.com\", \"key\": \"some/file/path/foo.txt\"}, \"eventID\": \"aaacd189-e580-aaaa-bbbbb-ddddd\", \"resources\": [{\"type\": \"AWS::S3::Object\", \"ARN\": \"arn:aws:s3:::foo-bar-bucket/some/file/path/foo.txt\"}, {\"accountId\": \"123456789000\", \"type\": \"AWS::S3::Bucket\", \"ARN\": \"arn:aws:s3:::foo-bar-bucket\"}], \"eventType\": \"AwsApiCall\", \"managementEvent\": false, \"recipientAccountId\": \"123456789000\", \"eventCategory\": \"Data\"}"
  }
]

出来上がったJSONは、例えばaws cliからなら以下のコマンドでdefaultのEventBridgeに送出します。

aws events put-events --entries file://resend.json

Lambdaが再び起動され、しかもそのsourceが retry になっていれば成功です。

python実装

上記説明してきた動作を実装したのが以下のコードです。

def new_entry(msg: Dict, source: str) -> Dict:
    """CloudWatch Event用のEntryを作成します"""
    return {
        "Time": msg["time"],
        "Source": source,
        "Resources": msg["resources"],
        "DetailType": msg["detail-type"],
        "Detail": json.dumps(msg["detail"]),
    }

def resend(queue_name: str):
    """DLQからメッセージを取り出してCloudWatch Eventに投げます"""
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    retry_source_name = "retry"
    while True:
        messages_to_delete = []
        entries = []
        for message in queue.receive_messages(MaxNumberOfMessages=10):
            entries.append(new_entry(json.loads(message.body), retry_source_name))
            messages_to_delete.append(
                {"Id": message.message_id, "ReceiptHandle": message.receipt_handle}
            )
        # メッセージがないならキューが空
        if len(messages_to_delete) == 0:
            break

        # CloudWatchEventに送る
        res = cloudwatch_events.put_events(Entries=entries)
        if res["FailedEntryCount"] > 0:
            logger.error("put events failed", extra={"response": res})
            continue
        delete_res = queue.delete_messages(Entries=messages_to_delete)
        if "Failed" in delete_res:
            logger.error("delete message failed", extra={"response": delete_res})

Discussion

ログインするとコメントできます