Open11
【dbt】dbtの説明文管理

データリスト取得
with
column_list as (
select
table_catalog
, table_schema
, table_name
, column_name
, description
from
`project.dataset`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
)
, table_description as (
select
table_name
, replace(option_value, '\"', '') as table_description
from
`project.dataset`.INFORMATION_SCHEMA.TABLE_OPTIONS
where
option_name = 'description'
)
select
column_list.table_catalog
, column_list.table_schema
, column_list.table_name
, table_description.table_description
, column_list.column_name
, column_list.description
from
column_list
left join table_description
on column_list.table_name = table_description.table_name

yaml変換
import csv
import yaml
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
data = {
'version': 2,
'sources': []
}
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
data['sources'] = list(sources.values())
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
return '\n'.join(yaml_lines)
# CSVファイルのパスを指定して実行
csv_file_path = 'data.csv'
yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)
CSVファイル
table_catalog,table_schema,table_name,table_description,column_name,description
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_date,イベントが発生した日付。形式はYYYY-MM-DD。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_timestamp,イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_name,イベントの名前。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_params,イベントに関連する追加のパラメータ。JSON形式で保存。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_previous_timestamp,前回のイベントが発生したタイムスタンプ。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_value_in_usd,イベントの価値(米ドル)。購入や取引の金額を示す。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_bundle_sequence_id,イベントがバンドル内で発生した順序を示すID。
結果
sources:
- name: dataset
database: project
schema: dataset
tables:
- name: events1
description: このテーブルはユーザーイベントの詳細を記録します。
columns:
- name: event_date
description: イベントが発生した日付。形式はYYYY-MM-DD。
- name: event_timestamp
description: イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
- name: event_name
description: イベントの名前。
- name: event_params
description: イベントに関連する追加のパラメータ。JSON形式で保存。
- name: events2
description: このテーブルはイベントの詳細を記録します。
columns:
- name: event_previous_timestamp
description: 前回のイベントが発生したタイムスタンプ。
- name: event_value_in_usd
description: イベントの価値(米ドル)。購入や取引の金額を示す。
- name: event_bundle_sequence_id
description: イベントがバンドル内で発生した順序を示すID。

の方がchatgptよりよかった

import os
import yaml
import csv
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
data = {
'version': 2,
'sources': []
}
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
data['sources'] = list(sources.values())
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
return '\n'.join(yaml_lines)
# CSVファイルのパス
csv_file_path = 'data.csv'
# 出力するYAMLファイルのパス
output_file_name = os.path.splitext(os.path.basename(csv_file_path))[0] + '_sources.yml'
output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)
yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)
# YAMLファイルに直接書き込む
with open(output_file_path, 'w', encoding='utf-8') as outfile:
outfile.write(yaml_output)
print(f"YAMLファイルが {output_file_path} に保存されました。")

データセット複数選択
WITH RECURSIVE datasets AS (
SELECT schema_name
FROM `project`.INFORMATION_SCHEMA.SCHEMATA
WHERE schema_name IN ('other_dataset1', 'other_dataset2') -- ここに必要なデータセット名を追加
),
column_list AS (
SELECT
table_catalog,
table_schema,
table_name,
column_name,
description
FROM datasets,
`project`.`region-asia-northeast1`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
WHERE table_schema = datasets.schema_name
),
table_description AS (
SELECT
table_schema,
table_name,
REPLACE(option_value, '\"', '') AS table_description
FROM datasets,
`project`.`region-asia-northeast1`.INFORMATION_SCHEMA.TABLE_OPTIONS
WHERE
table_schema = datasets.schema_name
AND option_name = 'description'
)
SELECT
column_list.table_catalog,
column_list.table_schema,
column_list.table_name,
table_description.table_description,
column_list.column_name,
column_list.description
FROM
column_list
LEFT JOIN table_description
ON column_list.table_schema = table_description.table_schema
AND column_list.table_name = table_description.table_name
ORDER BY
column_list.table_schema,
column_list.table_name,
column_list.column_name

table_schemaごとに分ける
import os
import yaml
import csv
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
schemas = defaultdict(lambda: {
'version': 2,
'sources': []
})
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
for schema, source in sources.items():
schemas[schema]['sources'].append(source)
# YAMLファイルごとに処理
for schema, data in schemas.items():
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
# 出力するYAMLファイルのパス
output_file_name = f"{schema}_sources.yml"
output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)
# YAMLファイルに直接書き込む
with open(output_file_path, 'w', encoding='utf-8') as outfile:
outfile.write('\n'.join(yaml_lines))
print(f"YAMLファイルが {output_file_path} に保存されました。")
# CSVファイルのパス
csv_file_path = 'data.csv'
generate_yaml_from_csv(csv_file_path)

{% docs col_app_id %} Application ID e.g. ‘angry-birds’ is used to distinguish different applications that are being tracked by the same Snowplow stack, e.g. production versus dev. {% enddocs %}
{% docs col_platform %} Platform e.g. ‘web’ {% enddocs %}
{% docs col_etl_tstamp %} Timestamp event began ETL e.g. ‘2017-01-26 00:01:25.292’ {% enddocs %}
{% docs table_page_view_context %}
This context table contains the page_view_id associated with a given page view.
{% enddocs %}
{% docs table_ecommerce_user_context %}
This context table contains the data generated by the E-commerce user context. This context enables you to generate more user information about the user that is completing an action, such as whether they are a guest or not, and what their email address and user_id is in the case that they are logged in.
{% enddocs %}
# the intent of this .md is to remove redundancy in the documentation
# the below are descriptions from stg_tpch_line_items
{% docs order_item_key %} surrogate key for the model -- combo of order_key + line_number {% enddocs %}
{% docs line_number %} sequence of the order items within the order {% enddocs %}
{% docs return_flag %} letter determining the status of the return {% enddocs %}
{% docs ship_date %} the date the order item is being shipped {% enddocs %}
{% docs commit_date %} the date the order item is being commited {% enddocs %}
{% docs receipt_date %} the receipt date of the order item {% enddocs %}
{% docs ship_mode %} method of shipping {% enddocs %}
{% docs comment %} additional commentary {% enddocs %}
{% docs extended_price %} line item price {% enddocs %}
{% docs discount_percentage %} percentage of the discount {% enddocs %}
# the below are descriptions from stg_tpch_supppliers
{% docs supplier_name %} id of the supplier {% enddocs %}
{% docs supplier_address %} address of the supplier {% enddocs %}
{% docs phone_number %} phone number of the supplier {% enddocs %}
{% docs account_balance %} raw account balance {% enddocs %}

[varchar(256)]の情報入れるの良いアイデア
sources:
- name: log
schema: source__log
tables:
- name: nginx_access_logs
description: nginxが出力するログを取り込んだもの
columns:
- name: time
description: アクセス時刻 [timestamp without time zone]
- name: method
description: リクエストメソッド [varchar(8)]
- name: uri
description: リクエストURL [varchar(max)]
- name: user_id
description: ログインユーザーのID [varchar(256)]

dbt docs generate で作成するdocumentにおいて一部のmodelだけを表示することはできるのか?
結論としては selectやexcludeは利用可能だけど、適用されるのはcompileのみなのでdocumentの表示を限定することは無理っぽい
dbt_project.ymlにおいて enabled っというconfigを使うことで特定のmodelに限定して compileやdocument作成を行うことはできる。ただし部分的にdocumentを作るためにその都度dbt_project.ymlを書き換えるのはあまり良くない感じがする。こういったケースではdbt project自体を別に切り分けるほうがよい。
models:
tutorial:
# Config indicated by + and applies to all files under models/example/
+materialized: table
orders:
enabled: false
docs
falseにしたらdocsは作成されないが、リネージにグレーで表示される
models:
<resource-path>:
+docs:
show: true | false
enabled
リソースを有効化または無効化するためのオプション
dbt docs generate --vars 'enabled_schema: false'
でいけると思う
models:
work:
+materialized: view
+schema: work
+enabled: '{{ var("enabled_schema", true) }}'

import os
import yaml
import pandas as pd
import json
def extract_model_info(folder_path):
path = 'dbt_project/target/manifest.json'
dbt_docs_path = 'http://localhost:8080/#!/'
bq_path = 'https://console.cloud.google.com/bigquery?'
# manifest.json のデータを読み込む
with open(path, "r", encoding="utf-8") as f:
manifest = json.load(f)
# 必要な情報を抽出
rows = []
for node in manifest["nodes"].values(): # `data` を `manifest` に変更
if node.get("resource_type") != "model":
continue
if node.get("package_name") != "dbt_project":
continue
# if node.get("schema") != "~~":
# continue
dbt_docs_path = dbt_docs_path +"/"+node.get("resource_type")+"/"+ node.get("unique_id")
bq_path = bq_path + "p=" + node.get("database") + "&d=" + node.get("schema") + "&t=" + node.get("alias") + "&page=table"
model_name = "[" + node.get("name") + "](" + dbt_docs_path + ")"
bq = "["+node.get("database")+"."+node.get("schema")+"."+node.get("alias")+"]("+bq_path+")"
DB = "[*]("+bq_path+")"
rows.append({
# "database": node.get("database"),
# "schema": node.get("schema"),
# "unique_id": node.get("unique_id"),
# "alias": node.get("alias"),
"model_name": model_name,
"DB": DB,
"description": node.get("description", "").replace("\n", "<br />") # description が無い場合は空文字列を設定
})
# pandas DataFrame に変換
df = pd.DataFrame(rows)
print(df)
return df
folder_path = "dbt_project/models" # フォルダパスを指定
df = extract_model_info(folder_path)
# 先頭と末尾に追加する文字列
header_text = "{% docs __dbt_project__ %}\n# dbt_project\n"
footer_text = "\n\n{% enddocs %}"
# Markdown形式をファイルに保存
markdown_output = df.to_markdown(index=False)
# print(markdown_output)
with open("dbt_project/docs/dbt_project.md", "w", encoding="utf-8") as md_file:
md_file.write(header_text) # 先頭に追加
md_file.write(markdown_output) # 本文(Markdownテーブル)を追加
md_file.write(footer_text) # 末尾に追加
dbt_project
model_name | DB | description |
---|