DagsterのDeclarative Automation/Automation Conditionメモ
DagsterのDeclarative Automation/Automation Conditionについて触ってたのでメモです。
Declarative Automation/Automation Conditionとは
Declarative AutomationはAssetを実行(Materialize)する条件を設定し、それを満たした時にMaterializeする機能です。この時の条件をAutomation Conditionと呼びます。
元々Auto-Materializeという機能が1.3くらいにありましたが、それを1.8で汎用化・改名したようです。
元々、DagsterではAsset間の依存関係を
で定義することができます。しかし、引数・depsでは、「何に」依存しているかを定義していますが「どのように」依存しているかについては定義していません。
Declarative Automation/AUtomation Conditionによって「どのように」依存しているかを柔軟に定義できます。
「どのように」依存しているかとは
ここにあるような、
- スケジュール(on_cron)
- 依存先がMaterializeされてる、かつ、このAssetがMaterializeされない(on_missing)
- 依存先が更新された時(eager)
だったり、プリミティブな条件の組みわせで条件を定義することが出来ます。
具体的な条件の詳細はソースコード(eagerだとここ)をご覧頂いた方がむしろわかりやすいかもしれません。
とりあえず試してみる
- Assetの準備
- Automation Conditionの定義
- Sensorの有効化
- 条件を満たすように状況を変更
で試してみます。
- Assetの準備
- Automation Conditionの定義
として、下のようなAssetを定義し、downstreamのAsset(my_second_asset)にupstreamの更新を条件(eager)を設定してみます。
import dagster as dg
@dg.asset
def my_first_asset(context: dg.AssetExecutionContext):
return "Hello, world!"
condition = dg.AutomationCondition.eager()
@dg.asset(automation_condition=condition)
def my_second_asset(context: dg.AssetExecutionContext, my_first_asset: str):
return f"{my_first_asset} This is my second asset!"
(Definitionsへの読み込みは省略)
ConditionはSensorによって監視されます。デフォルトではオフになってるので、Automationタブから有効化する必要があります。
上流(my_first_asset)だけをMaterializeした後、少し待つと下流(my_second_asset)がMaterializeされました。
気になったこと色々
条件の組み合わせ
ドキュメントに色々なビルトインの条件がありますが、主語は「target」(条件を設定する方のAsset)で「dependencies」(依存先のAsset)のものはありません。
例えば「依存先のMaterializeが失敗した時」や「依存先が失敗・実行にかかわらず実行された時」は、どのように定義するのでしょうか。
その方法としては、all_deps_match/any_deps_matchというOperatorがあり、これに依存先に指定したい条件を渡すと設定できるようです。
例えば依存先の更新any_deps_updatedは下のように実装されています。
return AutomationCondition.any_deps_match(
(
AutomationCondition.newly_updated()
# executed_with_root_target is fairly expensive on a per-partition basis,
# but newly_updated is bounded in the number of partitions that might be
# updated on a single tick
& ~AutomationCondition.executed_with_root_target()
).with_label("newly_updated_without_root")
| AutomationCondition.will_be_requested()
).with_label("any_deps_updated")
これを真似して、「依存先のMaterializeが失敗した時」はany_deps_matchの中にexecution_failedを入れることで設定できるようです。
# なお、newly_true()つけないと、上流が失敗した以降何度もMaterializeされます
condition = dg.AutomationCondition.any_deps_match(
dg.AutomationCondition.execution_failed().newly_true()
).since_last_handled()
Asset Checkとの関係
DagsterにはAsset Checkという、Assetの状態をMaterializeの後にチェックする機能があります。Asset Checkの失敗はAutomation Conditionにどう影響するのでしょうか?
試した感じ、以下の挙動なようです:
- eagerでは、Asset Checkに失敗しても条件を満たしたとみなされ、downstreamのAssetが実行されうる(Asset Check失敗でもMaterialize自体は出来てるので)
- all_deps_blocking_checks_passed(もしくはよりプリミティブなcheck_passed)という条件があり、Asset Checkの状態をチェックすることができます
- (余談)all_deps_blocking_checks_passedだけだと、Asset Checkが成功した後ずっとAutomation Conditionが成り立ち、Materializeされまくるのでeagerとかon_missingと組み合わせた方が良さそうです
試してみる
- 上流のAsset(upstream_with_asset_check)とそれに依存するAsset(downstream)を設定
- 下流のAssetにall_deps_blocking_checks_passedを設定
- 上流のAssetにAsset Checkを設定しチェックを失敗させる
@dg.asset
def upstream_with_asset_check():
return 1
@dg.asset_check(
asset=upstream_with_asset_check,
blocking=True
)
def check_upstream_with_asset_check():
return dg.AssetCheckResult(
passed=False
)
# Materializeされまくるので、eagerとかon_missingと本当は組み合わせた方が良いです
check_condition = dg.AutomationCondition.all_deps_blocking_checks_passed()
@dg.asset(
automation_condition=check_condition,
)
def downstream(upstream_with_asset_check: int):
return 1 + upstream_with_asset_check
この状態で、上流のAsset(upstream_with_asset_check)だけをMaterializeすると、下流のAssetはMaterialize*されません*。
一方、Asset Checkを通る(AssetCheckResultのpassedをTrue)にすると下流のAssetがMaterializeされます。
dagster-dbt
dagster-dbtでもDagsterDbtTranslaterで、Automation Conditionsを設定できます。
dbt testとの組み合わせ
dagster-dbtのtestをAsset Checkとして実行する時も、上述のAsset CheckとAutomation Conditionsの関係が成り立ちます。
ただし、1.10.6時点では、dbt testに対応するAssetCheckのblockingはfalseなので、以下が言えそうです:
- Materializeだけ見ている(eagerとか)場合、上流のモデルのdbt testが失敗してもDownstreamのモデルのMaterializeは発動する
- その挙動を変えたい(上流のdbt testが失敗したら下流のMaterializeをストップ)場合、all_deps_blocking_checks_passedのような条件をblocking以外もチェックする条件に設定
- testのSeverityは見られてない
Discussion