AWS Step Functionsの使い方(Lambdaとの組み合わせ)
Step Functionsとは
Step Functionsを使うことでサーバーレスの分散アプリケーションを構築することができます。
たとえば
- MLのパイプラインを作成
- データの加工、前処理のパイプラインを作成
などが挙げられます。
データの加工に関しては、大量のデータを扱う際に便利だと思います。
Lambdaは実行最大時間が15分となっているので、Step Funtionsと組み合わせることで大量のデータの分散処理ができるうようになります。
ステートマシンとは
Step Funtions
でに行くとステートマシンというものがあります。
要するにワークフローのことで(公式にも書いてあります)、ここで処理したい一連の流れを定義することでパイプラインを作ることができます。
ステートメントマシンの作り方
では早速作成してみます。
- 文字列の出力
- Lambdaとの連携
- 並列処理
という流れで実践してみます。ステートマシンはGUIベースで作成できますが、JSONを使って定義していこうと思います!
1. 文字列の出力
まずはステートマシンを作成します。ステートマシンの作成
ボタンをクリックします。
色々と便利なテンプレがありますが、Blankを選択します。
初期画面はGUIでワークフローを組むような画面ですが、上にあるコード
をクリックします。
コードで以下を記載します。
{
"Comment": "A simple Step Functions state machine that returns input.",
"StartAt": "PassState",
"States": {
"PassState": {
"Type": "Pass",
"ResultPath": "$",
"End": true
}
}
}
このようなワークフローが出来上がると思います。
簡単に解説します。
基本構成
定義 | 説明 |
---|---|
Comment | このステートマシンの説明を記載 |
StartAt | 開始をどのステートにするかを定義、後続のStatesにあるものでないといけない(当然) |
States | 各ステートの詳細を記載、ワークフローの部品を定義するイメージ |
State
フィールド内
定義 | 説明 |
---|---|
Type | Task, Pass, Waitなどのステータスを定義するときに使用[1] |
ResultPath | Lambdaを使い出したときに必要になる、Lambda関数の出力を次の入力JSONのどこに埋め込むかを決めるもの[2] |
End | true なら実行処理が終了する |
Type
のそれぞれの機能の紹介は省きますが、Pass
は入力をそのまま返すものです。
実行
実行をして適当なJSONを入力すると、そのまま入力が返ってくると思います。これで完成です。
2. Lambdaとの連携
Lambdaの詳細は話しませんが、get_s3_bucket
というs3バケット一覧を取得するLambdaを作成します。(ロールなどの設定はお任せします)
簡単に言うと
- ConsoleでLambdaを選択
- 関数の作成を選択
- 名前と使用言語(今回はPython)とロールを決めて作成
- タイムアウトを10sくらいにしておく
- コードを書く
- Deployする
- Testしてみる
という流れかと思います。もし標準ライブラリ以外を使いたい場合は
- zipを使ってアップロード
- ECRにDocker Imageをpushし、Lambdaで使用
とかになると思います。(後者がおすすめ)
Lambda作成
import boto3
def lambda_handler(event, context):
s3 = boto3.client('s3')
# S3のバケット一覧を取得
try:
response = s3.list_buckets()
buckets = [bucket['Name'] for bucket in response['Buckets']]
return {
"statusCode": 200,
"body": buckets
}
except Exception as e:
return {
"statusCode": 500,
"body": str(e)
}
テストを実行して無事バケット一覧が返ってきました。
{
"statusCode": 200,
"body": [
"aaa",
"bbb"
]
}
ステートマシン作成
{
"Comment": "State machine to get S3 bucket list using Lambda",
"StartAt": "GetS3Buckets",
"States": {
"GetS3Buckets": {
"Type": "Task",
"Resource": "<Lambdaの関数ARN>",
"ResultPath": "$.buckets",
"End": true
}
}
}
以下のようなワークローができると思います。
⚠️ 同じステートマシンで実行するとLambdaの実行権限がない可能性があります。新しく作り直した方が早いですが、該当ロールにLambdaのポリシーを与えば実行できると思います。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "<Lambdaの関数ARN>"
}
]
}
実行
同じ要領で実行すれば、状態の出力にS3バケット一覧が出てくると思います。
3. 並列処理
あくまでも練習ということで、
- S3バケットを取得
- それぞれのバケット名をそのまま出力する
という処理を分散で作成してみましょう。Lambdaはもう作成しているのでステートマシンだけ作成します。
{
"Comment": "Retrieve and print S3 bucket names",
"StartAt": "GetS3Buckets",
"States": {
"GetS3Buckets": {
"Type": "Task",
"Resource": "<Lambdaの関数ARN>",
"ResultPath": "$.buckets",
"Next": "ForEachBucket"
},
"ForEachBucket": {
"Type": "Map",
"ItemsPath": "$.buckets.body",
"Iterator": {
"StartAt": "PrintBucketName",
"States": {
"PrintBucketName": {
"Type": "Pass",
"Parameters": {
"BucketName.$": "$"
},
"ResultPath": null,
"End": true
}
}
},
"End": true
}
}
}
少々解説
-
Map
はループ処理ですが、ついでにデフォルトで並列処理にしてくれます。 -
GetS3Buckets
での出力$.buckets
として格納して、後段のForEachBucket
でbodyキーを取り出しています -
BucketName
は$
としていますが、原則出力がそのまま入力になります - 一方で
$$.Map.Item.Value
というものがあり、個々のループの変数にアクセスできます
おわりに
簡単にStep Functionsのメモ書きでした。
もっと具体的なユースケースがあったら書いてみます。
Discussion