🐙🐙🐙オーケストレーションツールDagster入門してみた(dbt・DuckDB編)🦆🦆🦆
気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#11です。
🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙の続きで、Dagsterとdbt・DuckDBを連携させるチュートリアルを触ってみました。
tl;dr
- DagsterはAssetとしてdbtのModelを扱えるよ。ETLTのETL部分をDagsterで行えるよ
- DagsterはIOManager(Assetの保存先)としてDuckDBを使えるよ
-
ExamplesはDagsterの入門として勉強になるよ
- ただし、自分の環境ではちょっと変更が必要だったよ
Dagsterとは
Dagster is an orchestrator that's designed for developing and maintaining data assets, such as tables, data sets, machine learning models, and reports.
です。詳しくは、🐙🐙🐙オーケストレーションツールDagster入門してみた(概念編)🐙🐙🐙や公式サイトを参照してください。
DuckDBとは
DuckDB is an in-process SQL OLAP database management system
で、SQLiteのOLAP版のようなデータベースです。詳しくは、🦆🦆🦆🦆🦆🦆DuckDB入門🦆🦆🦆🦆🦆🦆や公式サイトを参照してください。
dagster-duckdbプラグインを利用することで、DagsterはAsset(Dagsterが扱うデータ)の保存先(IOManager)としてDuckDBを利用することが出来るので、DuckDBにデータを保存したりDuckDB上のデータを処理に利用できます。
DuckDBとデータをやり取りする時はPySparkのDataFrameか、PandasのDataFrame経由を経由してやり取りします。
dbtとは
dbt™ is a SQL-first transformation workflow that lets teams quickly and collaboratively deploy analytics code following software engineering best practices like modularity, portability, CI/CD, and documentation. Now anyone on the data team can safely contribute to production-grade data pipelines.
です。日本語のドキュメントも色々ありますので、例えばdbt-tokyoさんの資料などを参照してください。
dagster-dbtプラグインを使うことで、Dagsterからdbtの機能を使うことができます。「機能を使う」とは具体的には、
- Asset(Dagsterが扱うデータ)の定義先として使う
- Op(Dagsterでの処理)として呼び出す
ことができます(dbt Core・dbt Cloud双方に可能)。
チュートリアルでは、前者のAssetとして扱う方法が紹介されています。この方法では、dbtのmodelを、DagsterのAsset(Dagsterが扱うデータ)として取り込みます。
テーブルの間の処理だけであれば、dbt単体でも出来ると思いますが、Dagsterを組みあわせることにより
- dbtのデータから、データベースの外で処理をする(今回のチュートリアルではグラフ化)
- データベースの外でデータを取得し、データベースに入れる
- ETLTのETLの部分
をDagsterで実行することが出来ます。
dbt and Dagster software-defined assets tutorial
概要
Dagsterのexamples/で公開されているチュートリアルの一つで、DagsterとDuckDB・dbtを連携させ、以下の処理を行います。
- HTTP(S)リクエストでCSVを取得し、DuckDBにAssetとして保存
- dbtのモデルを実行し、(1)で取得したデータから、別のDuckDBテーブルを作成
- (2)で作成されたテーブルから、plotlyでグラフを作成
DuckDB・dbtとの連携の概要がわかるほか、AssetsやIOManagerの概念のキャッチアップの助けにもなるチュートリアルだと思います。
なお、このチュートリアルではDuckDBとdbtの両方を使っていますが、この二つは別々の連携で、Dagsterをdbtだけと連携させること、あるいは、DuckDBだけと連携させることも可能です。
やってみた時にハマった点
基本的にドキュメントに従えば良いですが、私の環境ではいくつかハマる点がありました。
- 各種プラグインの最新バージョン(0.17.6)ではdagit起動時にエラーが発生しました。バージョンを一つ(0.17.5)下げるとエラーが解消しました
pip install dagster-dbt==0.17.5 dagster-duckdb==0.17.5 dagster-duckdb-pandas==0.17.5 dagster-pandas==0.17.5
- HTMLにグラフを出力する最後のAsset(order_count_chart)で、ドキュメントの記載ではエラーが発生しました(HTMLの出力自体はできている)
- 戻り値をNoneにするとエラーが解消しました
def order_count_chart(context, customers: pd.DataFrame) -> None:
最新バージョンのDagsterでは以下のエラーが発生
The above exception was caused by the following exception:
ImportError: cannot import name 'OpExecutionContext' from 'dagster._legacy' (/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_legacy/__init__.py)
Stack Trace:
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/code_pointer.py", line 138, in load_python_module
return importlib.import_module(module_name)
File "/usr/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 848, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/notrogue/project/dagster/tutorial_dbt_dagster/tutorial_template/tutorial_dbt_dagster/__init__.py", line 4, in <module>
from dagster_dbt import dbt_cli_resource
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster_dbt/__init__.py", line 3, in <module>
from .asset_defs import load_assets_from_dbt_manifest, load_assets_from_dbt_project
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 41, in <module>
from dagster._legacy import OpExecutionContext
最後のAsset((order_count_chart)で、ドキュメントの記載ではエラー
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "order_count_chart":
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
for elt in iterate_with_context(
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 462, in iterate_with_context
return
File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 82, in solid_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
dagster._check.CheckError: Invariant failed. Description: Unexpected 'None' output value. If a 'None' value is intentional, set the output type to None.
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
yield
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
next_output = next(iterator)
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_core/storage/db_io_manager.py", line 106, in handle_output
check.invariant(
File "/home/notrogue/project/dagster/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1627, in invariant
raise CheckError(f"Invariant failed. Description: {desc}")
触ってみる
初期化
# 前述の理由で最新バージョンの一つ前Dagsterを使用
pip install dagster==1.1.5
# チュートリアルのディレクトリとパッケージの初期化
dagster project from-example --name tutorial_dbt_dagster --example tutorial_dbt_dagster
cd tutorial_dbt_dagster
pip install -e ".[dev]"
# 前述の理由で最新バージョンの一つ前を使用
pip install dagster-dbt==0.17.5 dagster-duckdb==0.17.5 dagster-duckdb-pandas==0.17.5 dagster-pandas==0.17.5
チュートリアルのドキュメントでは、tutorial_templateに記載を少しづつ処理を追加していきます。行う設定(AssetやRepositoryの記載)や、設定の意味の説明はそちらを見ていただきたいのですが、タイピングが面倒な人や先に着地点を見たい人は、tutorial_finishedディレクトリで実行できます。
(上述のorder_count_chartの変更は必要)
cd tutorial_finished
dagit
でdagit画面を起動し、左上のtutorial_db_dagster->stagingを開き、「Materialize All」します。
(order_count_chartをそのままで実行したのでエラーになっています)
なお、単にdagitを起動すると設定や履歴が消えるので、DAGSTER_HOMEを適当に設定し、履歴を残すことをおススメします。
# 保存用のディレクトリを作成
mkdir dagster_home
# DAGSTER_HOMEを指定して実行
DAGSTER_HOME=$(pwd)/dagster_home dagit
結果の確認
このチュートリアルのJob・Assetを実行すると、
- グラフがプロットされたHTMLファイル(tutorial_dbt_dagster/assets/order_count_chart.html)
- DuckDBファイル・テーブル(jaffle_shop/tutorial.duckdb)
が作成されますので、順に見ていきます。
最終結果のHTMLファイルはtutorial_dbt_dagster/assets/index.html
に出力されます。
DuckDBのファイルはjaffle_shop/tutorial.duckdb
に出力されます。なお、DuckDBの制約上、duckdbコマンドとDagsterの片方しかアクセスできないので、Dagsterの処理が動いてる時はduckdbコマンドで開かないようにしましょう。
# duckdbコマンドはhttps://duckdb.org/docs/installation/から適当にインストールしてください
duckdb jaffle_shop/tutorial.duckdb -c '.tables'
customers customers_raw orders_raw stg_customers stg_orders
# 念のため空テーブルで無いことも確認
duckdb jaffle_shop/tutorial.duckdb -c 'SELECT COUNT(*) FROM jaffle_shop.orders_raw'
┌──────────────┐
│ count_star() │
│ int64 │
├──────────────┤
│ 99 │
└──────────────┘
(補足)dbtプロジェクト
dbtのディレクトリ(jaffle_shop)側はDagsterとは独立(Dagsterに関する記載が無い)ので、dbtコマンドが普通に使えます。既存のdbtプロジェクトを使う時も安心ですね。
試しに~/.dbt/profiles.ymlにプロファイルを追加(下)し、dbt docsを作成してみます。
jaffle_shop:
target: dev
outputs:
dev:
type: duckdb
path: /home/notrogue/project/dagster/tutorial_dbt_dagster/tutorial_finished/jaffle_shop/tutorial.duckdb
pushd jaffle_shop/
dbt docs generate
11:58:00 Running with dbt=1.3.1
11:58:00 Found 3 models, 7 tests, 0 snapshots, 0 analyses, 292 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
11:58:00
11:58:00 Concurrency: 1 threads (target='dev')
11:58:00
11:58:00 Done.
11:58:00 Building catalog
11:58:00 Catalog written to /home/notrogue/project/dagster/tutorial_dbt_dagster/tutorial_finished/jaffle_shop/target/catalog.json
dbt docs serve --port 1234
Discussion