🐙

🐙🐙🐙オーケストレーションツールDagster入門してみた(dbt・DuckDB編)🦆🦆🦆

2022/12/12に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#11です。
🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙の続きで、Dagsterとdbt・DuckDBを連携させるチュートリアルを触ってみました。

tl;dr

  • DagsterはAssetとしてdbtのModelを扱えるよ。ETLTのETL部分をDagsterで行えるよ
  • DagsterはIOManager(Assetの保存先)としてDuckDBを使えるよ
  • ExamplesはDagsterの入門として勉強になるよ
    • ただし、自分の環境ではちょっと変更が必要だったよ

Dagsterとは

公式サイト曰く

Dagster is an orchestrator that's designed for developing and maintaining data assets, such as tables, data sets, machine learning models, and reports.

です。詳しくは、🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙や公式サイトを参照してください。

DuckDBとは

公式サイト曰く

DuckDB is an in-process SQL OLAP database management system

で、SQLiteのOLAP版のようなデータベースです。詳しくは、🦆🦆🦆🦆🦆🦆DuckDB入門🦆🦆🦆🦆🦆🦆や公式サイトを参照してください。

dagster-duckdbプラグインを利用することで、DagsterはAsset(Dagsterが扱うデータ)の保存先(IOManager)としてDuckDBを利用することが出来るので、DuckDBにデータを保存したりDuckDB上のデータを処理に利用できます。

DuckDBとデータをやり取りする時はPySparkのDataFrameか、PandasのDataFrame経由を経由してやり取りします。

dbtとは

公式サイト曰く

dbt™ is a SQL-first transformation workflow that lets teams quickly and collaboratively deploy analytics code following software engineering best practices like modularity, portability, CI/CD, and documentation. Now anyone on the data team can safely contribute to production-grade data pipelines.

です。日本語のドキュメントも色々ありますので、例えばdbt-tokyoさんの資料などを参照してください。

dagster-dbtプラグインを使うことで、Dagsterからdbtの機能を使うことができます。「機能を使う」とは具体的には、

ことができます(dbt Core・dbt Cloud双方に可能)。

チュートリアルでは、前者のAssetとして扱う方法が紹介されています。この方法では、dbtのmodelを、DagsterのAsset(Dagsterが扱うデータ)として取り込みます。


Dagster公式サイトより

テーブルの間の処理だけであれば、dbt単体でも出来ると思いますが、Dagsterを組みあわせることにより

  • dbtのデータから、データベースの外で処理をする(今回のチュートリアルではグラフ化)
  • データベースの外でデータを取得し、データベースに入れる
    • ETLTのETLの部分

をDagsterで実行することが出来ます。

dbt and Dagster software-defined assets tutorial

概要

https://docs.dagster.io/integrations/dbt

Dagsterのexamples/で公開されているチュートリアルの一つで、DagsterとDuckDB・dbtを連携させ、以下の処理を行います。

  1. HTTP(S)リクエストでCSVを取得し、DuckDBにAssetとして保存
  2. dbtのモデルを実行し、(1)で取得したデータから、別のDuckDBテーブルを作成
  3. (2)で作成されたテーブルから、plotlyでグラフを作成

DuckDB・dbtとの連携の概要がわかるほか、AssetsやIOManagerの概念のキャッチアップの助けにもなるチュートリアルだと思います。

なお、このチュートリアルではDuckDBとdbtの両方を使っていますが、この二つは別々の連携で、Dagsterをdbtだけと連携させること、あるいは、DuckDBだけと連携させることも可能です。

やってみた時にハマった点

基本的にドキュメントに従えば良いですが、私の環境ではいくつかハマる点がありました。

  • 各種プラグインの最新バージョン(0.17.6)ではdagit起動時にエラーが発生しました。バージョンを一つ(0.17.5)下げるとエラーが解消しました
    • pip install dagster-dbt==0.17.5 dagster-duckdb==0.17.5 dagster-duckdb-pandas==0.17.5 dagster-pandas==0.17.5
  • HTMLにグラフを出力する最後のAsset(order_count_chart)で、ドキュメントの記載ではエラーが発生しました(HTMLの出力自体はできている)
    • 戻り値をNoneにするとエラーが解消しました
    • def order_count_chart(context, customers: pd.DataFrame) -> None:

最新バージョンのDagsterでは以下のエラーが発生

The above exception was caused by the following exception:
ImportError: cannot import name 'OpExecutionContext' from 'dagster._legacy' (/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_legacy/__init__.py)

Stack Trace:
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/code_pointer.py", line 138, in load_python_module
    return importlib.import_module(module_name)
  File "/usr/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 848, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/notrogue/project/dagster/tutorial_dbt_dagster/tutorial_template/tutorial_dbt_dagster/__init__.py", line 4, in <module>
    from dagster_dbt import dbt_cli_resource
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster_dbt/__init__.py", line 3, in <module>
    from .asset_defs import load_assets_from_dbt_manifest, load_assets_from_dbt_project
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 41, in <module>
    from dagster._legacy import OpExecutionContext

最後のAsset((order_count_chart)で、ドキュメントの記載ではエラー

dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "order_count_chart":
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
    for elt in iterate_with_context(
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 462, in iterate_with_context
    return
  File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 82, in solid_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
dagster._check.CheckError: Invariant failed. Description: Unexpected 'None' output value. If a 'None' value is intentional, set the output type to None.
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
    yield
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
    next_output = next(iterator)
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/storage/db_io_manager.py", line 106, in handle_output
    check.invariant(
  File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1627, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")

触ってみる

初期化

# 前述の理由で最新バージョンの一つ前Dagsterを使用
pip install dagster==1.1.5

# チュートリアルのディレクトリとパッケージの初期化
dagster project from-example --name tutorial_dbt_dagster --example tutorial_dbt_dagster
cd tutorial_dbt_dagster
pip install -e ".[dev]"

# 前述の理由で最新バージョンの一つ前を使用
pip install dagster-dbt==0.17.5 dagster-duckdb==0.17.5 dagster-duckdb-pandas==0.17.5 dagster-pandas==0.17.5

チュートリアルのドキュメントでは、tutorial_templateに記載を少しづつ処理を追加していきます。行う設定(AssetやRepositoryの記載)や、設定の意味の説明はそちらを見ていただきたいのですが、タイピングが面倒な人や先に着地点を見たい人は、tutorial_finishedディレクトリで実行できます。
(上述のorder_count_chartの変更は必要)

cd tutorial_finished
dagit

でdagit画面を起動し、左上のtutorial_db_dagster->stagingを開き、「Materialize All」します。


(order_count_chartをそのままで実行したのでエラーになっています)

なお、単にdagitを起動すると設定や履歴が消えるので、DAGSTER_HOMEを適当に設定し、履歴を残すことをおススメします。

# 保存用のディレクトリを作成
mkdir dagster_home
# DAGSTER_HOMEを指定して実行
DAGSTER_HOME=$(pwd)/dagster_home dagit

結果の確認

このチュートリアルのJob・Assetを実行すると、

  • グラフがプロットされたHTMLファイル(tutorial_dbt_dagster/assets/order_count_chart.html)
  • DuckDBファイル・テーブル(jaffle_shop/tutorial.duckdb)

が作成されますので、順に見ていきます。

最終結果のHTMLファイルはtutorial_dbt_dagster/assets/index.htmlに出力されます。

DuckDBのファイルはjaffle_shop/tutorial.duckdbに出力されます。なお、DuckDBの制約上、duckdbコマンドとDagsterの片方しかアクセスできないので、Dagsterの処理が動いてる時はduckdbコマンドで開かないようにしましょう。

# duckdbコマンドはhttps://duckdb.org/docs/installation/から適当にインストールしてください
duckdb  jaffle_shop/tutorial.duckdb -c '.tables'
customers      customers_raw  orders_raw     stg_customers  stg_orders

# 念のため空テーブルで無いことも確認
duckdb  jaffle_shop/tutorial.duckdb -c 'SELECT COUNT(*) FROM jaffle_shop.orders_raw'
┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│           99 │
└──────────────┘

(補足)dbtプロジェクト

dbtのディレクトリ(jaffle_shop)側はDagsterとは独立(Dagsterに関する記載が無い)ので、dbtコマンドが普通に使えます。既存のdbtプロジェクトを使う時も安心ですね。

試しに~/.dbt/profiles.ymlにプロファイルを追加(下)し、dbt docsを作成してみます。

jaffle_shop:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: /home/notrogue/project/dagster/tutorial_dbt_dagster/tutorial_finished/jaffle_shop/tutorial.duckdb
pushd jaffle_shop/

dbt docs generate
11:58:00  Running with dbt=1.3.1
11:58:00  Found 3 models, 7 tests, 0 snapshots, 0 analyses, 292 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
11:58:00
11:58:00  Concurrency: 1 threads (target='dev')
11:58:00
11:58:00  Done.
11:58:00  Building catalog
11:58:00  Catalog written to /home/notrogue/project/dagster/tutorial_dbt_dagster/tutorial_finished/jaffle_shop/target/catalog.json

dbt docs serve --port 1234

Discussion