🍊

dbt の persist_docs を source にも反映させたい!(ちょい足し)

2024/12/19に公開

はじめに

まず、下記の記事を閲覧ください
https://zenn.dev/tanuhack/articles/727fbbfa9eae6f

dbt で管理しているカラムのメタデータ(dbt Docs の description)を DWH にも反映させたいとき、dbt_project.yml などの設定ファイルに persist_docs プロパティを仕込むと、これを実現することができます。

dbt_project.yml
models:
  <resource-path>:
    +persist_docs:
      relation: true
      columns: true

しかし、この persist_docs プロパティは、記事公開時点の 2024 年 12 月 11 日では、残念ながら source の dbt Docs には対応していません。

下記のissueにあるように今後対応しなさそうな気がします。

https://github.com/dbt-labs/dbt-bigquery/issues/416

ちょい足しパート

記事の最後にある課題に取り組みます。

  • doc 関数の反映
  • source の設定ファイルに複数のテーブルを記述しているときの対応
  • git diff で更新があったファイルだけ更新

対応済み

  • source の設定ファイルに複数のテーブルを記述しているときの対応
  • git diff で更新があったファイルだけ更新
  • テーブルやデータセット自体の説明の更新

残課題

  • doc 関数の反映
  • ワイルドカードテーブル
  • レコード型のカラムを含むテーブル

スクリプトの仕組み

YAMLファイルをもとにBigQueryのデータセットとテーブルの説明(description)を更新します

主な機能

  • BigQueryの更新
    • データセットの説明を更新(update_dataset_description)。
    • テーブルの説明を更新(update_table_description)。
    • 各カラムの説明を更新(update_schema)。
  • 除外リストの処理
    特定のYAMLファイルは除外リストに登録されており、それらをスキップします。

補足

  • YAMLファイルは、models/sources/ フォルダ内に配置されていることを想定。
  • 特定のBigQueryの制約(例: RECORD型カラムはALTER COLUMN不可)に対応していないため、除外リストを活用。

コード例

.github/script/update_source_schemas.py
from google.cloud import bigquery
from google.cloud.bigquery.client import Client
from glob import glob
import yaml
import sys
import os


def load_yaml(filepath: str) -> dict:
    with open(filepath, "r") as file:
        return yaml.safe_load(file)


def get_dataset_info(data: dict, i: int) -> tuple[str, str, str, str, list[dict[str, str]]]:
    dataset = data["sources"][0]["name"]
    dataset_description = data["sources"][0]["description"]
    table = data["sources"][0]["tables"][i]["name"]
    table_description = data["sources"][0]["tables"][i]["description"]
    columns = data["sources"][0]["tables"][i]["columns"]
    return dataset, dataset_description, table, table_description, columns


def update_schema(client: Client, dataset: str, table: str, columns: list[dict[str, str]]) -> None:
    destination = f"{client.project}.{dataset}.{table}"
    query_string = f"ALTER TABLE {destination} " + ", ".join(
        [
            f"""ALTER COLUMN `{col.get('name')}` SET OPTIONS(description='''{col.get('description')}''')"""
            for col in columns
        ]
    )
    print(f"\033[33m[INFO]\033[0m  - " + destination)
    query_job = client.query(query_string)
    query_job.result()


def update_dataset_description(client: Client, dataset_id: str, dataset_description: str):
    if dataset_description is not None:
        dataset_ref = client.dataset(dataset_id)
        dataset = client.get_dataset(dataset_ref)  # データセットを取得
        dataset.description = dataset_description  # 説明を設定
        client.update_dataset(dataset, ["description"])


def update_table_description(client: Client, dataset_id: str, table_id: str, table_description: str):
    if table_description is not None:
        table_ref = client.dataset(dataset_id).table(table_id)
        table = client.get_table(table_ref)  # テーブルを取得
        table.description = table_description  # 新しい説明を設定
        client.update_table(table, ["description"])


def handle_bq_error(error, message=None):
    # エラーメッセージを抽出
    error_msg = "\n".join([item["message"] for item in error.errors])

    # エラーに関連するクエリジョブの情報があればURLを生成
    if hasattr(error, "query_job"):
        location = error.query_job.location
        project_id = error.query_job.project
        job_id = error.query_job.job_id
        bq_console_url = (
            f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"
        )
        print(f"\033[31m[ERROR]\033[0m BigQuery Job Link: {bq_console_url}")
    else:
        print("\033[31m[ERROR]\033[0m No query job information available in the error.")


def main() -> None:
    # 除外リストにワイルドカードテーブルやRECORDカラムのあるテーブルを設定する
    deleted_filepath_names = {}

    files = sys.argv[1].split()
    print(f"\033[33m[INFO]\033[0m files : " + str(files))

    try:
        client = bigquery.Client()
        for filepath in files:
            if os.path.basename(filepath) in deleted_filepath_names:
                print(f"\033[33m[WARNING]\033[0m 除外リストに入っているため未処理 : {os.path.basename(filepath)}")
                continue
            data = load_yaml(filepath)
            if "sources" not in data.keys():
                print(f"\033[33m[WARNING]\033[0m sources項目がないため未処理")
                continue
            table_count = len(data.get("sources", [])[0].get("tables", []))
            for i in range(table_count):
                dataset, dataset_description, table, table_description, columns = get_dataset_info(data, i)
                update_schema(client, dataset, table, columns)
                update_table_description(client, dataset, table, table_description)
            update_dataset_description(client, dataset, dataset_description)
            print(f"\033[33m[INFO]\033[0m " + dataset + " : " + str(table_count))
        print("\033[32m[SUCCESS]\033[0m 全ての処理が終了しました。")
    except Exception as e:
        handle_bq_error(e, "Failed to execute BigQuery query")
        print(f"\033[31m[ERROR]\033[0m {e}")


if __name__ == "__main__":
    main()


GitHub Actionsの活用

GitHub Actionsを用いて、コードの変更時にスクリプトを自動実行する方法を説明します。

ワークフローの構成

このGitHub Actionsのワークフローは、my_project/models/staging/内の更新されたYAMLファイルを検出し、それをもとにBigQueryのスキーマを更新するPythonスクリプトを実行します。

  • UPDATED_FILES: 更新されたファイルのリストを環境変数として渡します。
.github/workflows/update_source_schemas.yml
name: Update Source Schemas
run-name: Run by @${{ github.actor }} - ${{ github.workflow }}

on:
  push:
    branches:
      - main
    paths:
      - 'my_project/models/staging/*.yml'
  workflow_dispatch:  # 手動実行用

jobs:
  update_source_schemas:
    runs-on: ubuntu-latest

    steps:
      # リポジトリをチェックアウト
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          fetch-depth: 0  # 全履歴をフェッチ

      # 更新されたファイルのリストを取得
      - name: Get updated YAML files
        id: get_updated_files
        run: |
          UPDATED_FILES=$(git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep '^my_project/models/staging/.*\.yml$' || true)
          echo "updated_files<<EOF" >> $GITHUB_OUTPUT
          echo "$UPDATED_FILES" >> $GITHUB_OUTPUT
          echo "EOF" >> $GITHUB_OUTPUT
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
          cache: 'pip' # caching pip dependencies

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install pyyaml google-cloud-bigquery
          
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}
          
      - name: Run Python script
        env:
          UPDATED_FILES: ${{ steps.get_updated_files.outputs.updated_files }}
        run: |
          python .github/script/update_source_schemas.py "$UPDATED_FILES"

おわりに

source の dbt Docs を DWH に反映するために、GitHub Actions を使用して実現する方法を紹介しました。
今回、dbtのsource.ymlからsourceのテーブルをアップデートするアプローチですが、参照元のテーブルにすでにメタデータが入っていればdbt-source-importerを使用するアプローチも良いと思います。

https://www.yasuhisay.info/entry/2022/01/22/121000
https://github.com/syou6162/dbt-source-importer
残課題として下記があります。力尽きたので、次の人に託します。

  • doc 関数の反映
  • ワイルドカードテーブル
  • レコード型のカラムを含むテーブル

参考

https://zenn.dev/tanuhack/articles/727fbbfa9eae6f
https://github.com/dbt-labs/dbt-bigquery/issues/416
https://www.yasuhisay.info/entry/2022/01/22/121000
https://github.com/syou6162/dbt-source-importer

Discussion