😃

【AWS】StepFunctionsで10連Lambda

2021/06/20に公開

StepFunctionsなら連結させたLambdaが管理しやすいと聞いて10回続けてLamdaをInvokeしてみました。

StepFunctions とは

StepFunctionsはAWSサービスを組み合わせ、複数のステップで構成されるアプリケーションを構築できるサービスです。
実行するサービスの条件分岐、タイムアウトなどをStepFunctionsで定義することができ、その結果それぞれのステップが管理しやすいものとなります。
それぞれのステップで用いるサービス自体にはロジックだけを任せ、サービス間を疎結合に保つことができそうですね。
作成するアプリケーションのワークフローはASL(Amazon States Language)と呼ばれるJSON形式の言語で定義し、定義されたワークフローはステートマシンと呼ばれます。
ちょっと調べた感じだとASLをJSON形式でゴリゴリ書くのは辛そうです。
CDKでリソースを作成してASLにあたる部分をプログラミング言語で表現するのが楽かもしれません。

やってみる

今回は10連Lambdaを実行するステートマシンをCDKを使って作成します
CDKのインストール、プロジェクトの初期化作業は割愛します。

今回作成したStackはこんな感じです。
TypeScriptで書きました。

export class StepFunctionsStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const table = new ddb.Table(this, 'Table', {
      partitionKey: {
        name: 'ID',
        type: ddb.AttributeType.STRING,
      },
    });

    const putItem = new tasks.DynamoPutItem(this, 'PutItem', {
      table: table,
      item: {
        "ID": tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.Payload.ID')),
        "Message": tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.Payload.message')),
      },
    })

    const lambaFunc = new lambda.Function(this, 'Lambda Func', {
      runtime: lambda.Runtime.GO_1_X,
      handler: 'main',
      code: lambda.Code.asset('./lib/lambda')
    });

    const definition = new tasks.LambdaInvoke(this, `Invoke Lamda 1`, {
      lambdaFunction: lambaFunc,
      payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
    }).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 2`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 3`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 4`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 5`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 6`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 7`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 8`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 9`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).
      next(new tasks.LambdaInvoke(this, `Invoke Lamda 10`, {
        lambdaFunction: lambaFunc,
        payload: sfn.TaskInput.fromJsonPathAt('$.Payload')
      })).next(putItem)

    new sfn.StateMachine(this, 'StateMachine', {
      stateMachineName: 'StateMachine',
      definition: definition,
    })
  }
}

LambdaからLambdaへと値を渡し、最後にDynamoDBにPutするような構成になっています。

next()で連結している部分が、Lambdaが連なっている部分です。
もうちょっとスマートに書けそうですが、一意なIDを振らなければいけなかったことと、とにかく10連したかったことから今回は横着しています。

LambdaのソースコードはGoで書いています。
入力をそのまま出力としているだけのプログラムです。

package main

import (
	"fmt"

	"github.com/aws/aws-lambda-go/lambda"
)

// 型名はPayloadじゃなくても良い
type Payload struct {
	ID      string `json:"ID"`
	Message string `json:"message"`
}

func handler(p Payload) (Payload, error) {
	fmt.Printf(`"ID" is %v, "Message" is %v`, p.ID, p.Message)
	return p, nil
}

func main() {
	lambda.Start(decode)
}

Stackから作成されたASLも見てみましょう。

{
  "StartAt": "Invoke Lamda 1",
  "States": {
    "Invoke Lamda 1": {
      "Next": "Invoke Lamda 2",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "OutputPath": "$.Payload",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:stepFunctions-LambdaFunc75E80FD3-Hci23LRut7Xp",
        "Payload.$": "$"
      }
    },
    "Invoke Lamda 2": {
      "Next": "Invoke Lamda 3",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "OutputPath": "$.Payload",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:stepFunctions-LambdaFunc75E80FD3-Hci23LRut7Xp",
        "Payload.$": "$"
      }
    },
    .
    . 省略
    .
    "Invoke Lamda 9": {
      "Next": "Invoke Lamda 10",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "OutputPath": "$.Payload",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:stepFunctions-LambdaFunc75E80FD3-Hci23LRut7Xp",
        "Payload.$": "$"
      }
    },
    "Invoke Lamda 10": {
      "Next": "PutItem",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "OutputPath": "$.Payload",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:stepFunctions-LambdaFunc75E80FD3-Hci23LRut7Xp",
        "Payload.$": "$"
      }
    },
    "PutItem": {
      "End": true,
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "Item": {
          "ID": {
            "S.$": "$.Payload.ID"
          },
          "Message": {
            "S.$": "$.Payload.message"
          }
        },
        "TableName": "stepFunctions-TableCD117FA1-1DYZ1NY76FLS7"
      }
    }
  }
}

毎ステップ定義されているOutputPathParametersが重要なポイントです。
OutputPathでは次の関数(State)に渡す値を指定し、Parametersでは関数(State)で使用する値をJSON形式で指定しています。
この定義によって関数の結果を次の関数に引き渡すということが実現できているということです。
また、今回はCDKで自動生成されるデフォルトのままにしていますが、失敗時のリトライ回数なども指定できるのでエラーに備えた柔軟な設定も可能です。やはり便利なサービスですね。

最後にコンソールからテストのJSONを渡して実行してみます。
いよいよ10連Lambdaが始まります。

無事成功したようです。

DynamoDBにもしっかり書き込まれています。

さいごに

結構柔軟に様々な設定ができるサービスだということがわかりました。
今回はちょっと遊んでみただけですが、実際のプロダクトのバッチ処理なんかで使えそうな印象です。
ただただ趣味で技術を触ってみることが、業務でのプロダクトの貢献に繋がることもあるのではないかと思います。
これからも業務外では遊び心を持って多くの技術に触れるようにしていきたいなあと思います。

Discussion