❄️

Snowflake カラムリネージの機能を試してみた

2023/01/31に公開

はじめに

2023年1月の Release 7.2 にてカラムリネージの機能がGAとなりました。
この機能によってACCOUNT_USAGE.ACCESS_HISTORYビューのOBJECTS_MODIFIEDカラムの内容が拡張されます。4パターンのDMLでどのように動作するのかを確認してみました。

検証

1.単純なINSERT INTO SELECT

-- Setup: source and destination tables with matching, positionally-aligned layouts.
create or replace table from_tab 
(from_c1 integer, from_c2 integer, from_c3 string, from_c4 string);
create or replace table to_tab 
(to_c1 integer, to_c2 integer, to_c3 string, to_c4 string);
-- Explicit column list guards against silent misalignment if the table changes.
insert into from_tab (from_c1, from_c2, from_c3, from_c4) values (1, 10, 'A', 'B');
-- DML under test: a plain INSERT INTO ... SELECT.
-- (SELECT * is intentional here -- the demo relies on positional column mapping.)
insert into to_tab select * from from_tab;
-- Check lineage. NOTE(review): ACCESS_HISTORY has ingestion latency (up to a few
-- hours), so the row for query_id may not appear immediately -- confirm timing.
select OBJECTS_MODIFIED from snowflake.account_usage.access_history where query_id='...';

結果として以下のようなJSONが得られます。

結果1
[
  {
    "columns": [
      {
        "baseSources": [
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68611,
        "columnName": "TO_C3",
        "directSources": [
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68609,
        "columnName": "TO_C1",
        "directSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C4",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68612,
        "columnName": "TO_C4",
        "directSources": [
          {
            "columnName": "FROM_C4",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C2",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68610,
        "columnName": "TO_C2",
        "directSources": [
          {
            "columnName": "FROM_C2",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      }
    ],
    "objectDomain": "Table",
    "objectId": 57346,
    "objectName": "TEST01.PUBLIC.TO_TAB"
  }
]

FROM_Cxカラムの値を使用してTO_Cxの値が更新されたことがわかります。

2.演算結果をINSERT SELECT

では複数のカラムを演算してひとつのカラムを更新したときはどうなるでしょうか。

-- DML under test: each target column is derived from one or more source columns.
-- Explicit target column list makes the expression-to-column mapping obvious
-- and protects against positional misalignment.
insert into to_tab (to_c1, to_c2, to_c3, to_c4)
select from_c1+from_c2        -- two source columns feed a single target
     , length(from_c3)        -- type changes: string -> integer
     , concat(from_c3,from_c4)
     , to_varchar(from_c1)    -- type changes: integer -> string
from from_tab;

結果はこうなりました。(宛先のTO_Cxの隣にどのような演算をしたかを注記しています。)

結果2
[
  {
    "columns": [
      {
        "baseSources": [
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68610,
        "columnName": "TO_C2",  -- length(from_c3)
        "directSources": [
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C2",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68609,
        "columnName": "TO_C1",  -- from_c1+from_c2
        "directSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C2",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68612,
        "columnName": "TO_C4", -- to_varchar(from_c1)
        "directSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C4",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68611,
        "columnName": "TO_C3", -- concat(from_c3,from_c4)
        "directSources": [
          {
            "columnName": "FROM_C4",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      }
    ],
    "objectDomain": "Table",
    "objectId": 57346,
    "objectName": "TEST01.PUBLIC.TO_TAB"
  }
]

型が変わっても演算に使用したカラムが辿れることがわかります。

3.UPDATE文

念の為UPDATE文を見てみます。

-- DML under test: multi-column UPDATE joined to the source table.
update to_tab 
set to_c2 = from_c1 + from_c2    -- two source columns into one target
  , to_c3 = repeat(from_c3, 3)   -- single source, literal second argument
  , to_c4 = concat(from_c3,from_c4)
from from_tab where to_c1 = from_c1;
結果3
[
  {
    "columns": [
      {
        "baseSources": [
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68611,
        "columnName": "TO_C3", -- repeat(from_c3, 3)
        "directSources": [
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C2",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68610,
        "columnName": "TO_C2",  -- from_c1 + from_c2
        "directSources": [
          {
            "columnName": "FROM_C1",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C2",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "FROM_C4",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ],
        "columnId": 68612,
        "columnName": "TO_C4", -- concat(from_c3,from_c4)
        "directSources": [
          {
            "columnName": "FROM_C4",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          },
          {
            "columnName": "FROM_C3",
            "objectDomain": "Table",
            "objectId": 56322,
            "objectName": "TEST01.PUBLIC.FROM_TAB"
          }
        ]
      }
    ],
    "objectDomain": "Table",
    "objectId": 57346,
    "objectName": "TEST01.PUBLIC.TO_TAB"
  }
]

予想通りですね。

4.結合ビューをSELECTしてINSERT

最後にビューをSELECTしてINSERTしてみます。

-- Setup: two base tables joined and aggregated through a view, so that
-- lineage can distinguish the view (directSources) from its underlying
-- tables (baseSources).
create or replace table base_tab1 (id integer, name string);
create or replace table base_tab2 (id integer, num integer);
create or replace view from_view as
select base_tab1.id, name, sum(num) as sum_num
from base_tab1
join base_tab2 on base_tab1.id = base_tab2.id
group by base_tab1.id, name;
-- Explicit column lists guard against silent misalignment on schema changes.
insert into base_tab1 (id, name) values (1, 'TEST');
insert into base_tab2 (id, num) values (1, 1);
insert into base_tab2 (id, num) values (1, 5);
-- DML under test: INSERT selecting from the view.
insert into to_tab (to_c1, to_c2, to_c3) 
select id, sum_num, name from from_view;
結果4
[
  {
    "columns": [
      {
        "baseSources": [
          {
            "columnName": "ID",
            "objectDomain": "Table",
            "objectId": 58370,
            "objectName": "TEST01.PUBLIC.BASE_TAB1"
          }
        ],
        "columnId": 68609,
        "columnName": "TO_C1",
        "directSources": [
          {
            "columnName": "ID",
            "objectDomain": "View",
            "objectId": 60420,
            "objectName": "TEST01.PUBLIC.FROM_VIEW"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "NAME",
            "objectDomain": "Table",
            "objectId": 58370,
            "objectName": "TEST01.PUBLIC.BASE_TAB1"
          }
        ],
        "columnId": 68611,
        "columnName": "TO_C3",
        "directSources": [
          {
            "columnName": "NAME",
            "objectDomain": "View",
            "objectId": 60420,
            "objectName": "TEST01.PUBLIC.FROM_VIEW"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "NUM",
            "objectDomain": "Table",
            "objectId": 59394,
            "objectName": "TEST01.PUBLIC.BASE_TAB2"
          }
        ],
        "columnId": 68610,
        "columnName": "TO_C2",
        "directSources": [
          {
            "columnName": "SUM_NUM",
            "objectDomain": "View",
            "objectId": 60420,
            "objectName": "TEST01.PUBLIC.FROM_VIEW"
          }
        ]
      }
    ],
    "objectDomain": "Table",
    "objectId": 57346,
    "objectName": "TEST01.PUBLIC.TO_TAB"
  }
]

ここで初めてdirectSourcesとbaseSourcesが区別されている意味が出てきます。directSourcesではビューの情報が、baseSourcesではビューの元になっているテーブルの情報が得られます。

せっかくなのでグラフィカルに表現したい

AdHocにデータの出自を調査するには十分な情報が得られると思います。
しかしできればもう少し見やすくビジュアライズできないかと思ってしまいますね。少々冗長ではありますが、Mermaidのテキストを出力できるようSQLを書いてみました。

-- Build a Mermaid flowchart (base -> direct -> target) from the column lineage
-- recorded in ACCESS_HISTORY for a single query.
-- NOTE(review): flattening directSources AND baseSources in one pass
-- cross-joins the two lists per target column; when a target has several
-- sources this draws spurious base->direct edges (the section-3 problem
-- described below).
with tab_col as (
-- One row per (target column, direct source column, base source column).
-- A node label has the form SCHEMA.TABLE.COL[COL] (Mermaid id + display text).
select t2.value:objectName target_obj
    , t3.value:columnName target_col
    , target_obj||'.'||target_col||'['||target_col||']' target_node
    , t4.value:objectName direct_obj
    , t4.value:columnName direct_col
    , direct_obj||'.'||direct_col||'['||direct_col||']' direct_node
    , t5.value:objectName base_obj
    , t5.value:columnName base_col
    , base_obj||'.'||base_col||'['||base_col||']' base_node
from snowflake.account_usage.access_history t1
    , lateral flatten(t1.OBJECTS_MODIFIED) t2
    , lateral flatten(t2.value:columns) t3
    , lateral flatten(t3.value:directSources) t4
    , lateral flatten(t3.value:baseSources) t5
where t1.query_id='<query_id>'
), 
-- Deduplicated set of every node (target, direct, base); UNION (not UNION ALL)
-- is deliberate -- it removes duplicate nodes.
tab_col_union as (
select target_obj tab, target_col col, target_node node from tab_col
union
select direct_obj tab, direct_col col, direct_node node from tab_col
union
select base_obj tab, base_col col, base_node node from tab_col
),
-- Emit one text line per node, opening "subgraph <table>" when the table name
-- differs from the previous node's (lag) and closing with "end" when it
-- differs from the next node's (lead), over the node-sorted list.
nodes as (
select node,
    iff(tab != nvl(lag(tab) over(order by node),''), 'subgraph '||tab||'\n', '') 
    || '\t' || node 
    || iff(tab != nvl(lead(tab) over(order by node),''), '\nend', '') text
from tab_col_union order by node
),
-- Edges: base -> direct (self-loops skipped) and direct -> target.
-- node is NULL so that edges sort apart from the node lines in the final LISTAGG.
edges as (
select null node, base_node||' ---> '||direct_node from tab_col where base_node != direct_node
union
select null node, direct_node||' ---> '||target_node from tab_col
),
texts as (
select * from nodes
union
select * from edges
)
-- NOTE(review): edge placement after the subgraphs relies on NULLs sorting
-- last in the LISTAGG ordering -- confirm against the session's NULL ordering.
select 'flowchart LR\n' || listagg(text,'\n') within group (order by node) from texts;

4.のDMLについてこのSQLを実行すると以下のような出力が得られます。

flowchart LR
subgraph TEST01.PUBLIC.BASE_TAB1
	TEST01.PUBLIC.BASE_TAB1.ID[ID]
	TEST01.PUBLIC.BASE_TAB1.NAME[NAME]
end
subgraph TEST01.PUBLIC.BASE_TAB2
	TEST01.PUBLIC.BASE_TAB2.NUM[NUM]
end
subgraph TEST01.PUBLIC.FROM_VIEW
	TEST01.PUBLIC.FROM_VIEW.ID[ID]
	TEST01.PUBLIC.FROM_VIEW.NAME[NAME]
	TEST01.PUBLIC.FROM_VIEW.SUM_NUM[SUM_NUM]
end
subgraph TEST01.PUBLIC.TO_TAB
	TEST01.PUBLIC.TO_TAB.TO_C1[TO_C1]
	TEST01.PUBLIC.TO_TAB.TO_C2[TO_C2]
	TEST01.PUBLIC.TO_TAB.TO_C3[TO_C3]
end
TEST01.PUBLIC.BASE_TAB1.NAME[NAME] ---> TEST01.PUBLIC.FROM_VIEW.NAME[NAME]
TEST01.PUBLIC.FROM_VIEW.ID[ID] ---> TEST01.PUBLIC.TO_TAB.TO_C1[TO_C1]
TEST01.PUBLIC.BASE_TAB1.ID[ID] ---> TEST01.PUBLIC.FROM_VIEW.ID[ID]
TEST01.PUBLIC.BASE_TAB2.NUM[NUM] ---> TEST01.PUBLIC.FROM_VIEW.SUM_NUM[SUM_NUM]
TEST01.PUBLIC.FROM_VIEW.SUM_NUM[SUM_NUM] ---> TEST01.PUBLIC.TO_TAB.TO_C2[TO_C2]
TEST01.PUBLIC.FROM_VIEW.NAME[NAME] ---> TEST01.PUBLIC.TO_TAB.TO_C3[TO_C3]

だいぶ分かりやすいですね。
ただ、残念ながらこれは完璧ではなく、3.のクエリに対して実行してみるとこうなってしまいます。

FROM_TAB内の各カラムが相互参照するような形になってしまいました。もう少し工夫の仕方はあるかも知れませんが、元のjsonにdirectSourcesの各カラムがbaseSourcesのどのカラムと関係があるかという情報はないため、やむを得ないのではないかと思います。
なので、ターゲットテーブルに対してdirectSourcesとbaseSourcesそれぞれからどのようにデータが連携されたかを分けて見るべきなのでしょう。

directSourcesとの関係を確認
-- Mermaid flowchart of target <- directSources relationships only
-- (avoids the cross-join artifact of flattening both source lists at once).
with tab_col as (
-- One row per (target column, direct source column).
select t2.value:objectName target_obj
    , t3.value:columnName target_col
    , target_obj||'.'||target_col||'['||target_col||']' target_node
    , t4.value:objectName direct_obj
    , t4.value:columnName direct_col
    , direct_obj||'.'||direct_col||'['||direct_col||']' direct_node
from snowflake.account_usage.access_history t1
    , lateral flatten(t1.OBJECTS_MODIFIED) t2
    , lateral flatten(t2.value:columns) t3
    , lateral flatten(t3.value:directSources) t4
where t1.query_id='<query_id>'
), 
-- Deduplicated node set (targets and direct sources).
tab_col_union as (
select target_obj tab, target_col col, target_node node from tab_col
union
select direct_obj tab, direct_col col, direct_node node from tab_col
),
-- One text line per node; lag/lead over the node order open/close
-- a "subgraph <table>" block at each table boundary.
nodes as (
select node,
    iff(tab != nvl(lag(tab) over(order by node),''), 'subgraph '||tab||'\n', '') 
    || '\t' || node 
    || iff(tab != nvl(lead(tab) over(order by node),''), '\nend', '') text
from tab_col_union order by node
),
-- direct -> target edges; NULL node keeps them apart from node lines in the sort.
edges as (
select null node, direct_node||' ---> '||target_node from tab_col
),
texts as (
select * from nodes
union
select * from edges
)
select 'flowchart LR\n' || listagg(text,'\n') within group (order by node) from texts;
baseSourcesとの関係を確認
-- Mermaid flowchart of target <- baseSources relationships only
-- (the complementary view to the directSources-only query).
with tab_col as (
-- One row per (target column, base source column).
select t2.value:objectName target_obj
    , t3.value:columnName target_col
    , target_obj||'.'||target_col||'['||target_col||']' target_node
    , t5.value:objectName base_obj
    , t5.value:columnName base_col
    , base_obj||'.'||base_col||'['||base_col||']' base_node
from snowflake.account_usage.access_history t1
    , lateral flatten(t1.OBJECTS_MODIFIED) t2
    , lateral flatten(t2.value:columns) t3
    , lateral flatten(t3.value:baseSources) t5
where t1.query_id='<query_id>'
), 
-- Deduplicated node set (targets and base sources).
tab_col_union as (
select target_obj tab, target_col col, target_node node from tab_col
union
select base_obj tab, base_col col, base_node node from tab_col
),
-- One text line per node; lag/lead over the node order open/close
-- a "subgraph <table>" block at each table boundary.
nodes as (
select node,
    iff(tab != nvl(lag(tab) over(order by node),''), 'subgraph '||tab||'\n', '') 
    || '\t' || node 
    || iff(tab != nvl(lead(tab) over(order by node),''), '\nend', '') text
from tab_col_union order by node
),
-- base -> target edges; NULL node keeps them apart from node lines in the sort.
edges as (
select null node, base_node||' ---> '||target_node from tab_col
),
texts as (
select * from nodes
union
select * from edges
)
select 'flowchart LR\n' || listagg(text,'\n') within group (order by node) from texts;

おわりに

今回は試しにMermaidを出力するSQLを書いてみました。もしかしたら複雑なデータ連携をおこなっている場合は完璧に動作しないケースもあるかも知れませんが、あくまで一例ということでご容赦ください。
この情報を利用するデータカタログ・ツールが出てきて、データの流れを包括的に把握することができたら大変面白いのではないかと思います。

Discussion