
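Implementing Lambda Timeout Handling with CDK

Published 2022/10/13, approx. 8,200 characters

Background

I once shelved the idea of detecting Lambda timeouts because I mistakenly assumed it would take a fair amount of implementation work.

Then the other day this article came across my timeline, and I realized it was actually simple.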

https://zenn.dev/shimo_s3/articles/dfb516f25785a2

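Following the official AWS blog post below, it gets a Lambda → Subscription Filter → Lambda → SNS pipeline working, and it made me wish I had been able to just pick this up and use it back then.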

https://aws.amazon.com/jp/blogs/mt/get-notified-specific-lambda-function-error-patterns-using-cloudwatch/

Since it will probably come in handy the next time I need this, this article is simply a CDK remake of the first article, by @shimo.

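Code Sample

Changes from the original article:

  • The Lambda that sleeps is written in Node.js, partly as a learning exercise
    • Out of habit I end up writing Python otherwise
  • The SNS notification target is SQS
    • Simply because it was less hassle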

CDK

The stack name is arbitrary, so substitute your own.

lib/sample.ts
import { Stack, StackProps, Duration } from "aws-cdk-lib";
import { Construct } from "constructs";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as logs from "aws-cdk-lib/aws-logs";
import * as destinations from "aws-cdk-lib/aws-logs-destinations";
import * as sns from "aws-cdk-lib/aws-sns";
import * as subscriptions from "aws-cdk-lib/aws-sns-subscriptions";
import * as sqs from "aws-cdk-lib/aws-sqs";

export class SampleStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // 1. init
    const timeoutSeconds = 3;
    const fnCode = `
      const { setTimeout } = require("timers/promises");
      exports.handler = async function(event) {
           await setTimeout(${(timeoutSeconds + 1) * 1000});
      };
  `;
    const topic = new sns.Topic(this, "Topic", {
      displayName: "sample filter topic",
    });
    // 2. Setting Lambda
    const timeoutFn = new lambda.Function(this, "timeoutLambda", {
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: "index.handler",
      code: lambda.Code.fromInline(fnCode),
      timeout: Duration.seconds(timeoutSeconds),
    });
    const filterFn = new lambda.Function(this, "fliterLambda", {
      code: new lambda.AssetCode("./filter"),
      handler: "filter.lambda_handler",
      runtime: lambda.Runtime.PYTHON_3_9,
      environment: {
        snsARN: topic.topicArn,
      },
    });

    // 3. Setting Subscription Filter
    const logGroup = logs.LogGroup.fromLogGroupName(
      this,
      "testLambdaLogGroup",
      timeoutFn.logGroup.logGroupName
    );
    logGroup.addSubscriptionFilter("Subscription", {
      destination: new destinations.LambdaDestination(filterFn),
      filterPattern: logs.FilterPattern.anyTerm("Task timed out after"),
    });

    // 4. Setting Destination
    const queue = new sqs.Queue(this, "Queue");
    topic.addSubscription(new subscriptions.SqsSubscription(queue));
    topic.grantPublish(filterFn);
  }
}

Lambda Code

Quoted from the official blog, in case the article is revised or taken down later.

Lambda(to sns)
filter/filter.py
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at http://aws.amazon.com/apache2.0/
# or in the "license" file accompanying this file.
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
# Description: This Lambda function sends an email notification to a given AWS SNS topic when a particular
#              pattern is matched in the logs of a selected Lambda function. The email subject is
#              Execution error for Lambda-<insert Lambda function name>.
#              The JSON message body of the SNS notification contains the full event details.

# Author: Sudhanshu Malhotra

import base64
import boto3
import gzip
import json
import logging
import os

from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def logpayload(event):
    logger.setLevel(logging.DEBUG)
    logger.debug(event['awslogs']['data'])
    compressed_payload = base64.b64decode(event['awslogs']['data'])
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_payload = json.loads(uncompressed_payload)
    return log_payload


def error_details(payload):
    error_msg = ""
    log_events = payload['logEvents']
    logger.debug(payload)
    loggroup = payload['logGroup']
    logstream = payload['logStream']
    lambda_func_name = loggroup.split('/')
    logger.debug(f'LogGroup: {loggroup}')
    logger.debug(f'Logstream: {logstream}')
    logger.debug(f'Function name: {lambda_func_name[3]}')
    logger.debug(log_events)
    for log_event in log_events:
        error_msg += log_event['message']
    logger.debug('Message: %s' % error_msg.split("\n"))
    return loggroup, logstream, error_msg, lambda_func_name


def publish_message(loggroup, logstream, error_msg, lambda_func_name):
    sns_arn = os.environ['snsARN']  # Getting the SNS Topic ARN passed in by the environment variables.
    snsclient = boto3.client('sns')
    try:
        message = ""
        message += "\nLambda error  summary" + "\n\n"
        message += "##########################################################\n"
        message += "# LogGroup Name:- " + str(loggroup) + "\n"
        message += "# LogStream:- " + str(logstream) + "\n"
        message += "# Log Message:- " + "\n"
        message += "# \t\t" + str(error_msg.split("\n")) + "\n"
        message += "##########################################################\n"

        # Sending the notification...
        snsclient.publish(
            TargetArn=sns_arn,
            Subject=f'Execution error for Lambda - {lambda_func_name[3]}',
            Message=message
        )
    except ClientError as e:
        logger.error("An error occurred: %s" % e)


def lambda_handler(event, context):
    pload = logpayload(event)
    lgroup, lstream, errmessage, lambdaname = error_details(pload)
    publish_message(lgroup, lstream, errmessage, lambdaname)
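To check the decoding side of filter.py without deploying anything, here is a minimal local sketch. It assumes the event shape CloudWatch Logs uses when it invokes a Lambda through a subscription filter (gzip-compressed, base64-encoded JSON under awslogs.data) and repeats the same decode steps as logpayload(). The helper names (make_fake_event, decode_event, function_name_from_log_group) and all sample values are hypothetical, and boto3 is not used, so nothing is published.

```python
import base64
import gzip
import json


def make_fake_event(log_group, log_stream, messages):
    """Hypothetical test helper: build an event shaped like a CloudWatch Logs
    subscription delivery to Lambda (base64-encoded, gzip-compressed JSON)."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789012",  # dummy account ID
        "logGroup": log_group,
        "logStream": log_stream,
        "subscriptionFilters": ["Subscription"],
        "logEvents": [
            {"id": str(i), "timestamp": 0, "message": m}
            for i, m in enumerate(messages)
        ],
    }
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(compressed).decode("ascii")}}


def decode_event(event):
    """Same steps as logpayload() in filter.py: base64 decode -> gunzip -> JSON."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


def function_name_from_log_group(log_group):
    """'/aws/lambda/<name>'.split('/') -> ['', 'aws', 'lambda', '<name>'],
    which is why filter.py uses index 3."""
    return log_group.split("/")[3]


# Made-up log group, stream, and log line for a local run
event = make_fake_event(
    "/aws/lambda/SampleStack-timeoutLambda-EXAMPLE",
    "2022/10/13/[$LATEST]0123456789abcdef",
    ["2022-10-13T00:00:03.000Z 1111 Task timed out after 3.00 seconds\n"],
)
payload = decode_event(event)
print(function_name_from_log_group(payload["logGroup"]))  # SampleStack-timeoutLambda-EXAMPLE
print(payload["logEvents"][0]["message"].strip())
```

The index 3 in filter.py works because the log group name starts with a slash, so split('/') yields an empty first element.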

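CDK Code Walkthrough

The code above is probably clear enough on its own, but I'll record the intent behind each part just in case.

1. init

Defines the code for the Lambda described below, plus the SNS Topic.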

    const timeoutSeconds = 3;
    const fnCode = `
      const { setTimeout } = require("timers/promises");
      exports.handler = async function(event) {
           await setTimeout(${(timeoutSeconds + 1) * 1000});
      };
  `;
    const topic = new sns.Topic(this, "Topic", {
      displayName: "sample filter topic",
    });
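  • Sleeping in Node.js used to be a hassle, but since v16 the promise-based setTimeout from "timers/promises" makes it intuitive and convenient.
    • I got oddly stuck because I mistakenly thought the argument was in seconds; it is in milliseconds, hence (timeoutSeconds + 1) * 1000.
  • It is convenient to create the SNS Topic at this point as well, since the filter Lambda needs its ARN.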
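As a rough local analogy (not how Lambda actually enforces timeouts), the Python sketch below uses asyncio.wait_for to cancel a handler that sleeps longer than its limit, mirroring the timeoutSeconds + 1 setup. It also shows the unit difference: setTimeout from timers/promises takes milliseconds, while Python's asyncio.sleep takes seconds. SCALE and the function names are hypothetical, and the returned string merely imitates the log line the filter looks for.

```python
import asyncio

TIMEOUT_SECONDS = 3  # mirrors timeoutSeconds in the CDK stack
SCALE = 0.05         # hypothetical: shrink the durations so the demo runs quickly


def node_sleep_ms(timeout_seconds):
    """Value the inline Node.js code passes to setTimeout(): milliseconds."""
    return (timeout_seconds + 1) * 1000


async def handler(sleep_seconds):
    # asyncio.sleep takes seconds, unlike setTimeout from timers/promises (milliseconds)
    await asyncio.sleep(sleep_seconds)
    return "finished"


async def invoke(timeout_seconds):
    """Run the handler under a time limit, loosely imitating Lambda's timeout."""
    sleep_seconds = node_sleep_ms(timeout_seconds) / 1000  # ms -> s
    try:
        return await asyncio.wait_for(handler(sleep_seconds * SCALE), timeout_seconds * SCALE)
    except asyncio.TimeoutError:
        # Imitates the log line the subscription filter looks for
        return f"Task timed out after {timeout_seconds:.2f} seconds"


print(node_sleep_ms(TIMEOUT_SECONDS))        # 4000
print(asyncio.run(invoke(TIMEOUT_SECONDS)))  # Task timed out after 3.00 seconds
```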

2. Setting Lambda

Defines the two Lambda functions.

    const timeoutFn = new lambda.Function(this, "timeoutLambda", {
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: "index.handler",
      code: lambda.Code.fromInline(fnCode),
      timeout: Duration.seconds(timeoutSeconds),
    });
    const filterFn = new lambda.Function(this, "fliterLambda", {
      code: new lambda.AssetCode("./filter"),
      handler: "filter.lambda_handler",
      runtime: lambda.Runtime.PYTHON_3_9,
      environment: {
        snsARN: topic.topicArn,
      },
    });
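  • The timeout Lambda runs on Node.js 16 and uses the inline code defined above.
    • This is only for the demo; normally you would load the code from a separate file.
  • The Subscription Filter's Lambda needs the environment variable snsARN.
    • It is set to the ARN of the SNS Topic created above.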
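filter.py reads the topic ARN with os.environ['snsARN'], so the key must match the name used in environment: { snsARN: ... } exactly, since environment variable names are case sensitive. A hypothetical helper that fails with a clearer error than a bare KeyError might look like this; get_sns_topic_arn and its ARN shape check are my own assumptions, not part of the official blog's code.

```python
import os


def get_sns_topic_arn(env=None):
    """Hypothetical helper: read the topic ARN injected by the CDK stack as snsARN."""
    env = os.environ if env is None else env
    arn = env.get("snsARN")  # key must match `environment: { snsARN: ... }` exactly
    if not arn:
        raise RuntimeError("snsARN is not set; check the function's environment in the CDK stack")
    # Simple shape check: arn:<partition>:sns:<region>:<account-id>:<topic-name>
    parts = arn.split(":")
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sns":
        raise ValueError(f"snsARN does not look like an SNS topic ARN: {arn}")
    return arn


print(get_sns_topic_arn({"snsARN": "arn:aws:sns:ap-northeast-1:123456789012:SampleTopic"}))
```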

3. Setting Subscription Filter

References the timeout Lambda's CloudWatch Logs log group and adds a Subscription Filter to it.

    const logGroup = logs.LogGroup.fromLogGroupName(
      this,
      "testLambdaLogGroup",
      timeoutFn.logGroup.logGroupName
    );
    logGroup.addSubscriptionFilter("Subscription", {
      destination: new destinations.LambdaDestination(filterFn),
      filterPattern: logs.FilterPattern.anyTerm("Task timed out after"),
    });
  • The filter pattern catches "Task timed out after", the string Lambda writes to the function's log when an invocation times out.
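To get a feel for which log lines the subscription filter forwards, here is a simplified local emulation: it treats each term given to FilterPattern.anyTerm as a case-sensitive phrase and forwards a line if any term appears in it. The real CloudWatch Logs filter syntax has more rules than a substring check, so treat this only as an approximation; matches_any_term and the sample log lines are made up.

```python
def matches_any_term(message, terms):
    """Simplified stand-in for a CloudWatch Logs text filter built with
    FilterPattern.anyTerm(...): True if any term appears verbatim
    (case-sensitive) in the message. The real matcher has more rules."""
    return any(term in message for term in terms)


TERMS = ["Task timed out after"]  # same term as in the CDK stack

sample_lines = [  # made-up log lines
    "START RequestId: 1111 Version: $LATEST",
    "2022-10-13T00:00:03.000Z 1111 Task timed out after 3.00 seconds",
    "END RequestId: 1111",
    "task timed out after 3 seconds",  # different case, so not matched
]
forwarded = [line for line in sample_lines if matches_any_term(line, TERMS)]
print(forwarded)
```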

4. Setting Destination

Creates the SQS Queue that receives the notifications and subscribes it to the SNS Topic.

    const queue = new sqs.Queue(this, "Queue");
    topic.addSubscription(new subscriptions.SqsSubscription(queue));
    topic.grantPublish(filterFn);
  • grantPublish is also specified so that the filter Lambda is allowed to publish to SNS.
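Since SqsSubscription is used here without raw message delivery (it is off by default), each message in the queue should arrive as the SNS JSON envelope, with the subject under Subject and the text built by publish_message() under Message. A sketch of unpacking it on the consumer side, assuming that envelope shape; unpack_sns_envelope and the hand-written sample body are hypothetical.

```python
import json


def unpack_sns_envelope(sqs_body):
    """Return (Subject, Message) from an SNS notification delivered to SQS
    without raw message delivery (hypothetical consumer-side helper)."""
    envelope = json.loads(sqs_body)
    if envelope.get("Type") != "Notification":
        raise ValueError(f"unexpected SNS message type: {envelope.get('Type')}")
    return envelope.get("Subject"), envelope["Message"]


# Hand-written sample body; real envelopes also carry Signature, SigningCertURL, etc.
body = json.dumps({
    "Type": "Notification",
    "MessageId": "00000000-0000-0000-0000-000000000000",
    "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:SampleTopic",
    "Subject": "Execution error for Lambda - SampleStack-timeoutLambda-EXAMPLE",
    "Message": "Lambda error  summary",
    "Timestamp": "2022-10-13T00:00:04.000Z",
})
subject, message = unpack_sns_envelope(body)
print(subject)  # Execution error for Lambda - SampleStack-timeoutLambda-EXAMPLE
print(message)
```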

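Conclusion

  • I was able to reproduce the Lambda-timeout-catching pattern in CDK. Although it seems to be a standard pattern (I didn't know it), I couldn't find a CDK sample for it, so feel free to use this as a reference.
  • Because a Subscription Filter can only deliver to Lambda or Kinesis services, for now you have to make do with Lambda code like the sample above. It would be handy if it could also integrate with Step Functions or EventBridge.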
