🌉

SageMaker Processing を使ってみる

2024/05/25に公開

はじめに

S3から比較的大きなサイズのcsvを読み込み、データ整形後、S3に保存するという処理を自動化させたいと考え、Amazon SageMaker Processingの定期実行をやってみました。

SageMaker Processing について

SageMaker Processingは、Amazonが用意しているコンテナや独自のコンテナ上でPythonコードを実行でき、処理が完了するとインスタンスが自動で停止されるサービスになっています。

必要なものは以下です。

  • コンテナイメージ
    今回は、既に用意されているPyTorchコンテナイメージを使用しました。Pythonバージョンは3.10系になります。
  • データ整形の処理を書いたPythonファイル
  • 上記を実行するためのシェルスクリプト
    自由度が上がりそうだったためこの実行方法でやってみました。
  • SageMaker Processingジョブ等を作成するPythonファイル
    具体的な処理のことは、SageMaker Processingジョブと呼ばれます。

なお、SageMaker Processingジョブを定期実行させるための流れとしては、SageMakerパイプラインを作成後、それにジョブを乗せて、パイプラインにスケジュールをアタッチするというものになります。つまり、Processingジョブを乗せたパイプラインを定期実行させることになります。

ファイルの構成

ローカルリポジトリの構成
batch/
└── src/
    └── batch/
        ├── algorithm/
        │    ├── __init__.py
        │    ├── preprocess_main.py
        │    ├── run-code.sh
        ├── __init__.py
        └── main.py

ローカルリポジトリを上記の通り構成した場合、algorithm配下のファイル(データ整形の処理を書いたPythonファイル等)をS3にアップロードしておく必要があります。

S3の構成
S3バケット名/
└── preprocess/
     ├── algorithm/
     │    ├── __init__.py
     │    ├── preprocess_main.py
     │    ├── run-code.sh
     ├── input/
     │    └── input.csv 
     └── output/
                    └── train.csv # ジョブ実行後に出力されるcsv

ソースコード

SageMaker Processingジョブの作成やSageMakerパイプラインの作成は、SageMaker Python SDKを使って行います。

  • データ整形の処理を書いたPythonファイル
preprocess_main.py
from pathlib import Path

import pandas as pd

LOCAL_INPUT_PATH = "/opt/ml/processing/input"
LOCAL_OUTPUT_PATH = "/opt/ml/processing/output/train"

# csvファイルの読み込み
input_csv_path = Path(
    LOCAL_INPUT_PATH,
    "input_csv",
    "input.csv",
)
df: pd.DataFrame = pd.read_csv(
    input_csv_path,
)

# 変換処理
# ...

# 変換したDataFrameをcsv形式で保存する
output_csv_path = Path(
    LOCAL_OUTPUT_PATH,
    "train.csv"
)
Path(LOCAL_OUTPUT_PATH).mkdir(exist_ok=True)
df_train.to_csv(output_csv_path, index=False)
  • 上記を実行するためのシェルスクリプト
run-code.sh
python /opt/ml/processing/input/code/src/preprocess_main.py
  • SageMaker Processingジョブ等を作成するPythonファイル
main.py
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.workflow.pipeline import (
    Pipeline,
    PipelineDefinitionConfig,
    PipelineSchedule,
)
from sagemaker.workflow.steps import ProcessingStep

role = "arn:aws:iam::<アカウントID>:<リージョン>:role/<ロール名>"

# コンテナイメージやインスタンスタイプ等を設定する
processor = PyTorchProcessor(
    framework_version="2.2.0",
    py_version="py310",
    # コンテナ起動時に以下でシェルスクリプトを実行できる
    command=["sh", "/opt/ml/processing/input/code/run-code.sh"],
    role=role,
    instance_type="ml.t3.xlarge",
    instance_count=1,
    env={"ENV_KEY": "ENV_VALUE"},    # 環境変数を使用できる
)

# Processingジョブを作成する
processing_step = ProcessingStep(
    name="my-preprocess-job",
    code="s3://<S3バケット名>/preprocess/algorithm/run-code.sh",
    processor=processor,
    inputs=[
        ProcessingInput(
            input_name="src",
            source="s3://<S3バケット名>/preprocess/algorithm",
            destination="/opt/ml/processing/input/code/src",
        ),
        ProcessingInput(
            input_name="input_csv",
            source="s3://<S3バケット名>/preprocess/input/input.csv",
            destination="/opt/ml/processing/input/input_csv",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="df_train_csv",
            source="/opt/ml/processing/output/train",
            destination="s3://<S3バケット名>/preprocess/output/train.csv",
        ),
    ],
)

# パイプラインを作成する
pipeline = Pipeline(
    name="my-pipline",
    steps=[
        processing_step,
    ],
    pipeline_definition_config=PipelineDefinitionConfig(
        use_custom_job_prefix=True,
    ),
)
pipeline.create(role_arn=role)["PipelineArn"]

# スケジュールを作成する(EventBridgeのスケジュールが作成される)
schedule = PipelineSchedule(
    name="schedule-my-pipline",
    # cron="分 時 日 月 曜日 年", UTCで表現する: JST=UTC+9
    # 日本時間で毎日4時に処理する
    cron="0 19 ? * * *",
)

# パイプラインにスケジュールを設定する
pipeline.put_triggers(
    triggers=[
        schedule,
    ],
    role_arn=role,
)

コンテナ内のディレクトリ構成は以下の通りになります。

コンテナ内のディレクトリ構成
opt/
└── ml/
     └── processing/
          ├── input/
          │    ├── input_csv
          │    │    └── input.csv
          │    └── code/
          │         ├── run-code.sh
          │         └── src/
          │              ├── __init__.py
          │              ├── preprocess_main.py
          │              └── run-code.sh
          └── output/
               └── train/
                    └── train.csv

"/opt/ml/processing/input"配下および"/opt/ml/processing/output"配下に各ファイルを置くことになり、これに則る必要があります。

実行

以下を実行すると、ジョブやパイプライン、EventBridgeのスケジュールが作成されます。

python src/batch/main.py

コンソール上で、SageMaker Studio > Pipelines > Pipeline For Get Previous Sites > Executions に"my-pipline"というパイプラインができます。

Createをクリック後、実行名や説明を記載して、モーダル内のCreateをクリックするとパイプラインを手動実行できます。

参考

NCDCエンジニアブログ

Discussion