[小ネタ] マスキングされてていかにもコピーしちゃダメそうなデータをコピーしたい in Snowflake
はじめに
本番環境データに近しいデータを検証環境・開発環境などで使いたいときって、あるじゃないですか。
テーブル同士でキーが突合できるデータ、データ量が実体に近いデータ、自分で作るのはめんどいな〜って時、稀によくあるじゃないですか。
そこで、Snowflake ユーザーであれば真っ先に思いつくのがクローンです。タグベースのマスキングポリシーによる保護は維持されるし、コスト的にも実行にかかる時間的にも、クローンするのが一番簡単な対応方法でしょう。
でも、マスキングされているとはいっても、本番データは本番環境以外の環境に入れてはいかんぞ!という制約がかけられた状況、あると思います。そこを何とか...と言いたい気持ちはあるものの、そこを何とかで済んだらデータセキュリティはいらなくなっちゃいますよね。
そんなとき、データをコピーするためにやったことをご紹介します。
やったこと
生の本番データは困るけど、マスキングされたデータならOKってことですよね。
...じゃあ、マスキングで値が変換されたテーブルをコピーするストアドプロシージャを作っちゃいましょうか。
準備
コピー元スキーマ・コピー先スキーマ、タグとマスキングポリシー、マスキング確認用ロールを作っておきます。
スキーマ名 | 役割 |
---|---|
M_KAJIYA_DB1.PUBLIC | コピー元 |
M_KAJIYA_DB2.PUBLIC | コピー先 |
ロール名 | 役割 | コピー元スキーマに対する権限 | コピー先スキーマに対する権限 |
---|---|---|---|
READWRITE_TEST_ROLE | 開発者を想定。 書き込み可能、マスキングが適用されない |
INSERT, DELETE, SELECT | INSERT, DELETE, SELECT |
READ_TEST_ROLE | 一般利用者を想定。 読み取り専用、マスキングが適用される |
SELECT | SELECT |
クエリ
use role SYSADMIN;
-- コピー元
create or replace database m_kajiya_db1;
create schema if not exists m_kajiya_db1.public;
-- コピー先を作る
create or replace database m_kajiya_db2;
create schema if not exists m_kajiya_db2.public;
use schema m_kajiya_db1.public;
--------------------------------------------------------------------
-- マスキング確認用ロール
-- アクセスロール・ファンクショナルロールの構成は省略、、、
-- READ_TEST_ROLE
-- コピー元テーブルを SELECT 可能、マスクされた値しか参照できない
-- コピー先テーブルを SELECT 可能
use role securityadmin;
create or replace role READ_TEST_ROLE;
grant role READ_TEST_ROLE to role SYSADMIN;
grant usage on database m_kajiya_db1 to role READ_TEST_ROLE;
grant usage on schema m_kajiya_db1.public to role READ_TEST_ROLE;
grant select on future tables in schema m_kajiya_db1.public to role READ_TEST_ROLE;
grant usage on database m_kajiya_db2 to role READ_TEST_ROLE;
grant usage on schema m_kajiya_db2.public to role READ_TEST_ROLE;
grant select on future tables in schema m_kajiya_db2.public to role READ_TEST_ROLE;
grant usage on warehouse m_kajiya_wh to role READ_TEST_ROLE;
-- READWRITE_TEST_ROLE
-- コピー元テーブルを SELECT・INSERT・DELETE 可能
-- コピー先テーブルを SELECT・INSERT・DELETE 可能
create or replace role READWRITE_TEST_ROLE;
grant role READWRITE_TEST_ROLE to role SYSADMIN;
grant role READ_TEST_ROLE to role READWRITE_TEST_ROLE;
grant insert,delete on future tables in schema m_kajiya_db1.public to role READWRITE_TEST_ROLE;
grant insert,delete on future tables in schema m_kajiya_db2.public to role READWRITE_TEST_ROLE;
-- マスキングに使用するUDF
use role sysadmin;
create or replace function m_kajiya_db1.public.mask_udf(val string)
returns varchar
as
$$
sha1(val)
$$
;
desc function m_kajiya_db1.public.mask_udf(string);
-- マスキングポリシー
-- READ_TEST_ROLE には見せられないよ
create or replace masking policy m_kajiya_db1.public.mask
as (val string) returns string ->
case
when current_role() = 'READ_TEST_ROLE' then mask_udf(val)
else val
end
;
-- タグを作成し、ポリシーを紐づけ
create tag if not exists m_kajiya_db1.public.tag;
use role accountadmin; -- sysadmin だと権限が足りない
alter tag m_kajiya_db1.public.tag set masking policy m_kajiya_db1.public.mask;
実装
さて、マスキングを保持しつつテーブルをコピーするやつを作ります。
こんなストアドを用意します。
ストアド
CREATE OR REPLACE PROCEDURE copy_masked_table_in_schema(src_database VARCHAR, src_schema VARCHAR, dst_database VARCHAR, dst_schema VARCHAR)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python==1.30.0', 'snowflake==1.1.0')
HANDLER = 'main'
COMMENT = '指定スキーマにある全てのテーブルをコピーする。コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をコピーする。マスキングされていないテーブルはクローンする。注意:(1) データベース名、スキーマ名は解決された識別子で指定すること。(2) ポリシー設定検出に SNOWFLAKE.ACCOUNT_USAGE.POLICY_REFERENCES を使用するので、ポリシー設定が POLICY_REFERENCES で検出できるのを確認してから実行すること。(3) 単一のマスキング関数のみ対応。(4) テーブルの Ownership はコピー元と同じロールとなる。'
EXECUTE AS OWNER
AS
$$
from logging import Formatter, Logger, StreamHandler, getLogger
import snowflake.snowpark as snowpark
from snowflake.core import CreateMode, Root
from snowflake.core.table import Table
class MaskedTableCopier:
'''マスキングされたテーブルをコピーする'''
def __init__(self, session: snowpark.Session):
'''
- Snowflake セッションを設定
- セッションから Snowflake Python API インスタンスを取得
- ログ出力の設定
- 固定のメッセージを設定
Parameters
----------
session : snowpark.Session
Snowflakeセッション
'''
self.session = session
self.root = Root(self.session)
self.success_msg = {'STATUS': 'Succeeded'}
# ログ出力の設定
self.logger = getLogger(__name__)
handler = StreamHandler()
handler.setFormatter(
Formatter('[%(asctime)s] %(name)s %(levelname)s [%(lineno)d]: %(message)s')
)
self.logger.addHandler(handler)
self.logger.propagate = False # 上位ロガーにイベントを渡さない
def get_error_msg(self, details: str, excepion: Exception) -> dict:
'''エラー発生時にストアドプロシージャが返すメッセージを作成'''
msg = {
'STATUS': 'Error',
'DETAILS': details,
'EXCEPTION': excepion
}
self.logger.error(msg)
return msg
def copy_masked_table(
self,
src_table: Table,
src_policy_ref_columns: list[str],
dst_database: str,
dst_schema: str,
masking_function: str,
sep=','):
'''マスキングされたテーブルをコピー(CREATE TABLE LIKE + INSERT INTO)する。
Parameters
----------
src_table : snowflake.core.table.Table
コピー元テーブル
src_policy_ref_columns : List[str]
マスキング対象のカラム名リスト
dst_database : str
コピー先データベース名
dst_schema : str
コピー先スキーマ名
masking_function : str
マスキングに使用するUDF名
sep : str, optional
カラム名を結合する際の区切り文字。デフォルトは ','
'''
# src 側
# dst_cols = sep_comma.join(src_table.columns) # NOTE: snowflake.core.table.Table.columns で取得できそうだが None なので使わない
src_table_df = self.session.table(
f'"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"'
)
src_policy_ref_columnss = src_table_df.select(src_policy_ref_columns).columns # NOTE: Dataframe から columns を取得すると " が必要なカラムには " が付く。POLICY_REFERENCES の値だと " が付かないので判別が面倒
src_masked_columns_str = sep.join(
[f'{masking_function}({c}) as {c}' for c in src_policy_ref_columnss]
)
# dst 側
dst_columns_str = sep.join(src_table_df.columns) # " が必要なカラムには " が付くのでこれでよい
try:
# 新規作成するか定義を上書き(コピー元で定義を変更した場合に備えて)
dst_tables = self.root.databases[dst_database].schemas[dst_schema].tables
dst_tables.create(
table=src_table.name,
like_table=f'"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"',
copy_grants=True,
mode=CreateMode.or_replace
)
# 保護されている列を変換しつつ insert
self.session.sql(
f'''
insert into "{dst_database}"."{dst_schema}"."{src_table.name}" (
{dst_columns_str}
)
with cte AS (
select
*
replace ({src_masked_columns_str})
from
"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"
)
select
{dst_columns_str}
from
cte;
'''
).collect()
except Exception as e:
return self.get_error_msg(
f'Failed to insert masked records from `"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"` to `"{dst_database}"."{dst_schema}"."{src_table.name}"`',
e
)
def copy_nonmasked_table(
self,
src_table: Table,
dst_database: str,
dst_schema: str):
'''指定したデータベース・スキーマにテーブルをコピーする。
コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をコピーする。マスキングされていないテーブルはクローンする。
Parameters
----------
src_table : snowflake.core.table.Table
コピー元テーブル
dst_database : str
コピー先データベース名
dst_schema : str
コピー先スキーマ名
'''
try:
dst_tables = self.root.databases[dst_database].schemas[dst_schema].tables
dst_tables.create(
table=src_table.name,
clone_table=f'"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"',
copy_grants=True,
mode=CreateMode.or_replace
)
except Exception as e:
return self.get_error_msg(
f'Failed to clone from `"{src_table.database_name}"."{src_table.schema_name}"."{src_table.name}"` to `"{dst_database}"."{dst_schema}"."{src_table.name}"`',
e
)
def copy_table(
self,
src_table: Table,
dst_database: str,
dst_schema: str,
masking_function: str) -> dict:
'''異なるDB間でテーブルのコピーを行う。
- コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をinsertする。
- マスキングされていないテーブルはクローンする。
コピー先テーブルの Ownership はコピー元と同じロールになる。
Parameters
----------
src_table : snowflake.core.table.Table
コピー元テーブル
dst_database : str
コピー先データベース名
dst_schema : str
コピー先スキーマ名
masking_function : str
マスキングに使用するUDF名
sep : str, optional
カラム名を結合する際の区切り文字。デフォルトは ','
'''
# NOTE:
# desc table <table> で、設定されたポリシー名が取得できるはずだが、できないので policy_references で取得する。
# また、ACCOUNTADMIN で information_schema.policy_references() を呼ぶと
# 「Requested information on the current user is not accessible in stored procedure.」 となるので ACCOUNT_USAGE.POLICY_REFERENCES を呼ぶ
# cf. https://community.snowflake.com/s/article/Stored-Procedure-fails-with-error-Requested-information-on-the-current-user-is-not-accessible-in-stored-procedure
query = f'''
select distinct
REF_COLUMN_NAME as COL
from
SNOWFLAKE.ACCOUNT_USAGE.POLICY_REFERENCES
where
ref_database_name = '{src_table.database_name}'
and ref_schema_name = '{src_table.schema_name}'
and ref_entity_name = '{src_table.name}' -- ここでの一致判定はダブルクオート不要
;
'''
self.logger.debug(f'query: {query}')
policy_refs = self.session.sql(query)
# TODO: Dataframe -> list にするもっときれいな方法があれば、置き換えたい。。。
# (例えば、PySpark なら rdd.map(lambda x: x).collect() のように書ける)
src_policy_ref_columnss = [
f'"{r["COL"]}"' for r in policy_refs.to_local_iterator()
]
self.logger.info(f'Columns with policy: {src_policy_ref_columnss}')
# コピー実行
# マスキングが必要な場合
if len(src_policy_ref_columnss) > 0:
self.logger.info(f'Copy masked table: {src_table.name}')
self.copy_masked_table(
src_table=src_table,
src_policy_ref_columns=src_policy_ref_columnss,
dst_database=dst_database,
dst_schema=dst_schema,
masking_function=masking_function,
sep=','
)
# マスキングが不要な場合
else:
self.logger.info(f'Copy nonmasked table: {src_table.name}')
self.copy_nonmasked_table(
src_table=src_table,
dst_database=dst_database,
dst_schema=dst_schema
)
try:
# Ownership をコピー元と同じロールに変更
# src_table.owner が None なことがあるので使わずに show tables から取得する
query = f'''
show tables like '{src_table.name}' in schema "{src_table.database_name}"."{src_table.schema_name}"
'''
self.logger.debug(f'query: {query}')
src_table_show = self.session.sql(query).collect()[0]
owner_role = src_table_show['owner']
query = f'''
grant ownership on table "{dst_database}"."{dst_schema}"."{src_table.name}"
to role "{owner_role}"
copy current grants
'''
self.logger.debug(f'query: {query}')
self.session.sql(query).collect()
except Exception as e:
return self.get_error_msg(
f'Failed to grant ownership on table `"{dst_database}"."{dst_schema}"."{src_table.name}"`',
e
)
return {'STATUS': 'Succeeded', 'TABLE': src_table.name}
def copy_tables_in_schema(
self,
src_database: str,
src_schema: str,
dst_database: str,
dst_schema: str,
masking_function: str) -> dict:
'''異なるデータベース・スキーマ間でテーブルのコピーを行う。
コピー元テーブルでカラムがマスキングされている場合、マスキングされた値をコピーする。
マスキングされていないテーブルはクローンする
Parameters
----------
src_database: str
コピー元データベース
src_schema: str
コピー元スキーマ。全てのテーブルがコピー対象となる
dst_database: str
コピー先データベース
dst_schema: str
コピー先スキーマ
masking_function : str, optional
マスキングに使用するUDF名
Returns
-------
dict
status
'''
# コピー元テーブルを取得
root = Root(self.session)
src_tables = root.databases[src_database].schemas[src_schema].tables
# NOTE: 対象が大量でパフォーマンスが気になるなら iter_async() のほうがいいかも
for table in src_tables.iter():
msg = self.copy_table(
src_table=table,
dst_database=dst_database,
dst_schema=dst_schema,
masking_function=masking_function
)
self.logger.info(f'Table copy result: {msg}')
return self.success_msg
def main(
session: snowpark.Session,
src_database: str,
src_schema: str,
dst_database: str,
dst_schema: str) -> dict:
'''異なるデータベース・スキーマ間でテーブルのコピーを行う
Parameters
----------
session: snowpark.Session
Snowflakeセッション
src_database: str
コピー元データベース
src_schema: str
コピー元スキーマ
dst_database: str
コピー先データベース
dst_schema: str
コピー先スキーマ
Returns
-------
dict
処理結果のメッセージ
'''
copier = MaskedTableCopier(session)
return copier.copy_tables_in_schema(
src_database=src_database,
src_schema=src_schema,
dst_database=dst_database,
dst_schema=dst_schema,
masking_function='mask_udf'
)
$$
;
-- 検証実施者が自分でコピーできるようにしておきたいので、RWロールで使用可能にしておく。
grant usage on procedure copy_masked_table_in_schema(varchar, varchar) to role READWRITE_TEST_ROLE;
実践
早速使ってみましょう。保護された列があるテーブルを用意します。
use role sysadmin;
create or replace table m_kajiya_db1.public.tbl (
id int
, name varchar
, etc varchar
) as
select
*
from (
values
(1, 'ほげほげ', 'etc'),
(2, 'ぴよぴよ', 'hosts'),
(3, null, null)
);
use role accountadmin;
alter table m_kajiya_db1.public.tbl alter column name set tag m_kajiya_db1.public.tag = 'true';
マスクされていることを確認します。
use role READ_TEST_ROLE;
table m_kajiya_db1.public.tbl;
returns:
実行する前に、ACCOUNT_USAGE.POLICY_REFERENCES
からマスキング情報を取得できるかも確認しておきます。
(ストアド内で ACCOUNT_USAGE.POLICY_REFERENCES
を参照するため。なので、マスキング用のタグを付与したばかりのテーブルでは、マスクされずにコピーされてしまいます。注意。。。)
クエリ
use role accountadmin;
-- 遅延に注意
select distinct
REF_DATABASE_NAME as database
, REF_SCHEMA_NAME as schema
, REF_ENTITY_NAME as entity
, REF_COLUMN_NAME as COL
from
SNOWFLAKE.ACCOUNT_USAGE.POLICY_REFERENCES
where
ref_database_name = 'M_KAJIYA_DB1'
and ref_schema_name = 'PUBLIC'
and ref_entity_name = 'TBL'
;
さて、コピーを実行します。
use role READWRITE_TEST_ROLE;
call copy_masked_table_in_schema('M_KAJIYA_DB1.PUBLIC', 'M_KAJIYA_DB2.PUBLIC');
-- COPY_MASKED_TABLE_IN_SCHEMA
-- {'STATUS': 'Succeeded'}
ちゃんとマスクされた値が入っていますね。
table m_kajiya_db2.public.tbl;
returns:
注意
マスキング処理により VARCHAR などのサイズを超えると、次のエラーになります。サイズを修正するか、マスキング処理を修正するかが必要です。
{'STATUS': 'Error', 'DETAILS': 'Failed to insert masked records from `M_KAJIYA_DB1.PUBLIC."TBL"` to `M_KAJIYA_DB2.PUBLIC."TBL"`', 'EXCEPTION': SnowparkSQLException("100078 (22000): 01bbebd6-0002-5116-0000-2c79041b6f82: String '4bb7ae317ea728941c77bea79c869db015d48bc6' is too long and would be truncated", '1304', '01bbebd6-0002-5116-0000-2c79041b6f82')}
こんな感じの状況
create or replace table m_kajiya_db1.public.tbl (
id int
, name varchar(10) -- sha1() かけたら、まあ、超えるよね
, etc varchar(10)
) as
select
*
from (
values
(1, 'ほげほげ', 'etc'),
(2, 'ぴよぴよ', 'hosts'),
(3, null, null)
);
use role accountadmin;
alter table m_kajiya_db1.public.tbl alter column name set tag m_kajiya_db1.public.tag = 'true';
use role READWRITE_TEST_ROLE;
call copy_masked_table_in_schema('M_KAJIYA_DB1.PUBLIC', 'M_KAJIYA_DB2.PUBLIC');
-- COPY_MASKED_TABLE_IN_SCHEMA
-- {'STATUS': 'Error', 'DETAILS': 'Failed to insert masked records from `M_KAJIYA_DB1.PUBLIC."TBL"` to `M_KAJIYA_DB2.PUBLIC."TBL"`', 'EXCEPTION': SnowparkSQLException("100078 (22000): 01bbebd6-0002-5116-0000-2c79041b6f82: String '4bb7ae317ea728941c77bea79c869db015d48bc6' is too long and would be truncated", '1304', '01bbebd6-0002-5116-0000-2c79041b6f82')}
おわりに
全部マスキングでよくなれ!!!!!!
マスクされているから大丈夫!がまかり通らない、そんな環境のときに、用法用量を守ってお使いください。
Discussion