Kaggle Titanic で AWS を使った ML パイプライン構築 2

6 min read読了の目安(約5700字

前回の記事で、AWS の Amazon SageMaker などを使い、ML パイプラインの学習や推論といったそれぞれのステップをジョブとして実行する方法を確認しました。今回は、それぞれのジョブを AWS のサーバーレスなワークフローエンジンである AWS Step Functions を用いてパイプラインにまとめる方法を見ていきます。特に、Python を用いてワークフローを記述できる AWS Step Functions Data Science SDK for Python を使うことで、簡単に ML パイプラインを構築することができます。

https://zenn.dev/tkazusa/articles/41b07be4b7c3ba

具体的な実装は、こちらのリポジトリにありますので、ご興味湧きましたら御覧ください。

AWS Step Functions を使った前処理、学習、推論の自動化

前回の記事で準備したようなそれぞれのジョブをパイプライン化する手順は下記のようになります。

  • ワークフローへ渡す入力パラメタを準備
  • 前処理、学習、推論といった各ステップの定義
  • 必要に応じて、条件分岐やエラーハンドリングのためのステップの準備
  • ワークフローとして各ステップを結合
  • AWS Step Functions 上へワークフローを構築
  • ワークフローの実行

では、もう少し詳細を見ていきましょう。

ワークフローへ渡す入力パラメタを準備

ワークフローの実行のタイミングで、入力を渡す事ができます。この渡す情報について、どのような情報なのかの定義を事前に準備します。

execution_input = ExecutionInput(
    schema={
    'ModelName': str,
    'TrainPreprocessingJobName': str,
    'TrainingJobName': str, 
    'TestPreprocessingJobName': str,
    'TransformJobName': str
    }
)

前処理、学習、推論といった各ステップの定義

データの前処理、学習、推論のステップをワークフローに組み込む方法は基本的には同じです。SageMaker で実行するジョブを定義して、Step Functions DS SDK で Step としてそのジョブ定義を活用します。例えば前処理ステップを定義する際は下記のようなイメージになります。ProcessingStep にジョブ定義や、実行するスクリプト、ジョブ実行時のコンテナと S3 でのデータを受け渡すパスなどといった情報が渡されています。

from stepfunctions.steps import ProcessingStep

processor = ScriptProcessor(
			base_job_name=job_name,
			image_uri=image_uri,
			command=['python3'],
			role=role,
			instance_count=1,
			instance_type='ml.c5.xlarge'
		)
				  
train_preprocess_step = ProcessingStep(
    'Preprocess for Training Step', 
    processor=processor,
    job_name=execution_input["TrainPreprocessingJobName"],
    inputs=[
        ProcessingInput(source=input_code, destination=processing_code_dir, input_name="code"),
        ProcessingInput(source=input_train, destination=processing_input_dir, input_name="train_data"),
    ],
    outputs=[ProcessingOutput(source=processing_output_dir,
    destination=output_s3_path_preprocess,
    output_name="processed_train_data")],
    container_arguments=[
                  '--data_type', 'train',
                  '--input_dir',processing_input_dir,
                  '--output_dir',processing_output_dir
                      ],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocess.py"]
)

学習ステップの定義もほぼ同様ですね。

from sagemaker.sklearn.estimator import SKLearn
from stepfunctions.steps import TrainingStep

output_s3_path_train = output_s3_path + '/train'

sklearn = SKLearn(
    entry_point='scripts/train/train.py',
    framework_version="0.23-1",
    train_instance_type="ml.m5.xlarge",
    output_path=output_s3_path_train,
    role=role)

training_step = TrainingStep(
    'Train Step', 
    estimator=sklearn,
    data={'train': train_input},
    job_name=execution_input['TrainingJobName']  
)

推論や他の処理でも、これらと同様に任意の処理を AWS のどのサービス、機能を使ってジョブとして実行するのかを決め、Step としてまとめます。

必要に応じて、条件分岐やエラーハンドリングのためのステップの準備

ML パイプラインとしてまとめて自動化する場合、各ジョブが失敗した場合に特定の処理を走らせたり、実行状況に応じて、ワークフローを分岐させたりしたくなることもあります。Step Functions はこういった分岐や並列化なども定義できます。
例えば、ジョブが失敗した際に対応する Catch ステップは Data Science SDK だとこのように簡単に定義できます。

failed_state_sagemaker_processing_failure = stepfunctions.steps.states.Fail(
    "ML Workflow failed", cause="SageMakerProcessingJobFailed"
)

catch_state_processing = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_processing_failure,
)

train_preprocess_step.add_catch(catch_state_processing)
training_step.add_catch(catch_state_processing)
test_preprocess_step.add_catch(catch_state_processing)
transform_step.add_catch(catch_state_processing)

ワークフローとして各ステップを結合

これまで作成したステップを結合します。この段階ではまだ AWS Step Functions 上へは反映されておらず、定義としてまとまっただけになります。workflow.render_graph() とするとワークフローを可視化することもできます。また AWS CloudFormation のテンプレートを出力することもできるので、Terraform でのデプロイなどにも流用できて便利です。

workflow_definition = steps.Chain([train_preprocess_step, training_step, model_step, test_preprocess_step, transform_step])

workflow = Workflow(
    name="titanic-ml-pipeline",
    definition=workflow_definition,
    role=workflow_execution_role,
)

ワークフローの実行

最後にワークフローを作成し、実行します。実行されている様子は AWS のコンソール上でも確認することができます。

workflow.create()

execution = workflow.execute(
    inputs={
        "ModelName": 'model-{}'.format(uuid.uuid1().hex),
        "TrainPreprocessingJobName": 'train-preprocess-job-{}'.format(uuid.uuid1().hex), 
        "TrainingJobName": 'training-job-{}'.format(uuid.uuid1().hex),
        "TestPreprocessingJobName": 'test-preprocess-job-{}'.format(uuid.uuid1().hex),
        "TransformJobName": 'transform-job-{}'.format(uuid.uuid1().hex),
    }
)

まとめ

AWS を使って ML パイプラインを構築する方法を見ていきました。必要な計算リソースを柔軟に活用し、かつ、それらの処理を半自動化するようなことが簡単に実装できます。特に機械学習エンジニアがモデル開発に注力したい場合、このようなマネージドだったりサーバーレスなサービスの活用は開発工数少なくすることができるので有用ではないでしょうか。ご興味あれば、詳細な実装もご確認下さい。

参考