😎

SageMaker Pipelineで学習からデプロイまでを自動化したい

2024/06/03に公開

会社のありがたーい行事があったので、開発合宿なる社内イベントに行って一夜漬けでSagemakerをたくさん触ってきました
実際にSagemMaker を触り、簡単なモデルでSageMaker Pipelineを用いて学習からデプロイまでのパイプラインを作成しました

Sagemakerとは

AWSにおける機械学習を行うためのサービスです。
GPUインスタンスでJupyter notebookをたてたり、機械学習モデルを載せたサーバの管理などができます。

MLOpsとは

機械学習オペレーション (MLOps) は、機械学習 (ML) の各種ワークフローを自動化および簡素化する一連のプラクティスです。

つまり、機械学習の学習からデプロイまでを楽にするための自動化を指します。
スケーリングや分析もMLOpsですが、今回は対象外です。

MLOpsについて詳しく

AWSによると、ビジネスにおけるMLOpsの成熟度は4段階あります

第一段階 研究が可能な段階
第二段階 デプロイが自由自在にできる段階
第三段階 モデルに関するテスト、モニタリングを行っている状態
第四段階 上記の段階を数百のモデルを自由自在に扱いつつ満たしている状態

今回はMLOpsの第二段階であり、MLOpsの根幹とも言える学習から自動デプロイに関する部分を自動化していきます。

作成した構成

はじめに、Jupiter notebookを起動し、簡単な学習および推論のコードを書きました。
その後、学習、モデル作成、推論エンドポイント作成を自動化しています。

最後に、簡単なWebサーバから推論と学習データの追加ができるようにしました

しなかったこと

Terraformなどを使った自動化は今回時間もなかったので行いませんでした
また、学習データは画像一枚のみで機械学習そのものに関する詳しい話をしません

Jupiter notebookを起動

AWSコンソールを使ってSageMaker studioから、Jupiter notebookを起動していきます、難しいことはないですがGPUインスタンスを指定すると、10分近く時間がかかることがあるので注意が必要です
特に断りがない場合今後の作業はこのJupiter notebook上で行います

モデル選定

今回はどのようなタスクを実行するかは重要でないので、ImageNetの1000クラス画像分類タスクを実行するMobileNetV2を使用します

推論

まずは、Tensorflowの学習データを使用して推論ができることを確認します。
最初は適当な猫の画像を持ってきて、S3に配置し推論を行いました。

import numpy as np
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.imagenet_utils import preprocess_input
from tensorflow.keras.applications.imagenet_utils import decode_predictions
from tensorflow.keras.preprocessing import image
from tensorflow.keras.optimizers import Adam
import boto3

def load_img(img_path):
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('gassyuku2024510')
    # cat.webpがS3上のパス
    bucket.download_file("cat.webp", img_path)
    return img_path

def train():
    # ローカルの画像保存先
    img_path = './cat.webp'
    load_img(img_path)
    img = image.load_img(img_path, target_size=(224, 224))  # MobileNetV2のデフォルトの入力サイズにリサイズ
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    model=MobileNetV2(weights='imagenet',include_top=True)
    model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

    y_sample = np.zeros((1, 1000))

    # 出力を確認
    prediction = model.predict(x).argmax()
285

出力により、この猫の画像は285番のegyptian_cat だということがわかりました。

学習

学習のコードを書いていきます。
ここで、後から学習がしやすいようにバケットの構成は以下のようにしました。
今回の学習は形だけの学習なので、 全てのクラスに対応するディレクトリだけを用意し、中に.keepという空のファイルを作成しています。
その状態から先ほどの猫の画像のみをclass285に入れました
また、テストデータ、検証データは今回ありません。

-----  class0/
  |          |
  |          |---- .keep
  |
  |
  |--- class1/
  .
  .
  |--- class285/cat.webp
  .
  .  
  |--- class999/
import numpy as np
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.imagenet_utils import preprocess_input
from tensorflow.keras.applications.imagenet_utils import decode_predictions
from tensorflow.keras.preprocessing import image
from tensorflow.keras.optimizers import Adam
import boto3
import os


def download_imgs():
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('gassyuku2024510')
    image_objects = bucket.objects.all()
    os.system("mkdir -p train")
    for i in range(1000):
        os.system(f"mkdir -p train/class{i}")
    for image_object in image_objects:
        bucket.download_file(image_object.key, f"./train/{str(image_object.key)}")


def train():

    download_imgs()
    training_data = []
    training_label = []
    for i in range(1000):
        dir_name = f"train/class{i}"
        for img_path in os.listdir(dir_name):
            if img_path == ".keep":
                continue
            print(f"train/class{i}/{img_path}")
            img = image.load_img(f"train/class{i}/{img_path}", target_size=(224, 224))
            x = image.img_to_array(img)
            x = np.expand_dims(x, axis=0)
            training_data += x,
            y_sample = np.zeros((1, 1000))
            y_sample[0, i] = 1
            training_label += y_sample,
    training_data = np.array(training_data)
    training_label = np.array(training_label)

    print(training_data.shape)
    print(training_label.shape)

    training_data = preprocess_input(np.array(training_data))
    model = MobileNetV2(weights='imagenet', include_top=True)
    model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

    # モデルの訓練
    model.fit(x, y_sample, batch_size=1, epochs=3)

    # 訓練後の予測を確認
    final_prediction = model.predict(x).argmax()
    # h5形式ではなくtensorflowの形式で出力
    model.save('/opt/ml/model/1')

train()

h5形式で保存してはいけない、modelの保存パスが決まっているという部分に注意してください。

パイプラインの作成

合宿中は、推論エンドポイントの手作業での作成と動作確認を行いたくさんの時間を消費しましたが、ここでは完成品のSageMakerパイプラインの作成コードを示します。
パイプラインの作成をJupiter notebook上のpythonコードから行うのが個人的な驚きでした。

前準備

import sagemaker
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep, CreateModelStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline_context import PipelineSession

# SageMaker セッションとロールの取得
sagemaker_session = pipeline_session = PipelineSession()
role = sagemaker.get_execution_role()

Training Jobの定義

Training Jobを定義しています。Jupiter notebook上でtrain.pyを作成し、そのファイルを参照することでトレーニングスクリプトを指定しています。
トレーニングの結果のモデルファイルを特定のディレクトリに保存する必要があります
h5ファイルではなく、Tensorflowの形式で保存する必要がある点は注意です。
Pytorchの場合は変わってくると思いますが、今回は試していません。

# トレーニングジョブの設定
estimator = TensorFlow(
    entry_point='train.py',          # トレーニングスクリプト
    role=role,
    instance_count=1,                # トレーニングインスタンスの数
    instance_type='ml.m5.large',     # トレーニングインスタンスタイプ
    framework_version='2.3',         # TensorFlowのバージョン
    py_version='py37'                # Pythonのバージョン
)

Modelの定義

モデルの定義です

training_step = TrainingStep(
    name="TensorFlowTraining",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data='s3://gassyuku2024510/',
            content_type='x-image'
        )
    }
)

推論エンドポイント用Lambda作成

推論エンドポイントの作成のためにLambdaを使用します。
以下のようなコードで推論エンドポイントを作成します

import boto3
import uuid
def lambda_handler(event, context):
    # boto3 クライアントを初期化
    client = boto3.client('sagemaker')

    # イベントからモデル名とエンドポイント名を取得
    model_name = event['ModelName']

    endpoint_config_name="myconfig"+str(uuid.uuid1())
    initial_instance_count=1
    instance_type="ml.c4.xlarge"
    endpoint_name = event['EndpointName']  # 呼び出し元から渡される想定

   # エンドポイント設定の作成
    response = client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                'VariantName': 'AllTraffic',
                'ModelName': model_name,
                'InstanceType': instance_type,
                'InitialInstanceCount': initial_instance_count,
                'InitialVariantWeight': 1
            }
        ]
    )
    print(response)

    # エンドポイントの更新
    response = client.update_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name
    )

    return {
        'statusCode': 200,
        'body': response
    }

推論エンドポイントの定義

先ほど作成したLambdaを用いて推論エンドポイントを定義します

lambda_function = Lambda(
    function_arn="<自分が作成したlambdaのarn>",
    session=sagemaker_session
)
update_endpoint_step = LambdaStep(
    name="UpdateEndpoint",
    lambda_func=lambda_function,
    inputs={"ModelName": model_step.properties.ModelName, "EndpointName": "作成されたendpoint名"}
)

パイプラインの定義

これまで作ってきたStepを持つパイプラインを定義します。

pipeline = Pipeline(
    name="MyModelTrainingPipeline2",
    steps=[training_step, model_step, update_endpoint_step],
    sagemaker_session=sagemaker_session
)

# パイプラインの定義を作成または更新
pipeline.upsert(role_arn="各種アクセス権限を持ったroleのarn")

パイプラインの実行

作成したパイプラインを実行します。
このパイプラインはS3バケット上のデータから学習を行い、モデルの作成、デプロイまでを一気にやってくれます。
進捗状況はSagemakerStudio上でリアルタイムで確認できます。(SagemakerStudio上でパイプライン実行もできます!)

execution = pipeline.start()

作成結果確認

パイプラインを実行し、各作成物をAWSのコンソールから確認が可能です。

Training Job:SagemakerStudio上
Model:SagemakerStudio→Model → deployableモデル
推論エンドポイント:AWSコンソール→SagemakerStudio→推論エンドポイント
パイプライン:SagemakerStudio上

推論エンドポイントの実行

推論エンドポイントを叩く簡単なpython サーバを書きます。

from flask import Flask, request, jsonify
import numpy as np
import boto3
import numpy as np
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from PIL import Image
import tempfile
import json

app = Flask(__name__)

# SageMaker 推論エンドポイント名
ENDPOINT_NAME = '作成した推論エンドポイントの名前'


# boto3 を使って SageMaker ランタイムクライアントを作成
sagemaker_runtime = boto3.client('sagemaker-runtime', region_name='ap-northeast-1')

HTML_FORM = """
<!DOCTYPE html>
<html>
<head>
<title>Upload Image</title>
</head>
<body>
    <h1>Upload Image for Prediction</h1>
    <form method="post" action="/predict" enctype="multipart/form-data">
        <p><input type="file" name="file"></p>
        <p><input type="submit" value="Upload"></p>
    </form>
</body>
</html>
"""


@app.route('/upload', methods=['GET'])
def upload():
    # HTML フォームを返す
    return HTML_FORM


@app.route('/predict', methods=['POST'])
def predict():
    # 画像ファイルをリクエストから取得
    if 'file' not in request.files:
        return "No file part", 400
    file = request.files['file']
    if file.filename == '':
        return "No selected file", 400
    if file:
        temp_path = tempfile.NamedTemporaryFile(delete=False)
        file.save(temp_path.name)
        # 画像を読み込み、前処理
        img = image.load_img(temp_path.name, target_size=(224, 224))
        img_array = image.img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array = preprocess_input(img_array)

        # SageMaker 推論エンドポイントを使用して予測を行う
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName=ENDPOINT_NAME,
            ContentType='application/json',
            Body=json.dumps(img_array.tolist())
        )
        result = response['Body'].read()
        l = json.loads(result.decode())
        l = np.array(l["predictions"][0])

        return str(np.argmax(l))
    return "OK"


if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True, port=8080)

動作確認

上記のpythonコードをローカルで実行し、http://localhost:8080/predict にアクセスします。
そこで適当な猫ちゃんの画像をアップロードすると推論された結果の番号だけが返ってきます。
上記のコードでは、クラスの番号を返すだけです、実際にはここで取得した番号を用いて色々な処理をしていく必要があるでしょう
合宿中は、加えてS3バケットにデータを追加できるフォーム等をRuby on Railsを用いて作成していましたが、今回は割愛します。

ここまでで何ができたのか

ここまで作成したものにより、WebサーバがS3にデータを追加、好きなタイミングでパイプラインを実行することで、追加データを含めて学習、モデル保存、デプロイまで自動で行うことができるようになりました。
パイプラインの実行をスケジューラから起動したLambdaにより定期実行することも簡単ですし、学習からデプロイまでの自動化ができたと言っていいでしょう。

終わりに

今回はMLOpsを体験するために、Sagemaker Pipelineを触ってみました。
実際に学習からデプロイまでの自動化は達成できたので満足です。

Fusic 技術ブログ

Discussion