🦔

DagsterのDeclarative Automation/Automation Conditionメモ

に公開

DagsterのDeclarative Automation/Automation Conditionについて触ってたのでメモです。

Declarative Automation/Automation Conditionとは

Declarative AutomationはAssetを実行(Materialize)する条件を設定し、それを満たした時にMaterializeする機能です。この時の条件をAutomation Conditionと呼びます。
元々Auto-Materializeという機能が1.3くらいにありましたが、それを1.8で汎用化・改名したようです。

元々、DagsterではAsset間の依存関係を

で定義することができます。しかし、引数・depsでは、「何に」依存しているかを定義していますが「どのように」依存しているかについては定義していません。
Declarative Automation/AUtomation Conditionによって「どのように」依存しているかを柔軟に定義できます。

「どのように」依存しているかとは

ここにあるような、

  • スケジュール(on_cron)
  • 依存先がMaterializeされてる、かつ、このAssetがMaterializeされない(on_missing)
  • 依存先が更新された時(eager)

だったり、プリミティブな条件の組みわせで条件を定義することが出来ます。

具体的な条件の詳細はソースコード(eagerだとここ)をご覧頂いた方がむしろわかりやすいかもしれません。

とりあえず試してみる

  1. Assetの準備
  2. Automation Conditionの定義
  3. Sensorの有効化
  4. 条件を満たすように状況を変更

で試してみます。

  1. Assetの準備
  2. Automation Conditionの定義

として、下のようなAssetを定義し、downstreamのAsset(my_second_asset)にupstreamの更新を条件(eager)を設定してみます。

import dagster as dg

@dg.asset
def my_first_asset(context: dg.AssetExecutionContext):
    return "Hello, world!"

condition = dg.AutomationCondition.eager()

@dg.asset(automation_condition=condition)
def my_second_asset(context: dg.AssetExecutionContext, my_first_asset: str):
    return f"{my_first_asset} This is my second asset!"

(Definitionsへの読み込みは省略)

ConditionはSensorによって監視されます。デフォルトではオフになってるので、Automationタブから有効化する必要があります。

上流(my_first_asset)だけをMaterializeした後、少し待つと下流(my_second_asset)がMaterializeされました。

気になったこと色々

条件の組み合わせ

ドキュメントに色々なビルトインの条件がありますが、主語は「target」(条件を設定する方のAsset)で「dependencies」(依存先のAsset)のものはありません。
例えば「依存先のMaterializeが失敗した時」や「依存先が失敗・実行にかかわらず実行された時」は、どのように定義するのでしょうか。

その方法としては、all_deps_match/any_deps_matchというOperatorがあり、これに依存先に指定したい条件を渡すと設定できるようです。
例えば依存先の更新any_deps_updatedは下のように実装されています

        return AutomationCondition.any_deps_match(
            (
                AutomationCondition.newly_updated()
                # executed_with_root_target is fairly expensive on a per-partition basis,
                # but newly_updated is bounded in the number of partitions that might be
                # updated on a single tick
                & ~AutomationCondition.executed_with_root_target()
            ).with_label("newly_updated_without_root")
            | AutomationCondition.will_be_requested()
        ).with_label("any_deps_updated")

これを真似して、「依存先のMaterializeが失敗した時」はany_deps_matchの中にexecution_failedを入れることで設定できるようです。

# なお、newly_true()つけないと、上流が失敗した以降何度もMaterializeされます
condition = dg.AutomationCondition.any_deps_match(
          dg.AutomationCondition.execution_failed().newly_true()
    ).since_last_handled()

Asset Checkとの関係

DagsterにはAsset Checkという、Assetの状態をMaterializeの後にチェックする機能があります。Asset Checkの失敗はAutomation Conditionにどう影響するのでしょうか?

試した感じ、以下の挙動なようです:

  • eagerでは、Asset Checkに失敗しても条件を満たしたとみなされ、downstreamのAssetが実行されうる(Asset Check失敗でもMaterialize自体は出来てるので)
  • all_deps_blocking_checks_passed(もしくはよりプリミティブなcheck_passed)という条件があり、Asset Checkの状態をチェックすることができます
  • (余談)all_deps_blocking_checks_passedだけだと、Asset Checkが成功した後ずっとAutomation Conditionが成り立ち、Materializeされまくるのでeagerとかon_missingと組み合わせた方が良さそうです

試してみる

  • 上流のAsset(upstream_with_asset_check)とそれに依存するAsset(downstream)を設定
  • 下流のAssetにall_deps_blocking_checks_passedを設定
  • 上流のAssetにAsset Checkを設定しチェックを失敗させる
@dg.asset
def upstream_with_asset_check():
   return 1

@dg.asset_check(
      asset=upstream_with_asset_check,
      blocking=True
)
def check_upstream_with_asset_check():
    return dg.AssetCheckResult(
        passed=False
    )

# Materializeされまくるので、eagerとかon_missingと本当は組み合わせた方が良いです
check_condition = dg.AutomationCondition.all_deps_blocking_checks_passed()

@dg.asset(
      automation_condition=check_condition,
)
def downstream(upstream_with_asset_check: int):
    return 1 + upstream_with_asset_check

この状態で、上流のAsset(upstream_with_asset_check)だけをMaterializeすると、下流のAssetはMaterialize*されません*。

一方、Asset Checkを通る(AssetCheckResultのpassedをTrue)にすると下流のAssetがMaterializeされます。

dagster-dbt

dagster-dbtでもDagsterDbtTranslaterで、Automation Conditionsを設定できます。

dbt testとの組み合わせ

dagster-dbtのtestをAsset Checkとして実行する時も、上述のAsset CheckとAutomation Conditionsの関係が成り立ちます。
ただし、1.10.6時点では、dbt testに対応するAssetCheckのblockingはfalseなので、以下が言えそうです:

  • Materializeだけ見ている(eagerとか)場合、上流のモデルのdbt testが失敗してもDownstreamのモデルのMaterializeは発動する
  • その挙動を変えたい(上流のdbt testが失敗したら下流のMaterializeをストップ)場合、all_deps_blocking_checks_passedのような条件をblocking以外もチェックする条件に設定
  • testのSeverityは見られてない

Discussion