📚

Vertex Piplinesのコンポーネントのスキップ処理

2022/12/08に公開

目的

Vertex Pipelinesの一部のコンポーネント実行をスキップする処理を実現する

背景

パイプラインでdsl.Conditionを使用し、コンポーネントの出力結果によって一部のコンポーネント実行を条件分岐でスキップすることを考えていたが、以下のようなコードではエラーとなった。

def pipeline_fn():

    cond_task = cond_op()

    with dsl.Condition(cond_task.output == "true"):
        task1 = op1_a(...)

    with dsl.Condition(cond_task.output == "false"):
        task1 = op1_b(...)

    task2 = op2(task1.output)
    _ = op3(task2.output)

解決策

条件分岐後に実行されるステップに関しては関数としてまとめ、各dsl.Condition内で呼び出すようにした

def pipeline_fn():
    def _common_tasks(task1_out):
        task2 = op2(task1_out)
        _ = op3(task2.output)

    cond_task = cond_op()

    with dsl.Condition(cond_task.output == "true"):
        task1_a = op1_a(...)
        _common_tasks(task1_a.output)

    with dsl.Condition(cond_task.output == "false"):
        task1_b = op1_b(...)
        _common_tasks(task1_b.output)

Pipeline Parameter で条件分岐する場合

def pipeline_fn(cond):

    if cond:
        task1_a = op1_a(...)
        task1_out = task1_a.output
    else:
        task1_b = op1_b(...)
        task1_out = task1_b.output

    _ = op2(task1_out)

参考

同じような要望はあるが対応されておらず、背景に記述したようなやり方では実現が難しかった。
https://github.com/kubeflow/pipelines/issues/5422
https://github.com/kubeflow/pipelines/issues/6949

Discussion