DLP スキャン結果を元に BigQuery ポリシータグを自動的に付与する仕組みを作った話

はじめに
BigQuery に格納されているデータの個人情報保護を強化するため、CIO の方と相談しながら自動的にポリシータグを付与する仕組みを構築しました。
この仕組みは約1年間安定稼働しており、新規テーブルやカラムの追加時も自動的に保護されるため、運用負荷がほぼゼロになっています。
本記事では、Google Cloud の DLP (Data Loss Prevention) API を使ってテーブルをスキャンし、検出結果をトリガーに Cloud Functions でポリシータグを自動設定する仕組みについて紹介します。
背景
課題
READYFOR では、Aurora MySQL のデータを Datastream を使って BigQuery にリアルタイムで連携しています。このデータには顧客の個人情報や機密情報が含まれていますが、そのままの状態では全社員がすべてのデータにアクセスできてしまいます。
適切にアクセス制御するには、どのカラムに個人情報が含まれるかを正確に把握し、適切なポリシータグを設定する必要があります。
当初は Terraform で BigQuery のスキーマを管理しており、ポリシータグも Terraform のコード内で手動で指定していました。しかし、テーブル数やカラム数が増えるにつれて、以下の課題が顕在化しました:
- Datastream 経由で連携されるデータのスキーマ変更に対応し続ける必要がある
- どのカラムに個人情報が含まれるか目視で確認する必要がある
- ポリシータグの付け忘れや誤設定のリスク
解決策
DLP で BigQuery テーブルを定期的にスキャンし、検出結果を Pub/Sub 経由で受け取り、Cloud Functions で自動的にポリシータグを付与する仕組みを実装しました。
DLP によるデータのマスキングや暗号化は行わず、検出された個人情報の種類(InfoType)を元に BigQuery のポリシータグを設定し、BigQuery のアクセス制御機能でデータを保護しています。
Google Cloud DLP (Data Loss Prevention) は、機密情報や個人情報を検出・保護するためのサービスです。データのスキャン、マスキング、暗号化など多機能ですが、今回は検出のみを活用しています。
アーキテクチャ
全体の流れ
- DLP サービスが BigQuery テーブルをスキャン
- スキャン結果を Pub/Sub トピックに publish
- Pub/Sub がトリガーとなり Cloud Functions が起動
- Cloud Functions が BigQuery のスキーマにポリシータグを設定
スキャン対象
以下のデータセットを対象にスキャンを実施しています:
-
lake_readyfor_db: 本番データベースのレプリケーション -
staging_*: ETL 処理の中間データ -
marts_*: dbt で加工したデータマート
データ量や機密性を考慮して、スキャン頻度や対象を調整しています。
resource "google_data_loss_prevention_discovery_config" "main" {
parent = "projects/${data.google_project.current.project_id}/locations/asia-northeast1"
location = "asia-northeast1"
status = "RUNNING"
display_name = "main"
inspect_templates = [
format(
"projects/%s/locations/global/inspectTemplates/%s",
data.google_project.current.project_id,
google_data_loss_prevention_inspect_template.main.template_id,
)
]
targets {
big_query_target {
cadence {
schema_modified_cadence {
frequency = "UPDATE_FREQUENCY_DAILY"
types = ["SCHEMA_NEW_COLUMNS", "SCHEMA_REMOVED_COLUMNS"]
}
table_modified_cadence {
frequency = "UPDATE_FREQUENCY_MONTHLY"
types = ["TABLE_MODIFIED_TIMESTAMP"]
}
inspect_template_modified_cadence {
frequency = "UPDATE_FREQUENCY_DAILY"
}
}
conditions {
created_after = null
type_collection = "BIG_QUERY_COLLECTION_ONLY_SUPPORTED_TYPES"
}
filter {
tables {
include_regexes {
patterns {
project_id_regex = data.google_project.current.project_id
dataset_id_regex = "lake_readyfor_db"
table_id_regex = ".*"
}
}
}
}
}
}
### lake_readyfor_db 以外の targets は省略
actions {
pub_sub_notification {
detail_of_message = "TABLE_PROFILE"
event = "NEW_PROFILE"
topic = google_pubsub_topic.bq_policy_tagger.id
}
}
actions {
pub_sub_notification {
detail_of_message = "TABLE_PROFILE"
event = "CHANGED_PROFILE"
topic = google_pubsub_topic.bq_policy_tagger.id
}
}
actions {
pub_sub_notification {
detail_of_message = "TABLE_PROFILE"
event = "SCORE_INCREASED"
topic = google_pubsub_topic.bq_policy_tagger.id
}
}
actions {
pub_sub_notification {
detail_of_message = "TABLE_PROFILE"
event = "ERROR_CHANGED"
topic = google_pubsub_topic.bq_policy_tagger.id
}
}
# Google provider では 「タグとして Dataplex に送信する」はサポートされていないので Google Cloud のコンソールから設定する
}
実装の詳細
DLP の Inspect Template
DLP の検出ルールとして、100種類以上の InfoType を設定しています。
日本特有の個人情報(マイナンバー、銀行口座、運転免許証など)から、メールアドレス、電話番号、クレジットカード番号、API キーまで幅広くカバーしています。
# google_data_loss_prevention.tf より抜粋
resource "google_data_loss_prevention_inspect_template" "main" {
parent = "projects/${data.google_project.current.project_id}"
template_id = "main"
display_name = "main"
description = "デフォルト構成にいくつかの検出ルールを追加したもの"
inspect_config {
min_likelihood = "POSSIBLE"
info_types { name = "EMAIL_ADDRESS" }
info_types { name = "PHONE_NUMBER" }
info_types { name = "PERSON_NAME" }
info_types { name = "JAPAN_BANK_ACCOUNT" }
info_types { name = "JAPAN_INDIVIDUAL_NUMBER" }
info_types { name = "PASSWORD" }
info_types { name = "CREDIT_CARD_NUMBER" }
info_types { name = "IP_ADDRESS" }
info_types { name = "AUTH_TOKEN" }
info_types { name = "GCP_API_KEY" }
# ... 全100種類以上の InfoType を定義
}
}
Pub/Sub トピックの作成
DLP のスキャン結果を受け取るための Pub/Sub トピックを作成します。
# pubsub.tf より抜粋
resource "google_pubsub_topic" "bq_policy_tagger" {
project = data.google_project.current.project_id
name = "${local.prefix}-bq_policy_tagger-topic"
message_retention_duration = "${60 * 60 * 24 * 7}s" # 7 days
}
resource "google_pubsub_topic_iam_policy" "bq_policy_tagger" {
project = data.google_project.current.project_id
topic = google_pubsub_topic.bq_policy_tagger.name
policy_data = jsonencode({
bindings = [
{
role = "roles/pubsub.publisher" # Pub/Sub パブリッシャー
members = [
# DLP サービスアカウントに publish 権限を付与
"serviceAccount:service-${data.google_project.current.number}@dlp-api.iam.gserviceaccount.com",
]
},
]
})
}
Cloud Functions の作成
Cloud Functions のリソースを Terraform で定義します。
# data.tf より抜粋
data "archive_file" "bq_policy_tagger" {
type = "zip"
source_dir = "${path.module}/files/function/bq_policy_tagger"
output_path = "${path.module}/files/function/bq_policy_tagger/functions.zip"
excludes = [
".rubocop.yml",
".ruby-lsp",
".ruby-version",
"README.md",
"functions.zip",
"spec/**/*",
]
}
# google_cloudfunctions2.tf より抜粋
resource "google_cloudfunctions2_function" "bq_policy_tagger" {
project = data.google_project.current.project_id
location = "asia-northeast1"
name = "${local.prefix}-bq_policy_tagger"
build_config {
runtime = "ruby34"
entry_point = "handler"
source {
storage_source {
bucket = google_storage_bucket.bq_policy_tagger.name
object = google_storage_bucket_object.bq_policy_tagger.name
}
}
}
service_config {
max_instance_count = 1
timeout_seconds = 60
available_memory = "256Mi"
ingress_settings = "ALLOW_INTERNAL_ONLY"
environment_variables = {
CONFIG_YAML_PATH = "config/prd.yml"
SENTRY_ENVIRONMENT = "production"
}
secret_environment_variables {
key = "SENTRY_DSN"
project_id = data.google_project.current.project_id
secret = google_secret_manager_secret.bq_policy_tagger_sentry_dsn.secret_id
version = "latest"
}
service_account_email = google_service_account.bq_policy_tagger.email
}
event_trigger {
trigger_region = "asia-northeast1"
event_type = "google.cloud.pubsub.topic.v1.messagePublished"
pubsub_topic = google_pubsub_topic.bq_policy_tagger.id
retry_policy = "RETRY_POLICY_RETRY"
}
}
Cloud Functions の実装
Cloud Functions は Ruby で実装されています。Pub/Sub からメッセージを受け取り、DLP のカラムプロファイルを取得して、適切なポリシータグを設定します。
Gemfile
# frozen_string_literal: true
source 'https://rubygems.org'
gem 'functions_framework'
gem 'google-cloud-bigquery'
gem 'google-cloud-dlp'
gem 'sentry-ruby'
group :development, :test do
gem 'rspec'
gem 'rubocop', require: false
gem 'rubocop-performance', require: false
gem 'rubocop-rspec', require: false
end
エントリーポイント (app.rb)
# files/function/bq_policy_tagger/app.rb
# frozen_string_literal: true
require 'functions_framework'
require 'google/cloud/bigquery'
require 'google/cloud/dlp'
require 'sentry-ruby'
require_relative 'bq_policy_tagger'
FunctionsFramework.cloud_event 'handler' do |event|
Sentry.init unless Sentry.initialized?
logger.debug "Received event: #{@event.inspect}"
BqPolicyTagger.new(
event,
FunctionsFramework.logger,
Google::Cloud::Dlp.dlp_service,
Google::Cloud::Bigquery.new
).perform
rescue StandardError => e
Sentry.capture_exception(e)
logger.error "An error occurred: #{e}"
メインロジック (bq_policy_tagger.rb)
# files/function/bq_policy_tagger/bq_policy_tagger.rb より抜粋
# frozen_string_literal: true
require 'base64'
require 'google/cloud/dlp/v2'
require 'yaml'
# DLP で検出されたプロファイルに基づいて BigQuery のスキーマにポリシータグを追加する
class BqPolicyTagger
SCORE_THRESHOLD = Google::Cloud::Dlp::V2::SensitivityScore::SensitivityScoreLevel::SENSITIVITY_MODERATE
def initialize(event, logger, dlp_client, bq_client)
@event = event
@logger = logger
@dlp_client = dlp_client
@bq_client = bq_client
end
def perform
table_profile = @dlp_client.get_table_data_profile(name: dlp_message.profile.name)
update_schema(table_profile)
end
private
def dlp_message
Google::Cloud::Dlp::V2::DataProfilePubSubMessage.decode(event_message)
rescue StandardError => e
raise "Failed to decode DLP message: #{e.message}"
end
# event の構造
# https://cloud.google.com/eventarc/docs/cloudevents?hl=ja#common-events
# {
# "subscription": "projects/test-project/subscriptions/my-subscription",
# "message": {
# "attributes": {
# "attr1": "attr1-value"
# },
# "data": "dGVzdCBtZXNzYWdlIDM=",
# "messageId": "message-id",
# "publishTime": "2021-02-05T04:06:14.109Z",
# "orderingKey": "ordering-key"
# }
# }
def event_message
message_data = @event.data['message']['data']
raise 'Message data is empty' if message_data.nil?
Base64.decode64(message_data)
end
def update_schema(table_profile)
dataset_id = table_profile.full_resource.split('/')[6]
table_id = table_profile.full_resource.split('/')[8]
@logger.info "Processing table: #{dataset_id}.#{table_id}"
table = @bq_client.dataset(dataset_id).table(table_id)
schema = new_schema(table.schema, dlp_message)
table.schema(replace: true) do |s|
s.load(fields_to_json(schema.fields))
end
@logger.info "Completed processing table: #{dataset_id}.#{table_id}"
end
def fields_to_json(fields)
fields.map do |field|
{
name: field.name,
type: field.type,
mode: field.mode,
policyTags: ({ names: field.policy_tags } if field.policy_tags),
fields: (fields_to_json(field.fields) if field.fields&.any?)
}.compact
end
end.to_json
def new_schema(base_schema, dlp_message)
schema = base_schema.dup
column_profiles = @dlp_client.list_column_data_profiles(
parent: dlp_message.profile.name.split('/')[0..3].join('/'),
filter: "table_data_profile_name = #{dlp_message.profile.name}"
)
column_profiles.each do |column_profile|
update_column(schema, column_profile)
end
schema
end
def update_column(schema, column_profile)
unless should_process_column?(column_profile)
@logger.info "Skip column: #{column_profile.column}"
return
end
@logger.info "Processing column: #{column_profile.column}"
field = schema.fields.find { |f| f.name == column_profile.column }
field.policy_tags = resolve_policy_tags(column_profile)
field
end
def should_process_column?(column_profile)
dataset = column_profile.table_full_resource.split('/').last
column = column_profile.column
return false if config['skip_columns'].include?("#{dataset}.#{column}")
level = column_profile.sensitivity_score.score
Google::Cloud::Dlp::V2::SensitivityScore::SensitivityScoreLevel.const_get(level) >= SCORE_THRESHOLD
rescue NameError
@logger.warn "Unknown sensitivity level provided: #{level}"
false
end
def resolve_policy_tags(column_profile)
begin
policy_tag_id = config.fetch('policy_tag_mapping').fetch(column_profile.column_info_type&.info_type&.name)
rescue KeyError
policy_tag_id = config.fetch('policy_tag_mapping').fetch('DEFAULT')
end
policy_tag = format(
'projects/%<project_id>s/locations/%<location>s/taxonomies/%<taxonomies_id>s/policyTags/%<policy_tag_id>s',
project_id: column_profile.table_full_resource.split('/')[4],
location: 'asia-northeast1',
taxonomies_id: config.fetch('taxonomy_id'),
policy_tag_id: policy_tag_id
)
[policy_tag]
end
def config
@config ||= YAML.safe_load_file(ENV.fetch('CONFIG_YAML_PATH'), aliases: true)
end
end
spec ファイル
# frozen_string_literal: true
require 'cloud_events/event'
require 'google/cloud/bigquery'
require 'google/cloud/dlp'
require 'spec_helper'
require_relative '../bq_policy_tagger'
RSpec.describe BqPolicyTagger do
subject(:instance) { described_class.new(event, logger, dlp_client, bq_client) }
let!(:project_id) { 'test-project' }
let!(:location) { 'asia-northeast1' }
let!(:taxonomies_id) { '12345' }
let!(:policy_tag_id) { 'pii_mail_address_policy_tag_id' }
let!(:logger) { object_spy(Logger.new($stdout)) }
let!(:dlp_client) { object_spy(Google::Cloud::Dlp.dlp_service) }
let!(:bq_client) { object_spy(Google::Cloud::Bigquery.new(project_id: project_id)) }
let!(:table_profile) { Google::Cloud::Dlp::V2::TableDataProfile.new }
let!(:dlp_message_data) do
message = Google::Cloud::Dlp::V2::DataProfilePubSubMessage.new
message.profile = table_profile
message.event = Google::Cloud::Dlp::V2::DataProfileAction::EventType::NEW_PROFILE
message
end
let!(:event_message_data) do
{
data: {
subscription: "projects/#{project_id}/subscriptions/my-subscription",
'message' => {
attributes: {
attr1: 'attr1-value'
},
'data' => Base64.encode64(
Google::Cloud::Dlp::V2::DataProfilePubSubMessage.encode(dlp_message_data)
),
messageId: 'message-id',
publishTime: '2021-02-05T04:06:14.109Z',
orderingKey: 'ordering-key'
}
}
}
end
let!(:event) do
instance_double(
CloudEvents::Event::V1,
event_message_data
)
end
let!(:dataset) { object_spy(Google::Cloud::Bigquery::Dataset.new) }
let!(:table) { object_spy(Google::Cloud::Bigquery::Table.new) }
let!(:default_policy_tag_id) { 'default_policy_tag_id' }
before do
allow(ENV).to receive(:fetch).and_return('fixtures/config.yml')
allow(YAML).to receive(:safe_load_file).with('fixtures/config.yml', aliases: true).and_return(
{
'taxonomy_id' => taxonomies_id,
'policy_tags' => {
'default' => {
'self' => default_policy_tag_id
},
'confidential' => {
'self' => 'confidential_policy_tag_id',
'credential' => 'confidential_credential_policy_tag_id'
},
'financial' => {
'self' => 'financial_policy_tag_id',
'bank_data' => 'financial_bank_data_policy_tag_id',
'transaction_data' => 'financial_transaction_data_policy_tag_id'
},
'pii' => {
'self' => 'pii_policy_tag_id',
'mail_address' => policy_tag_id,
'name' => 'pii_name_policy_tag_id'
}
},
'policy_tag_mapping' => {
'DEFAULT' => default_policy_tag_id,
'EMAIL_ADDRESS' => policy_tag_id,
'PERSON_NAME' => 'pii_name_policy_tag_id'
},
'skip_columns' => [
'table_id.skip_column'
]
}
)
end
describe '#perform' do
context '正常系' do
before do
allow(dlp_client).to receive(:get_table_data_profile).and_return(table_profile)
allow(instance).to receive(:update_schema)
end
it 'dlp_client.get_table_data_profile を呼び出す' do
instance.perform
expect(dlp_client).to have_received(:get_table_data_profile).with(name: dlp_message_data.profile.name)
end
it 'update_schema メソッドを呼び出す' do
instance.perform
expect(instance).to have_received(:update_schema).with(table_profile)
end
end
end
describe '#dlp_message' do
context '正常系' do
it 'デコードされたメッセージを返す' do
expect(instance.send(:dlp_message)).to eq(dlp_message_data)
end
end
context '異常系' do
it 'デコードに失敗した場合エラーを発生させる' do
allow(instance).to receive(:event_message).and_return('invalid message')
expect { instance.send(:dlp_message) }.to raise_error(StandardError).with_message(/Failed to decode DLP message: .*/)
end
end
end
describe '#event_message' do
let!(:dlp_message) { Google::Cloud::Dlp::V2::DataProfilePubSubMessage.encode(dlp_message_data) }
context '正常系' do
it 'デコードされたメッセージを返す' do
expect(instance.send(:event_message)).to eq(dlp_message)
end
it 'event.data が呼び出される' do
instance.send(:event_message)
expect(event).to have_received(:data).with(no_args).once
end
it 'Base64.decode64 が呼び出される' do
allow(Base64).to receive(:decode64)
instance.send(:event_message)
expect(Base64).to have_received(:decode64).with(event_message_data[:data]['message']['data']).once
end
end
context '異常系' do
it 'メッセージがない場合エラーを発生させる' do
allow(event).to receive(:data).and_return({ 'message' => { 'data' => nil } })
expect { instance.send(:event_message) }.to raise_error(RuntimeError).with_message('Message data is empty')
end
end
end
describe '#update_schema' do
let!(:dataset_id) { 'dataset_id' }
let!(:table_id) { 'table_id' }
let!(:schema) do
double(
name: "projects/#{project_id}/locations/#{location}/schemas/0000000000000000000",
fields: [
double(name: 'id', type: 'STRING'),
double(name: 'name', type: 'STRING')
]
)
end
before do
allow(instance).to receive_messages(
new_schema: schema,
fields_to_json: []
)
allow(table_profile).to receive(:full_resource).and_return("/projects/#{project_id}/locations/#{location}/datasets/#{dataset_id}/tables/#{table_id}")
allow(bq_client).to receive(:dataset).and_return(dataset)
allow(dataset).to receive(:table).and_return(table)
allow(table).to receive(:schema) do |&block|
allow(schema).to receive(:load)
block&.call(schema)
schema
end
end
context '正常系' do
it 'dataset_id を取得する' do
expect(table_profile.full_resource.split('/')[6]).to eq(dataset_id)
end
it 'table_id を取得する' do
expect(table_profile.full_resource.split('/')[8]).to eq(table_id)
end
it 'table_profile.full_resource が2回呼び出される' do
instance.send(:update_schema, table_profile)
expect(table_profile).to have_received(:full_resource).exactly(2).times
end
it 'ログに処理中メッセージを出力する' do
instance.send(:update_schema, table_profile)
expect(logger).to have_received(:info).with("Processing table: #{dataset_id}.#{table_id}")
end
it 'bq_client.dataset が呼び出される' do
instance.send(:update_schema, table_profile)
expect(bq_client).to have_received(:dataset).with(dataset_id).once
end
it 'bq_client.dataset.table が呼び出される' do
instance.send(:update_schema, table_profile)
expect(dataset).to have_received(:table).with(table_id).once
end
it 'bq_client.dataset.table.schema が呼び出される' do
instance.send(:update_schema, table_profile)
expect(table).to have_received(:schema).with(no_args).once
end
it 'new_schema メソッドを呼び出す' do
instance.send(:update_schema, table_profile)
expect(instance).to have_received(:new_schema).with(schema, dlp_message_data)
end
it 'table.schema が呼び出される' do
instance.send(:update_schema, table_profile)
expect(table).to have_received(:schema).with(replace: true).once
end
it 'table.schema が2回呼び出される' do
instance.send(:update_schema, table_profile)
expect(table).to have_received(:schema).twice
end
it 'fields_to_json メソッドを呼び出す' do
instance.send(:update_schema, table_profile)
expect(instance).to have_received(:fields_to_json).with(schema.fields)
end
it 'ログに処理完了メッセージを出力する' do
instance.send(:update_schema, table_profile)
expect(logger).to have_received(:info).with("Completed processing table: #{dataset_id}.#{table_id}")
end
end
end
describe '#fields_to_json' do
let!(:policy_tags) { ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{policy_tag_id}"] }
context '正常系' do
it 'フィールドを JSON に変換して返す' do
fields = [
double(name: 'record', type: 'RECORD', mode: 'NULLABLE', policy_tags: policy_tags, fields: [
double(name: 'record_1', type: 'STRING', mode: 'NULLABLE', policy_tags: nil, fields: nil),
double(name: 'record_2', type: 'INTEGER', mode: 'NULLABLE', policy_tags: nil, fields: nil)
]),
double(name: 'string', type: 'STRING', mode: 'NULLABLE', policy_tags: nil, fields: nil),
double(name: 'integer', type: 'INTEGER', mode: 'NULLABLE', policy_tags: nil, fields: nil)
]
expected_json = [
{ name: 'record', type: 'RECORD', mode: 'NULLABLE', policyTags: { names: policy_tags }, fields: [
{ name: 'record_1', type: 'STRING', mode: 'NULLABLE' },
{ name: 'record_2', type: 'INTEGER', mode: 'NULLABLE' }
] },
{ name: 'string', type: 'STRING', mode: 'NULLABLE' },
{ name: 'integer', type: 'INTEGER', mode: 'NULLABLE' }
]
expect(instance.send(:fields_to_json, fields)).to eq(expected_json)
end
end
end
describe '#new_schema' do
let!(:schema) do
double(
name: "projects/#{project_id}/locations/#{location}/schemas/0000000000000000000",
fields: [
double(name: 'id', type: 'STRING'),
double(name: 'name', type: 'STRING'),
double(name: 'email', type: 'STRING')
]
)
end
let!(:column_profiles) { [1, 2, 3] }
before do
allow(dlp_client).to receive(:list_column_data_profiles).and_return(column_profiles)
allow(instance).to receive(:update_column)
allow(instance).to receive(:dlp_message).and_return(dlp_message_data)
end
context '正常系' do
it 'スキーマをクローンして返す' do
expect { instance.send(:new_schema, schema, dlp_message_data) }.not_to(change { schema })
end
it 'dlp_client.list_column_data_profiles を呼び出す' do
instance.send(:new_schema, schema, dlp_message_data)
expect(dlp_client).to have_received(:list_column_data_profiles).with(
parent: dlp_message_data.profile.name.split('/')[0..3].join('/'),
filter: "table_data_profile_name = #{dlp_message_data.profile.name}"
)
end
it 'column_profiles の数だけ update_column を呼び出す' do
instance.send(:new_schema, schema, dlp_message_data)
expect(instance).to have_received(:update_column).exactly(column_profiles.size).times
end
end
context '異常系' do
it 'dlp_client.list_column_data_profiles が失敗した場合エラーを発生させる' do
allow(dlp_client).to receive(:list_column_data_profiles).and_raise(StandardError)
expect do
instance.send(:new_schema, schema, dlp_message_data)
end.to raise_error(StandardError)
end
end
end
describe '#update_column' do
let!(:policy_tags) { ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{policy_tag_id}"] }
let!(:aready_attached_policy_tags) { ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/aready_attached_policy_tag_id"] }
let!(:id_column) do
id_column = double(
name: 'id',
type: 'STRING',
fields: []
)
allow(id_column).to receive(:policy_tags=) { |arg| id_column.instance_variable_set(:@policy_tags, arg) }
allow(id_column).to receive(:policy_tags) { id_column.instance_variable_get(:@policy_tags) }
id_column
end
let!(:name_column) do
name_column = double(
name: 'name',
type: 'STRING',
fields: []
)
allow(name_column).to receive(:policy_tags=) { |arg| name_column.instance_variable_set(:@policy_tags, arg) }
allow(name_column).to receive(:policy_tags) { name_column.instance_variable_get(:@policy_tags) }
name_column
end
let!(:email_column) do
email_column = double(
name: 'email',
type: 'STRING',
policy_tags: aready_attached_policy_tags,
fields: []
)
allow(email_column).to receive(:policy_tags=) { |arg| email_column.instance_variable_set(:@policy_tags, arg) }
allow(email_column).to receive(:policy_tags) { email_column.instance_variable_get(:@policy_tags) }
email_column.policy_tags = aready_attached_policy_tags
email_column
end
let!(:schema) do
double(
name: "projects/#{project_id}/locations/#{location}/schemas/0000000000000000000",
fields: [id_column, name_column, email_column]
)
end
before do
allow(instance).to receive(:resolve_policy_tags).and_return(policy_tags)
end
context 'SENSITIVITY_HIGH の場合' do
it 'id カラムにポリシータグを追加する' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_HIGH'
),
column: 'id',
column_info_type: double(
info_type: double(
name: 'EMAIL_ADDRESS'
)
)
)
instance.send(:update_column, schema, column_profile)
expect(schema.fields[0].policy_tags).to eq(policy_tags)
end
it 'name カラムにポリシータグを追加する' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_HIGH'
),
column: 'name',
column_info_type: double(
info_type: double(
name: 'EMAIL_ADDRESS'
)
)
)
instance.send(:update_column, schema, column_profile)
expect(schema.fields[1].policy_tags).to eq(policy_tags)
end
it 'ログに処理中メッセージを出力する' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_HIGH'
),
column: 'id'
)
instance.send(:update_column, schema, column_profile)
expect(logger).to have_received(:info).with("Processing column: #{column_profile.column}")
end
end
context 'SENSITIVITY_HIGH かつポリシータグが設定されている場合' do
it 'email カラムのポリシータグを上書きする' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_HIGH'
),
column: 'email'
)
instance.send(:update_column, schema, column_profile)
expect(schema.fields[2].policy_tags).to eq(policy_tags)
end
it 'ログに処理中メッセージを出力する' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_HIGH'
),
column: 'email'
)
instance.send(:update_column, schema, column_profile)
expect(logger).to have_received(:info).with("Processing column: #{column_profile.column}")
end
end
context 'SENSITIVITY_LOW の場合' do
it 'id カラムにポリシータグを追加しない' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_LOW'
),
column: 'id'
)
instance.send(:update_column, schema, column_profile)
expect(schema.fields[0].policy_tags).to be_nil
end
it 'name カラムにポリシータグを追加しない' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_LOW'
),
column: 'name'
)
instance.send(:update_column, schema, column_profile)
expect(schema.fields[1].policy_tags).to be_nil
end
it 'email カラムにポリシータグを追加しない' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_LOW'
),
column: 'email'
)
instance.send(:update_column, schema, column_profile)
expect(schema.fields[2].policy_tags).to eq(aready_attached_policy_tags)
end
it 'ログにスキップメッセージを出力する' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_LOW'
),
column: 'name'
)
instance.send(:update_column, schema, column_profile)
expect(logger).to have_received(:info).with("Skip column: #{column_profile.column}")
end
end
end
describe '#should_process_column?' do
context '設定ファイルの skip_columns にカラム名が含まれている場合' do
let!(:column_profile) do
double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_HIGH'
),
column: 'skip_column'
)
end
it 'falseを返す' do
expect(instance.send(:should_process_column?, column_profile)).to be false
end
end
context '機密レベルが SCORE_THRESHOLD 以上の場合' do
let!(:column_profile) do
double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_HIGH'
),
column: 'id'
)
end
it 'trueを返す' do
expect(instance.send(:should_process_column?, column_profile)).to be true
end
end
context '機密レベルが SCORE_THRESHOLD 未満の場合' do
let!(:column_profile) do
double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: 'SENSITIVITY_UNKNOWN'
),
column: 'id'
)
end
it 'falseを返す' do
expect(instance.send(:should_process_column?, column_profile)).to be false
end
end
context '未知の機密レベルが与えられた場合' do
let!(:invalid_sensitivity_level) { 'UNKNOWN' }
let!(:column_profile) do
double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
sensitivity_score: double(
score: invalid_sensitivity_level
),
column: 'id'
)
end
it 'falseを返す' do
expect(instance.send(:should_process_column?, column_profile)).to be false
end
it '警告をログに出力' do
instance.send(:should_process_column?, column_profile)
expect(logger).to have_received(:warn).with("Unknown sensitivity level provided: #{invalid_sensitivity_level}")
end
end
end
describe '#resolve_policy_tags' do
let!(:column_profile) do
double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
column_info_type: double(
info_type: double(
name: 'EMAIL_ADDRESS'
)
)
)
end
it 'info_type に対応するポリシータグが設定されている場合対応しているポリシータグを返す' do
expected_policy_tags = ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{policy_tag_id}"]
expect(instance.send(:resolve_policy_tags, column_profile)).to eq(expected_policy_tags)
end
it 'info_type に対応するポリシータグが設定されていない場合 DEFAULT のポリシータグを返す' do
column_profile = double(
table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
column_info_type: double(
info_type: double(
name: 'UNKNOWN_INFO_TYPE'
)
)
)
expected_policy_tags = ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{default_policy_tag_id}"]
expect(instance.send(:resolve_policy_tags, column_profile)).to eq(expected_policy_tags)
end
end
describe '#config' do
it 'CONFIG_YAML_PATH が設定されていない場合エラーを発生させる' do
allow(ENV).to receive(:fetch).with('CONFIG_YAML_PATH').and_raise(KeyError)
expect do
instance.send(:config)
end.to raise_error(KeyError)
end
it 'CONFIG_YAML_PATH が不正な場合エラーを発生させる' do
allow(ENV).to receive(:fetch).with('CONFIG_YAML_PATH').and_return('none')
allow(YAML).to receive(:safe_load_file).with('none', aliases: true).and_raise(Errno::ENOENT)
expect do
instance.send(:config)
end.to raise_error(Errno::ENOENT)
end
end
end
ポリシータグのマッピング
検出された InfoType に応じて、適切なポリシータグを割り当てています。
対応するポリシータグが設定されていない場合は DEFAULT のポリシータグを付与します。
# files/function/bq_policy_tagger/config/prd.yml より抜粋
taxonomy_id: "xxxxxxxxxxxxxxxxxx"
policy_tags:
high:
self: &high "xxxxxxxxxxxxxxxxxx1"
japan_bank_account: &high_japan_bank_account "xxxxxxxxxxxxxxxxxx2"
password: &high_password "xxxxxxxxxxxxxxxxxx3"
transaction_data: &high_transaction_data "xxxxxxxxxxxxxxxxxx4"
medium:
self: &medium "xxxxxxxxxxxxxxxxxx5"
email_address: &medium_email_address "xxxxxxxxxxxxxxxxxx6"
person_name: &medium_person_name "xxxxxxxxxxxxxxxxxx7"
phone_number: &medium_phone_number "xxxxxxxxxxxxxxxxxx8"
low:
self: &low "xxxxxxxxxxxxxxxxxx9"
policy_tag_mapping:
DEFAULT: *medium
EMAIL_ADDRESS: *medium_email_address
JAPAN_BANK_ACCOUNT: *high_japan_bank_account
PASSWORD: *high_password
PERSON_NAME: *medium_person_name
PHONE_NUMBER: *medium_phone_number
スキップリスト
一部のカラムは、誤検出や業務上の理由でポリシータグの自動設定をスキップしています。
# files/function/bq_policy_tagger/config/prd.yml より抜粋
skip_columns:
- admin_users.email # 社内ユーザーの情報なのでマスキングしない
- contracts.bank_name # JAPAN_BANK_ACCOUNT を付与
- projects.title # プロジェクトタイトルは公開情報のため除外
# ...
元ネタ
今回の実装は Google Cloud のサンプル実装 bq-pii-classifier を参考にしています。
なぜ自作したのか
元ネタは Java で実装されており、カスタマイズが難しそうでした。また、READYFOR は Ruby をメイン言語として使っている会社なので、Ruby で実装すれば他のエンジニアも修正しやすくなります。
そのため、元ネタを参考にしつつ、必要最小限の機能に絞って Ruby で自作することにしました。
運用での工夫
1. スキャン頻度の調整
データセットの特性に応じてスキャン頻度を調整しています:
- スキーマ変更時: 毎日チェック
- テーブル更新時: 毎月チェック
これにより、DLP のスキャンコストを抑えつつ、新規カラムや変更されたカラムを適時検出できます。
2. 誤検出への対応
DLP は高精度ですが、誤検出も発生します。例えば
- プロジェクトタイトルが人名と誤検出される
- 都道府県名だけが入ったカラムが住所と誤検出される
これらは設定ファイルの skip_columns に追加することで除外しています。
効果
この仕組みを約1年間運用してきましたが、安定して動作しています。
セキュリティの向上
- 個人情報を含むカラムが自動的に検出され、適切なアクセス制御が適用される
- 新規テーブルやカラムも自動的に保護される
- 人的ミスによる漏れを防止
運用負荷の削減
- 手動でのポリシータグ設定作業が不要に
- 新規テーブル追加時の対応が自動化
- 監査対応が容易に
可視化
- どのテーブル・カラムに個人情報が含まれるか一覧化
- DLP のダッシュボードで機密データの分布を把握
安定運用
約1年間の運用を通じて、以下の点が確認できました:
- Cloud Functions のエラー率は極めて低く、安定して動作
- DLP のスキャンコストも想定内に収まっている
- 誤検出は初期に
skip_columnsで対応し、その後は追加がほぼ不要 - 新規テーブルやカラム追加時も自動的にタグが付与され、運用負荷がほぼゼロ
ハマったポイント
1. サービスアカウントの権限借用が必要
ローカルでの開発時、gcloud auth application-default login でログインした権限では DLP 系の API が叩けませんでした。
--impersonate-service-account オプションを使用してサービスアカウントの権限を借用する必要があります。
gcloud auth application-default login --impersonate-service-account=SERVICE_ACCOUNT_EMAIL
Terraform の Google Provider でも同様に設定します
provider "google" {
project = local.google_project_id
region = "asia-northeast1"
# Application Default Credentials を利用すると DLP API が利用できないため、Terraform のサービスアカウントの権限借用をする
impersonate_service_account = "terraform@xxx.iam.gserviceaccount.com"
}
2. DLP API の Rate Limit
DLP のスキャン結果を確認する際、すぐに rate limit にかかってしまいます。
実装時は適切なリトライ処理を入れる必要がありました。
3. DLP 開発用のスクリプトを作成
実装中に DLP のスキャン結果を確認したり、テスト後にプロファイルを削除して再スキャンしたりする作業が頻繁に発生しました。
Rate Limit にもかかるため、手作業では辛く、以下のようなスクリプトを作成しました:
-
DLP プロファイルを CSV 出力するスクリプト
- どのカラムが検出されたか、機密性スコアや InfoType を確認できる
-
DLP プロファイルを一括削除するスクリプト
- テスト後にプロファイルをクリーンアップして、再度スキャンをやり直せる
これらのスクリプトは、開発環境でのテストや動作確認を大幅に効率化してくれました。
おわりに
Google Cloud DLP と Cloud Functions を組み合わせることで、BigQuery のポリシータグを自動設定する仕組みを構築できました。
セキュリティ向上と運用負荷削減を両立できる様になったことは、大きな成果だと感じています。
明日は READYFOR Advent Calendar 2025 9日目、resqnet さんによる記事が公開される予定です。
お楽しみに!
「みんなの想いを集め、社会を良くするお金の流れをつくる」READYFORのエンジニアブログです。技術情報を中心に様々なテーマで発信していきます。 ( Zenn: zenn.dev/p/readyfor_blog / Hatena: tech.readyfor.jp/ )
Discussion