🌊

ロググループに保存されているログを定期的にS3にエクスポートしてみた。

2023/01/07に公開

はじめに

こんにちは、山田です。
現在携わっている案件で、CloudWatchロググループに保存されているログを一定期間経過後に長期保管用に作成したS3バケットにエクスポートするコードを作成したので以下に記載していきます。
よろしくお願いいたします。

全体概要図

全体概要図は以下の通りです。

①:ロググループ一覧とS3フォルダ一覧を比較します。
②:比較した結果S3フォルダに名前がないもの似関しては、新規でS3フォルダを作成します。

③:ロググループに保存されているログをS3にエクスポートします。

コード

以下が今回作成したコードは以下になります。

import boto3
import datetime
import time

def get_s3folder():
    s3_folder = []
    s3_folder2 =[]
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('長期保管用のS3バケット名')
    
    #長期保管用のS3バケットのフォルダ一覧を取得
    for item in bucket.objects.all():
        s3_folder.append(item.key)
    for item in s3_folder:]
    
    #最後の「/」を削除して、配列に追加
        s3_folder2.append(item[:-1])
    return s3_folder2
    
def get_loggroup():
    client = boto3.client('logs')
    loggroup = []
    test = client.describe_log_groups()
    first_count = 0
    
    #ロググループ一覧取得
    for i in range(len(test['logGroups'])):
        loggroup.append(test['logGroups'][first_count]['logGroupName'])
        first_count+= 1
    second_count = 0
    
    #「nextToken」が存在する場合、繰り返しロググループ一覧取得
    while 'nextToken' in test:
        test = client.describe_log_groups(nextToken=test['nextToken'])
        for i in range(len(test['logGroups'])):
            loggroup.append(test['logGroups'][second_count]['logGroupName'])
            second_count+=1
	    
    return loggroup

def LogsExport():
    s3folder = get_loggroup()
    #実行日取得
    to_time = datetime.datetime.now()  

    #30日前取得
    from_time = datetime.datetime.now() - datetime.timedelta(days=30) 

    #エポック時刻取得(float型)
    epoc_from_time = from_time.timestamp()
    epoc_to_time = to_time.timestamp()

    #エポック時刻をミリ秒にしint型にキャスト
    m_epoc_from_time = int(epoc_from_time * 1000)
    m_epoc_epoc_to_time = int(epoc_to_time * 1000)

    client = boto3.client('logs')
    
    #ロググループに保存されているログをS3にエクスポート
    for item in s3folder:
        client.create_export_task(
            logGroupName         = item,
            fromTime             = m_epoc_from_time,     
            to                   = m_epoc_epoc_to_time,
            destination          = '長期保管用のS3バケット名',
            destinationPrefix    = item
            )
	    
  #ログエクスポートが実行中か確認し、実行中の場合処理を5秒停止
        time.sleep(5)
        export_tasks = client.describe_export_tasks(statusCode='RUNNING')
        while len(export_tasks['exportTasks'])>= 1:
            export_tasks = client.describe_export_tasks(statusCode='RUNNING')
            time.sleep(5)

def lambda_handler(event, context):
    s3folder = get_s3folder()
    loggroup = get_loggroup()
    client = boto3.client('s3')
    for item in loggroup:
        if len(s3folder) == 0 or item not in s3folder:
            response = client.put_object(
            Bucket='長期保管用のS3バケット名',
            Key=(item + '/'),
            )
    LogsExport()

動作確認

①:ログ長期保管用のS3バケットを作成します。

②:Lambda関数を実行します。

③:長期保管用のフォルダが作成されていることを確認

④:ログがエクスポートされていることが確認できれば完了

おわりに

今回はロググループに保存されているログを定期的にS3にエクスポートする方法について記載しました。
これからも仕事で学んだことを定期的に記載していこうと思います。
最後まで読んでいただきありがとうございます。

以上。

Discussion