Amazon RDS: ログを定期的に S3 に保存する
概要
Amazon RDS にはデータベースのログを CloudWatch Logs に保存することができますが、特に監査ログはアクセス量に応じて結構なコストになる場合があります。CloudWatch Logs の代わりに S3 バケットに保存することでコスト削減が可能です。マネージド機能に比べるとどうしてもログが欠落する可能性は高くなる点に注意は必要と思われますが、実現方法を整理してみました。
DB インスタンスに保存されるログファイルを Lambda で定期的に取得し、S3 バケットに保存します。
ログファイルの保存
DB インスタンス一覧を取得
取得対象となる DB インスタンスが固定の場合はこの手順は不要です。
特定のクラスター内で増減する場合やすべての DB インスタンスに対してログを取得したい場合は、DescribeDBInstances API を使用して一覧を取得します。一度に取得できるインスタンス数に上限があるため、必要に応じてページングします。
取得するインスタンスの条件は Filters
パラメータを使用します。db-cluster-id
を指定することで特定のクラスターに属するインスタンスのみを取得することができます。
例)
def rds_list_db_instances(rds_client, cluster_id: Optional[str]) -> Iterable[dict[str, any]]:
params = {}
if cluster_id:
params["Filters"] = [{"Name": "db-cluster-id", "Values": [cluster_id]}]
while True:
response = rds_client.describe_db_instances(**params)
yield from response["DBInstances"]
if "Marker" not in response:
break
params["Marker"] = response["Marker"]
ログファイル一覧を取得
各 DB インスタンスに保存されているログファイルの一覧を取得するには DescribeDBLogFiles API を使用します。一度に取得できるログファイル数に上限があるため、必要に応じてページングします。
取得するログファイルを絞り込むために FileLastWritten
と FilenameContains
パラメータが使用できます。FileLastWritten
に指定した日時以降に更新されたログファイルのみ、FilenameContains
に指定した文字列を含むログファイルのみを取得します。前回処理時のタイムスタンプを保持しておき、それを FileLastWritten
に指定して前回処理から更新されたファイルのみを取得してアップロードすることにしました。
例)
def rds_list_db_log_files(rds_client, db_instance_id: str, start_datetime: datetime, filename_contains: str) \
-> Iterable[dict[str, any]]:
params = {
"DBInstanceIdentifier": db_instance_id,
"FileLastWritten": int(start_datetime.timestamp()) * 1000,
"FilenameContains": filename_contains,
}
while True:
response = rds_client.describe_db_log_files(**params)
yield from response["DescribeDBLogFiles"]
if "Marker" not in response:
break
params["Marker"] = response["Marker"]
ログファイルを DB インスタンスからダウンロード
各ログファイルをダウンロードする API には DownloadDBLogFilePortion と DownloadCompleteDBLogFile のふたつが存在します。前者は AWS SDK でサポートされていますがファイル全体をダウンロードするには向いていなかったため、AWS SDK ではサポートされていない後者の API を使用します。
参考) https://qiita.com/Batchi/items/9c97b15f183c41cd0f6f
def rds_download_db_log_file(session, f, region_name: str, db_instance_id: str, log_file_name: str):
host = f"rds.{region_name}.amazonaws.com"
url = f"https://{host}/v13/downloadCompleteLogFile/{db_instance_id}/{log_file_name}"
aws_req = AWSRequest(method="GET", url=url)
credentials = session.get_credentials()
sigv4_auth = auth.SigV4Auth(credentials, "rds", region_name)
sigv4_auth.add_auth(aws_req)
req = request.Request(url, headers={
"Authorization": aws_req.headers["Authorization"],
"Host": host,
"X-Amz-Date": aws_req.context["timestamp"],
"X-Amz-Security-Token": credentials.token,
})
with request.urlopen(req) as response:
while chunk := response.read(CHUNK_SIZE):
f.write(chunk)
補足
- DownloadDBLogFilePortion API には一度に取得できるサイズに上限があり、ページングすることもできるようですが、そのまま結合しても正しくログ全体を取得することができませんでした。
- ログファイルのサイズによっては一括で読み込むと Lambda 上でメモリー不足になる可能性があるため、ある程度のチャンクごとに読み込みながら、ファイルに保存します。
ログファイルを S3 バケットへアップロード
取得したログファイルを gzip で圧縮してアップロードします。
compressed_data = tempfile.TemporaryFile()
with gzip.GzipFile(fileobj=compressed_data, mode="wb") as f:
while chunk := log_data.read(CHUNK_SIZE):
f.write(chunk)
compressed_data.seek(0)
s3_client.upload_fileobj(
Fileobj=compressed_data,
Bucket=bucket,
Key=key,
ExtraArgs={
"Metadata": {
"last-written": str(db_log_file["LastWritten"])
},
"StorageClass": "GLACIER_IR",
},
)
補足
- ログファイルのサイズによっては一括で読み込むと Lambda 上でメモリー不足になる可能性があるため、ある程度のチャンクごとに読み込みながら、圧縮をおこないます。
- オブジェクトメタデータの
last-written
に、ログファイル一覧で取得した最終更新日時を保存しておきます。これは後述のタイムアウト対策で使用します。
日付ごとにアップロード先のパスを分ける
アップロードしたログを Athena で参照する場合、コストを考えるとパーティション化しておいたほうが良さそうです。そのため、アップロード先の S3 パスに年月日で付与して区切ります。使用するデータベースエンジンによって異なると思いますが、例えば Aurora MySQL の場合、ログファイル名は audit.log.X.yyyy-mm-dd-HH-MM.n のような形式になっているため、そこから yyyy/mm/dd/ というパスを生成して保存しすることにしました。
def log_file_object_prefix(s3_prefix: str, db_instance_id: str, log_file_name: str) -> str:
prefix = s3_prefix
m = re.search(\.log\.[0-9]\.([0-9]{4})-([0-9]{2})-([0-9]{2})-[0-9]{2}-[0-9]{2}\.[0-9]$, log_file_name)
if m:
for k in m.groups():
prefix += f"{k}/"
return f"{prefix}{db_instance_id}"
Lambda の実行トリガーについて
Lambda 関数は基本的には一定間隔 (30 分毎など) で実行します。
オートスケーリングが設定されている場合、スケールインで DB インスタンスが終了するとその DB インスタンスに保存されていたログが失われることには注意が必要です。定期的にログを取得するだけだと、タイミングによってインスタンス削除直前のログが保存できない可能性があります。インスタンスがクラスターから切り離されたタイミングで RDS-EVENT-0385
(Cluster topology is updated.) イベントが発行されるため、これもトリガーに処理を実行し、インスタンスが削除される前にログを回収します。
参考) https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/USER_Events.Messages.html
タイムアウト対策
一度に処理する量が多いと、Lambda の最大実行時間を超えてしまう可能性もあります。30 分毎に発生したログだけをアップロードできている場合はそこまで長引くことはないと思いますが、(たとえば何らかの原因によって失敗が続いた後など)ある程度長い期間のログをまとめて処理する必要があった場合や、ログファイルのサイズにも依存するので考慮はしておきたいところです。
前回処理時のタイムスタンプを保持しておいて前回基準で更新されたファイルのみを取得してアップロードする方式では、処理に失敗した場合にタイムスタンプを更新できず、毎回すべてやり直しになってしまいます。そこで、ファイルごとにアップロード済みかどうかをチェックしてスキップすることにしました。また、アップロード済みであってもログが更新されている場合もあるため、ログの最終更新日時を比較して最新かどうかをチェックします。
例)
# S3 バケット上に存在するオブジェクト一覧を取得
s3_objects = s3_list_objects(s3_client, s3_bucket, object_prefix)
for db_log_file in rds_list_db_log_files(rds_client, db_instance_id, start_datetime, filename_contains):
log_file_name = db_log_file["LogFileName"]
object_prefix = log_file_object_prefix(s3_prefix, db_instance_id, log_file_name)
object_key = f"{object_prefix}/{log_file_name}.gz"
if object_key in s3_objects:
head_response = s3_client.head_object(Bucket=s3_bucket, Key=object_key)
existing_last_written = int(head_response["Metadata"].get("last-written", "0"))
if existing_last_written == db_log_file["LastWritten"]:
print(": already exists.")
continue
# アップロード処理
エラー検知
いろいろ考慮はしましたが失敗する可能性はいつでもあります。処理に失敗した場合のアラートも設定して、ログが失われないよう対処できるようにしましょう!
Athena による Aurora MySQL 監査ログの参照
S3 バケットに保存したログは多数のファイルに分断されているため、CloudWatch Logs の代わりに閲覧・検索するには Amazon Athena を使用します。以下は Aurora MySQL の監査ログが保存された S3 バケットからクエリーする場合の設定方法です。
参考) Amazon Aurora MySQL DB クラスターでのアドバンストな監査の使用
設定
Aurora MySQL 監査ログを保存したバケットを参照するテーブルを以下のクエリーで作成します。ログファイルの保存先を s3://(バケット名)/audit/{year}/{month}/{day}/**/*.zip としている前提です。
CREATE EXTERNAL TABLE IF NOT EXISTS `(Athena データベース名)`.`(Athena テーブル名)` (
`timestamp` bigint,
`serverhost` string,
`username` string,
`host` string,
`connectionid` bigint,
`queryid` bigint,
`operation` string,
`database` string,
`object` string,
`retcode` int
)
PARTITIONED BY (`year` int, `month` int, `day` int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ('field.delim' = ',', 'quoteChar' = "'")
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://(バケット名)/audit/'
TBLPROPERTIES (
'EXTERNAL' = 'TRUE',
'classification' = 'csv',
'projection.enabled' = 'true',
'projection.year.type' = 'integer',
'projection.year.digits' = '4',
'projection.year.range' = '2021,2100',
'projection.month.type' = 'integer',
'projection.month.digits' = '2',
'projection.month.range' = '1,12',
'projection.day.type' = 'integer',
'projection.day.digits' = '2',
'projection.day.range' = '1,31',
'storage.location.template' = 's3://(バケット名)/audit/${year}/${month}/${day}'
);
注意点
- クエリー (object) は「シングルクォート」でエスケープされています。
クエリー
作成したテーブルに対して以下のようなクエリーでログを検索することができます。パーティション (year, month, day) を WHERE 句の条件に指定することでデータ量を抑えることができます。
SELECT from_unixtime(timestamp / 1000000) AS datetime, object
FROM "(Athena データベース名)"."(Athena テーブル名)"
WHERE operation = 'QUERY'
AND database = '(データベース名)'
AND year = 2025
AND month = 2
AND day = 3
AND timestamp >= to_unixtime(date_parse('2025/02/03 00:00:00', '%Y/%m/%d %H:%i:%s')) * 1000000
AND timestamp < to_unixtime(date_parse('2025/02/03 09:00:00', '%Y/%m/%d %H:%i:%s')) * 1000000
ORDER BY timestamp DESC
LIMIT 1000;
参考
以下の記事を参考にさせていただきました。
Discussion