ALB / NLB のアクセスログを CloudWatch Logs に転送する(2023 年版)
はじめに
ALB / NLB のアクセスログは S3 にしか出力できません。そのため、ログを検索するには Athena など S3 に対応した分析ツールを使用する必要があります。
Athena も非常に便利なツールですが、もっと手軽に CloudWatch Logs で閲覧したい場合は、S3 の Create Object イベントをトリガーに Lambda を実行してなんとかするパターンが一般的です。サンプルコードも GitHub などで見つけることができます。
わたし自身とても参考にさせていただいていますが、2023/1 の CloudWatch Logs 制限緩和に対応したサンプルコードが見つからなかったので作りました。せっかくなので共有したいと思います。
CloudWatch Logs の制限緩和
もう 1 年近く昔のことですが、2023/1/4 に以下のアップデートがありました。
このアップデートでは 2 つの大幅な制限緩和が発表されました。
Amazon CloudWatch Logs では、Amazon CloudWatch Logs の PutLogEvents API を呼び出す際の、1 秒あたり 5 件のリクエストのログストリームクォータを撤廃します。
Amazon CloudWatch Logs では、Amazon CloudWatch Logs の PutLogEvents API を呼び出す際にシーケンストークンを提供する必要もなくなります。
これまで CloudWatch Logs のPutLogEvents
API を使用する場合はsequenceToken
を渡す必要がありましたが、上記のアップデートによりそれが不要となります。
後方互換を維持するため、sequenceToken
を渡しても処理はエラーになりませんし、これまで同様 Response にはnextSequenceToken
が含まれますので、既存のコードに影響はありません。ただし、今後はInvalidSequenceTokenException
、およびDataAlreadyAcceptedException
のエラーは返されなくなります。
PutLogEvents
の API のマニュアルにもnextSequenceToken
は「has been deprecated」とあるため、これから実装する場合は考慮した方がよいでしょう。実装としても、余計なエラーハンドリングが不要になるのでコードもシンプルになります。
Lambda サンプルコード
以下がサンプルコードになります。
import boto3
import os
import time
import gzip
import urllib.parse
s3 = boto3.client('s3')
cloudwatch_logs = boto3.client('logs')
LOG_GROUP_NAME = os.environ['LOG_GROUP_NAME']
ELB_TYPE = os.environ['ELB_TYPE']
ELB_ID_COLUMN_NO = {
"network": 3,
"application": 2
}
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(
event['Records'][0]['s3']['object']['key'], encoding='utf-8')
objects = get_unzip_objects(bucket, key)
put_logs(cloudwatch_logs, LOG_GROUP_NAME, objects)
def get_unzip_objects(bucket, key):
object = s3.get_object(Bucket=bucket, Key=key)
unzip_objects = gzip.decompress(
object['Body'].read()).decode('utf-8').splitlines()
return unzip_objects
def put_logs(client, log_group_name, messages):
for message in messages:
log_stream_name = message.split(" ")[ELB_ID_COLUMN_NO.get(ELB_TYPE)]
try:
log_event = {
'timestamp': int(time.time()) * 1000,
'message': message
}
try:
client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[log_event])
except client.exceptions.ResourceNotFoundException as e:
client.create_log_stream(
logGroupName=log_group_name,
logStreamName=log_stream_name)
client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[log_event])
except Exception as e:
print(e)
break
except Exception as e:
print(e)
トリガーには S3 の Object Create イベントを設定します。以下のチュートリアルを参考にしてみてください。
ロググループ名は環境変数LOGGROUP_NAME
で渡します。Lambda コードにはロググループの作成処理は含めていないため、事前にロググループを作成しておいてください。
また、ログから「ロードバランサーのリソース ID」を取得してログストリーム名に使用しています。ELB のタイプ(ALB / NLB)によってログのフォーマットが異なるため、「ロードバランサーのリソース ID」の位置を特定するために環境変数ELB_TYPE
で対象となる ELB のタイプを指定します(application
or network
)。
おわりに
ずっと書こうと思ってたネタですが、半年以上温めてしまいました…。おかげで CloudWatch Logs の制限緩和からもう 1 年近く経ってしまってます。もう少しアウトプットの頻度を上げたいんですが、一度しっかり検証してから…とか考えてると時間がかかってしまいますね。少しずつですがこれからもアウトプットを継続していきたいと思います。
Discussion