🚂

AWS CDK (TypeScript) で Step Functions を構築する

2024/05/22に公開

AWS CDK で Step Functions を構築したい

AWS CDK バージョン

2.130.0 を使用しています。

Map で Lambda を呼び出すワークフローを構築する

ざっとワークフローの流れを定義すると以下のようになります。

  1. Lambda 関数の呼び出し -> ユーザー一覧を取得
  2. fail で処理を続行するか判定
  3. Map でユーザー一覧分の繰り返し処理
  4. 20歳以上かの判定
  5. 判定に応じた Lambda 関数の呼び出し

ディレクトリ構成

今回は以下の構成で実装します。

root
├── bin
│   └── stepfunctions-builder.ts
├── lambda
│   ├── output-function.ts
│   └── users-function.ts
└── lib
    └── stepfunctions-builder-stack.ts

stepfunctions-builder.ts

stepfunctions-builder.ts
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { StepfunctionsBuilderStack } from '../lib/stepfunctions-builder-stack';

const app = new cdk.App();
new StepfunctionsBuilderStack(app, 'StepfunctionsBuilderStack', {});

呼び出すための Lambda 関数

lambda フォルダには、それぞれ Lambda 関数を定義します。

output-function.ts

ペイロードで受け取った user をログ出力する簡単な関数です。

output-function.ts
import { Handler } from "aws-lambda";

const handler: Handler = async (event, context) => {
    if (!event.user) {
        throw new Error("User not found");
    }

    console.info(event.user);
};

export { handler };

users-function.ts

固定の users とペイロードに含まれる fail を返却する関数です。

users-function.ts
import { Handler } from "aws-lambda";

const handler: Handler = async (event, context) => {
    console.info(event);
    console.info(context);

    const fail = event?.fail ?? false;

    return {
        fail,
        users: [
            { username: "alex", age: 20 },
            { username: "john", age: 30 },
            { username: "smith", age: 40 },
            { username: "thomas", age: 50 },
            { username: "jane", age: 18 },
            { username: "william", age: 10 },
            { username: "james", age: 25 }
        ]
    };
};

export { handler };

stepfunctions-builder-stack.ts

Step Functions のワークフローを MyStateMachine として定義していきます。

基本的には next でステートマシンの要素を連結していきます。

stepfunctions-builder-stack.ts
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import * as path from "path";

export class StepfunctionsBuilderStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);

        // Lambda 関数の定義
        const usersLambdaFunction = new NodejsFunction(this, 'MyUsersLambdaFunction', {
            runtime: lambda.Runtime.NODEJS_18_X,
            entry: path.join(__dirname, "..", "lambda", "users-function.ts"),
            handler: "handler",
            timeout: cdk.Duration.seconds(10)
        });
        const outputLambdaFunction = new NodejsFunction(this, 'MyOutputLambdaFunction', {
            runtime: lambda.Runtime.NODEJS_18_X,
            entry: path.join(__dirname, "..", "lambda", "output-function.ts"),
            handler: "handler",
            timeout: cdk.Duration.seconds(10)
        });

        // ユーザー一覧出力 Lambda 関数の呼び出し
        const invokeUsersLambdaTask = new tasks.LambdaInvoke(this, "MyUsersLambdaTask", {
            lambdaFunction: usersLambdaFunction,
            resultSelector: {
                "users.$": "$.Payload.users",
                "fail.$": "$.Payload.fail"
            },
        });

        // Output Lambda 関数の呼び出し
        const invokeOutputLambdaTask = new tasks.LambdaInvoke(this, "MyOutputLambdaTask", {
            lambdaFunction: outputLambdaFunction
        });

        // 20歳以上のユーザーのみを処理する Output Lambda 関数の呼び出し
        const invokeOutputLambdaTaskOver20 = new tasks.LambdaInvoke(this, "MyOutputLambdaTaskOver20", {
            lambdaFunction: outputLambdaFunction
        });

        // 年齢によって Lambda 関数を切り替えるための Choice の定義
        const ageChoice = new sfn.Choice(this, 'MyAgeChoice')
            .when(
                sfn.Condition.numberGreaterThanEquals("$.user.age", 20), 
                invokeOutputLambdaTaskOver20
            )
            .otherwise(invokeOutputLambdaTask);

        // Map の定義
        const map = new sfn.Map(this, 'MyMap', {
            maxConcurrency: 10,
            itemsPath: '$.users',
            itemSelector: {
                "user.$": "$$.Map.Item.Value",
            },
        });
        map.itemProcessor(ageChoice).next(new sfn.Succeed(this, "Succeed"));

        // Fail の場合の Choice の定義
        const failChoice = new sfn.Choice(this, 'MyFailChoice')
            .when(
                sfn.Condition.booleanEquals("$.fail", true), 
                new sfn.Fail(this, "Fail")
            )
            .when(
                sfn.Condition.booleanEquals("$.fail", false),
                map
            );

        new sfn.StateMachine(this, 'MyStateMachine', {
            definition: invokeUsersLambdaTask.next(failChoice),
        });
    }
}

Lambda Invoke

Lambda 関数の呼び出しは aws-cdk-lib/aws-stepfunctions-tasksLambdaInvoke を使用します。

ワークフロービルダーでも「AWS Lambda Inveke」という名前なのでイメージしやすいかと思います。

resultSelector を定義することで出力結果を変換できます。

Choice

条件分岐を定義します。

こちらも同じ名前のなのでイメージしやすいです。

when でチェインすることで、条件と次のアイテムを指定できます。

otherwise は Default rule に該当します。

Map

ユーザー一覧をペイロードとして受け取りるため itemsPath を定義します。

また、ユーザー一覧のユーザー情報にアクセスするため itemSelector も定義しています。

itemProcessor で Map で繰り返し処理したいアイテムを指定します。

実行結果

成功

入力を指定せずに実行すると、無事に成功しました!

Map の user.age の条件分岐が正しく機能しているかも確認します。

失敗

以下の入力を指定します。

{
  "fail": true
}

Fail の条件分岐で Fail に進むことを確認できました!

まとめ

簡単な Step Functions のワークフローを構築してみました。

コードでワークフローを定義していくのは想像よりも大変でした🙄🙄🙄

でも、定義してしまえば、複雑なワークフローが何度でも簡単に構築できます。

やはり CDK は偉大です🎉

コラボスタイル Developers

Discussion