🦅

S3上のファイルのダウンロードが所定回数を超えたらアラート通知する方法

に公開

過去のある案件で「S3上のファイルのダウンロードが所定回数(回数のリセット日時は固定)を超えたら、アラート通知する」というセキュリティ要件があり、この要件をAWSネイティブな機能で満たすことができなかったので、Lambdaを使った独自実装で、これを実現した方法の備忘録。

なぜAWSネイティブな機能だと駄目なのか

「S3上のファイルのダウンロードが行われたらアラート通知」であれば、データイベント(S3)を有効化にしたCloudTrailのイベントをEventBridgeでキャッチすれば良いが、所定の回数を超えたらという要件を満たすには、その回数をどこかに記録しておく or 過去のダウンロードイベントを参照し、現時点だと何回目のダウンロードなのかを都度計算するといったロジックが必要になる為、AWSネイティブな機能ではこれは実現できない。
※ニッチな要件なので、おそらく、今後もこのような機能は実装されないと思われ?

構成

Dynamoを使って、ダウンロード回数を記録する構成案も当初は考えたが、S3上のファイルのダウンロードを個別で監査ログとして残す必要もあったので、この監査ログを活かす形で、ダウンロードの度に所定の日時(回数のリセット日時)〜の監査ログ件数(ダウンロード回数)をカウントし、所定の回数を超えていた場合、アラートを発報という構成にした。

サンプルコード

コメントも入れてないし、もっと良い書き方があった気がする

import json
import logging
import json
import base64
import gzip
import datetime
import pandas as pd
import boto3
import os

log_group_name = os.environ['LOG_GROUP_NAME']
sns_topic_arn = os.environ['SNS_TOPIC_ARN']
sns_subject = "test"
threshold = 2

def datetime_to_millisec_timestamp(value):
    return int(int(value.strftime('%s')) * 1000)

def parse_trigger_event(event):
    decoded_data = base64.b64decode(event)
    json_data = json.loads(gzip.decompress(decoded_data))

    print("Trigger Event: " + json.dumps(json_data))

    timestamp = json_data['logEvents'][0]['timestamp']
    message = json_data['logEvents'][0]['message']

    return timestamp, message

def get_last_monday():
    week = [0, -1, -2, -3, -4, -5, -6]
    today = datetime.date.today()
    weekday = today.weekday()
    reduce_days = week[weekday]
    last_monday = today + datetime.timedelta(reduce_days)

    return last_monday

def count_events(end_time):
    client = boto3.client('logs','ap-northeast-1')
    is_finished = False
    next_token = None
    events = []
    alert = False
    start_time = pd.to_datetime(datetime.datetime.strftime(get_last_monday(), '%Y-%m-%d') + "T" + "1:00:00", utc=True).tz_convert('Asia/Tokyo')
    end_time = pd.to_datetime(end_time, unit='ms', utc=True).tz_convert('Asia/Tokyo')

    print("Time range to count events: " + str(start_time) + " ~ "+ str(end_time))

    while not is_finished:
        args = {
            "logGroupName": log_group_name,
        }
        if next_token is not None:
            args["nextToken"] = next_token
        else:
            args["startTime"] = datetime_to_millisec_timestamp(start_time)
            args["endTime"] = datetime_to_millisec_timestamp(end_time)

        response = client.filter_log_events(**args)

        for event in response["events"]:
              events.append(event)
              if len(events) >= threshold:
                alert = True
                break
        else:
            if "nextToken" not in response:
                is_finished = True
            else:
                next_token = response["nextToken"]
            continue
        break

    return alert

def publish_sns(message):
    client = boto3.client('sns','ap-northeast-1')
    args = {
        'TopicArn': sns_topic_arn,
        'Subject': sns_subject,
        'Message': message
    }
    response = client.publish(**args)

    print("Publish SNS: " + json.dumps(response)) 

def lambda_handler(event, context):
    end_time, message = parse_trigger_event(event['awslogs']['data'])
    if count_events(end_time):
        publish_sns(message)

Discussion