😷

[小ネタ] マスキングされてていかにもコピーしちゃダメそうなデータをコピーしたい in Snowflake

に公開

はじめに

本番環境データに近しいデータを検証環境・開発環境などで使いたいときって、あるじゃないですか。
テーブル同士でキーが突合できるデータ、データ量が実体に近いデータ、自分で作るのはめんどいな〜って時、稀によくあるじゃないですか。
そこで、Snowflake ユーザーであれば真っ先に思いつくのがクローンです。タグベースのマスキングポリシーによる保護は維持されるし、コスト的にも実行にかかる時間的にも、クローンするのが一番簡単な対応方法でしょう。
でも、マスキングされているとはいっても、本番データは本番環境以外の環境に入れてはいかんぞ!という制約がかけられた状況、あると思います。そこを何とか...と言いたい気持ちはあるものの、そこを何とかで済んだらデータセキュリティはいらなくなっちゃいますよね。
そんなとき、データをコピーするためにやったことをご紹介します。

やったこと

生の本番データは困るけど、マスキングされたデータならOKってことですよね。
...じゃあ、マスキングで値が変換されたテーブルをコピーするストアドプロシージャを作っちゃいましょうか。

準備

コピー元スキーマ・コピー先スキーマ、タグとマスキングポリシー、マスキング確認用ロールを作っておきます。

スキーマ名 役割
M_KAJIYA_DB1.PUBLIC コピー元
M_KAJIYA_DB2.PUBLIC コピー先
ロール名 役割 コピー元スキーマに対する権限 コピー先スキーマに対する権限
READWRITE_TEST_ROLE 開発者を想定。
書き込み可能、マスキングが適用されない
INSERT, DELETE, SELECT INSERT, DELETE, SELECT
READ_TEST_ROLE 一般利用者を想定。
読み取り専用、マスキングが適用される
SELECT SELECT
クエリ
use role SYSADMIN;

-- コピー元
create or replace database m_kajiya_db1;
create schema if not exists m_kajiya_db1.public;

-- コピー先を作る
create or replace database m_kajiya_db2;
create schema if not exists m_kajiya_db2.public;

use schema m_kajiya_db1.public;

--------------------------------------------------------------------

-- マスキング確認用ロール
-- アクセスロール・ファンクショナルロールの構成は省略、、、

-- READ_TEST_ROLE
-- コピー元テーブルを SELECT 可能、マスクされた値しか参照できない
-- コピー先テーブルを SELECT 可能
use role securityadmin;
create or replace role READ_TEST_ROLE;
grant role READ_TEST_ROLE to role SYSADMIN;
grant usage on database m_kajiya_db1 to role READ_TEST_ROLE;
grant usage on schema m_kajiya_db1.public to role READ_TEST_ROLE;
grant select on future tables in schema m_kajiya_db1.public to role READ_TEST_ROLE;
grant usage on database m_kajiya_db2 to role READ_TEST_ROLE;
grant usage on schema m_kajiya_db2.public to role READ_TEST_ROLE;
grant select on future tables in schema m_kajiya_db2.public to role READ_TEST_ROLE;

grant usage on warehouse m_kajiya_wh to role READ_TEST_ROLE;

-- READWRITE_TEST_ROLE
-- コピー元テーブルを SELECT・INSERT・DELETE 可能
-- コピー先テーブルを SELECT・INSERT・DELETE 可能
create or replace role READWRITE_TEST_ROLE;
grant role READWRITE_TEST_ROLE to role SYSADMIN;
grant role READ_TEST_ROLE to role READWRITE_TEST_ROLE;
grant insert,delete on future tables in schema m_kajiya_db1.public to role READWRITE_TEST_ROLE;
grant insert,delete on future tables in schema m_kajiya_db2.public to role READWRITE_TEST_ROLE;

-- マスキングに使用するUDF
use role sysadmin;
create or replace function m_kajiya_db1.public.mask_udf(val string)
    returns varchar
    as
$$
sha1(val)
$$
;

desc function m_kajiya_db1.public.mask_udf(string);

-- マスキングポリシー
-- READ_TEST_ROLE には見せられないよ
create or replace masking policy m_kajiya_db1.public.mask
as (val string) returns string ->
  case
    when current_role() = 'READ_TEST_ROLE' then mask_udf(val)
    else val
  end
;

-- タグを作成し、ポリシーを紐づけ
create tag if not exists m_kajiya_db1.public.tag;

use role accountadmin; -- sysadmin だと権限が足りない
alter tag m_kajiya_db1.public.tag set masking policy m_kajiya_db1.public.mask;

実装

さて、マスキングを保持しつつテーブルをコピーするやつを作ります。
こんなストアドを用意します。

ストアド
CREATE OR REPLACE PROCEDURE copy_masked_table_in_schema(src_database VARCHAR, src_schema VARCHAR, dst_database VARCHAR,  dst_schema VARCHAR)
  RETURNS VARCHAR
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  PACKAGES = ('snowflake-snowpark-python==1.30.0', 'snowflake==1.1.0')
  HANDLER = 'main'
  COMMENT = '指定スキーマにある全てのテーブルをコピーする。コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をコピーする。マスキングされていないテーブルはクローンする。注意:(1) データベース名、スキーマ名は解決された識別子で指定すること。(2) ポリシー設定検出に SNOWFLAKE.ACCOUNT_USAGE.POLICY_REFERENCES を使用するので、ポリシー設定が POLICY_REFERENCES で検出できるのを確認してから実行すること。(3) 単一のマスキング関数のみ対応。(4) テーブルの Ownership はコピー元と同じロールとなる。'
  EXECUTE AS OWNER
  AS
$$
from logging import Formatter, Logger, StreamHandler, getLogger

import snowflake.snowpark as snowpark
from snowflake.core import CreateMode, Root
from snowflake.core.table import Table


class MaskedTableCopier:
    '''マスキングされたテーブルをコピーする'''

    def __init__(self, session: snowpark.Session):
        '''
        - Snowflake セッションを設定
        - セッションから Snowflake Python API インスタンスを取得
        - ログ出力の設定
        - 固定のメッセージを設定

        Parameters
        ----------
        session : snowpark.Session
            Snowflakeセッション
        '''
        self.session = session
        self.root = Root(self.session)
        self.success_msg = {'STATUS': 'Succeeded'}

        # ログ出力の設定
        self.logger = getLogger(__name__)
        handler = StreamHandler()
        handler.setFormatter(
            Formatter('[%(asctime)s] %(name)s %(levelname)s [%(lineno)d]: %(message)s')
        )
        self.logger.addHandler(handler)
        self.logger.propagate = False  # 上位ロガーにイベントを渡さない

    def get_error_msg(self, details: str, excepion: Exception) -> dict:
        '''エラー発生時にストアドプロシージャが返すメッセージを作成'''
        msg = {
            'STATUS': 'Error',
            'DETAILS': details,
            'EXCEPTION': excepion
        }
        self.logger.error(msg)
        return msg

    def copy_masked_table(
            self,
            src_table: Table,
            src_policy_ref_columns: list[str],
            dst_database: str,
            dst_schema: str,
            masking_function: str,
            sep=','):
        '''マスキングされたテーブルをコピー(CREATE TABLE LIKE + INSERT INTO)する。

        Parameters
        ----------
        src_table : snowflake.core.table.Table
            コピー元テーブル
        src_policy_ref_columns : List[str]
            マスキング対象のカラム名リスト
        dst_database : str
            コピー先データベース名
        dst_schema : str
            コピー先スキーマ名
        masking_function : str
            マスキングに使用するUDF名
        sep : str, optional
            カラム名を結合する際の区切り文字。デフォルトは ','
        '''
        # src 側
        # dst_cols = sep_comma.join(src_table.columns)  # NOTE: snowflake.core.table.Table.columns で取得できそうだが None なので使わない
        src_table_df = self.session.table(
            f'"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"'
        )
        src_policy_ref_columnss = src_table_df.select(src_policy_ref_columns).columns  # NOTE: Dataframe から columns を取得すると " が必要なカラムには " が付く。POLICY_REFERENCES の値だと " が付かないので判別が面倒
        src_masked_columns_str = sep.join(
            [f'{masking_function}({c}) as {c}' for c in src_policy_ref_columnss]
        )

        # dst 側
        dst_columns_str = sep.join(src_table_df.columns)  # " が必要なカラムには " が付くのでこれでよい

        try:
            # 新規作成するか定義を上書き(コピー元で定義を変更した場合に備えて)
            dst_tables = self.root.databases[dst_database].schemas[dst_schema].tables
            dst_tables.create(
                table=src_table.name,
                like_table=f'"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"',
                copy_grants=True,
                mode=CreateMode.or_replace
            )
            # 保護されている列を変換しつつ insert
            self.session.sql(
                f'''
                    insert into "{dst_database}"."{dst_schema}"."{src_table.name}" (
                        {dst_columns_str}
                    )
                        with cte AS (
                            select
                                *
                                replace ({src_masked_columns_str})
                            from
                                "{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"
                        )
                        select
                            {dst_columns_str}
                        from
                            cte;
                '''
            ).collect()

        except Exception as e:
            return self.get_error_msg(
                f'Failed to insert masked records from `"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"` to `"{dst_database}"."{dst_schema}"."{src_table.name}"`',
                e
            )

    def copy_nonmasked_table(
            self,
            src_table: Table,
            dst_database: str,
            dst_schema: str):
        '''指定したデータベース・スキーマにテーブルをコピーする。
        コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をコピーする。マスキングされていないテーブルはクローンする。

        Parameters
        ----------
        src_table : snowflake.core.table.Table
            コピー元テーブル
        dst_database : str
            コピー先データベース名
        dst_schema : str
            コピー先スキーマ名
        '''
        try:
            dst_tables = self.root.databases[dst_database].schemas[dst_schema].tables
            dst_tables.create(
                table=src_table.name,
                clone_table=f'"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"',
                copy_grants=True,
                mode=CreateMode.or_replace
            )

        except Exception as e:
            return self.get_error_msg(
                f'Failed to clone from `"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"` to `"{dst_database}"."{dst_schema}"."{src_table.name}"`',
                e
            )

    def copy_table(
            self,
            src_table: Table,
            dst_database: str,
            dst_schema: str,
            masking_function: str) -> dict:
        '''異なるDB間でテーブルのコピーを行う。
        - コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をinsertする。
        - マスキングされていないテーブルはクローンする。
        コピー先テーブルの Ownership はコピー元と同じロールになる。

        Parameters
        ----------
        src_table : snowflake.core.table.Table
            コピー元テーブル
        dst_database : str
            コピー先データベース名
        dst_schema : str
            コピー先スキーマ名
        masking_function : str
            マスキングに使用するUDF名
        sep : str, optional
            カラム名を結合する際の区切り文字。デフォルトは ','
        '''
        # NOTE:
        # desc table <table> で、設定されたポリシー名が取得できるはずだが、できないので policy_references で取得する。
        # また、ACCOUNTADMIN で information_schema.policy_references() を呼ぶと
        # 「Requested information on the current user is not accessible in stored procedure.」 となるので ACCOUNT_USAGE.POLICY_REFERENCES を呼ぶ
        # cf. https://community.snowflake.com/s/article/Stored-Procedure-fails-with-error-Requested-information-on-the-current-user-is-not-accessible-in-stored-procedure
        query = f'''
            select distinct
                REF_COLUMN_NAME as COL
            from
                SNOWFLAKE.ACCOUNT_USAGE.POLICY_REFERENCES
            where
                ref_database_name = '{src_table.database_name}'
                and ref_schema_name = '{src_table.schema_name}'
                and ref_entity_name = '{src_table.name}' -- ここでの一致判定はダブルクオート不要
            ;
        '''
        self.logger.debug(f'query: {query}')
        policy_refs = self.session.sql(query)

        # TODO: Dataframe -> list にするもっときれいな方法があれば、置き換えたい。。。
        #  (例えば、PySpark なら rdd.map(lambda x: x).collect() のように書ける)
        src_policy_ref_columnss = [
            f'"{r["COL"]}"' for r in policy_refs.to_local_iterator()
        ]
        self.logger.info(f'Columns with policy: {src_policy_ref_columnss}')
        
        # コピー実行
        # マスキングが必要な場合
        if len(src_policy_ref_columnss) > 0:
            self.logger.info(f'Copy masked table: {src_table.name}')
            self.copy_masked_table(
                src_table=src_table,
                src_policy_ref_columns=src_policy_ref_columnss,
                dst_database=dst_database,
                dst_schema=dst_schema,
                masking_function=masking_function,
                sep=','
            )

        # マスキングが不要な場合
        else:
            self.logger.info(f'Copy nonmasked table: {src_table.name}')
            self.copy_nonmasked_table(
                src_table=src_table,
                dst_database=dst_database,
                dst_schema=dst_schema
            )

        try:
            # Ownership をコピー元と同じロールに変更
            # src_table.owner が None なことがあるので使わずに show tables から取得する
            query = f'''
                show tables like '{src_table.name}' in schema "{src_table.database_name}"."{src_table.schema_name}"
            '''
            self.logger.debug(f'query: {query}')
            src_table_show = self.session.sql(query).collect()[0]
            owner_role = src_table_show['owner']

            query = f'''
                    grant ownership on table "{dst_database}"."{dst_schema}"."{src_table.name}"
                        to role "{owner_role}"
                        copy current grants
                '''
            self.logger.debug(f'query: {query}')
            self.session.sql(query).collect()

        except Exception as e:
            return self.get_error_msg(
                f'Failed to grant ownership on table `"{dst_database}"."{dst_schema}"."{src_table.name}"`',
                e
            )

        return {'STATUS': 'Succeeded', 'TABLE': src_table.name}

    def copy_tables_in_schema(
            self,
            src_database: str,
            src_schema: str,
            dst_database: str,
            dst_schema: str,
            masking_function: str) -> dict:
        '''異なるデータベース・スキーマ間でテーブルのコピーを行う。
           コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をコピーする。
           マスキングされていないテーブルはクローンする

        Parameters
        ----------
        src_database: str
            コピー元データベース
        src_schema: str
            コピー元スキーマ。全てのテーブルがコピー対象となる
        dst_database: str
            コピー先データベース
        dst_schema: str
            コピー先スキーマ
        masking_function : str, optional
            マスキングに使用するUDF名

        Returns
        -------
        dict
            status
        '''
        # コピー元テーブルを取得
        root = Root(self.session)
        src_tables = root.databases[src_database].schemas[src_schema].tables

        # NOTE: 対象が大量でパフォーマンスが気になるなら iter_async() のほうがいいかも
        for table in src_tables.iter():
            msg = self.copy_table(
                src_table=table,
                dst_database=dst_database,
                dst_schema=dst_schema,
                masking_function=masking_function
            )
            self.logger.info(f'Table copy result: {msg}')

        return self.success_msg


def main(
        session: snowpark.Session,
        src_database: str,
        src_schema: str,
        dst_database: str,
        dst_schema: str) -> dict:
    '''異なるデータベース・スキーマ間でテーブルのコピーを行う

    Parameters
    ----------
    session: snowpark.Session
        Snowflakeセッション
    src_database: str
        コピー元データベース
    src_schema: str
        コピー元スキーマ
    dst_database: str
        コピー先データベース
    dst_schema: str
        コピー先スキーマ

    Returns
    -------
    dict
        処理結果のメッセージ
    '''
    copier = MaskedTableCopier(session)
    return copier.copy_tables_in_schema(
        src_database=src_database,
        src_schema=src_schema,
        dst_database=dst_database,
        dst_schema=dst_schema,
        masking_function='mask_udf'
    )
$$
;

-- 検証実施者が自分でコピーできるようにしておきたいので、RWロールで使用可能にしておく。
grant usage on procedure copy_masked_table_in_schema(varchar, varchar) to role READWRITE_TEST_ROLE;

実践

早速使ってみましょう。保護された列があるテーブルを用意します。

use role sysadmin;
create or replace table m_kajiya_db1.public.tbl (
    id int
    , name varchar
    , etc varchar
) as
select
    *
from (
    values
        (1, 'ほげほげ', 'etc'),
        (2, 'ぴよぴよ', 'hosts'),
        (3, null, null)
);

use role accountadmin;
alter table m_kajiya_db1.public.tbl alter column name set tag m_kajiya_db1.public.tag = 'true';

マスクされていることを確認します。

use role READ_TEST_ROLE;
table m_kajiya_db1.public.tbl;

returns:

実行する前に、ACCOUNT_USAGE.POLICY_REFERENCES からマスキング情報を取得できるかも確認しておきます。
(ストアド内で ACCOUNT_USAGE.POLICY_REFERENCES を参照するため。なので、マスキング用のタグを付与したばかりのテーブルでは、マスクされずにコピーされてしまいます。注意。。。)

クエリ
use role accountadmin;

-- 遅延に注意
select distinct
    REF_DATABASE_NAME as database
    , REF_SCHEMA_NAME as schema
    , REF_ENTITY_NAME as entity
    , REF_COLUMN_NAME as COL
from
    SNOWFLAKE.ACCOUNT_USAGE.POLICY_REFERENCES
where
    ref_database_name = 'M_KAJIYA_DB1'
    and ref_schema_name = 'PUBLIC'
    and ref_entity_name = 'TBL'
;

さて、コピーを実行します。

use role READWRITE_TEST_ROLE;
call copy_masked_table_in_schema('M_KAJIYA_DB1.PUBLIC', 'M_KAJIYA_DB2.PUBLIC');
-- COPY_MASKED_TABLE_IN_SCHEMA
-- {'STATUS': 'Succeeded'}

ちゃんとマスクされた値が入っていますね。

table m_kajiya_db2.public.tbl;

returns:

注意

マスキング処理により VARCHAR などのサイズを超えると、次のエラーになります。サイズを修正するか、マスキング処理を修正するかが必要です。

{'STATUS': 'Error', 'DETAILS': 'Failed to insert masked records from `M_KAJIYA_DB1.PUBLIC."TBL"` to `M_KAJIYA_DB2.PUBLIC."TBL"`', 'EXCEPTION': SnowparkSQLException("100078 (22000): 01bbebd6-0002-5116-0000-2c79041b6f82: String '4bb7ae317ea728941c77bea79c869db015d48bc6' is too long and would be truncated", '1304', '01bbebd6-0002-5116-0000-2c79041b6f82')}
こんな感じの状況
create or replace table m_kajiya_db1.public.tbl (
    id int
    , name varchar(10) -- sha1() かけたら、まあ、超えるよね
    , etc varchar(10)
) as
select
    *
from (
    values
        (1, 'ほげほげ', 'etc'),
        (2, 'ぴよぴよ', 'hosts'),
        (3, null, null)
);

use role accountadmin;
alter table m_kajiya_db1.public.tbl alter column name set tag m_kajiya_db1.public.tag = 'true';

use role READWRITE_TEST_ROLE;
call copy_masked_table_in_schema('M_KAJIYA_DB1.PUBLIC', 'M_KAJIYA_DB2.PUBLIC');
-- COPY_MASKED_TABLE_IN_SCHEMA
-- {'STATUS': 'Error', 'DETAILS': 'Failed to insert masked records from `M_KAJIYA_DB1.PUBLIC."TBL"` to `M_KAJIYA_DB2.PUBLIC."TBL"`', 'EXCEPTION': SnowparkSQLException("100078 (22000): 01bbebd6-0002-5116-0000-2c79041b6f82: String '4bb7ae317ea728941c77bea79c869db015d48bc6' is too long and would be truncated", '1304', '01bbebd6-0002-5116-0000-2c79041b6f82')}

おわりに

全部マスキングでよくなれ!!!!!!
マスクされているから大丈夫!がまかり通らない、そんな環境のときに、用法用量を守ってお使いください。

DATUM STUDIO

Discussion