Kubeflow Pipelines の構築メモ
Install Pipelines SDK
まず Google Colaboratory で試す
!pip install kfp --upgrade --quiet
import kfp
kfp.__version__
バージョンが出力できればOK
'1.8.12'
Kubeflow Pipelines SDK は以下のパッケージから構成されている
kfp.compiler
kfp.components
kfp.dsl
kfp.client
- Kubeflow Pipelines extension modules
- Kubeflow Pipelines diagnose_me modules
kfp.compiler
Python で書いたワークフローを YAML に変換する。
kfp.components
主なメソッド
kfp.components.func_to_container_op
関数を Pipeline のコンポーネントに変換し、ファクトリ関数を返す。
ファクトリ関数を呼び出すと、コンテナ内で元の関数を実行するタスク(ContainerOp)のインスタンスを構築できる。
kfp.components.load_component_from_file
kfp.components.func_to_container_op
と同様だが、ファイルからコンポーネントをロードできる。
kfp.components.load_component_from_url
同様に指定したURLからコンポーネントをロードする。
kfp.dsl
Pipeline、コンポーネントの定義や操作に使用する DSL
kfp.Client
Kubeflow Pipeline API のクライアントライブラリ。
Kubeflow上の Experiments, Run Pipelineなどを実行できる。
Component の定義
- Input / Output
- Implementation
- Container Image / command to execute
- Meta data (name, description...)
これらの定義を YAML または Python の関数で作っていく。
今回は Python function-based Component を使ってみる
Hello, Component
import kfp
import kfp.components as comp
def add(a: float, b: float) -> float:
return a + b
add_op = comp.create_component_from_func(add, output_component_file="add_component.yaml")
add_component.yaml
というファイルが出力される
name: Add
inputs:
- {name: a, type: Float}
- {name: b, type: Float}
outputs:
- {name: Output, type: Float}
implementation:
container:
image: python:3.7
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def add(a, b):
return a + b
def _serialize_float(float_value: float) -> str:
if isinstance(float_value, str):
return float_value
if not isinstance(float_value, (float, int)):
raise TypeError('Value "{}" has type "{}" instead of float.'.format(
str(float_value), str(type(float_value))))
return str(float_value)
import argparse
_parser = argparse.ArgumentParser(prog='Add', description='')
_parser.add_argument("--a", dest="a", type=float, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--b", dest="b", type=float, required=True, default=argparse.SUPPRESS)
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = add(**_parsed_args)
_outputs = [_outputs]
_output_serializers = [
_serialize_float,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
args:
- --a
- {inputValue: a}
- --b
- {inputValue: b}
- '----output-paths'
- {outputPath: Output}
Pipeline を組み立てる
import kfp
import kfp.components as comp
import kfp.dsl as dsl
# Component となる関数を定義する
def add(a: float, b: float) -> float:
return a + b
# Pipeline を定義
@dsl.pipeline(
name="Addition pipeline",
description="An example pipeline that performs addition calculations."
)
def add_pipeline(a='1', b='7'):
first_add_task = add_op(a, 4)
second_add_task = add_op(first_add_task.output, b)
YAML (pipeline.yaml
)にコンパイル
kfp.compiler.Compiler().compile(
pipeline_func=add_pipeline,
package_path="pipeline.yaml"
)
pipeline.yaml
を Kubeflow の Pipelines
からアップロードする
Pipeline は DSL で以下のように定義した。
def add_pipeline(a='1', b='7'):
first_add_task = add_op(a, 4)
second_add_task = add_op(first_add_task.output, b)
Pipeline の起動パラメータに a
, b
を渡し
add_op( add(a: float, b: float) -> float
)に a
と 4
、その結果を再度 add_op
にb
を渡した結果を得るPipelineなので、以下のようなグラフになっている
適当な Experiment を作って、GUIから引数を渡して実行する
実行すると、コンテナが起動しタスクの終了と一緒にコンテナも終了している
Pipeline の実行結果を見てみる
起動パラメータとして a = 1, b = 7
が指定されていて
1回目の add_op
へ a = 1
を渡している。ここでは b = 4
が表示されていないが ML Metadata
タブを見ると書いてあった
2回目の add_op
へは 前のステップの出力と b = 7
を渡している