SageMaker: リアルタイムエンドポイントからBatch Transformへのソフトランディング
はじめに
阿河です。
この記事では、SageMakerノートブックを使用して機械学習のコーディングを行い、IRISデータセットを活用します。
同一のモデルを用いてリアルタイムエンドポイントとBatch Transformジョブの両方で推論を実施し、それぞれのアプローチにおける体験を共有します。Batch Transformについては、初見での理解が難しい点があるかもしれませんが、その入口としての役割を果たせれば幸いです。
目次
- 環境セットアップ
- 前準備(カスタムスクリプト)
- 前準備(トレーニングの実行まで)
- リアルタイムエンドポイントでの予測
- Batch Transformを使った予測
1. 環境セットアップ
事前準備として、SageMaker Studio環境を用意します。
作成したモデルをもとに、「リアルタイムエンドポイント」「Batch Transform」それぞれで推論を行います。
リアルタイム推論は、リアルタイム、インタラクティブ、低レイテンシが要求される推論ワークロードに最適です。
バッチ推論は、オフライン推論とも呼ばれ、オブザベーションのバッチに対してモデル予測を生成します。
大規模なデータセットや、モデル予測リクエストに対する即時応答が必要でない場合に適したオプションです。
2. 前準備(カスタムスクリプトの作成)
まずはトレーニングジョブ実行までを進めていきます。
SageMaker Studio上でノートブックを作成して、コードを書いていきます。
サンプルコードは上記にもあげていますので、別途参照ください。
※random_forest.py
# random_forest.py
import argparse
import os
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# SageMaker固有の引数
parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
args = parser.parse_args()
# トレーニングデータを読み込み
train_data = pd.read_csv(os.path.join(args.train, 'iris_train.csv'), header=0)
X_train = train_data.drop('label', axis=1)
y_train = train_data['label']
# モデルをトレーニング
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# モデルを保存
joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))
def model_fn(model_dir):
"""モデルファイルをロードするための関数。
Args:
model_dir (str): モデルファイルが保存されているディレクトリのパス。
Returns:
model: ロードされたモデルオブジェクト。
"""
model = joblib.load(os.path.join(model_dir, "model.joblib"))
return model
ライブラリのインポート
まずは必要なライブラリをインポートします。
argparse: コマンドライン引数を解析するための標準ライブラリ。
RandomForestClassifier: sklearnのランダムフォレスト分類器。
ランダム フォレストは、データセットのさまざまなサブサンプルに多数のデシジョン ツリー分類器を適合させ、平均化を使用して予測精度を向上させ、過剰適合を制御するメタ推定器です。
accuracy_score: 分類の正解率を計算する関数。
マルチラベル分類では、この関数はサブセット精度を計算します。
joblib: Pythonオブジェクトの保存と読み込みを行うためのユーティリティ。
Joblib は、 Python で軽量のパイプライン処理を提供するツールのセットです。
パラメータを受け取る
スクリプトはargparseを使用して、SageMakerの実行環境から渡されるパラメータ(model-dirとtrain)を受け取る。
引数を指定しない場合のデフォルト値を指定しておく。
- model-dir: モデルの保存先(default=os.environ['SM_MODEL_DIR'])
- train: トレーニングデータの保存先(default=os.environ['SM_CHANNEL_TRAIN'])
トレーニングデータの読み込み
指定パスからトレーニングデータを受け取り、特徴量(X)とラベル(y)にデータを分割する。
モデルのトレーニング
RandomForestClassifierを使って、モデルのトレーニングを行います。
fit(X, y[, サンプルの重み])
トレーニング セット (X, y) から決定木を構築します。
random_stateを指定して、再現性を担保します。
モデルの保存
dump(値, ファイル名[, 圧縮, プロトコル, ...])
任意の Python オブジェクトを 1 つのファイルに永続化します。
model_fn内の処理
load(ファイル名[, mmap_mode])
joblib.dump で永続化されたファイルから Python オブジェクトを再構築します。
ロードされたモデルオブジェクトを返します。
SageMaker Scikit-learn モデルサーバーは、スクリプトで指定する必要がある model_fn 関数を呼び出すことによってモデルをロードします。
3. 前準備(トレーニングの実行まで)
※Studioノートブック
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
配列または行列をランダムなトレーニングとテストのサブセットに分割します。
アイリスデータセットをロードして返します。
iris データセットは、古典的で非常に簡単な多クラス分類データセットです。
# Irisデータセットのロード
iris = load_iris()
X = iris.data
y = iris.target
load_irisで、アイリスデータセットをロードします。
戻り値のオブジェクトには、dataとtargetのフィールドがあります。
X
array([[5.1, 3.5, 1.4, 0.2],
[4.9, 3. , 1.4, 0.2],
[4.7, 3.2, 1.3, 0.2],
[4.6, 3.1, 1.5, 0.2],
[5. , 3.6, 1.4, 0.2],
dataには特徴量情報が含まれます。
Irisデータセットの4つの特徴量は、がく片(Sepal)の長さと幅、花弁(Petal)の長さと幅です。
y
array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
targetは、各サンプルの花が属するアヤメの種類(クラス)を表します。
0,1,2は、それぞれIris Setosa、Iris Versicolor、Iris Virginicaを表します。
# データセットをトレーニングセットとテストセットに分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
- train_test_split関数を使用して、データセットをトレーニングセットとテストセットに分割します。
- テスト分割に含めるデータセットの割合
- random_stateにより、分割の再現性を担保しています。
# データフレームを作成
train_df = pd.DataFrame(X_train, columns=iris.feature_names)
train_df['label'] = y_train
test_df = pd.DataFrame(X_test, columns=iris.feature_names)
test_df['label'] = y_test
# CSVファイルとして保存
train_df.to_csv("iris_train.csv", index=False)
test_df.to_csv("iris_test.csv", index=False)
- トレーニングセットとテストセットをpandasのデータフレームに変換
- ラベル列の追加
- CSVファイルとして保存
これでS3等の外部サービスに連携できるようになりました。
import sagemaker
from sagemaker.sklearn.estimator import SKLearn
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
# データをS3にアップロード
train_path = sagemaker_session.upload_data('iris_train.csv', key_prefix='iris/data')
test_path = sagemaker_session.upload_data('iris_test.csv', key_prefix='iris/data')
Sessionは、Amazon SageMaker API および必要なその他の AWS サービスとのやり取りを管理します。
このクラスは、S3 のトレーニングジョブ、エンドポイント、入力データセットなど、Amazon SageMaker が使用するエンティティやリソースを操作するための便利なメソッドを提供します。
get_execution_roleは、API の呼び出しに認証情報が使用されるロール ARN を返します。
最後にupload_dataを使って、トレーニングデータとテストデータをS3バケットにアップロードします。
次にトレーニングジョブの準備を行います。
sklearn_estimator = SKLearn(
entry_point='random_forest.py',
role=role,
instance_count=1,
instance_type='ml.m5.xlarge',
framework_version='0.23-1',
py_version='py3',
hyperparameters={}
)
# トレーニングジョブの実行
sklearn_estimator.fit({'train': train_path})
カスタム Scikit-learn コードのエンドツーエンドのトレーニングとデプロイメントを処理します。Scikit-learn 環境用の SKLearn Estimator を作成します。
SageMaker トレーニング ジョブ内で Scikit-learn スクリプトを実行します。マネージド Scikit-learn 環境は、提供されたentry_pointPython スクリプトで定義された関数を実行する Amazon が構築した Docker コンテナです。
パラメータは以下の通りです。
- entry_point: トレーニングへのエントリポイントとして実行されるPythonソースファイルのパス。事前にrandom_forest.pyを用意してください。パスの指定は環境に合わせてください。
- Framework_version: モデルトレーニングコード実行に使用されるScikit-learnのバージョン
- py_version: モデルトレーニングコードの実行に使用するPythonのバージョン。Noneが渡される場合は、image_uriを指定する必要があります。
- hyperparameters: トレーニングに使用されるハイパーパラメータ。ハイパーパラメータは、dict[str, str] として SageMaker のトレーニング コードにアクセスできるようになります。
トレーニングは、この Estimator で fit() を呼び出すことによって開始されます。
トレーニングが完了した後、deploy() を呼び出すと、ホストされた SageMaker エンドポイントが作成され、ホストされたモデルに対して推論を実行するために使用できる SKLearnPredictor インスタンスが返されます。
4. リアルタイム推論
predictor = sklearn_estimator.deploy(initial_instance_count=1, instance_type='ml.m5.large')
トレーニングが完了した後、deploy() を呼び出すと、ホストされた SageMaker エンドポイントが作成され、ホストされたモデルに対して推論を実行するために使用できる SKLearnPredictor インスタンスが返されます。
from sagemaker.predictor import Predictor
import json
# エンドポイント名を指定
endpoint_name = "xxxxxxxx"
predictor = Predictor(endpoint_name=endpoint_name)
test_samples = test_df.drop('label', axis=1).head().values
json_data = json.dumps(test_samples.tolist())
predictions = predictor.predict(json_data, initial_args={"ContentType": "application/json"})
print(predictions)
b'[1, 0, 2, 1, 1]'
サンプル(テストデータの最初の5行)を用意します。
テストデータをJSON形式でエンコードし、predictメソッドを使用して推論を行います。
ContentTypeをapplication/jsonに設定することで、JSON形式のデータをエンドポイントに送信します。
5. Batch Transform
test_data_no_label = test_df.drop('label', axis=1)
test_data_no_label.to_csv("iris_test_no_label.csv", index=False, header=False)
test_data_path = sagemaker_session.upload_data('iris_test_no_label.csv', key_prefix='iris/batch_input')
テストデータをCSVファイルとして保存して、S3にアップロードします。
transformer = sklearn_estimator.transformer(
instance_count=1,
instance_type='ml.m5.large',
output_path='s3://{}/iris/batch_output'.format(sagemaker_session.default_bucket())
)
transformer.transform(
data=test_data_path,
content_type='text/csv',
split_type='Line'
)
transformer.wait()
Transformer
トレーニング済モデルに基づいて、Transformerオブジェクトを作成し、バッチ変換ジョブを実行します。
S3にアップロードされたテストデータに対する予測を行い、指定されたS3パスに保存します。
パラメータは以下です。
- intance_count: 使用するEC2の数
- instance_type: 使用するEC2インスタンスのタイプ
- output_path: 変換結果を保存するためのS3の場所
Transform
新しい変換ジョブを開始します。
パラメータは以下です。
- data: S3の入力データの場所
- content_type: 入力データのMIMEタイプ
- split_type: 入力オブジェクトのレコード区切り文字
5分ほどで、Transformジョブが完了しました。
import boto3
import os
import boto3
s3_client = boto3.client('s3',region_name='ap-northeast-1')
bucket_name = sagemaker_session.default_bucket()
s3_prefix = transformer.output_path.replace(f"s3://{bucket_name}/", "")
s3_key = f"{s3_prefix}/iris_test_no_label.csv.out"
output_file = 'batch_transform_output.csv'
s3_client.download_file(Bucket=bucket_name, Key=s3_key, Filename=output_file)
predictions = pd.read_csv(output_file, header=None)
print(predictions.head())
0 1 2 3 4 5 6 7 8 9 ... 20 21 22 23 24 25 26 \
0 [1 0 2 1 1 0 1 2 1 1 ... 0 2 0 2 2 2 2
27 28 29
0 2 0 0]
[1 rows x 30 columns]
最後に予測結果が保存されたファイルをダウンロードし、中身を確認します。
おわりに
リアルエンドポイントとBatch Transformで、推論を行ってみました。
誰かの参考になれば幸いです。
Discussion