🎃

memo_20240612

2024/06/12に公開

EventBridge で CloudWatch ロググループのログを日次で S3 バケットにエクスポートする

EventBridge の問題点

現状、CreateExportTask を使用してロググループのログを S3 バケットにエクスポートすることは可能。もちろん EventBridge なので cron や rate 対応しているので定期実行可能。
ただし、CreateExportTask API を使用してJSONでパラメータを渡す時に、動的にパラメータを渡すことができない・・・
つまり、日次でログが被らないように(昨日エクスポートしたログはエクスポートしない)することが EventBridge だけの機能だとできない。
具体的には、以下の画面で EventBridge から CreateExportTask API を実行する時の JSON ペイロードを指定する。

ここで問題になるのが、「From」と「To」で、リファレンスを見ると必須のパラメータなので省略できない。
しかし、この画面はUNIX時間直で指定なので、実行した日時から何日前までのタイムスタンプのログなどの動的な指定が不可能。(AWSさんどうにかして)
※CLIとかならできるんだけど、コンソールで指定する時は現状無理。
https://docs.aws.amazon.com/cli/latest/reference/logs/create-export-task.html

解決方法

EventBridge -> Lambda でやることにした。
Lambda で createExportTask を実行させることにする。
拡張性を考慮して考えてみました。

デモ

以下考慮する

  • 日次で任意のロググループのログを任意の S3 バケットにエクスポートする
  • S3 に格納されるログは重複しないようにする(きっちり日時を分けるように)
  • ロググループ一つに対して、EventBridge 一つにする
  • Lambda 関数は一つだけで、EventBridge の引数で実行するロググループを分けられるようにする(ロググループが増えても大丈夫なようにLambdaにハードコードしない)

構築

  1. エクスポート先 S3 の作成

    バケットポリシーを以下に記載する。それ以外はデフォルト設定にしてます。
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.ap-northeast-1.amazonaws.com"
            },
            "Action": "s3:GetBucketAcl",
            "Resource": "arn:aws:s3:::<バケット名>",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "<アカウントID>"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:logs:ap-northeast-1:<アカウントID>:log-group:*"
                }
            }
        },
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.ap-northeast-1.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::<バケット名>/*",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "<アカウントID>",
                    "s3:x-amz-acl": "bucket-owner-full-control"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:logs:ap-northeast-1:<アカウントID>:log-group:*"
                }
            }
        }
    ]
}
  1. ロググループの作成(検証用)

    検証なのでKMSキーは作ってません。
  2. Lambda 用 IAM ロール作成


    検証なので、CloudWatch Logs と s3 へのフルアクセスをアタッチしてます。
  3. Lambda 関数の作成(ランタイムはpython3.12です)


    作成したら以下のコードをデプロイします。
import boto3
import datetime
import logging
import os

# logging初期化
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info("LOAD Function: " + context.function_name)

    #現在の日付を取得する(時刻切り捨てにすることで前日実行分と重複しないようにする)
    to_time = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    print(to_time)

    #30日前の日付を取得する(時刻切り捨てにすることで前日実行分と重複しないようにする)
    from_time = (datetime.datetime.now() - datetime.timedelta(int(os.environ['days']))).replace(hour=0, minute=0, second=0, microsecond=0)
    print(from_time)

    #unixTime取得(float)
    unix_from_time = from_time.timestamp()
    unix_to_time = to_time.timestamp()

    #unixTimeをmsに変換してintにキャスト
    m_unix_from_time = int(unix_from_time * 1000)
    m_unix_unix_to_time = int(unix_to_time * 1000)

    #logsをロード
    client = boto3.client('logs')

    #createExportTaskを実行
    response = client.create_export_task(
        logGroupName         = event['log_group_name'],
        fromTime             = m_unix_from_time,
        to                   = m_unix_unix_to_time,
        destination          = event['s3_bucket_name'],
        destinationPrefix    = event['s3_prefix']
    )

    logger.info("END Function: " + context.function_name)

    return response

環境変数に何日前までのログを取得するか、日数を入れます。key名はdaysです。
また、Lambda はタイムゾーンがUTCになるので、環境変数TZをAsia/Tokyoにしてください。

※ここでは試しに1日前
6. EventBridge の作成




Invoke する際にペイロードを渡せます。Lambda関数内のeventに渡される値です。
以下のペイロードにしてください。
ロググループ、S3バケット名、S3プレフィックスは任意の値を入れてください。
Lambda関数内のcreate_export_taskメソッドで参照してます。

{
    "log_group_name": "<ロググループ名>",
    "s3_bucket_name": "<S3バケット名>",
    "s3_prefix": "<S3プレフィックス>"
}




実行確認

EventBridge の実行時間を 2024/06/12 午前2:39 に変更したので、その時間の実行ログを確認する。

以下Lambdaの実行ログです。

特にエラーは出ていない

では、実際にログが吐かれているか確認


しっかり来てます。
これでOKです。
※実際のログは0000.gzみたいなファイルになります。画像で「aws-logs-write-test」しかないのはロググループにサンプルのログを入れ忘れたためです。

まとめ

EventBridge と Lambda で CloudWatchLogs から簡単に定期的にたぶん重複せずにS3バケットへログを送れるものができました。
ロググループが増えた場合は、EventBridge を増やすだけで簡単に増殖できます。
EventBridge にアカウント内のロググループ名を配列にしてLambdaに渡す方法もありますが、ログが非常に多い場合などは15分以内で終わらない可能性も無きにしも非ずということで、EventBridge は分ける形にしています。
StepFunctions を使ってもいいですが、今回は Lambda の依存関係はないので、EventBridge ごり押しにしました。

EventBridge から複数のロググループを配列で渡して一気に実行させるパターンのコード

import boto3
import datetime
import logging
import os
import time

# logging初期化
logger = logging.getLogger()
logger.setLevel(logging.INFO)

#エクスポートタスク実行関数
def create_export_task(client, log_group_name, destination_bucket, from_time, to_time, destination_prefix):
    response = client.create_export_task(
        logGroupName=log_group_name,
        fromTime=from_time,
        to=to_time,
        destination=destination_bucket,
        destinationPrefix=destination_prefix
    )
    print(response['taskId'])
    return response['taskId']

#タスクの進捗状況を確認する関数
def wait_for_export_task_completion(client, task_id):
    #COMPLETEになるまでループ
    while True:
        try:
            print(task_id)
            response = client.describe_export_tasks(taskId=task_id)
            task_status = response['exportTasks'][0]['status']['code']
            print(f"Task status: {task_status}")

            #ステータスがCOMPLETEになったらbreakする
            if task_status == 'COMPLETED':
                print(f"Export task {task_id} is COMPLETED")
                break
            else:
                print(f"Unhandled task status: {task_status}")
                #終わってなかったら5s待機
                time.sleep(5)
                break
        except Exception as e:
            print(f"An error occurred while checking the export task status: {str(e)}")
            time.sleep(3)  # Wait before retrying in case of error

def lambda_handler(event, context):
    logger.info("LOAD Function: " + context.function_name)

    #現在の日付を取得する(時刻切り捨て)
    to_time = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    print(to_time)

    #30日前の日付を取得する(時刻切り捨て)
    from_time = (datetime.datetime.now() - datetime.timedelta(int(os.environ['days']))).replace(hour=0, minute=0, second=0, microsecond=0)
    print(from_time)

    #unixTime取得(float)
    unix_from_time = from_time.timestamp()
    unix_to_time = to_time.timestamp()

    #unixTimeをmsに変換してintにキャスト
    m_unix_from_time = int(unix_from_time * 1000)
    m_unix_to_time = int(unix_to_time * 1000)
    
    #logsをロード
    client = boto3.client('logs')
    
    #ロググループ一覧の配列をEventBridgeペイロードから参照
    log_groups = event['log_group_name']
    bucket = event['s3_bucket_name']
    
    for log_group_name in log_groups:
        print(f"Starting export task for log group: {log_group_name}")
        task_id = create_export_task(client, log_group_name, bucket, m_unix_from_time, m_unix_to_time, log_group_name)
        wait_for_export_task_completion(client, task_id)
        print(f"Finished export task for log group: {log_group_name}")

    logger.info("END Function: " + context.function_name)

    return {
        'statusCode': 200,
        'body': 'Log export tasks completed for all log groups'
    }
    

ペイロードJSON

{
    "log_group_name": [
        "log-group-ymz-note-create-export-task-demo",
        "log-group-ymz-note-create-export-task-demo-001",
        "log-group-ymz-note-create-export-task-demo-002"
    ],
    "s3_bucket_name": "ymz-note-create-export-task-demo"
}

Discussion