🔥

TerraformでCloudWatchLogsのエラーログをEmailとSlackで通知する

2022/01/28に公開

今回はCloudWatchLogsに出力したアプリのエラーログなどを SlackとEmailで通知する設定をTerraformで実装してみます。

前提

今回はあくまでもCloudWatchLogsにログが出力される条件が整った上で、そこからEmailとSlackで通知するのをTerraformでやってみるという記事となります。

前回記載したこちらの記事の続きの様な形となります。

https://zenn.dev/bun913/articles/c25765744352a4

↑でやったこと

  • 超シンプルなアプリをECS(FARGATE実行環境)で動かす
  • ECSタスクには firelens用のfluentbidコンテナをサイドカーとして配置しており、アプリの標準出力に出力されるログをS3に流しつつ、エラー系のログはCloudWatchLogsに出力

構成図としては以下の様な形になります。

やること

  • Email通知用のSNSトピックを作成
  • CloudWatchLogsにサブスクリプションフィルターを追加して、lambda関数を呼び出す
  • lambda関数からSNSトピックを呼び出してメール送信・slack通知を行う

やらないこと

  • lambda関数のコードを別リポジトリで管理

補足

Slackへの通知に関して、今回lambda関数を記載しましたがAWSのChatbotを使えばコードを書かずに実現できそうです。

https://dev.classmethod.jp/articles/slack-notif-for-codeseries-by-terraform

ただ残念なことに 2022/1/28現在 Terraformで通常の方法では扱えません(CloudFormationを利用してするmoduleもあるようです)

https://zenn.dev/shonansurvivors/articles/894cae91806052

そのため今回はlambda関数で実装してみました!

実装内容

今回のモジュールは以下の様なディレクトリ構成となっています。
ネットワーク等のモジュールの内容は今回記載しておりませんが、以下リポジトリにコードを配置しております。

https://github.com/bun913/aws_network_practice/tree/main/ecs_cloudwatch_logs

**.
├── README.md
├── main.tf
├── modules
│   ├── network
│   │   ├── main.tf
│   │   ├── output.tf
│   │   └── variables.tf
│   └── web_app
│       ├── alb.tf
│       ├── ecr.tf
│       ├── ecs.tf
│       ├── files
│       │   └── ecs_task_assume_policy.json
│       ├── fluentbit
│       │   ├── Dockerfile
│       │   └── extra.conf
│       ├── iam.tf
│       ├── lambda
│       │   ├── app_log_notificate
│       │   │   └── index.py
│       │   └── sns_publish.zip
│       ├── lambda.tf
│       ├── log.tf
│       ├── output.tf
│       ├── security_group.tf
│       ├── variables.tf
│       └── vpc_endpoint.tf
├── provider.tf
├── terraform.tfstate
├── terraform.tfvars
└── variables.tf

なお、事前にSlackのwebhook_urlを発行しておく必要があります。

https://qiita.com/vmmhypervisor/items/18c99624a84df8b31008

この様な記事を参考に事前に取得して、 tfvarsに記載しておきます。

terraformコードの実装

SNSトピックの作成

log.tf
# メール通知用SNSトピック作成
resource "aws_sns_topic" "notificate" {
  name = "${var.project}-error-notification"
}

# ["hoge@gmail.com", "fuga@gmail.com"] のように配列で通知先emailを渡す
resource "aws_sns_topic_subscription" "topic_email_subscription" {
  for_each  = { for email in var.emails : email => email }
  topic_arn = aws_sns_topic.notificate.arn
  protocol  = "email"
  endpoint  = each.value
}

CloudWatchLogsのサブスクリプションフィルターの準備

log.tf
# エラーログをSNSトピックに配信するためのlambdaにサブスクリプション
resource "aws_cloudwatch_log_subscription_filter" "error_log_subscription" {
  name            = "${var.project}-error-subscription"
  log_group_name  = aws_cloudwatch_log_group.app.id
  filter_pattern  = "ERROR"
  destination_arn = aws_lambda_function.sns_publish_func.arn
}

Lambda関数の準備

lambda.tf
# エラーログをSNSにパブリッシュするLambda関数を固める
data "archive_file" "sns_publish_func" {
  type        = "zip"
  source_dir  = "${path.module}/lambda"
  output_path = "${path.module}/lambda/sns_publish.zip"
}

resource "aws_lambda_function" "sns_publish_func" {
  filename         = data.archive_file.sns_publish_func.output_path
  function_name    = "${var.project}_sns_publish"
  role             = aws_iam_role.lambda_sns_publish.arn
  handler          = "app_log_notificate.index.lambda_handler"
  source_code_hash = data.archive_file.sns_publish_func.output_base64sha256
  runtime          = "python3.9"

  environment {
    variables = {
      SNS_TOPIC_ARN = aws_sns_topic.notificate.arn
      ALARM_SUBJECT = "【Error Notification】${var.project}"
      WEB_HOOK_URL  = var.slack_webhook_url
    }
  }

  depends_on = [
    aws_cloudwatch_log_group.sns_publish_func,
    aws_iam_role.lambda_sns_publish
  ]

}

# CloudwatchLogsからLogの実行を許可
resource "aws_lambda_permission" "log_permission" {
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.sns_publish_func.arn
  principal     = "logs.ap-northeast-1.amazonaws.com"
  source_arn    = "${aws_cloudwatch_log_group.app.arn}:*"
}

# lambda関数のログ出力のためのCloudWatchLogsのロググループ
resource "aws_cloudwatch_log_group" "sns_publish_func" {
  name              = "/aws/lambda/${var.project}_sns_publish"
  retention_in_days = 14
}

lambda関数に付与するIAMロール

iam.tf
#####################
# SNSに通知を発行するlambdaに付与するIAMロール
#####################
resource "aws_iam_role" "lambda_sns_publish" {
  name = "${var.project}-publish-sns"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "lambda_sns_publish" {
  name = "${var.project}-publish-sns"
  role = aws_iam_role.lambda_sns_publish.id

  policy = <<EOF
{
  "Statement": [
    {
      "Sid": "",
      "Action": [
        "SNS:Publish"
      ],
      "Effect": "Allow",
      "Resource": "*"
    },
    {
      "Sid": "",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*",
      "Effect": "Allow"
    }
  ]
}
EOF
}

lambda関数の実装

冗長なところがありお恥ずかしいです・・・

一旦動くコードを記載しました。

lambda/app_log_notificate/index.py
import base64
import json
import zlib
import os
from urllib import request
import boto3

print('Loading function')


def lambda_handler(event, context):
    data = zlib.decompress(base64.b64decode(
        event['awslogs']['data']), 16+zlib.MAX_WBITS)
    data_dict = json.loads(data)
    log_entire_json = json.loads(json.dumps(
        data_dict["logEvents"], ensure_ascii=False))
    log_entire_len = len(log_entire_json)
    region = context.invoked_function_arn.split(":")[3]
    log_group_name = context.log_group_name
    log_stream_name = context.log_stream_name
    log_url = "https://"+region+".console.aws.amazon.com/cloudwatch/home?region="+region+"#logEvent:group="+log_group_name+";stream="+log_stream_name
    post_sns_topic(log_entire_len, data_dict, log_url)
    post_to_slack(log_entire_len, data_dict, log_url)


def post_sns_topic(log_entire_len: int, data_dict: dict, log_url:str):
    for i in range(log_entire_len):
        log_dict = json.loads(json.dumps(
            data_dict["logEvents"][i], ensure_ascii=False))
        try:
            message_str = log_dict["message"]
            message_dict = json.loads(message_str)
            message_dict['log_url'] = log_url
            message = json.dumps(message_dict)
            sns = boto3.client('sns')

            # SNS Publish
            publishResponse = sns.publish(
                TopicArn=os.environ['SNS_TOPIC_ARN'],
                Message=message,
                Subject=os.environ['ALARM_SUBJECT']
            )
        except Exception as e:
            print("[sns_notice_exception: ]" + str(e))


def post_to_slack(log_entire_len: int, data_dict: dict, log_url:str):
    for i in range(log_entire_len):
        log_dict = json.loads(json.dumps(
            data_dict["logEvents"][i], ensure_ascii=False))
        try:
            message_str = log_dict["message"]
            message_dict = json.loads(message_str)
            message_dict['log_url'] = log_url
            message = json.dumps(message_dict)
            send_data = {
                "username": "notificate_bot",
                "icon_emoji": "aws",
                "text": message,
            }
            r = request.Request(
                url=os.environ['WEB_HOOK_URL'],
                data=json.dumps(send_data).encode('utf-8')
            )
            with request.urlopen(r) as response:
                response_body = response.read().decode('utf-8')
                print(response_body)
        except Exception as e:
            print("[slack_notice_exection: ]" + str(e))


結果

これであえてアプリ側でエラーを出すことにより・・・

slackに通知が来て・・・

emailにも通知がありました!

どちらにも CloudWatchLogsのログストリームのURLを記載しているためワンクリックでエラーログを確認することができます。

実践で利用する際はもっとメッセージのテキスト整形なども実装が必要かと思います・・・

Discussion