Open9

【dbt】dbtの説明文管理

YuichiYuichi

データリスト取得

with
column_list as (
  select
    table_catalog
    , table_schema
    , table_name
    , column_name
    , description
  from
    `project.dataset`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
)
, table_description as (
  select
    table_name
    , replace(option_value, '\"', '') as table_description
  from
    `project.dataset`.INFORMATION_SCHEMA.TABLE_OPTIONS
  where
    option_name = 'description'
)
select
  column_list.table_catalog
  , column_list.table_schema
  , column_list.table_name
  , table_description.table_description
  , column_list.column_name
  , column_list.description
from
  column_list
  left join table_description
    on column_list.table_name = table_description.table_name
YuichiYuichi

yaml変換

import csv
import yaml
from collections import defaultdict

def generate_yaml_from_csv(csv_file_path):
    # データを格納する辞書
    data = {
        'version': 2,
        'sources': []
    }

    # テーブル情報を一時的に格納する辞書
    sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})

    # CSVファイルを読み込む
    with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            schema = row['table_schema']
            database = row['table_catalog']
            table_name = row['table_name']
            
            # ソース情報を更新
            sources[schema]['name'] = schema
            sources[schema]['database'] = database
            sources[schema]['schema'] = schema

            # テーブル情報を追加
            table = {
                'name': table_name,
                'description': row['table_description'],
                'columns': [{
                    'name': row['column_name'],
                    'description': row['description']
                }]
            }
            
            # 既存のテーブルに列を追加するか、新しいテーブルを作成
            existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
            if existing_table:
                existing_table['columns'].append(table['columns'][0])
            else:
                sources[schema]['tables'].append(table)

    # ソース情報をデータに追加
    data['sources'] = list(sources.values())

    # YAMLに変換
    yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)

    # 空白行を挿入
    yaml_lines = yaml_output.split('\n')
    yaml_lines.insert(1, '')  # version: 2 の後に空白行を挿入
    for i, line in enumerate(yaml_lines):
        if line.strip().startswith('schema:'):
            yaml_lines.insert(i + 1, '')  # schema: の後に空白行を挿入

    return '\n'.join(yaml_lines)

# CSVファイルのパスを指定して実行
csv_file_path = 'data.csv'
yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)

CSVファイル

table_catalog,table_schema,table_name,table_description,column_name,description
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_date,イベントが発生した日付。形式はYYYY-MM-DD。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_timestamp,イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_name,イベントの名前。
project,dataset,events1,このテーブルはユーザーイベントの詳細を記録します。,event_params,イベントに関連する追加のパラメータ。JSON形式で保存。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_previous_timestamp,前回のイベントが発生したタイムスタンプ。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_value_in_usd,イベントの価値(米ドル)。購入や取引の金額を示す。
project,dataset,events2,このテーブルはイベントの詳細を記録します。,event_bundle_sequence_id,イベントがバンドル内で発生した順序を示すID。

結果

sources:
- name: dataset
  database: project
  schema: dataset

  tables:
  - name: events1
    description: このテーブルはユーザーイベントの詳細を記録します。
    columns:
    - name: event_date
      description: イベントが発生した日付。形式はYYYY-MM-DD。
    - name: event_timestamp
      description: イベントが発生した正確なタイムスタンプ。UNIXエポック形式。
    - name: event_name
      description: イベントの名前。
    - name: event_params
      description: イベントに関連する追加のパラメータ。JSON形式で保存。
  - name: events2
    description: このテーブルはイベントの詳細を記録します。
    columns:
    - name: event_previous_timestamp
      description: 前回のイベントが発生したタイムスタンプ。
    - name: event_value_in_usd
      description: イベントの価値(米ドル)。購入や取引の金額を示す。
    - name: event_bundle_sequence_id
      description: イベントがバンドル内で発生した順序を示すID。
YuichiYuichi
import os
import yaml
import csv
from collections import defaultdict

def generate_yaml_from_csv(csv_file_path):
    # データを格納する辞書
    data = {
        'version': 2,
        'sources': []
    }

    # テーブル情報を一時的に格納する辞書
    sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})

    # CSVファイルを読み込む
    with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            schema = row['table_schema']
            database = row['table_catalog']
            table_name = row['table_name']
            
            # ソース情報を更新
            sources[schema]['name'] = schema
            sources[schema]['database'] = database
            sources[schema]['schema'] = schema

            # テーブル情報を追加
            table = {
                'name': table_name,
                'description': row['table_description'],
                'columns': [{
                    'name': row['column_name'],
                    'description': row['description']
                }]
            }
            
            # 既存のテーブルに列を追加するか、新しいテーブルを作成
            existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
            if existing_table:
                existing_table['columns'].append(table['columns'][0])
            else:
                sources[schema]['tables'].append(table)

    # ソース情報をデータに追加
    data['sources'] = list(sources.values())

    # YAMLに変換
    yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)

    # 空白行を挿入
    yaml_lines = yaml_output.split('\n')
    yaml_lines.insert(1, '')  # version: 2 の後に空白行を挿入
    for i, line in enumerate(yaml_lines):
        if line.strip().startswith('schema:'):
            yaml_lines.insert(i + 1, '')  # schema: の後に空白行を挿入

    return '\n'.join(yaml_lines)

# CSVファイルのパス
csv_file_path = 'data.csv'

# 出力するYAMLファイルのパス
output_file_name = os.path.splitext(os.path.basename(csv_file_path))[0] + '_sources.yml'
output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)

yaml_output = generate_yaml_from_csv(csv_file_path)
print(yaml_output)

# YAMLファイルに直接書き込む
with open(output_file_path, 'w', encoding='utf-8') as outfile:
    outfile.write(yaml_output)

print(f"YAMLファイルが {output_file_path} に保存されました。")

YuichiYuichi

データセット複数選択

WITH RECURSIVE datasets AS (
  SELECT schema_name
  FROM `project`.INFORMATION_SCHEMA.SCHEMATA
  WHERE schema_name IN ('other_dataset1', 'other_dataset2') -- ここに必要なデータセット名を追加
),
column_list AS (
  SELECT
    table_catalog,
    table_schema,
    table_name,
    column_name,
    description
  FROM datasets,
    `project`.`region-asia-northeast1`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
  WHERE table_schema = datasets.schema_name
),
table_description AS (
  SELECT
    table_schema,
    table_name,
    REPLACE(option_value, '\"', '') AS table_description
  FROM datasets,
    `project`.`region-asia-northeast1`.INFORMATION_SCHEMA.TABLE_OPTIONS
  WHERE 
    table_schema = datasets.schema_name
    AND option_name = 'description'
)
SELECT
  column_list.table_catalog,
  column_list.table_schema,
  column_list.table_name,
  table_description.table_description,
  column_list.column_name,
  column_list.description
FROM
  column_list
  LEFT JOIN table_description
    ON column_list.table_schema = table_description.table_schema
    AND column_list.table_name = table_description.table_name
ORDER BY
  column_list.table_schema,
  column_list.table_name,
  column_list.column_name
YuichiYuichi

table_schemaごとに分ける

import os
import yaml
import csv
from collections import defaultdict

def generate_yaml_from_csv(csv_file_path):
    # データを格納する辞書
    schemas = defaultdict(lambda: {
        'version': 2,
        'sources': []
    })

    # テーブル情報を一時的に格納する辞書
    sources = defaultdict(lambda: {'name': '', 'database': '', 'schema': '', 'tables': []})

    # CSVファイルを読み込む
    with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            schema = row['table_schema']
            database = row['table_catalog']
            table_name = row['table_name']
            
            # ソース情報を更新
            sources[schema]['name'] = schema
            sources[schema]['database'] = database
            sources[schema]['schema'] = schema

            # テーブル情報を追加
            table = {
                'name': table_name,
                'description': row['table_description'],
                'columns': [{
                    'name': row['column_name'],
                    'description': row['description']
                }]
            }
            
            # 既存のテーブルに列を追加するか、新しいテーブルを作成
            existing_table = next((t for t in sources[schema]['tables'] if t['name'] == table_name), None)
            if existing_table:
                existing_table['columns'].append(table['columns'][0])
            else:
                sources[schema]['tables'].append(table)

    # ソース情報をデータに追加
    for schema, source in sources.items():
        schemas[schema]['sources'].append(source)

    # YAMLファイルごとに処理
    for schema, data in schemas.items():
        # YAMLに変換
        yaml_output = yaml.dump(data, sort_keys=False, allow_unicode=True)

        # 空白行を挿入
        yaml_lines = yaml_output.split('\n')
        yaml_lines.insert(1, '')  # version: 2 の後に空白行を挿入
        for i, line in enumerate(yaml_lines):
            if line.strip().startswith('schema:'):
                yaml_lines.insert(i + 1, '')  # schema: の後に空白行を挿入

        # 出力するYAMLファイルのパス
        output_file_name = f"{schema}_sources.yml"
        output_file_path = os.path.join(os.path.dirname(csv_file_path), output_file_name)

        # YAMLファイルに直接書き込む
        with open(output_file_path, 'w', encoding='utf-8') as outfile:
            outfile.write('\n'.join(yaml_lines))

        print(f"YAMLファイルが {output_file_path} に保存されました。")

# CSVファイルのパス
csv_file_path = 'data.csv'

generate_yaml_from_csv(csv_file_path)
YuichiYuichi

https://zenn.dev/koh_yoshi/articles/4833d6522f751b

{% docs col_app_id %} Application ID e.g. ‘angry-birds’ is used to distinguish different applications that are being tracked by the same Snowplow stack, e.g. production versus dev. {% enddocs %}

{% docs col_platform %} Platform e.g. ‘web’ {% enddocs %}

{% docs col_etl_tstamp %} Timestamp event began ETL e.g. ‘2017-01-26 00:01:25.292’ {% enddocs %}

{% docs table_page_view_context %}

This context table contains the page_view_id associated with a given page view.

{% enddocs %}

{% docs table_ecommerce_user_context %}

This context table contains the data generated by the E-commerce user context. This context enables you to generate more user information about the user that is completing an action, such as whether they are a guest or not, and what their email address and user_id is in the case that they are logged in.

{% enddocs %}

# the intent of this .md is to remove redundancy in the documentation


# the below are descriptions from stg_tpch_line_items

{% docs order_item_key %} surrogate key for the model -- combo of order_key + line_number {% enddocs %}

{% docs line_number %} sequence of the order items within the order {% enddocs %}

{% docs return_flag %} letter determining the status of the return {% enddocs %}

{% docs ship_date %} the date the order item is being shipped {% enddocs %}

{% docs commit_date %} the date the order item is being commited {% enddocs %}

{% docs receipt_date %} the receipt date of the order item {% enddocs %}

{% docs ship_mode %} method of shipping {% enddocs %}

{% docs comment %} additional commentary {% enddocs %}

{% docs extended_price %} line item price {% enddocs %}

{% docs discount_percentage %} percentage of the discount {% enddocs %}


# the below are descriptions from stg_tpch_supppliers

{% docs supplier_name %} id of the supplier {% enddocs %}

{% docs supplier_address %} address of the supplier {% enddocs %}

{% docs phone_number %} phone number of the supplier {% enddocs %}

{% docs account_balance %} raw account balance {% enddocs %}
YuichiYuichi

[varchar(256)]の情報入れるの良いアイデア
https://techblog.kayac.com/working-with-dbt-incremental-model

sources:
  - name: log
    schema: source__log
    tables:
      - name: nginx_access_logs
        description: nginxが出力するログを取り込んだもの
        columns:
          - name: time
            description: アクセス時刻 [timestamp without time zone]
          - name: method
            description: リクエストメソッド [varchar(8)]
          - name: uri
            description: リクエストURL [varchar(max)]
          - name: user_id 
            description: ログインユーザーのID [varchar(256)]