🐍

ALB / NLB のアクセスログを CloudWatch Logs に転送する(2023 年版)

2023/12/12に公開

はじめに

ALB / NLB のアクセスログは S3 にしか出力できません。そのため、ログを検索するには Athena など S3 に対応した分析ツールを使用する必要があります。

Athena も非常に便利なツールですが、もっと手軽に CloudWatch Logs で閲覧したい場合は、S3 の Create Object イベントをトリガーに Lambda を実行してなんとかするパターンが一般的です。サンプルコードも GitHub などで見つけることができます。

https://github.com/rupertbg/aws-load-balancer-logs-to-cloudwatch

わたし自身とても参考にさせていただいていますが、2023/1 の CloudWatch Logs 制限緩和に対応したサンプルコードが見つからなかったので作りました。せっかくなので共有したいと思います。

CloudWatch Logs の制限緩和

もう 1 年近く昔のことですが、2023/1/4 に以下のアップデートがありました。

https://aws.amazon.com/jp/about-aws/whats-new/2023/01/amazon-cloudwatch-logs-log-stream-transaction-quota-sequencetoken-requirement/

このアップデートでは 2 つの大幅な制限緩和が発表されました。

Amazon CloudWatch Logs では、Amazon CloudWatch Logs の PutLogEvents API を呼び出す際の、1 秒あたり 5 件のリクエストのログストリームクォータを撤廃します。

Amazon CloudWatch Logs では、Amazon CloudWatch Logs の PutLogEvents API を呼び出す際にシーケンストークンを提供する必要もなくなります。

これまで CloudWatch Logs のPutLogEvents API を使用する場合はsequenceTokenを渡す必要がありましたが、上記のアップデートによりそれが不要となります。

後方互換を維持するため、sequenceTokenを渡しても処理はエラーになりませんし、これまで同様 Response にはnextSequenceTokenが含まれますので、既存のコードに影響はありません。ただし、今後はInvalidSequenceTokenException、およびDataAlreadyAcceptedExceptionのエラーは返されなくなります。
PutLogEventsの API のマニュアルにもnextSequenceTokenは「has been deprecated」とあるため、これから実装する場合は考慮した方がよいでしょう。実装としても、余計なエラーハンドリングが不要になるのでコードもシンプルになります。

https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html

Lambda サンプルコード

以下がサンプルコードになります。

import boto3
import os
import time
import gzip
import urllib.parse

s3 = boto3.client('s3')
cloudwatch_logs = boto3.client('logs')

LOG_GROUP_NAME = os.environ['LOG_GROUP_NAME']
ELB_TYPE = os.environ['ELB_TYPE']
ELB_ID_COLUMN_NO = {
    "network": 3,
    "application": 2
}


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(
        event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    objects = get_unzip_objects(bucket, key)
    put_logs(cloudwatch_logs, LOG_GROUP_NAME, objects)


def get_unzip_objects(bucket, key):
    object = s3.get_object(Bucket=bucket, Key=key)
    unzip_objects = gzip.decompress(
        object['Body'].read()).decode('utf-8').splitlines()
    return unzip_objects


def put_logs(client, log_group_name, messages):
    for message in messages:
        log_stream_name = message.split(" ")[ELB_ID_COLUMN_NO.get(ELB_TYPE)]
        try:
            log_event = {
                'timestamp': int(time.time()) * 1000,
                'message': message
            }
            try:
                client.put_log_events(
                    logGroupName=log_group_name,
                    logStreamName=log_stream_name,
                    logEvents=[log_event])
            except client.exceptions.ResourceNotFoundException as e:
                client.create_log_stream(
                    logGroupName=log_group_name,
                    logStreamName=log_stream_name)
                client.put_log_events(
                    logGroupName=log_group_name,
                    logStreamName=log_stream_name,
                    logEvents=[log_event])
            except Exception as e:
                print(e)
                break
        except Exception as e:
            print(e)

トリガーには S3 の Object Create イベントを設定します。以下のチュートリアルを参考にしてみてください。

https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html

ロググループ名は環境変数LOGGROUP_NAMEで渡します。Lambda コードにはロググループの作成処理は含めていないため、事前にロググループを作成しておいてください。
また、ログから「ロードバランサーのリソース ID」を取得してログストリーム名に使用しています。ELB のタイプ(ALB / NLB)によってログのフォーマットが異なるため、「ロードバランサーのリソース ID」の位置を特定するために環境変数ELB_TYPEで対象となる ELB のタイプを指定します(application or network)。

おわりに

ずっと書こうと思ってたネタですが、半年以上温めてしまいました…。おかげで CloudWatch Logs の制限緩和からもう 1 年近く経ってしまってます。もう少しアウトプットの頻度を上げたいんですが、一度しっかり検証してから…とか考えてると時間がかかってしまいますね。少しずつですがこれからもアウトプットを継続していきたいと思います。

Discussion