Closed8

Kubeflow Pipelines の構築メモ

d3d3

Install Pipelines SDK

まず Google Colaboratory で試す

!pip install kfp --upgrade --quiet
import kfp
kfp.__version__

バージョンが出力できればOK

'1.8.12'
d3d3

Kubeflow Pipelines SDK は以下のパッケージから構成されている

kfp.compiler

Python で書いたワークフローを YAML に変換する。

kfp.components

主なメソッド

kfp.components.func_to_container_op

関数を Pipeline のコンポーネントに変換し、ファクトリ関数を返す。

ファクトリ関数を呼び出すと、コンテナ内で元の関数を実行するタスク(ContainerOp)のインスタンスを構築できる。

kfp.components.load_component_from_file

kfp.components.func_to_container_opと同様だが、ファイルからコンポーネントをロードできる。

kfp.components.load_component_from_url

同様に指定したURLからコンポーネントをロードする。

kfp.dsl

Pipeline、コンポーネントの定義や操作に使用する DSL

kfp.Client

Kubeflow Pipeline API のクライアントライブラリ。
Kubeflow上の Experiments, Run Pipelineなどを実行できる。

d3d3

Component の定義

  • Input / Output
  • Implementation
  • Container Image / command to execute
  • Meta data (name, description...)

これらの定義を YAML または Python の関数で作っていく。

今回は Python function-based Component を使ってみる

d3d3

Hello, Component

import kfp
import kfp.components as comp

def add(a: float, b: float) -> float:
  return a + b
add_op = comp.create_component_from_func(add, output_component_file="add_component.yaml")

add_component.yaml というファイルが出力される

name: Add
inputs:
- {name: a, type: Float}
- {name: b, type: Float}
outputs:
- {name: Output, type: Float}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def add(a, b):
        return a + b

      def _serialize_float(float_value: float) -> str:
          if isinstance(float_value, str):
              return float_value
          if not isinstance(float_value, (float, int)):
              raise TypeError('Value "{}" has type "{}" instead of float.'.format(
                  str(float_value), str(type(float_value))))
          return str(float_value)

      import argparse
      _parser = argparse.ArgumentParser(prog='Add', description='')
      _parser.add_argument("--a", dest="a", type=float, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--b", dest="b", type=float, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = add(**_parsed_args)

      _outputs = [_outputs]

      _output_serializers = [
          _serialize_float,

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --a
    - {inputValue: a}
    - --b
    - {inputValue: b}
    - '----output-paths'
    - {outputPath: Output}
d3d3

Pipeline を組み立てる

import kfp
import kfp.components as comp
import kfp.dsl as dsl

# Component となる関数を定義する
def add(a: float, b: float) -> float:
  return a + b

# Pipeline を定義
@dsl.pipeline(
    name="Addition pipeline",
    description="An example pipeline that performs addition calculations."
)
def add_pipeline(a='1', b='7'):
  first_add_task = add_op(a, 4)
  second_add_task = add_op(first_add_task.output, b)

YAML (pipeline.yaml)にコンパイル

kfp.compiler.Compiler().compile(
    pipeline_func=add_pipeline,
    package_path="pipeline.yaml"
)
d3d3

pipeline.yaml を Kubeflow の Pipelines からアップロードする

Pipeline は DSL で以下のように定義した。

def add_pipeline(a='1', b='7'):
  first_add_task = add_op(a, 4)
  second_add_task = add_op(first_add_task.output, b)

Pipeline の起動パラメータに a, b を渡し
add_op( add(a: float, b: float) -> float )に a4、その結果を再度 add_opb を渡した結果を得るPipelineなので、以下のようなグラフになっている

d3d3

適当な Experiment を作って、GUIから引数を渡して実行する

d3d3

実行すると、コンテナが起動しタスクの終了と一緒にコンテナも終了している

Pipeline の実行結果を見てみる


起動パラメータとして a = 1, b = 7 が指定されていて


1回目の add_op へ a = 1を渡している。ここでは b = 4 が表示されていないが ML Metadata タブを見ると書いてあった

2回目の add_op へは 前のステップの出力と b = 7を渡している

このスクラップは2022/06/14にクローズされました