👻

Kubeflow Pipelinesで条件分岐をする方法

に公開

今回はKubeflow Pipelinesで条件分岐を利用する方法について調べてみたので共有したいと思います。

早速やってみる

今回はKubeflowが提供している公式ドキュメントを元にしています。構成としては、まずはローカルでKubeflow Piprlines(以下、kfp)を用いてパイプラインを実装し、パイプラインファイルをCloud Storageにアップロードした上でVertex AI Pipelinesから起動します。

https://www.kubeflow.org/docs/components/pipelines/user-guides/core-functions/control-flow/

コードの実装

まずはコードの全体間ですが、以下のようになります。

coin_flip.py
from kfp import dsl, compiler, local


@dsl.component
def flip_coin() -> str:
    import random
    return random.choice(['heads', 'tails'])


@dsl.component
def head() -> str:
    return "It'll be sunny tomorrow."


@dsl.component
def tail() -> str:
    return "It'll rain tomorrow."


@dsl.component
def undefined_behaviour() -> str:
    return "Weather forecast must not be beleived."


@dsl.pipeline
def pipeline() -> str:
    flip_result = flip_coin()
    with dsl.If(flip_result.output == 'heads'):
        head_result = head()
    with dsl.Elif(flip_result.output == 'tails'):
        tail_result = tail()
    with dsl.Else():
        undefined_result = undefined_behaviour()


    oneof = dsl.OneOf(head_result.output, tail_result.output, undefined_result.output)
    return oneof


compiler.Compiler().compile(pipeline, "pipeline.yaml")

今回のコードでは、以下の手順で処理が進みます。

  1. flip_coinにより表と裏どちらかを取得する
  2. 条件分岐によりどの面がでたかによって呼び出すコンポーネントを変更する
  3. 2で呼び出されたコンポーネントからの戻り値をパイプラインのOutputとする

それではそれぞれの実装をみてみましょう。

最初に実装しているflip_coinでは表と裏どちらかをrandom.choiceを用いて返す単純な関数になっています。kfpではコンポーネントを作成するときは@dsk.componentをデコレータとしてつけることになっているので、ここでもそのように実装しています。

@dsl.component
def flip_coin() -> str:
    import random
    return random.choice(['heads', 'tails'])

次にそれぞれの面がでた時に呼び出される関数を定義しています。今顔はifの条件を表、elifの条件を裏としたのですが、elseの条件を作る必要があるので未定義動作としてundefined_behaviourコンポーネントも用意しています。なお、戻り値については深い意味はないですが天気を返すようにしています。

@dsl.component
def head() -> str:
    return "It'll be sunny tomorrow."


@dsl.component
def tail() -> str:
    return "It'll rain tomorrow."


@dsl.component
def undefined_behaviour() -> str:
    return "Weather forecast must not be beleived."

最後にここまで作成したコンポーネントを呼び出すパイプラインを定義します。今回重要な点として、dsl.Ifdls.Elifそしてdls.Elseを利用している点です。これはPythonのif/elif/elseでも実装はできますが、用意された機能を利用することによりパイプラインとして条件分岐がなされていることを明示できます。条件を書くときはwith dsl.Ifのようにwithステートメントを利用します。最後に利用しているdls.OneOfですが、withステートメントは全て呼び出されるので、それぞれの結果の変数自体は計算されます。その中で結果として返すものをdls.OneOfに入れることで、実際に計算された結果のみを取得できます。

@dsl.pipeline
def pipeline() -> str:
    flip_result = flip_coin()
    with dsl.If(flip_result.output == 'heads'):
        head_result = head()
    with dsl.Elif(flip_result.output == 'tails'):
        tail_result = tail()
    with dsl.Else():
        undefined_result = undefined_behaviour()


    oneof = dsl.OneOf(head_result.output, tail_result.output, undefined_result.output)
    return oneof

パイプラインの実装ができればcompiler.Compiler().compileを利用してYAMLファイルとして出力することができます。このコードを実行するとpipeline.yamlと言うファイルが生成されます。

pipeline.yaml
# PIPELINE DEFINITION
# Name: pipeline
# Outputs:
#    Output: str
components:
  comp-condition-2:
    dag:
      outputs:
        parameters:
          pipelinechannel--head-Output:
            valueFromParameter:
              outputParameterKey: Output
              producerSubtask: head
      tasks:
        head:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-head
          taskInfo:
            name: head
    inputDefinitions:
      parameters:
        pipelinechannel--flip-coin-Output:
          parameterType: STRING
    outputDefinitions:
      parameters:
        pipelinechannel--head-Output:
          parameterType: STRING
  comp-condition-3:
    dag:
      outputs:
        parameters:
          pipelinechannel--tail-Output:
            valueFromParameter:
              outputParameterKey: Output
              producerSubtask: tail
      tasks:
        tail:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-tail
          taskInfo:
            name: tail
    inputDefinitions:
      parameters:
        pipelinechannel--flip-coin-Output:
          parameterType: STRING
    outputDefinitions:
      parameters:
        pipelinechannel--tail-Output:
          parameterType: STRING
  comp-condition-4:
    dag:
      outputs:
        parameters:
          pipelinechannel--undefined-behaviour-Output:
            valueFromParameter:
              outputParameterKey: Output
              producerSubtask: undefined-behaviour
      tasks:
        undefined-behaviour:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-undefined-behaviour
          taskInfo:
            name: undefined-behaviour
    inputDefinitions:
      parameters:
        pipelinechannel--flip-coin-Output:
          parameterType: STRING
    outputDefinitions:
      parameters:
        pipelinechannel--undefined-behaviour-Output:
          parameterType: STRING
  comp-condition-branches-1:
    dag:
      outputs:
        parameters:
          pipelinechannel--condition-branches-1-oneof-1:
            valueFromOneof:
              parameterSelectors:
              - outputParameterKey: pipelinechannel--head-Output
                producerSubtask: condition-2
              - outputParameterKey: pipelinechannel--tail-Output
                producerSubtask: condition-3
              - outputParameterKey: pipelinechannel--undefined-behaviour-Output
                producerSubtask: condition-4
      tasks:
        condition-2:
          componentRef:
            name: comp-condition-2
          inputs:
            parameters:
              pipelinechannel--flip-coin-Output:
                componentInputParameter: pipelinechannel--flip-coin-Output
          taskInfo:
            name: condition-2
          triggerPolicy:
            condition: inputs.parameter_values['pipelinechannel--flip-coin-Output']
              == 'heads'
        condition-3:
          componentRef:
            name: comp-condition-3
          inputs:
            parameters:
              pipelinechannel--flip-coin-Output:
                componentInputParameter: pipelinechannel--flip-coin-Output
          taskInfo:
            name: condition-3
          triggerPolicy:
            condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
              == ''heads'') && inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
              == ''tails'''
        condition-4:
          componentRef:
            name: comp-condition-4
          inputs:
            parameters:
              pipelinechannel--flip-coin-Output:
                componentInputParameter: pipelinechannel--flip-coin-Output
          taskInfo:
            name: condition-4
          triggerPolicy:
            condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
              == ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
              == ''tails'')'
    inputDefinitions:
      parameters:
        pipelinechannel--flip-coin-Output:
          parameterType: STRING
    outputDefinitions:
      parameters:
        pipelinechannel--condition-branches-1-oneof-1:
          parameterType: STRING
  comp-flip-coin:
    executorLabel: exec-flip-coin
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-head:
    executorLabel: exec-head
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-tail:
    executorLabel: exec-tail
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-undefined-behaviour:
    executorLabel: exec-undefined-behaviour
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-flip-coin:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - flip_coin
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef flip_coin() -> str:\n    import random\n    return random.choice(['heads',\
          \ 'tails'])\n\n"
        image: python:3.9
    exec-head:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - head
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef head() -> str:\n    return \"It'll be sunny tomorrow.\"\n\n"
        image: python:3.9
    exec-tail:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - tail
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef tail() -> str:\n    return \"It'll rain tomorrow.\"\n\n"
        image: python:3.9
    exec-undefined-behaviour:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - undefined_behaviour
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef undefined_behaviour() -> str:\n    return \"Weather forecast\
          \ must not be beleived.\"\n\n"
        image: python:3.9
pipelineInfo:
  name: pipeline
root:
  dag:
    outputs:
      parameters:
        Output:
          valueFromParameter:
            outputParameterKey: pipelinechannel--condition-branches-1-oneof-1
            producerSubtask: condition-branches-1
    tasks:
      condition-branches-1:
        componentRef:
          name: comp-condition-branches-1
        dependentTasks:
        - flip-coin
        inputs:
          parameters:
            pipelinechannel--flip-coin-Output:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: flip-coin
        taskInfo:
          name: condition-branches-1
      flip-coin:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-flip-coin
        taskInfo:
          name: flip-coin
  outputDefinitions:
    parameters:
      Output:
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.3

Vertex AI Pipelinesでの実行

それではVertex AI Pipelines上で実行してみましょう。

まずは先ほど生成したpipeline.yamlをCloud Storageにアップロードします。なお、アップロードかr実行までの手順は以下の記事で紹介しているのでぜひ参考にしてください。

https://zenn.dev/akasan/articles/5697384428538b

今回作ったパイプラインを実行すると例えば以下のような結果になりました。この例では最初のflip_cointailsがでたので、条件分岐としてはdls.Elifに進み、結果として It'll rain tomorrow.が出力されました。

まとめ

今回はkfpで提供されている条件分岐の機能を利用してみました。応用方法で言うと、取得されたデータの内容によってどのモデルで学習するかを動的に変更したり、特別な前処理が必要なデータがくればそれを適用するみたいなことがあるのではないでしょうか。みなさんもぜひkfpを利用して様々なパイプラインを実装してみてください。

Discussion