dbtのモデルとTROCCOのデータ転送の依存関係をexposureで表現して、データ管理を効率的に行なおう
この記事はTROCCO® Advent Calendar 2024 6日目の記事です。
はじめに
テータ分析基盤を構築している中で、テーブル量産に伴う課題が生まれました。
課題: テーブルの用途が不明瞭に
「どのテーブルがどのダッシュボードやデータ出力に使われているか分からない」
テーブルが増え続けると、それぞれの用途や関連するダッシュボードやデータ出力が把握しづらくなり、メンテナンスやアップデートが困難になる問題が発生しました。
解決策: dbtのexposures機能を活用
dbtでは「〇〇テーブルは▲▲テーブルから抽出されている」といったテーブル間の依存関係を確認できます。exposures機能を追加することで、テーブル間の関係だけでなく、テーブルがどのアウトプットに使われているかも可視化できます。
これにより、下図のようなリネージュをdbt上で確認できるようになります。
補足:なぜ、TROCCOのデータリネージ機能を使わないのか
- TROCCOを中心にテータ分析基盤を構築している場合はTROCCOのデータリネージ機能で十分です
- dbtを中心に構築しているかつ、dbtのexposures機能はTROCCOを経由していないデータの出力情報(BI、Connected Sheetsなど)も登録できるため、dbtのexposures機能を使用します。
実現すること
このスクリプトは、以下のような処理を行います。
-
BigQueryからのデータ取得
TROCCOのデータ転送
に関連するクエリ情報をBigQueryから取得します。 -
取得データの整形
パイプラインIDごとにクエリ実行回数、クエリ内容、関連テーブル、クエリ実行者、実行時間(最新5件)を収集します。 -
YAMLファイルの生成
整形されたデータをYAML形式で出力し、dbt
のexposures
として利用可能な形式に変換します。
必要な前提条件
以下の環境が必要です。
- BigQuery
- TROCCO
- dbt
スクリプトの解説
BigQueryのINFORMATION_SCHEMA.JOBS_BY_PROJECT
テーブルを使用し、指定期間(例: 3ヶ月)内にTROCCOのデータ転送
が実行したクエリ情報を取得します。
import collections
from decimal import ROUND_HALF_UP, Decimal
from typing import Any, Dict
import yaml
from google.cloud import bigquery
import os
# 出力されたpipeline_idと内容を見ながらTROCCOのデータ転送名を登録
pipeline_name_by_pipeline_id = {
"abcdefg123456789abcdefg123456789": "TROCCOのデータ転送1 #99999",
}
# 除外リスト
deleted_pipeline_ids = {}
def round_to_highest_digit(number):
"""
yamlには大まかクエリ回数のみ記録したいため、最上位の桁で丸める
:param number: 丸める対象の数値(int または float)
:return: 最上位の桁で丸められた数値(int)
"""
if number == 0:
return 0
length = len(str(abs(number)))
highest_digit = int(str(number)[0])
return highest_digit * 10 ** (length - 1)
def get_trocco_pipelines_info(
client: bigquery.Client,
) -> Dict[str, Any]:
"""
TROCCOパイプラインに関する情報をBigQueryから取得し、クエリ回数やテーブル参照情報を元に整形した辞書を返します。
:param client: BigQueryのクライアントオブジェクト
:return: パイプラインIDをキーとし、各パイプラインに関連する情報(クエリ、オーナー、参照テーブル、クエリ回数、実行時刻)を含む辞書
"""
duration_days = 93 # 3ヶ月:93
infomation_schema_jobs = "region-asia-northeast1.INFORMATION_SCHEMA.JOBS_BY_PROJECT"
elementary_dbt_models = "my-project.my_dataset.dbt_models"
query = f"""
WITH trocco_audit_logs AS (
SELECT
creation_time
,TO_HEX(MD5(query)) AS pipeline_id -- 一意に特定できる情報がクエリしかないのでハッシュ化してidにする
,user_email
,query
,referenced_table
FROM `{infomation_schema_jobs}`
,UNNEST(referenced_tables) AS referenced_table
WHERE
destination_table.table_id LIKE 'trocco_%' -- TROCCOはデータ転送前に`trocco_%`の名前でテーブルを作成する
AND creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {duration_days} DAY)
)
,pipeline_owner AS (
SELECT
pipeline_id
,user_email AS pipeline_owner
FROM trocco_audit_logs
QUALIFY ROW_NUMBER() OVER (PARTITION BY pipeline_id ORDER BY creation_time DESC) = 1 -- 直近のクエリ発行者をレポートのオーナーと見なす
)
,queries_count_by_pipeline_id AS (
SELECT
pipeline_id
,COUNT(*) AS cnt
FROM trocco_audit_logs
GROUP BY pipeline_id
)
,queries_time_by_pipeline_id AS (
SELECT
pipeline_id
,ARRAY_AGG(
FORMAT_TIMESTAMP('%Y/%m/%d %H:%M', TIMESTAMP(creation_time), 'Asia/Tokyo')
ORDER BY creation_time DESC
LIMIT 5
) AS creation_times
FROM (SELECT DISTINCT pipeline_id,creation_time FROM trocco_audit_logs)
GROUP BY pipeline_id
)
,referenced_tables_by_pipeline_id AS (
SELECT DISTINCT
pipeline_id
,query
,referenced_table.table_id -- テーブル名とモデル名が同一ならこのまま
FROM trocco_audit_logs
)
SELECT
queries_count_by_pipeline_id.pipeline_id
,pipeline_owner.pipeline_owner
,referenced_tables_by_pipeline_id.query
,referenced_tables_by_pipeline_id.table_id
,queries_count_by_pipeline_id.cnt
,queries_time_by_pipeline_id.creation_times
FROM queries_count_by_pipeline_id
INNER JOIN queries_time_by_pipeline_id
USING (pipeline_id)
INNER JOIN pipeline_owner
USING (pipeline_id)
INNER JOIN referenced_tables_by_pipeline_id
USING (pipeline_id)
WHERE
cnt > 5
# ここは必須ではないが、dbtのモデルに関連するtableのみに絞るため入れている
-- AND referenced_tables_by_pipeline_id.table_id IN (SELECT alias FROM `{elementary_dbt_models}`)
ORDER BY queries_count_by_pipeline_id.cnt DESC
"""
# クエリ回数順にyamlに出力するため、OrderedDictを使用する
trocco_pipelines_info = collections.OrderedDict()
for _, item in client.query(query).result().to_dataframe().iterrows():
trocco_pipelines_info[item["pipeline_id"]] = {
"query": item["query"],
"pipeline_owner": item["pipeline_owner"],
"referenced_tables": trocco_pipelines_info.get(item["pipeline_id"], {}).get("referenced_tables", [])
+ [f"ref('{item['table_id']}')"],
"query_times": item["creation_times"],
"queries_count": item["cnt"],
}
return trocco_pipelines_info
def main():
"""
TROCCOのパイプライン情報を取得し、YAML形式でエクスポートするメイン処理。
この関数は、BigQueryからTROCCOのパイプラインに関する情報を取得し、各パイプラインの詳細をMarkdownテーブル形式で記録したYAMLファイルを生成します。
YAMLファイルは、データ転送のクエリ情報、関連するテーブル、最新のクエリ実行時刻、パイプラインのオーナーなどを含みます。
:return: None
"""
client = bigquery.Client()
trocco_pipelines_info = get_trocco_pipelines_info(
client,
)
result = {"version": 2, "exposures": []}
for pipeline_id, item in trocco_pipelines_info.items():
if pipeline_id in deleted_pipeline_ids:
continue
# query_times をマークダウンテーブルに変換
markdown_table = "| No | Query Time |\n|----|---------------------|\n"
for idx, query_time in enumerate(item["query_times"], start=1):
markdown_table += f"| {idx} | {query_time} |\n"
tmp = {
# レポート名はユニークである必要があるため、pipeline_idを使用する。dbtの制約からアンダーバーに変換する
"name": pipeline_id,
"label": pipeline_name_by_pipeline_id.get(pipeline_id, pipeline_id),
"description": f"### TROCCOのデータ転送\n\n#### Query\n\n```\n{item["query"]}\n```\n\n### Latest 5 Query Times\n{markdown_table}\n",
"type": "application",
"owner": {
"name": item["pipeline_owner"],
"email": item["pipeline_owner"],
},
"depends_on": sorted(item["referenced_tables"]),
"meta": {
# 定期的なyamlの更新により、yamlの修正行が不用意に増えるのを防ぎたいため大まかに丸める
"total_queries_count": round_to_highest_digit(item["queries_count"]),
},
}
result["exposures"].append(tmp)
print(yaml.dump(result, allow_unicode=True, sort_keys=False), end="")
if __name__ == "__main__":
main()
出力例
version: 2
exposures:
- name: abcdefg123456789abcdefg123456789
label: "TROCCOのデータ転送1 #99999"
description: |
### TROCCOのデータ転送
#### Query
```
SELECT ...
```
### Latest 5 Query Times
| No | Query Time |
|----|---------------------|
| 1 | 2024/12/01 12:34 |
| 2 | 2024/12/01 11:30 |
type: application
owner:
name: owner_email@example.com
email: owner_email@example.com
depends_on:
- ref('table_name')
meta:
total_queries_count: 1000
dbt docs
dbt docs(データリネージ)
手作業箇所
- pipeline_name_by_pipeline_id
- 出力されたpipeline_idと内容を見ながらTROCCOのデータ転送名を記載して行って下さい。
- deleted_pipeline_ids
- 古い転送設定や不要な転送設定があれば、pipeline_idを記載ください
pipeline_name_by_pipeline_id = {
"abcdefg123456789abcdefg123456789": "TROCCOのデータ転送1 #99999",
}
deleted_pipeline_ids = {}
実装上の工夫ポイント
TROCCOのデータ転送クエリの捕捉
TROCCOはデータ転送中にtrocco_%
の名前で一時テーブルを作成するため、'trocco_%'の条件でクエリを捕捉する
WHERE
destination_table.table_id LIKE 'trocco_%'
pipeline_idの作成
集計のためには、TROCCOはデータ転送ごとに一意のIDが必要です。今回は、データ転送で使用されるクエリ文字列をハッシュ化してIDにします。
- 上手くいかないパターン(現状許容するしかない)
- 同じクエリ文のデータ転送設定が複数ある場合→同じ転送設定とみなしてしまう
- データ転送設定の中のクエリが更新されて内容が変わる→別の転送設定とみなしてしまう
SELECT
creation_time
,TO_HEX(MD5(query)) AS pipeline_id
もしかしたら、TROCCO APIやBigQuery監査ログから一意のIDを特定できるかもしれません(未検証)
[おまけ] GitHub Actions
GitHub Actions
name: dbt exposure for TROCCO
run-name: Run by @${{ github.actor }} - ${{ github.workflow }}
on:
schedule:
- cron: "0 15 * * 5" # 金曜日の15:00に実行(UTC時間)→土曜のJST0時
workflow_dispatch: # 手動トリガーの追加
jobs:
dbt_exposure_for_trocco:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
cache: 'pip' # caching pip dependencies
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install PyYAML==6.0.1 google-cloud-bigquery==3.25.0 pandas==2.2.2 db-dtypes==1.2.0
- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v2
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
- name: Run Python script
run: python .github/script/dbt_exposure_for_trocco.py
- name: Remove sensitive files
if: always() # ワークフローの他のステップが失敗した場合でも、このステップが確実に実行
run: |
rm -rf gha*
rm -f gha-creds-*.json
- name: Diff
id: diff
run: |
set -x
git add -N .
git diff --name-only --exit-code
continue-on-error: true
- name: Get current date
id: date
run: |
echo "JST_DATE=$(TZ=Asia/Tokyo date +'%Y%m%d')" >> $GITHUB_ENV
- name: Get changed files
id: changed-files
if: steps.diff.outcome == 'failure'
run: |
echo "FILES<<EOF" >> $GITHUB_OUTPUT
# Process git diff output and format it
git -c core.quotepath=false diff --name-status | LC_ALL=C.UTF-8 sort -u | while read status file; do
# Remove leading whitespace and format status
status="[${status}]"
file=$(echo "$file" | sed 's/^[[:space:]]*//')
echo "$status$file" >> $GITHUB_OUTPUT
done
echo "EOF" >> $GITHUB_OUTPUT
- name: Create Pull Request
uses: peter-evans/create-pull-request@v6
with:
commit-message: 'update: TROCCO pipeline data ${{ env.JST_DATE }}'
branch: ${{ env.JST_DATE }}_update_trocco_pipeline_data
title: 'update: TROCCO pipeline data ${{ env.JST_DATE }}'
body: |
## 概要
BigQueryからTROCCOのパイプラインに関する情報を取得し、各パイプラインの詳細をMarkdownテーブル形式で記録したYAMLファイルを生成します。
## 変更されたファイル
- [ファイルのステータス](https://git-scm.com/docs/git-status/2.43.0#_short_format)(A = 新規追加、M = 変更、D = 削除)
```
${{ steps.changed-files.outputs.FILES }}
```
if: steps.diff.outcome == 'failure'
機能実装要望
TROCCOデータ転送中にtrocco_%
の名前で一時テーブルを作成するためのクエリにメタデータを付与して欲しい → これにより手作業箇所が完全になくなる。
dbtによるクエリ実行だと下記のメタデータが自動で付与される
/* {"app": "dbt", "dbt_version": "1.8.3", "profile_name": "xxxxxxx", "target_name": "xxxxxxx", "connection_name": "xxxxxxx"} */
select * from xxxxxxx
上記を参考に考えたメタデータを付与案
/* {"app": "trocco", "pipeline_id": "99999", "pipeline_name": "TROCCOのデータ転送1", "revision": "2"} */
select * from xxxxxxx
ご検討のほど、よろしくお願いします。
おわりに
このスクリプトを活用することで、TROCCOのデータ転送の使用状況を効率的に可視化し、監査や運用ドキュメントの作成を自動化できます。また、dbt
との統合により、データ基盤全体の透明性を高めることが可能です。
参考
下記の3部作はめっちゃ参考になりました〜
Discussion