TFX Partial Runについて
TFX Partial Run
TFXの1.5.0からPartial Runの機能が追加されたもののあまりに情報がないので、とりあえず試してみました。
1.5.0のリリースノート にあるようにLocalDagRunner
や BeamDagRunner
でしか実行できないので、Vertex Pipelinesではまだ利用できません。
Vertex Pipelinesにはキャッシュ機能があるので、componentの識別子を設定するwith_id
を設定すればもしかしたら同じようなことができるかもしれませんが未検証です
tfxのpartial runはrunnerのrun
メソッドにtfx.orchestration.pipeline.RunOptions
を渡してあげることで実行されます。
runner = tfx.orchestration.LocalDagRunner()
# pipelineの一部を実行
runner.run(pipeline=pipeline,
run_options=RunOptions(from_nodes=["start_component"],
to_nodes=["end_component"]))
どのコンポーネントを部分実行したいかをRunOptions
のfrom_nodes
とto_nodes
で指定します。
これらの引数はCollection[str]
を渡す必要があり、リストの要素となるstr
はcomponentのidになります。
componentのidは何も指定しなければComponentのクラス名, 任意の値を指定したい場合にはComponent().with_id(name)
で指定したnameになります。
(実際の値はcomponent.id
に格納されてます)
次のような簡単なパイプラインを例にいくつかの実行例について説明をします
パイプラインのある区間
パイプライン中のある区間について実行したい場合には、RunOptions
のfrom_nodes
とto_nodes
の両方の引数に値を渡してあげる必要があります。
runner = tfx.orchestration.LocalDagRunner()
# pipelineの一部を実行
runner.run(pipeline=pipeline,
run_options=RunOptions(from_nodes=["start_component"],
to_nodes=["end_component"]))
この時from_nodes
とto_nodes
間のコンポーネントのパスは自動で解決されます。
from_nodes
とto_nodes
で指定したコンポーネントのパスが途切れている場合には、無視されます。
例えば以下のようにすると, independent Component
は何も実行されません
runner = tfx.orchestration.LocalDagRunner()
# pipelineの一部を実行
runner.run(pipeline=pipeline,
run_options=RunOptions(from_nodes=["start_component"],
to_nodes=["end_component", "independent component"]))
# pipelineの一部を実行
runner.run(pipeline=pipeline,
run_options=RunOptions(from_nodes=["start_component", "independent component"],
to_nodes=["end_component"]))
Component単体を実行したい場合には、from_nodes
とto_nodes
の両方に同名のComponentを指定してあげると実行することができます
runner = tfx.orchestration.LocalDagRunner()
# パイプライン中の独立したコンポーネントを実行
runner.run(pipeline=pipeline,
run_options=RunOptions(from_nodes=["independent_component"],
to_nodes=["independent_component"]))
# 依存関係のあるパイプラインのコンポーネントを独立で実行
runner.run(pipeline=pipeline,
run_options=RunOptions(from_nodes=["end_component"],
to_nodes=["end_component"]))
また、あるパイプライン中の一部のパスのコンポーネントとパスから独立したコンポーネントを実行したい場合には以下のように記述することで可能です
あるコンポーネントから後続コンポーネント全てを実行したい場合
from_nodes
に値を渡してあげ、to_nodes
に何も指定しないことで実行されます。
# 指定したコンポーネントを含めた全ての後続コンポーネントを実行
runner.run(pipeline=pipeline,
run_options=RunOptions(from_nodes=["start_component_name"]))
あるコンポーネントより先行するコンポーネントを実行したい場合
to_nodes
に値を渡してあげ、from_nodes
には何も指定しないことで実行が可能です
# 指定したコンポーネントより先行するコンポーネントを実行
runner.run(pipeline=pipeline,
run_options=RunOptions(to_nodes=["end_component"]))
注意
Partial Runでは何も指定しない場合はデフォルトだと前回に実行した時の結果を必要とします。そのため前に一度も実行されたことがないパイプラインについてはPartial Runの実行ができません。
前回実行以外のパイプライン実行についてのメタデータを参照したい場合には、RunOptions
の引数のbase_pipeline_run_id
を渡してあげて実行すればよいとコードのコメントには書かれています。
LocalDagRunnerの場合には、run実行時のdatetimeのisoformatになります(実装箇所)
実際に試してみたところ、現状の1.6.1だとbase_pipeline_run_id
を指定したり適当な文字列を指定しても前回の結果のArtifactが参照されて実行されていたため、無条件に前回実行時を取得しに行ってるような気がします。
値の確認のためにはML Metadataから取得する必要があり、以下のコードで実行可能です。
他にスマートなやり方があるのかもしれませんが、MLMDについてもprotobuf の取り扱いについてもあまり詳しくないので、愚直なやり方で取得しています。
import ml_metadata as mlmd
store = mlmd.MetadataStore(METADATA_CONNECTION_CONFIG)
contexts = store.get_contexts()
for context in store.get_contexts():
if context.type_id == 11:
print(context.name)
Discussion