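Getting Started with the New dagster Using dg
Introduction
I have been keeping an eye on dagster and had already worked through the ETL pipeline tutorial, but one day I happened to notice something unfamiliar…
So, in this post I work through it.
Trying It Out
Prerequisites
Install uv by following its installation guide. (I already had it installed for another purpose.)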
$ curl -LsSf https://astral.sh/uv/install.sh | sh
Install dg by following its installation guide.
$ uv --version
uv 0.6.10
$ uv tool install dagster-dg
Resolved 43 packages in 645ms
Prepared 13 packages in 43ms
Installed 43 packages in 60ms
+ annotated-types==0.7.0
:
+ yaspin==3.1.0
Installed 1 executable: dg
$ dg --version
dg, version 0.26.9
Install duckdb by following its installation guide. (I already had it installed for another purpose.)
$ curl https://install.duckdb.org | sh
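Before moving on, it can help to confirm that the three command-line tools used in this walkthrough (uv, dg, and duckdb) can be found on PATH. The following is a minimal sketch using only the Python standard library; the helper name `missing_tools` is made up for illustration and is not part of any of these tools.

```python
import shutil


def missing_tools(names):
    """Return the subset of `names` that cannot be found on PATH."""
    return [name for name in names if shutil.which(name) is None]


if __name__ == "__main__":
    # Tool names taken from the commands used in this article.
    missing = missing_tools(["uv", "dg", "duckdb"])
    print("missing:", missing if missing else "none")
```

A "missing" result for duckdb is not necessarily a problem: later in this article the CLI is invoked through its full path under `~/.duckdb`, so it may simply not be on PATH.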
Create a project
$ dg scaffold project jaffle-platform
Creating a Dagster project at /home/ec2-user/work/mds/jaffle-platform.
Scaffolded files for Dagster project at /home/ec2-user/work/mds/jaffle-platform.
Using CPython 3.12.9
Creating virtual environment at: .venv
Resolved 74 packages in 273ms
Built jaffle-platform @ file:///home/ec2-user/work/mds/jaffle-platform
Prepared 38 packages in 599ms
Installed 68 packages in 84ms
+ alembic==1.15.2
:
+ yarl==1.19.0
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
Add the Sling component type to the environment
This is Sling. It seems to get along well with dagster.
$ cd jaffle-platform
$ dg list component-type
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster.components.DefinitionsComponent │ An arbitrary set of dagster definitions. │
│ dagster.components.DefsFolderComponent │ A folder containing multiple submodules. │
│ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap Python scripts executed with Dagster's │
│ │ PipesSubprocessClient. │
└─────────────────────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────┘
$ uv add dagster-sling
Resolved 81 packages in 1.25s
Built jaffle-platform @ file:///home/ec2-user/work/mds/jaffle-platform
Prepared 4 packages in 1.13s
Uninstalled 1 package in 0.18ms
Installed 4 packages in 5ms
+ dagster-sling==0.26.9
~ jaffle-platform==0.1.0 (from file:///home/ec2-user/work/mds/jaffle-platform)
+ sling==1.4.4
+ sling-linux-amd64==1.4.4
$ dg list component-type
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster.components.DefinitionsComponent │ An arbitrary set of dagster definitions. │
│ dagster.components.DefsFolderComponent │ A folder containing multiple submodules. │
│ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap Python scripts executed with Dagster's │
│ │ PipesSubprocessClient. │
│ dagster_sling.SlingReplicationCollectionComponent │ Expose one or more Sling replications to Dagster as assets. │
└─────────────────────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────
Create an instance of the Sling component
$ dg scaffold 'dagster_sling.SlingReplicationCollectionComponent' ingest_files
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
$ tree
.
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   └── __init__.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   └── ingest_files
│       │       ├── component.yaml
│       │       └── replication.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock
src/jaffle_platform/defs/ingest_files/component.yaml
type: dagster_sling.SlingReplicationCollectionComponent
attributes:
  replications:
    - path: replication.yaml
Set up DuckDB
$ uv run sling conns set DUCKDB type=duckdb instance=/tmp/jaffle_platform.duckdb
6:41AM INF connection `DUCKDB` has been set in /home/ec2-user/.sling/env.yaml. Please test with `sling conns test DUCKDB`
$ uv run sling conns test DUCKDB
6:42AM INF downloading duckdb 1.1.3 for linux/amd64
6:42AM INF success!
Download the source files for Sling
$ curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1302 100 1302 0 0 4387 0 --:--:-- --:--:-- --:--:-- 4398
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 2723 100 2723 0 0 8544 0 --:--:-- --:--:-- --:--:-- 8562
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 2560 100 2560 0 0 8374 0 --:--:-- --:--:-- --:--:-- 8366
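For environments without curl, the same three downloads can be sketched in Python with only the standard library. The base URL and file names are copied from the curl commands above; the helper names `seed_urls` and `download_seeds` are hypothetical.

```python
from pathlib import Path
from urllib.request import urlretrieve

# Base URL and file names copied from the curl commands above.
BASE_URL = (
    "https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic"
    "/refs/heads/main/seeds"
)
SEED_FILES = ["raw_customers.csv", "raw_orders.csv", "raw_payments.csv"]


def seed_urls(base_url=BASE_URL, files=SEED_FILES):
    """Map each seed file name to the URL it is downloaded from."""
    return {name: f"{base_url}/{name}" for name in files}


def download_seeds(dest_dir="."):
    """Download every seed file into dest_dir (performs network access)."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    for name, url in seed_urls().items():
        urlretrieve(url, str(dest / name))


if __name__ == "__main__":
    # Run from the project root so the CSV files land next to pyproject.toml.
    download_seeds()
```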
src/jaffle_platform/defs/ingest_files/replication.yaml
source: LOCAL
target: DUCKDB
defaults:
  mode: full-refresh
  object: "{stream_table}"
streams:
  file://raw_customers.csv:
    object: "main.raw_customers"
  file://raw_orders.csv:
    object: "main.raw_orders"
  file://raw_payments.csv:
    object: "main.raw_payments"
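In replication.yaml, the defaults block supplies settings shared by every stream, and each stream can override them (here, every stream sets its own object). A rough sketch of that merge in plain Python follows. It assumes that stream-level keys simply take precedence over the defaults; the function name `effective_config` is hypothetical, and the code does not call Sling itself.

```python
# Values mirror the replication.yaml shown above.
DEFAULTS = {"mode": "full-refresh", "object": "{stream_table}"}
STREAMS = {
    "file://raw_customers.csv": {"object": "main.raw_customers"},
    "file://raw_orders.csv": {"object": "main.raw_orders"},
    "file://raw_payments.csv": {"object": "main.raw_payments"},
}


def effective_config(defaults, stream_overrides):
    """Combine defaults with per-stream overrides; overrides win."""
    return {**defaults, **stream_overrides}


if __name__ == "__main__":
    for name, overrides in STREAMS.items():
        print(name, effective_config(DEFAULTS, overrides))
```

Under that assumption, each stream keeps the full-refresh mode from the defaults while writing to its own table in the main schema.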
src/jaffle_platform/defs/ingest_files/component.yaml
type: dagster_sling.SlingReplicationCollectionComponent
attributes:
  sling:
    connections:
      - name: DUCKDB
        type: duckdb
        instance: /tmp/jaffle_platform.duckdb
  replications:
    - path: replication.yaml
View and materialize assets in the UI
$ dg dev
All components validated successfully.
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster
2025-04-08 06:57:04 +0000 - dagster - INFO - Using temporary directory /home/ec2-user/work/mds/jaffle-platform/.tmp_dagster_home_icbv7wsl for storage. This will be removed when dagster dev exits.
2025-04-08 06:57:04 +0000 - dagster - INFO - To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
2025-04-08 06:57:05 +0000 - dagster - INFO - Launching Dagster services...
2025-04-08 06:57:08 +0000 - dagster.daemon - INFO - Instance is configured with the following daemons: ['AssetDaemon', 'BackfillDaemon', 'QueuedRunCoordinatorDaemon', 'SchedulerDaemon', 'SensorDaemon']
2025-04-08 06:57:08 +0000 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 56538
2025-04-08 06:57:23 +0000 - dagster.daemon.QueuedRunCoordinatorDaemon - INFO - Priority sorting and checking tag concurrency limits for queued runs.
2025-04-08 06:57:24 +0000 - dagster.daemon.QueuedRunCoordinatorDaemon - INFO - Launched 1 runs.
$ ~/.duckdb/cli/latest/duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM raw_customers LIMIT 5;"
┌───────┬────────────┬───────────┬──────────────────┐
│ id │ first_name │ last_name │ _sling_loaded_at │
│ int32 │ varchar │ varchar │ int64 │
├───────┼────────────┼───────────┼──────────────────┤
│ 1 │ Michael │ P. │ 1744095445 │
│ 2 │ Shawn │ M. │ 1744095445 │
│ 3 │ Kathleen │ P. │ 1744095445 │
│ 4 │ Jimmy │ C. │ 1744095445 │
│ 5 │ Katherine │ R. │ 1744095445 │
└───────┴────────────┴───────────┴──────────────────┘
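The `_sling_loaded_at` column holds a 10-digit integer, which looks like a Unix timestamp in seconds. Assuming that interpretation is right, a short sketch can turn it into a readable UTC time; the helper name `loaded_at_utc` is hypothetical.

```python
from datetime import datetime, timezone


def loaded_at_utc(epoch_seconds):
    """Interpret an integer as Unix epoch seconds and return a UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


if __name__ == "__main__":
    # Value copied from the query result above.
    print(loaded_at_utc(1744095445).isoformat())
```

For the value in the table this gives 2025-04-08T06:57:25+00:00, one second after the "Launched 1 runs." line in the `dg dev` log, which is consistent with the rows having been loaded by that run.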
Copy the sample dbt project
$ git clone --depth=1 https://github.com/dagster-io/jaffle-platform.git dbt && rm -rf dbt/.git
Cloning into 'dbt'...
remote: Enumerating objects: 15, done.
remote: Counting objects: 100% (15/15), done.
remote: Compressing objects: 100% (11/11), done.
Receiving objects: 100% (15/15), done.
remote: Total 15 (delta 2), reused 15 (delta 2), pack-reused 0 (from 0)
Resolving deltas: 100% (2/2), done.
$ tree
.
├── dbt
│   └── jdbt
│       ├── README.md
│       ├── dbt_project.yml
│       ├── models
│       │   ├── customers.sql
│       │   ├── orders.sql
│       │   ├── sources.yml
│       │   └── stg
│       │       ├── stg_customers.sql
│       │       ├── stg_orders.sql
│       │       └── stg_payments.sql
│       └── profiles.yml
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── raw_customers.csv
├── raw_orders.csv
├── raw_payments.csv
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   ├── __init__.cpython-312.pyc
│       │   └── definitions.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   ├── __pycache__
│       │   │   └── __init__.cpython-312.pyc
│       │   └── ingest_files
│       │       ├── component.yaml
│       │       └── replication.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock
Install the dbt project component
$ uv add dagster-dbt dbt-duckdb
Resolved 120 packages in 802ms
Built jaffle-platform @ file:///home/ec2-user/work/mds/jaffle-platform
Prepared 7 packages in 219ms
Uninstalled 1 package in 0.15ms
Installed 40 packages in 76ms
+ agate==1.9.1
:
+ zipp==3.21.0
$ dg list component-type
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster.components.DefinitionsComponent │ An arbitrary set of dagster definitions. │
│ dagster.components.DefsFolderComponent │ A folder containing multiple submodules. │
│ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap Python scripts executed with Dagster's PipesSubprocessClient. │
│ dagster_dbt.DbtProjectComponent │ Expose a DBT project to Dagster as a set of assets. │
│ dagster_sling.SlingReplicationCollectionComponent │ Expose one or more Sling replications to Dagster as assets. │
└─────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘
Scaffold an instance of the dbt component
$ dg scaffold dagster_dbt.DbtProjectComponent jdbt --project-path dbt/jdbt
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
$ tree
.
├── dbt
│   └── jdbt
│       ├── README.md
│       ├── dbt_project.yml
│       ├── models
│       │   ├── customers.sql
│       │   ├── orders.sql
│       │   ├── sources.yml
│       │   └── stg
│       │       ├── stg_customers.sql
│       │       ├── stg_orders.sql
│       │       └── stg_payments.sql
│       └── profiles.yml
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── raw_customers.csv
├── raw_orders.csv
├── raw_payments.csv
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   ├── __init__.cpython-312.pyc
│       │   └── definitions.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   ├── __pycache__
│       │   │   └── __init__.cpython-312.pyc
│       │   ├── ingest_files
│       │   │   ├── component.yaml
│       │   │   └── replication.yaml
│       │   └── jdbt
│       │       └── component.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock
src/jaffle_platform/defs/jdbt/component.yaml
type: dagster_dbt.DbtProjectComponent
attributes:
  dbt:
    project_dir: ../../../../dbt/jdbt
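The four `..` segments in project_dir make sense if the path is resolved relative to the directory that contains component.yaml, which is my assumption here. The sketch below checks that arithmetic with the standard library only; `resolve_project_dir` is a hypothetical helper, not a dagster API.

```python
import posixpath

# Location of the scaffolded component, relative to the project root.
COMPONENT_DIR = "src/jaffle_platform/defs/jdbt"


def resolve_project_dir(component_dir, project_dir):
    """Resolve project_dir against the directory holding component.yaml."""
    return posixpath.normpath(posixpath.join(component_dir, project_dir))


if __name__ == "__main__":
    print(resolve_project_dir(COMPONENT_DIR, "../../../../dbt/jdbt"))
```

Climbing four levels from `src/jaffle_platform/defs/jdbt` reaches the project root, so the result is `dbt/jdbt`, the directory created by the earlier git clone.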
Update the dbt project component configuration
src/jaffle_platform/defs/jdbt/component.yaml
type: dagster_dbt.DbtProjectComponent
attributes:
  dbt:
    project_dir: ../../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}"
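The key template gives each dbt node an asset key of the form target/main/<node name>, presumably so that the dbt assets share the key prefix used for the tables loaded by Sling (that purpose is my assumption). The sketch below mimics the substitution with plain string replacement. It is not the template rendering that dagster actually performs, and `render_key` is a hypothetical helper.

```python
# Template copied from component.yaml above.
TEMPLATE = "target/main/{{ node.name }}"


def render_key(template, node_name):
    """Substitute the node name into the template and split into key parts."""
    rendered = template.replace("{{ node.name }}", node_name)
    return rendered.split("/")


if __name__ == "__main__":
    # Model names taken from the dbt project's models directory.
    for model in ["stg_customers", "customers", "orders"]:
        print(render_key(TEMPLATE, model))
```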
$ dg check yaml
All components validated successfully.
$ dg dev
Run Materialize all from the UI.
$ duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM orders LIMIT 5;"
┌──────────┬─────────────┬────────────┬───────────┬────────────────────┬───────────────┬──────────────────────┬──────────────────┬────────┐
│ order_id │ customer_id │ order_date │ status │ credit_card_amount │ coupon_amount │ bank_transfer_amount │ gift_card_amount │ amount │
│ int32 │ int32 │ date │ varchar │ double │ double │ double │ double │ double │
├──────────┼─────────────┼────────────┼───────────┼────────────────────┼───────────────┼──────────────────────┼──────────────────┼────────┤
│ 1 │ 1 │ 2018-01-01 │ returned │ 10.0 │ 0.0 │ 0.0 │ 0.0 │ 10.0 │
│ 2 │ 3 │ 2018-01-02 │ completed │ 20.0 │ 0.0 │ 0.0 │ 0.0 │ 20.0 │
│ 3 │ 94 │ 2018-01-04 │ completed │ 0.0 │ 1.0 │ 0.0 │ 0.0 │ 1.0 │
│ 4 │ 50 │ 2018-01-05 │ completed │ 0.0 │ 25.0 │ 0.0 │ 0.0 │ 25.0 │
│ 5 │ 64 │ 2018-01-05 │ completed │ 0.0 │ 0.0 │ 17.0 │ 0.0 │ 17.0 │
└──────────┴─────────────┴────────────┴───────────┴────────────────────┴───────────────┴──────────────────────┴──────────────────┴────────┘
Automate the pipeline
$ dg scaffold dagster.schedule daily_jaffle.py
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
$ tree
.
├── dbt
│   └── jdbt
│       ├── README.md
│       ├── dbt_project.yml
│       ├── models
│       │   ├── customers.sql
│       │   ├── orders.sql
│       │   ├── sources.yml
│       │   └── stg
│       │       ├── stg_customers.sql
│       │       ├── stg_orders.sql
│       │       └── stg_payments.sql
│       ├── profiles.yml
│       └── target
│           ├── dbt.log
│           ├── jdbt-8ef979d-67083e8
│           │   ├── compiled
│           │   │   └── jdbt
│           │   │       └── models
│           │   │           ├── customers.sql
│           │   │           ├── orders.sql
│           │   │           └── stg
│           │   │               ├── stg_customers.sql
│           │   │               ├── stg_orders.sql
│           │   │               └── stg_payments.sql
│           │   ├── dbt.log
│           │   ├── graph.gpickle
│           │   ├── graph_summary.json
│           │   ├── manifest.json
│           │   ├── partial_parse.msgpack
│           │   ├── run
│           │   │   └── jdbt
│           │   │       └── models
│           │   │           ├── customers.sql
│           │   │           ├── orders.sql
│           │   │           └── stg
│           │   │               ├── stg_customers.sql
│           │   │               ├── stg_orders.sql
│           │   │               └── stg_payments.sql
│           │   ├── run_results.json
│           │   └── semantic_manifest.json
│           ├── manifest.concurrent-update-lock
│           ├── manifest.json
│           ├── partial_parse.msgpack
│           ├── perf_info.json
│           └── semantic_manifest.json
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── raw_customers.csv
├── raw_orders.csv
├── raw_payments.csv
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   ├── __init__.cpython-312.pyc
│       │   └── definitions.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   ├── __pycache__
│       │   │   └── __init__.cpython-312.pyc
│       │   ├── daily_jaffle.py
│       │   ├── ingest_files
│       │   │   ├── component.yaml
│       │   │   └── replication.yaml
│       │   └── jdbt
│       │       └── component.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock
src/jaffle_platform/defs/daily_jaffle.py
import dagster as dg


@dg.schedule(cron_schedule="@daily", target="*")
def daily_jaffle(context: dg.ScheduleEvaluationContext):
    return dg.RunRequest()
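`@daily` is the conventional cron shorthand for `0 0 * * *`, that is, once a day at midnight. As a rough illustration of when such a schedule would next fire, here is a standard-library sketch. It assumes the schedule is evaluated in UTC and does not use dagster's own scheduler logic; `next_daily_tick` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now):
    """Return the first midnight strictly after `now`, in the same timezone."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(days=1)


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print("now:      ", now.isoformat())
    print("next tick:", next_daily_tick(now).isoformat())
```

With `target="*"`, each tick would request a run covering every asset in the project, so both the Sling ingestion and the dbt models are refreshed together.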
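Conclusion
What did you think of the new dg CLI and the new project layout?
Since component.yaml can point at external projects through settings such as project_dir, it seems feasible to combine this with uv's workspace feature: keep using tools like dlt and dbt on their own while still integrating everything as a whole.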