Lambdaのリトライ回数に応じて処理を変えたい
「AWS LambdaとServerless Advent Calendar 2022」3日目の記事です。
TL;DR
- 実行中のLambdaのリトライ回数を、その場で調べたい
- その1 DynamoDBを使う
- その2 CloudWatch Logsを確認する
リトライについて少し整理
呼び出し方によって、リトライの仕組みが違いますが、今回扱うのは「非同期実行のリトライ」です。
-
Lambdaを直接呼び出すとき
- リトライは起きないので、自力でエラー処理する
- コンソールのテスト実行もこちらに含まれる
-
非同期実行
- EventBridge、S3、SNSなどから呼ばれたとき
- リトライは0-2回(デフォルト2)
- 待機時間 リトライ1回目の前に1分間、2回目の前に2分間
-
イベントソースマッピング
- Kinesis,DynamoDBなど
- Messageがexpireするまでバッチ全体をリトライ
- SQS
- Messageがexpireするまでバッチ全体をリトライ
- 終了したキューは削除することもできる
- Kinesis,DynamoDBなど
どのAWSサービスが非同期なのかは、こちらのドキュメントの表を参照してください。→他のサービスで AWS Lambda を使用するに表があります。
参考
-
(docs.aws)エラー処理と AWS Lambda での自動再試行
-
(Youtube) AWS re:Invent 2020: Handling errors in a serverless world
Lambdaのリトライ回数に応じて処理を変えたい
非同期呼び出しで、リトライ回数は0,1,2回を設定できます。何回目のリトライかによって分岐をさせたい、なんてこともあるのではないでしょうか。
ところが、Lambda関数は自身がリトライなのかどうかの情報を持っていません。「私はたぶん3人目だと思うから」とか言いません。
ということで、Lambda実行中に調べられないかとやってみました。ここでは
- DynamoDBに記録する方法
- CloudWatch Logsで調べる方法
を紹介します。
DynamoDBで記録する
DynamoDBを作成する必要がありますが、こちらの方が簡単です。
Lambdaを実行すると、ユニークなrequest_id
が発行されますが、これはリトライ時には同じものが使われます。このrequest_id
をキーにしてDynamoDBに実行回数を保存して、問い合わせます。
テーブルを作る
CLIで作ります。名前は下で使うLambdaRetryCounter
にしてあります。
aws dynamodb create-table \
--table-name LambdaRetryCounter \
--attribute-definitions AttributeName=Name,AttributeType=S \
--key-schema AttributeName=Name,KeyType=HASH \
--billing-mode PAY_PER_REQUEST
Lambdaを作る
- Python3.9以上でデフォルトで作ります。
- パーミッションRoleに、DynamoDB用に
AmazonDynamoDBFullAccess
を追加します。 - 非同期呼び出しには、私はよくEventBridgeから起動していますが、SNSで呼び出したほうが早いかもしれません。
Codeでやっていること
- 非同期でLambdaを呼び出し
- DynamoDBにカウントを問い合わせ
- DynamoDBのカウンターを+1、EventTimeを更新
- Exceptionでエラー
from datetime import datetime
from zoneinfo import ZoneInfo
import boto3
tz = ZoneInfo("Asia/Tokyo")
dynamo_table_name = 'LambdaRetryCounter'
def lambda_retry_counter_dynamodb(name):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(dynamo_table_name)
response = table.get_item(
Key={"Name": name},
AttributesToGet=['InvocationCount'],
)
if response.get('Item') is None:
dynamo_cnt = 0
else:
dynamo_cnt = response['Item']['InvocationCount']
table.update_item(
Key={"Name": name},
UpdateExpression='ADD InvocationCount :incr',
ExpressionAttributeValues={':incr': 1},
)
table.update_item(
Key={"Name": name},
AttributeUpdates={
'LastEventTime': {
'Value': datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S%z"),
'Action': 'PUT'
}
},
)
return dynamo_cnt
def lambda_handler(event, context):
request_id = context.aws_request_id
dynamodb_cnt = lambda_retry_counter_dynamodb(request_id)
print("dynamodb_cnt", dynamodb_cnt)
raise Exception
結果
CloudWatchを見てみると、print("dynamodb_cnt", dynamodb_cnt)
でリトライカウントが0,1,2と出力されています。
DynamoDBのカウンターを見てみると、想定通りにカウントできています。このレコードは不要になるので、TTL 1時間で消してもいいかもしれません。
CloudWatch Logsを調べる
この方法では、追加のリソースは必要ありません。Lambda自身が直前に実行したログを、CloudWatch Logsストリームの中から調べます。
Lambdaを作る
- Pythonのデフォルトで作って、タイムアウトを変更します(例えば10秒)。
- パーミッションでCloudWatch用に
CloudWatchLogsFullAccess
を追加します。 - 非同期で呼び出すと、リトライしてカウントがCW logに書き込まれます。
このとき以下のようなことをやっています。
-
get_log_events
でLambda自身のログをCloudWatch Logsに読みに行く。 - ログの中で、自身のユニークなリクエストIDを使って
f"END RequestId: {request_id}"
を探し、これが見つかればリトライ1回とする。 -
get_log_events
ではCWのデータを見るときに空の結果が返ることがある、という問題(仕様)がある。その回避方法として、nextForwardToken
が2回同じものが出るまで繰り返し、出力はsetとfrozenを使って同一項目を削除する。(空のログ取得を防ぐ方法はこちらを参考にしました。) - Exceptionでエラー
import time
import boto3
client = boto3.client("logs")
def get_log_streams(log_group_name):
response = client.describe_log_streams(
logGroupName=log_group_name, orderBy="LastEventTime", descending=True, limit=1
)
log_stream_names = [i["logStreamName"] for i in response["logStreams"]]
return log_stream_names
def get_lambda_retry_count(request_id, log_stream_name, log_group_name, search_words):
next_token = None
log_events_set = set()
while True:
args = {
"logGroupName": log_group_name,
"logStreamName": log_stream_name,
"startFromHead": True,
}
if next_token is not None:
args["nextToken"] = next_token
response = client.get_log_events(**args)
for log_event in response["events"]:
key = frozenset(log_event.items())
log_events_set.add(key)
new_next_token = response.get("nextForwardToken")
if new_next_token is None:
break
elif next_token == new_next_token:
break # End of the stream
else:
next_token = new_next_token
retry_cnt = 0
for i in log_events_set:
d = dict(i) # from frozen to dict
if search_words in d["message"]:
retry_cnt += 1
return retry_cnt
def lambda_handler(event, context):
time.sleep(5) # Wait: log group created
request_id = context.aws_request_id
search_words = f"END RequestId: {request_id}"
log_group_name = f"/aws/lambda/{context.function_name}"
log_stream_names = get_log_streams(log_group_name)
log_stream_name = log_stream_names[0]
retry_count = get_lambda_retry_count(
request_id,
log_stream_name,
log_group_name,
search_words,
)
print("retry_count:", retry_count)
raise Exception
結果
こういう結果になるはずです。
カウントされていますね!これでリトライ回数に応じて処理を変えることができそうです。
参考
How to query cloudwatch logs using boto3 in python
リトライをboto3で調べるネタ。こちらはget_query_results
を使っています。最初この実装を試していたのですが、うまくいかずget_log_events
に変えました。
まとめ
Lambdaのエラー処理についてまとめたり、リトライ回数を調べる方法について紹介しました。
おまけ
以前のものですが、エラー処理のことを書いた記事がややウケたのでご紹介。
Lambdaがタイムアウトしてエラーになるときは、終了してしまうのでそのLambdaを使ったエラー処理はできません。どうするのが良いか、と考えて書いた記事がこちらです。
Discussion