🍊
dbt の persist_docs を source にも反映させたい!(ちょい足し)
はじめに
まず、下記の記事を閲覧ください
dbt で管理しているカラムのメタデータ(dbt Docs の description)を DWH にも反映させたいとき、dbt_project.yml などの設定ファイルに persist_docs プロパティを仕込むと、これを実現することができます。
dbt_project.yml
models:
<resource-path>:
+persist_docs:
relation: true
columns: true
しかし、この persist_docs プロパティは、記事公開時点の 2024 年 12 月 11 日では、残念ながら source の dbt Docs には対応していません。
下記のissueにあるように今後対応しなさそうな気がします。
ちょい足しパート
記事の最後にある課題に取り組みます。
- doc 関数の反映
- source の設定ファイルに複数のテーブルを記述しているときの対応
- git diff で更新があったファイルだけ更新
対応済み
- source の設定ファイルに複数のテーブルを記述しているときの対応
- git diff で更新があったファイルだけ更新
- テーブルやデータセット自体の説明の更新
残課題
- doc 関数の反映
- ワイルドカードテーブル
- レコード型のカラムを含むテーブル
スクリプトの仕組み
YAMLファイルをもとにBigQueryのデータセットとテーブルの説明(description)を更新します
主な機能
-
BigQueryの更新
- データセットの説明を更新(
update_dataset_description
)。 - テーブルの説明を更新(
update_table_description
)。 - 各カラムの説明を更新(
update_schema
)。
- データセットの説明を更新(
-
除外リストの処理
特定のYAMLファイルは除外リストに登録されており、それらをスキップします。
補足
- YAMLファイルは、
models/sources/
フォルダ内に配置されていることを想定。 - 特定のBigQueryの制約(例: RECORD型カラムは
ALTER COLUMN
不可)に対応していないため、除外リストを活用。
コード例
.github/script/update_source_schemas.py
from google.cloud import bigquery
from google.cloud.bigquery.client import Client
from glob import glob
import yaml
import sys
import os
def load_yaml(filepath: str) -> dict:
with open(filepath, "r") as file:
return yaml.safe_load(file)
def get_dataset_info(data: dict, i: int) -> tuple[str, str, str, str, list[dict[str, str]]]:
dataset = data["sources"][0]["name"]
dataset_description = data["sources"][0]["description"]
table = data["sources"][0]["tables"][i]["name"]
table_description = data["sources"][0]["tables"][i]["description"]
columns = data["sources"][0]["tables"][i]["columns"]
return dataset, dataset_description, table, table_description, columns
def update_schema(client: Client, dataset: str, table: str, columns: list[dict[str, str]]) -> None:
destination = f"{client.project}.{dataset}.{table}"
query_string = f"ALTER TABLE {destination} " + ", ".join(
[
f"""ALTER COLUMN `{col.get('name')}` SET OPTIONS(description='''{col.get('description')}''')"""
for col in columns
]
)
print(f"\033[33m[INFO]\033[0m - " + destination)
query_job = client.query(query_string)
query_job.result()
def update_dataset_description(client: Client, dataset_id: str, dataset_description: str):
if dataset_description is not None:
dataset_ref = client.dataset(dataset_id)
dataset = client.get_dataset(dataset_ref) # データセットを取得
dataset.description = dataset_description # 説明を設定
client.update_dataset(dataset, ["description"])
def update_table_description(client: Client, dataset_id: str, table_id: str, table_description: str):
if table_description is not None:
table_ref = client.dataset(dataset_id).table(table_id)
table = client.get_table(table_ref) # テーブルを取得
table.description = table_description # 新しい説明を設定
client.update_table(table, ["description"])
def handle_bq_error(error, message=None):
# エラーメッセージを抽出
error_msg = "\n".join([item["message"] for item in error.errors])
# エラーに関連するクエリジョブの情報があればURLを生成
if hasattr(error, "query_job"):
location = error.query_job.location
project_id = error.query_job.project
job_id = error.query_job.job_id
bq_console_url = (
f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"
)
print(f"\033[31m[ERROR]\033[0m BigQuery Job Link: {bq_console_url}")
else:
print("\033[31m[ERROR]\033[0m No query job information available in the error.")
def main() -> None:
# 除外リストにワイルドカードテーブルやRECORDカラムのあるテーブルを設定する
deleted_filepath_names = {}
files = sys.argv[1].split()
print(f"\033[33m[INFO]\033[0m files : " + str(files))
try:
client = bigquery.Client()
for filepath in files:
if os.path.basename(filepath) in deleted_filepath_names:
print(f"\033[33m[WARNING]\033[0m 除外リストに入っているため未処理 : {os.path.basename(filepath)}")
continue
data = load_yaml(filepath)
if "sources" not in data.keys():
print(f"\033[33m[WARNING]\033[0m sources項目がないため未処理")
continue
table_count = len(data.get("sources", [])[0].get("tables", []))
for i in range(table_count):
dataset, dataset_description, table, table_description, columns = get_dataset_info(data, i)
update_schema(client, dataset, table, columns)
update_table_description(client, dataset, table, table_description)
update_dataset_description(client, dataset, dataset_description)
print(f"\033[33m[INFO]\033[0m " + dataset + " : " + str(table_count))
print("\033[32m[SUCCESS]\033[0m 全ての処理が終了しました。")
except Exception as e:
handle_bq_error(e, "Failed to execute BigQuery query")
print(f"\033[31m[ERROR]\033[0m {e}")
if __name__ == "__main__":
main()
GitHub Actionsの活用
GitHub Actionsを用いて、コードの変更時にスクリプトを自動実行する方法を説明します。
ワークフローの構成
このGitHub Actionsのワークフローは、my_project/models/staging/内の更新されたYAMLファイルを検出し、それをもとにBigQueryのスキーマを更新するPythonスクリプトを実行します。
- UPDATED_FILES: 更新されたファイルのリストを環境変数として渡します。
.github/workflows/update_source_schemas.yml
name: Update Source Schemas
run-name: Run by @${{ github.actor }} - ${{ github.workflow }}
on:
push:
branches:
- main
paths:
- 'my_project/models/staging/*.yml'
workflow_dispatch: # 手動実行用
jobs:
update_source_schemas:
runs-on: ubuntu-latest
steps:
# リポジトリをチェックアウト
- name: Check out repository
uses: actions/checkout@v3
with:
fetch-depth: 0 # 全履歴をフェッチ
# 更新されたファイルのリストを取得
- name: Get updated YAML files
id: get_updated_files
run: |
UPDATED_FILES=$(git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep '^my_project/models/staging/.*\.yml$' || true)
echo "updated_files<<EOF" >> $GITHUB_OUTPUT
echo "$UPDATED_FILES" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
cache: 'pip' # caching pip dependencies
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pyyaml google-cloud-bigquery
- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v2
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
- name: Run Python script
env:
UPDATED_FILES: ${{ steps.get_updated_files.outputs.updated_files }}
run: |
python .github/script/update_source_schemas.py "$UPDATED_FILES"
おわりに
source の dbt Docs を DWH に反映するために、GitHub Actions を使用して実現する方法を紹介しました。
今回、dbtのsource.ymlからsourceのテーブルをアップデートするアプローチですが、参照元のテーブルにすでにメタデータが入っていればdbt-source-importer
を使用するアプローチも良いと思います。
残課題として下記があります。力尽きたので、次の人に託します。
- doc 関数の反映
- ワイルドカードテーブル
- レコード型のカラムを含むテーブル
参考
Discussion