👂

AWS Step Functionsの使い方(Lambdaとの組み合わせ)

2024/11/20に公開

Step Functionsとは

Step Functionsを使うことでサーバーレスの分散アプリケーションを構築することができます。
たとえば

  • MLのパイプラインを作成
  • データの加工、前処理のパイプラインを作成

などが挙げられます。
データの加工に関しては、大量のデータを扱う際に便利だと思います。

Lambdaは実行最大時間が15分となっているので、Step Funtionsと組み合わせることで大量のデータの分散処理ができるうようになります。

ステートマシンとは

Step Funtionsでに行くとステートマシンというものがあります。

要するにワークフローのことで(公式にも書いてあります)、ここで処理したい一連の流れを定義することでパイプラインを作ることができます。

ステートメントマシンの作り方

では早速作成してみます。

  1. 文字列の出力
  2. Lambdaとの連携
  3. 並列処理

という流れで実践してみます。ステートマシンはGUIベースで作成できますが、JSONを使って定義していこうと思います!

1. 文字列の出力

まずはステートマシンを作成します。ステートマシンの作成ボタンをクリックします。

色々と便利なテンプレがありますが、Blankを選択します。

初期画面はGUIでワークフローを組むような画面ですが、上にあるコードをクリックします。

コードで以下を記載します。

{
  "Comment": "A simple Step Functions state machine that returns input.",
  "StartAt": "PassState",
  "States": {
    "PassState": {
      "Type": "Pass",
      "ResultPath": "$",
      "End": true
    }
  }
}

このようなワークフローが出来上がると思います。

簡単に解説します。

基本構成

定義 説明
Comment このステートマシンの説明を記載
StartAt 開始をどのステートにするかを定義、後続のStatesにあるものでないといけない(当然)
States 各ステートの詳細を記載、ワークフローの部品を定義するイメージ

Stateフィールド内

定義 説明
Type Task, Pass, Waitなどのステータスを定義するときに使用[1]
ResultPath Lambdaを使い出したときに必要になる、Lambda関数の出力を次の入力JSONのどこに埋め込むかを決めるもの[2]
End true なら実行処理が終了する

Typeのそれぞれの機能の紹介は省きますが、Passは入力をそのまま返すものです。

実行

実行をして適当なJSONを入力すると、そのまま入力が返ってくると思います。これで完成です。

2. Lambdaとの連携

Lambdaの詳細は話しませんが、get_s3_bucketというs3バケット一覧を取得するLambdaを作成します。(ロールなどの設定はお任せします)

簡単に言うと

  1. ConsoleでLambdaを選択
  2. 関数の作成を選択
  3. 名前と使用言語(今回はPython)とロールを決めて作成
  4. タイムアウトを10sくらいにしておく
  5. コードを書く
  6. Deployする
  7. Testしてみる

という流れかと思います。もし標準ライブラリ以外を使いたい場合は

  • zipを使ってアップロード
  • ECRにDocker Imageをpushし、Lambdaで使用

とかになると思います。(後者がおすすめ)

Lambda作成

import boto3

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    
    # S3のバケット一覧を取得
    try:
        response = s3.list_buckets()
        buckets = [bucket['Name'] for bucket in response['Buckets']]
        return {
            "statusCode": 200,
            "body": buckets
        }
    except Exception as e:
        return {
            "statusCode": 500,
            "body": str(e)
        }

テストを実行して無事バケット一覧が返ってきました。

{
  "statusCode": 200,
  "body": [
    "aaa",
    "bbb"
  ]
}

ステートマシン作成

{
  "Comment": "State machine to get S3 bucket list using Lambda",
  "StartAt": "GetS3Buckets",
  "States": {
    "GetS3Buckets": {
      "Type": "Task",
      "Resource": "<Lambdaの関数ARN>",
      "ResultPath": "$.buckets",
      "End": true
    }
  }
}

以下のようなワークローができると思います。

⚠️ 同じステートマシンで実行するとLambdaの実行権限がない可能性があります。新しく作り直した方が早いですが、該当ロールにLambdaのポリシーを与えば実行できると思います。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "lambda:InvokeFunction",
      "Resource": "<Lambdaの関数ARN>"
    }
  ]
}

実行

同じ要領で実行すれば、状態の出力にS3バケット一覧が出てくると思います。

3. 並列処理

あくまでも練習ということで、

  1. S3バケットを取得
  2. それぞれのバケット名をそのまま出力する

という処理を分散で作成してみましょう。Lambdaはもう作成しているのでステートマシンだけ作成します。

{
  "Comment": "Retrieve and print S3 bucket names",
  "StartAt": "GetS3Buckets",
  "States": {
    "GetS3Buckets": {
      "Type": "Task",
      "Resource": "<Lambdaの関数ARN>",
      "ResultPath": "$.buckets",
      "Next": "ForEachBucket"
    },
    "ForEachBucket": {
      "Type": "Map",
      "ItemsPath": "$.buckets.body",
      "Iterator": {
        "StartAt": "PrintBucketName",
        "States": {
          "PrintBucketName": {
            "Type": "Pass",
            "Parameters": {
              "BucketName.$": "$"
            },
            "ResultPath": null,
            "End": true
          }
        }
      },
      "End": true
    }
  }
}

少々解説

  • Mapはループ処理ですが、ついでにデフォルトで並列処理にしてくれます。
  • GetS3Bucketsでの出力$.bucketsとして格納して、後段のForEachBucketでbodyキーを取り出しています
  • BucketName$としていますが、原則出力がそのまま入力になります
  • 一方で$$.Map.Item.Valueというものがあり、個々のループの変数にアクセスできます

おわりに

簡単にStep Functionsのメモ書きでした。
もっと具体的なユースケースがあったら書いてみます。

脚注
  1. GUIだと以下に該当
    ↩︎

  2. $は指定しない場合と同じ挙動になる(公式に解説あり) ↩︎

Discussion