🐙

dbtをもっと便利に:Dagsterでdbtを任意のオプションで実行する

2024/12/16に公開

はじめに

ラクスルでは Dagster + dbt でデータ基盤のパイプラインを制御しています。Dagster の概要やコンセプトは過去の Tech Blog で紹介しているのでこちらも合わせてご覧ください。

https://techblog.raksul.com/entry/2022/12/02/101007

先日開催した、ラクスル × ノバセル × primeNumber社 のデータエンジニアリング勉強会で発表した内容の一部を元に Dagster + dbt の利用事例・実装例を紹介します。

https://note.primenumber.co.jp/n/n7e772a5a9de7
https://recruit.raksul.com/story/raksul-primenumber

どんな場面で役立つか

Dagster + dbt 連携でこういう処理を含む Job を1つ作っておくと色んな場面で使い勝手がよいですという紹介です。

Dagster の開発元が提供する公式学習コンテンツの Dagster University で紹介されている Asset-based Job、または Dagster におけるタスク表現にあたる Op を元にした Op-based Job をメインに運用していても、今回紹介する Op-based Job があると例えば下記のような場面で役立ちます。

  • 新機能や機能変更をリリースし、特定 Data Asset やその後続を更新したいとき
  • dbt を full-refresh オプション付き、かつ一部モデルに絞り dbt run したいとき
  • dbt でない先行処理や後続処理と一緒に dbt run したいとき

Dagster から dbt を実行することで次のことができ、特に本番環境の運用面で恩恵があります。 (dbtコマンドで直接実行したらできない)

  • Dagster の Data Asset に更新を反映できる
  • 実行の履歴を残すことで、後で何かあった場合にも追える
  • Sensor を使って Job の成功/失敗をアラート通知できる

dbt の Op 実装例

実装例は下記のようになります。前半部分ではどんな dbt オプションを渡すかの定義をしています。後半部分では dbt run で Asset を更新したことをDagsterに伝搬させています。
ここでdbtの --select, --vars, --exclude, --full-refresh などあらゆるオプションを渡せるようにしています。
なお、後半部分がないと Dagster の Asset に更新が反映されません。

※Source code
@op(
    ins={
        "previous_op": In(str, default_value=""),
        "select_arg": In(str, default_value="*"),
        "vars_arg": In(str, default_value=""),
        "other_args": In(str, default_value=""),
    },
    out={"success_dbt_run": Out(str)},
)
def dbt_run(context, previous_op, select_arg, vars_arg, other_args):
    context.log.info(f"select_arg={select_arg}, vars_arg={vars_arg}, other_args={other_args}")
    run_args = ["run", "--select", select_arg, "--state", "target"]
    if vars_arg:
        run_args.extend(["--vars", vars_arg])
    if other_args:
        run_args.extend(other_args.split(" "))
    dbt_run = context.resources.dbt.cli(run_args, context=context)

    for raw_event in dbt_run.stream_raw_events():
        context.log.info(raw_event)
        for asset_event in raw_event.to_default_asset_events(dbt_manifest_path, context=context):
            if isinstance(asset_event, AssetMaterialization):
                yield asset_event
    yield Output(value="success_dbt_run", output_name="success_dbt_run")

Op-based Job の例

上記を元にした Op-based Job は下記のようになります。例として分かりやすいように config も書きましたが、デフォルトでオプションを指定する必要なければ、ここで config 定義はなしでも大丈夫です。

Launchpad から手動実行

Dagster では Launchpad ページから config を任意に設定してジョブを手動実行することができます。
先の Op の実装で見せたように、dbt のあらゆるオプションをここで自由に指定して実行できます。

参考

https://github.com/dagster-io/dagster/discussions/18065

さいごに

dbt を任意のオプションで実行できる Dagster + dbt 連携の方法について紹介しました。

今回、dbt run を含む Job を紹介しましたが、ユースケース次第で前段に dbt run-operation であったり、先行または後続として dbt でない Op を追加してもよいと思います。

ちなみに、実装にあたっては 公式 Docs に加えて Github Discussions を調べながら進めました。公式 Docs に書かれてない内容が Github Discussions にあったりするので、困ったときはこちらを調べてみるのはオススメです。

Dagster の実装例について情報はまだ多くないと思うので、参考になれば幸いです。

RAKSUL Data Analytics

Discussion