Open9
【dbt】dbtの説明文管理
データリスト取得
with
column_list as (
select
table_catalog
, table_schema
, table_name
, column_name
, description
from
`project.dataset`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
)
, table_description as (
select
table_name
, replace(option_value, '\"', '') as table_description
from
`project.dataset`.INFORMATION_SCHEMA.TABLE_OPTIONS
where
option_name = 'description'
)
select
column_list.table_catalog
, column_list.table_schema
, column_list.table_name
, table_description.table_description
, column_list.column_name
, column_list.description
from
column_list
left join table_description
on column_list.table_name = table_description.table_name
yaml変換
import csv
import yaml
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
data = {
'version': 2,
'sources': []
}
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
data['sources'] = list(sources.values())
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
return '\n'.join(yaml_lines)
# CSVファイルのパスを指定して実行
csv_file_path = 'data.csv'
yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)
CSVファイル
table_catalog,table_schema,table_name,table_description,column_name,description
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_date,イベントが発生した日付。形式はYYYY-MM-DD。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_timestamp,イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_name,イベントの名前。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_params,イベントに関連する追加のパラメータ。JSON形式で保存。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_previous_timestamp,前回のイベントが発生したタイムスタンプ。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_value_in_usd,イベントの価値(米ドル)。購入や取引の金額を示す。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_bundle_sequence_id,イベントがバンドル内で発生した順序を示すID。
結果
sources:
- name: dataset
database: project
schema: dataset
tables:
- name: events1
description: このテーブルはユーザーイベントの詳細を記録します。
columns:
- name: event_date
description: イベントが発生した日付。形式はYYYY-MM-DD。
- name: event_timestamp
description: イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
- name: event_name
description: イベントの名前。
- name: event_params
description: イベントに関連する追加のパラメータ。JSON形式で保存。
- name: events2
description: このテーブルはイベントの詳細を記録します。
columns:
- name: event_previous_timestamp
description: 前回のイベントが発生したタイムスタンプ。
- name: event_value_in_usd
description: イベントの価値(米ドル)。購入や取引の金額を示す。
- name: event_bundle_sequence_id
description: イベントがバンドル内で発生した順序を示すID。
の方がchatgptよりよかった
import os
import yaml
import csv
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
data = {
'version': 2,
'sources': []
}
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
data['sources'] = list(sources.values())
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
return '\n'.join(yaml_lines)
# CSVファイルのパス
csv_file_path = 'data.csv'
# 出力するYAMLファイルのパス
output_file_name = os.path.splitext(os.path.basename(csv_file_path))[0] + '_sources.yml'
output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)
yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)
# YAMLファイルに直接書き込む
with open(output_file_path, 'w', encoding='utf-8') as outfile:
outfile.write(yaml_output)
print(f"YAMLファイルが {output_file_path} に保存されました。")
データセット複数選択
WITH RECURSIVE datasets AS (
SELECT schema_name
FROM `project`.INFORMATION_SCHEMA.SCHEMATA
WHERE schema_name IN ('other_dataset1', 'other_dataset2') -- ここに必要なデータセット名を追加
),
column_list AS (
SELECT
table_catalog,
table_schema,
table_name,
column_name,
description
FROM datasets,
`project`.`region-asia-northeast1`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
WHERE table_schema = datasets.schema_name
),
table_description AS (
SELECT
table_schema,
table_name,
REPLACE(option_value, '\"', '') AS table_description
FROM datasets,
`project`.`region-asia-northeast1`.INFORMATION_SCHEMA.TABLE_OPTIONS
WHERE
table_schema = datasets.schema_name
AND option_name = 'description'
)
SELECT
column_list.table_catalog,
column_list.table_schema,
column_list.table_name,
table_description.table_description,
column_list.column_name,
column_list.description
FROM
column_list
LEFT JOIN table_description
ON column_list.table_schema = table_description.table_schema
AND column_list.table_name = table_description.table_name
ORDER BY
column_list.table_schema,
column_list.table_name,
column_list.column_name
table_schemaごとに分ける
import os
import yaml
import csv
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
schemas = defaultdict(lambda: {
'version': 2,
'sources': []
})
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
for schema, source in sources.items():
schemas[schema]['sources'].append(source)
# YAMLファイルごとに処理
for schema, data in schemas.items():
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
# 出力するYAMLファイルのパス
output_file_name = f"{schema}_sources.yml"
output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)
# YAMLファイルに直接書き込む
with open(output_file_path, 'w', encoding='utf-8') as outfile:
outfile.write('\n'.join(yaml_lines))
print(f"YAMLファイルが {output_file_path} に保存されました。")
# CSVファイルのパス
csv_file_path = 'data.csv'
generate_yaml_from_csv(csv_file_path)
{% docs col_app_id %} Application ID e.g. ‘angry-birds’ is used to distinguish different applications that are being tracked by the same Snowplow stack, e.g. production versus dev. {% enddocs %}
{% docs col_platform %} Platform e.g. ‘web’ {% enddocs %}
{% docs col_etl_tstamp %} Timestamp event began ETL e.g. ‘2017-01-26 00:01:25.292’ {% enddocs %}
{% docs table_page_view_context %}
This context table contains the page_view_id associated with a given page view.
{% enddocs %}
{% docs table_ecommerce_user_context %}
This context table contains the data generated by the E-commerce user context. This context enables you to generate more user information about the user that is completing an action, such as whether they are a guest or not, and what their email address and user_id is in the case that they are logged in.
{% enddocs %}
# the intent of this .md is to remove redundancy in the documentation
# the below are descriptions from stg_tpch_line_items
{% docs order_item_key %} surrogate key for the model -- combo of order_key + line_number {% enddocs %}
{% docs line_number %} sequence of the order items within the order {% enddocs %}
{% docs return_flag %} letter determining the status of the return {% enddocs %}
{% docs ship_date %} the date the order item is being shipped {% enddocs %}
{% docs commit_date %} the date the order item is being commited {% enddocs %}
{% docs receipt_date %} the receipt date of the order item {% enddocs %}
{% docs ship_mode %} method of shipping {% enddocs %}
{% docs comment %} additional commentary {% enddocs %}
{% docs extended_price %} line item price {% enddocs %}
{% docs discount_percentage %} percentage of the discount {% enddocs %}
# the below are descriptions from stg_tpch_supppliers
{% docs supplier_name %} id of the supplier {% enddocs %}
{% docs supplier_address %} address of the supplier {% enddocs %}
{% docs phone_number %} phone number of the supplier {% enddocs %}
{% docs account_balance %} raw account balance {% enddocs %}
[varchar(256)]の情報入れるの良いアイデア
sources:
- name: log
schema: source__log
tables:
- name: nginx_access_logs
description: nginxが出力するログを取り込んだもの
columns:
- name: time
description: アクセス時刻 [timestamp without time zone]
- name: method
description: リクエストメソッド [varchar(8)]
- name: uri
description: リクエストURL [varchar(max)]
- name: user_id
description: ログインユーザーのID [varchar(256)]