🫵

監査ログから特定した GCS バケット上のファイル作成者をカスタムメタデータに設定する

2025/02/20に公開

概要

GCS バケットにファイルをアップロードしたユーザーを Cloud Audit Logs から特定し、そのファイルのカスタムメタデータに設定する処理を Cloud Run functions (Python) 上に実装します。

想定読者

  • Python の開発経験のある方
  • GCS (Google Cloud Storage) の使用経験のある方
  • Cloud Logging の Logs explore の使用経験のある方
  • Cloud Audit Logs 活用の一例を知りたい方

※ Cloud Run functions のサービスアカウント準備、デプロイ方法などは省略しています

環境

ローカル端末

  • OS: macOS Sonoma 14.4.1
  • チップ: Apple M3 Max
  • IDE: PyCharm 2024.3
  • Python: 3.11.4

Cloud Audit Logs (監査ログ) とは

Google Cloud リソース内で発生した操作やアクティビティを記録し、「誰が、どこで、いつ、何をしたか」を追跡するのに役立ちます。
https://cloud.google.com/logging/docs/audit

監査ログは4種類ありますが、今回は GCS バケット上のオブジェクト作成操作が記録されるようにしたいので、データアクセス監査ログDATA_WRITE を有効化します。

データアクセス監査ログの有効化

Cloud コンソールから以下の手順を実施

  1. 左メニューから [IAM & Admin] > [Audit logs] を選択
  2. [Google Cloud Storage] にチェック
  3. サブタイプの [Date write] をチェックし、SAVEボタンを押下

GCS (Google Cloud Storage) のカスタムメタデータとは

GCS に保存されるオブジェクトには Content-Type (text/csvなど) や generation (バージョニングで使用する世代番号) といった固定キーメタデータが設定されます。
但し、固定キーメタデータ内にはそのオブジェクトを作成したユーザー情報は含まれていません。
そこで key : value 形式で自由にキーや値の設定が可能なカスタムメタデータを使用し、データアクセス監査ログから特定したオブジェクト作成者を設定します。
https://cloud.google.com/storage/docs/metadata#custom-metadata

Cloud Run functions にデプロイする資材 (Python) の準備

Cloud Run functions 上で動かす Python コードを用意します。

ディレクトリ構成

audit_log_tracker
├── main.py
└── requirements.txt

各ファイルの中身

requirements.txt
functions-framework==3.8.2
google-cloud-logging~=3.10.0
google-cloud-storage~=2.19.0
main.py
import logging
import os
import time
from datetime import datetime, timedelta

import functions_framework
from google.cloud import storage, logging_v2


def setup_logger():
    """ロガーの設定"""
    logging.basicConfig(format='[%(asctime)s][%(levelname)s] %(message)s', level=logging.INFO)
    return logging.getLogger(__name__)


@functions_framework.cloud_event
def track_uploader(cloud_event):
    """GCSバケットのオブジェクト作成者を特定してカスタムメタデータに設定する"""

    logger = setup_logger()

    # GCSイベントデータを取得
    event_data = cloud_event.data
    bucket = event_data["bucket"]
    object_path = event_data["name"]
    time_created = event_data["timeCreated"]
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")

    object_uri = f"gs://{bucket}/{object_path}"

    logger.info("==== Start tracking creator for %s, created at %s", object_uri, time_created)

    # 作成日時の前後1分を範囲設定
    creation_time = datetime.fromisoformat(time_created.replace('Z', '+00:00'))
    start_time = creation_time - timedelta(minutes=1)
    end_time = creation_time + timedelta(minutes=1)

    # 監査ログの検索クエリを作成
    filter_str = (
        'resource.type="gcs_bucket" AND '
        'protoPayload.methodName="storage.objects.create" AND '
        f'resource.labels.bucket_name="{bucket}" AND '
        f'protoPayload.resourceName="projects/_/buckets/{bucket}/objects/{object_path}" AND '
        f'timestamp >= "{start_time.strftime("%Y-%m-%dT%H:%M:%SZ")}" AND '
        f'timestamp <= "{end_time.strftime("%Y-%m-%dT%H:%M:%SZ")}"'
    )
    logger.info("==== Search query: %s", filter_str)

    # リトライロジックの設定
    retry_count = 0
    max_retries = 5
    base_delay = 5

    email = None
    logging_client = logging_v2.Client()

    while retry_count <= max_retries:
        logger.info("==== Searching audit logs... (attempt %d of %d)", retry_count + 1, max_retries + 1)

        # 監査ログを検索
        entries = logging_client.list_entries(
            resource_names=[f"projects/{project_id}"],
            filter_=filter_str,
            order_by=logging_v2.DESCENDING
        )

        entries_list = list(entries)
        if entries_list:
            # 監査ログの最初のエントリからオブジェクト作成者のメールアドレスを取得
            email = entries_list[0].payload.get("authenticationInfo", {}).get("principalEmail")
            logger.info("==== Found creator: %s", email)
            break

        # 監査ログが取得できなかったら指定回数リトライする
        if retry_count < max_retries:
            # 指数関数的なバックオフを計算(5秒、10秒、20秒...)
            delay = base_delay * (2 ** retry_count)
            logger.info("==== No entries found. Retrying in %d seconds...", delay)
            time.sleep(delay)
            retry_count += 1
        else:
            # 最大リトライ回数に達した場合はタイムアウト
            logger.error("==== No %s creation events found after %d attempts.", object_uri, max_retries + 1)
            return {"Object creation event not found", 404}

    # 監査ログからメールアドレスが取得できた場合、オブジェクトのカスタムメタデータに設定
    logger.info("==== Updating object metadata...")
    storage_client = storage.Client()
    blob = storage_client.bucket(bucket).get_blob(object_path)

    metadata = blob.metadata or {}
    metadata["created_by"] = email
    blob.metadata = metadata
    blob.patch()

    logger.info("==== Successfully updated object metadata")
    return {"success", 200}

logging_v2.client ライブラリを使用し、条件に該当する監査ログを降順で取得することで最新のログからユーザーを特定します。
リトライロジックを挟むことで、GCSのオブジェクト作成イベントと監査ログの記録の間にタイムラグが発生した際に対処できるようにしています。
https://cloud.google.com/python/docs/reference/logging/latest/google.cloud.logging_v2.client.Client#google_cloud_logging_v2_client_Client_list_entries

処理イメージ

GCS バケットへのファイルアップロードをトリガーに Cloud Run functions が起動して監査ログからユーザー特定 -> オブジェクトのカスタムメタデータにユーザーアカウントを設定するイメージです。

Cloud Run functions にデプロイ

本記事では Cloud Run functions のデプロイ手順の詳細は省きますが、トリガーは対象の GCS バケットの google.cloud.storage.object.v1.finalizedを選択します。

また、Cloud Run functions を実行するサービスアカウントには下記の事前定義ロールを付与します。

  • Eventarc Event Receiver (roles/eventarc.eventReceiver)
  • Cloud Run Service Invoker (roles/run.servicesInvoker)
  • Private Logs Viewer (roles/logging.privateLogViewer)
  • Storage Object User (roles/storage.objectUser)

動作確認

  1. 対象の GCS バケットにファイルをアップロード
  2. ファイルのカスタムメタデータにcreated_byキーと値にアップロードしたユーザーアカウントが設定されていることを確認
  3. Cloud Run functions のログからも、監査ログからユーザーを特定してファイルのカスタムメタデータに設定できていることを確認。3回目のリトライで取得できたようです

logging_v2.client ライブラリで使用したクエリは Logs Explorer でもそのまま使用できます。

main.py#38-47
    # 監査ログの検索クエリを作成
    filter_str = (
        'resource.type="gcs_bucket" AND '
        'protoPayload.methodName="storage.objects.create" AND '
        f'resource.labels.bucket_name="{bucket}" AND '
        f'protoPayload.resourceName="projects/_/buckets/{bucket}/objects/{object_path}" AND '
        f'timestamp >= "{start_time.strftime("%Y-%m-%dT%H:%M:%SZ")}" AND '
        f'timestamp <= "{end_time.strftime("%Y-%m-%dT%H:%M:%SZ")}"'
    )
    logger.info("==== Search query: %s", filter_str)

Discussion