BigQueryの監査ログをBigQueryAuditMetadataの新形式に移行した
はじめに
ラクスル株式会社では、長らく業務委託のみでデータエンジニア社員0人時代が続き、最近私を含め2人のデータエンジニア社員が入社し、データ基盤周りの刷新を進めています。
先日、BigQuery の管理アクティビティ監査ログを新スキーマに移行したので、備忘録としてまとめておきます。
ことのはじまり
- 弊社では Looker Studio を使ったり、Connected Sheet を使ってスプレッドシートから BigQuery にアクセスしているケースがある
- 以前から BigQuery 監査ログを Log Router の Logging Sink を使って BigQuery テーブルとして保存しており、
metadataJson
カラムからクエリ実行元となるスプレッドシートの ID などを取得し、アラートやデータリネージの可視化に利用しようとした - しかし、確認したところ、当時の監査ログには
metadataJson
カラムはなく、代わりにservicedata_v1_bigquery
といったカラムがあり、他にもカラム構造が部分的に違い、どうやら監査ログのスキーマが古いことが判明
ということで、最新のスキーマの監査ログに移行しようということになりました。
ちなみに、BigQuery 監査ログには AuditData
形式の旧スキーマと、BigQueryAuditMetadata
形式の新スキーマがあり、2020年には 公式ブログ で紹介しているように新形式を利用できていたので、大分今更ながらの移行になりました(汗)
やりたいこと
- BigQuery の管理アクティビティ監査ログを新スキーマに移行し、metadataJson などのカラムを利用できるようにする
- 新と旧のスキーマの監査ログが別テーブルだと使い勝手が悪く、統合的にみれるようにする
移行手順
移行にはいくつかの手順を踏む必要があり、下記のように移行しました。
1. 監査ログの Sink を新規追加
公式 Doc にあるように、下記コマンドで BigQuery テーブルとして保存する Logging Sink を新規に追加しました。追加には roles/logging.configWriter
の権限が必要です。
gcloud logging sinks create audit_log_bq_sink bigquery.googleapis.com/projects/[project_name]/datasets/lake_cloud_logging --log-filter='protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"'
--use-partitioned-tables オプションで、デフォルトの日付別テーブルでなく、パーティションテーブルで保存することもできます (詳細は公式 Doc)。
また、データセットに対して、上記コマンドで作られた Logging Sink のサービスアカウントが書き込みできるように WRITER ロールのアクセス権を設定する必要があります。
ちなみに、Terraform で追加する場合は下記のようになります。
resource "google_bigquery_dataset" "cloud_logging" {
dataset_id = "lake_cloud_logging"
location = var.location
}
resource "google_bigquery_dataset_access" "cloud_logging_access" {
dataset_id = google_bigquery_dataset.cloud_logging.dataset_id
role = "WRITER"
user_by_email = split(":", google_logging_project_sink.audit_log_bq_sink.writer_identity)[1]
}
resource "google_logging_project_sink" "audit_log_bq_sink" {
name = "audit_log_bq_sink"
destination = "bigquery.googleapis.com/${google_bigquery_dataset.cloud_logging.id}"
filter = "protoPayload.metadata.\"@type\"=\"type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata\""
unique_writer_identity = true
}
2. 既存の監査ログの Sink 停止
新スキーマの監査ログが問題なく保存され始めたことを確認後、既存の監査ログの Log Router の Sink を停止しました。
3. 既存の監査ログの移動
Sink 停止後、既存の監査ログを新監査ログと同じデータセットに移動したいため、テーブルをクローンして新データセット配下にテーブルを作り直しました。
なお、日付別テーブルだと日付分のクローンを実行する必要があり時間や確認の手間がかかるため、日付別テーブルをパーティションテーブルに変換してからクローンを実行するようにしました。これで既存の監査ログのデータセットを削除するなどして整備ができます。BigQueryではデーセット名の変更ができないためこのような対応にしました。
bq partition --time_partitioning_type=DAY [source_dataset].cloudaudit_googleapis_com_data_access_ [source_dataset].cloudaudit_googleapis_com_data_access
bq cp --clone [source_dataset].cloudaudit_googleapis_com_data_access [destination_dataset].cloudaudit_googleapis_com_data_access
4. 新旧監査ログを見れる統合ビューを作成
最後に、新と旧のスキーマの監査ログが別々のテーブルだと、新旧の期間に跨る調査などしづらいため、下記のようなビューを作りました。
create or replace view lake_cloud_logging.audit_log_bq as
with old_audit_log as (
select
date(_partitiondate) as event_date,
logName,
struct(
resource.type,
struct(
cast(null as string) as location,
resource.labels.project_id,
cast(null as string) as dataset_id
) as labels
) as `resource`,
struct(
protopayload_auditlog.serviceName,
protopayload_auditlog.methodName,
protopayload_auditlog.resourceName,
protopayload_auditlog.resourceLocation,
protopayload_auditlog.numResponseItems,
protopayload_auditlog.status,
struct(
protopayload_auditlog.authenticationInfo.principalEmail,
protopayload_auditlog.authenticationInfo.authoritySelector,
protopayload_auditlog.authenticationInfo.serviceAccountKeyName,
array(
select
struct(
principalSubject,
firstPartyPrincipal
)
from unnest(protopayload_auditlog.authenticationInfo.serviceAccountDelegationInfo)
) as serviceAccountDelegationInfo,
protopayload_auditlog.authenticationInfo.principalSubject,
protopayload_auditlog.authenticationInfo.serviceDelegationHistory
) as authenticationInfo,
array(
select
struct(
resource,
permission,
granted,
resourceAttributes,
cast(null as string) as permissionType
)
from unnest(protopayload_auditlog.authorizationInfo)
) as authorizationInfo,
protopayload_auditlog.policyViolationInfo,
struct(
protopayload_auditlog.requestMetadata.callerIp,
protopayload_auditlog.requestMetadata.callerSuppliedUserAgent,
protopayload_auditlog.requestMetadata.callerNetwork,
struct(
protopayload_auditlog.requestMetadata.requestAttributes.id,
protopayload_auditlog.requestMetadata.requestAttributes.method,
protopayload_auditlog.requestMetadata.requestAttributes.headers,
protopayload_auditlog.requestMetadata.requestAttributes.path,
protopayload_auditlog.requestMetadata.requestAttributes.host,
protopayload_auditlog.requestMetadata.requestAttributes.scheme,
protopayload_auditlog.requestMetadata.requestAttributes.query,
protopayload_auditlog.requestMetadata.requestAttributes.time,
protopayload_auditlog.requestMetadata.requestAttributes.size,
protopayload_auditlog.requestMetadata.requestAttributes.protocol,
protopayload_auditlog.requestMetadata.requestAttributes.reason,
struct(
protopayload_auditlog.requestMetadata.requestAttributes.auth.principal,
protopayload_auditlog.requestMetadata.requestAttributes.auth.audiences,
protopayload_auditlog.requestMetadata.requestAttributes.auth.presenter,
protopayload_auditlog.requestMetadata.requestAttributes.auth.accessLevels,
cast(null as string) as credentialId
) as auth
) as requestAttributes,
protopayload_auditlog.requestMetadata.destinationAttributes
) as requestMetadata,
cast(null as string) as metadataJson,
to_json_string(protopayload_auditlog.servicedata_v1_bigquery) as servicedata_v1_bigquery_json
) as protopayload_auditlog,
textPayload,
timestamp,
receiveTimestamp,
severity,
insertId,
httpRequest,
operation,
trace,
spanId,
traceSampled,
sourceLocation,
split,
[struct(cast(null as string) as id)] as errorGroups,
from
`lake_cloud_logging.cloudaudit_googleapis_com_data_access`
),
new_audit_log as (
select
parse_date('%Y%m%d', _table_suffix) as event_date,
logName,
resource,
struct(
protopayload_auditlog.serviceName,
protopayload_auditlog.methodName,
protopayload_auditlog.resourceName,
protopayload_auditlog.resourceLocation,
protopayload_auditlog.numResponseItems,
protopayload_auditlog.status,
protopayload_auditlog.authenticationInfo,
protopayload_auditlog.authorizationInfo,
protopayload_auditlog.policyViolationInfo,
protopayload_auditlog.requestMetadata,
protopayload_auditlog.metadataJson,
cast(null as string) as servicedata_v1_bigquery_json
) as protopayload_auditlog,
textPayload,
timestamp,
receiveTimestamp,
severity,
insertId,
httpRequest,
operation,
trace,
spanId,
traceSampled,
sourceLocation,
split,
errorGroups,
from
`lake_cloud_logging.cloudaudit_googleapis_com_data_access_*`
),
main as (
select * from old_audit_log where event_date < 'yyyy-mm-dd' -- Specify switched date
union all
select * from new_audit_log where event_date >= 'yyyy-mm-dd' -- Specify switched date
)
select * from main
metadataJson
と servicedata_v1_bigquery
配下の変換については、項目数が多く構造も大きく異なり網羅的な対応が難しかったため、現時点では見送りとしました。
また、公式の Migration Guide にあるように、新・旧のスキーマで同じカラム名でも値が変わっているものもあり、metadataJson
配下の特定の項目 (totalBilledBytes, totalSlotMs など) をよく利用するなら、ユースケースに応じたビューを追加していくのがよさそうなので、今後必要に応じて対応しようと思います。
なお、本番環境と開発環境で既存の監査ログを比較したところ、Struct型のカラム内でフィールドの順番が異なるケースがありました。私達の環境がたまたまで広く一般に当てはまるかわかりませんが、このように環境によっては微妙にスキーマが異なることがあるかもしれません。
まとめ
Logging Sink でテーブル化した監査ログのスキーマは自動更新されないため、新規に Logging Sink を追加して新スキーマの監査ログを利用できるようにしました。
また、カラム構造が異なるためにそれぞれ別テーブルとなってしまった新・旧スキーマの監査ログを横断的に見れるように統合ビューを作成しました。
監査ログの新スキーマへの移行もそうですが、特に統合ビュー作成では意外にネット上で活用できる情報が多くなく、自力で作ることになったため、この記事が参考になりどなたかの役に立てば嬉しいです。
今後の対応として、本監査ログだけでなく Workspace の監査ログも追加し dbt exposures なども利用しつつ、データがどう使われているかデータリネージをより強化していきたいです。道半ばですが、着実に進めていきます。
データチーム拡大中につき、興味を持たれた方ぜひカジュアル面談からでもお話しましょう!
Discussion