🦁

Amazon SageMakerを使用したRayベースの機械学習ワークフローのオーケストレーション

に公開

Translation: https://aws.amazon.com/jp/blogs/machine-learning/orchestrate-ray-based-machine-learning-workflows-using-amazon-sagemaker/

Raju RanganとSherry Dingによる2023年9月18日の投稿 Amazon SageMaker, ベストプラクティス, 中級(200), 技術的ハウツー

機械学習(ML)は、お客様がより複雑な課題を解決しようとするにつれて、ますます複雑になっています。この複雑さにより、多くの場合、単一のモデルを訓練するために複数のマシンを使用する分散MLの必要性が生じます。これにより、複数のノードにわたってタスクを並列化でき、トレーニング時間の短縮、スケーラビリティの向上、パフォーマンスの改善につながりますが、分散ハードウェアを効果的に使用するには大きな課題があります。データサイエンティストは、データの分割、負荷分散、障害耐性、スケーラビリティなどの課題に対処する必要があります。MLエンジニアは、並列化、スケジューリング、障害、再試行を手動で処理する必要があり、複雑なインフラストラクチャコードが必要です。

このポストでは、分散MLにおけるRayAmazon SageMakerの利点について説明し、これらのフレームワークを使用してスケーラブルなMLワークフローを構築およびデプロイする方法についてステップバイステップのガイドを提供します。

Rayはオープンソースの分散コンピューティングフレームワークで、MLモデルの分散トレーニングとサービングのための柔軟なフレームワークを提供します。データ前処理、分散トレーニング、ハイパーパラメータチューニング、強化学習、モデルサービングなどの一般的なMLタスク向けのシンプルでスケーラブルなライブラリを通じて、低レベルの分散システムの詳細を抽象化します。

SageMakerは、MLモデルの構築、トレーニング、デプロイのためのフルマネージドサービスです。RayはSageMakerの機能とシームレスに統合して、効率的で信頼性の高い複雑なMLワークロードを構築およびデプロイします。RayとSageMakerの組み合わせにより、スケーラブルなMLワークフローのためのエンドツーエンドの機能が提供され、以下のような特徴があります:

  • Rayの分散アクターと並列処理構造により、分散アプリケーションの開発が簡素化されます。
  • Ray AI Runtime(AIR)により、開発から本番環境への移行の摩擦が軽減されます。RayとAIRを使用すると、同じPythonコードがラップトップから大規模クラスターまでシームレスにスケールします。
  • SageMakerのマネージドインフラストラクチャと、処理ジョブ、トレーニングジョブ、ハイパーパラメータチューニングジョブなどの機能は、分散コンピューティングのためにRayライブラリを使用できます。
  • Amazon SageMaker Experimentsにより、迅速な反復とトライアルの追跡が可能になります。
  • Amazon SageMaker Feature Storeは、モデルトレーニングのためのML特徴量を保存、取得、共有するためのスケーラブルなリポジトリを提供します。
  • トレーニング済みモデルは、ガバナンスと管理のためにAmazon SageMaker Model Registryに保存、バージョン管理、追跡できます。
  • Amazon SageMaker Pipelinesにより、データ準備からトレーニング、モデルデプロイまでのエンドツーエンドのMLライフサイクルを自動化されたワークフローとしてオーケストレーションできます。

ソリューション概要

このポストでは、RayとSageMakerを一緒に使用する利点に焦点を当てています。SageMaker Pipelinesを使用してオーケストレーションされたエンドツーエンドのRayベースMLワークフローをセットアップします。このワークフローには、Rayアクターを使用したフィーチャーストアへのデータの並列取り込み、Ray Dataを使用したデータ前処理、Ray Trainとハイパーパラメータ最適化(HPO)チューニングジョブを使用したスケールでのモデルトレーニングとハイパーパラメータチューニング、そしてモデル評価とモデルレジストリへのモデル登録が含まれます。

データとしては、8つの特徴(YEAR_BUILTSQUARE_FEETNUM_BEDROOMNUM_BATHROOMSLOT_ACRESGARAGE_SPACESFRONT_PORCHDECK)を持つ合成住宅データセットを使用し、モデルは住宅のPRICEを予測します。

MLワークフローの各段階は、入力と出力パラメータを取る独自のスクリプトを持つ個別のステップに分割されています。次のセクションでは、各ステップからの主要なコードスニペットを紹介します。完全なコードはaws-samples-for-ray GitHubリポジトリで見つけることができます。

前提条件

SageMaker Python SDKを使用し、このポストに関連するコードを実行するには、以下の前提条件が必要です:

SageMaker Feature Storeへのデータ取り込み

MLワークフローの最初のステップは、CSV形式のAmazon Simple Storage Service(Amazon S3)からソースデータファイルを読み取り、SageMaker Feature Storeに取り込むことです。SageMaker Feature Storeは、チームがML特徴量を作成、共有、管理することを容易にする目的に特化したリポジトリです。特徴量の発見、再利用、共有を簡素化し、顧客チーム内での開発の高速化、コラボレーションの増加、コスト削減につながります。

フィーチャーストアへの特徴量の取り込みには、以下のステップが含まれます:

  1. フィーチャーグループを定義し、フィーチャーストアにフィーチャーグループを作成します。
  2. ソースデータをフィーチャーストア用に準備し、各データ行にイベント時間とレコードIDを追加します。
  3. Boto3 SDKを使用して、準備されたデータをフィーチャーグループに取り込みます。

このセクションでは、Rayを使用した取り込みタスクの並列処理を含むステップ3のみを強調します。このプロセスの完全なコードはGitHubリポジトリで確認できます。

ingest_featuresメソッドはFeaturestoreというクラス内で定義されています。Featurestoreクラスには@ray.remoteデコレータが付いていることに注意してください。これは、このクラスのインスタンスがRayアクター、つまりRay内の状態を持ち並行計算ユニットであることを示しています。これは、Ray クラスター内の異なるノードで実行されている複数のタスクによって同時にアクセスできる分散オブジェクトを作成できるプログラミングモデルです。アクターは、可変状態を管理およびカプセル化する方法を提供し、分散環境で複雑な状態を持つアプリケーションを構築する上で価値があります。アクターでリソース要件を指定することもできます。この場合、FeatureStoreクラスの各インスタンスには0.5 CPUが必要になります。以下のコードをご覧ください:

@ray.remote(num_cpus=0.5)
class Featurestore:
    def ingest_features(self,feature_group_name, df, region):
        """
        Ingest features to Feature Store Group
        Args:
            feature_group_name (str): Feature Group Name
            data_path (str): Path to the train/validation/test data in CSV format.
        """
        
        ...

remoteオペレータを呼び出すことでアクターと対話できます。以下のコードでは、アクターの希望数がスクリプトへの入力引数として渡されます。次に、データはアクターの数に基づいて分割され、フィーチャーストアに取り込むためにリモートの並列プロセスに渡されます。オブジェクト参照に対してgetを呼び出して、リモート計算が完了し結果が利用可能になるまで現在のタスクの実行をブロックできます。結果が利用可能になると、ray.getは結果を返し、現在のタスクの実行が続行されます。

import modin.pandas as pd
import ray

df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = []

for actor, partition in zip(actors, input_partitions):
    results.append(actor.ingest_features.remote(
                        args.feature_group_name, 
                        partition, args.region
                      )
                )

ray.get(results)

トレーニング、検証、テスト用のデータ準備

このステップでは、Ray Datasetを使用して、機械学習の準備としてデータセットを効率的に分割、変換、スケーリングします。Ray Datasetは、様々なストレージシステムやファイル形式をサポートし、Rayにデータを読み込むための標準的な方法を提供します。並列変換、シャッフリング、グループ化、集約などの一般的なML前処理操作のためのAPIを持っています。Ray Datasetは、状態のあるセットアップとGPUアクセラレーションを必要とする操作も処理します。Spark、Pandas、NumPy、その他のデータ処理ライブラリ、およびTensorFlowやPyTorchなどのMLフレームワークとシームレスに統合され、Rayの上にエンドツーエンドのデータパイプラインとMLワークフローを構築できます。目標は、実践者と研究者のための分散データ処理とMLを容易にすることです。

このデータ前処理を実行するスクリプトのセクションを見てみましょう。まず、フィーチャーストアからデータをロードします:

def load_dataset(feature_group_name, region):
    """
    Loads the data as a ray dataset from the offline featurestore S3 location
    Args:
        feature_group_name (str): name of the feature group
    Returns:
        ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store
    """
    session = sagemaker.Session(boto3.Session(region_name=region))
    fs_group = FeatureGroup(
        name=feature_group_name, 
        sagemaker_session=session
    )

    fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
    
    # Drop columns added by the feature store
    # Since these are not related to the ML problem at hand
    cols_to_drop = ["record_id", "event_time","write_time", 
                    "api_invocation_time", "is_deleted", 
                    "year", "month", "day", "hour"]           

    ds = ray.data.read_parquet(fs_data_loc)
    ds = ds.drop_columns(cols_to_drop)
    print(f"{fs_data_loc} count is {ds.count()}")
    return ds

次に、ray.dataライブラリから利用可能な高レベルの抽象化を使用してデータを分割およびスケーリングします:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
    """
    Split dataset into train, validation and test samples
    Args:
        dataset (ray.data.Dataset): input data
        train_size (float): ratio of data to use as training dataset
        val_size (float): ratio of data to use as validation dataset
        test_size (float): ratio of data to use as test dataset
        random_state (int): Pass an int for reproducible output across multiple function calls.
    Returns:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
    """
    # Shuffle this dataset with a fixed random seed.
    shuffled_ds = dataset.random_shuffle(seed=random_state)
    # Split the data into train, validation and test datasets
    train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
    return train_set, val_set, test_set

def scale_dataset(train_set, val_set, test_set, target_col):
    """
    Fit StandardScaler to train_set and apply it to val_set and test_set
    Args:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
        target_col (str): target col
    Returns:
        train_transformed (ray.data.Dataset): train data scaled
        val_transformed (ray.data.Dataset): val data scaled
        test_transformed (ray.data.Dataset): test data scaled
    """
    tranform_cols = dataset.columns()
    # Remove the target columns from being scaled
    tranform_cols.remove(target_col)
    # set up a standard scaler
    standard_scaler = StandardScaler(tranform_cols)
    # fit scaler to training dataset
    print("Fitting scaling to training data and transforming dataset...")
    train_set_transformed = standard_scaler.fit_transform(train_set)
    # apply scaler to validation and test datasets
    print("Transforming validation and test datasets...")
    val_set_transformed = standard_scaler.transform(val_set)
    test_set_transformed = standard_scaler.transform(test_set)
    return train_set_transformed, val_set_transformed, test_set_transformed

処理されたトレーニング、検証、テストデータセットはAmazon S3に保存され、後続のステップへの入力パラメータとして渡されます。

モデルトレーニングとハイパーパラメータ最適化の実行

データが前処理され、モデリングの準備ができたので、MLモデルをトレーニングし、予測性能を最大化するためにハイパーパラメータを微調整する時が来ました。Rayに構築された分散バックエンドであるXGBoostのXGBoost-Rayを使用します。これにより、複数のノードとGPUを使用して大規模なデータセットでXGBoostモデルをトレーニングできます。XGBoostのトレーニングと予測APIのシンプルなドロップイン置き換えを提供し、分散データ管理とトレーニングの複雑さを裏で処理します。

複数のノードにわたるトレーニングの分散を可能にするために、RayHelperという名前のヘルパークラスを利用します。以下のコードに示すように、トレーニングジョブのリソース構成を使用し、最初のホストをヘッドノードとして選択します:

class RayHelper():
    def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"):
        ....
        self.resource_config = self.get_resource_config()
        self.head_host = self.resource_config["hosts"][0]
        self.n_hosts = len(self.resource_config["hosts"])

ホスト情報を使用して、トレーニングジョブインスタンスの各々でRayをどのように初期化するかを決定できます:

def start_ray(self): 
   head_ip = self._get_ip_from_host()
   # If the current host is the host choosen as the head node
   # run `ray start` with specifying the --head flag making this is the head node
    if self.resource_config["current_host"] == self.head_host:
        output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', 
        self.ray_port, '--redis-password', self.redis_pass, 
        '--include-dashboard', 'false'], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        ray.init(address="auto", include_dashboard=False)
        self._wait_for_workers()
        print("All workers present and accounted for")
        print(ray.cluster_resources())

    else:
       # If the current host is not the head node, 
       # run `ray start` with specifying ip address as the head_host as the head node
        time.sleep(10)
        output = subprocess.run(['ray', 'start', 
        f"--address={head_ip}:{self.ray_port}", 
        '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        sys.exit(0)

トレーニングジョブが開始されると、RayHelperのインスタンスでstart_ray()メソッドを呼び出すことでRayクラスターを初期化できます:

if __name__ == '__main__':
    ray_helper = RayHelper()
    ray_helper.start_ray()
    args = read_parameters()
    sess = sagemaker.Session(boto3.Session(region_name=args.region))

次に、トレーニングにXGBoost-RayからXGBoostトレーナーを使用します:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result:
    """
    Creates a XGBoost trainer, train it, and return the result.        
    Args:
        ds_train (ray.data.dataset): Training dataset
        ds_val (ray.data.dataset): Validation dataset
        params (dict): Hyperparameters
        num_workers (int): number of workers to distribute the training across
        target_col (str): target column
    Returns:
        result (ray.air.result.Result): Result of the training job
    """
    
    train_set = RayDMatrix(ds_train, 'PRICE')
    val_set = RayDMatrix(ds_val, 'PRICE')
    
    evals_result = {}
    
    trainer = train(
        params=params,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(val_set, "validation")],
        verbose_eval=False,
        num_boost_round=100,
        ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1),
    )
    
    output_path=os.path.join(args.model_dir, 'model.xgb')
    
    trainer.save_model(output_path)
    
    valMAE = evals_result["validation"]["mae"][-1]
    valRMSE = evals_result["validation"]["rmse"][-1]
 
    print('[3] #011validation-mae:{}'.format(valMAE))
    print('[4] #011validation-rmse:{}'.format(valRMSE))
    
    local_testing = False
    try:
        load_run(sagemaker_session=sess)
    except:
        local_testing = True
    if not local_testing: # Track experiment if using SageMaker Training
        with load_run(sagemaker_session=sess) as run:
            run.log_metric('validation-mae', valMAE)
            run.log_metric('validation-rmse', valRMSE)

trainerをインスタンス化する際に、アクター数と各アクターあたりのCPU数を取るRayParamsを渡すことに注意してください。XGBoost-Rayはこの情報を使用して、Rayクラスターに接続されているすべてのノードにトレーニングを分散します。

次に、SageMaker Python SDKに基づいてXGBoostエスティメータオブジェクトを作成し、それをHPOジョブに使用します。

前述のステップをSageMaker Pipelinesを使用してオーケストレーションする

エンドツーエンドのスケーラブルで再利用可能なMLワークフローを構築するには、前述のステップをパイプラインにオーケストレーションするためのCI/CDツールを使用する必要があります。SageMaker Pipelinesは、SageMaker、SageMaker Python SDK、およびSageMaker Studioと直接統合されています。この統合により、使いやすいPython SDKを使用してMLワークフローを作成し、SageMaker Studioを使用してワークフローを視覚化および管理できます。また、パイプライン実行内のデータの履歴を追跡し、キャッシュ用のステップを指定することもできます。

SageMaker PipelinesはMLワークフローを構築するために必要なステップを含む有向非巡回グラフ(DAG)を作成します。各パイプラインは、ステップ間のデータ依存関係によってオーケストレーションされる相互接続されたステップのシリーズであり、パラメータ化することができ、パイプラインの各実行に入力変数をパラメータとして提供できます。SageMaker Pipelinesには、ParameterStringParameterIntegerParameterFloatParameterBooleanの4種類のパイプラインパラメータがあります。このセクションでは、いくつかの入力変数をパラメータ化し、ステップキャッシュ構成を設定します:

processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)
feature_group_name = ParameterString(
    name='FeatureGroupName',
    default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString(
    name='Bucket_Prefix',
    default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)
    train_size = ParameterString(
    name='TrainSize',
    default_value="0.6"
)
val_size = ParameterString(
    name='ValidationSize',
    default_value="0.2"
)
test_size = ParameterString(
    name='TestSize',
    default_value="0.2"
)

cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

SageMaker Feature Store取り込み用と、データ準備用の2つの処理ステップを定義します。これは、前述のステップと非常に似ているはずです。唯一の新しいコード行は、処理ジョブの構成を取得し、それをパイプラインステップとして含めることができる、ステップ定義後のProcessingStepです。さらに、データ準備ステップがSageMaker Feature Store取り込みステップに依存することを指定します。以下のコードをご覧ください:

feature_store_ingestion_step = ProcessingStep(
    name='FeatureStoreIngestion',
    step_args=fs_processor_args,
    cache_config=cache_config
)

preprocess_dataset_step = ProcessingStep(
    name='PreprocessData',
    step_args=processor_args,
    cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

同様に、モデルトレーニングとチューニングステップを構築するには、モデルトレーニングステップのコードの後にTuningStepの定義を追加して、SageMakerハイパーパラメータチューニングをパイプラインのステップとして実行できるようにする必要があります:

tuning_step = TuningStep(
    name="HPTuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

チューニングステップの後、最良のモデルをSageMaker Model Registryに登録することを選択します。モデル品質を制御するために、最良のモデルの目標メトリック(RMSE)をパイプラインの入力パラメータrmse_thresholdとして定義された閾値と比較する最小品質ゲートを実装します。この評価を行うために、評価スクリプトを実行する別の処理ステップを作成します。モデル評価結果はプロパティファイルとして保存されます。プロパティファイルは、他のステップがどのように実行されるべきかを決定するために処理ステップの結果を分析する際に特に役立ちます。以下のコードをご覧ください:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json',
)

# A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. 
# In this case, the top performing model is evaluated. 
evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0, s3_bucket=bucket, prefix=s3_prefix
            ),
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation', source='/opt/ml/processing/evaluation'
        ),
    ],
    code='./pipeline_scripts/evaluate/script.py',
    property_files=[evaluation_report],
)

パイプラインで最良のモデルをSageMaker Model Registryに登録するためのModelStepを定義します。最良のモデルが事前に決められた品質チェックに合格しない場合に備えて、エラーメッセージを出力するFailStepも追加で指定します:

register_step = ModelStep(
    name='RegisterTrainedModel',
    step_args=model_registry_args
)

metrics_fail_step = FailStep(
    name="RMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

次に、ConditionStepを使用して、パイプラインの次のステップとしてモデル登録ステップまたは失敗ステップのどちらを取るべきかを評価します。この場合、最良のモデルはそのRMSEスコアが閾値より低い場合に登録されます。

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.rmse.value',
    ),
    right=rmse_threshold,
)
condition_step = ConditionStep(
    name='CheckEvaluation',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[metrics_fail_step],
)

最後に、定義されたすべてのステップをパイプラインにオーケストレーションします:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [
             feature_store_ingestion_step,
             preprocess_dataset_step,
             tuning_step,
             evaluation_step,
             condition_step
            ]

training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        feature_group_name,
        train_size,
        val_size,
        test_size,
        bucket_prefix,
        rmse_threshold
    ],
    steps=step_list
)

# Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

上記のパイプラインはSageMaker Studioで直接視覚化および実行することができます。または、execution = training_pipeline.start()を呼び出すことで実行できます。以下の図はパイプラインのフローを示しています。

SageMaker Pipeline DAG

さらに、パイプライン実行によって生成されたアーティファクトの系統を確認することができます。

from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

モデルのデプロイ

パイプライン実行を通じて最良のモデルがSageMaker Model Registryに登録された後、SageMakerの完全に管理されたモデルデプロイ機能を使用して、モデルをリアルタイムエンドポイントにデプロイします。SageMakerには、さまざまなユースケースのニーズを満たす他のモデルデプロイオプションもあります。詳細については、ユースケースに適したオプションを選択する際に推論のためのモデルのデプロイを参照してください。まず、SageMaker Model Registryに登録されているモデルを取得しましょう:

xgb_regressor_model = ModelPackage(
    role_arn,
    model_package_arn=model_package_arn,
    name=model_name
)

モデルの現在のステータスはPendingApprovalです。デプロイ前にステータスをApprovedに設定する必要があります:

sagemaker_client.update_model_package(
    ModelPackageArn=xgb_regressor_model.model_package_arn,
    ModelApprovalStatus='Approved'
)

xgb_regressor_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name=endpoint_name
)

クリーンアップ

実験が終わったら、不要な料金を避けるためにリソースをクリーンアップすることを忘れないでください。クリーンアップするには、DeleteEndpointDeleteModelPackageGroupDeletePipelineDeleteFeatureGroupのAPIをそれぞれ呼び出してリアルタイムエンドポイント、モデルグループ、パイプライン、フィーチャーグループを削除し、すべてのSageMaker Studioノートブックインスタンスをシャットダウンします。

結論

このポストでは、RayベースのMLワークフローをオーケストレーションするためにSageMaker Pipelinesを使用する方法についてステップバイステップの説明を行いました。また、SageMaker PipelinesがサードパーティのMLツールと統合する能力も示しました。パフォーマンスの優位性と運用効率を確保するために、スケーラブルで安全な方法でRayワークロードをサポートするさまざまなAWSサービスがあります。今度はあなたの番です。これらの強力な機能を探索し、Amazon SageMaker PipelinesとRayを使用して機械学習ワークフローの最適化を始めましょう。今すぐ行動して、MLプロジェクトの可能性を最大限に引き出しましょう!


著者について

Raju RanganはAmazon Web Services(AWS)のシニアソリューションアーキテクトです。彼は政府支援団体と協力し、AWSを使用してAI/MLソリューションを構築するのを支援しています。クラウドソリューションをいじっていないときは、家族と過ごしたり、友人とバドミントンの活気あるゲームでシャトルを打ち合ったりしています。

Sherry DingはAmazon Web Services(AWS)のシニアAI/MLスペシャリストソリューションアーキテクトです。彼女はコンピュータサイエンスの博士号を持ち、機械学習に関する豊富な経験を持っています。主に公共部門の顧客とさまざまなAI/ML関連のビジネス課題に取り組み、AWSクラウドでの機械学習の旅を加速するのを支援しています。顧客を支援していないときは、アウトドア活動を楽しんでいます。

Discussion