Chapter 12

Lambda:S3トリガーでトレーニングジョブ発行

kazuhito
kazuhito
2021.12.23に更新
このチャプターの目次

概要

このチャプターではLambdaからトレーニングジョブを発行します。
LambdaはS3への動画格納をトリガーに起動します。

Lambda関数作成

Lambdaにて「関数の作成」をクリックします。

「一から作成」を選択し、任意の関数名を入力します。

ランタイムに「Python3.9」を選択し「関数の作成」をクリックします。

「トリガーを追加」をクリックします。

「S3」で検索します。

S3トリガーのバケット名に「S3:バケット作成」チャプターで作成したバケット名を指定します。
また「プレフィックス - オプション」に「input/」を指定して、
inputフォルダに置かれたファイルのみを処理対象にします。

「設定」から「アクセス権限」を表示します。

ロール名をクリックしIAMロール画面を表示します。

「ポリシーをアタッチします」をクリックします。


「AmazonS3ReadOnlyAccess」を選択し「ポリシーのアタッチ」をクリックします。

コードソース

以下のようなスクリプトを用意しました。
「PC:トレーニングジョブ発行」チャプターで使用した「create_training_job.py」とほぼ一緒ですが、S3トリガーのLambda用にいくつか修正しています。

import os
import json
from datetime import datetime

from boto3.session import Session

def lambda_handler(event, context):
    access_key = os.environ.get('ACCESS_KEY', 'XXXX')
    secret_access_key = os.environ.get('SECRET_ACCESS_KEY', 'XXXX')
        
    # SageMakerクライアント作成
    client = Session().client(
        "sagemaker",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        region_name="ap-northeast-1",
    )
    
    # トレーニングジョブ名
    training_job_name = "zenn-sample-" + datetime.now().strftime('%Y%m%d-%H%M%S')
    # 入力ファイルパス
    input_key = event['Records'][0]['s3']['object']['key']
    print(input_key)
    input_s3uri = "s3://zenn-sample/" + input_key
    # 出力パス
    output_s3uri = "s3://zenn-sample/output"
    
    # トレーニングパラメータ
    training_params = {
        # トレーニングのジョブ名
        "TrainingJobName": training_job_name,  
        # ハイパーパラメータ
        "HyperParameters": {
            'max_save_frames': '10',
        },
        # Dockerイメージ
        "AlgorithmSpecification": {
            'TrainingImage': "200822241028.dkr.ecr.ap-northeast-1.amazonaws.com/zenn_sample:latest",
            'TrainingInputMode': 'File'
        },
        # SageMakerにアタッチするロール
        "RoleArn": "arn:aws:iam::200822241028:role/zenn_sample", 
        # 入力チャネル
        "InputDataConfig": [
            # 動画格納パス
            {
                'ChannelName': 'movie',
                'DataSource': {
                    'S3DataSource': {
                        'S3DataType': 'S3Prefix',
                        'S3Uri': input_s3uri
                    }
                }
            }
        ],
        # 出力パス
        "OutputDataConfig": {
            'S3OutputPath': output_s3uri
        },
        # 学習インスタンス指定
        "ResourceConfig": {
            'InstanceType': 'ml.m4.xlarge',
            'InstanceCount': 1,
            'VolumeSizeInGB': 10
        },
        # タイムアウト時間指定
        "StoppingCondition": {
            'MaxRuntimeInSeconds': 60 * 60
        }
    }
    
    # トレーニングジョブ発行
    response = client.create_training_job(**training_params)
    print(response)

    return

上記のソースコードをlambda_functionに張り付けて「Deploy」をクリックしてください。

主な修正箇所は以下です。

def lambda_handler(event, context):

(当たり前ですが)処理はLambdaハンドラー内で実施しています。

    access_key = os.environ.get('ACCESS_KEY', 'XXXX')
    secret_access_key = os.environ.get('SECRET_ACCESS_KEY', 'XXXX')
        
    # SageMakerクライアント作成
    client = Session().client(
        "sagemaker",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        region_name="ap-northeast-1",
    )

アクセスキーとシークレットアクセスキーを指定してクライアントを作成するように修正しています。
アクセスキーとシークレットアクセスキーは「設定」の「環境変数」で用意しています。

    # 入力ファイルパス
    input_key = event['Records'][0]['s3']['object']['key']
    print(input_key)
    input_s3uri = "s3://zenn-sample/" + input_key

入力ファイルパスをS3トリガーイベントのキー(S3ファイル格納パス)を指定するように修正しています。

これでS3の指定パスに動画を格納したら、Lambda関数が起動し、SageMakerのトレーニングジョブが発行されます。