SageMaker Processing を使ってみる
はじめに
S3から比較的大きなサイズのcsvを読み込み、データ整形後、S3に保存するという処理を自動化させたいと考え、Amazon SageMaker Processingの定期実行をやってみました。
SageMaker Processing について
SageMaker Processingは、Amazonが用意しているコンテナや独自のコンテナ上でPythonコードを実行でき、処理が完了するとインスタンスが自動で停止されるサービスになっています。
必要なものは以下です。
- コンテナイメージ
今回は、既に用意されているPyTorchコンテナイメージを使用しました。Pythonバージョンは3.10系になります。 - データ整形の処理を書いたPythonファイル
- 上記を実行するためのシェルスクリプト
自由度が上がりそうだったためこの実行方法でやってみました。 - SageMaker Processingジョブ等を作成するPythonファイル
具体的な処理のことは、SageMaker Processingジョブと呼ばれます。
なお、SageMaker Processingジョブを定期実行させるための流れとしては、SageMakerパイプラインを作成後、それにジョブを乗せて、パイプラインにスケジュールをアタッチするというものになります。つまり、Processingジョブを乗せたパイプラインを定期実行させることになります。
ファイルの構成
batch/
└── src/
└── batch/
├── algorithm/
│ ├── __init__.py
│ ├── preprocess_main.py
│ ├── run-code.sh
├── __init__.py
└── main.py
ローカルリポジトリを上記の通り構成した場合、algorithm配下のファイル(データ整形の処理を書いたPythonファイル等)をS3にアップロードしておく必要があります。
S3バケット名/
└── preprocess/
├── algorithm/
│ ├── __init__.py
│ ├── preprocess_main.py
│ ├── run-code.sh
├── input/
│ └── input.csv
└── output/
└── train.csv # ジョブ実行後に出力されるcsv
ソースコード
SageMaker Processingジョブの作成やSageMakerパイプラインの作成は、SageMaker Python SDKを使って行います。
- データ整形の処理を書いたPythonファイル
from pathlib import Path
import pandas as pd
LOCAL_INPUT_PATH = "/opt/ml/processing/input"
LOCAL_OUTPUT_PATH = "/opt/ml/processing/output/train"
# csvファイルの読み込み
input_csv_path = Path(
LOCAL_INPUT_PATH,
"input_csv",
"input.csv",
)
df: pd.DataFrame = pd.read_csv(
input_csv_path,
)
# 変換処理
# ...
# 変換したDataFrameをcsv形式で保存する
output_csv_path = Path(
LOCAL_OUTPUT_PATH,
"train.csv"
)
Path(LOCAL_OUTPUT_PATH).mkdir(exist_ok=True)
df_train.to_csv(output_csv_path, index=False)
- 上記を実行するためのシェルスクリプト
python /opt/ml/processing/input/code/src/preprocess_main.py
- SageMaker Processingジョブ等を作成するPythonファイル
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.workflow.pipeline import (
Pipeline,
PipelineDefinitionConfig,
PipelineSchedule,
)
from sagemaker.workflow.steps import ProcessingStep
role = "arn:aws:iam::<アカウントID>:<リージョン>:role/<ロール名>"
# コンテナイメージやインスタンスタイプ等を設定する
processor = PyTorchProcessor(
framework_version="2.2.0",
py_version="py310",
# コンテナ起動時に以下でシェルスクリプトを実行できる
command=["sh", "/opt/ml/processing/input/code/run-code.sh"],
role=role,
instance_type="ml.t3.xlarge",
instance_count=1,
env={"ENV_KEY": "ENV_VALUE"}, # 環境変数を使用できる
)
# Processingジョブを作成する
processing_step = ProcessingStep(
name="my-preprocess-job",
code="s3://<S3バケット名>/preprocess/algorithm/run-code.sh",
processor=processor,
inputs=[
ProcessingInput(
input_name="src",
source="s3://<S3バケット名>/preprocess/algorithm",
destination="/opt/ml/processing/input/code/src",
),
ProcessingInput(
input_name="input_csv",
source="s3://<S3バケット名>/preprocess/input/input.csv",
destination="/opt/ml/processing/input/input_csv",
),
],
outputs=[
ProcessingOutput(
output_name="df_train_csv",
source="/opt/ml/processing/output/train",
destination="s3://<S3バケット名>/preprocess/output/train.csv",
),
],
)
# パイプラインを作成する
pipeline = Pipeline(
name="my-pipline",
steps=[
processing_step,
],
pipeline_definition_config=PipelineDefinitionConfig(
use_custom_job_prefix=True,
),
)
pipeline.create(role_arn=role)["PipelineArn"]
# スケジュールを作成する(EventBridgeのスケジュールが作成される)
schedule = PipelineSchedule(
name="schedule-my-pipline",
# cron="分 時 日 月 曜日 年", UTCで表現する: JST=UTC+9
# 日本時間で毎日4時に処理する
cron="0 19 ? * * *",
)
# パイプラインにスケジュールを設定する
pipeline.put_triggers(
triggers=[
schedule,
],
role_arn=role,
)
コンテナ内のディレクトリ構成は以下の通りになります。
opt/
└── ml/
└── processing/
├── input/
│ ├── input_csv
│ │ └── input.csv
│ └── code/
│ ├── run-code.sh
│ └── src/
│ ├── __init__.py
│ ├── preprocess_main.py
│ └── run-code.sh
└── output/
└── train/
└── train.csv
"/opt/ml/processing/input"配下および"/opt/ml/processing/output"配下に各ファイルを置くことになり、これに則る必要があります。
実行
以下を実行すると、ジョブやパイプライン、EventBridgeのスケジュールが作成されます。
python src/batch/main.py
コンソール上で、SageMaker Studio > Pipelines > Pipeline For Get Previous Sites > Executions に"my-pipline"というパイプラインができます。
Createをクリック後、実行名や説明を記載して、モーダル内のCreateをクリックするとパイプラインを手動実行できます。
参考
NCDC株式会社( ncdc.co.jp/ )のエンジニアチームです。 募集中のエンジニアのポジションや、採用している技術スタックの紹介などはこちら( github.com/ncdcdev/recruitment )をご覧ください! ※エンジニア以外も記事を投稿することがあります
Discussion