Kubeflow Pipelinesでループを取り扱う方法
今回はKubeflow Pipelinesでループを利用する方法について調べてみたので共有したいと思います。このほかにKubeflowでは条件分岐も提供されており、昨日の記事にて共有していますので興味がある方はぜひこちらもご覧ください。
早速やってみる
今回もKubeflowが提供している公式ドキュメントを元にしています。構成としては、ローカルでKubeflow Piprlines(以下、kfp)を用いてパイプラインを実装し、パイプラインファイルをCloud Storageにアップロードした上でVertex AI Pipelinesから起動します。前回と同じ方法になります。
並列ループの実行
dls.ParallelFor
を利用することで並列実行をさせることができます。コードは以下のようになります。
from kfp import dsl, compiler
@dsl.component
def print_epoch(epoch: int) -> None:
print(f"{epoch=}")
@dsl.pipeline
def pipeline():
with dsl.ParallelFor(
items=[1, 5, 10, 25],
parallelism=2
) as epochs:
print_epoch(epoch=epochs)
compiler.Compiler().compile(pipeline, "loop.yaml")
まずは呼び出されるコンポーネントとしてprint_epoch
を定義しています。@dsl.component
デコレータを利用しているため、kfp上のコンポーネントとして取り扱われます。このコンポーネントでは受け取ったepochと言う変数をただ表示する内容となっています。
@dsl.component
def print_epoch(epoch: int) -> None:
print(f"{epoch=}")
次にパイプラインを実祖します。with dsl.ParallelFor
を利用することにより、指定したアイテムを並列ループのなかで扱うことができます。この例ではparallelism=2
にしており、2つの並列プロセス上で取り扱われます。withで取得されるepochsにはitemsで指定された数値がそれぞれ格納されます。
@dsl.pipeline
def pipeline():
with dsl.ParallelFor(
items=[1, 5, 10, 25],
parallelism=2
) as epochs:
print_epoch(epoch=epochs)
最後にcompiler.Compile().compile
関数にてパイプラインをコンパイルしてYAMLファイルを生成します。このYAMLファイルをVertex AI Pipelinesに受け渡して実行させます。
loop.yaml
# PIPELINE DEFINITION
# Name: pipeline
components:
comp-for-loop-2:
dag:
tasks:
print-epoch:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-epoch
inputs:
parameters:
epoch:
componentInputParameter: pipelinechannel--loop-item-param-1
taskInfo:
name: print-epoch
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
comp-print-epoch:
executorLabel: exec-print-epoch
inputDefinitions:
parameters:
epoch:
parameterType: NUMBER_INTEGER
deploymentSpec:
executors:
exec-print-epoch:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_epoch
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_epoch(epoch: int) -> None:\n print(f\"{epoch=}\")\n\n"
image: python:3.9
pipelineInfo:
name: pipeline
root:
dag:
tasks:
for-loop-2:
componentRef:
name: comp-for-loop-2
iteratorPolicy:
parallelismLimit: 2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '[1, 5, 10, 25]'
taskInfo:
name: for-loop-2
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.3
それではこちらをVertex AI Pipelines上で実行してみましょう。Cloud Storageにアップロードした後、以下の記事のように設定することでVertex AI Pipelines上で実行できます。
実行した結果は以下のようになります。
また、並列実行数は指定したとおり2になっており、同時に2つまでは実行されていることが確認できました。
そして、ログをみるとepochs=10の例ではprint_epoch
の結果としてepoch=10と表示できていることが確認できました。
並列ループの結果の集約について
先ほどの例では各itemごとに個別にコンポーネントを適用することはできますが、集約することができません。dsl.ParallelFor
の結果を集約して結果を取り扱うためにdsl.Collected
が提供されています。例えば以下のようにして4つの異なるepochの結果を集約して一つの文字列として出力する例を作ってみました。
from typing import List
from kfp import dsl, compiler
@dsl.component
def epoch_to_str(epoch: int) -> str:
return f"{epoch=}"
@dsl.component
def concat_str(items: List[str]) -> str:
return ",".join(items)
@dsl.pipeline
def pipeline() -> str:
with dsl.ParallelFor(
items=[1, 5, 10, 25],
parallelism=2
) as epochs:
epoch_to_str_result = epoch_to_str(epoch=epochs)
collected = dsl.Collected(epoch_to_str_result.output)
concat_result = concat_str(items=collected)
return concat_result.output
compiler.Compiler().compile(pipeline, "loop_collect.yaml")
epoch_to_str
コンポーネントを経由すると、最初はint型だったepochが文字列(epoch=xxx
の形式)に変換されます。また、concat_str
コンポーネントを経由すると、入力のリストの中のデータをコンマ区切りで全て結合した文字列を取得できます。
@dsl.component
def epoch_to_str(epoch: int) -> str:
return f"{epoch=}"
@dsl.component
def concat_str(items: List[str]) -> str:
return ",".join(items)
最後に、パイプラインでdsl.Collected
を利用します。dsl.ParallelFor
のなかで利用したコンポーネントの結果をdsl.Collected
に受け渡すように記述することで、全てのアイテムの結果を一つのアイテムに集約できます。なお、アノテーションでtyping.List
を利用していますが、これはlist
で代用できないため、typing.List
をインポートして利用するようにしてください。dsl.Collected
で集約した結果をまずはconcat_str
に与えて文字列を結合し、その結果をパイプラインの最終結果として出力します。
@dsl.pipeline
def pipeline() -> str:
with dsl.ParallelFor(
items=[1, 5, 10, 25],
parallelism=2
) as epochs:
epoch_to_str_result = epoch_to_str(epoch=epochs)
collected = dsl.Collected(epoch_to_str_result.output)
concat_result = concat_str(items=collected)
return concat_result.output
loop_collect.yaml
# PIPELINE DEFINITION
# Name: pipeline
# Outputs:
# Output: str
components:
comp-concat-str:
executorLabel: exec-concat-str
inputDefinitions:
parameters:
items:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-epoch-to-str:
executorLabel: exec-epoch-to-str
inputDefinitions:
parameters:
epoch:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-for-loop-2:
dag:
outputs:
parameters:
pipelinechannel--epoch-to-str-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: epoch-to-str
tasks:
epoch-to-str:
cachingOptions:
enableCache: true
componentRef:
name: comp-epoch-to-str
inputs:
parameters:
epoch:
componentInputParameter: pipelinechannel--loop-item-param-1
taskInfo:
name: epoch-to-str
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--epoch-to-str-Output:
parameterType: LIST
deploymentSpec:
executors:
exec-concat-str:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- concat_str
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef concat_str(items: List[str]) -> str:\n return \",\".join(items)\n\
\n"
image: python:3.9
exec-epoch-to-str:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- epoch_to_str
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef epoch_to_str(epoch: int) -> str:\n return f\"{epoch=}\"\n\n"
image: python:3.9
pipelineInfo:
name: pipeline
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: concat-str
tasks:
concat-str:
cachingOptions:
enableCache: true
componentRef:
name: comp-concat-str
dependentTasks:
- for-loop-2
inputs:
parameters:
items:
taskOutputParameter:
outputParameterKey: pipelinechannel--epoch-to-str-Output
producerTask: for-loop-2
taskInfo:
name: concat-str
for-loop-2:
componentRef:
name: comp-for-loop-2
iteratorPolicy:
parallelismLimit: 2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '[1, 5, 10, 25]'
taskInfo:
name: for-loop-2
outputDefinitions:
parameters:
Output:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.3
こちらのパイプラインを実行すると、以下のような画面が表示されます。結果を見ると、dsl.ParallelFor
の個別の結果がconcat-str
に対してリストとしてまとまって与えられていることが確認できます。出力を見ると文字列でepoch=1,epoch=5,epoch=10,epoch=25
と言う出力になっており、所望の結果が得られていることが確認できます。
まとめ
今回はkfpで提供されているループ機能とその結果の集約方法をみてみました。この方法を利用することで並列でモデルを学習し、その結果を集約するといったことが可能になり、MLOpsの並列パイプラインの実装に一役買うのではないでしょうか?
Discussion