🙆

① Prefect × dbt モデルを単発で実行する

2023/09/22に公開

0. やりたいこと

dbt のモデルたちを prefect を使って実行する。

1. Prefect とは

Prefectとはデータオーケストレーションフレームワークであり、データパイプラインの自動化、スケジューリング、モニタリング、および管理を行うためのPythonツール

※ ドキュメント
Welcome to Prefect

2. flow とは

Flowsとは関数のようなものであり、入力を受け取り、実行し、出力を返すことができます。@flowデコレータを追加することで、任意の関数を Flow に変えることができます。

フローのサンプル

# sample.py
from prefect import flow

@flow
def my_flow():
    return 1 + 1

sample.py を作成し、python -i sample.py で対話的にフロー実行できます。今回作成した Flow を実行します(→ my_flow())。すると下記のような実行結果を取得できます。

21:03:14.530 | INFO    | prefect.engine - Created flow run 'true-potoo' for flow 'my-flow'
21:03:14.583 | INFO    | Flow run 'true-potoo' - Finished in state Completed()
2

3. dbt の モデルたちを実行

ディレクトリ構成

.
├── run_model.py
├── dbt_project
│   ├── .dbt
│   │    └── profiles.yml
│   └── models
│        ├── my_first_dbt_model.sql
│        └── my_second_dbt_model.sql
└── dbt_project.yml
# run_model.py
from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def trigger_dbt_flow() -> str:
    result = DbtCoreOperation(
        commands=["dbt run"],
        project_dir="./demo_dbt/",
        profiles_dir="./demo_dbt/.dbt"
    ).run()
    return result

trigger_dbt_flow()

python run_model.pyを実行し、dbt のモデルたちを回します。排出されたログを見るとモデルが実行されていることが分かりました。

4. 参考

Discussion