Open7
dbtの説明文管理
データリスト取得
with
column_list as (
select
table_catalog
, table_schema
, table_name
, column_name
, description
from
`project.dataset`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
)
, table_description as (
select
table_name
, replace(option_value, '\"', '') as table_description
from
`project.dataset`.INFORMATION_SCHEMA.TABLE_OPTIONS
where
option_name = 'description'
)
select
column_list.table_catalog
, column_list.table_schema
, column_list.table_name
, table_description.table_description
, column_list.column_name
, column_list.description
from
column_list
left join table_description
on column_list.table_name = table_description.table_name
yaml変換
import csv
import yaml
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
data = {
'version': 2,
'sources': []
}
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
data['sources'] = list(sources.values())
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
return '\n'.join(yaml_lines)
# CSVファイルのパスを指定して実行
csv_file_path = 'data.csv'
yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)
CSVファイル
table_catalog,table_schema,table_name,table_description,column_name,description
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_date,イベントが発生した日付。形式はYYYY-MM-DD。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_timestamp,イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_name,イベントの名前。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_params,イベントに関連する追加のパラメータ。JSON形式で保存。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_previous_timestamp,前回のイベントが発生したタイムスタンプ。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_value_in_usd,イベントの価値(米ドル)。購入や取引の金額を示す。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_bundle_sequence_id,イベントがバンドル内で発生した順序を示すID。
結果
sources:
- name: dataset
database: project
schema: dataset
tables:
- name: events1
description: このテーブルはユーザーイベントの詳細を記録します。
columns:
- name: event_date
description: イベントが発生した日付。形式はYYYY-MM-DD。
- name: event_timestamp
description: イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
- name: event_name
description: イベントの名前。
- name: event_params
description: イベントに関連する追加のパラメータ。JSON形式で保存。
- name: events2
description: このテーブルはイベントの詳細を記録します。
columns:
- name: event_previous_timestamp
description: 前回のイベントが発生したタイムスタンプ。
- name: event_value_in_usd
description: イベントの価値(米ドル)。購入や取引の金額を示す。
- name: event_bundle_sequence_id
description: イベントがバンドル内で発生した順序を示すID。
の方がchatgptよりよかった
import os
import yaml
import csv
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
data = {
'version': 2,
'sources': []
}
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
data['sources'] = list(sources.values())
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
return '\n'.join(yaml_lines)
# CSVファイルのパス
csv_file_path = 'data.csv'
# 出力するYAMLファイルのパス
output_file_name = os.path.splitext(os.path.basename(csv_file_path))[0] + '_sources.yml'
output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)
yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)
# YAMLファイルに直接書き込む
with open(output_file_path, 'w', encoding='utf-8') as outfile:
outfile.write(yaml_output)
print(f"YAMLファイルが {output_file_path} に保存されました。")
データセット複数選択
WITH RECURSIVE datasets AS (
SELECT schema_name
FROM `project`.INFORMATION_SCHEMA.SCHEMATA
WHERE schema_name IN ('other_dataset1', 'other_dataset2') -- ここに必要なデータセット名を追加
),
column_list AS (
SELECT
table_catalog,
table_schema,
table_name,
column_name,
description
FROM datasets,
`project`.`region-asia-northeast1`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
WHERE table_schema = datasets.schema_name
),
table_description AS (
SELECT
table_schema,
table_name,
REPLACE(option_value, '\"', '') AS table_description
FROM datasets,
`project`.`region-asia-northeast1`.INFORMATION_SCHEMA.TABLE_OPTIONS
WHERE
table_schema = datasets.schema_name
AND option_name = 'description'
)
SELECT
column_list.table_catalog,
column_list.table_schema,
column_list.table_name,
table_description.table_description,
column_list.column_name,
column_list.description
FROM
column_list
LEFT JOIN table_description
ON column_list.table_schema = table_description.table_schema
AND column_list.table_name = table_description.table_name
ORDER BY
column_list.table_schema,
column_list.table_name,
column_list.column_name
table_schemaごとに分ける
import os
import yaml
import csv
from collections import defaultdict
def generate_yaml_from_csv(csv_file_path):
# データを格納する辞書
schemas = defaultdict(lambda: {
'version': 2,
'sources': []
})
# テーブル情報を一時的に格納する辞書
sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})
# CSVファイルを読み込む
with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
schema = row['table_schema']
database = row['table_catalog']
table_name = row['table_name']
# ソース情報を更新
sources[schema]['name'] = schema
sources[schema]['database'] = database
sources[schema]['schema'] = schema
# テーブル情報を追加
table = {
'name': table_name,
'description': row['table_description'],
'columns': [{
'name': row['column_name'],
'description': row['description']
}]
}
# 既存のテーブルに列を追加するか、新しいテーブルを作成
existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
if existing_table:
existing_table['columns'].append(table['columns'][0])
else:
sources[schema]['tables'].append(table)
# ソース情報をデータに追加
for schema, source in sources.items():
schemas[schema]['sources'].append(source)
# YAMLファイルごとに処理
for schema, data in schemas.items():
# YAMLに変換
yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)
# 空白行を挿入
yaml_lines = yaml_output.split('\n')
yaml_lines.insert(1, '') # version: 2 の後に空白行を挿入
for i, line in enumerate(yaml_lines):
if line.strip().startswith('schema:'):
yaml_lines.insert(i + 1, '') # schema: の後に空白行を挿入
# 出力するYAMLファイルのパス
output_file_name = f"{schema}_sources.yml"
output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)
# YAMLファイルに直接書き込む
with open(output_file_path, 'w', encoding='utf-8') as outfile:
outfile.write('\n'.join(yaml_lines))
print(f"YAMLファイルが {output_file_path} に保存されました。")
# CSVファイルのパス
csv_file_path = 'data.csv'
generate_yaml_from_csv(csv_file_path)