👻

Kubeflow Pipelinesでループを取り扱う方法

に公開

今回はKubeflow Pipelinesでループを利用する方法について調べてみたので共有したいと思います。このほかにKubeflowでは条件分岐も提供されており、昨日の記事にて共有していますので興味がある方はぜひこちらもご覧ください。

https://zenn.dev/akasan/articles/3d8bad30009c85

早速やってみる

今回もKubeflowが提供している公式ドキュメントを元にしています。構成としては、ローカルでKubeflow Piprlines(以下、kfp)を用いてパイプラインを実装し、パイプラインファイルをCloud Storageにアップロードした上でVertex AI Pipelinesから起動します。前回と同じ方法になります。

https://www.kubeflow.org/docs/components/pipelines/user-guides/core-functions/control-flow/#loops

並列ループの実行

dls.ParallelForを利用することで並列実行をさせることができます。コードは以下のようになります。

loop_parallel.py
from kfp import dsl, compiler


@dsl.component
def print_epoch(epoch: int) -> None:
    print(f"{epoch=}")


@dsl.pipeline
def pipeline():
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
        parallelism=2
    ) as epochs:
        print_epoch(epoch=epochs)

compiler.Compiler().compile(pipeline, "loop.yaml")

まずは呼び出されるコンポーネントとしてprint_epochを定義しています。@dsl.componentデコレータを利用しているため、kfp上のコンポーネントとして取り扱われます。このコンポーネントでは受け取ったepochと言う変数をただ表示する内容となっています。

@dsl.component
def print_epoch(epoch: int) -> None:
    print(f"{epoch=}")

次にパイプラインを実祖します。with dsl.ParallelForを利用することにより、指定したアイテムを並列ループのなかで扱うことができます。この例ではparallelism=2にしており、2つの並列プロセス上で取り扱われます。withで取得されるepochsにはitemsで指定された数値がそれぞれ格納されます。

@dsl.pipeline
def pipeline():
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
        parallelism=2
    ) as epochs:
        print_epoch(epoch=epochs)

最後にcompiler.Compile().compile関数にてパイプラインをコンパイルしてYAMLファイルを生成します。このYAMLファイルをVertex AI Pipelinesに受け渡して実行させます。

loop.yaml
# PIPELINE DEFINITION
# Name: pipeline
components:
  comp-for-loop-2:
    dag:
      tasks:
        print-epoch:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-epoch
          inputs:
            parameters:
              epoch:
                componentInputParameter: pipelinechannel--loop-item-param-1
          taskInfo:
            name: print-epoch
    inputDefinitions:
      parameters:
        pipelinechannel--loop-item-param-1:
          parameterType: NUMBER_INTEGER
  comp-print-epoch:
    executorLabel: exec-print-epoch
    inputDefinitions:
      parameters:
        epoch:
          parameterType: NUMBER_INTEGER
deploymentSpec:
  executors:
    exec-print-epoch:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - print_epoch
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef print_epoch(epoch: int) -> None:\n    print(f\"{epoch=}\")\n\n"
        image: python:3.9
pipelineInfo:
  name: pipeline
root:
  dag:
    tasks:
      for-loop-2:
        componentRef:
          name: comp-for-loop-2
        iteratorPolicy:
          parallelismLimit: 2
        parameterIterator:
          itemInput: pipelinechannel--loop-item-param-1
          items:
            raw: '[1, 5, 10, 25]'
        taskInfo:
          name: for-loop-2
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.3

それではこちらをVertex AI Pipelines上で実行してみましょう。Cloud Storageにアップロードした後、以下の記事のように設定することでVertex AI Pipelines上で実行できます。

https://zenn.dev/akasan/articles/5697384428538b

実行した結果は以下のようになります。

また、並列実行数は指定したとおり2になっており、同時に2つまでは実行されていることが確認できました。

そして、ログをみるとepochs=10の例ではprint_epochの結果としてepoch=10と表示できていることが確認できました。

並列ループの結果の集約について

先ほどの例では各itemごとに個別にコンポーネントを適用することはできますが、集約することができません。dsl.ParallelForの結果を集約して結果を取り扱うためにdsl.Collectedが提供されています。例えば以下のようにして4つの異なるepochの結果を集約して一つの文字列として出力する例を作ってみました。

loop_collect.py
from typing import List
from kfp import dsl, compiler


@dsl.component
def epoch_to_str(epoch: int) -> str:
    return f"{epoch=}"


@dsl.component
def concat_str(items: List[str]) -> str:
    return ",".join(items)



@dsl.pipeline
def pipeline() -> str:
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
        parallelism=2
    ) as epochs:
        epoch_to_str_result = epoch_to_str(epoch=epochs)

    collected = dsl.Collected(epoch_to_str_result.output)
    concat_result = concat_str(items=collected)
    return concat_result.output


compiler.Compiler().compile(pipeline, "loop_collect.yaml")

epoch_to_strコンポーネントを経由すると、最初はint型だったepochが文字列(epoch=xxxの形式)に変換されます。また、concat_strコンポーネントを経由すると、入力のリストの中のデータをコンマ区切りで全て結合した文字列を取得できます。

@dsl.component
def epoch_to_str(epoch: int) -> str:
    return f"{epoch=}"


@dsl.component
def concat_str(items: List[str]) -> str:
    return ",".join(items)

最後に、パイプラインでdsl.Collectedを利用します。dsl.ParallelForのなかで利用したコンポーネントの結果をdsl.Collectedに受け渡すように記述することで、全てのアイテムの結果を一つのアイテムに集約できます。なお、アノテーションでtyping.Listを利用していますが、これはlistで代用できないため、typing.Listをインポートして利用するようにしてください。dsl.Collectedで集約した結果をまずはconcat_strに与えて文字列を結合し、その結果をパイプラインの最終結果として出力します。

@dsl.pipeline
def pipeline() -> str:
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
        parallelism=2
    ) as epochs:
        epoch_to_str_result = epoch_to_str(epoch=epochs)

    collected = dsl.Collected(epoch_to_str_result.output)
    concat_result = concat_str(items=collected)
    return concat_result.output
loop_collect.yaml
# PIPELINE DEFINITION
# Name: pipeline
# Outputs:
#    Output: str
components:
  comp-concat-str:
    executorLabel: exec-concat-str
    inputDefinitions:
      parameters:
        items:
          parameterType: LIST
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-epoch-to-str:
    executorLabel: exec-epoch-to-str
    inputDefinitions:
      parameters:
        epoch:
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-for-loop-2:
    dag:
      outputs:
        parameters:
          pipelinechannel--epoch-to-str-Output:
            valueFromParameter:
              outputParameterKey: Output
              producerSubtask: epoch-to-str
      tasks:
        epoch-to-str:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-epoch-to-str
          inputs:
            parameters:
              epoch:
                componentInputParameter: pipelinechannel--loop-item-param-1
          taskInfo:
            name: epoch-to-str
    inputDefinitions:
      parameters:
        pipelinechannel--loop-item-param-1:
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      parameters:
        pipelinechannel--epoch-to-str-Output:
          parameterType: LIST
deploymentSpec:
  executors:
    exec-concat-str:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - concat_str
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef concat_str(items: List[str]) -> str:\n    return \",\".join(items)\n\
          \n"
        image: python:3.9
    exec-epoch-to-str:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - epoch_to_str
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef epoch_to_str(epoch: int) -> str:\n    return f\"{epoch=}\"\n\n"
        image: python:3.9
pipelineInfo:
  name: pipeline
root:
  dag:
    outputs:
      parameters:
        Output:
          valueFromParameter:
            outputParameterKey: Output
            producerSubtask: concat-str
    tasks:
      concat-str:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-concat-str
        dependentTasks:
        - for-loop-2
        inputs:
          parameters:
            items:
              taskOutputParameter:
                outputParameterKey: pipelinechannel--epoch-to-str-Output
                producerTask: for-loop-2
        taskInfo:
          name: concat-str
      for-loop-2:
        componentRef:
          name: comp-for-loop-2
        iteratorPolicy:
          parallelismLimit: 2
        parameterIterator:
          itemInput: pipelinechannel--loop-item-param-1
          items:
            raw: '[1, 5, 10, 25]'
        taskInfo:
          name: for-loop-2
  outputDefinitions:
    parameters:
      Output:
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.3

こちらのパイプラインを実行すると、以下のような画面が表示されます。結果を見ると、dsl.ParallelForの個別の結果がconcat-strに対してリストとしてまとまって与えられていることが確認できます。出力を見ると文字列でepoch=1,epoch=5,epoch=10,epoch=25と言う出力になっており、所望の結果が得られていることが確認できます。

まとめ

今回はkfpで提供されているループ機能とその結果の集約方法をみてみました。この方法を利用することで並列でモデルを学習し、その結果を集約するといったことが可能になり、MLOpsの並列パイプラインの実装に一役買うのではないでしょうか?

Discussion