Kubeflow Pipelinesで条件分岐をする方法
今回はKubeflow Pipelinesで条件分岐を利用する方法について調べてみたので共有したいと思います。
早速やってみる
今回はKubeflowが提供している公式ドキュメントを元にしています。構成としては、まずはローカルでKubeflow Piprlines(以下、kfp)を用いてパイプラインを実装し、パイプラインファイルをCloud Storageにアップロードした上でVertex AI Pipelinesから起動します。
コードの実装
まずはコードの全体間ですが、以下のようになります。
from kfp import dsl, compiler, local
@dsl.component
def flip_coin() -> str:
import random
return random.choice(['heads', 'tails'])
@dsl.component
def head() -> str:
return "It'll be sunny tomorrow."
@dsl.component
def tail() -> str:
return "It'll rain tomorrow."
@dsl.component
def undefined_behaviour() -> str:
return "Weather forecast must not be beleived."
@dsl.pipeline
def pipeline() -> str:
flip_result = flip_coin()
with dsl.If(flip_result.output == 'heads'):
head_result = head()
with dsl.Elif(flip_result.output == 'tails'):
tail_result = tail()
with dsl.Else():
undefined_result = undefined_behaviour()
oneof = dsl.OneOf(head_result.output, tail_result.output, undefined_result.output)
return oneof
compiler.Compiler().compile(pipeline, "pipeline.yaml")
今回のコードでは、以下の手順で処理が進みます。
-
flip_coin
により表と裏どちらかを取得する - 条件分岐によりどの面がでたかによって呼び出すコンポーネントを変更する
- 2で呼び出されたコンポーネントからの戻り値をパイプラインのOutputとする
それではそれぞれの実装をみてみましょう。
最初に実装しているflip_coin
では表と裏どちらかをrandom.choice
を用いて返す単純な関数になっています。kfpではコンポーネントを作成するときは@dsk.component
をデコレータとしてつけることになっているので、ここでもそのように実装しています。
@dsl.component
def flip_coin() -> str:
import random
return random.choice(['heads', 'tails'])
次にそれぞれの面がでた時に呼び出される関数を定義しています。今顔はifの条件を表、elifの条件を裏としたのですが、elseの条件を作る必要があるので未定義動作としてundefined_behaviour
コンポーネントも用意しています。なお、戻り値については深い意味はないですが天気を返すようにしています。
@dsl.component
def head() -> str:
return "It'll be sunny tomorrow."
@dsl.component
def tail() -> str:
return "It'll rain tomorrow."
@dsl.component
def undefined_behaviour() -> str:
return "Weather forecast must not be beleived."
最後にここまで作成したコンポーネントを呼び出すパイプラインを定義します。今回重要な点として、dsl.If
、dls.Elif
そしてdls.Else
を利用している点です。これはPythonのif/elif/elseでも実装はできますが、用意された機能を利用することによりパイプラインとして条件分岐がなされていることを明示できます。条件を書くときはwith dsl.If
のようにwithステートメントを利用します。最後に利用しているdls.OneOf
ですが、withステートメントは全て呼び出されるので、それぞれの結果の変数自体は計算されます。その中で結果として返すものをdls.OneOf
に入れることで、実際に計算された結果のみを取得できます。
@dsl.pipeline
def pipeline() -> str:
flip_result = flip_coin()
with dsl.If(flip_result.output == 'heads'):
head_result = head()
with dsl.Elif(flip_result.output == 'tails'):
tail_result = tail()
with dsl.Else():
undefined_result = undefined_behaviour()
oneof = dsl.OneOf(head_result.output, tail_result.output, undefined_result.output)
return oneof
パイプラインの実装ができればcompiler.Compiler().compile
を利用してYAMLファイルとして出力することができます。このコードを実行するとpipeline.yaml
と言うファイルが生成されます。
pipeline.yaml
# PIPELINE DEFINITION
# Name: pipeline
# Outputs:
# Output: str
components:
comp-condition-2:
dag:
outputs:
parameters:
pipelinechannel--head-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: head
tasks:
head:
cachingOptions:
enableCache: true
componentRef:
name: comp-head
taskInfo:
name: head
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
parameterType: STRING
outputDefinitions:
parameters:
pipelinechannel--head-Output:
parameterType: STRING
comp-condition-3:
dag:
outputs:
parameters:
pipelinechannel--tail-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: tail
tasks:
tail:
cachingOptions:
enableCache: true
componentRef:
name: comp-tail
taskInfo:
name: tail
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
parameterType: STRING
outputDefinitions:
parameters:
pipelinechannel--tail-Output:
parameterType: STRING
comp-condition-4:
dag:
outputs:
parameters:
pipelinechannel--undefined-behaviour-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: undefined-behaviour
tasks:
undefined-behaviour:
cachingOptions:
enableCache: true
componentRef:
name: comp-undefined-behaviour
taskInfo:
name: undefined-behaviour
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
parameterType: STRING
outputDefinitions:
parameters:
pipelinechannel--undefined-behaviour-Output:
parameterType: STRING
comp-condition-branches-1:
dag:
outputs:
parameters:
pipelinechannel--condition-branches-1-oneof-1:
valueFromOneof:
parameterSelectors:
- outputParameterKey: pipelinechannel--head-Output
producerSubtask: condition-2
- outputParameterKey: pipelinechannel--tail-Output
producerSubtask: condition-3
- outputParameterKey: pipelinechannel--undefined-behaviour-Output
producerSubtask: condition-4
tasks:
condition-2:
componentRef:
name: comp-condition-2
inputs:
parameters:
pipelinechannel--flip-coin-Output:
componentInputParameter: pipelinechannel--flip-coin-Output
taskInfo:
name: condition-2
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--flip-coin-Output']
== 'heads'
condition-3:
componentRef:
name: comp-condition-3
inputs:
parameters:
pipelinechannel--flip-coin-Output:
componentInputParameter: pipelinechannel--flip-coin-Output
taskInfo:
name: condition-3
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
== ''heads'') && inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
== ''tails'''
condition-4:
componentRef:
name: comp-condition-4
inputs:
parameters:
pipelinechannel--flip-coin-Output:
componentInputParameter: pipelinechannel--flip-coin-Output
taskInfo:
name: condition-4
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
== ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
== ''tails'')'
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
parameterType: STRING
outputDefinitions:
parameters:
pipelinechannel--condition-branches-1-oneof-1:
parameterType: STRING
comp-flip-coin:
executorLabel: exec-flip-coin
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-head:
executorLabel: exec-head
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-tail:
executorLabel: exec-tail
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-undefined-behaviour:
executorLabel: exec-undefined-behaviour
outputDefinitions:
parameters:
Output:
parameterType: STRING
deploymentSpec:
executors:
exec-flip-coin:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- flip_coin
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef flip_coin() -> str:\n import random\n return random.choice(['heads',\
\ 'tails'])\n\n"
image: python:3.9
exec-head:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- head
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef head() -> str:\n return \"It'll be sunny tomorrow.\"\n\n"
image: python:3.9
exec-tail:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- tail
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef tail() -> str:\n return \"It'll rain tomorrow.\"\n\n"
image: python:3.9
exec-undefined-behaviour:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- undefined_behaviour
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef undefined_behaviour() -> str:\n return \"Weather forecast\
\ must not be beleived.\"\n\n"
image: python:3.9
pipelineInfo:
name: pipeline
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: pipelinechannel--condition-branches-1-oneof-1
producerSubtask: condition-branches-1
tasks:
condition-branches-1:
componentRef:
name: comp-condition-branches-1
dependentTasks:
- flip-coin
inputs:
parameters:
pipelinechannel--flip-coin-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-coin
taskInfo:
name: condition-branches-1
flip-coin:
cachingOptions:
enableCache: true
componentRef:
name: comp-flip-coin
taskInfo:
name: flip-coin
outputDefinitions:
parameters:
Output:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.3
Vertex AI Pipelinesでの実行
それではVertex AI Pipelines上で実行してみましょう。
まずは先ほど生成したpipeline.yaml
をCloud Storageにアップロードします。なお、アップロードかr実行までの手順は以下の記事で紹介しているのでぜひ参考にしてください。
今回作ったパイプラインを実行すると例えば以下のような結果になりました。この例では最初のflip_coin
でtails
がでたので、条件分岐としてはdls.Elif
に進み、結果として It'll rain tomorrow.
が出力されました。
まとめ
今回はkfpで提供されている条件分岐の機能を利用してみました。応用方法で言うと、取得されたデータの内容によってどのモデルで学習するかを動的に変更したり、特別な前処理が必要なデータがくればそれを適用するみたいなことがあるのではないでしょうか。みなさんもぜひkfpを利用して様々なパイプラインを実装してみてください。
Discussion