🦅
S3上のファイルのダウンロードが所定回数を超えたらアラート通知する方法
過去のある案件で「S3上のファイルのダウンロードが所定回数(回数のリセット日時は固定)を超えたら、アラート通知する」というセキュリティ要件があり、この要件をAWSネイティブな機能で満たすことができなかったので、Lambdaを使った独自実装で、これを実現した方法の備忘録。
なぜAWSネイティブな機能だと駄目なのか
「S3上のファイルのダウンロードが行われたらアラート通知」であれば、データイベント(S3)を有効化にしたCloudTrailのイベントをEventBridgeでキャッチすれば良いが、所定の回数を超えたら
という要件を満たすには、その回数をどこかに記録しておく or 過去のダウンロードイベントを参照し、現時点だと何回目のダウンロードなのかを都度計算するといったロジックが必要になる為、AWSネイティブな機能ではこれは実現できない。
※ニッチな要件なので、おそらく、今後もこのような機能は実装されないと思われ?
構成
Dynamoを使って、ダウンロード回数を記録する構成案も当初は考えたが、S3上のファイルのダウンロードを個別で監査ログとして残す必要もあったので、この監査ログを活かす形で、ダウンロードの度に所定の日時(回数のリセット日時)〜の監査ログ件数(ダウンロード回数)
をカウントし、所定の回数を超えていた場合、アラートを発報という構成にした。
サンプルコード
コメントも入れてないし、もっと良い書き方があった気がする
import json
import logging
import json
import base64
import gzip
import datetime
import pandas as pd
import boto3
import os
log_group_name = os.environ['LOG_GROUP_NAME']
sns_topic_arn = os.environ['SNS_TOPIC_ARN']
sns_subject = "test"
threshold = 2
def datetime_to_millisec_timestamp(value):
return int(int(value.strftime('%s')) * 1000)
def parse_trigger_event(event):
decoded_data = base64.b64decode(event)
json_data = json.loads(gzip.decompress(decoded_data))
print("Trigger Event: " + json.dumps(json_data))
timestamp = json_data['logEvents'][0]['timestamp']
message = json_data['logEvents'][0]['message']
return timestamp, message
def get_last_monday():
week = [0, -1, -2, -3, -4, -5, -6]
today = datetime.date.today()
weekday = today.weekday()
reduce_days = week[weekday]
last_monday = today + datetime.timedelta(reduce_days)
return last_monday
def count_events(end_time):
client = boto3.client('logs','ap-northeast-1')
is_finished = False
next_token = None
events = []
alert = False
start_time = pd.to_datetime(datetime.datetime.strftime(get_last_monday(), '%Y-%m-%d') + "T" + "1:00:00", utc=True).tz_convert('Asia/Tokyo')
end_time = pd.to_datetime(end_time, unit='ms', utc=True).tz_convert('Asia/Tokyo')
print("Time range to count events: " + str(start_time) + " ~ "+ str(end_time))
while not is_finished:
args = {
"logGroupName": log_group_name,
}
if next_token is not None:
args["nextToken"] = next_token
else:
args["startTime"] = datetime_to_millisec_timestamp(start_time)
args["endTime"] = datetime_to_millisec_timestamp(end_time)
response = client.filter_log_events(**args)
for event in response["events"]:
events.append(event)
if len(events) >= threshold:
alert = True
break
else:
if "nextToken" not in response:
is_finished = True
else:
next_token = response["nextToken"]
continue
break
return alert
def publish_sns(message):
client = boto3.client('sns','ap-northeast-1')
args = {
'TopicArn': sns_topic_arn,
'Subject': sns_subject,
'Message': message
}
response = client.publish(**args)
print("Publish SNS: " + json.dumps(response))
def lambda_handler(event, context):
end_time, message = parse_trigger_event(event['awslogs']['data'])
if count_events(end_time):
publish_sns(message)
Discussion