🍊

dbtのモデルとTROCCOのデータ転送の依存関係をexposureで表現して、データ管理を効率的に行なおう

2024/12/03に公開

この記事はTROCCO® Advent Calendar 2024 6日目の記事です。

はじめに

テータ分析基盤を構築している中で、テーブル量産に伴う課題が生まれました。
課題: テーブルの用途が不明瞭に
「どのテーブルがどのダッシュボードやデータ出力に使われているか分からない」
テーブルが増え続けると、それぞれの用途や関連するダッシュボードやデータ出力が把握しづらくなり、メンテナンスやアップデートが困難になる問題が発生しました。

解決策: dbtのexposures機能を活用
dbtでは「〇〇テーブルは▲▲テーブルから抽出されている」といったテーブル間の依存関係を確認できます。exposures機能を追加することで、テーブル間の関係だけでなく、テーブルがどのアウトプットに使われているかも可視化できます。
これにより、下図のようなリネージュをdbt上で確認できるようになります。

https://docs.getdbt.com/docs/build/exposures

補足:なぜ、TROCCOのデータリネージ機能を使わないのか

  • TROCCOを中心にテータ分析基盤を構築している場合はTROCCOのデータリネージ機能で十分です
  • dbtを中心に構築しているかつ、dbtのexposures機能はTROCCOを経由していないデータの出力情報(BI、Connected Sheetsなど)も登録できるため、dbtのexposures機能を使用します。

実現すること

このスクリプトは、以下のような処理を行います。

  1. BigQueryからのデータ取得
    TROCCOのデータ転送に関連するクエリ情報をBigQueryから取得します。
  2. 取得データの整形
    パイプラインIDごとにクエリ実行回数、クエリ内容、関連テーブル、クエリ実行者、実行時間(最新5件)を収集します。
  3. YAMLファイルの生成
    整形されたデータをYAML形式で出力し、dbtexposuresとして利用可能な形式に変換します。

必要な前提条件

以下の環境が必要です。

  • BigQuery
  • TROCCO
  • dbt

スクリプトの解説

BigQueryのINFORMATION_SCHEMA.JOBS_BY_PROJECTテーブルを使用し、指定期間(例: 3ヶ月)内にTROCCOのデータ転送が実行したクエリ情報を取得します。

import collections
from decimal import ROUND_HALF_UP, Decimal
from typing import Any, Dict

import yaml
from google.cloud import bigquery
import os

# 出力されたpipeline_idと内容を見ながらTROCCOのデータ転送名を登録
pipeline_name_by_pipeline_id = {
    "abcdefg123456789abcdefg123456789": "TROCCOのデータ転送1 #99999",
}

# 除外リスト
deleted_pipeline_ids = {}


def round_to_highest_digit(number):
    """
    yamlには大まかクエリ回数のみ記録したいため、最上位の桁で丸める

    :param number: 丸める対象の数値(int または float)
    :return: 最上位の桁で丸められた数値(int)
    """
    if number == 0:
        return 0

    length = len(str(abs(number)))
    highest_digit = int(str(number)[0])
    return highest_digit * 10 ** (length - 1)


def get_trocco_pipelines_info(
    client: bigquery.Client,
) -> Dict[str, Any]:
    """
    TROCCOパイプラインに関する情報をBigQueryから取得し、クエリ回数やテーブル参照情報を元に整形した辞書を返します。

    :param client: BigQueryのクライアントオブジェクト
    :return: パイプラインIDをキーとし、各パイプラインに関連する情報(クエリ、オーナー、参照テーブル、クエリ回数、実行時刻)を含む辞書
    """
    duration_days = 93  # 3ヶ月:93
    infomation_schema_jobs = "region-asia-northeast1.INFORMATION_SCHEMA.JOBS_BY_PROJECT"
    elementary_dbt_models = "my-project.my_dataset.dbt_models"

    query = f"""
        WITH trocco_audit_logs AS (
        SELECT 
            creation_time
            ,TO_HEX(MD5(query)) AS pipeline_id -- 一意に特定できる情報がクエリしかないのでハッシュ化してidにする
            ,user_email
            ,query
            ,referenced_table
        FROM `{infomation_schema_jobs}`
        ,UNNEST(referenced_tables) AS referenced_table
        WHERE 
            destination_table.table_id LIKE 'trocco_%' -- TROCCOはデータ転送前に`trocco_%`の名前でテーブルを作成する
            AND creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {duration_days} DAY)
        )
        ,pipeline_owner AS (
            SELECT
                pipeline_id
                ,user_email AS pipeline_owner
            FROM trocco_audit_logs
            QUALIFY ROW_NUMBER() OVER (PARTITION BY pipeline_id ORDER BY creation_time DESC) = 1 -- 直近のクエリ発行者をレポートのオーナーと見なす
        )
        ,queries_count_by_pipeline_id AS (
            SELECT
                pipeline_id
                ,COUNT(*) AS cnt
            FROM trocco_audit_logs
            GROUP BY pipeline_id
        )
        ,queries_time_by_pipeline_id AS (
            SELECT
            pipeline_id
            ,ARRAY_AGG(
                FORMAT_TIMESTAMP('%Y/%m/%d %H:%M', TIMESTAMP(creation_time), 'Asia/Tokyo')
                ORDER BY creation_time DESC
                LIMIT 5
            ) AS creation_times
            FROM (SELECT DISTINCT pipeline_id,creation_time FROM trocco_audit_logs)
            GROUP BY pipeline_id
        )
        ,referenced_tables_by_pipeline_id AS (
            SELECT DISTINCT 
                pipeline_id
                ,query
                ,referenced_table.table_id -- テーブル名とモデル名が同一ならこのまま
            FROM trocco_audit_logs
        )
        SELECT
            queries_count_by_pipeline_id.pipeline_id
            ,pipeline_owner.pipeline_owner
            ,referenced_tables_by_pipeline_id.query
            ,referenced_tables_by_pipeline_id.table_id
            ,queries_count_by_pipeline_id.cnt
            ,queries_time_by_pipeline_id.creation_times
        FROM queries_count_by_pipeline_id
        INNER JOIN queries_time_by_pipeline_id
        USING (pipeline_id)
        INNER JOIN pipeline_owner
        USING (pipeline_id)
        INNER JOIN referenced_tables_by_pipeline_id
        USING (pipeline_id)
        WHERE
            cnt > 5
            # ここは必須ではないが、dbtのモデルに関連するtableのみに絞るため入れている
            -- AND referenced_tables_by_pipeline_id.table_id IN (SELECT alias FROM `{elementary_dbt_models}`)
        ORDER BY queries_count_by_pipeline_id.cnt DESC
    """

    # クエリ回数順にyamlに出力するため、OrderedDictを使用する
    trocco_pipelines_info = collections.OrderedDict()
    for _, item in client.query(query).result().to_dataframe().iterrows():

        trocco_pipelines_info[item["pipeline_id"]] = {
            "query": item["query"],
            "pipeline_owner": item["pipeline_owner"],
            "referenced_tables": trocco_pipelines_info.get(item["pipeline_id"], {}).get("referenced_tables", [])
            + [f"ref('{item['table_id']}')"],
            "query_times": item["creation_times"],
            "queries_count": item["cnt"],
        }
    return trocco_pipelines_info


def main():
    """
    TROCCOのパイプライン情報を取得し、YAML形式でエクスポートするメイン処理。

    この関数は、BigQueryからTROCCOのパイプラインに関する情報を取得し、各パイプラインの詳細をMarkdownテーブル形式で記録したYAMLファイルを生成します。
    YAMLファイルは、データ転送のクエリ情報、関連するテーブル、最新のクエリ実行時刻、パイプラインのオーナーなどを含みます。

    :return: None
    """
    client = bigquery.Client()

    trocco_pipelines_info = get_trocco_pipelines_info(
        client,
    )
    result = {"version": 2, "exposures": []}

    for pipeline_id, item in trocco_pipelines_info.items():
        if pipeline_id in deleted_pipeline_ids:
            continue

        # query_times をマークダウンテーブルに変換
        markdown_table = "| No | Query Time          |\n|----|---------------------|\n"
        for idx, query_time in enumerate(item["query_times"], start=1):
            markdown_table += f"| {idx}  | {query_time} |\n"

        tmp = {
            # レポート名はユニークである必要があるため、pipeline_idを使用する。dbtの制約からアンダーバーに変換する
            "name": pipeline_id,
            "label": pipeline_name_by_pipeline_id.get(pipeline_id, pipeline_id),
            "description": f"### TROCCOのデータ転送\n\n#### Query\n\n```\n{item["query"]}\n```\n\n### Latest 5 Query Times\n{markdown_table}\n",
            "type": "application",
            "owner": {
                "name": item["pipeline_owner"],
                "email": item["pipeline_owner"],
            },
            "depends_on": sorted(item["referenced_tables"]),
            "meta": {
                # 定期的なyamlの更新により、yamlの修正行が不用意に増えるのを防ぎたいため大まかに丸める
                "total_queries_count": round_to_highest_digit(item["queries_count"]),
            },
        }
        result["exposures"].append(tmp)

    print(yaml.dump(result, allow_unicode=True, sort_keys=False), end="")

if __name__ == "__main__":
    main()

出力例

version: 2
exposures:
  - name: abcdefg123456789abcdefg123456789
    label: "TROCCOのデータ転送1 #99999"
    description: |
      ### TROCCOのデータ転送
      
      #### Query
      
      ```
      SELECT ...
      ```
      
      ### Latest 5 Query Times
      | No | Query Time          |
      |----|---------------------|
      | 1  | 2024/12/01 12:34   |
      | 2  | 2024/12/01 11:30   |
    type: application
    owner:
      name: owner_email@example.com
      email: owner_email@example.com
    depends_on:
      - ref('table_name')
    meta:
      total_queries_count: 1000

dbt docs

dbt docs(データリネージ)

手作業箇所

  • pipeline_name_by_pipeline_id
    • 出力されたpipeline_idと内容を見ながらTROCCOのデータ転送名を記載して行って下さい。
  • deleted_pipeline_ids
    • 古い転送設定や不要な転送設定があれば、pipeline_idを記載ください
pipeline_name_by_pipeline_id = {
    "abcdefg123456789abcdefg123456789": "TROCCOのデータ転送1 #99999",
}

deleted_pipeline_ids = {}

実装上の工夫ポイント

TROCCOのデータ転送クエリの捕捉

TROCCOはデータ転送中にtrocco_%の名前で一時テーブルを作成するため、'trocco_%'の条件でクエリを捕捉する

        WHERE 
            destination_table.table_id LIKE 'trocco_%'

pipeline_idの作成

集計のためには、TROCCOはデータ転送ごとに一意のIDが必要です。今回は、データ転送で使用されるクエリ文字列をハッシュ化してIDにします。

  • 上手くいかないパターン(現状許容するしかない)
    • 同じクエリ文のデータ転送設定が複数ある場合→同じ転送設定とみなしてしまう
    • データ転送設定の中のクエリが更新されて内容が変わる→別の転送設定とみなしてしまう
        SELECT 
            creation_time
            ,TO_HEX(MD5(query)) AS pipeline_id

もしかしたら、TROCCO APIやBigQuery監査ログから一意のIDを特定できるかもしれません(未検証)

[おまけ] GitHub Actions

GitHub Actions
dbt_exposure_for_trocco.yml
name: dbt exposure for TROCCO
run-name: Run by @${{ github.actor }} - ${{ github.workflow }}

on:
  schedule:
    - cron: "0 15 * * 5"  # 金曜日の15:00に実行(UTC時間)→土曜のJST0時
  workflow_dispatch:  # 手動トリガーの追加

jobs:
  dbt_exposure_for_trocco:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
          cache: 'pip' # caching pip dependencies

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install PyYAML==6.0.1 google-cloud-bigquery==3.25.0 pandas==2.2.2 db-dtypes==1.2.0
          
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}
          
      - name: Run Python script
        run: python .github/script/dbt_exposure_for_trocco.py 
        
      - name: Remove sensitive files
        if: always() # ワークフローの他のステップが失敗した場合でも、このステップが確実に実行
        run: |
          rm -rf gha* 
          rm -f gha-creds-*.json

      - name: Diff
        id: diff
        run: |
          set -x
          git add -N .
          git diff --name-only --exit-code
        continue-on-error: true

      - name: Get current date
        id: date
        run: |
          echo "JST_DATE=$(TZ=Asia/Tokyo date +'%Y%m%d')" >> $GITHUB_ENV

      - name: Get changed files
        id: changed-files
        if: steps.diff.outcome == 'failure'
        run: |
          echo "FILES<<EOF" >> $GITHUB_OUTPUT

          # Process git diff output and format it
          git -c core.quotepath=false diff --name-status | LC_ALL=C.UTF-8 sort -u | while read status file; do
            # Remove leading whitespace and format status
            status="[${status}]"
            file=$(echo "$file" | sed 's/^[[:space:]]*//')
            echo "$status$file" >> $GITHUB_OUTPUT
          done
          echo "EOF" >> $GITHUB_OUTPUT

      - name: Create Pull Request
        uses: peter-evans/create-pull-request@v6
        with:
          commit-message: 'update: TROCCO pipeline data ${{ env.JST_DATE }}'
          branch: ${{ env.JST_DATE }}_update_trocco_pipeline_data
          title: 'update: TROCCO pipeline data ${{ env.JST_DATE }}'
          body: |
            ## 概要

            BigQueryからTROCCOのパイプラインに関する情報を取得し、各パイプラインの詳細をMarkdownテーブル形式で記録したYAMLファイルを生成します。

            ## 変更されたファイル
            - [ファイルのステータス](https://git-scm.com/docs/git-status/2.43.0#_short_format)(A = 新規追加、M = 変更、D = 削除)
            ```
            ${{ steps.changed-files.outputs.FILES }}
            ```

        if: steps.diff.outcome == 'failure'

機能実装要望

TROCCOデータ転送中にtrocco_%の名前で一時テーブルを作成するためのクエリにメタデータを付与して欲しい → これにより手作業箇所が完全になくなる。
dbtによるクエリ実行だと下記のメタデータが自動で付与される

dbtの例
/* {"app": "dbt", "dbt_version": "1.8.3", "profile_name": "xxxxxxx", "target_name": "xxxxxxx", "connection_name": "xxxxxxx"} */

select * from xxxxxxx

上記を参考に考えたメタデータを付与案

メタデータを付与案
/* {"app": "trocco", "pipeline_id": "99999", "pipeline_name": "TROCCOのデータ転送1", "revision": "2"} */

select * from xxxxxxx

ご検討のほど、よろしくお願いします。

おわりに

このスクリプトを活用することで、TROCCOのデータ転送の使用状況を効率的に可視化し、監査や運用ドキュメントの作成を自動化できます。また、dbtとの統合により、データ基盤全体の透明性を高めることが可能です。

参考

下記の3部作はめっちゃ参考になりました〜
https://www.yasuhisay.info/entry/2023/10/08/001623
https://www.yasuhisay.info/entry/2024/01/06/193734
https://www.yasuhisay.info/entry/2024/01/20/114845


https://qiita.com/bitkey_ryou_ikuta/items/539a87f4fd013e2ab686
https://tech.timee.co.jp/entry/2024/03/18/110548
https://docs.getdbt.com/reference/exposure-properties
https://docs.getdbt.com/docs/build/exposures

Discussion