🚧

TFX Partial Runについて

2022/02/27に公開

TFX Partial Run

TFXの1.5.0からPartial Runの機能が追加されたもののあまりに情報がないので、とりあえず試してみました。

1.5.0のリリースノート にあるようにLocalDagRunnerBeamDagRunner でしか実行できないので、Vertex Pipelinesではまだ利用できません。

Vertex Pipelinesにはキャッシュ機能があるので、componentの識別子を設定するwith_id を設定すればもしかしたら同じようなことができるかもしれませんが未検証です

tfxのpartial runはrunnerのrun メソッドにtfx.orchestration.pipeline.RunOptions を渡してあげることで実行されます。

runner = tfx.orchestration.LocalDagRunner()

# pipelineの一部を実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(from_nodes=["start_component"],
	to_nodes=["end_component"]))

どのコンポーネントを部分実行したいかをRunOptionsfrom_nodesto_nodes で指定します。

これらの引数はCollection[str] を渡す必要があり、リストの要素となるstr はcomponentのidになります。

componentのidは何も指定しなければComponentのクラス名, 任意の値を指定したい場合にはComponent().with_id(name) で指定したnameになります。
(実際の値はcomponent.id に格納されてます)

次のような簡単なパイプラインを例にいくつかの実行例について説明をします

パイプラインのある区間

パイプライン中のある区間について実行したい場合には、RunOptionsfrom_nodesto_nodesの両方の引数に値を渡してあげる必要があります。

runner = tfx.orchestration.LocalDagRunner()

# pipelineの一部を実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(from_nodes=["start_component"],
	to_nodes=["end_component"]))

この時from_nodesto_nodes 間のコンポーネントのパスは自動で解決されます。

from_nodesto_nodes で指定したコンポーネントのパスが途切れている場合には、無視されます。

例えば以下のようにすると, independent Componentは何も実行されません

runner = tfx.orchestration.LocalDagRunner()

# pipelineの一部を実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(from_nodes=["start_component"],
	to_nodes=["end_component", "independent component"]))


# pipelineの一部を実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(from_nodes=["start_component", "independent component"],
	to_nodes=["end_component"]))

Component単体を実行したい場合には、from_nodesto_nodes の両方に同名のComponentを指定してあげると実行することができます

runner = tfx.orchestration.LocalDagRunner()


# パイプライン中の独立したコンポーネントを実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(from_nodes=["independent_component"],
	to_nodes=["independent_component"]))


# 依存関係のあるパイプラインのコンポーネントを独立で実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(from_nodes=["end_component"],
	to_nodes=["end_component"]))

また、あるパイプライン中の一部のパスのコンポーネントとパスから独立したコンポーネントを実行したい場合には以下のように記述することで可能です

あるコンポーネントから後続コンポーネント全てを実行したい場合

from_nodes に値を渡してあげ、to_nodes に何も指定しないことで実行されます。

# 指定したコンポーネントを含めた全ての後続コンポーネントを実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(from_nodes=["start_component_name"]))

あるコンポーネントより先行するコンポーネントを実行したい場合

to_nodesに値を渡してあげ、from_nodes には何も指定しないことで実行が可能です

# 指定したコンポーネントより先行するコンポーネントを実行
runner.run(pipeline=pipeline, 
	run_options=RunOptions(to_nodes=["end_component"]))

注意

Partial Runでは何も指定しない場合はデフォルトだと前回に実行した時の結果を必要とします。そのため前に一度も実行されたことがないパイプラインについてはPartial Runの実行ができません。

前回実行以外のパイプライン実行についてのメタデータを参照したい場合には、RunOptionsの引数のbase_pipeline_run_id を渡してあげて実行すればよいとコードのコメントには書かれています。

LocalDagRunnerの場合には、run実行時のdatetimeのisoformatになります(実装箇所)

実際に試してみたところ、現状の1.6.1だとbase_pipeline_run_id を指定したり適当な文字列を指定しても前回の結果のArtifactが参照されて実行されていたため、無条件に前回実行時を取得しに行ってるような気がします。

値の確認のためにはML Metadataから取得する必要があり、以下のコードで実行可能です。

他にスマートなやり方があるのかもしれませんが、MLMDについてもprotobuf の取り扱いについてもあまり詳しくないので、愚直なやり方で取得しています。

import ml_metadata as mlmd
store = mlmd.MetadataStore(METADATA_CONNECTION_CONFIG)
contexts = store.get_contexts()
for context in store.get_contexts():
  if context.type_id == 11:
    print(context.name)

Discussion