dbtをもっと便利に:Dagsterでdbtを任意のオプションで実行する
はじめに
ラクスルでは Dagster + dbt でデータ基盤のパイプラインを制御しています。Dagster の概要やコンセプトは過去の Tech Blog で紹介しているのでこちらも合わせてご覧ください。
先日開催した、ラクスル × ノバセル × primeNumber社 のデータエンジニアリング勉強会で発表した内容の一部を元に Dagster + dbt の利用事例・実装例を紹介します。
どんな場面で役立つか
Dagster + dbt 連携でこういう処理を含む Job を1つ作っておくと色んな場面で使い勝手がよいですという紹介です。
Dagster の開発元が提供する公式学習コンテンツの Dagster University で紹介されている Asset-based Job、または Dagster におけるタスク表現にあたる Op を元にした Op-based Job をメインに運用していても、今回紹介する Op-based Job があると例えば下記のような場面で役立ちます。
- 新機能や機能変更をリリースし、特定 Data Asset やその後続を更新したいとき
- dbt を full-refresh オプション付き、かつ一部モデルに絞り dbt run したいとき
- dbt でない先行処理や後続処理と一緒に dbt run したいとき
Dagster から dbt を実行することで次のことができ、特に本番環境の運用面で恩恵があります。 (dbtコマンドで直接実行したらできない)
- Dagster の Data Asset に更新を反映できる
- 実行の履歴を残すことで、後で何かあった場合にも追える
- Sensor を使って Job の成功/失敗をアラート通知できる
dbt の Op 実装例
実装例は下記のようになります。前半部分ではどんな dbt オプションを渡すかの定義をしています。後半部分では dbt run で Asset を更新したことをDagsterに伝搬させています。
ここでdbtの --select
, --vars
, --exclude
, --full-refresh
などあらゆるオプションを渡せるようにしています。
なお、後半部分がないと Dagster の Asset に更新が反映されません。
※Source code
@op(
ins={
"previous_op": In(str, default_value=""),
"select_arg": In(str, default_value="*"),
"vars_arg": In(str, default_value=""),
"other_args": In(str, default_value=""),
},
out={"success_dbt_run": Out(str)},
)
def dbt_run(context, previous_op, select_arg, vars_arg, other_args):
context.log.info(f"select_arg={select_arg}, vars_arg={vars_arg}, other_args={other_args}")
run_args = ["run", "--select", select_arg, "--state", "target"]
if vars_arg:
run_args.extend(["--vars", vars_arg])
if other_args:
run_args.extend(other_args.split(" "))
dbt_run = context.resources.dbt.cli(run_args, context=context)
for raw_event in dbt_run.stream_raw_events():
context.log.info(raw_event)
for asset_event in raw_event.to_default_asset_events(dbt_manifest_path, context=context):
if isinstance(asset_event, AssetMaterialization):
yield asset_event
yield Output(value="success_dbt_run", output_name="success_dbt_run")
Op-based Job の例
上記を元にした Op-based Job は下記のようになります。例として分かりやすいように config も書きましたが、デフォルトでオプションを指定する必要なければ、ここで config 定義はなしでも大丈夫です。
Launchpad から手動実行
Dagster では Launchpad ページから config を任意に設定してジョブを手動実行することができます。
先の Op の実装で見せたように、dbt のあらゆるオプションをここで自由に指定して実行できます。
参考
さいごに
dbt を任意のオプションで実行できる Dagster + dbt 連携の方法について紹介しました。
今回、dbt run を含む Job を紹介しましたが、ユースケース次第で前段に dbt run-operation であったり、先行または後続として dbt でない Op を追加してもよいと思います。
ちなみに、実装にあたっては 公式 Docs に加えて Github Discussions を調べながら進めました。公式 Docs に書かれてない内容が Github Discussions にあったりするので、困ったときはこちらを調べてみるのはオススメです。
Dagster の実装例について情報はまだ多くないと思うので、参考になれば幸いです。
Discussion