🍣

自作Docker ImageからSageMaker推論エンドポイントをdeployしLambdaから呼ぶ方法

2021/12/11に公開

はじめに + 背景

アドベントカレンダー11日目!
仕事で、AI開発の基盤を作るということをやりました。
働いているところでは、いわゆるソフトウェアエンジニア(Webとかnativeアプリとか作るエンジニア)のチームといわゆるデータサイエンティスト(データ解析とかAI開発とかやる人)のチームが別で存在しています。

その体制が故、データサイエンティストが

  • 学習環境構築
  • AIモデルの開発
  • トレーニング
  • 推論APIサーバ構築

を全てやっていました。
大体のプロジェクトで学習用、推論用にEC2インスタンスを用意し、それぞれのインスタンスの中でFlaskサーバを手動で立てるみたいな手順でリリースと運用を繰り返していました。

また、サーバの管理もデータサイエンティストがやるのか、ソフトウェアエンジニアがやるのかというルールも明確に定まっていない状態でした。

そこで、AI開発を効率化すべく、AI開発環境を整えるということをやっていきました。

満たすべき要件

以下のような要件で動くことになりました。

  • 新規でAIを作る際に、手動でサーバを立てなくても良くする。
  • ある程度データサイエンティストとソフトウェアエンジニアで役割を分けて運用ができる。(チームが違うため)
  • 学習サーバのメンテナンスを不要にする
  • 推論サーバのデプロイを簡単にする(目安1コマンド)
  • 任意のpythonライブラリ(例えばpytorch-metric-learning)が使えること

オンプレGPU使い放題!!ってわけでは全然なかったので、AWSのSageMaker Studioを用いて仕組みを作ることにしました。
その前段階として、既存のEC2上でFlaskサーバが動いているAPIをSageMakerに移植するということを試みました。

やったことの概要

  • 学習モデル、トレーニング処理、推論サーバ起動処理が定義されたpythonスクリプトをSageMakerの仕様に合わせた再定義
  • 学習環境に必要なライブラリ等がインストールされたdocker imageの作成(とECRへのpush)
  • ECRイメージをもとにSageMaker学習インスタンスを作り、その中でトレーニング
  • SageMakerの推論エンドポイントの作成
  • lambdaから推論エンドポイントへのリクエスト

ざっくり、こんな感じで学習も推論サーバのデプロイも全部SageMakerで完結できるようになりました。

ちなみに、docker imageを必ず作らなくてはいけないわけではなく、PyTorchやTensorFlowのmodelをSageMaker標準の学習インスタンスで扱ってトレーニングすることは普通にできます。

この方法だと、NginxのようなWebサーバを用意する必要もなく、かなり簡単にデプロイできます。
新しくPyTorchモデル等でAIを作る時はこの方法でデプロイすることになると思います。

今回は

  • すでにFlaskサーバ+nginxの構成でAPIが存在していた(からコピペでいける)
  • SageMakerの標準学習インスタンスにないライブラリを手動でインストールする必要がある

という理由から、docker imageを自作し、それを用いて学習、推論を行う仕組みを採用しました。

構成

  • 開発はSageMakerインスタンスで行う。
  • githubでソースは管理する
  • 学習に使うインスタンスはECRにpushしたimageを元にEC2スポットインスタンスを立てる。
  • 教師データ等のinputは基本的にS3におき、学習する時に取りにいく。
  • モデル等のoutputは基本的にS3におく。
  • 推論サーバはSageMakerインスタンスからdeployし、endpointとして動く
  • lambdaが推論エンドポイントにリクエストを投げ、推論結果を取得する。
  • API GatewayとLambdaの連携によりAPIを外部に公開する。

AWS上の環境構築

domainを作る

大前提、SageMaker上であらゆる操作をするためのdomainが必要です。
terraformで定義したのは以下のような雰囲気。
execution_roleとの紐づきが必須です。

resource "aws_iam_role" "sm_execution_role" {
  name = "sm_execution_role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Sid    = ""
        Principal = {
          Service = "sagemaker.amazonaws.com"
        }
      },
    ]
  })
}

resource "aws_sagemaker_domain" "sm-domain" {
  domain_name = "sm-domain"
  auth_mode   = "IAM"
  vpc_id      = var.vpc-id
  subnet_ids  = [var.subnet-id]

  default_user_settings {
    execution_role = aws_iam_role.sm_execution_role.arn
  }
}

domain-userを作る

domainの中にuserを作ることができます。
これは開発者一人に1userで分けていけそう。

resource "aws_sagemaker_user_profile" "sm-user" {
  domain_id = aws_sagemaker_domain.sm-domain.id
  user_profile_name = "sm-user"
}

ECRを作る

学習環境、推論サーバを作る時に必要になるimageのpush先を作っておきます。

resource "aws_ecr_repository" "sm-ecr-repository" {
  name = "dst-ecr-images"
  image_tag_mutability = "MUTABLE"
  image_scanning_configuration {
    scan_on_push = true
  }
}

ロールに権限を紐付ける

実は

  • SageMakerからS3
  • SageMakerからECR
  • LambdaFunctionからSageMaker

へのアクセスを許可するために、policyを定義し、execution_roleに色々アタッチする必要がありました。
tfファイルが結構長いので省略!

SageMakerStudio上でNotebookを扱う

環境構築ができたらSageMakerStudio上でAppを起動できます。
Launcherからお馴染みのPython Notebookが開けます。
ここのファイルシステムにローカルのファイルをアップロードすることもできるし、github repositoryを扱うこともできます。

Python Notebookを開くと自動的に裏側でインスタンスが立ち上がりアタッチされます。
アタッチされるインスタンスのタイプは↓のように選べます。
なんと、GPUも使える!!

このSageMakerStudio上で扱うNotebookが開発環境になります。

AIモデル、トレーニング、推論をSageMakerの仕様に書き換える

本当は、超優秀な弊社のデータサイエンティストが作ったモデルをお見せしたいところでしたが、
流石にNGらしいので、定番の\bold「アヤメの花の花弁の長さ、幅からどの品種かをロジスティック回帰で推定する」という分類タスクを例に示します。

モデルを作る

最初にモデルを定義します。
今回は最急降下法で推定します。
ロジスティック回帰なので活性化関数

\phi(\bm{z}) = \dfrac{1}{1+\exp(-\bm{z})}

を使うところが味噌です。

この辺りの詳しいことは過去に記事にしたのでご覧ください。
https://qiita.com/Umibows/items/d7c9cc691b7bb2d9b77f

model.py

import numpy as np
# gradient descent 最急降下法
class LogisticRegressionGD(object):
  # eta: float # 学習率
  # n_iter: int # トレーニング回数
  # random_state: int # 乱数シード(初期化用)
  # w_ # (重み行列)
  # const_ # 誤差平方和コスト関数

  def __init__(self, eta=0.05, n_iter=100, random_state=1):
    self.eta = eta
    self.n_iter = n_iter
    self.random_state = random_state
    rgen = np.random.RandomState(self.random_state)
    self.w_ = rgen.normal(loc = 0.0, scale = 0.01, size = 3)

  #z(推定値)を計算する
  def net_input(self, x):
    return np.dot(x, self.w_[1:]) + self.w_[0]

  def activation(self, z):
    # シグモイド活性化関数
    return 1. / (1. + np.exp(-z))

  def fit(self, x, y):
    # x: n * 2次元ベクトル(n個の2次元ベクトル)
    # y: n個のラベル

    rgen = np.random.RandomState(self.random_state)
    # https://numpy.org/doc/1.16/reference/generated/numpy.random.RandomState.normal.html#numpy.random.RandomState.normal
    # loc 分布の平均
    # scale 標準偏差
    # w0*x0 + w1*x1 + w2*x2
    self.w_ = rgen.normal(loc = 0.0, scale = 0.01, size = 1 + x.shape[1])
    self.cost_ = []

    for i in range(self.n_iter):
      net_input = self.net_input(x)
      # outputは\phi(z)
      output = self.activation(net_input)
      errors = y - output

      self.w_[1:] += self.eta * x.T.dot(errors)
      self.w_[0] += self.eta * errors.sum()

      cost = -y.dot(np.log(output)) - ((1- y).dot(np.log(1 - output)))
      self.cost_.append(cost)

  def predict(self, x):
    return np.where(self.net_input(x) >= 0.0, 1, 0)
  
  def input_fn(self, params):
    return [params.get('petal_length'), params.get('petal_width')]
  
  def output_fn(self, category_index):
    return ['Iris-Setosa', 'Iris-Versicolour'][category_index]

クラスに定義された

  • fit: 学習の処理
  • input_fn: 前処理(イメージとしては推論エンドポイントへのリクエストをpredictorが読みやすくする)
  • predict: 学習済モデルを用いて推論処理をする
  • output_fn: 後処理(レスポンスを整形する)

が主要なメソッドになります。
推論ではpetal_length(花弁の長さ)とpetal_width(花弁の幅)をparameterで受け取って、
どの品種('Iris-Setosa', 'Iris-Versicolour')かを推定して、その品種を返すという処理を行います。

トレーニングの処理

train(実行ファイル)

#!/usr/bin/env python
from sklearn import datasets
from sklearn.model_selection import train_test_split
import numpy as np
import pickle
from model import LogisticRegressionGD

if __name__ == '__main__':
  print('start training')
  # 学習データを読む
  iris = datasets.load_iris()
  # 2:3に花弁の長さ、幅のデータが入っている
  x = iris.data[:, [2, 3]]
  y = iris.target

  # 70%を教師データにする
  x_train, x_test, y_train, y_test = train_test_split(x, y ,test_size = 0.3, random_state = 1, stratify = y)
  x_train_01_subset = x_train[(y_train == 0) | (y_train == 1)]
  y_train_01_subset = y_train[(y_train == 0) | (y_train == 1)]

  x_test_01_subset = x_train[(y_train == 0) | (y_train == 1)]
  y_test_01_subset = y_train[(y_train == 0) | (y_train == 1)]

  # estimatorのインスタンス
  estimator = LogisticRegressionGD(eta = 0.05, n_iter = 1000, random_state = 1)

  # 学習
  estimator.fit(x_train_01_subset, y_train_01_subset)

  # 学習したモデルを保存する
  with open('/opt/ml/model/model.pth', mode='wb') as f:
    pickle.dump(estimator, f)
    

教師データをモデルに渡して、学習させたのちモデルを保存する処理です。
これをtrainという名前で実行ファイルで作る必要があります。

SageMakerでは後に出てくる

estimator.fit()

という処理で学習処理を行いますが、これはSageMakerで新たに学習用インスタンスをたて、そこで

docker run <containerId> train

というコマンドが呼ばれる仕様です。

そのため、trainという実行ファイルで学習処理を定義しておく必要があります。

推論サーバの起動

serve(実行ファイル)

#!/usr/bin/env python
import multiprocessing
import os
import signal
import subprocess
import sys

cpu_count = multiprocessing.cpu_count()

model_server_timeout = os.environ.get('MODEL_SERVER_TIMEOUT', 60)
model_server_workers = int(os.environ.get('MODEL_SERVER_WORKERS', cpu_count))

def sigterm_handler(nginx_pid, gunicorn_pid):
    try:
        os.kill(nginx_pid, signal.SIGQUIT)
    except OSError:
        pass
    try:
        os.kill(gunicorn_pid, signal.SIGTERM)
    except OSError:
        pass

    sys.exit(0)

def start_server():
    print('Starting the inference server with {} workers.'.format(model_server_workers))


    # link the log streams to stdout/err so they will be logged to the container logs
    subprocess.check_call(['ln', '-sf', '/dev/stdout', '/var/log/nginx/access.log'])
    subprocess.check_call(['ln', '-sf', '/dev/stderr', '/var/log/nginx/error.log'])

    nginx = subprocess.Popen(['nginx', '-c', '/opt/program/nginx.conf'])
    gunicorn = subprocess.Popen(['gunicorn',
                                 '--timeout', str(model_server_timeout),
                                 '-k', 'sync',
                                 '-b', 'unix:/tmp/gunicorn.sock',
                                 '-w', str(model_server_workers),
                                 'wsgi:app'])

    signal.signal(signal.SIGTERM, lambda a, b: sigterm_handler(nginx.pid, gunicorn.pid))

    # If either subprocess exits, so do we.
    pids = set([nginx.pid, gunicorn.pid])
    while True:
        pid, _ = os.wait()
        if pid in pids:
            break

    sigterm_handler(nginx.pid, gunicorn.pid)
    print('Inference server exiting')

# The main routine just invokes the start function.

if __name__ == '__main__':
    start_server()

参考
https://github.com/aws/amazon-sagemaker-examples/tree/master/advanced_functionality/scikit_bring_your_own/container

ここが一番めんどくさかったです。要はwebサーバでnginxを、AppサーバでFlaskを立ててます。

アプリケーションサーバ

#!/usr/bin/env python
from flask import Flask, request, jsonify, make_response
import pickle
from model import LogisticRegressionGD

app = Flask(__name__)

# 学習済modelの読み込み
with open('/opt/ml/model/model.pth', mode='rb') as f:
  model = pickle.load(f)

@app.route("/ping", methods=['GET'])
def ping():
  return { "status": 200 }

@app.route("/invocations", methods=['POST'])
def post():
  # 前処理
  params = jsonify(request.json)
  input = model.input_fn(request.json)
  
  # 推論
  _output = model.predict(input)

  # 後処理
  output = model.output_fn(_output)
  
  return { "catetory": output }

if __name__ == "__main__":
  app.debug = True
  app.run(host='0.0.0.0', debug=True, port=8080)

アプリケーションサーバはGET /pingというURLとPOST /invocationsというURLで待ち受けています。
先程のnginxの設定の詳細は省きましたが、8080portに来た時、アプリケーションサーバに転送する設定を行なっています。

SageMakerは後に説明する

estimator.deploy()

で内部的に

docker run <ImageUrl> serve

が動きます。trainと同様にserveが実行コマンドとして定義しておく必要があります。
serveファイルの処理が実行されWebサーバ、アプリケーションサーバが起動します。

この後、ヘルスチェックにGET /pingが叩かれます。
200が帰ればエンドポイントがIn Serviceになります。

推論のエンドポイントが呼ばれると内部的にはPOST /invocationsが呼ばれることになります。

参考
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-code.html#your-algorithms-inference-algo-ping-requests

Dockerfile

FROM tensorflow/tensorflow:2.3.0
RUN apt-get update
RUN apt-get install -y git
RUN pip install --upgrade pip
RUN pip install sklearn 'boto3>1.0<2.0' 'sagemaker>2.0<3.0'
RUN pip install flask && pip install flask_restful
RUN apt-get install -y nginx
RUN apt-get install -y lsof
RUN pip install gunicorn
ENV PATH="/opt/program:${PATH}"
COPY . /opt/program
WORKDIR /opt/program

ここで、必要なライブラリ等任意のライブラリをインストールする手順を記述します。
大切なのが、

ENV PATH="/opt/program:${PATH}"

の部分でPATH通すのありきで、先程のtrain, serveがコマンドとして動きます。

Docker Imageの作成

Dockerfileができれば後はbuildしてECRへGO!

aws ecr get-login-password | docker login --username AWS --password-stdin https://${ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-ecr-images
docker build . -t demo -t ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${REPOSITORY_NAME}:demo --no-cache
docker push ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${REPOSITORY_NAME}:demo

できた!

SageMakerStudioからの操作

ECRにimageがあがれば、後はSageMakerから呼ぶだけです。

学習

import sagemaker
sess = sagemaker.Session()
bucket = sagemaker.Session().default_bucket()
role = sagemaker.get_execution_role() ←domainがexecution_roleと紐づいているから取れる
bucket = sagemaker.Session().default_bucket()
prefix = 'sagemaker/logistic-regression'
output_path = 's3://{}/{}/{}/output'.format(bucket, prefix, 'data')

で、s3の保存先bucketを指定して、

以下のようにestimatorを定義できます。
この時、自作したDocker Imageを指定しています。

estimator = sagemaker.estimator.Estimator(
  image_uri = '${ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-ecr-images:demo',
  role = role,
  instance_count = 1,
  instance_type = 'ml.m5.large',
  max_run = 8640,
  sagemaker_session = sess,
  output_path = output_path
)

定義したら学習できます。
実行時に
docker run <ImageUrl> trainが動きます。

estimator.fit()
2021-12-10 20:27:44 Starting - Starting the training job...
2021-12-10 20:28:00 Starting - Launching requested ML instancesProfilerReport-1639168064: InProgress
......
2021-12-10 20:29:04 Starting - Preparing the instances for training......
2021-12-10 20:30:13 Downloading - Downloading input data...
2021-12-10 20:30:44 Training - Downloading the training image.....start training

2021-12-10 20:31:44 Uploading - Uploading generated training model
2021-12-10 20:31:44 Completed - Training job completed
Training seconds: 82
Billable seconds: 82

こんな感じのログが出ます。
ここですごいのが、トレーニングが終わったら学習インスタンスが勝手に閉じてくれます。
めっちゃ楽です。1コマンドで完結します!

学習が終わると学習インスタンスの/opt/ml/modelに学習済モデルが保存されます。
このpathはSageMakerの仕様です。
これは先程指定したS3bucketに自動的に保存されます。

推論

predictor = estimator.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')

このように、1コマンドで終わりです。
ここでは

docker run <ImageUrl> serve

が動きます。

このように、学習の場合も、デプロイの場合も、SageMaker上では1行で実行できるのでとてもシンプルになっています。

エンドポイントができたことはコンソール上からも確認できます。

Lambdaからのアクセス

import boto3
import json
client = boto3.client("sagemaker-runtime", region_name="us-east-1")
response = client.invoke_endpoint(
    EndpointName='sagemaker-ecr-images-2021-12-10-20-32-19-726',
    Body=json.dumps({"petal_length": 0.4, "petal_width": 1.4}),
    ContentType='application/json'
)

このようにboto3クライアントを用いてSageMakerのエンドポイントにリクエストを送ることができます。
ここでは花弁の長さ0.4, 幅1.4でリクエストしています。

お待ちかねのレスポンスは

_output = response['Body'].read()
output = json.loads(_output)
=> {'catetory': 'Iris-Setosa'}

となります。
つまり、花弁の長さ0.4, 幅1.4のアヤメの品種の推定結果はIris-Setosaでした。

lambdaで取得することができたので、後はAPI Gateway経由で外にも渡せるし、
煮るなり焼くなり好きにできます。

まとめ

大まかな手順

  1. お好きなDocker Imageを作る
  2. ECRにあげる
  3. モデル、training処理、predict処理をSageMakerの仕様に合わせて定義、実装する
    4. 学習する(estimator.fit())
    5. デプロイする(estimator.deploy())
    6. lambdaからよぶ

SageMakerの学習、推論インスタンスを採用するメリット

  • サーバメンテナンスがいらない
  • 長い時間の学習であっても、終わったら閉じてくれるから安心できる
  • 1コマンドで学習、デプロイできる。
  • 学習時間を効率化してくれて、その点はコスト削減につながる
  • インターフェイスだけ決めれば、AIを開発するデータサイエンティストと環境を準備するソフトウェアエンジニアの役割を明確にさせやすくなる。→我々にとっては結構いいポイント!

SageMakerの学習、推論インスタンスを採用するデメリット

  • SageMakerの仕様に合わせた実装が強制される
  • エラーが見にくい(CloudWatchで頑張って探すことになる)

最後に

総じて使いこなせればめちゃくちゃ効率化できるなーーと感じています。
DockerとかECRとかFlaskとかよくわからんっていうデータサイエンティストはいるし、
学習とか推論とかよくわからんていうソフトウェアエンジニアはいると思うので、その人同士が役割明確にして連携しながらプロダクト開発できるようなSageMaker結構いいじゃんって思いました。
だから、SageMakerの実装に制限がある仕様っていうのはそれなりに受け入れられました。

Discussion