Open14

【dbt】DuckDB

YuichiYuichi
dbt --version
dbt debug

pip install "dlt[duckdb]"
duckdb db/dbt.duckdb

dlt init filesystem duckdb
新しいパイプライン ファイルシステムをカスタマイズする準備ができました。
* filesystem_pipeline.py で dlt がデータをロードする方法を確認し、変更します
* ./.dlt/secrets.toml で duckdb およびその他のシークレットの認証情報を追加します
* requirements.txt に必要な依存関係を追加します:
dlt[duckdb,filesystem]>=1.5.0
dlt 依存関係がすでに追加されている場合は、duckdb の追加を必ずインストールしてください
pip を使用してインストールするには: pip3 install 'dlt[duckdb,filesystem]>=1.5.0'

* 詳細については、https://dlthub.com/docs/walkthroughs/create-a-pipeline を参照してください
YuichiYuichi

raw_ → データレイク(未加工データ)
stg_ → ステージング(加工済みデータ)
int_ → 中間データ(ロジック適用後のデータ)
mrt_ → マート(最終分析用データ)

1. シンプルなレイヤー名

役割が短い接頭辞で明確に区別できる命名規則。

  • raw_* → データレイク(未加工データ)
    例: raw_sales, raw_logs
  • stage_* → ステージング(加工・整形データ)
    例: stage_customers, stage_orders
  • transform_* → 中間データ(ビジネスロジック適用後のデータ)
    例: transform_metrics, transform_finance
  • final_* → マート(完成形データ)
    例: final_kpis, final_entities

2. カラーモデル命名

レイヤーを「ブロンズ→シルバー→ゴールド」のように色で表現する方法。

  • bronze_* → データレイク
    例: bronze_transactions, bronze_events
  • silver_* → ステージング
    例: silver_customers, silver_sessions
  • gold_* → 中間データまたはマート
    例: gold_financials, gold_marketing_kpis
  • platinum_* → 最終マート(特に重要な分析用データ)
    例: platinum_insights, platinum_strategic_kpis

3. アクションベース命名

データのライフサイクルに基づき、動詞を用いた接頭辞を付ける方法。

  • ingest_* → データレイク(取り込み段階)
    例: ingest_logs, ingest_raw_data
  • clean_* → ステージング(データ整形)
    例: clean_orders, clean_events
  • prepare_* → 中間データ(準備段階)
    例: prepare_metrics, prepare_entities
  • deliver_* → マート(最終データ配信)
    例: deliver_kpis, deliver_dashboard_data

4. レイヤー番号命名

レイヤーの順序を数値で示し、データフローのステップを明示する方法。

  • l1_* → データレイク(Layer 1: Raw Data)
    例: l1_raw_events, l1_raw_logs
  • l2_* → ステージング(Layer 2: Cleaned Data)
    例: l2_stage_customers, l2_stage_transactions
  • l3_* → 中間データ(Layer 3: Transformed Data)
    例: l3_enriched_kpis, l3_prepared_finance
  • l4_* → マート(Layer 4: Final Data)
    例: l4_mart_kpis, l4_dashboard_ready

5. 業務・目的別命名

データの用途や最終消費者に合わせて役割を明確化。

  • raw_* → データレイク
    例: raw_finance, raw_marketing
  • base_* → ステージング(基本構築データ)
    例: base_customers, base_products
  • logic_* → 中間データ(ロジック適用済み)
    例: logic_sales, logic_operations
  • output_* → マート(最終出力データ)
    例: output_kpis, output_analysis

6. 日本語ベースの命名

日本語チーム向けに親しみやすく、業務フローを意識した命名規則。

  • raw_データ → データレイク
    例: raw_売上, raw_ログ
  • 整形_データ → ステージング
    例: 整形_顧客, 整形_注文
  • 加工_データ → 中間データ
    例: 加工_指標, 加工_在庫
  • 分析_データ → マート
    例: 分析_収益, 分析_KPI

7. その他のカスタム案

用途や好みに応じた独自の接頭辞を検討する方法。

  • lake_* → データレイク
  • refine_* → ステージング
  • model_* → 中間データ
  • serve_* → マート

:

  • lake_sessions, refine_sales, model_revenue, serve_kpis

提案まとめ

以下のいずれかを選ぶことで、わかりやすく管理しやすい命名規則になります。

  1. カラーモデルbronze_, silver_, gold_) → 直感的で広く使われている。
  2. レイヤー番号l1_, l2_, l3_, l4_) → 明確なフローの順序。
  3. アクションベースingest_, clean_, prepare_, deliver_) → データ処理の意図がわかりやすい。

特定の命名規則がチームに合うか、好みを考慮して選んでください。

YuichiYuichi

CSVでエンコードできるものを探すスクリプト

https://qiita.com/shilabo/items/dcf4630a8f6f83fb9af5
エンコード設定リスト
https://docs.python.org/3/library/codecs.html#standard-encodings

import pandas as pd

# CSVファイルパス
file_path = "your_file.csv"

# 試すエンコードのリスト
encodings = [
    "cp932", 
    "euc_jp", 
    "euc_jis_2004", 
    "euc_jisx0213", 
    "euc_kr",
    "iso2022_jp",
    "iso2022_jp_1",
    "iso2022_jp_2",
    "iso2022_jp_2004",
    "iso2022_jp_3",
    "iso2022_jp_ext",
    "iso2022_kr",
    "shift_jis", 
    "shift_jis_2004", 
    "shift_jisx0213"
]

# エンコードを順番に試して読み込む
for encoding in encodings:
    try:
        df = pd.read_csv(file_path, encoding=encoding)
        print(f"成功: {encoding}")
        print(df.head())  # 読み取ったデータの最初の5行を表示
        break
    except Exception as e:
        print(f"失敗: {encoding} - {e}")

失敗: cp932 - 'cp932' codec can't decode byte 0x85 in position 187543: illegal multibyte sequence
失敗: euc_jp - 'euc_jp' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: euc_jis_2004 - 'euc_jis_2004' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: euc_jisx0213 - 'euc_jisx0213' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: euc_kr - 'euc_kr' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp - 'iso2022_jp' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_1 - 'iso2022_jp_1' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_2 - 'iso2022_jp_2' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_2004 - 'iso2022_jp_2004' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_3 - 'iso2022_jp_3' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_ext - 'iso2022_jp_ext' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_kr - 'iso2022_kr' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: shift_jis - 'shift_jis' codec can't decode byte 0xf1 in position 52883: illegal multibyte sequence
失敗: shift_jis_2004 - 'shift_jis_2004' codec can't decode byte 0x87 in position 414311: illegal multibyte sequence
失敗: shift_jisx0213 - 'shift_jisx0213' codec can't decode byte 0x87 in position 414311: illegal multibyte sequence
YuichiYuichi

pip install streamlit
dlt pipeline csv_pipeline show

Pipeline with name csv_pipeline in working directory /~/.dlt/pipelines could not be restored: the pipeline was not found in /~/.dlt/pipelines/csv_pipeline.
Try command dlt pipeline csv_pipeline sync to restore the pipeline state from destination
NOTE: Please refer to our docs at 'https://dlthub.com/docs/reference/command-line-interface#dlt-pipeline' for further assistance.

どうやら.dltの直下にpipelinesフォルダ作ってそこに~.py入れとかないといけないらしい
harlequinよりこっちが全然いい

YuichiYuichi

行政が出している日本語カラムCSVを英訳してスネークケースにするのスクリプト

googletransは更新が頻繁に行われるため、バージョンによって動作が異なる場合があります。特に、4.0.0-rc1のリリース候補が安定して動作することが知られています。

pip install --upgrade googletrans==4.0.0-rc1

以下は、CSVのカラム名を英訳しスネークケースに変換するPythonスクリプトの例です。
リストに既存の英訳済みカラム名がある場合はそれを使い、リストにない場合は翻訳を追加してからカラム名を書き換えます。

import os
import pandas as pd
import yaml
from googletrans import Translator

# YAML 設定ファイルのパス
config_file = "config.yml"

# 初期設定の定義
default_config = {
    "translations": {}  # 空の翻訳リスト
}

# YAMLファイルの読み込みまたは作成
if os.path.exists(config_file):
    with open(config_file, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
else:
    # ファイルがない場合は初期設定で作成
    config = default_config
    with open(config_file, "w", encoding="utf-8") as f:
        yaml.dump(config, f, allow_unicode=True)
    print(f"New config file created: {config_file}")

# 翻訳済みカラム名リストの取得
existing_translations = config.get("translations", {})

# 翻訳用関数
def translate_column_name(column_name, translator):
    # 既存の翻訳を確認
    if column_name in existing_translations:
        return existing_translations[column_name]

    # Google翻訳で英訳
    translated = translator.translate(column_name, src="ja", dest="en").text
    # スネークケースに変換
    snake_case_name = translated.lower().replace(" ", "_")
    
    # 新しい翻訳をリストに追加
    existing_translations[column_name] = snake_case_name
    return snake_case_name

# CSVファイルの処理関数
def process_csv_file(input_path, output_path, translator):
    print(f"Processing file: {input_path}")
    df = pd.read_csv(input_path)
    
    # カラム名を翻訳して変更
    df.columns = [translate_column_name(col, translator) for col in df.columns]
    
    # 処理済みファイルの保存
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"Translated CSV saved to {output_path}")

# 入出力フォルダの設定
input_folder = "csv"
output_folder = "csv_translated"

# 翻訳インスタンスを作成
translator = Translator()

# 入力フォルダ内のすべてのCSVファイルを検索
csv_files = [f for f in os.listdir(input_folder) if f.endswith(".csv")]

# 各CSVファイルを処理
for csv_file in csv_files:
    input_path = os.path.join(input_folder, csv_file)
    output_path = os.path.join(output_folder, csv_file)
    process_csv_file(input_path, output_path, translator)

# YAMLファイルを更新
config["translations"] = existing_translations
with open(config_file, "w", encoding="utf-8") as f:
    yaml.dump(config, f, allow_unicode=True)

# 完了メッセージ
print(f"Updated config file: {config_file}")

翻訳がイマイチだったら、手で修正して再実行

translations:
  名前: "name"
  住所: "address"
  年齢: "age"
  電話番号: "phone_number"
YuichiYuichi

Jupyter Notebooks

from google.cloud import bigquery
import dlt

def load_sql_data() -> None:
    """BigQuery からデータを取得し、dlt を使って DuckDB に保存する"""

    # BigQuery クライアントを初期化
    client = bigquery.Client()

    # クエリを定義
    query = """
        SELECT *
        FROM `〜〜`
    """

    # クエリを実行し、データを取得
    query_job = client.query(query)  # クエリジョブを送信
    rows = query_job.result()  # 結果を取得

    # dlt パイプラインを構築
    pipeline = dlt.pipeline(
        pipeline_name="〜〜",
        destination="duckdb",
        dataset_name="〜〜",
        dev_mode=False,
        # progress="log"
    )

    # 行データを辞書形式に変換してロード
    load_info = pipeline.run(
        map(lambda row: dict(row.items()), rows),
        table_name="〜〜",
    )

    # ロード情報を表示
    print(load_info)  # noqa: T201

if __name__ == "__main__":
    load_sql_data()

https://duckdb.org/docs/guides/python/jupyter.html
同じ階層のcsv_pipeline.duckdbに接続

import duckdb
import pandas as pd

%load_ext sql
conn = duckdb.connect('csv_pipeline.duckdb')
%sql conn --alias duckdb

%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False
%sql SHOW ALL TABLES
%sql SELECT * FROM information_schema.tables

https://www.salesanalytics.co.jp/datascience/datascience151/
次のマジックコマンドを利用し、Notebook上でJupySQLを使っていきます。

  • %sql: 1行のSQL命令文の実行
  • %%sql:複数行のSQL命令文の実行(セル内がSQL環境になる)
  • %sqlplot:グラフ表示

統計情報確認

%%sql result <<
SELECT 
    count(*) as Count,
    AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;
df = pd.DataFrame(result)
df.describe()
df.describe(include='all')

https://duckdb.org/docs/configuration/pragmas.html

PRAGMA storage_info('table_name');
CALL pragma_storage_info('table_name');

ydata-profilingでデータ探索

https://note.com/united_code/n/n0f83c76e4d2a

pip install ydata_profiling
import pandas as pd
from ydata_profiling import ProfileReport
profile = ProfileReport(df)
profile.to_notebook_iframe()
YuichiYuichi

https://dlthub.com/docs/general-usage/destination-tables#load-packages-and-load-ids
mydata.users

id name _dlt_id _dlt_load_id
1 Alice wX3f5vn801W16A 1234562350.98417
2 Bob rX8ybgTeEmAmmA 1234562350.98417

mydata.users__pets

id name type _dlt_id _dlt_parent_id _dlt_list_idx
1 Fluffy cat w1n0PEDzuP3grw wX3f5vn801W16A 0
2 Spot dog 9uxh36VU9lqKpw wX3f5vn801W16A 1
3 Fido dog pe3FVtCWz8VuNA rX8ybgTeEmAmmA 0

_dlt_id (行キー)

  • 各テーブル(ルートおよびネストされたテーブル)の行を一意に識別するための列。
  • この列はすべての行に含まれ、ユニークな値が設定されます。

_dlt_parent_id (親キー)

  • ネストされたテーブル内に存在し、特定の親テーブルの行(_dlt_id)を参照します。
  • ネストされたデータ構造における親子関係を表現するために使用されます。

_dlt_list_idx (リスト位置)

  • Pythonのリストからネストされたテーブルの行が作成される場合、リスト内の各アイテムの位置を示します。
  • ネストされたデータの順序を保持するために利用されます。

_dlt_root_id (ルートキー)

  • ネストされたテーブルが merge 書き込み設定でロードされる場合に追加される列。
  • ルートテーブル内の特定の行を参照し、ネストされたテーブルとルートテーブルを関連付けます。
  • ネストされたデータをルートデータと関連付けるために重要な役割を果たします。
YuichiYuichi

dlt.pipeline関数の引数

引数名 説明 デフォルト値
pipeline_name パイプラインの名前。トレースやモニタリングイベントで識別するために使用される。指定がない場合、実行中のPythonモジュールのファイル名から自動生成される。 モジュールのファイル名に基づく
destination データをロードする先の名前(例: bigquery, postgresql, duckdb など)。runメソッドで指定することも可能。 指定なし
dataset_name データをロードするデータセットの名前。データセットは、リレーショナルデータベースではスキーマ、他ではファイルフォルダのような論理グループとして扱われる。指定がない場合は{pipeline_name}_datasetとなる。 {pipeline_name}_datasetまたは空
progress progress="log":進行状況情報をログ、コンソール、またはテキスト ストリームにダンプします。本番環境では、オプションでメモリと CPU 使用率の統計情報を追加すると最も便利です。
pipelines_dir 作業フォルダーを特定の場所に保存するように引数を設定します ~/.dlt/pipelines/<pipeline_name>
dev_mode パイプラインが毎回状態をリセットし、新しいデータセットにデータをロードするようにしたい場合は、メソッドdev_modeの引数をdlt.pipelineTrue に設定、データセットの後ろにタイムスタンプがつく True
refresh ソースの一部またはすべてをリセットできます。つまり、パイプラインを実行すると、処理中のソース/リソースの状態がリセットされ、使用されている更新モードに応じて、テーブルが削除または切り捨てられます。
drop_sources スキーマにリストされているすべてのテーブルが削除され、それらのソースおよびすべてのリソースに属する状態が完全に消去されます。テーブルは、パイプラインのスキーマと宛先データベースの両方から削除されます。
drop_resources これらのリソースに属するテーブルは削除され、リソースの状態はワイプされます(インクリメンタルな状態も含まれます)。テーブルは、パイプラインのスキーマとデスティネーション・データベースの両方から削除されます。
drop_data drop_resourcesと同じですが、スキーマからテーブルを削除するのではなく、データのみを削除します(つまり、SQL目的地のTRUNCATE <table_name>で削除します)。選択されたリソースのリソース状態も消去されます。インクリメンタルリソースの場合、これはカーソル状態をリセットし、initial_valueからデータを完全にリロードします。

この場合、スキーマは変更されません。

runメソッドの引数

引数名 説明 デフォルト値
data ロードするデータ。dltのsourceresource、ジェネレータ関数、またはIteratorIterable(例: リストやmap関数の結果)を指定可能。 必須引数
write_disposition データの書き込み動作を制御する方法。以下のいずれかを指定可能:
- append: 常に新しいデータをテーブルの末尾に追加する。
- replace: 既存のデータを新しいデータで置き換える。
- skip: データのロードをスキップする。
- merge: 主キーやマージキーの指定に基づき、データを重複排除してマージする。
"append"
table_name テーブル名を指定する必要がある場合に使用。例えば、リソースやジェネレータ関数からテーブル名を推測できない場合など。 指定なし
YuichiYuichi

https://duckdb.org/docs/guides/meta/explain_analyze.html

EXPLAIN ANALYZE SELECT * FROM tbl;
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││        Total Time: 0.0008s        ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
┌───────────────────────────┐
│      EXPLAIN_ANALYZE      │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             0             │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         PROJECTION        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            name           │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             2             │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         HASH_JOIN         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           INNER           │
│         sid = sid         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ├──────────────┐
│           EC: 1           │              │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │              │
│             2             │              │
│          (0.00s)          │              │
└─────────────┬─────────────┘              │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│         SEQ_SCAN          ││           FILTER          │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           exams           ││     prefix(name, 'Ma')    │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            sid            ││           EC: 1           │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           EC: 3           ││             2             │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││          (0.00s)          │
│             3             ││                           │
│          (0.00s)          ││                           │
└───────────────────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         SEQ_SCAN          │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│          students         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            sid            │
│            name           │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│ Filters: name>=Ma AND name│
│  <Mb AND name IS NOT NULL │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           EC: 1           │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             2             │
│          (0.00s)          │
└───────────────────────────┘
YuichiYuichi

duckdbにBQの方言でクエリしたい

%%bq_duckdbっていうマジックコマンド自作した

sqlglotの対応次第で、いろんなクエリの方言でdcukbdにクエリできるぜ

import duckdb
import pandas as pd
import sqlglot
from IPython.display import HTML, display

%load_ext sql
conn = duckdb.connect('~~~')
%sql conn --alias duckdb

# pandas の表示設定:最大表示行数を設定(例えば 10 行)
pd.set_option('display.max_rows', 10)  # 最大行数を100に設定

# BigQuery SQL を DuckDB 用に変換するヘルパー関数
def convert_bq_to_duckdb(sql):
    try:
        return sqlglot.transpile(sql, read="bigquery", write="duckdb", pretty=True)[0]
    except Exception as e:
        print(f"Error in SQL conversion: {e}")
        return sql

# 変換した SQL を表示する関数(折りたたみ対応)
def display_converted_sql(sql):
    html_content = f"""
    <details>
        <summary>Converted SQL (Click to expand)</summary>
        <pre>{sql}</pre>
    </details>
    """
    display(HTML(html_content))

# 変換した SQL を実行するためのカスタムマジック
from IPython.core.magic import register_cell_magic

@register_cell_magic
def bq_duckdb(line, cell):
    # 変数名を取得
    var_name = line.strip()
    
    # BigQuery SQL を DuckDB SQL に変換
    converted_sql = convert_bq_to_duckdb(cell)
    display_converted_sql(converted_sql)

    # 変換後の SQL を実行
    result = conn.execute(converted_sql).fetchdf()

    if not var_name:
        return result
    else:
        # 結果を指定した変数に格納
        globals()[var_name] = result
        print(f'Store the result in a DataFrame : {var_name}')
%%bq_duckdb
SELECT 
    count(*) as Count,
    AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;
%%bq_duckdb
SELECT 
    count(*) as Count,
    AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;

結果をDataFrameに格納する、この場合はresult

%%bq_duckdb result
SELECT 
    count(*) as Count,
    AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;