🧐

BigQueryの監査ログをBigQueryAuditMetadataの新形式に移行した

2024/08/05に公開

はじめに

ラクスル株式会社では、長らく業務委託のみでデータエンジニア社員0人時代が続き、最近私を含め2人のデータエンジニア社員が入社し、データ基盤周りの刷新を進めています。
先日、BigQuery の管理アクティビティ監査ログを新スキーマに移行したので、備忘録としてまとめておきます。

ことのはじまり

  • 弊社では Looker Studio を使ったり、Connected Sheet を使ってスプレッドシートから BigQuery にアクセスしているケースがある
  • 以前から BigQuery 監査ログを Log Router の Logging Sink を使って BigQuery テーブルとして保存しており、metadataJson カラムからクエリ実行元となるスプレッドシートの ID などを取得し、アラートやデータリネージの可視化に利用しようとした
  • しかし、確認したところ、当時の監査ログには metadataJson カラムはなく、代わりに servicedata_v1_bigquery といったカラムがあり、他にもカラム構造が部分的に違い、どうやら監査ログのスキーマが古いことが判明

ということで、最新のスキーマの監査ログに移行しようということになりました。

ちなみに、BigQuery 監査ログには AuditData 形式の旧スキーマと、BigQueryAuditMetadata 形式の新スキーマがあり、2020年には 公式ブログ で紹介しているように新形式を利用できていたので、大分今更ながらの移行になりました(汗)

やりたいこと

  • BigQuery の管理アクティビティ監査ログを新スキーマに移行し、metadataJson などのカラムを利用できるようにする
  • 新と旧のスキーマの監査ログが別テーブルだと使い勝手が悪く、統合的にみれるようにする

移行手順

移行にはいくつかの手順を踏む必要があり、下記のように移行しました。

1. 監査ログの Sink を新規追加

公式 Doc にあるように、下記コマンドで BigQuery テーブルとして保存する Logging Sink を新規に追加しました。追加には roles/logging.configWriter の権限が必要です。

gcloud logging sinks create audit_log_bq_sink bigquery.googleapis.com/projects/[project_name]/datasets/lake_cloud_logging --log-filter='protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"'

--use-partitioned-tables オプションで、デフォルトの日付別テーブルでなく、パーティションテーブルで保存することもできます (詳細は公式 Doc)。

また、データセットに対して、上記コマンドで作られた Logging Sink のサービスアカウントが書き込みできるように WRITER ロールのアクセス権を設定する必要があります。

ちなみに、Terraform で追加する場合は下記のようになります。

resource "google_bigquery_dataset" "cloud_logging" {
  dataset_id = "lake_cloud_logging"
  location   = var.location
}

resource "google_bigquery_dataset_access" "cloud_logging_access" {
  dataset_id    = google_bigquery_dataset.cloud_logging.dataset_id
  role          = "WRITER"
  user_by_email = split(":", google_logging_project_sink.audit_log_bq_sink.writer_identity)[1]
}

resource "google_logging_project_sink" "audit_log_bq_sink" {
  name                   = "audit_log_bq_sink"
  destination            = "bigquery.googleapis.com/${google_bigquery_dataset.cloud_logging.id}"
  filter                 = "protoPayload.metadata.\"@type\"=\"type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata\""
  unique_writer_identity = true
}

2. 既存の監査ログの Sink 停止

新スキーマの監査ログが問題なく保存され始めたことを確認後、既存の監査ログの Log Router の Sink を停止しました。

3. 既存の監査ログの移動

Sink 停止後、既存の監査ログを新監査ログと同じデータセットに移動したいため、テーブルをクローンして新データセット配下にテーブルを作り直しました。

なお、日付別テーブルだと日付分のクローンを実行する必要があり時間や確認の手間がかかるため、日付別テーブルをパーティションテーブルに変換してからクローンを実行するようにしました。これで既存の監査ログのデータセットを削除するなどして整備ができます。BigQueryではデーセット名の変更ができないためこのような対応にしました。

bq partition --time_partitioning_type=DAY [source_dataset].cloudaudit_googleapis_com_data_access_ [source_dataset].cloudaudit_googleapis_com_data_access

bq cp --clone [source_dataset].cloudaudit_googleapis_com_data_access [destination_dataset].cloudaudit_googleapis_com_data_access

4. 新旧監査ログを見れる統合ビューを作成

最後に、新と旧のスキーマの監査ログが別々のテーブルだと、新旧の期間に跨る調査などしづらいため、下記のようなビューを作りました。

create or replace view lake_cloud_logging.audit_log_bq as
with old_audit_log as (
  select
    date(_partitiondate) as event_date,
    logName,
    struct(
      resource.type,
      struct(
        cast(null as string) as location,
        resource.labels.project_id,
        cast(null as string) as dataset_id
      ) as labels
    ) as `resource`,
    struct(
      protopayload_auditlog.serviceName,
      protopayload_auditlog.methodName,
      protopayload_auditlog.resourceName,
      protopayload_auditlog.resourceLocation,
      protopayload_auditlog.numResponseItems,
      protopayload_auditlog.status,
      struct(
        protopayload_auditlog.authenticationInfo.principalEmail,
        protopayload_auditlog.authenticationInfo.authoritySelector,
        protopayload_auditlog.authenticationInfo.serviceAccountKeyName,
        array(
          select
            struct(
              principalSubject,
              firstPartyPrincipal
            )
          from unnest(protopayload_auditlog.authenticationInfo.serviceAccountDelegationInfo)
        ) as serviceAccountDelegationInfo,
        protopayload_auditlog.authenticationInfo.principalSubject,
        protopayload_auditlog.authenticationInfo.serviceDelegationHistory
      ) as authenticationInfo,
      array(
        select
          struct(
            resource,
            permission,
            granted,
            resourceAttributes,
            cast(null as string) as permissionType
          )
        from unnest(protopayload_auditlog.authorizationInfo)
      ) as authorizationInfo,
      protopayload_auditlog.policyViolationInfo,
      struct(
        protopayload_auditlog.requestMetadata.callerIp,
        protopayload_auditlog.requestMetadata.callerSuppliedUserAgent,
        protopayload_auditlog.requestMetadata.callerNetwork,
        struct(
          protopayload_auditlog.requestMetadata.requestAttributes.id,
          protopayload_auditlog.requestMetadata.requestAttributes.method,
          protopayload_auditlog.requestMetadata.requestAttributes.headers,
          protopayload_auditlog.requestMetadata.requestAttributes.path,
          protopayload_auditlog.requestMetadata.requestAttributes.host,
          protopayload_auditlog.requestMetadata.requestAttributes.scheme,
          protopayload_auditlog.requestMetadata.requestAttributes.query,
          protopayload_auditlog.requestMetadata.requestAttributes.time,
          protopayload_auditlog.requestMetadata.requestAttributes.size,
          protopayload_auditlog.requestMetadata.requestAttributes.protocol,
          protopayload_auditlog.requestMetadata.requestAttributes.reason,
          struct(
            protopayload_auditlog.requestMetadata.requestAttributes.auth.principal,
            protopayload_auditlog.requestMetadata.requestAttributes.auth.audiences,
            protopayload_auditlog.requestMetadata.requestAttributes.auth.presenter,
            protopayload_auditlog.requestMetadata.requestAttributes.auth.accessLevels,
            cast(null as string) as credentialId
          ) as auth
        ) as requestAttributes,
        protopayload_auditlog.requestMetadata.destinationAttributes
      ) as requestMetadata,
      cast(null as string) as metadataJson,
      to_json_string(protopayload_auditlog.servicedata_v1_bigquery) as servicedata_v1_bigquery_json
    ) as protopayload_auditlog,
    textPayload,
    timestamp,
    receiveTimestamp,
    severity,
    insertId,
    httpRequest,
    operation,
    trace,
    spanId,
    traceSampled,
    sourceLocation,
    split,
    [struct(cast(null as string) as id)] as errorGroups,
  from
    `lake_cloud_logging.cloudaudit_googleapis_com_data_access`
),

new_audit_log as (
  select
    parse_date('%Y%m%d', _table_suffix) as event_date,
    logName,
    resource,
    struct(
      protopayload_auditlog.serviceName,
      protopayload_auditlog.methodName,
      protopayload_auditlog.resourceName,
      protopayload_auditlog.resourceLocation,
      protopayload_auditlog.numResponseItems,
      protopayload_auditlog.status,
      protopayload_auditlog.authenticationInfo,
      protopayload_auditlog.authorizationInfo,
      protopayload_auditlog.policyViolationInfo,
      protopayload_auditlog.requestMetadata,
      protopayload_auditlog.metadataJson,
      cast(null as string) as servicedata_v1_bigquery_json
    ) as protopayload_auditlog,
    textPayload,
    timestamp,
    receiveTimestamp,
    severity,
    insertId,
    httpRequest,
    operation,
    trace,
    spanId,
    traceSampled,
    sourceLocation,
    split,
    errorGroups,
  from
    `lake_cloud_logging.cloudaudit_googleapis_com_data_access_*`
),

main as (
  select * from old_audit_log where event_date < 'yyyy-mm-dd' -- Specify switched date
  union all
  select * from new_audit_log where event_date >= 'yyyy-mm-dd' -- Specify switched date
)

select * from main

metadataJsonservicedata_v1_bigquery 配下の変換については、項目数が多く構造も大きく異なり網羅的な対応が難しかったため、現時点では見送りとしました。

また、公式の Migration Guide にあるように、新・旧のスキーマで同じカラム名でも値が変わっているものもあり、metadataJson 配下の特定の項目 (totalBilledBytes, totalSlotMs など) をよく利用するなら、ユースケースに応じたビューを追加していくのがよさそうなので、今後必要に応じて対応しようと思います。

なお、本番環境と開発環境で既存の監査ログを比較したところ、Struct型のカラム内でフィールドの順番が異なるケースがありました。私達の環境がたまたまで広く一般に当てはまるかわかりませんが、このように環境によっては微妙にスキーマが異なることがあるかもしれません。

まとめ

Logging Sink でテーブル化した監査ログのスキーマは自動更新されないため、新規に Logging Sink を追加して新スキーマの監査ログを利用できるようにしました。

また、カラム構造が異なるためにそれぞれ別テーブルとなってしまった新・旧スキーマの監査ログを横断的に見れるように統合ビューを作成しました。

監査ログの新スキーマへの移行もそうですが、特に統合ビュー作成では意外にネット上で活用できる情報が多くなく、自力で作ることになったため、この記事が参考になりどなたかの役に立てば嬉しいです。

今後の対応として、本監査ログだけでなく Workspace の監査ログも追加し dbt exposures なども利用しつつ、データがどう使われているかデータリネージをより強化していきたいです。道半ばですが、着実に進めていきます。

データチーム拡大中につき、興味を持たれた方ぜひカジュアル面談からでもお話しましょう!

RAKSUL Data Analytics

Discussion