🔨

[VertexAI Pipeline]#3コンポーネントを作成する2つの方法

2022/05/08に公開

こんにちは! chameleonです!

ここまでで、コンポーネントで処理を記載して、パイプラインでそれをつなぎ合わせるといったKubeflowPipelineの基本的な流れがわかってきたかと思います。

今回は実際の処理を記載するコンポーネントの作成方法を2つ紹介します。

コンポーネントの作成方法

コンポーネントの作成方法は主に下記の2つがあります。

方法1.作成済みのコンテナイメージをコンポーネント化する方法
1つ目の方法は作成済みのコンテナイメージを指定したXMLファイルをkfp.components.load_component_from_fileで読込みコンポーネントを作成する方法です。

この方法を利用することでgithub等に公開されている作成済みのコンテナイメージを利用することができます。

方法2.関数をコンポーネント化する方法
2つ目の方法は関数をコンポーネント化する方法です。
通常のpython関数として処理を実装し、それをkfp.components.func_to_container_opを利用してコンポーネント作成する方法です。
引数としてコンテナイメージを指定できるため、コンテナイメージの処理を関数で上書きすることもできます。

それでは実際に実装して処理を確認していきましょう。

想定する読者

・KubeflowPipelineのコンポーネントの作成方法がわからない
・VertexAIのパイプラインでワークフローを構築したい

ゴール

方法1のXMLからコンテナイメージを読込みコンポーネントを作成する「hello-world」と、方法2の方法で作成する「add,add2,add3,show-result」の2つ処理を1つのパイプライン(ワークフロー)として構築します。

コンポーネントhello-worldはDockerImageのdocker/whalesayを利用して引数で与えたメッセージを出力するコンポーネントです。
もう一つの「add,add2,add3,show-result」のワークフローは、add、add2で加算した結果をさらにadd3で加算し、show-resultコンポーネントで結果を出力する処理となります。

構築の大まかな流れ

0.必要なライブラリの宣言
1.方法1.作成済みのコンテナイメージをコンポーネント化
2.方法2.関数をコンポーネント化する方法
3.パイプラインの作成(前回までと同様)
4.コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行(前回までと同様)
5.コンパイルしたパイプラインの実行(前回までと同様)

構築スタート!!!

0.必要なライブラリの宣言

import kfp
from kfp.v2.dsl import component #コンポーネント作成用
from kfp import dsl #パイプライン作成用
from kfp.v2 import compiler #コンパイラ用
import google.cloud.aiplatform as aip
from kfp.components import func_to_container_op #メソッドをコンポーネント化するライブラリ

1.方法1.作成済みのコンテナイメージをコンポーネント化

それでは1つめのコンポーネントをあらかじめ作成してあるdocker imageから作成してみましょう。
まずは、利用するDockerImageを指定するxmlファイルを作成します。

name: hello-world
metadata:
implementation:
  container:
    image: docker/whalesay:latest
    command: [cowsay]
    args: ["hello world"]

imageの部分で利用するDockerImageを指定します。
commandでこのDockerImage上で実行するコマンド「cowsay」を指定します。cowsayコマンドは引数で渡した文字列をメッセージとして出力するコマンドです。
argsでは上記のコマンドに渡すための引数を指定します。今回の場合は「hello world」と出力させたいのでこれを指定してます。

次に、このxmlファイルを読込みコンポーネント化する処理を記載します。

#手順1 方法1でのコンポーネントの作成
# コンテナイメージを指定したyamlファイルを読込みコンポーネント作成
# 事前にhelloworld.yamlを作成しクジラにメッセージを出力するdocker imageを指定しておく

#load_component_from_fileメソッドを利用してyamlファイルを読込む
hw_op = kfp.components.load_component_from_file(
    './helloworld.yaml'
)

これで、あらかじめ作成してDockerImageからコンポーネントを作成できました。

2.方法2.関数をコンポーネント化する方法

続いて、関数をコンポーネント化する方法です。
まずは、通常のpython関数と同じように関数を定義します。ここでは、下記の2つの関数を定義してます。
add関数:引数に与えられた2つの数値を加算する
show_result:引数に与えられた数値を「result is 引数」として出力する

次に、定義した2つの関数をkfp.components.func_to_container_opを利用してコンポーネント化します。こちらについては、@func_to_container_opとしてデコレータで実装してももちろんOKです。
今回はデコレータの処理が何をしているかを理解しやすいようにあえて、関数として呼び出して利用しています。

#手順2 方法2でのコンポーネントの作成
# func_to_container_opを利用して関数をコンポーネント化する
# 今回はデコレータの動きを実感するためにデコレータを利用せずに実装してみる

# 1つめの関数として足し算を行う関数を作成
def add(a: float, b: float) -> float:
    print("Adding two values %s and %s" %(a, b))
    return a + b

# 2つめの関数としてこれを表示する関数を作成
def show_rusult(result: float):
    print("rusult is {}".format(result))

# func_to_container_op関数を利用して2つの関数をコンポーネント化
# もちろん@func_to_container_opとしてデコレータで実装してもOK
add_op = kfp.components.func_to_container_op(func = add,
                                             base_image = "python:alpine")

show_rusult_op = kfp.components.func_to_container_op(func = show_rusult,
                                             base_image = "python:alpine")

3.パイプラインの作成

作成したコンポーネントを利用してパイプラインを作成します。
今回はパイプラインの引数としてa,b,cを定義し、それぞれに初期値として1,2,3を設定します。

まずは、手順1で作成したメッセージを出力するコンポーネントhw_opを実行します。
次に、手順2で作成した加算を行うコンポーネントを利用して、add_task_1、add_task_2、add_task_3を実行し、その結果をshow_result_taskで表示します。

ここでKubeflowPipelineの賢いところは、これらの処理順序を明示的に指定していなくても、正しく前提後続をつけて各タスクを実行してくれるようにワークフローを組んでくれるところです。
実行後のパイプラインUIからワークフローを確認してみてください。

#手順3 パイプラインの作成
@dsl.pipeline(name="tuto-pipeline3")
def tuto_pipeline(
        a:float =1,
        b:float =2,
        c:float =3,
):
    hw_op()
    add_task_1 = add_op(a, b)
    add_task_2 = add_op(b, c)
    add_task_3 = add_op(add_task_1.output,add_task_2.output)
    show_rusult_task = show_rusult_op(add_task_3.output)
    pass
    ```

## 4.コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行(前回までと同様)
前回までと同様にコンパイルを行います。
```python
# コンパイル
# 手順3:コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行
compiler.Compiler().compile(
    pipeline_func = tuto_pipeline,
    package_path  = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v3.json",
)

5.コンパイルしたパイプラインの実行(前回までと同様)

そして、実行

# パイプラインの実行
# 手順4:コンパイルしたパイプラインの実行
aip.init(
    project="awesome-height-240603",
    location="us-central1"
)

aip.PipelineJob(
    display_name = "Pipeline基礎編",
    template_path = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v3.json",
    pipeline_root = "gs://vertex_chameleon_data"
).run()

ソース全体

ソースの全体は下記のような感じです。

import kfp
from kfp.v2.dsl import component #コンポーネント作成用
from kfp import dsl #パイプライン作成用
from kfp.v2 import compiler #コンパイラ用
import google.cloud.aiplatform as aip
from kfp.components import func_to_container_op #メソッドをコンポーネント化するライブラリ

#やりたいこと
#下記の2つのコンポーネント作成方法を試す
#方法1.作成済みのコンテナイメージをコンポーネント化する方法
#方法2.関数をコンポーネント化する方法


#手順1 方法1でのコンポーネントの作成
# コンテナイメージを指定したyamlファイルを読込みコンポーネント作成
# 事前にhelloworld.yamlを作成しクジラにメッセージを出力するdocker imageを指定しておく

#load_component_from_fileメソッドを利用してyamlファイルを読込む
hw_op = kfp.components.load_component_from_file(
    './helloworld.yaml'
)

#手順2 方法2でのコンポーネントの作成
# func_to_container_opを利用して関数をコンポーネント化する
# 今回はデコレータの動きを実感するためにデコレータを利用せずに実装してみる

# 1つめの関数として足し算を行う関数を作成
def add(a: float, b: float) -> float:
    print("Adding two values %s and %s" %(a, b))
    return a + b

# 2つめの関数としてこれを表示する関数を作成
def show_rusult(result: float):
    print("rusult is {}".format(result))

# func_to_container_op関数を利用して2つの関数をコンポーネント化
# もちろん@func_to_container_opとしてデコレータで実装してもOK
add_op = kfp.components.func_to_container_op(func = add,
                                             base_image = "python:alpine")

show_rusult_op = kfp.components.func_to_container_op(func = show_rusult,
                                             base_image = "python:alpine")

#手順3 パイプラインの作成
@dsl.pipeline(name="tuto-pipeline3")
def tuto_pipeline(
        a:float =1,
        b:float =2,
        c:float =3,
):
    hw_op()
    add_task_1 = add_op(a, b)
    add_task_2 = add_op(b, c)
    add_task_3 = add_op(add_task_1.output,add_task_2.output)
    show_rusult_task = show_rusult_op(add_task_3.output)
    pass

# コンパイル
# 手順3:コンパイルする対象のパイプラインと出力json名を指定してコンパイルを実行
compiler.Compiler().compile(
    pipeline_func = tuto_pipeline,
    package_path  = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v3.json",
)


# パイプラインの実行
# 手順4:コンパイルしたパイプラインの実行
aip.init(
    project="awesome-height-240603",
    location="us-central1"
)

aip.PipelineJob(
    display_name = "Pipeline基礎編",
    template_path = "/home/npprw160/Sandbox/kpf/tuto_pipeline_v3.json",
    pipeline_root = "gs://vertex_chameleon_data"
).run()
name: hello-world
metadata:
implementation:
  container:
    image: docker/whalesay:latest
    command: [cowsay]
    args: ["hello world"]

パイプラインの実行

パイプラインの実行はGCPのクラウドシェルから実行します。
GCPコンソールの右上にあるマークをクリックするとクラウドシェルが立ち上がります。

クラウドシェルで適当なディレクトリを作成し、そこに上記で作成したpythonファイルとxmlファイルを配置します。

ちなみに、ローカルのファイルをクラウドシェル上のパスにアップロードするには、クラウドシェルターミナルの右上にあるメニューの「アップロード」から実行することができます。

そして、pythonファイルの実行を行います。

 python pipeline_v3.py

結果確認

VertexAIのパイプライン画面に移動して結果を確認してみましょう。
上記で実行したパイプラインが表示されているはずです。

対象のパイプラインをクリックすると処理結果の詳細が確認できます。

右の方にhello-worldがあり、左側に加算を行い結果を表示するワークフローが記載されてます。
左側のワークフローは2つの加算結果が処理(add,add2)されたあとにその出力を加算する処理(add3)が配置されており、その後続にadd3の処理結果を表示するshow-resultコンポーネントが配置されてます。

ちゃんとKubeflowPipelineが処理順序を考慮してワークフローを作ってくれるんですね。
素晴らしい!!!

一応、処理結果も確認しましょう。
hello-worldの方はログにちゃんと「hello world」と出力されてますね。

show-resultの方も結果に「result is 8.0」と表示されてます!

バッチリですね!!

Discussion