🏷️

DLP スキャン結果を元に BigQuery ポリシータグを自動的に付与する仕組みを作った話

に公開

はじめに

BigQuery に格納されているデータの個人情報保護を強化するため、CIO の方と相談しながら自動的にポリシータグを付与する仕組みを構築しました。

この仕組みは約1年間安定稼働しており、新規テーブルやカラムの追加時も自動的に保護されるため、運用負荷がほぼゼロになっています。

本記事では、Google Cloud の DLP (Data Loss Prevention) API を使ってテーブルをスキャンし、検出結果をトリガーに Cloud Functions でポリシータグを自動設定する仕組みについて紹介します。

背景

課題

READYFOR では、Aurora MySQL のデータを Datastream を使って BigQuery にリアルタイムで連携しています。このデータには顧客の個人情報や機密情報が含まれていますが、そのままの状態では全社員がすべてのデータにアクセスできてしまいます。

適切にアクセス制御するには、どのカラムに個人情報が含まれるかを正確に把握し、適切なポリシータグを設定する必要があります。

当初は Terraform で BigQuery のスキーマを管理しており、ポリシータグも Terraform のコード内で手動で指定していました。しかし、テーブル数やカラム数が増えるにつれて、以下の課題が顕在化しました:

  • Datastream 経由で連携されるデータのスキーマ変更に対応し続ける必要がある
  • どのカラムに個人情報が含まれるか目視で確認する必要がある
  • ポリシータグの付け忘れや誤設定のリスク

解決策

DLP で BigQuery テーブルを定期的にスキャンし、検出結果を Pub/Sub 経由で受け取り、Cloud Functions で自動的にポリシータグを付与する仕組みを実装しました。

DLP によるデータのマスキングや暗号化は行わず、検出された個人情報の種類(InfoType)を元に BigQuery のポリシータグを設定し、BigQuery のアクセス制御機能でデータを保護しています。

Google Cloud DLP (Data Loss Prevention) は、機密情報や個人情報を検出・保護するためのサービスです。データのスキャン、マスキング、暗号化など多機能ですが、今回は検出のみを活用しています。

アーキテクチャ

全体の流れ

  1. DLP サービスが BigQuery テーブルをスキャン
  2. スキャン結果を Pub/Sub トピックに publish
  3. Pub/Sub がトリガーとなり Cloud Functions が起動
  4. Cloud Functions が BigQuery のスキーマにポリシータグを設定

スキャン対象

以下のデータセットを対象にスキャンを実施しています:

  • lake_readyfor_db: 本番データベースのレプリケーション
  • staging_*: ETL 処理の中間データ
  • marts_*: dbt で加工したデータマート

データ量や機密性を考慮して、スキャン頻度や対象を調整しています。

resource "google_data_loss_prevention_discovery_config" "main" {
  parent       = "projects/${data.google_project.current.project_id}/locations/asia-northeast1"
  location     = "asia-northeast1"
  status       = "RUNNING"
  display_name = "main"

  inspect_templates = [
    format(
      "projects/%s/locations/global/inspectTemplates/%s",
      data.google_project.current.project_id,
      google_data_loss_prevention_inspect_template.main.template_id,
    )
  ]

  targets {
    big_query_target {
      cadence {
        schema_modified_cadence {
          frequency = "UPDATE_FREQUENCY_DAILY"
          types     = ["SCHEMA_NEW_COLUMNS", "SCHEMA_REMOVED_COLUMNS"]
        }
        table_modified_cadence {
          frequency = "UPDATE_FREQUENCY_MONTHLY"
          types     = ["TABLE_MODIFIED_TIMESTAMP"]
        }
        inspect_template_modified_cadence {
          frequency = "UPDATE_FREQUENCY_DAILY"
        }
      }
      conditions {
        created_after   = null
        type_collection = "BIG_QUERY_COLLECTION_ONLY_SUPPORTED_TYPES"
      }
      filter {
        tables {
          include_regexes {
            patterns {
              project_id_regex = data.google_project.current.project_id
              dataset_id_regex = "lake_readyfor_db"
              table_id_regex   = ".*"
            }
          }
        }
      }
    }
  }
  
### lake_readyfor_db 以外の targets は省略

  actions {
    pub_sub_notification {
      detail_of_message = "TABLE_PROFILE"
      event             = "NEW_PROFILE"
      topic             = google_pubsub_topic.bq_policy_tagger.id
    }
  }

  actions {
    pub_sub_notification {
      detail_of_message = "TABLE_PROFILE"
      event             = "CHANGED_PROFILE"
      topic             = google_pubsub_topic.bq_policy_tagger.id
    }
  }

  actions {
    pub_sub_notification {
      detail_of_message = "TABLE_PROFILE"
      event             = "SCORE_INCREASED"
      topic             = google_pubsub_topic.bq_policy_tagger.id
    }
  }

  actions {
    pub_sub_notification {
      detail_of_message = "TABLE_PROFILE"
      event             = "ERROR_CHANGED"
      topic             = google_pubsub_topic.bq_policy_tagger.id
    }
  }

  # Google provider では 「タグとして Dataplex に送信する」はサポートされていないので Google Cloud のコンソールから設定する
}

実装の詳細

DLP の Inspect Template

DLP の検出ルールとして、100種類以上の InfoType を設定しています。

日本特有の個人情報(マイナンバー、銀行口座、運転免許証など)から、メールアドレス、電話番号、クレジットカード番号、API キーまで幅広くカバーしています。

# google_data_loss_prevention.tf より抜粋
resource "google_data_loss_prevention_inspect_template" "main" {
  parent       = "projects/${data.google_project.current.project_id}"
  template_id  = "main"
  display_name = "main"
  description  = "デフォルト構成にいくつかの検出ルールを追加したもの"
  inspect_config {
    min_likelihood = "POSSIBLE"
    info_types { name = "EMAIL_ADDRESS" }
    info_types { name = "PHONE_NUMBER" }
    info_types { name = "PERSON_NAME" }
    info_types { name = "JAPAN_BANK_ACCOUNT" }
    info_types { name = "JAPAN_INDIVIDUAL_NUMBER" }
    info_types { name = "PASSWORD" }
    info_types { name = "CREDIT_CARD_NUMBER" }
    info_types { name = "IP_ADDRESS" }
    info_types { name = "AUTH_TOKEN" }
    info_types { name = "GCP_API_KEY" }
    # ... 全100種類以上の InfoType を定義
  }
}

Pub/Sub トピックの作成

DLP のスキャン結果を受け取るための Pub/Sub トピックを作成します。

# pubsub.tf より抜粋
resource "google_pubsub_topic" "bq_policy_tagger" {
  project                    = data.google_project.current.project_id
  name                       = "${local.prefix}-bq_policy_tagger-topic"
  message_retention_duration = "${60 * 60 * 24 * 7}s" # 7 days
}

resource "google_pubsub_topic_iam_policy" "bq_policy_tagger" {
  project = data.google_project.current.project_id
  topic   = google_pubsub_topic.bq_policy_tagger.name
  policy_data = jsonencode({
    bindings = [
      {
        role = "roles/pubsub.publisher" # Pub/Sub パブリッシャー
        members = [
          # DLP サービスアカウントに publish 権限を付与
          "serviceAccount:service-${data.google_project.current.number}@dlp-api.iam.gserviceaccount.com",
        ]
      },
    ]
  })
}

Cloud Functions の作成

Cloud Functions のリソースを Terraform で定義します。

# data.tf より抜粋
data "archive_file" "bq_policy_tagger" {
  type        = "zip"
  source_dir  = "${path.module}/files/function/bq_policy_tagger"
  output_path = "${path.module}/files/function/bq_policy_tagger/functions.zip"
  excludes = [
    ".rubocop.yml",
    ".ruby-lsp",
    ".ruby-version",
    "README.md",
    "functions.zip",
    "spec/**/*",
  ]
}
# google_cloudfunctions2.tf より抜粋
resource "google_cloudfunctions2_function" "bq_policy_tagger" {
  project  = data.google_project.current.project_id
  location = "asia-northeast1"
  name     = "${local.prefix}-bq_policy_tagger"

  build_config {
    runtime     = "ruby34"
    entry_point = "handler"
    source {
      storage_source {
        bucket = google_storage_bucket.bq_policy_tagger.name
        object = google_storage_bucket_object.bq_policy_tagger.name
      }
    }
  }

  service_config {
    max_instance_count               = 1
    timeout_seconds                  = 60
    available_memory                 = "256Mi"
    ingress_settings                 = "ALLOW_INTERNAL_ONLY"
    environment_variables = {
      CONFIG_YAML_PATH   = "config/prd.yml"
      SENTRY_ENVIRONMENT = "production"
    }
    secret_environment_variables {
      key        = "SENTRY_DSN"
      project_id = data.google_project.current.project_id
      secret     = google_secret_manager_secret.bq_policy_tagger_sentry_dsn.secret_id
      version    = "latest"
    }
    service_account_email = google_service_account.bq_policy_tagger.email
  }

  event_trigger {
    trigger_region = "asia-northeast1"
    event_type     = "google.cloud.pubsub.topic.v1.messagePublished"
    pubsub_topic   = google_pubsub_topic.bq_policy_tagger.id
    retry_policy   = "RETRY_POLICY_RETRY"
  }
}

Cloud Functions の実装

Cloud Functions は Ruby で実装されています。Pub/Sub からメッセージを受け取り、DLP のカラムプロファイルを取得して、適切なポリシータグを設定します。

Gemfile

# frozen_string_literal: true

source 'https://rubygems.org'

gem 'functions_framework'
gem 'google-cloud-bigquery'
gem 'google-cloud-dlp'
gem 'sentry-ruby'

group :development, :test do
  gem 'rspec'
  gem 'rubocop', require: false
  gem 'rubocop-performance', require: false
  gem 'rubocop-rspec', require: false
end

エントリーポイント (app.rb)

# files/function/bq_policy_tagger/app.rb
# frozen_string_literal: true

require 'functions_framework'
require 'google/cloud/bigquery'
require 'google/cloud/dlp'
require 'sentry-ruby'
require_relative 'bq_policy_tagger'

FunctionsFramework.cloud_event 'handler' do |event|
  Sentry.init unless Sentry.initialized?
  logger.debug "Received event: #{@event.inspect}"
  BqPolicyTagger.new(
    event,
    FunctionsFramework.logger,
    Google::Cloud::Dlp.dlp_service,
    Google::Cloud::Bigquery.new
  ).perform
rescue StandardError => e
  Sentry.capture_exception(e)
  logger.error "An error occurred: #{e}"
メインロジック (bq_policy_tagger.rb)
# files/function/bq_policy_tagger/bq_policy_tagger.rb より抜粋
# frozen_string_literal: true

require 'base64'
require 'google/cloud/dlp/v2'
require 'yaml'

# DLP で検出されたプロファイルに基づいて BigQuery のスキーマにポリシータグを追加する
class BqPolicyTagger
  SCORE_THRESHOLD = Google::Cloud::Dlp::V2::SensitivityScore::SensitivityScoreLevel::SENSITIVITY_MODERATE

  def initialize(event, logger, dlp_client, bq_client)
    @event = event
    @logger = logger
    @dlp_client = dlp_client
    @bq_client = bq_client
  end

  def perform
    table_profile = @dlp_client.get_table_data_profile(name: dlp_message.profile.name)
    update_schema(table_profile)
  end

  private

  def dlp_message
    Google::Cloud::Dlp::V2::DataProfilePubSubMessage.decode(event_message)
  rescue StandardError => e
    raise "Failed to decode DLP message: #{e.message}"
  end

  # event の構造
  # https://cloud.google.com/eventarc/docs/cloudevents?hl=ja#common-events
  # {
  #   "subscription": "projects/test-project/subscriptions/my-subscription",
  #   "message": {
  #     "attributes": {
  #       "attr1": "attr1-value"
  #     },
  #     "data": "dGVzdCBtZXNzYWdlIDM=",
  #     "messageId": "message-id",
  #     "publishTime": "2021-02-05T04:06:14.109Z",
  #     "orderingKey": "ordering-key"
  #   }
  # }
  def event_message
    message_data = @event.data['message']['data']
    raise 'Message data is empty' if message_data.nil?

    Base64.decode64(message_data)
  end

  def update_schema(table_profile)
    dataset_id = table_profile.full_resource.split('/')[6]
    table_id = table_profile.full_resource.split('/')[8]

    @logger.info "Processing table: #{dataset_id}.#{table_id}"

    table = @bq_client.dataset(dataset_id).table(table_id)
    schema = new_schema(table.schema, dlp_message)

    table.schema(replace: true) do |s|
      s.load(fields_to_json(schema.fields))
    end
    @logger.info "Completed processing table: #{dataset_id}.#{table_id}"
  end

  def fields_to_json(fields)
    fields.map do |field|
      {
        name: field.name,
        type: field.type,
        mode: field.mode,
        policyTags: ({ names: field.policy_tags } if field.policy_tags),
        fields: (fields_to_json(field.fields) if field.fields&.any?)
      }.compact
    end
  end.to_json

  def new_schema(base_schema, dlp_message)
    schema = base_schema.dup
    column_profiles = @dlp_client.list_column_data_profiles(
      parent: dlp_message.profile.name.split('/')[0..3].join('/'),
      filter: "table_data_profile_name = #{dlp_message.profile.name}"
    )
    column_profiles.each do |column_profile|
      update_column(schema, column_profile)
    end
    schema
  end

  def update_column(schema, column_profile)
    unless should_process_column?(column_profile)
      @logger.info "Skip column: #{column_profile.column}"
      return
    end
    @logger.info "Processing column: #{column_profile.column}"
    field = schema.fields.find { |f| f.name == column_profile.column }
    field.policy_tags = resolve_policy_tags(column_profile)
    field
  end

  def should_process_column?(column_profile)
    dataset = column_profile.table_full_resource.split('/').last
    column = column_profile.column
    return false if config['skip_columns'].include?("#{dataset}.#{column}")

    level = column_profile.sensitivity_score.score
    Google::Cloud::Dlp::V2::SensitivityScore::SensitivityScoreLevel.const_get(level) >= SCORE_THRESHOLD
  rescue NameError
    @logger.warn "Unknown sensitivity level provided: #{level}"
    false
  end

  def resolve_policy_tags(column_profile)
    begin
      policy_tag_id = config.fetch('policy_tag_mapping').fetch(column_profile.column_info_type&.info_type&.name)
    rescue KeyError
      policy_tag_id = config.fetch('policy_tag_mapping').fetch('DEFAULT')
    end
    policy_tag = format(
      'projects/%<project_id>s/locations/%<location>s/taxonomies/%<taxonomies_id>s/policyTags/%<policy_tag_id>s',
      project_id: column_profile.table_full_resource.split('/')[4],
      location: 'asia-northeast1',
      taxonomies_id: config.fetch('taxonomy_id'),
      policy_tag_id: policy_tag_id
    )
    [policy_tag]
  end

  def config
    @config ||= YAML.safe_load_file(ENV.fetch('CONFIG_YAML_PATH'), aliases: true)
  end
end
spec ファイル
# frozen_string_literal: true

require 'cloud_events/event'
require 'google/cloud/bigquery'
require 'google/cloud/dlp'
require 'spec_helper'

require_relative '../bq_policy_tagger'

RSpec.describe BqPolicyTagger do
  subject(:instance) { described_class.new(event, logger, dlp_client, bq_client) }

  let!(:project_id) { 'test-project' }
  let!(:location) { 'asia-northeast1' }
  let!(:taxonomies_id) { '12345' }
  let!(:policy_tag_id) { 'pii_mail_address_policy_tag_id' }

  let!(:logger) { object_spy(Logger.new($stdout)) }
  let!(:dlp_client) { object_spy(Google::Cloud::Dlp.dlp_service) }
  let!(:bq_client) { object_spy(Google::Cloud::Bigquery.new(project_id: project_id)) }

  let!(:table_profile) { Google::Cloud::Dlp::V2::TableDataProfile.new }
  let!(:dlp_message_data) do
    message = Google::Cloud::Dlp::V2::DataProfilePubSubMessage.new
    message.profile = table_profile
    message.event = Google::Cloud::Dlp::V2::DataProfileAction::EventType::NEW_PROFILE
    message
  end
  let!(:event_message_data) do
    {
      data: {
        subscription: "projects/#{project_id}/subscriptions/my-subscription",
        'message' => {
          attributes: {
            attr1: 'attr1-value'
          },
          'data' => Base64.encode64(
            Google::Cloud::Dlp::V2::DataProfilePubSubMessage.encode(dlp_message_data)
          ),
          messageId: 'message-id',
          publishTime: '2021-02-05T04:06:14.109Z',
          orderingKey: 'ordering-key'
        }
      }
    }
  end
  let!(:event) do
    instance_double(
      CloudEvents::Event::V1,
      event_message_data
    )
  end
  let!(:dataset) { object_spy(Google::Cloud::Bigquery::Dataset.new) }
  let!(:table) { object_spy(Google::Cloud::Bigquery::Table.new) }
  let!(:default_policy_tag_id) { 'default_policy_tag_id' }

  before do
    allow(ENV).to receive(:fetch).and_return('fixtures/config.yml')
    allow(YAML).to receive(:safe_load_file).with('fixtures/config.yml', aliases: true).and_return(
      {
        'taxonomy_id' => taxonomies_id,
        'policy_tags' => {
          'default' => {
            'self' => default_policy_tag_id
          },
          'confidential' => {
            'self' => 'confidential_policy_tag_id',
            'credential' => 'confidential_credential_policy_tag_id'
          },
          'financial' => {
            'self' => 'financial_policy_tag_id',
            'bank_data' => 'financial_bank_data_policy_tag_id',
            'transaction_data' => 'financial_transaction_data_policy_tag_id'
          },
          'pii' => {
            'self' => 'pii_policy_tag_id',
            'mail_address' => policy_tag_id,
            'name' => 'pii_name_policy_tag_id'
          }
        },
        'policy_tag_mapping' => {
          'DEFAULT' => default_policy_tag_id,
          'EMAIL_ADDRESS' => policy_tag_id,
          'PERSON_NAME' => 'pii_name_policy_tag_id'
        },
        'skip_columns' => [
          'table_id.skip_column'
        ]
      }
    )
  end

  describe '#perform' do
    context '正常系' do
      before do
        allow(dlp_client).to receive(:get_table_data_profile).and_return(table_profile)
        allow(instance).to receive(:update_schema)
      end

      it 'dlp_client.get_table_data_profile を呼び出す' do
        instance.perform
        expect(dlp_client).to have_received(:get_table_data_profile).with(name: dlp_message_data.profile.name)
      end

      it 'update_schema メソッドを呼び出す' do
        instance.perform
        expect(instance).to have_received(:update_schema).with(table_profile)
      end
    end
  end

  describe '#dlp_message' do
    context '正常系' do
      it 'デコードされたメッセージを返す' do
        expect(instance.send(:dlp_message)).to eq(dlp_message_data)
      end
    end

    context '異常系' do
      it 'デコードに失敗した場合エラーを発生させる' do
        allow(instance).to receive(:event_message).and_return('invalid message')
        expect { instance.send(:dlp_message) }.to raise_error(StandardError).with_message(/Failed to decode DLP message: .*/)
      end
    end
  end

  describe '#event_message' do
    let!(:dlp_message) { Google::Cloud::Dlp::V2::DataProfilePubSubMessage.encode(dlp_message_data) }

    context '正常系' do
      it 'デコードされたメッセージを返す' do
        expect(instance.send(:event_message)).to eq(dlp_message)
      end

      it 'event.data が呼び出される' do
        instance.send(:event_message)
        expect(event).to have_received(:data).with(no_args).once
      end

      it 'Base64.decode64 が呼び出される' do
        allow(Base64).to receive(:decode64)
        instance.send(:event_message)
        expect(Base64).to have_received(:decode64).with(event_message_data[:data]['message']['data']).once
      end
    end

    context '異常系' do
      it 'メッセージがない場合エラーを発生させる' do
        allow(event).to receive(:data).and_return({ 'message' => { 'data' => nil } })
        expect { instance.send(:event_message) }.to raise_error(RuntimeError).with_message('Message data is empty')
      end
    end
  end

  describe '#update_schema' do
    let!(:dataset_id) { 'dataset_id' }
    let!(:table_id) { 'table_id' }
    let!(:schema) do
      double(
        name: "projects/#{project_id}/locations/#{location}/schemas/0000000000000000000",
        fields: [
          double(name: 'id', type: 'STRING'),
          double(name: 'name', type: 'STRING')
        ]
      )
    end

    before do
      allow(instance).to receive_messages(
        new_schema: schema,
        fields_to_json: []
      )
      allow(table_profile).to receive(:full_resource).and_return("/projects/#{project_id}/locations/#{location}/datasets/#{dataset_id}/tables/#{table_id}")
      allow(bq_client).to receive(:dataset).and_return(dataset)
      allow(dataset).to receive(:table).and_return(table)
      allow(table).to receive(:schema) do |&block|
        allow(schema).to receive(:load)
        block&.call(schema)
        schema
      end
    end

    context '正常系' do
      it 'dataset_id を取得する' do
        expect(table_profile.full_resource.split('/')[6]).to eq(dataset_id)
      end

      it 'table_id を取得する' do
        expect(table_profile.full_resource.split('/')[8]).to eq(table_id)
      end

      it 'table_profile.full_resource が2回呼び出される' do
        instance.send(:update_schema, table_profile)
        expect(table_profile).to have_received(:full_resource).exactly(2).times
      end

      it 'ログに処理中メッセージを出力する' do
        instance.send(:update_schema, table_profile)
        expect(logger).to have_received(:info).with("Processing table: #{dataset_id}.#{table_id}")
      end

      it 'bq_client.dataset が呼び出される' do
        instance.send(:update_schema, table_profile)
        expect(bq_client).to have_received(:dataset).with(dataset_id).once
      end

      it 'bq_client.dataset.table が呼び出される' do
        instance.send(:update_schema, table_profile)
        expect(dataset).to have_received(:table).with(table_id).once
      end

      it 'bq_client.dataset.table.schema が呼び出される' do
        instance.send(:update_schema, table_profile)
        expect(table).to have_received(:schema).with(no_args).once
      end

      it 'new_schema メソッドを呼び出す' do
        instance.send(:update_schema, table_profile)
        expect(instance).to have_received(:new_schema).with(schema, dlp_message_data)
      end

      it 'table.schema が呼び出される' do
        instance.send(:update_schema, table_profile)
        expect(table).to have_received(:schema).with(replace: true).once
      end

      it 'table.schema が2回呼び出される' do
        instance.send(:update_schema, table_profile)
        expect(table).to have_received(:schema).twice
      end

      it 'fields_to_json メソッドを呼び出す' do
        instance.send(:update_schema, table_profile)
        expect(instance).to have_received(:fields_to_json).with(schema.fields)
      end

      it 'ログに処理完了メッセージを出力する' do
        instance.send(:update_schema, table_profile)
        expect(logger).to have_received(:info).with("Completed processing table: #{dataset_id}.#{table_id}")
      end
    end
  end

  describe '#fields_to_json' do
    let!(:policy_tags) { ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{policy_tag_id}"] }

    context '正常系' do
      it 'フィールドを JSON に変換して返す' do
        fields = [
          double(name: 'record', type: 'RECORD', mode: 'NULLABLE', policy_tags: policy_tags, fields: [
                   double(name: 'record_1', type: 'STRING', mode: 'NULLABLE', policy_tags: nil, fields: nil),
                   double(name: 'record_2', type: 'INTEGER', mode: 'NULLABLE', policy_tags: nil, fields: nil)
                 ]),
          double(name: 'string', type: 'STRING', mode: 'NULLABLE', policy_tags: nil, fields: nil),
          double(name: 'integer', type: 'INTEGER', mode: 'NULLABLE', policy_tags: nil, fields: nil)
        ]
        expected_json = [
          { name: 'record', type: 'RECORD', mode: 'NULLABLE', policyTags: { names: policy_tags }, fields: [
            { name: 'record_1', type: 'STRING', mode: 'NULLABLE' },
            { name: 'record_2', type: 'INTEGER', mode: 'NULLABLE' }
          ] },
          { name: 'string', type: 'STRING', mode: 'NULLABLE' },
          { name: 'integer', type: 'INTEGER', mode: 'NULLABLE' }
        ]
        expect(instance.send(:fields_to_json, fields)).to eq(expected_json)
      end
    end
  end

  describe '#new_schema' do
    let!(:schema) do
      double(
        name: "projects/#{project_id}/locations/#{location}/schemas/0000000000000000000",
        fields: [
          double(name: 'id', type: 'STRING'),
          double(name: 'name', type: 'STRING'),
          double(name: 'email', type: 'STRING')
        ]
      )
    end
    let!(:column_profiles) { [1, 2, 3] }

    before do
      allow(dlp_client).to receive(:list_column_data_profiles).and_return(column_profiles)
      allow(instance).to receive(:update_column)
      allow(instance).to receive(:dlp_message).and_return(dlp_message_data)
    end

    context '正常系' do
      it 'スキーマをクローンして返す' do
        expect { instance.send(:new_schema, schema, dlp_message_data) }.not_to(change { schema })
      end

      it 'dlp_client.list_column_data_profiles を呼び出す' do
        instance.send(:new_schema, schema, dlp_message_data)
        expect(dlp_client).to have_received(:list_column_data_profiles).with(
          parent: dlp_message_data.profile.name.split('/')[0..3].join('/'),
          filter: "table_data_profile_name = #{dlp_message_data.profile.name}"
        )
      end

      it 'column_profiles の数だけ update_column を呼び出す' do
        instance.send(:new_schema, schema, dlp_message_data)
        expect(instance).to have_received(:update_column).exactly(column_profiles.size).times
      end
    end

    context '異常系' do
      it 'dlp_client.list_column_data_profiles が失敗した場合エラーを発生させる' do
        allow(dlp_client).to receive(:list_column_data_profiles).and_raise(StandardError)
        expect do
          instance.send(:new_schema, schema, dlp_message_data)
        end.to raise_error(StandardError)
      end
    end
  end

  describe '#update_column' do
    let!(:policy_tags) { ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{policy_tag_id}"] }
    let!(:aready_attached_policy_tags) { ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/aready_attached_policy_tag_id"] }
    let!(:id_column) do
      id_column = double(
        name: 'id',
        type: 'STRING',
        fields: []
      )
      allow(id_column).to receive(:policy_tags=) { |arg| id_column.instance_variable_set(:@policy_tags, arg) }
      allow(id_column).to receive(:policy_tags) { id_column.instance_variable_get(:@policy_tags) }
      id_column
    end
    let!(:name_column) do
      name_column = double(
        name: 'name',
        type: 'STRING',
        fields: []
      )
      allow(name_column).to receive(:policy_tags=) { |arg| name_column.instance_variable_set(:@policy_tags, arg) }
      allow(name_column).to receive(:policy_tags) { name_column.instance_variable_get(:@policy_tags) }
      name_column
    end
    let!(:email_column) do
      email_column = double(
        name: 'email',
        type: 'STRING',
        policy_tags: aready_attached_policy_tags,
        fields: []
      )
      allow(email_column).to receive(:policy_tags=) { |arg| email_column.instance_variable_set(:@policy_tags, arg) }
      allow(email_column).to receive(:policy_tags) { email_column.instance_variable_get(:@policy_tags) }
      email_column.policy_tags = aready_attached_policy_tags
      email_column
    end
    let!(:schema) do
      double(
        name: "projects/#{project_id}/locations/#{location}/schemas/0000000000000000000",
        fields: [id_column, name_column, email_column]
      )
    end

    before do
      allow(instance).to receive(:resolve_policy_tags).and_return(policy_tags)
    end

    context 'SENSITIVITY_HIGH の場合' do
      it 'id カラムにポリシータグを追加する' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_HIGH'
          ),
          column: 'id',
          column_info_type: double(
            info_type: double(
              name: 'EMAIL_ADDRESS'
            )
          )
        )
        instance.send(:update_column, schema, column_profile)
        expect(schema.fields[0].policy_tags).to eq(policy_tags)
      end

      it 'name カラムにポリシータグを追加する' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_HIGH'
          ),
          column: 'name',
          column_info_type: double(
            info_type: double(
              name: 'EMAIL_ADDRESS'
            )
          )
        )
        instance.send(:update_column, schema, column_profile)
        expect(schema.fields[1].policy_tags).to eq(policy_tags)
      end

      it 'ログに処理中メッセージを出力する' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_HIGH'
          ),
          column: 'id'
        )
        instance.send(:update_column, schema, column_profile)
        expect(logger).to have_received(:info).with("Processing column: #{column_profile.column}")
      end
    end

    context 'SENSITIVITY_HIGH かつポリシータグが設定されている場合' do
      it 'email カラムのポリシータグを上書きする' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_HIGH'
          ),
          column: 'email'
        )
        instance.send(:update_column, schema, column_profile)
        expect(schema.fields[2].policy_tags).to eq(policy_tags)
      end

      it 'ログに処理中メッセージを出力する' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_HIGH'
          ),
          column: 'email'
        )
        instance.send(:update_column, schema, column_profile)
        expect(logger).to have_received(:info).with("Processing column: #{column_profile.column}")
      end
    end

    context 'SENSITIVITY_LOW の場合' do
      it 'id カラムにポリシータグを追加しない' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_LOW'
          ),
          column: 'id'
        )
        instance.send(:update_column, schema, column_profile)
        expect(schema.fields[0].policy_tags).to be_nil
      end

      it 'name カラムにポリシータグを追加しない' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_LOW'
          ),
          column: 'name'
        )
        instance.send(:update_column, schema, column_profile)
        expect(schema.fields[1].policy_tags).to be_nil
      end

      it 'email カラムにポリシータグを追加しない' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_LOW'
          ),
          column: 'email'
        )
        instance.send(:update_column, schema, column_profile)
        expect(schema.fields[2].policy_tags).to eq(aready_attached_policy_tags)
      end

      it 'ログにスキップメッセージを出力する' do
        column_profile = double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_LOW'
          ),
          column: 'name'
        )
        instance.send(:update_column, schema, column_profile)
        expect(logger).to have_received(:info).with("Skip column: #{column_profile.column}")
      end
    end
  end

  describe '#should_process_column?' do
    context '設定ファイルの skip_columns にカラム名が含まれている場合' do
      let!(:column_profile) do
        double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_HIGH'
          ),
          column: 'skip_column'
        )
      end

      it 'falseを返す' do
        expect(instance.send(:should_process_column?, column_profile)).to be false
      end
    end

    context '機密レベルが SCORE_THRESHOLD 以上の場合' do
      let!(:column_profile) do
        double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_HIGH'
          ),
          column: 'id'
        )
      end

      it 'trueを返す' do
        expect(instance.send(:should_process_column?, column_profile)).to be true
      end
    end

    context '機密レベルが SCORE_THRESHOLD 未満の場合' do
      let!(:column_profile) do
        double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: 'SENSITIVITY_UNKNOWN'
          ),
          column: 'id'
        )
      end

      it 'falseを返す' do
        expect(instance.send(:should_process_column?, column_profile)).to be false
      end
    end

    context '未知の機密レベルが与えられた場合' do
      let!(:invalid_sensitivity_level) { 'UNKNOWN' }
      let!(:column_profile) do
        double(
          table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
          sensitivity_score: double(
            score: invalid_sensitivity_level
          ),
          column: 'id'
        )
      end

      it 'falseを返す' do
        expect(instance.send(:should_process_column?, column_profile)).to be false
      end

      it '警告をログに出力' do
        instance.send(:should_process_column?, column_profile)
        expect(logger).to have_received(:warn).with("Unknown sensitivity level provided: #{invalid_sensitivity_level}")
      end
    end
  end

  describe '#resolve_policy_tags' do
    let!(:column_profile) do
      double(
        table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
        column_info_type: double(
          info_type: double(
            name: 'EMAIL_ADDRESS'
          )
        )
      )
    end

    it 'info_type に対応するポリシータグが設定されている場合対応しているポリシータグを返す' do
      expected_policy_tags = ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{policy_tag_id}"]
      expect(instance.send(:resolve_policy_tags, column_profile)).to eq(expected_policy_tags)
    end

    it 'info_type に対応するポリシータグが設定されていない場合 DEFAULT のポリシータグを返す' do
      column_profile = double(
        table_full_resource: "//bigquery.googleapis.com/projects/#{project_id}/datasets/dataset_id/tables/table_id",
        column_info_type: double(
          info_type: double(
            name: 'UNKNOWN_INFO_TYPE'
          )
        )
      )
      expected_policy_tags = ["projects/#{project_id}/locations/#{location}/taxonomies/#{taxonomies_id}/policyTags/#{default_policy_tag_id}"]
      expect(instance.send(:resolve_policy_tags, column_profile)).to eq(expected_policy_tags)
    end
  end

  describe '#config' do
    it 'CONFIG_YAML_PATH が設定されていない場合エラーを発生させる' do
      allow(ENV).to receive(:fetch).with('CONFIG_YAML_PATH').and_raise(KeyError)
      expect do
        instance.send(:config)
      end.to raise_error(KeyError)
    end

    it 'CONFIG_YAML_PATH が不正な場合エラーを発生させる' do
      allow(ENV).to receive(:fetch).with('CONFIG_YAML_PATH').and_return('none')
      allow(YAML).to receive(:safe_load_file).with('none', aliases: true).and_raise(Errno::ENOENT)

      expect do
        instance.send(:config)
      end.to raise_error(Errno::ENOENT)
    end
  end
end

ポリシータグのマッピング

検出された InfoType に応じて、適切なポリシータグを割り当てています。
対応するポリシータグが設定されていない場合は DEFAULT のポリシータグを付与します。

# files/function/bq_policy_tagger/config/prd.yml より抜粋
taxonomy_id: "xxxxxxxxxxxxxxxxxx"
policy_tags:
  high:
    self: &high "xxxxxxxxxxxxxxxxxx1"
    japan_bank_account: &high_japan_bank_account "xxxxxxxxxxxxxxxxxx2"
    password: &high_password "xxxxxxxxxxxxxxxxxx3"
    transaction_data: &high_transaction_data "xxxxxxxxxxxxxxxxxx4"
  medium:
    self: &medium "xxxxxxxxxxxxxxxxxx5"
    email_address: &medium_email_address "xxxxxxxxxxxxxxxxxx6"
    person_name: &medium_person_name "xxxxxxxxxxxxxxxxxx7"
    phone_number: &medium_phone_number "xxxxxxxxxxxxxxxxxx8"
  low:
    self: &low "xxxxxxxxxxxxxxxxxx9"
policy_tag_mapping:
  DEFAULT: *medium
  EMAIL_ADDRESS: *medium_email_address
  JAPAN_BANK_ACCOUNT: *high_japan_bank_account
  PASSWORD: *high_password
  PERSON_NAME: *medium_person_name
  PHONE_NUMBER: *medium_phone_number

スキップリスト

一部のカラムは、誤検出や業務上の理由でポリシータグの自動設定をスキップしています。

# files/function/bq_policy_tagger/config/prd.yml より抜粋
skip_columns:
  - admin_users.email # 社内ユーザーの情報なのでマスキングしない
  - contracts.bank_name # JAPAN_BANK_ACCOUNT を付与
  - projects.title # プロジェクトタイトルは公開情報のため除外
  # ...

元ネタ

今回の実装は Google Cloud のサンプル実装 bq-pii-classifier を参考にしています。

なぜ自作したのか

元ネタは Java で実装されており、カスタマイズが難しそうでした。また、READYFOR は Ruby をメイン言語として使っている会社なので、Ruby で実装すれば他のエンジニアも修正しやすくなります。

そのため、元ネタを参考にしつつ、必要最小限の機能に絞って Ruby で自作することにしました。

運用での工夫

1. スキャン頻度の調整

データセットの特性に応じてスキャン頻度を調整しています:

  • スキーマ変更時: 毎日チェック
  • テーブル更新時: 毎月チェック

これにより、DLP のスキャンコストを抑えつつ、新規カラムや変更されたカラムを適時検出できます。

2. 誤検出への対応

DLP は高精度ですが、誤検出も発生します。例えば

  • プロジェクトタイトルが人名と誤検出される
  • 都道府県名だけが入ったカラムが住所と誤検出される

これらは設定ファイルの skip_columns に追加することで除外しています。

効果

この仕組みを約1年間運用してきましたが、安定して動作しています。

セキュリティの向上

  • 個人情報を含むカラムが自動的に検出され、適切なアクセス制御が適用される
  • 新規テーブルやカラムも自動的に保護される
  • 人的ミスによる漏れを防止

運用負荷の削減

  • 手動でのポリシータグ設定作業が不要に
  • 新規テーブル追加時の対応が自動化
  • 監査対応が容易に

可視化

  • どのテーブル・カラムに個人情報が含まれるか一覧化
  • DLP のダッシュボードで機密データの分布を把握

安定運用

約1年間の運用を通じて、以下の点が確認できました:

  • Cloud Functions のエラー率は極めて低く、安定して動作
  • DLP のスキャンコストも想定内に収まっている
  • 誤検出は初期に skip_columns で対応し、その後は追加がほぼ不要
  • 新規テーブルやカラム追加時も自動的にタグが付与され、運用負荷がほぼゼロ

ハマったポイント

1. サービスアカウントの権限借用が必要

ローカルでの開発時、gcloud auth application-default login でログインした権限では DLP 系の API が叩けませんでした。

--impersonate-service-account オプションを使用してサービスアカウントの権限を借用する必要があります。

gcloud auth application-default login --impersonate-service-account=SERVICE_ACCOUNT_EMAIL

Terraform の Google Provider でも同様に設定します

provider "google" {
  project        = local.google_project_id
  region         = "asia-northeast1"
  # Application Default Credentials を利用すると DLP API が利用できないため、Terraform のサービスアカウントの権限借用をする
  impersonate_service_account = "terraform@xxx.iam.gserviceaccount.com"
}

2. DLP API の Rate Limit

DLP のスキャン結果を確認する際、すぐに rate limit にかかってしまいます。

実装時は適切なリトライ処理を入れる必要がありました。

3. DLP 開発用のスクリプトを作成

実装中に DLP のスキャン結果を確認したり、テスト後にプロファイルを削除して再スキャンしたりする作業が頻繁に発生しました。

Rate Limit にもかかるため、手作業では辛く、以下のようなスクリプトを作成しました:

これらのスクリプトは、開発環境でのテストや動作確認を大幅に効率化してくれました。

おわりに

Google Cloud DLP と Cloud Functions を組み合わせることで、BigQuery のポリシータグを自動設定する仕組みを構築できました。

セキュリティ向上と運用負荷削減を両立できる様になったことは、大きな成果だと感じています。

明日は READYFOR Advent Calendar 2025 9日目、resqnet さんによる記事が公開される予定です。

お楽しみに!

READYFORテックブログ

Discussion