AWS CDK (TypeScript) で Step Functions を構築する
AWS CDK で Step Functions を構築したい
AWS CDK バージョン
2.130.0 を使用しています。
Map で Lambda を呼び出すワークフローを構築する
ざっとワークフローの流れを定義すると以下のようになります。
- Lambda 関数の呼び出し -> ユーザー一覧を取得
-
fail
で処理を続行するか判定 - Map でユーザー一覧分の繰り返し処理
- 20歳以上かの判定
- 判定に応じた Lambda 関数の呼び出し
ディレクトリ構成
今回は以下の構成で実装します。
root
├── bin
│ └── stepfunctions-builder.ts
├── lambda
│ ├── output-function.ts
│ └── users-function.ts
└── lib
└── stepfunctions-builder-stack.ts
stepfunctions-builder.ts
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { StepfunctionsBuilderStack } from '../lib/stepfunctions-builder-stack';
const app = new cdk.App();
new StepfunctionsBuilderStack(app, 'StepfunctionsBuilderStack', {});
呼び出すための Lambda 関数
lambda フォルダには、それぞれ Lambda 関数を定義します。
output-function.ts
ペイロードで受け取った user
をログ出力する簡単な関数です。
import { Handler } from "aws-lambda";
const handler: Handler = async (event, context) => {
if (!event.user) {
throw new Error("User not found");
}
console.info(event.user);
};
export { handler };
users-function.ts
固定の users
とペイロードに含まれる fail
を返却する関数です。
import { Handler } from "aws-lambda";
const handler: Handler = async (event, context) => {
console.info(event);
console.info(context);
const fail = event?.fail ?? false;
return {
fail,
users: [
{ username: "alex", age: 20 },
{ username: "john", age: 30 },
{ username: "smith", age: 40 },
{ username: "thomas", age: 50 },
{ username: "jane", age: 18 },
{ username: "william", age: 10 },
{ username: "james", age: 25 }
]
};
};
export { handler };
stepfunctions-builder-stack.ts
Step Functions のワークフローを MyStateMachine
として定義していきます。
基本的には next
でステートマシンの要素を連結していきます。
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import * as path from "path";
export class StepfunctionsBuilderStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// Lambda 関数の定義
const usersLambdaFunction = new NodejsFunction(this, 'MyUsersLambdaFunction', {
runtime: lambda.Runtime.NODEJS_18_X,
entry: path.join(__dirname, "..", "lambda", "users-function.ts"),
handler: "handler",
timeout: cdk.Duration.seconds(10)
});
const outputLambdaFunction = new NodejsFunction(this, 'MyOutputLambdaFunction', {
runtime: lambda.Runtime.NODEJS_18_X,
entry: path.join(__dirname, "..", "lambda", "output-function.ts"),
handler: "handler",
timeout: cdk.Duration.seconds(10)
});
// ユーザー一覧出力 Lambda 関数の呼び出し
const invokeUsersLambdaTask = new tasks.LambdaInvoke(this, "MyUsersLambdaTask", {
lambdaFunction: usersLambdaFunction,
resultSelector: {
"users.$": "$.Payload.users",
"fail.$": "$.Payload.fail"
},
});
// Output Lambda 関数の呼び出し
const invokeOutputLambdaTask = new tasks.LambdaInvoke(this, "MyOutputLambdaTask", {
lambdaFunction: outputLambdaFunction
});
// 20歳以上のユーザーのみを処理する Output Lambda 関数の呼び出し
const invokeOutputLambdaTaskOver20 = new tasks.LambdaInvoke(this, "MyOutputLambdaTaskOver20", {
lambdaFunction: outputLambdaFunction
});
// 年齢によって Lambda 関数を切り替えるための Choice の定義
const ageChoice = new sfn.Choice(this, 'MyAgeChoice')
.when(
sfn.Condition.numberGreaterThanEquals("$.user.age", 20),
invokeOutputLambdaTaskOver20
)
.otherwise(invokeOutputLambdaTask);
// Map の定義
const map = new sfn.Map(this, 'MyMap', {
maxConcurrency: 10,
itemsPath: '$.users',
itemSelector: {
"user.$": "$$.Map.Item.Value",
},
});
map.itemProcessor(ageChoice).next(new sfn.Succeed(this, "Succeed"));
// Fail の場合の Choice の定義
const failChoice = new sfn.Choice(this, 'MyFailChoice')
.when(
sfn.Condition.booleanEquals("$.fail", true),
new sfn.Fail(this, "Fail")
)
.when(
sfn.Condition.booleanEquals("$.fail", false),
map
);
new sfn.StateMachine(this, 'MyStateMachine', {
definition: invokeUsersLambdaTask.next(failChoice),
});
}
}
Lambda Invoke
Lambda 関数の呼び出しは aws-cdk-lib/aws-stepfunctions-tasks
の LambdaInvoke
を使用します。
ワークフロービルダーでも「AWS Lambda Inveke」という名前なのでイメージしやすいかと思います。
resultSelector
を定義することで出力結果を変換できます。
Choice
条件分岐を定義します。
こちらも同じ名前のなのでイメージしやすいです。
when
でチェインすることで、条件と次のアイテムを指定できます。
otherwise
は Default rule に該当します。
Map
ユーザー一覧をペイロードとして受け取りるため itemsPath
を定義します。
また、ユーザー一覧のユーザー情報にアクセスするため itemSelector
も定義しています。
itemProcessor
で Map で繰り返し処理したいアイテムを指定します。
実行結果
成功
入力を指定せずに実行すると、無事に成功しました!
Map の user.age
の条件分岐が正しく機能しているかも確認します。
失敗
以下の入力を指定します。
{
"fail": true
}
Fail の条件分岐で Fail に進むことを確認できました!
まとめ
簡単な Step Functions のワークフローを構築してみました。
コードでワークフローを定義していくのは想像よりも大変でした🙄🙄🙄
でも、定義してしまえば、複雑なワークフローが何度でも簡単に構築できます。
やはり CDK は偉大です🎉
Discussion