NTT DATA TECH
🔍

SnowflakeでGET_LINEAGEを拡張して深いリネージュ分析を可能にするアプリを作ってみた

に公開

1. GET_LINEAGE 関数とは

  • Snowflakeのリネージュ取得関数 GET_LINEAGEGAになっています。これにより、SQLから直接、オブジェクト間の依存関係を構造化データとして取得でき、下流・上流のつながりを素早く把握できます。
  • この機能を使って、取込データに遅延や不正データが含まれた際に、どのデータマートテーブルに影響があるかを簡単に調査するアプリを作ってみたいと思います。
  • 最終的に、以下のようなStreamlit in Snowflakeアプリを作成します。
    SiS画面

2. 「影響オブジェクトを全て返せるわけではない」という制約

  • GET_LINEAGEは、指定したオブジェクトを起点に、上下流の依存関係のあるオブジェクトを返す関数です。
  • ただし、この関数には「最大で5ホップまでの範囲で依存関係のあるオブジェクトを返す」制約があるため、依存関係のあるオブジェクトを全て返すわけではありません。

<distance> 検索する系統のレベル数。最大値は5で、これがデフォルトでもあります。[参照]

  • この制約を検証するため、モックで小さなパイプラインを考えます。DDL/DMLは付録に記載。
  • 実務では、上記のように、品質担保や変更容易性のために中間層(INT)を細かく積み重ねる設計が増えています。
  • ここで、GET_LINEAGE 関数を、起点テーブル(RAW.PUBLIC.ORDERS)から呼ぶと、中間加工(INT)までは取得できるものの、データマートテーブル(DM.PUBLIC.SALES_MART)に届かないという状況が発生します。
SELECT
  TARGET_OBJECT_DATABASE
  ,TARGET_OBJECT_SCHEMA
  ,TARGET_OBJECT_NAME
  ,TARGET_OBJECT_DOMAIN
  ,TARGET_STATUS
  ,DISTANCE
FROM
  TABLE(
    SNOWFLAKE.CORE.GET_LINEAGE('RAW.PUBLIC.ORDERS','TABLE','DOWNSTREAM')
  );

DM.PUBLIC.SALES_MARTが表示されない

  • つまり、中間加工が多いと、標準の1回の呼び出しでは「最終的に知りたいデータマート」に未到達になるということです。

3. 制約の解決アプローチ「GET_LINEAGE_CUSTOMプロシージャの作成」

  • 単発のGET_LINEAGEで取り切れないなら、グラフ探索の考え方で段階的に範囲を広げます。
  • 今回はストアドプロシージャを使って、GET_LINEAGE 関数を拡張します。DDLは付録に記載。
  • ストアドプロシージャのフローチャートは以下になります。
  • 実際に、ストアドプロシージャを実行してみます。
CALL RAW.PUBLIC.GET_LINEAGE_CUSTOM('RAW.PUBLIC.ORDERS','TABLE','DOWNSTREAM',100);
SELECT
  TARGET_OBJECT_DATABASE
  ,TARGET_OBJECT_SCHEMA
  ,TARGET_OBJECT_NAME
  ,TARGET_OBJECT_DOMAIN
  ,TARGET_STATUS
  ,DISTANCE
FROM
  TABLE(RESULT_SCAN(LAST_QUERY_ID()))
;
  • 6ホップ目のデータマートテーブルも結果に表示されるようになりました。
  • 拡張して表示させたDM.PUBLIC.SALES_MARTのDISTANCE列も正常に表示できています。
    DM.PUBLIC.SALES_MARTが表示されている!

4. Streamlit in Snowflakeでアプリ化

  • 拡張ロジックを運用に乗せるには、非エンジニアでも扱えるUIが有効です。

  • Streamlit in Snowflakeでリネージュ取得アプリを用意してみます。ソースは付録に記載。

    • 入力UI
      • 起点オブジェクト(DB.SCHEMA.OBJECT)
      • 方向(DOWNSTREAM/UPSTREAM)
      • 最大反復回数(安全上限:100)
      • 簡易フィルタ(対象DB/スキーマ、オブジェクト種別)
    • 出力UI
      • 影響テーブルの一覧
      • エクスポート(CSVのダウンロード)
      • 詳細(エッジ一覧)の表示
  • アプリの画面はこんな感じです。
    SiSアプリの初期画面

  • 実際に取込テーブルを指定して下流検索を実行してみます。データマートテーブルまで表示できています。
    SiSアプリの下流検索画面

  • せっかくなので、データマートテーブルを指定して、上流検索も実行してみます。こちらも取込テーブルまで表示できています。

  • このアプリにより以下を即時に把握できるようになりました。

    • 特定の取込テーブルが遅延/失敗したら、どのデータマートテーブルに影響するか
    • 特定のデータマートテーブルで不正を検知した場合、原因である取込テーブルや中間テーブルはどれか
  • 中間加工が厚い環境でも、データマートまで到達する結果を返せるため、従来のGET_LINEAGE関数の制約を意識せずに運用判断ができます。

5. まとめ

  • GET_LINEAGE 関数のGAにより、SnowflakeのリネージュがSQLから直接扱えるようになりました。
  • ただし、多段の中間加工が挟まる実務環境では、単発の呼び出し(最大5ホップ相当)では最下流のデータマートに届かないことがあるのも事実です。
  • 反復的な展開でGET_LINEAGEを合成すれば、標準的な出力のまま、到達範囲をデータマートまで拡張できます。
  • UIとしてStreamlit in Snowflakeを用いれば、非エンジニアもセルフサービスで影響範囲を確認でき、障害時の初動や判断が加速します。
  • ぜひ、みなさんもGET_LINEAGE関数を活用して、データオブザーバビリティを促進していきましょう!

6. 仲間募集

NTTデータ ソリューション事業本部 では、以下の職種を募集しています。

Snowflake、生成AIを活用したデータ基盤構築/活用支援(Snowflake Data Superheroesとの協働)
Databricks、生成AIを活用したデータ基盤構築/活用支援(Databricks Championとの協働)
プロジェクトマネージャー(データ分析プラットフォームソリューションの企画~開発~導入/生成AI活用)
クラウドを活用したデータ分析プラットフォームの開発(ITアーキテクト/PM/クラウドエンジニア)

7. ソリューション紹介

Trusted Data Foundationについて

~データ資産を分析活用するための環境をオールインワンで提供するソリューション~
https://www.nttdata.com/jp/ja/lineup/tdf/
最新のクラウド技術を採用して弊社が独自に設計したリファレンスアーキテクチャ(Datalake+DWH+AI/BI)を顧客要件に合わせてカスタマイズして提供します。
可視化、機械学習、DeepLearningなどデータ資産を分析活用するための環境がオールインワンで用意されており、これまでとは別次元の量と質のデータを用いてアジリティ高くDX推進を実現できます。

TDFⓇ-AM(Trusted Data Foundation - Analytics Managed Service)について

~データ活用基盤の段階的な拡張支援(Quick Start) と保守運用のマネジメント(Analytics Managed)をご提供することでお客様のDXを成功に導く、データ活用プラットフォームサービス~
https://www.nttdata.com/jp/ja/lineup/tdf_am/
TDFⓇ-AMは、データ活用をQuickに始めることができ、データ活用の成熟度に応じて段階的に環境を拡張します。プラットフォームの保守運用はNTTデータが一括で実施し、お客様は成果創出に専念することが可能です。また、日々最新のテクノロジーをキャッチアップし、常に活用しやすい環境を提供します。なお、ご要望に応じて上流のコンサルティングフェーズからAI/BIなどのデータ活用支援に至るまで、End to Endで課題解決に向けて伴走することも可能です。

NTTデータとSnowflakeについて

NTTデータとSnowflakeについて
NTTデータでは、Snowflake Inc.とソリューションパートナー契約を締結し、クラウド・データプラットフォーム「Snowflake」の導入・構築、および活用支援を開始しています。
NTTデータではこれまでも、独自ノウハウに基づき、ビッグデータ・AIなど領域に係る市場競争力のあるさまざまなソリューションパートナーとともにエコシステムを形成し、お客さまのビジネス変革を導いてきました。
Snowflakeは、これら先端テクノロジーとのエコシステムの形成に強みがあり、NTTデータはこれらを組み合わせることでお客さまに最適なインテグレーションをご提供いたします。
https://www.nttdata.com/jp/ja/lineup/snowflake/

NTTデータとDatabricksについて

NTTデータは、お客様企業のデジタル変革・DXの成功に向けて、「databricks」のソリューションの提供に加え、情報活用戦略の立案から、AI技術の活用も含めたアナリティクス、分析基盤構築・運用、分析業務のアウトソースまで、ワンストップの支援を提供いたします。
https://www.nttdata.com/jp/ja/lineup/databricks/

8. 付録

モックのために利用したテーブルのDDLおよびDML

CREATE DATABASE IF NOT EXISTS RAW;
CREATE SCHEMA IF NOT EXISTS RAW.PUBLIC;
CREATE OR REPLACE TABLE RAW.PUBLIC.ORDERS (COL STRING);

CREATE DATABASE IF NOT EXISTS STG;
CREATE SCHEMA IF NOT EXISTS STG.PUBLIC;
CREATE OR REPLACE TABLE STG.PUBLIC.ORDERS_STG (COL STRING);

CREATE DATABASE IF NOT EXISTS INT;
CREATE SCHEMA IF NOT EXISTS INT.PUBLIC;
CREATE OR REPLACE TABLE INT.PUBLIC.ORDERS_CLEAN (COL STRING);
CREATE OR REPLACE TABLE INT.PUBLIC.ORDERS_ENRICH (COL STRING);
CREATE OR REPLACE TABLE INT.PUBLIC.ORDERS_AGG (COL STRING);
CREATE OR REPLACE TABLE INT.PUBLIC.ORDERS_CURATED (COL STRING);

CREATE DATABASE IF NOT EXISTS DM;
CREATE SCHEMA IF NOT EXISTS DM.PUBLIC;
CREATE OR REPLACE TABLE DM.PUBLIC.SALES_MART (COL STRING);

INSERT INTO RAW.PUBLIC.ORDERS VALUES ('data');
INSERT INTO STG.PUBLIC.ORDERS_STG   SELECT * FROM RAW.PUBLIC.ORDERS;
INSERT INTO INT.PUBLIC.ORDERS_CLEAN   SELECT * FROM STG.PUBLIC.ORDERS_STG;
INSERT INTO INT.PUBLIC.ORDERS_ENRICH  SELECT * FROM INT.PUBLIC.ORDERS_CLEAN;
INSERT INTO INT.PUBLIC.ORDERS_AGG     SELECT * FROM INT.PUBLIC.ORDERS_ENRICH;
INSERT INTO INT.PUBLIC.ORDERS_CURATED SELECT * FROM INT.PUBLIC.ORDERS_AGG;
INSERT INTO DM.PUBLIC.SALES_MART      SELECT * FROM INT.PUBLIC.ORDERS_CURATED;

拡張GET_LINEAGEのプロシージャ

CREATE OR REPLACE PROCEDURE RAW.PUBLIC.GET_LINEAGE_CUSTOM(
    OBJECT_NAME STRING,
    OBJECT_DOMAIN STRING DEFAULT 'TABLE',
    DIRECTION STRING DEFAULT 'DOWNSTREAM',
    MAX_HOPS INTEGER DEFAULT 100
)
RETURNS TABLE (
    SOURCE_OBJECT_DATABASE STRING,
    SOURCE_OBJECT_SCHEMA   STRING,
    SOURCE_OBJECT_NAME     STRING,
    SOURCE_OBJECT_DOMAIN   STRING,
    SOURCE_OBJECT_VERSION  STRING,
    SOURCE_COLUMN_NAME     STRING,
    SOURCE_STATUS          STRING,
    TARGET_OBJECT_DATABASE STRING,
    TARGET_OBJECT_SCHEMA   STRING,
    TARGET_OBJECT_NAME     STRING,
    TARGET_OBJECT_DOMAIN   STRING,
    TARGET_OBJECT_VERSION  STRING,
    TARGET_COLUMN_NAME     STRING,
    TARGET_STATUS          STRING,
    DISTANCE               INTEGER
)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
EXECUTE AS OWNER
AS
$$
from typing import List, Tuple, Set, Optional
from snowflake.snowpark import Session, functions as F, types as T

# ----------------------------
# ユーティリティ
# ----------------------------
def _build_fqn(db: Optional[str], sch: Optional[str], name: Optional[str]) -> Optional[str]:
    """DB,SCHEMA,NAME から FQN を生成。いずれか欠損なら None。"""
    if not db or not sch or not name:
        return None
    return f"{db}.{sch}.{name}"

def _normalize_direction(direction: str) -> str:
    """方向パラメータを正規化し検証。"""
    d = (direction or "").strip().upper()
    if d not in {"DOWNSTREAM", "UPSTREAM"}:
        raise ValueError(f"Invalid DIRECTION: {direction}. Use 'DOWNSTREAM' or 'UPSTREAM'.")
    return d

def _normalize_domain(domain: str) -> str:
    """ドメインを大文字化。存在チェックは GET_LINEAGE 側に委譲。"""
    d = (domain or "").strip().upper()
    if not d:
        raise ValueError("OBJECT_DOMAIN must not be empty.")
    return d

def _resolve_object_name(session: Session, obj_name: str) -> str:
    """
    オブジェクト名が非完全修飾の場合、CURRENT_DATABASE/CURRENT_SCHEMA を付与して FQN にする。
    既に完全名(DB.SCHEMA.NAME)ならそのまま返す。
    """
    if obj_name.count(".") == 2:
        return obj_name
    cur_db = session.sql("SELECT CURRENT_DATABASE()").collect()[0][0]
    cur_sch = session.sql("SELECT CURRENT_SCHEMA()").collect()[0][0]
    if not cur_db or not cur_sch:
        raise ValueError("CURRENT_DATABASE or CURRENT_SCHEMA is not set. Set both or pass a fully qualified OBJECT_NAME.")
    return f"{cur_db}.{cur_sch}.{obj_name}"

# ----------------------------
# メイン処理
# ----------------------------
def run(session: Session, OBJECT_NAME: str, OBJECT_DOMAIN: str, DIRECTION: str, MAX_HOPS: int):
    """
    幅優先で系譜情報を展開し、重複のないエッジ集合を返す。
    """
    # 入力正規化・バリデーション
    if not OBJECT_NAME or not OBJECT_NAME.strip():
        raise ValueError("OBJECT_NAME must not be empty.")
    DIRECTION_N = _normalize_direction(DIRECTION)
    DOMAIN_N = _normalize_domain(OBJECT_DOMAIN)

    if MAX_HOPS is None or MAX_HOPS < 1:
        MAX_HOPS = 1
    if MAX_HOPS > 1000:
        # 誤設定による過剰展開を防止
        MAX_HOPS = 1000

    root_fqn = _resolve_object_name(session, OBJECT_NAME.strip())

    # 返却用スキーマ(GET_LINEAGE の出力に合わせる)
    out_cols = [
        "SOURCE_OBJECT_DATABASE","SOURCE_OBJECT_SCHEMA","SOURCE_OBJECT_NAME","SOURCE_OBJECT_DOMAIN",
        "SOURCE_OBJECT_VERSION","SOURCE_COLUMN_NAME","SOURCE_STATUS",
        "TARGET_OBJECT_DATABASE","TARGET_OBJECT_SCHEMA","TARGET_OBJECT_NAME","TARGET_OBJECT_DOMAIN",
        "TARGET_OBJECT_VERSION","TARGET_COLUMN_NAME","TARGET_STATUS","DISTANCE"
    ]
    empty_schema = T.StructType([
        T.StructField("SOURCE_OBJECT_DATABASE", T.StringType()),
        T.StructField("SOURCE_OBJECT_SCHEMA",   T.StringType()),
        T.StructField("SOURCE_OBJECT_NAME",     T.StringType()),
        T.StructField("SOURCE_OBJECT_DOMAIN",   T.StringType()),
        T.StructField("SOURCE_OBJECT_VERSION",  T.StringType()),
        T.StructField("SOURCE_COLUMN_NAME",     T.StringType()),
        T.StructField("SOURCE_STATUS",          T.StringType()),
        T.StructField("TARGET_OBJECT_DATABASE", T.StringType()),
        T.StructField("TARGET_OBJECT_SCHEMA",   T.StringType()),
        T.StructField("TARGET_OBJECT_NAME",     T.StringType()),
        T.StructField("TARGET_OBJECT_DOMAIN",   T.StringType()),
        T.StructField("TARGET_OBJECT_VERSION",  T.StringType()),
        T.StructField("TARGET_COLUMN_NAME",     T.StringType()),
        T.StructField("TARGET_STATUS",          T.StringType()),
        T.StructField("DISTANCE",               T.IntegerType()),
    ])
    result_df = session.create_dataframe([], schema=empty_schema)

    # 既に展開したオブジェクト(FQN)
    expanded: Set[str] = set()

    # frontier: (object_fqn, accumulated_offset)
    frontier: List[Tuple[str, int]] = [(root_fqn, 0)]
    hops = 0

    while frontier and hops < MAX_HOPS:
        next_frontier: List[Tuple[str, int]] = []

        for obj_fqn, offset in frontier:
            if obj_fqn in expanded:
                continue
            expanded.add(obj_fqn)

            # GET_LINEAGE 呼び出し
            #   返却 DISTANCE は 0..5 を想定。呼び出し側で offset を加算して全体距離に補正
            gl = session.table_function(
                "SNOWFLAKE.CORE.GET_LINEAGE",
                F.lit(obj_fqn),
                F.lit(DOMAIN_N),
                F.lit(DIRECTION_N)
            )

            # 念のため None ガード
            if gl is None:
                continue

            # 距離補正
            gl_adj = gl.with_column("DISTANCE", F.col("DISTANCE") + F.lit(offset))

            # 結果に追加(選択列を固定)
            result_df = result_df.union_all(gl_adj.select(out_cols))

            # 端点(DISTANCE==5) を次のフロンティアへ(オフセット +5)
            #   DISTINCT で重複対象を削減
            edge5_rows = (
                gl.filter(F.col("DISTANCE") == 5)
                  .select("TARGET_OBJECT_DATABASE","TARGET_OBJECT_SCHEMA","TARGET_OBJECT_NAME")
                  .distinct()
                  .collect()
            )

            for r in edge5_rows:
                fqn = _build_fqn(r[0], r[1], r[2])
                if fqn:
                    next_frontier.append((fqn, offset + 5))

        frontier = next_frontier
        hops += 1

    # 最終的に重複エッジを排除
    result_df = result_df.distinct()
    return result_df
$$;

拡張GET_LINEAGEのプロシージャ

from typing import List, Tuple
import pandas as pd
import streamlit as st
from snowflake.snowpark.context import get_active_session

# --------------------------------------------------------------
# Snowpark セッション取得
# --------------------------------------------------------------
session = get_active_session()

# --------------------------------------------------------------
# ヘルパ: データベース一覧の取得
# --------------------------------------------------------------
@st.cache_data(show_spinner=False, ttl=300)
def list_accessible_databases() -> List[str]:
    # 直前の SHOW の結果を RESULT_SCAN で取得し、name 列だけを選択
    session.sql("SHOW DATABASES").collect()  # まず実行
    df = session.sql('SELECT "name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))').to_pandas()
    return sorted(df["name"].astype(str).tolist())

# --------------------------------------------------------------
# ヘルパ: ストアドプロシージャ呼び出し
# --------------------------------------------------------------
@st.cache_data(show_spinner=True)
def fetch_lineage(object_name: str, direction: str, max_hops: int, domain: str = "TABLE") -> pd.DataFrame:
    """GET_LINEAGE_CUSTOM を呼び出し、CALL → RESULT_SCAN で結果を取得して返す。"""
    direction_u = direction.strip().upper()
    domain_u = domain.strip().upper()

    # 1) CALL を実行(結果セットはサーバ側に保持される)
    session.sql(
        """
        CALL RAW.PUBLIC.GET_LINEAGE_CUSTOM(
            OBJECT_NAME => ?,
            OBJECT_DOMAIN => ?,
            DIRECTION    => ?,
            MAX_HOPS     => ?
        )
        """,
        params=[object_name, domain_u, direction_u, int(max_hops)]
    ).collect()  # 実行して結果を確定させる(ここでは取り出さない)

    # 2) 直近の結果セットを RESULT_SCAN で取り出す(列を明示)
    df = session.sql(
        """
        SELECT
            "SOURCE_OBJECT_DATABASE","SOURCE_OBJECT_SCHEMA","SOURCE_OBJECT_NAME","SOURCE_OBJECT_DOMAIN",
            "SOURCE_OBJECT_VERSION","SOURCE_COLUMN_NAME","SOURCE_STATUS",
            "TARGET_OBJECT_DATABASE","TARGET_OBJECT_SCHEMA","TARGET_OBJECT_NAME","TARGET_OBJECT_DOMAIN",
            "TARGET_OBJECT_VERSION","TARGET_COLUMN_NAME","TARGET_STATUS",
            "DISTANCE"
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
        """
    ).to_pandas()

    return df


# --------------------------------------------------------------
# UI: タイトルと設定
# --------------------------------------------------------------
st.set_page_config(page_title="Lineage Impact Viewer", layout="wide")
st.title("Lineage Impact Viewer")
st.caption("GET_LINEAGEを用いた上流/下流テーブルの影響調査アプリ")

with st.sidebar:
    st.subheader("共通設定")
    db_list = list_accessible_databases()
    # デフォルト候補を明示的に指定
    default_dbs = ["RAW", "STG", "INT", "DM"]
    # 存在するものだけを残す(環境にないDBが指定されていても安全に動くように)
    default_dbs = [db for db in default_dbs if db in db_list]
    selected_dbs = st.multiselect(
        "影響調査対象データベース (複数選択可)",
        options=db_list,
        default=default_dbs
    )
    max_hops = st.slider("最大展開回数 (MAX_HOPS)", min_value=1, max_value=100, value=20, step=1)
    domain = st.selectbox("オブジェクトのドメイン", options=["TABLE"], index=0)

# --------------------------------------------------------------
# タブ: 下流 / 上流
# --------------------------------------------------------------
TAB_DOWN, TAB_UP = st.tabs(["下流検索", "上流検索"])

# ---- 下流検索 ----
with TAB_DOWN:
    st.subheader("下流検索")
    object_name_down = st.text_input(
        "起点テーブル/ビュー名(下流影響)",
        placeholder="例: RAW.PUBLIC.ORDERS",
        help="選択中のデータベース内で、影響を受けるテーブル/ビュー(影響先)を一覧表示します。"
    )
    col1, col2 = st.columns([1, 5])
    with col1:
        do_search_down = st.button("影響を表示", type="primary")
    with col2:
        st.empty()  # 余白

    if do_search_down and object_name_down.strip():
        df_all = fetch_lineage(object_name_down.strip(), direction="DOWNSTREAM", max_hops=max_hops, domain=domain)
        # 下流側は TARGET_* が影響先
        df_filtered = df_all[df_all["TARGET_OBJECT_DATABASE"].isin(selected_dbs)].copy()
        # テーブル単位でユニーク化
        df_targets = (
            df_filtered[[
                "TARGET_OBJECT_DATABASE","TARGET_OBJECT_SCHEMA","TARGET_OBJECT_NAME",
                "TARGET_OBJECT_DOMAIN","DISTANCE"
            ]]
            .drop_duplicates()
            .sort_values(["DISTANCE","TARGET_OBJECT_DATABASE","TARGET_OBJECT_SCHEMA","TARGET_OBJECT_NAME"], ascending=[True, True, True, True])
            .reset_index(drop=True)
        )
        st.markdown("#### 影響オブジェクト一覧 (下流)")
        st.dataframe(df_targets, use_container_width=True, height=420)

        # ダウンロード
        csv = df_targets.to_csv(index=False).encode("utf-8-sig")
        st.download_button("CSV ダウンロード", data=csv, file_name="downstream_impacts.csv", mime="text/csv")

        with st.expander("詳細 (エッジ一覧) "):
            st.dataframe(df_all, use_container_width=True, height=300)

# ---- 上流検索 ----
with TAB_UP:
    st.subheader("上流検索")
    object_name_up = st.text_input(
        "起点テーブル/ビュー名(上流影響)",
        placeholder="例: DM.PUBLIC.SALES_MART",
        help="選択中のデータベース内で、元になっているテーブル/ビュー(影響元)を一覧表示します。"
    )
    col1, col2 = st.columns([1, 5])
    with col1:
        do_search_up = st.button("影響を表示", key="btn_up", type="primary")
    with col2:
        st.empty()

    if do_search_up and object_name_up.strip():
        df_all = fetch_lineage(object_name_up.strip(), direction="UPSTREAM", max_hops=max_hops, domain=domain)
        # 上流側は SOURCE_* が影響元
        df_filtered = df_all[df_all["SOURCE_OBJECT_DATABASE"].isin(selected_dbs)].copy()
        df_sources = (
            df_filtered[[
                "SOURCE_OBJECT_DATABASE","SOURCE_OBJECT_SCHEMA","SOURCE_OBJECT_NAME",
                "SOURCE_OBJECT_DOMAIN","DISTANCE"
            ]]
            .drop_duplicates()
            .sort_values(["DISTANCE","SOURCE_OBJECT_DATABASE","SOURCE_OBJECT_SCHEMA","SOURCE_OBJECT_NAME"], ascending=[True, True, True, True])
            .reset_index(drop=True)
        )
        st.markdown("#### 影響オブジェクト一覧 (上流)")
        st.dataframe(df_sources, use_container_width=True, height=420)

        csv = df_sources.to_csv(index=False).encode("utf-8-sig")
        st.download_button("CSV ダウンロード", data=csv, file_name="upstream_impacts.csv", mime="text/csv")

        with st.expander("詳細 (エッジ一覧) "):
            st.dataframe(df_all, use_container_width=True, height=300)
NTT DATA TECH
NTT DATA TECH
設定によりコメント欄が無効化されています