🧠

Step Functions で作る機械学習パイプライン

2022/12/20に公開

はじめに

これはAI/ML on AWS Advent Calendar 2022 20 日目の記事です。AWS Step Functions を使って日本語の分散表現を学習するパイプラインを作ってみました。リポジトリはこちら

Step Functions で作るパイプライン

やりたいこと (SageMaker Notebook 編)

今回やりたいのは日本語の分散表現を学習することで、ノートブックであればこちらで公開されています。クラスメソッドのこちらのブログ で git clone する部分を git clone https://github.com/aws-samples/amazon-sagemaker-examples-jp.git に変更して、nlp_amazon_review/BlazingText/blazingtext.ipynb を開いて Jupyter Notebook を開けばコードを実行できます。上から1つづつセルを実行していけば分散表現の学習・感情分類モデルのデプロイ・推論エンドポイントに対してリクエストを投げるといったことができます。

Jypyter Notebook であればデータを色々な軸で可視化したりパラメータを変えてアドホックに色々学習したりするのは便利ですが、やることが固まってきて決められた処理を決められた順番で定期的に実行したくなると面倒になってきます。やりたいことが明確になった後は面倒なことは自動化してしまいましょう。というわけでこのノートブックで行っている処理を Step Functions を使って自動化していきます。

やりたいこと (Step Functions でパイプライン化編)


最初に今回作ったパイプラインのアーキテクチャ図です。日本語の文書データを入力に単語の分散表現を BlazingText アルゴリズムで学習し、学習済みのモデルを S3 にアップロードします。

S3 にオブジェクトがアップロードされたことをトリガーにパイプラインが起動します。実際のワークロードの場合例えば日次バッチでデータベースなどからデータを抽出してきて S3 にアップロードすればパイプラインを起動させることができます。

次に前処理を行います。特に日本語を対象にする場合は形態素解析やストップワードの除去が重要になってきます。この前処理はワークロード次第でしょうが今回のパイプラインでは BlazingText アルゴリズムの入力形式に合うよう変換をおこなっています。前処理済みのデータを S3 にアップロードし、このオブジェクトキーを次のステート( SageMaker での学習)に渡します。

最後に SageMaker のトレーニングジョブを実行して学習が終われば S3 に学習済みモデルをアップロードします。

今回のパイプラインでは学習済みモデルのアップロードまでをスコープにしています。後はユースケースに応じてこの学習済みモデルを読み込んで使うことができます。例えばこの後で学習済みモデルをコンテナイメージに同梱して NLP API をデプロイするといった応用が考えられるでしょう。

SageMaker トレーニングジョブを実行する場合の注意点

SageMaker トレーニングジョブを Step Functions からキックした場合ジョブの実行完了まで待機させることができ、Step Functions を待機させるにはリソース URI の後に .sync を追加する必要があります。ドキュメントはこちら

そして .sync を使用するには Step Functions にアタッチする IAM ロールに追加で権限を付与する必要があります。具体的には SageMaker の場合こちらのドキュメントに書かれているポリシーを追加する必要があります。

SageMaker トレーニングジョブを実行する

今回のパイプラインの肝は SageMaker でトレーニングジョブを作成する部分です。Jypyter Notebook だと以下のようなコードで学習を行うことができます。

import sagemaker
from sagemaker.inputs import TrainingInput
import boto3

sess = sagemaker.Session()

s3_train_data = sess.upload_data(path='train.csv', key_prefix='amazon-review-data')
s3_validation_data = sess.upload_data(path='validation.csv', key_prefix='amazon-review-data')

train_data = TrainingInput(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/plain', s3_data_type='S3Prefix')
validation_data = TrainingInput(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/plain', s3_data_type='S3Prefix')
data_channels = {'train': train_data, 'validation': validation_data}

region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("blazingtext", region_name)

bt_model = sagemaker.estimator.Estimator(container,
                                         role=sagemaker.get_execution_role(),
                                         instance_count=1, 
                                         instance_type='ml.m4.xlarge',
                                         input_mode= 'File',
                                         sagemaker_session=sess)

bt_model.set_hyperparameters(mode="supervised",
                            epochs=10,
                            vector_dim=10,
                            early_stopping=True,
                            patience=4,
                            min_epochs=5)

bt_model.fit(inputs=data_channels, logs=True)

Step Functions からトレーニングジョブを実行する場合は model.fit() で学習している部分を SageMaker API の呼び出しに書き換える必要があります。

ドキュメントに例が載っているのでこれをもとに一部パラメータを変えていきます。リクエストの項目全てはこちらのドキュメントに載っているのでさらにカスタマイズが可能です(スポットインスタンスで学習するようにしてコストを抑えるなど)。

エラーハンドリング

前処理を行う Lambda がどこまで入力ファイルのチェックを行うかは要件次第でしょうが、今回は単純にアップロードされた csv ファイルに特定のカラムが存在するかとカラムの方が int64 かどうかだけは確認するようにしています。これらの条件が満たされないファイルがアップロードされた場合は Lambda が例外を投げてステートマシンで捕捉するようにしました。

また Lambda のサービスエラーが発生する場合を想定しベストプラクティスに従ってこちらも捕捉するようにしました。

ローカルでのテスト

去年書いたこの記事のようにローカルでできるだけ Lambda のテストを済ませてからデプロイするようにしています。VSCode をお使いの場合はこれまた去年のこの記事で紹介したように Dev Containers 拡張を使えばコンテナの中で VSCode を使うことができます。python3.8 -m pytest と打ってテストを実行し Lambda の動作確認を行うことができます。AWS API のモックには motoを使っています。

モデルの学習ができたら

今回のパイプラインはモデルの学習と学習済みモデルを S3 にアップロードするところまではやってくれます。出来上がったモデルは要件に応じて使うことができます。例えば学習したモデルは model.tar.gz にまとめられていてこれを解凍すると vectors.txtvectors.bin が得られるので、これを gensim で読み込んで使うことができます。API サーバーを構築していれば学習したモデルの S3 へのアップロードをトリガーに別のパイプラインを回してみても面白いかもしれません。

from gensim.models import KeyedVectors
model = KeyedVectors.load_word2vec_format("vectors.txt",binary=False)
# キーに日本が入っていれば
model["日本"]
# array([-4.5406  ,  0.36764 , -2.184   , -4.8779  ,  0.19177 ,  1.4029  ,
#         0.62336 , -0.066915, -0.41356 , -0.37246 ], dtype=float32)

おわりに

このように Step Functions を使えば機械学習パイプラインを作ることもできます。冬休みはこれで日本語モデルを学習していきましょう。

Discussion