
Getting started with the new dagster using dg


Introduction

I keep an eye on dagster as well and had already worked through its ETL pipeline tutorial, but one day I happened to notice something unfamiliar in the docs...

https://docs.dagster.io/guides/labs/components/components-etl-pipeline-tutorial

So, let's work through this one.

Trying it out

Prerequisites

Install uv by following its installation instructions. (I already had it installed for another purpose.)

$ curl -LsSf https://astral.sh/uv/install.sh | sh

Install dg by following its installation instructions.

$ uv --version
uv 0.6.10
$ uv tool install dagster-dg
Resolved 43 packages in 645ms
Prepared 13 packages in 43ms
Installed 43 packages in 60ms
 + annotated-types==0.7.0
:
 + yaspin==3.1.0
Installed 1 executable: dg
$ dg --version
dg, version 0.26.9

Install duckdb by following its installation instructions. (I already had it installed for another purpose.)

$ curl https://install.duckdb.org | sh

Create a project

$ dg scaffold project jaffle-platform
Creating a Dagster project at /home/ec2-user/work/mds/jaffle-platform.
Scaffolded files for Dagster project at /home/ec2-user/work/mds/jaffle-platform.
Using CPython 3.12.9
Creating virtual environment at: .venv
Resolved 74 packages in 273ms
      Built jaffle-platform @ file:///home/ec2-user/work/mds/jaffle-platform
Prepared 38 packages in 599ms
Installed 68 packages in 84ms
 + alembic==1.15.2
:
 + yarl==1.19.0
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components

Add the Sling component type to the environment

Sling is the tool used in this step. It seems to get along well with dagster.

$ cd jaffle-platform
$ dg list component-type
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type                                              ┃ Summary                                                                 ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster.components.DefinitionsComponent                     │ An arbitrary set of dagster definitions.                                │
│ dagster.components.DefsFolderComponent                      │ A folder containing multiple submodules.                                │
│ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap Python scripts executed with Dagster's                 │
│                                                             │ PipesSubprocessClient.                                                  │
└─────────────────────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────┘
$ uv add dagster-sling
Resolved 81 packages in 1.25s
      Built jaffle-platform @ file:///home/ec2-user/work/mds/jaffle-platform
Prepared 4 packages in 1.13s
Uninstalled 1 package in 0.18ms
Installed 4 packages in 5ms
 + dagster-sling==0.26.9
 ~ jaffle-platform==0.1.0 (from file:///home/ec2-user/work/mds/jaffle-platform)
 + sling==1.4.4
 + sling-linux-amd64==1.4.4
$ dg list component-type
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type                                              ┃ Summary                                                                 ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster.components.DefinitionsComponent                     │ An arbitrary set of dagster definitions.                                │
│ dagster.components.DefsFolderComponent                      │ A folder containing multiple submodules.                                │
│ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap Python scripts executed with Dagster's                 │
│                                                             │ PipesSubprocessClient.                                                  │
│ dagster_sling.SlingReplicationCollectionComponent           │ Expose one or more Sling replications to Dagster as assets.             │
└─────────────────────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────┘

Create an instance of the Sling component

$ dg scaffold 'dagster_sling.SlingReplicationCollectionComponent' ingest_files
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
$ tree
.
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   └── __init__.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   └── ingest_files
│       │       ├── component.yaml
│       │       └── replication.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock
src/jaffle_platform/defs/ingest_files/component.yaml
type: dagster_sling.SlingReplicationCollectionComponent

attributes:
  replications:
  - path: replication.yaml

Configure DuckDB

$ uv run sling conns set DUCKDB type=duckdb instance=/tmp/jaffle_platform.duckdb
6:41AM INF connection `DUCKDB` has been set in /home/ec2-user/.sling/env.yaml. Please test with `sling conns test DUCKDB`
$ uv run sling conns test DUCKDB
6:42AM INF downloading duckdb 1.1.3 for linux/amd64
6:42AM INF success!

Download the source files for Sling

$ curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1302  100  1302    0     0   4387      0 --:--:-- --:--:-- --:--:--  4398
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2723  100  2723    0     0   8544      0 --:--:-- --:--:-- --:--:--  8562
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2560  100  2560    0     0   8374      0 --:--:-- --:--:-- --:--:--  8366
src/jaffle_platform/defs/ingest_files/replication.yaml
source: LOCAL
target: DUCKDB

defaults:
  mode: full-refresh
  object: "{stream_table}"

streams:
  file://raw_customers.csv:
    object: "main.raw_customers"
  file://raw_orders.csv:
    object: "main.raw_orders"
  file://raw_payments.csv:
    object: "main.raw_payments"
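
As far as I understand Sling's replication format, the values under `defaults` apply to every stream unless the stream sets its own value. The following is a minimal sketch of that overlay in plain Python. The dict simply mirrors the replication.yaml above, and `effective_stream_config` is a hypothetical helper written for illustration, not part of Sling or dagster-sling.

```python
# Mirror of replication.yaml as a plain dict (no YAML parser needed for this sketch).
REPLICATION = {
    "source": "LOCAL",
    "target": "DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_table}"},
    "streams": {
        "file://raw_customers.csv": {"object": "main.raw_customers"},
        "file://raw_orders.csv": {"object": "main.raw_orders"},
        "file://raw_payments.csv": {"object": "main.raw_payments"},
    },
}


def effective_stream_config(replication: dict) -> dict:
    """Hypothetical helper: overlay each stream's own settings on the defaults."""
    defaults = replication.get("defaults", {})
    return {
        stream: {**defaults, **(overrides or {})}
        for stream, overrides in replication["streams"].items()
    }


if __name__ == "__main__":
    for stream, config in effective_stream_config(REPLICATION).items():
        # Each stream keeps mode=full-refresh from defaults but uses its own object.
        print(f"{stream} -> {config['object']} ({config['mode']})")
```

Under this reading, every CSV is loaded with `full-refresh` into the `main` schema of the DuckDB file, which is consistent with the `raw_customers` table queried later.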
src/jaffle_platform/defs/ingest_files/component.yaml
type: dagster_sling.SlingReplicationCollectionComponent

attributes:
  sling:
    connections:
      - name: DUCKDB
        type: duckdb
        instance: /tmp/jaffle_platform.duckdb
  replications:
    - path: replication.yaml

View and materialize the assets in the UI

$ dg dev
All components validated successfully.
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster
2025-04-08 06:57:04 +0000 - dagster - INFO - Using temporary directory /home/ec2-user/work/mds/jaffle-platform/.tmp_dagster_home_icbv7wsl for storage. This will be removed when dagster dev exits.
2025-04-08 06:57:04 +0000 - dagster - INFO - To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
2025-04-08 06:57:05 +0000 - dagster - INFO - Launching Dagster services...
2025-04-08 06:57:08 +0000 - dagster.daemon - INFO - Instance is configured with the following daemons: ['AssetDaemon', 'BackfillDaemon', 'QueuedRunCoordinatorDaemon', 'SchedulerDaemon', 'SensorDaemon']
2025-04-08 06:57:08 +0000 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 56538
2025-04-08 06:57:23 +0000 - dagster.daemon.QueuedRunCoordinatorDaemon - INFO - Priority sorting and checking tag concurrency limits for queued runs.
2025-04-08 06:57:24 +0000 - dagster.daemon.QueuedRunCoordinatorDaemon - INFO - Launched 1 runs.
$ ~/.duckdb/cli/latest/duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM raw_customers LIMIT 5;"
┌───────┬────────────┬───────────┬──────────────────┐
│  id   │ first_name │ last_name │ _sling_loaded_at │
│ int32 │  varchar   │  varchar  │      int64       │
├───────┼────────────┼───────────┼──────────────────┤
│     1 │ Michael    │ P.        │       1744095445 │
│     2 │ Shawn      │ M.        │       1744095445 │
│     3 │ Kathleen   │ P.        │       1744095445 │
│     4 │ Jimmy      │ C.        │       1744095445 │
│     5 │ Katherine  │ R.        │       1744095445 │
└───────┴────────────┴───────────┴──────────────────┘
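
The `_sling_loaded_at` column is an int64, and the values look like Unix epoch seconds. Assuming that interpretation is right, a small sketch like the one below converts the value from the output above into a UTC timestamp so it can be compared with the `dg dev` log. `sling_loaded_at_to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def sling_loaded_at_to_utc(epoch_seconds: int) -> datetime:
    """Interpret a _sling_loaded_at value as seconds since the Unix epoch (assumption)."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


if __name__ == "__main__":
    # Value copied from the query result above.
    print(sling_loaded_at_to_utc(1744095445).isoformat())
    # 2025-04-08T06:57:25+00:00
```

The result falls one second after the "Launched 1 runs." line in the log (06:57:24 +0000), which supports the epoch-seconds reading.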

Copy the sample dbt project

$ git clone --depth=1 https://github.com/dagster-io/jaffle-platform.git dbt && rm -rf dbt/.git
Cloning into 'dbt'...
remote: Enumerating objects: 15, done.
remote: Counting objects: 100% (15/15), done.
remote: Compressing objects: 100% (11/11), done.
Receiving objects: 100% (15/15), done.
remote: Total 15 (delta 2), reused 15 (delta 2), pack-reused 0 (from 0)
Resolving deltas: 100% (2/2), done.
$ tree
.
├── dbt
│   └── jdbt
│       ├── README.md
│       ├── dbt_project.yml
│       ├── models
│       │   ├── customers.sql
│       │   ├── orders.sql
│       │   ├── sources.yml
│       │   └── stg
│       │       ├── stg_customers.sql
│       │       ├── stg_orders.sql
│       │       └── stg_payments.sql
│       └── profiles.yml
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── raw_customers.csv
├── raw_orders.csv
├── raw_payments.csv
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   ├── __init__.cpython-312.pyc
│       │   └── definitions.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   ├── __pycache__
│       │   │   └── __init__.cpython-312.pyc
│       │   └── ingest_files
│       │       ├── component.yaml
│       │       └── replication.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock

Install the dbt project component

$ uv add dagster-dbt dbt-duckdb
Resolved 120 packages in 802ms
      Built jaffle-platform @ file:///home/ec2-user/work/mds/jaffle-platform
Prepared 7 packages in 219ms
Uninstalled 1 package in 0.15ms
Installed 40 packages in 76ms
 + agate==1.9.1
:
 + zipp==3.21.0
$ dg list component-type
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type                                              ┃ Summary                                                                        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster.components.DefinitionsComponent                     │ An arbitrary set of dagster definitions.                                       │
│ dagster.components.DefsFolderComponent                      │ A folder containing multiple submodules.                                       │
│ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap Python scripts executed with Dagster's PipesSubprocessClient. │
│ dagster_dbt.DbtProjectComponent                             │ Expose a DBT project to Dagster as a set of assets.                            │
│ dagster_sling.SlingReplicationCollectionComponent           │ Expose one or more Sling replications to Dagster as assets.                    │
└─────────────────────────────────────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Scaffold an instance of the dbt component

$ dg scaffold dagster_dbt.DbtProjectComponent jdbt --project-path dbt/jdbt
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
$ tree
.
├── dbt
│   └── jdbt
│       ├── README.md
│       ├── dbt_project.yml
│       ├── models
│       │   ├── customers.sql
│       │   ├── orders.sql
│       │   ├── sources.yml
│       │   └── stg
│       │       ├── stg_customers.sql
│       │       ├── stg_orders.sql
│       │       └── stg_payments.sql
│       └── profiles.yml
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── raw_customers.csv
├── raw_orders.csv
├── raw_payments.csv
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   ├── __init__.cpython-312.pyc
│       │   └── definitions.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   ├── __pycache__
│       │   │   └── __init__.cpython-312.pyc
│       │   ├── ingest_files
│       │   │   ├── component.yaml
│       │   │   └── replication.yaml
│       │   └── jdbt
│       │       └── component.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock
src/jaffle_platform/defs/jdbt/component.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  dbt:
    project_dir: ../../../../dbt/jdbt
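
The four `..` segments in `project_dir` are easy to miscount. Assuming the relative path is resolved against the directory that contains component.yaml, this sketch checks where it ends up relative to the project root. `resolve_from_component` is a hypothetical helper, and the calculation is purely lexical (nothing on disk is touched).

```python
import posixpath

# Directory holding component.yaml, relative to the project root (from the tree above).
COMPONENT_DIR = "src/jaffle_platform/defs/jdbt"
# Value of attributes.dbt.project_dir in the scaffolded component.yaml.
PROJECT_DIR = "../../../../dbt/jdbt"


def resolve_from_component(component_dir: str, relative_path: str) -> str:
    """Hypothetical helper: lexically resolve a path relative to the component directory."""
    return posixpath.normpath(posixpath.join(component_dir, relative_path))


if __name__ == "__main__":
    print(resolve_from_component(COMPONENT_DIR, PROJECT_DIR))  # dbt/jdbt
```

So the scaffolded value climbs out of `jdbt`, `defs`, `jaffle_platform`, and `src`, and lands on the `dbt/jdbt` directory cloned earlier.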

Update the dbt project component configuration

src/jaffle_platform/defs/jdbt/component.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  dbt:
    project_dir: ../../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}"
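
My understanding is that the `key` template is rendered once per dbt node, so every model ends up with an asset key under `target/main`, presumably to line up with the keys of the tables that Sling loads. The sketch below imitates that with plain string substitution. It does not use Dagster's actual template engine, `render_key` is a hypothetical helper, and the model names are taken from the `dbt/jdbt/models` tree shown earlier.

```python
# Template copied from component.yaml.
KEY_TEMPLATE = "target/main/{{ node.name }}"

# Model names from dbt/jdbt/models (file names without the .sql extension).
MODEL_NAMES = ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]


def render_key(template: str, node_name: str) -> str:
    """Hypothetical helper: substitute the node name into the key template."""
    return template.replace("{{ node.name }}", node_name)


if __name__ == "__main__":
    for name in MODEL_NAMES:
        print(f"{name:14s} -> {render_key(KEY_TEMPLATE, name)}")
```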
$ dg check yaml
All components validated successfully.
$ dg dev

Run Materialize all from the UI.

$ duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM orders LIMIT 5;"
┌──────────┬─────────────┬────────────┬───────────┬────────────────────┬───────────────┬──────────────────────┬──────────────────┬────────┐
│ order_id │ customer_id │ order_date │  status   │ credit_card_amount │ coupon_amount │ bank_transfer_amount │ gift_card_amount │ amount │
│  int32   │    int32    │    date    │  varchar  │       double       │    double     │        double        │      double      │ double │
├──────────┼─────────────┼────────────┼───────────┼────────────────────┼───────────────┼──────────────────────┼──────────────────┼────────┤
│        1 │           1 │ 2018-01-01 │ returned  │               10.0 │           0.0 │                  0.0 │              0.0 │   10.0 │
│        2 │           3 │ 2018-01-02 │ completed │               20.0 │           0.0 │                  0.0 │              0.0 │   20.0 │
│        3 │          94 │ 2018-01-04 │ completed │                0.0 │           1.0 │                  0.0 │              0.0 │    1.0 │
│        4 │          50 │ 2018-01-05 │ completed │                0.0 │          25.0 │                  0.0 │              0.0 │   25.0 │
│        5 │          64 │ 2018-01-05 │ completed │                0.0 │           0.0 │                 17.0 │              0.0 │   17.0 │
└──────────┴─────────────┴────────────┴───────────┴────────────────────┴───────────────┴──────────────────────┴──────────────────┴────────┘

Automate the pipeline

$ dg scaffold dagster.schedule daily_jaffle.py
Using /home/ec2-user/work/mds/jaffle-platform/.venv/bin/dagster-components
$ tree
.
├── dbt
│   └── jdbt
│       ├── README.md
│       ├── dbt_project.yml
│       ├── models
│       │   ├── customers.sql
│       │   ├── orders.sql
│       │   ├── sources.yml
│       │   └── stg
│       │       ├── stg_customers.sql
│       │       ├── stg_orders.sql
│       │       └── stg_payments.sql
│       ├── profiles.yml
│       └── target
│           ├── dbt.log
│           ├── jdbt-8ef979d-67083e8
│           │   ├── compiled
│           │   │   └── jdbt
│           │   │       └── models
│           │   │           ├── customers.sql
│           │   │           ├── orders.sql
│           │   │           └── stg
│           │   │               ├── stg_customers.sql
│           │   │               ├── stg_orders.sql
│           │   │               └── stg_payments.sql
│           │   ├── dbt.log
│           │   ├── graph.gpickle
│           │   ├── graph_summary.json
│           │   ├── manifest.json
│           │   ├── partial_parse.msgpack
│           │   ├── run
│           │   │   └── jdbt
│           │   │       └── models
│           │   │           ├── customers.sql
│           │   │           ├── orders.sql
│           │   │           └── stg
│           │   │               ├── stg_customers.sql
│           │   │               ├── stg_orders.sql
│           │   │               └── stg_payments.sql
│           │   ├── run_results.json
│           │   └── semantic_manifest.json
│           ├── manifest.concurrent-update-lock
│           ├── manifest.json
│           ├── partial_parse.msgpack
│           ├── perf_info.json
│           └── semantic_manifest.json
├── jaffle_platform_tests
│   └── __init__.py
├── pyproject.toml
├── raw_customers.csv
├── raw_orders.csv
├── raw_payments.csv
├── src
│   └── jaffle_platform
│       ├── __init__.py
│       ├── __pycache__
│       │   ├── __init__.cpython-312.pyc
│       │   └── definitions.cpython-312.pyc
│       ├── definitions.py
│       ├── defs
│       │   ├── __init__.py
│       │   ├── __pycache__
│       │   │   └── __init__.cpython-312.pyc
│       │   ├── daily_jaffle.py
│       │   ├── ingest_files
│       │   │   ├── component.yaml
│       │   │   └── replication.yaml
│       │   └── jdbt
│       │       └── component.yaml
│       └── lib
│           ├── __init__.py
│           └── __pycache__
│               └── __init__.cpython-312.pyc
└── uv.lock
src/jaffle_platform/defs/daily_jaffle.py
import dagster as dg


@dg.schedule(cron_schedule="@daily", target="*")
def daily_jaffle(context: dg.ScheduleEvaluationContext):
    return dg.RunRequest()
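
For reference, `@daily` is the standard cron shorthand for `0 0 * * *`, that is, once a day at midnight. The sketch below computes when such a schedule would next tick, assuming it is evaluated in UTC (which I believe is the default when no execution timezone is set). `next_daily_tick` is a hypothetical helper and does not call into dagster.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now: datetime) -> datetime:
    """Next midnight strictly after `now`, in the timezone carried by `now`."""
    midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight_today + timedelta(days=1)


if __name__ == "__main__":
    # Timestamp taken from the first `dg dev` log line above.
    started = datetime(2025, 4, 8, 6, 57, 4, tzinfo=timezone.utc)
    print(next_daily_tick(started).isoformat())  # 2025-04-09T00:00:00+00:00
```

With `target="*"`, each tick requests a run covering all assets, so under these assumptions the Sling ingestion and the dbt models would be refreshed together once a day.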

Conclusion

So, what did you think of the new dg CLI and the new project layout?

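If component.yaml can point at other projects through settings such as project_dir, then combined with uv's workspace feature it seems possible to keep using tools like dlt and dbt on their own while still integrating everything as a whole.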

ROBON Inc. technical blog
