Lambdaのリトライ回数に応じて処理を変えたい
「AWS LambdaとServerless Advent Calendar 2022」3日目の記事です。
TL;DR
- 実行中のLambdaのリトライ回数を、その場で調べたい
- その1 DynamoDBを使う
- その2 CloudWatch Logsを確認する
 
リトライについて少し整理
呼び出し方によって、リトライの仕組みが違いますが、今回扱うのは「非同期実行のリトライ」です。
- 
Lambdaを直接呼び出すとき - リトライは起きないので、自力でエラー処理する
- コンソールのテスト実行もこちらに含まれる
 
- 
非同期実行 - EventBridge、S3、SNSなどから呼ばれたとき
- リトライは0-2回(デフォルト2)
- 待機時間 リトライ1回目の前に1分間、2回目の前に2分間
 
- 
イベントソースマッピング - Kinesis,DynamoDBなど
- Messageがexpireするまでバッチ全体をリトライ
 
- SQS
- Messageがexpireするまでバッチ全体をリトライ
- 終了したキューは削除することもできる
 
 
- Kinesis,DynamoDBなど
どのAWSサービスが非同期なのかは、こちらのドキュメントの表を参照してください。→他のサービスで AWS Lambda を使用するに表があります。
参考
- 
(docs.aws)エラー処理と AWS Lambda での自動再試行 
- 
(Youtube) AWS re:Invent 2020: Handling errors in a serverless world 
Lambdaのリトライ回数に応じて処理を変えたい
非同期呼び出しで、リトライ回数は0,1,2回を設定できます。何回目のリトライかによって分岐をさせたい、なんてこともあるのではないでしょうか。
ところが、Lambda関数は自身がリトライなのかどうかの情報を持っていません。「私はたぶん3人目だと思うから」とか言いません。
ということで、Lambda実行中に調べられないかとやってみました。ここでは
- DynamoDBに記録する方法
- CloudWatch Logsで調べる方法
 を紹介します。
DynamoDBで記録する
DynamoDBを作成する必要がありますが、こちらの方が簡単です。

Lambdaを実行すると、ユニークなrequest_idが発行されますが、これはリトライ時には同じものが使われます。このrequest_idをキーにしてDynamoDBに実行回数を保存して、問い合わせます。
テーブルを作る
CLIで作ります。名前は下で使うLambdaRetryCounterにしてあります。
aws dynamodb create-table \
--table-name LambdaRetryCounter \
--attribute-definitions AttributeName=Name,AttributeType=S \
--key-schema AttributeName=Name,KeyType=HASH \
--billing-mode PAY_PER_REQUEST
Lambdaを作る
- Python3.9以上でデフォルトで作ります。
- パーミッションRoleに、DynamoDB用にAmazonDynamoDBFullAccessを追加します。
- 非同期呼び出しには、私はよくEventBridgeから起動していますが、SNSで呼び出したほうが早いかもしれません。
Codeでやっていること
- 非同期でLambdaを呼び出し
- DynamoDBにカウントを問い合わせ
- DynamoDBのカウンターを+1、EventTimeを更新
- Exceptionでエラー
from datetime import datetime
from zoneinfo import ZoneInfo
import boto3
tz = ZoneInfo("Asia/Tokyo")
dynamo_table_name = 'LambdaRetryCounter'
def lambda_retry_counter_dynamodb(name):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(dynamo_table_name)
    response = table.get_item(
        Key={"Name": name},
        AttributesToGet=['InvocationCount'],
    )
    if response.get('Item') is None:
        dynamo_cnt = 0
    else:
        dynamo_cnt = response['Item']['InvocationCount']
    table.update_item(
        Key={"Name": name},
        UpdateExpression='ADD InvocationCount :incr',
        ExpressionAttributeValues={':incr': 1},
    )
    table.update_item(
        Key={"Name": name},
        AttributeUpdates={
            'LastEventTime': {
                'Value': datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S%z"),
                'Action': 'PUT'
            }
        },
    )
    return dynamo_cnt
def lambda_handler(event, context):
    request_id = context.aws_request_id
    dynamodb_cnt = lambda_retry_counter_dynamodb(request_id)
    print("dynamodb_cnt", dynamodb_cnt)
    raise Exception
結果
CloudWatchを見てみると、print("dynamodb_cnt", dynamodb_cnt)でリトライカウントが0,1,2と出力されています。

DynamoDBのカウンターを見てみると、想定通りにカウントできています。このレコードは不要になるので、TTL 1時間で消してもいいかもしれません。

CloudWatch Logsを調べる
この方法では、追加のリソースは必要ありません。Lambda自身が直前に実行したログを、CloudWatch Logsストリームの中から調べます。

Lambdaを作る
- Pythonのデフォルトで作って、タイムアウトを変更します(例えば10秒)。
- パーミッションでCloudWatch用にCloudWatchLogsFullAccessを追加します。
- 非同期で呼び出すと、リトライしてカウントがCW logに書き込まれます。
このとき以下のようなことをやっています。
- 
get_log_eventsでLambda自身のログをCloudWatch Logsに読みに行く。
- ログの中で、自身のユニークなリクエストIDを使ってf"END RequestId: {request_id}"を探し、これが見つかればリトライ1回とする。
- 
get_log_eventsではCWのデータを見るときに空の結果が返ることがある、という問題(仕様)がある。その回避方法として、nextForwardTokenが2回同じものが出るまで繰り返し、出力はsetとfrozenを使って同一項目を削除する。(空のログ取得を防ぐ方法はこちらを参考にしました。)
- Exceptionでエラー
import time
import boto3
client = boto3.client("logs")
def get_log_streams(log_group_name):
    response = client.describe_log_streams(
        logGroupName=log_group_name, orderBy="LastEventTime", descending=True, limit=1
    )
    log_stream_names = [i["logStreamName"] for i in response["logStreams"]]
    return log_stream_names
def get_lambda_retry_count(request_id, log_stream_name, log_group_name, search_words):
    next_token = None
    log_events_set = set()
    while True:
        args = {
            "logGroupName": log_group_name,
            "logStreamName": log_stream_name,
            "startFromHead": True,
        }
        if next_token is not None:
            args["nextToken"] = next_token
        response = client.get_log_events(**args)
        for log_event in response["events"]:
            key = frozenset(log_event.items())
            log_events_set.add(key)
        new_next_token = response.get("nextForwardToken")
        if new_next_token is None:
            break
        elif next_token == new_next_token:
            break  # End of the stream
        else:
            next_token = new_next_token
    retry_cnt = 0
    for i in log_events_set:
        d = dict(i)  # from frozen to dict
        if search_words in d["message"]:
            retry_cnt += 1
    return retry_cnt
def lambda_handler(event, context):
    time.sleep(5)  # Wait: log group created
    request_id = context.aws_request_id
    search_words = f"END RequestId: {request_id}"
    log_group_name = f"/aws/lambda/{context.function_name}"
    log_stream_names = get_log_streams(log_group_name)
    log_stream_name = log_stream_names[0]
    retry_count = get_lambda_retry_count(
        request_id,
        log_stream_name,
        log_group_name,
        search_words,
    )
    print("retry_count:", retry_count)
    raise Exception
結果
こういう結果になるはずです。

カウントされていますね!これでリトライ回数に応じて処理を変えることができそうです。
参考
How to query cloudwatch logs using boto3 in python
リトライをboto3で調べるネタ。こちらはget_query_resultsを使っています。最初この実装を試していたのですが、うまくいかずget_log_eventsに変えました。
まとめ
Lambdaのエラー処理についてまとめたり、リトライ回数を調べる方法について紹介しました。
おまけ
以前のものですが、エラー処理のことを書いた記事がややウケたのでご紹介。
Lambdaがタイムアウトしてエラーになるときは、終了してしまうのでそのLambdaを使ったエラー処理はできません。どうするのが良いか、と考えて書いた記事がこちらです。




Discussion