🙆
① Prefect × dbt モデルを単発で実行する
0. やりたいこと
dbt のモデルたちを prefect を使って実行する。
1. Prefect とは
Prefectとはデータオーケストレーションフレームワークであり、データパイプラインの自動化、スケジューリング、モニタリング、および管理を行うためのPythonツール
※ ドキュメント
Welcome to Prefect
2. flow とは
Flowsとは関数のようなものであり、入力を受け取り、実行し、出力を返すことができます。@flow
デコレータを追加することで、任意の関数を Flow に変えることができます。
フローのサンプル
# sample.py
from prefect import flow
@flow
def my_flow():
return 1 + 1
sample.py を作成し、python -i sample.py
で対話的にフロー実行できます。今回作成した Flow を実行します(→ my_flow())。すると下記のような実行結果を取得できます。
21:03:14.530 | INFO | prefect.engine - Created flow run 'true-potoo' for flow 'my-flow'
21:03:14.583 | INFO | Flow run 'true-potoo' - Finished in state Completed()
2
3. dbt の モデルたちを実行
ディレクトリ構成
.
├── run_model.py
├── dbt_project
│ ├── .dbt
│ │ └── profiles.yml
│ └── models
│ ├── my_first_dbt_model.sql
│ └── my_second_dbt_model.sql
└── dbt_project.yml
# run_model.py
from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation
@flow
def trigger_dbt_flow() -> str:
result = DbtCoreOperation(
commands=["dbt run"],
project_dir="./demo_dbt/",
profiles_dir="./demo_dbt/.dbt"
).run()
return result
trigger_dbt_flow()
python run_model.py
を実行し、dbt のモデルたちを回します。排出されたログを見るとモデルが実行されていることが分かりました。
Discussion