🐰

Step Functionsの並列処理をCDKで構築する

2023/01/03に公開

re:Invent2022でやたら登場機会の多かったStep Functionsについて理解を深めたくて、休暇中にHello Worldレベルで少し触ってみました。

つくったもの

以下の条件で動くワークフローを構築しました。

  • first-stateのLambdaは10秒waitして次のstateに移行する。
  • second-stateの2つのLambdaは並列で動作し、それぞれ5秒、10秒waitする。両方の処理が終わると次のstateに移行する。
  • third-stateのLambadは10秒waitして次のstateに移行する。

そもそもStep Functionsを使うと何が嬉しいか?

Black Beltなど読んでみた感じ、以下の3つが大きなメリットのようです。

  • 小さな機能を持つコンポーネントを組み合わせることでアプリケーションを構築できる
    • ちなみに、re:Invent2022で「1つのLambda関数は1つの機能に留め、大きくしすぎない」、「20行〜50行のLambda関数を持つことはよくあることで、それはちょうど良いんだ」という印象に残る表現があった。これに通じる考え方。
  • 定義したワークフローは見やすい形で可視化できる、GUI上で編集できる。
  • 実行結果も可視化され、各ステップの状態も記録されるため、デバッグしやすい。

やったこと

ディレクトリ構成

最終的なディレクトリ構成は以下です(代表的なファイルのみ抽出)。

.
├── cdk
│   ├── bin
│   │   └── cdk.ts
│   ├── lib
│   │   ├── aws-stack.ts
│   │   ├── lambda.ts
│   │   └── step-functions.ts
├── hisa-test1-lambda
│   └── main.go
├── hisa-test2-lambda
│   └── main.go
├── hisa-test3-lambda
│   └── main.go
└── hisa-test4-lambda
    └── main.go

Lambda(Go)の作成

まずは、Step Functionsの各ステートとなるLambdaを記述していきます。
以下、代表として1関数分記述しますが、Sleep処理の箇所などを適宜変更した同様の関数を4つ用意します。

hisa-test1-lambda/main.go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
)

type JsonResponse struct {
	Message string `json:"message"`
}

func HandleRequest(ctx context.Context) (JsonResponse, error) {
	fmt.Print("関数1開始時刻:" + time.Now().GoString())
	time.Sleep(time.Second * 10) // 10秒待つ
	fmt.Print("関数1終了時刻:" + time.Now().GoString())

	response := JsonResponse{
		Message: "func1",
	}
	return response, nil
}

func main() {
	lambda.Start(HandleRequest)
}

CDK

続いて、CDKを作成します。CDKをGoで書くこともできますが、こちらはTypeScriptで書いていきます。
Lambda、Step Functionsを作成する関数、それらを呼び出すコントローラを作成します。

Lambda

CDKのv2でGoの関数を作成するには、@aws-cdk/aws-lambda-go-alphaというライブラリを使います。@aws-cdk/aws-lambda-goはCDKのv1にしか対応していないのでご注意を。

cdk/lib/lambda.ts
import { Duration } from "aws-cdk-lib";
import { Construct } from "constructs";
import { GoFunction } from "@aws-cdk/aws-lambda-go-alpha";

export const createHisaTest1Lambda = (scope: Construct): GoFunction => {
  const createHisaTest1Lambda = new GoFunction(scope, "hisa-test1-lambda", {
    entry: "../hisa-test1-lambda",
    functionName: "hisa-test1-lambda",
    timeout: Duration.minutes(1),
  });

  return createHisaTest1Lambda;
};

export const createHisaTest2Lambda = (scope: Construct): GoFunction => {
  const createHisaTest2Lambda = new GoFunction(scope, "hisa-test2-lambda", {
    entry: "../hisa-test2-lambda",
    functionName: "hisa-test2-lambda",
    timeout: Duration.minutes(1),
  });

  return createHisaTest2Lambda;
};

export const createHisaTest3Lambda = (scope: Construct): GoFunction => {
  const createHisaTest3Lambda = new GoFunction(scope, "hisa-test3-lambda", {
    entry: "../hisa-test3-lambda",
    functionName: "hisa-test3-lambda",
    timeout: Duration.minutes(1),
  });

  return createHisaTest3Lambda;
};

export const createHisaTest4Lambda = (scope: Construct): GoFunction => {
  const createHisaTest4Lambda = new GoFunction(scope, "hisa-test4-lambda", {
    entry: "../hisa-test4-lambda",
    functionName: "hisa-test4-lambda",
    timeout: Duration.minutes(1),
  });

  return createHisaTest4Lambda;
};

Step Functions

Step Functionsの各ステートに、コントローラからの引数で渡ってくるLambda関数を割り当てていきます。
並列処理にはParallelを使います。
また、outputPath$.Payloadを指定することで、Lambda関数からリターンする値だけを、次のステートのインプットに渡すことができます(これを指定しなかったら、その他にもいろんな値がインプットとして渡される)。

cdk/lib/step-functions.ts
import { Construct } from "constructs";
import { StateMachine, Parallel } from "aws-cdk-lib/aws-stepfunctions";
import { LambdaInvoke } from "aws-cdk-lib/aws-stepfunctions-tasks";
import { GoFunction } from "@aws-cdk/aws-lambda-go-alpha";

export const createStateMachine = (
  scope: Construct,
  lambdaFunction1: GoFunction,
  lambdaFunction2: GoFunction,
  lambdaFunction3: GoFunction,
  lambdaFunction4: GoFunction
): StateMachine => {
  const stateMachine = new StateMachine(scope, "hisa-test-state-machine", {
    stateMachineName: "hisa-test-state-machine",
    definition: new LambdaInvoke(scope, "first-state", {
      lambdaFunction: lambdaFunction1,
      outputPath: "$.Payload",
    })
      .next(
        new Parallel(scope, "second-state").branch(
          new LambdaInvoke(scope, "second-first-state", {
            lambdaFunction: lambdaFunction2,
            outputPath: "$.Payload",
          }),
          new LambdaInvoke(scope, "second-second-state", {
            lambdaFunction: lambdaFunction3,
            outputPath: "$.Payload",
          })
        )
      )
      .next(
        new LambdaInvoke(scope, "third-state", {
          lambdaFunction: lambdaFunction4,
          outputPath: "$.Payload",
        })
      ),
  });
  return stateMachine;
};

コントローラ

最後に、Lambda、Step Functionsを実際に生成するコントローラを記述します。

cdk/lib/aws-stack.ts
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import {
  createHisaTest1Lambda,
  createHisaTest2Lambda,
  createHisaTest3Lambda,
  createHisaTest4Lambda,
} from "./lambda";
import { createStateMachine } from "./step-functions";

export class GoTestAwsStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const hisaTest1Lambda = createHisaTest1Lambda(this);
    const hisaTest2Lambda = createHisaTest2Lambda(this);
    const hisaTest3Lambda = createHisaTest3Lambda(this);
    const hisaTest4Lambda = createHisaTest4Lambda(this);
    createStateMachine(
      this,
      hisaTest1Lambda,
      hisaTest2Lambda,
      hisaTest3Lambda,
      hisaTest4Lambda
    );
  }
}

以上、参考になると嬉しいです。
Step Functionsを使うと、条件分岐やエラー処理もいい感じに書けるみたいなので(わかってないので抽象的)、試していきたいです。

Discussion