Closed4

BigQueryのDescriptionをdbtのyamlに取り込む

oxonoxon

選択肢を考える

dbt-osmosis

https://github.com/z3z1ma/dbt-osmosis/blob/8c5781c059b7ac517f950995fc92b520a3532a3c/src/dbt_osmosis/core/osmosis.py#L405

https://github.com/dbt-labs/dbt-bigquery/blob/f454c47fc5cc1538f6b87d203b3fdc3b88b5242d/dbt/adapters/bigquery/impl.py#L377

oxonoxon

Google Cloud SDKでお手製スクリプト作る

すでに存在するdbt projectのsource.ymlをBigQueryのDescriptionで上書きしたい+ソースとなるテーブルが大量だったため一旦これに落ち着いた

結果

pythonスクリプトを使ってゴニョゴニョすることにした

import click
import ruamel.yaml
from google.cloud import bigquery
from typing import Dict, Any
import logging


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)


# ruamel.yamlを使ってyamlファイルを読み込む (jinja文字列も安全にload)
yaml = ruamel.yaml.YAML(typ="rt")
yaml.indent(mapping=2, sequence=4, offset=2)
yaml.preserve_quotes = True  # クォートを保持
yaml.default_flow_style = False  # ブロックスタイルで出力
yaml.allow_unicode = True  # 日本語も文字列として扱う
yaml.sort_keys = False  # ソートしない
yaml.width = 4096  # 折り返しなし


def load_source_yml(source_yml_path: str) -> Dict[str, Any]:
    """source.ymlファイルの読み込み
    """
    with open(source_yml_path, "r") as f:
        return yaml.load(f)


def get_table_info(
    client: bigquery.Client, source: Dict[str, Any], table: Dict[str, Any]
) -> bigquery.Table:
    """BigQueryからテーブル情報を取得"""
    dataset_id = source["schema"] if source["schema"] else source["name"]
    table_id = table["identifier"] if table["identifier"] else table["name"]
    table_ref = client.dataset(dataset_id).table(table_id)
    return client.get_table(table_ref)


def update_descriptions(table: Dict[str, Any], bq_table: bigquery.Table) -> None:
    """source_dataのdescriptionを更新
    同名のテーブルとカラムのdescriptionを更新する
    """
    if table["description"] == "":
        table["description"] = bq_table.description
        logger.info(
            f"Updated description for {table['name']}.description: {bq_table.description}"
        )
    for column in table["columns"]:
        for field in bq_table.schema:
            if (
                column["name"] == field.name
                and column["description"] == ""
                and field.description is not None
            ):
                column["description"] = field.description
                logger.info(
                    f"Updated description for {column['name']}.description: {field.description}"
                )


def save_source_yml(source_yml_path: str, source_data: Dict[str, Any]) -> None:
    """更新したsource.ymlを保存"""
    with open(source_yml_path, "w", encoding="utf-8") as f:
        yaml.dump(source_data, f)


@click.command()
@click.option("--project-id", required=True, help="GCPプロジェクトID")
@click.argument("source_yml_path", type=click.Path(exists=True))
def main(project_id: str, source_yml_path: str):
    """source.ymlのdescriptionをBigQueryのテーブル情報から更新"""
    logger.info(f"Starting update process for {source_yml_path}")

    source_data = load_source_yml(source_yml_path)
    client = bigquery.Client(project=project_id)

    for source in source_data["sources"]:
        for table in source["tables"]:
            logger.info(f"Processing table: {table['name']}")
            try:
                bq_table = get_table_info(client, source, table)
                update_descriptions(table, bq_table)
            except Exception as e:
                logger.error(f"Failed to get table info: {e}")
                continue

    save_source_yml(source_yml_path, source_data)
    logger.info(f"Finished update process for {source_yml_path}")


if __name__ == "__main__":
    main()

本家adapterを変えたらcodegenやosmosisを使って連携できてみんな幸せになりそうだが・・・一旦やりたいことはできたので良しとする

このスクラップは2024/06/28にクローズされました