【dbt】DuckDB
mkdir dbt
cd dbt
python -m venv .venv
source .venv/bin/activate
python -m pip install dbt-core==1.8.4 dbt-duckdb==1.8.4
pip install --upgrade pip
pip install -r requirements.txt
pip freeze > requirements.txt
deactivate
brew install duckdb
mkdir db
dbt init
dbt_project
The profile dbt_project already exists in /Users/~/.dbt/profiles.yml. Continue and overwrite it? [y/N]: N
cd dbt_project
touch profiles.yml
mkdir db
touch db/dbt.duckdb
dbt_project:
target: dev
outputs:
dev:
type: duckdb
path: 'db/dbt.duckdb'
extensions:
- parquet
target: dev
dbt --version
dbt debug
pip install "dlt[duckdb]"
duckdb db/dbt.duckdb
dlt init filesystem duckdb
新しいパイプライン ファイルシステムをカスタマイズする準備ができました。
* filesystem_pipeline.py で dlt がデータをロードする方法を確認し、変更します
* ./.dlt/secrets.toml で duckdb およびその他のシークレットの認証情報を追加します
* requirements.txt に必要な依存関係を追加します:
dlt[duckdb,filesystem]>=1.5.0
dlt 依存関係がすでに追加されている場合は、duckdb の追加を必ずインストールしてください
pip を使用してインストールするには: pip3 install 'dlt[duckdb,filesystem]>=1.5.0'
* 詳細については、https://dlthub.com/docs/walkthroughs/create-a-pipeline を参照してください
source は resource をグルーピングしたものであり、resource ごとに table が作られると思っていい。
raw_ → データレイク(未加工データ)
stg_ → ステージング(加工済みデータ)
int_ → 中間データ(ロジック適用後のデータ)
mrt_ → マート(最終分析用データ)
1. シンプルなレイヤー名
役割が短い接頭辞で明確に区別できる命名規則。
-
raw_* → データレイク(未加工データ)
例:raw_sales
,raw_logs
-
stage_* → ステージング(加工・整形データ)
例:stage_customers
,stage_orders
-
transform_* → 中間データ(ビジネスロジック適用後のデータ)
例:transform_metrics
,transform_finance
-
final_* → マート(完成形データ)
例:final_kpis
,final_entities
2. カラーモデル命名
レイヤーを「ブロンズ→シルバー→ゴールド」のように色で表現する方法。
-
bronze_* → データレイク
例:bronze_transactions
,bronze_events
-
silver_* → ステージング
例:silver_customers
,silver_sessions
-
gold_* → 中間データまたはマート
例:gold_financials
,gold_marketing_kpis
-
platinum_* → 最終マート(特に重要な分析用データ)
例:platinum_insights
,platinum_strategic_kpis
3. アクションベース命名
データのライフサイクルに基づき、動詞を用いた接頭辞を付ける方法。
-
ingest_* → データレイク(取り込み段階)
例:ingest_logs
,ingest_raw_data
-
clean_* → ステージング(データ整形)
例:clean_orders
,clean_events
-
prepare_* → 中間データ(準備段階)
例:prepare_metrics
,prepare_entities
-
deliver_* → マート(最終データ配信)
例:deliver_kpis
,deliver_dashboard_data
4. レイヤー番号命名
レイヤーの順序を数値で示し、データフローのステップを明示する方法。
-
l1_* → データレイク(Layer 1: Raw Data)
例:l1_raw_events
,l1_raw_logs
-
l2_* → ステージング(Layer 2: Cleaned Data)
例:l2_stage_customers
,l2_stage_transactions
-
l3_* → 中間データ(Layer 3: Transformed Data)
例:l3_enriched_kpis
,l3_prepared_finance
-
l4_* → マート(Layer 4: Final Data)
例:l4_mart_kpis
,l4_dashboard_ready
5. 業務・目的別命名
データの用途や最終消費者に合わせて役割を明確化。
-
raw_* → データレイク
例:raw_finance
,raw_marketing
-
base_* → ステージング(基本構築データ)
例:base_customers
,base_products
-
logic_* → 中間データ(ロジック適用済み)
例:logic_sales
,logic_operations
-
output_* → マート(最終出力データ)
例:output_kpis
,output_analysis
6. 日本語ベースの命名
日本語チーム向けに親しみやすく、業務フローを意識した命名規則。
-
raw_データ → データレイク
例:raw_売上
,raw_ログ
-
整形_データ → ステージング
例:整形_顧客
,整形_注文
-
加工_データ → 中間データ
例:加工_指標
,加工_在庫
-
分析_データ → マート
例:分析_収益
,分析_KPI
7. その他のカスタム案
用途や好みに応じた独自の接頭辞を検討する方法。
- lake_* → データレイク
- refine_* → ステージング
- model_* → 中間データ
- serve_* → マート
例:
-
lake_sessions
,refine_sales
,model_revenue
,serve_kpis
提案まとめ
以下のいずれかを選ぶことで、わかりやすく管理しやすい命名規則になります。
-
カラーモデル(
bronze_
,silver_
,gold_
) → 直感的で広く使われている。 -
レイヤー番号(
l1_
,l2_
,l3_
,l4_
) → 明確なフローの順序。 -
アクションベース(
ingest_
,clean_
,prepare_
,deliver_
) → データ処理の意図がわかりやすい。
特定の命名規則がチームに合うか、好みを考慮して選んでください。
CSVでエンコードできるものを探すスクリプト
エンコード設定リスト
import pandas as pd
# CSVファイルパス
file_path = "your_file.csv"
# 試すエンコードのリスト
encodings = [
"cp932",
"euc_jp",
"euc_jis_2004",
"euc_jisx0213",
"euc_kr",
"iso2022_jp",
"iso2022_jp_1",
"iso2022_jp_2",
"iso2022_jp_2004",
"iso2022_jp_3",
"iso2022_jp_ext",
"iso2022_kr",
"shift_jis",
"shift_jis_2004",
"shift_jisx0213"
]
# エンコードを順番に試して読み込む
for encoding in encodings:
try:
df = pd.read_csv(file_path, encoding=encoding)
print(f"成功: {encoding}")
print(df.head()) # 読み取ったデータの最初の5行を表示
break
except Exception as e:
print(f"失敗: {encoding} - {e}")
謎
失敗: cp932 - 'cp932' codec can't decode byte 0x85 in position 187543: illegal multibyte sequence
失敗: euc_jp - 'euc_jp' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: euc_jis_2004 - 'euc_jis_2004' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: euc_jisx0213 - 'euc_jisx0213' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: euc_kr - 'euc_kr' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp - 'iso2022_jp' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_1 - 'iso2022_jp_1' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_2 - 'iso2022_jp_2' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_2004 - 'iso2022_jp_2004' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_3 - 'iso2022_jp_3' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_jp_ext - 'iso2022_jp_ext' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: iso2022_kr - 'iso2022_kr' codec can't decode byte 0x89 in position 0: illegal multibyte sequence
失敗: shift_jis - 'shift_jis' codec can't decode byte 0xf1 in position 52883: illegal multibyte sequence
失敗: shift_jis_2004 - 'shift_jis_2004' codec can't decode byte 0x87 in position 414311: illegal multibyte sequence
失敗: shift_jisx0213 - 'shift_jisx0213' codec can't decode byte 0x87 in position 414311: illegal multibyte sequence
pip install streamlit
dlt pipeline csv_pipeline show
Pipeline with name csv_pipeline in working directory /~/.dlt/pipelines could not be restored: the pipeline was not found in /~/.dlt/pipelines/csv_pipeline.
Try command dlt pipeline csv_pipeline sync to restore the pipeline state from destination
NOTE: Please refer to our docs at 'https://dlthub.com/docs/reference/command-line-interface#dlt-pipeline' for further assistance.
どうやら.dltの直下にpipelinesフォルダ作ってそこに~.py入れとかないといけないらしい
harlequinよりこっちが全然いい
行政が出している日本語カラムCSVを英訳してスネークケースにするのスクリプト
googletransは更新が頻繁に行われるため、バージョンによって動作が異なる場合があります。特に、4.0.0-rc1のリリース候補が安定して動作することが知られています。
pip install --upgrade googletrans==4.0.0-rc1
以下は、CSVのカラム名を英訳しスネークケースに変換するPythonスクリプトの例です。
リストに既存の英訳済みカラム名がある場合はそれを使い、リストにない場合は翻訳を追加してからカラム名を書き換えます。
import os
import pandas as pd
import yaml
from googletrans import Translator
# YAML 設定ファイルのパス
config_file = "config.yml"
# 初期設定の定義
default_config = {
"translations": {} # 空の翻訳リスト
}
# YAMLファイルの読み込みまたは作成
if os.path.exists(config_file):
with open(config_file, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
else:
# ファイルがない場合は初期設定で作成
config = default_config
with open(config_file, "w", encoding="utf-8") as f:
yaml.dump(config, f, allow_unicode=True)
print(f"New config file created: {config_file}")
# 翻訳済みカラム名リストの取得
existing_translations = config.get("translations", {})
# 翻訳用関数
def translate_column_name(column_name, translator):
# 既存の翻訳を確認
if column_name in existing_translations:
return existing_translations[column_name]
# Google翻訳で英訳
translated = translator.translate(column_name, src="ja", dest="en").text
# スネークケースに変換
snake_case_name = translated.lower().replace(" ", "_")
# 新しい翻訳をリストに追加
existing_translations[column_name] = snake_case_name
return snake_case_name
# CSVファイルの処理関数
def process_csv_file(input_path, output_path, translator):
print(f"Processing file: {input_path}")
df = pd.read_csv(input_path)
# カラム名を翻訳して変更
df.columns = [translate_column_name(col, translator) for col in df.columns]
# 処理済みファイルの保存
os.makedirs(os.path.dirname(output_path), exist_ok=True)
df.to_csv(output_path, index=False)
print(f"Translated CSV saved to {output_path}")
# 入出力フォルダの設定
input_folder = "csv"
output_folder = "csv_translated"
# 翻訳インスタンスを作成
translator = Translator()
# 入力フォルダ内のすべてのCSVファイルを検索
csv_files = [f for f in os.listdir(input_folder) if f.endswith(".csv")]
# 各CSVファイルを処理
for csv_file in csv_files:
input_path = os.path.join(input_folder, csv_file)
output_path = os.path.join(output_folder, csv_file)
process_csv_file(input_path, output_path, translator)
# YAMLファイルを更新
config["translations"] = existing_translations
with open(config_file, "w", encoding="utf-8") as f:
yaml.dump(config, f, allow_unicode=True)
# 完了メッセージ
print(f"Updated config file: {config_file}")
翻訳がイマイチだったら、手で修正して再実行
translations:
名前: "name"
住所: "address"
年齢: "age"
電話番号: "phone_number"
Jupyter Notebooks
from google.cloud import bigquery
import dlt
def load_sql_data() -> None:
"""BigQuery からデータを取得し、dlt を使って DuckDB に保存する"""
# BigQuery クライアントを初期化
client = bigquery.Client()
# クエリを定義
query = """
SELECT *
FROM `〜〜`
"""
# クエリを実行し、データを取得
query_job = client.query(query) # クエリジョブを送信
rows = query_job.result() # 結果を取得
# dlt パイプラインを構築
pipeline = dlt.pipeline(
pipeline_name="〜〜",
destination="duckdb",
dataset_name="〜〜",
dev_mode=False,
# progress="log"
)
# 行データを辞書形式に変換してロード
load_info = pipeline.run(
map(lambda row: dict(row.items()), rows),
table_name="〜〜",
)
# ロード情報を表示
print(load_info) # noqa: T201
if __name__ == "__main__":
load_sql_data()
同じ階層のcsv_pipeline.duckdbに接続
import duckdb
import pandas as pd
%load_ext sql
conn = duckdb.connect('csv_pipeline.duckdb')
%sql conn --alias duckdb
%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False
%sql SHOW ALL TABLES
%sql SELECT * FROM information_schema.tables
次のマジックコマンドを利用し、Notebook上でJupySQLを使っていきます。
- %sql: 1行のSQL命令文の実行
- %%sql:複数行のSQL命令文の実行(セル内がSQL環境になる)
- %sqlplot:グラフ表示
統計情報確認
%%sql result <<
SELECT
count(*) as Count,
AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;
df = pd.DataFrame(result)
df.describe()
df.describe(include='all')
PRAGMA storage_info('table_name');
CALL pragma_storage_info('table_name');
ydata-profilingでデータ探索
pip install ydata_profiling
import pandas as pd
from ydata_profiling import ProfileReport
profile = ProfileReport(df)
profile.to_notebook_iframe()
mydata.users
id | name | _dlt_id | _dlt_load_id |
---|---|---|---|
1 | Alice | wX3f5vn801W16A | 1234562350.98417 |
2 | Bob | rX8ybgTeEmAmmA | 1234562350.98417 |
mydata.users__pets
id | name | type | _dlt_id | _dlt_parent_id | _dlt_list_idx |
---|---|---|---|---|---|
1 | Fluffy | cat | w1n0PEDzuP3grw | wX3f5vn801W16A | 0 |
2 | Spot | dog | 9uxh36VU9lqKpw | wX3f5vn801W16A | 1 |
3 | Fido | dog | pe3FVtCWz8VuNA | rX8ybgTeEmAmmA | 0 |
_dlt_id (行キー)
- 各テーブル(ルートおよびネストされたテーブル)の行を一意に識別するための列。
- この列はすべての行に含まれ、ユニークな値が設定されます。
_dlt_parent_id (親キー)
- ネストされたテーブル内に存在し、特定の親テーブルの行(_dlt_id)を参照します。
- ネストされたデータ構造における親子関係を表現するために使用されます。
_dlt_list_idx (リスト位置)
- Pythonのリストからネストされたテーブルの行が作成される場合、リスト内の各アイテムの位置を示します。
- ネストされたデータの順序を保持するために利用されます。
_dlt_root_id (ルートキー)
- ネストされたテーブルが
merge
書き込み設定でロードされる場合に追加される列。 - ルートテーブル内の特定の行を参照し、ネストされたテーブルとルートテーブルを関連付けます。
- ネストされたデータをルートデータと関連付けるために重要な役割を果たします。
dlt.pipeline
関数の引数
引数名 | 説明 | デフォルト値 |
---|---|---|
pipeline_name |
パイプラインの名前。トレースやモニタリングイベントで識別するために使用される。指定がない場合、実行中のPythonモジュールのファイル名から自動生成される。 | モジュールのファイル名に基づく |
destination |
データをロードする先の名前(例: bigquery , postgresql , duckdb など)。run メソッドで指定することも可能。 |
指定なし |
dataset_name |
データをロードするデータセットの名前。データセットは、リレーショナルデータベースではスキーマ、他ではファイルフォルダのような論理グループとして扱われる。指定がない場合は{pipeline_name}_dataset となる。 |
{pipeline_name}_dataset または空 |
progress | progress="log":進行状況情報をログ、コンソール、またはテキスト ストリームにダンプします。本番環境では、オプションでメモリと CPU 使用率の統計情報を追加すると最も便利です。 | |
pipelines_dir | 作業フォルダーを特定の場所に保存するように引数を設定します | ~/.dlt/pipelines/<pipeline_name> |
dev_mode | パイプラインが毎回状態をリセットし、新しいデータセットにデータをロードするようにしたい場合は、メソッドdev_mode の引数をdlt.pipeline True に設定、データセットの後ろにタイムスタンプがつく |
True |
refresh | ソースの一部またはすべてをリセットできます。つまり、パイプラインを実行すると、処理中のソース/リソースの状態がリセットされ、使用されている更新モードに応じて、テーブルが削除または切り捨てられます。 | |
drop_sources |
スキーマにリストされているすべてのテーブルが削除され、それらのソースおよびすべてのリソースに属する状態が完全に消去されます。テーブルは、パイプラインのスキーマと宛先データベースの両方から削除されます。 | |
drop_resources |
これらのリソースに属するテーブルは削除され、リソースの状態はワイプされます(インクリメンタルな状態も含まれます)。テーブルは、パイプラインのスキーマとデスティネーション・データベースの両方から削除されます。 | |
drop_data |
drop_resourcesと同じですが、スキーマからテーブルを削除するのではなく、データのみを削除します(つまり、SQL目的地のTRUNCATE <table_name>で削除します)。選択されたリソースのリソース状態も消去されます。インクリメンタルリソースの場合、これはカーソル状態をリセットし、initial_valueからデータを完全にリロードします。 この場合、スキーマは変更されません。 |
run
メソッドの引数
引数名 | 説明 | デフォルト値 |
---|---|---|
data |
ロードするデータ。dltのsource 、resource 、ジェネレータ関数、またはIterator やIterable (例: リストやmap 関数の結果)を指定可能。 |
必須引数 |
write_disposition |
データの書き込み動作を制御する方法。以下のいずれかを指定可能: - append : 常に新しいデータをテーブルの末尾に追加する。- replace : 既存のデータを新しいデータで置き換える。- skip : データのロードをスキップする。- merge : 主キーやマージキーの指定に基づき、データを重複排除してマージする。 |
"append" |
table_name |
テーブル名を指定する必要がある場合に使用。例えば、リソースやジェネレータ関数からテーブル名を推測できない場合など。 | 指定なし |
EXPLAIN ANALYZE SELECT * FROM tbl;
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││ Total Time: 0.0008s ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
┌───────────────────────────┐
│ EXPLAIN_ANALYZE │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ 0 │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ PROJECTION │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ name │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ 2 │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ HASH_JOIN │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ INNER │
│ sid = sid │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ├──────────────┐
│ EC: 1 │ │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │
│ 2 │ │
│ (0.00s) │ │
└─────────────┬─────────────┘ │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│ SEQ_SCAN ││ FILTER │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ││ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ exams ││ prefix(name, 'Ma') │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ││ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ sid ││ EC: 1 │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ││ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 3 ││ 2 │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ││ (0.00s) │
│ 3 ││ │
│ (0.00s) ││ │
└───────────────────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ SEQ_SCAN │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ students │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ sid │
│ name │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ Filters: name>=Ma AND name│
│ <Mb AND name IS NOT NULL │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 1 │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ 2 │
│ (0.00s) │
└───────────────────────────┘
duckdbにBQの方言でクエリしたい
%%bq_duckdbっていうマジックコマンド自作した
sqlglotの対応次第で、いろんなクエリの方言でdcukbdにクエリできるぜ
import duckdb
import pandas as pd
import sqlglot
from IPython.display import HTML, display
%load_ext sql
conn = duckdb.connect('~~~')
%sql conn --alias duckdb
# pandas の表示設定:最大表示行数を設定(例えば 10 行)
pd.set_option('display.max_rows', 10) # 最大行数を100に設定
# BigQuery SQL を DuckDB 用に変換するヘルパー関数
def convert_bq_to_duckdb(sql):
try:
return sqlglot.transpile(sql, read="bigquery", write="duckdb", pretty=True)[0]
except Exception as e:
print(f"Error in SQL conversion: {e}")
return sql
# 変換した SQL を表示する関数(折りたたみ対応)
def display_converted_sql(sql):
html_content = f"""
<details>
<summary>Converted SQL (Click to expand)</summary>
<pre>{sql}</pre>
</details>
"""
display(HTML(html_content))
# 変換した SQL を実行するためのカスタムマジック
from IPython.core.magic import register_cell_magic
@register_cell_magic
def bq_duckdb(line, cell):
# 変数名を取得
var_name = line.strip()
# BigQuery SQL を DuckDB SQL に変換
converted_sql = convert_bq_to_duckdb(cell)
display_converted_sql(converted_sql)
# 変換後の SQL を実行
result = conn.execute(converted_sql).fetchdf()
if not var_name:
return result
else:
# 結果を指定した変数に格納
globals()[var_name] = result
print(f'Store the result in a DataFrame : {var_name}')
%%bq_duckdb
SELECT
count(*) as Count,
AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;
%%bq_duckdb
SELECT
count(*) as Count,
AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;
結果をDataFrameに格納する、この場合はresult
%%bq_duckdb result
SELECT
count(*) as Count,
AIRLINE
FROM flights.csv
GROUP BY AIRLINE
ORDER BY Count DESC;