Closed4
BigQueryのDescriptionをdbtのyamlに取り込む
選択肢を考える
dbt-osmosis
- osmosis側には
comment
というキーで引き受ける準備はある - adapter側(dbt-bigquery)のカラムを取得する関数がdescriptionを取得できない
- パッと見snowflakeもathenaもcommentを取ってるアダプターが見当たらない
dbt-codegen
これもadapterから取得しているのでosmosisと同様
dbt-source-importer
自作している方がいた
テーブルごとにcliでyamlを作成していく形
Google Cloud SDKでお手製スクリプト作る
すでに存在するdbt projectのsource.ymlをBigQueryのDescriptionで上書きしたい+ソースとなるテーブルが大量だったため一旦これに落ち着いた
結果
pythonスクリプトを使ってゴニョゴニョすることにした
import click
import ruamel.yaml
from google.cloud import bigquery
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# ruamel.yamlを使ってyamlファイルを読み込む (jinja文字列も安全にload)
yaml = ruamel.yaml.YAML(typ="rt")
yaml.indent(mapping=2, sequence=4, offset=2)
yaml.preserve_quotes = True # クォートを保持
yaml.default_flow_style = False # ブロックスタイルで出力
yaml.allow_unicode = True # 日本語も文字列として扱う
yaml.sort_keys = False # ソートしない
yaml.width = 4096 # 折り返しなし
def load_source_yml(source_yml_path: str) -> Dict[str, Any]:
"""source.ymlファイルの読み込み
"""
with open(source_yml_path, "r") as f:
return yaml.load(f)
def get_table_info(
client: bigquery.Client, source: Dict[str, Any], table: Dict[str, Any]
) -> bigquery.Table:
"""BigQueryからテーブル情報を取得"""
dataset_id = source["schema"] if source["schema"] else source["name"]
table_id = table["identifier"] if table["identifier"] else table["name"]
table_ref = client.dataset(dataset_id).table(table_id)
return client.get_table(table_ref)
def update_descriptions(table: Dict[str, Any], bq_table: bigquery.Table) -> None:
"""source_dataのdescriptionを更新
同名のテーブルとカラムのdescriptionを更新する
"""
if table["description"] == "":
table["description"] = bq_table.description
logger.info(
f"Updated description for {table['name']}.description: {bq_table.description}"
)
for column in table["columns"]:
for field in bq_table.schema:
if (
column["name"] == field.name
and column["description"] == ""
and field.description is not None
):
column["description"] = field.description
logger.info(
f"Updated description for {column['name']}.description: {field.description}"
)
def save_source_yml(source_yml_path: str, source_data: Dict[str, Any]) -> None:
"""更新したsource.ymlを保存"""
with open(source_yml_path, "w", encoding="utf-8") as f:
yaml.dump(source_data, f)
@click.command()
@click.option("--project-id", required=True, help="GCPプロジェクトID")
@click.argument("source_yml_path", type=click.Path(exists=True))
def main(project_id: str, source_yml_path: str):
"""source.ymlのdescriptionをBigQueryのテーブル情報から更新"""
logger.info(f"Starting update process for {source_yml_path}")
source_data = load_source_yml(source_yml_path)
client = bigquery.Client(project=project_id)
for source in source_data["sources"]:
for table in source["tables"]:
logger.info(f"Processing table: {table['name']}")
try:
bq_table = get_table_info(client, source, table)
update_descriptions(table, bq_table)
except Exception as e:
logger.error(f"Failed to get table info: {e}")
continue
save_source_yml(source_yml_path, source_data)
logger.info(f"Finished update process for {source_yml_path}")
if __name__ == "__main__":
main()
本家adapterを変えたらcodegenやosmosisを使って連携できてみんな幸せになりそうだが・・・一旦やりたいことはできたので良しとする
このスクラップは2024/06/28にクローズされました