🔖

EventBridge Pipesを試しながらSQSのDLQを理解する

に公開

はじめに

業務で EventBridge Pipes を利用したときに、SQS の DLQ にうまく入らない時がありました。
結論から言うと、非同期呼び出しを Target に指定した場合、パイプとしては正常終了で終わってしまうため、エラー発生問わずメッセージの削除が行われていました。
今回、もう少し動作を理解したかったので検証しました。

Amazon SQS とは

今回、EventBridge Pipes の Source として指定します。

aws_sqs.drawio.png

手元でも試せるように AWS_CLI を実行します。

# メッセージの送信
$ aws sqs send-message --queue-url <QUEUE_URL> --message-body <MESSAGE_TEXT>

send-message のコマンドによりキューにメッセージを送ることができます。 送信直後のメッセージは他の利用者からも受信できる状態です。
CloudWatch メトリクスの ApproximateNumberOfMessagesVisible はこの Visible の数になり、積まれたメッセージ総数がわかります。

# メッセージの受信
$ aws sqs receive-message --queue-url <QUEUE_URL>

receive-message のコマンドによりメッセージを受け取ることができます。標準キューの場合、順序保証はありません。
受信対象となったメッセージは、SQS の可視性タイムアウトの時間まで他の利用者から見えない状態になります。
CloudWatch メトリクスの ApproximateNumberOfMessagesNotVisible はこの Not Visible の数になり、処理中のメッセージ総数がわかります。

# メッセージの削除
$ aws sqs delete-message --queue-url <QUEUE_URL> --receipt-handle <RECIPT_HANDLE>

delete-message はキューのメッセージを削除します。
受信したメッセージはそのまま残しておくと、可視性タイムアウトにより再度可視化され、再実行対象になります。 そのため、処理が完了した際にメッセージを削除する必要があります。
receive-message を実行すると RECIPT_HANDLE というプロパティがメッセージに含まれているので、これを指定して削除します。

逆に何かしらエラーが起きて、再実行したい場合はあえて削除しないことで再実行できます。
キューから削除しない場合、キューにメッセージが残り続けてしまわないようにいくつかの設定があります。

  • メッセージ保持期間 : この設定値を超えたメッセージは削除される。1分~14日で設定が可能。
  • DLQ(Dead Letter Queue) : 最大受信数を超えたメッセージはDLQとして設定したキューに移動される。1~1000で設定が可能。

EventBridge Pipes とは

https://aws.amazon.com/jp/eventbridge/pipes/

Source → Filter → Enrich(Transform) → Target のプロデューサーからコンシューマまでを繋げるサービスです。
Source にはSQS、Kinesis、DynamoDB 等、Target には StepFunctions や API Gateway、他の SQS に詰め直すこともできるようです。

Target で Lambda や StepFunctions を指定するときには、同期呼び出しと非同期呼び出しがあります。
ただし、StepFunctions の指定には注意が必要です。 StepFunctions には Standard と Express のワークフローが存在しています。
が、Pipes では Standard は非同期の呼び出ししかできず、同期的に実行が必要な場合は Express で作成する必要があります。

EventBridge Pipes に対しても DLQ の設定が可能です。ただし、Source によって挙動が変わります。

  • SQS : SQS の DLQ の設定を引き継ぐ
  • DynamoDB や Kinesis 等のストリームソース : SQS キュー と SNS トピック を設定可能

また、以下の注意があるようです。

Kinesis または DynamoDB ソースを含むパイプに DeadLetterConfig を指定する場合は、パイプの MaximumRecordAgeInSeconds プロパティがソースイベントの MaximumRecordAge プロパティよりも小さいことを確認してください。
MaximumRecordAgeInSeconds は、パイプポーラーがイベントをあきらめて DLQ に配信するタイミングを制御し、MaximumRecordAge は、メッセージが削除されるまでのソースストリームに表示される時間を制御します。
そのため、MaximumRecordAgeInSeconds は、イベントが DLQ に送信されてからソースによって自動的に削除されるまでの間に十分な時間があって、イベントが DLQ に送信された理由を判断できるように、ソース MaximumRecordAge よりも小さい値に設定します。

本題

Source は SQS 固定とします。簡単のため Target には Lambda を利用します。
が、元々 Lambda で SQS をトリガーに DLQ の設定も可能なので、シンプルな使い方であれば、実際には使わない構成かもしれません。

ソースコードを用意したので細かい設定の記載は省略します。
https://github.com/shikazuki/sqs-dlq-validation
後述しますが、うまくいかなかった点があり、検証のため Target を StepFunctions でラップしたものも試しています。

Lambda はエラーをハンドリングせず、失敗するようにします。

index.py
import json

def lambda_handler(event, context):
    # SQSから受け取ったイベントをログに記録
    print("Received event: ", json.dumps(event))

    # 意図的に失敗
    raise Exception("Simulated failure")

SQS の設定はとして、可視性タイムアウトは30秒としており、30秒空いた後に再度実行される想定です。
最大受信数は2回とし、最大数を超えると DLQ に移動する想定です。

resource "aws_sqs_queue" "main" {
  name                       = "${local.prefix}-sqs"
  message_retention_seconds  = 86400
  receive_wait_time_seconds  = 10
  visibility_timeout_seconds = 30 # 可視性タイムアウト
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 2 # 最大受信数
  })


  tags = {
    Name = "${local.prefix}-sqs"
  }
}

Lambda(同期実行)

なぜかうまくいかない

lambda_sync.png

Pipesのログは以下となっています。 TRACE レベルまで有効にしています。

Pollingに成功
{
    "resourceArn": "arn:aws:pipes:ap-northeast-1:xxxxxxxx:pipe/dlq-dev-pipe",
    "timestamp": 1736066349120,
    "executionId": "bbbbbb",
    "messageType": "PollingStageSucceeded",
    "logLevel": "TRACE"
}

Polling の開始から一番最後にあるログは TargetInvocationFailed で終了しています。
Lambda でハンドリングしていないエラーが発生していますが、 httpStatusCode は 200 でした。

Targetに失敗
{
    "resourceArn": "arn:aws:pipes:ap-northeast-1:xxxxxxxx:pipe/dlq-dev-pipe",
    "timestamp": 1736066349447,
    "executionId": "bbbbbb",
    "messageType": "TargetInvocationFailed",
    "logLevel": "ERROR",
    "error": {
        "message": "Target arn:aws:lambda:ap-northeast-1:xxxxxxxx:function:dlq-dev-lambda encountered an error while processing event(s).",
        "httpStatusCode": 200,
        "awsService": "lambda",
        "requestId": "ccccc",
        "exceptionType": "BadRequest",
        "resourceArn": "arn:aws:lambda:ap-northeast-1:xxxxxxxx:function:dlq-dev-lambda"
    },
    "awsRequest": "{ // JSON STRING } ",
    "awsResponse": "{ // JSON STRING } "
}

実行時間を見てもらうとわかるのですが、30秒の想定が7分ほどで再実行になっています。
存在期間が800秒程度になっており、想定通りに動いていません。
lambda_sync_pipe_log.png
lambda_sync_dlq.png
lambda_sync_old_age.png

納得できないがちゃんと動く

色々試したのですが、元々の設定からバッチウインドウの秒数を0秒としました。
元々、お試しで何となく2秒を設定していたのですが、SQS のデフォルトが0秒だったのでデフォルトと同じにしました。

  source_parameters {
    sqs_queue_parameters {
      batch_size                         = 1
-      maximum_batching_window_in_seconds = 2
+      maximum_batching_window_in_seconds = 0
    }
  }

lambda_sync_batch_size.png

想定通りに2回目の実行が30秒後になりました。
lambda_sync_batch_size_pipe_log.png

Lambda(非同期実行)

次に、呼び出しタイプを非同期実行にしてみます。

  target_parameters {
    lambda_function_parameters {
-      invocation_type = "REQUEST_RESPONSE"
+      invocation_type = "FIRE_AND_FORGET"
    }
  }

lambda_aync.png

実行は1度だけになります。
lambda_async_pipe_log.png
Lambda のログはエラーとなっています。
lambda_async_log.png

パイプラインとしては早々に TargetInvocationSucceeded のログが出ており、成功扱いとなっています。

{
    "resourceArn": "arn:aws:pipes:ap-northeast-1:xxxxxxxx:pipe/dlq-dev-pipe",
    "timestamp": 1736068193536,
    "executionId": "bbbbb",
    "messageType": "TargetInvocationSucceeded",
    "logLevel": "TRACE",
    "awsRequest": "{ // JSON STRING } ",
    "awsResponse": "{ // JSON STRING } "
}

成功扱いなのでメッセージの削除が行われ、DLQ にメッセージは移動しません。
lambda_async_delete.png
lambda_async_dlq.png

この場合は Lambda 側に失敗の送信先を作るか DLQ を設定する必要があります。

おわりに

maximum_batching_window_in_seconds の設定で想定通りに動かず、動くようにはなったんですがしっくりきていません。
しばらくしたらもう一度確認してみようかなと思います。

参考

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/choosing-workflow-type.html
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-available-cloudwatch-metrics.html
https://track3jyo.com/2023/01/pipes-with-sqs/
https://aws.amazon.com/jp/blogs/news/introducing-logging-support-for-amazon-eventbridge-pipes/
https://aws.amazon.com/jp/blogs/news/best-practices-for-writing-step-functions-terraform-projects/
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/terraform-sfn.html
https://docs.aws.amazon.com/ja_jp/eventbridge/latest/userguide/eb-pipes-error-troubleshooting.html

Discussion