🧛

Lambdaのリトライ回数に応じて処理を変えたい

2022/12/03に公開

AWS LambdaとServerless Advent Calendar 2022」3日目の記事です。

TL;DR

  • 実行中のLambdaのリトライ回数を、その場で調べたい
    • その1 DynamoDBを使う
    • その2 CloudWatch Logsを確認する

リトライについて少し整理

呼び出し方によって、リトライの仕組みが違いますが、今回扱うのは「非同期実行のリトライ」です。

  • Lambdaを直接呼び出すとき

    • リトライは起きないので、自力でエラー処理する
    • コンソールのテスト実行もこちらに含まれる
  • 非同期実行

    • EventBridge、S3、SNSなどから呼ばれたとき
    • リトライは0-2回(デフォルト2)
    • 待機時間 リトライ1回目の前に1分間、2回目の前に2分間
  • イベントソースマッピング

    • Kinesis,DynamoDBなど
      • Messageがexpireするまでバッチ全体をリトライ
    • SQS
      • Messageがexpireするまでバッチ全体をリトライ
      • 終了したキューは削除することもできる

どのAWSサービスが非同期なのかは、こちらのドキュメントの表を参照してください。→他のサービスで AWS Lambda を使用するに表があります。

参考

Lambdaのリトライ回数に応じて処理を変えたい

非同期呼び出しで、リトライ回数は0,1,2回を設定できます。何回目のリトライかによって分岐をさせたい、なんてこともあるのではないでしょうか。

ところが、Lambda関数は自身がリトライなのかどうかの情報を持っていません。「私はたぶん3人目だと思うから」とか言いません。

ということで、Lambda実行中に調べられないかとやってみました。ここでは

  • DynamoDBに記録する方法
  • CloudWatch Logsで調べる方法
    を紹介します。

DynamoDBで記録する

DynamoDBを作成する必要がありますが、こちらの方が簡単です。

Lambdaを実行すると、ユニークなrequest_idが発行されますが、これはリトライ時には同じものが使われます。このrequest_idをキーにしてDynamoDBに実行回数を保存して、問い合わせます。

テーブルを作る

CLIで作ります。名前は下で使うLambdaRetryCounterにしてあります。

aws dynamodb create-table \
--table-name LambdaRetryCounter \
--attribute-definitions AttributeName=Name,AttributeType=S \
--key-schema AttributeName=Name,KeyType=HASH \
--billing-mode PAY_PER_REQUEST

Lambdaを作る

  • Python3.9以上でデフォルトで作ります。
  • パーミッションRoleに、DynamoDB用にAmazonDynamoDBFullAccessを追加します。
  • 非同期呼び出しには、私はよくEventBridgeから起動していますが、SNSで呼び出したほうが早いかもしれません。

Codeでやっていること

  • 非同期でLambdaを呼び出し
  • DynamoDBにカウントを問い合わせ
  • DynamoDBのカウンターを+1、EventTimeを更新
  • Exceptionでエラー
get_dynamodb_count_handler.py
from datetime import datetime
from zoneinfo import ZoneInfo

import boto3

tz = ZoneInfo("Asia/Tokyo")
dynamo_table_name = 'LambdaRetryCounter'


def lambda_retry_counter_dynamodb(name):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(dynamo_table_name)

    response = table.get_item(
        Key={"Name": name},
        AttributesToGet=['InvocationCount'],
    )
    if response.get('Item') is None:
        dynamo_cnt = 0
    else:
        dynamo_cnt = response['Item']['InvocationCount']

    table.update_item(
        Key={"Name": name},
        UpdateExpression='ADD InvocationCount :incr',
        ExpressionAttributeValues={':incr': 1},
    )

    table.update_item(
        Key={"Name": name},
        AttributeUpdates={
            'LastEventTime': {
                'Value': datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S%z"),
                'Action': 'PUT'
            }
        },
    )
    return dynamo_cnt


def lambda_handler(event, context):

    request_id = context.aws_request_id
    dynamodb_cnt = lambda_retry_counter_dynamodb(request_id)
    print("dynamodb_cnt", dynamodb_cnt)

    raise Exception

結果

CloudWatchを見てみると、print("dynamodb_cnt", dynamodb_cnt)でリトライカウントが0,1,2と出力されています。

DynamoDBのカウンターを見てみると、想定通りにカウントできています。このレコードは不要になるので、TTL 1時間で消してもいいかもしれません。


CloudWatch Logsを調べる

この方法では、追加のリソースは必要ありません。Lambda自身が直前に実行したログを、CloudWatch Logsストリームの中から調べます。

Lambdaを作る

  • Pythonのデフォルトで作って、タイムアウトを変更します(例えば10秒)。
  • パーミッションでCloudWatch用にCloudWatchLogsFullAccessを追加します。
  • 非同期で呼び出すと、リトライしてカウントがCW logに書き込まれます。

このとき以下のようなことをやっています。

  • get_log_eventsでLambda自身のログをCloudWatch Logsに読みに行く。
  • ログの中で、自身のユニークなリクエストIDを使ってf"END RequestId: {request_id}"を探し、これが見つかればリトライ1回とする。
  • get_log_eventsではCWのデータを見るときに空の結果が返ることがある、という問題(仕様)がある。その回避方法として、nextForwardTokenが2回同じものが出るまで繰り返し、出力はsetとfrozenを使って同一項目を削除する。(空のログ取得を防ぐ方法はこちらを参考にしました。)
  • Exceptionでエラー
get_log_event_handler.py
import time

import boto3

client = boto3.client("logs")


def get_log_streams(log_group_name):

    response = client.describe_log_streams(
        logGroupName=log_group_name, orderBy="LastEventTime", descending=True, limit=1
    )
    log_stream_names = [i["logStreamName"] for i in response["logStreams"]]

    return log_stream_names


def get_lambda_retry_count(request_id, log_stream_name, log_group_name, search_words):

    next_token = None
    log_events_set = set()

    while True:
        args = {
            "logGroupName": log_group_name,
            "logStreamName": log_stream_name,
            "startFromHead": True,
        }
        if next_token is not None:
            args["nextToken"] = next_token

        response = client.get_log_events(**args)

        for log_event in response["events"]:
            key = frozenset(log_event.items())
            log_events_set.add(key)

        new_next_token = response.get("nextForwardToken")

        if new_next_token is None:
            break
        elif next_token == new_next_token:
            break  # End of the stream
        else:
            next_token = new_next_token

    retry_cnt = 0
    for i in log_events_set:
        d = dict(i)  # from frozen to dict
        if search_words in d["message"]:
            retry_cnt += 1

    return retry_cnt


def lambda_handler(event, context):

    time.sleep(5)  # Wait: log group created

    request_id = context.aws_request_id
    search_words = f"END RequestId: {request_id}"

    log_group_name = f"/aws/lambda/{context.function_name}"
    log_stream_names = get_log_streams(log_group_name)
    log_stream_name = log_stream_names[0]

    retry_count = get_lambda_retry_count(
        request_id,
        log_stream_name,
        log_group_name,
        search_words,
    )
    print("retry_count:", retry_count)

    raise Exception

結果

こういう結果になるはずです。

カウントされていますね!これでリトライ回数に応じて処理を変えることができそうです。

参考

How to query cloudwatch logs using boto3 in python
リトライをboto3で調べるネタ。こちらはget_query_resultsを使っています。最初この実装を試していたのですが、うまくいかずget_log_eventsに変えました。

まとめ

Lambdaのエラー処理についてまとめたり、リトライ回数を調べる方法について紹介しました。

おまけ

以前のものですが、エラー処理のことを書いた記事がややウケたのでご紹介。

Lambdaがタイムアウトしてエラーになるときは、終了してしまうのでそのLambdaを使ったエラー処理はできません。どうするのが良いか、と考えて書いた記事がこちらです。

https://zenn.dev/shimo_s3/articles/dfb516f25785a2

Discussion