Step Functionsの並列処理をCDKで構築する
re:Invent2022でやたら登場機会の多かったStep Functionsについて理解を深めたくて、休暇中にHello Worldレベルで少し触ってみました。
つくったもの
以下の条件で動くワークフローを構築しました。
- first-stateのLambdaは10秒waitして次のstateに移行する。
- second-stateの2つのLambdaは並列で動作し、それぞれ5秒、10秒waitする。両方の処理が終わると次のstateに移行する。
- third-stateのLambadは10秒waitして次のstateに移行する。
そもそもStep Functionsを使うと何が嬉しいか?
Black Beltなど読んでみた感じ、以下の3つが大きなメリットのようです。
- 小さな機能を持つコンポーネントを組み合わせることでアプリケーションを構築できる
- ちなみに、re:Invent2022で「1つのLambda関数は1つの機能に留め、大きくしすぎない」、「20行〜50行のLambda関数を持つことはよくあることで、それはちょうど良いんだ」という印象に残る表現があった。これに通じる考え方。
- 定義したワークフローは見やすい形で可視化できる、GUI上で編集できる。
- 実行結果も可視化され、各ステップの状態も記録されるため、デバッグしやすい。
やったこと
ディレクトリ構成
最終的なディレクトリ構成は以下です(代表的なファイルのみ抽出)。
.
├── cdk
│ ├── bin
│ │ └── cdk.ts
│ ├── lib
│ │ ├── aws-stack.ts
│ │ ├── lambda.ts
│ │ └── step-functions.ts
├── hisa-test1-lambda
│ └── main.go
├── hisa-test2-lambda
│ └── main.go
├── hisa-test3-lambda
│ └── main.go
└── hisa-test4-lambda
└── main.go
Lambda(Go)の作成
まずは、Step Functionsの各ステートとなるLambdaを記述していきます。
以下、代表として1関数分記述しますが、Sleep
処理の箇所などを適宜変更した同様の関数を4つ用意します。
package main
import (
"context"
"fmt"
"time"
"github.com/aws/aws-lambda-go/lambda"
)
type JsonResponse struct {
Message string `json:"message"`
}
func HandleRequest(ctx context.Context) (JsonResponse, error) {
fmt.Print("関数1開始時刻:" + time.Now().GoString())
time.Sleep(time.Second * 10) // 10秒待つ
fmt.Print("関数1終了時刻:" + time.Now().GoString())
response := JsonResponse{
Message: "func1",
}
return response, nil
}
func main() {
lambda.Start(HandleRequest)
}
CDK
続いて、CDKを作成します。CDKをGoで書くこともできますが、こちらはTypeScriptで書いていきます。
Lambda、Step Functionsを作成する関数、それらを呼び出すコントローラを作成します。
Lambda
CDKのv2でGoの関数を作成するには、@aws-cdk/aws-lambda-go-alpha
というライブラリを使います。@aws-cdk/aws-lambda-go
はCDKのv1にしか対応していないのでご注意を。
import { Duration } from "aws-cdk-lib";
import { Construct } from "constructs";
import { GoFunction } from "@aws-cdk/aws-lambda-go-alpha";
export const createHisaTest1Lambda = (scope: Construct): GoFunction => {
const createHisaTest1Lambda = new GoFunction(scope, "hisa-test1-lambda", {
entry: "../hisa-test1-lambda",
functionName: "hisa-test1-lambda",
timeout: Duration.minutes(1),
});
return createHisaTest1Lambda;
};
export const createHisaTest2Lambda = (scope: Construct): GoFunction => {
const createHisaTest2Lambda = new GoFunction(scope, "hisa-test2-lambda", {
entry: "../hisa-test2-lambda",
functionName: "hisa-test2-lambda",
timeout: Duration.minutes(1),
});
return createHisaTest2Lambda;
};
export const createHisaTest3Lambda = (scope: Construct): GoFunction => {
const createHisaTest3Lambda = new GoFunction(scope, "hisa-test3-lambda", {
entry: "../hisa-test3-lambda",
functionName: "hisa-test3-lambda",
timeout: Duration.minutes(1),
});
return createHisaTest3Lambda;
};
export const createHisaTest4Lambda = (scope: Construct): GoFunction => {
const createHisaTest4Lambda = new GoFunction(scope, "hisa-test4-lambda", {
entry: "../hisa-test4-lambda",
functionName: "hisa-test4-lambda",
timeout: Duration.minutes(1),
});
return createHisaTest4Lambda;
};
Step Functions
Step Functionsの各ステートに、コントローラからの引数で渡ってくるLambda関数を割り当てていきます。
並列処理にはParallel
を使います。
また、outputPath
に$.Payload
を指定することで、Lambda関数からリターンする値だけを、次のステートのインプットに渡すことができます(これを指定しなかったら、その他にもいろんな値がインプットとして渡される)。
import { Construct } from "constructs";
import { StateMachine, Parallel } from "aws-cdk-lib/aws-stepfunctions";
import { LambdaInvoke } from "aws-cdk-lib/aws-stepfunctions-tasks";
import { GoFunction } from "@aws-cdk/aws-lambda-go-alpha";
export const createStateMachine = (
scope: Construct,
lambdaFunction1: GoFunction,
lambdaFunction2: GoFunction,
lambdaFunction3: GoFunction,
lambdaFunction4: GoFunction
): StateMachine => {
const stateMachine = new StateMachine(scope, "hisa-test-state-machine", {
stateMachineName: "hisa-test-state-machine",
definition: new LambdaInvoke(scope, "first-state", {
lambdaFunction: lambdaFunction1,
outputPath: "$.Payload",
})
.next(
new Parallel(scope, "second-state").branch(
new LambdaInvoke(scope, "second-first-state", {
lambdaFunction: lambdaFunction2,
outputPath: "$.Payload",
}),
new LambdaInvoke(scope, "second-second-state", {
lambdaFunction: lambdaFunction3,
outputPath: "$.Payload",
})
)
)
.next(
new LambdaInvoke(scope, "third-state", {
lambdaFunction: lambdaFunction4,
outputPath: "$.Payload",
})
),
});
return stateMachine;
};
コントローラ
最後に、Lambda、Step Functionsを実際に生成するコントローラを記述します。
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import {
createHisaTest1Lambda,
createHisaTest2Lambda,
createHisaTest3Lambda,
createHisaTest4Lambda,
} from "./lambda";
import { createStateMachine } from "./step-functions";
export class GoTestAwsStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const hisaTest1Lambda = createHisaTest1Lambda(this);
const hisaTest2Lambda = createHisaTest2Lambda(this);
const hisaTest3Lambda = createHisaTest3Lambda(this);
const hisaTest4Lambda = createHisaTest4Lambda(this);
createStateMachine(
this,
hisaTest1Lambda,
hisaTest2Lambda,
hisaTest3Lambda,
hisaTest4Lambda
);
}
}
以上、参考になると嬉しいです。
Step Functionsを使うと、条件分岐やエラー処理もいい感じに書けるみたいなので(わかってないので抽象的)、試していきたいです。
Discussion