🌏

AWS Step FunctionsをServerlessで動かしてみた

2024/12/12に公開

この記事は MOSH Advent Calendar 2024 の12日目の記事です。

https://adventar.org/calendars/9989

はじめに

こんにちは! MOSH プロダクティビティチーム所属の鳴瀬です。

MOSH では AWS Lambda を利用するサーバーレスアーキテクチャを採用しています。

また、サーバーレスアーキテクチャを実現するためのサーバーレス管理フレームワークである Serverless を利用しています。

今回はそんな MOSH を支えている AWS Lambda の1つの機能である AWS Step Functions を Serverless で動かしてみました。

AWS Step Functions とは

AWS Step Functions は、様々なAWSのサービス同士を簡単に組み合わせて、ビジネスワークフローを作成するためのサービスです。

引用: Step Functions とは

例えば、上記画像の場合は以下のような処理が行われます。

  1. 開始(Start): フローが開始されます。
  2. 選択状態(Choice state): 条件に応じてフローが分岐します。この Choose your path... ステップで、 $.condition3P に等しいかを確認します。
  3. 条件分岐による処理:
    • 条件が 3P の場合:
      • HTTPエンドポイント呼び出し(Call third-party API): サードパーティのAPIを呼び出します。
      • Textract: テキスト抽出(Extract text): 呼び出した結果に基づいて、Amazon Textract を使いドキュメントからテキストを抽出します。
    • デフォルトのパス(Default):
      • Lambda関数の呼び出し(Retrieve data): Lambda関数を使用してデータを取得します。
      • Glueジョブの開始(Start data processing): Amazon Glue を使用してデータ処理を開始します。
  4. 終了(End): 全ての処理が終了します。

実際に動かしてみる

ユースケースを決める

今回は 「毎月月初に自社のサービスの利用ユーザー対してシステム利用料の決済を行う」 というユースケースを想定して実装してみます。

決済処理の中では、外部の決済サービスに対してAPIリクエストを送信し、それが対象のユーザーの数だけ行われ、最終的に全て成功したかどうかをDBに保存します。

まずは AWS Step Functions を使わない場合の処理を考えてみます。

1つの関数で完結させてみます。

ソースコードだと以下のようになりました。

app/
├── handlers/
│   ├── paymentBatch.js
│   └── paymentMockServer.js
└── serverless.yml

ソースコード

serverless.yml

serverlessの設定ファイルです。

関数にはバッチ処理を行う paymentBatch と、決済サービスのモックサーバー paymentMockServer を定義しています。

バッチの結果を保存するための DB は DynamoDB を使います。

MonthlyPaymentBatchResults というテーブルを作成し、バッチの結果を保存します。

service: test-app

provider:
  name: aws
  runtime: nodejs18.x
  region: ap-northeast-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - dynamodb:PutItem
          Resource:
            - arn:aws:dynamodb:ap-northeast-1:000000000000:table/MonthlyPaymentBatchResults

functions:
  paymentBatch:
    handler: handlers/paymentBatch.handler
    timeout: 900
  paymentMockServer:
    handler: handlers/paymentMockServer.handler
    events:
      - http:
          path: api/v1/payments
          method: post

resources:
  Resources:
    MonthlyPaymentBatchResultsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: MonthlyPaymentBatchResults
        AttributeDefinitions:
          - AttributeName: Id
            AttributeType: S
        KeySchema:
          - AttributeName: Id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5

paymentBatch.js

バッチ処理を行う関数です。

const TEST_USER_COUNT = 100; にて決済対象のユーザー数を設定しています。

100回のループ処理の中で、決済サービスに対してAPIリクエストを送信し、全てのAPIリクエストに全件成功した場合はバッチを成功、1度でも失敗した場合は失敗として DynamoDB に保存します。

const AWS = require("aws-sdk");
const axios = require("axios");
const { v4: uuidv4 } = require("uuid");

const dynamoDb = new AWS.DynamoDB.DocumentClient({
  region: "ap-northeast-1",
});

// 請求対象のユーザー数
const TEST_USER_COUNT = 100;
const PAYMENT_API_URL = "https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/api/v1/payments";

module.exports.handler = async () => {
  const startTime = Date.now();

  let status = "success";

  const users = Array.from({ length: TEST_USER_COUNT }, (_, i) => ({
    id: i + 1,
    name: `User-${i + 1}`,
  }));

  for (const user of users) {
    try {
      await axios.post(PAYMENT_API_URL);
    } catch (error) {
      status = "error";
    }
  }

  const params = {
    TableName: 'MonthlyPaymentBatchResults',
    Item: {
      Id: uuidv4(),
      Status: status,
      Timestamp: new Date().toISOString(),
    },
  };

  await dynamoDb.put(params).promise();

  const endTime = Date.now();
  return {
    statusCode: 200,
    body: JSON.stringify({
      result: status,
      executionTime: `${endTime - startTime} ms`,
    }),
  };
};

paymentMockServer.js

外部決済サービスのモックサーバーです。

最大1秒のランダムな時間でレスポンスを返します。
また、ランダムで成功または失敗を返します。

module.exports.handler = async (event) => {
  const paymentId = `payment_${Math.random().toString(36).slice(-10)}`;

  // 最大1秒待機
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000));

  if (Math.random() < 0.005) {
    return {
      statusCode: 500,
      body: JSON.stringify({ error: "Mock server error" }),
    };
  }

  return {
    statusCode: 200,
    body: JSON.stringify({ paymentId }),
  };
};

実行する

serverless invoke -f paymentBatch

結果

実行してみると、以下のような結果になりました。

100回のループ処理の中で、決済サービスに対してAPIリクエストを送信し、全てのAPIリクエストに全件成功した場合はバッチを成功、1度でも失敗した場合は失敗として DynamoDB に保存した結果

回数 結果 実行時間
1回目 error 54,650 ms
2回目 success 53,802 ms
3回目 success 51,474 ms
4回目 error 54,095 ms
5回目 success 53,560 ms

この結果を見ると、100ユーザーに対してAPIリクエストを送信した場合にかかる時間は50秒程度であることがわかります。

今回はテストデータで100ユーザーを対象にしていますが、実際のサービスでは日々ユーザー数は増加していくので、ユーザーが増加するにつれてバッチ処理にかかる時間も増加していくことが予想されます。

AWS Step Functionsを使って動かしてみる

次に、 AWS Step Functions を使った場合の処理を考えてみます。

1つの関数でバッチを完結させる処理では、以下の処理を行っていました。

  1. 決済対象のユーザー一覧を取得
  2. 決済対象のユーザー一覧に対してループ処理の中で決済サービスに対してAPIリクエストを送信する
  3. 全件成功した場合は成功、1件でも失敗した場合は失敗として DynamoDB に保存

関数を分割する

これらの処理を関数として分割し、AWS Step Functions を使って以下のように実現してみます。

  • 決済対象のユーザーを取得する関数
  • 決済サービスのAPIリクエストを送信する関数
  • 結果をDBに保存する関数

決済対象のユーザーを取得する関数 にて、決済対象ユーザー一覧を取得したあとに、 決済サービスのAPIリクエストを送信する関数 を並列で実行させます。

決済サービスのAPIリクエストを送信する関数 を並列で実行させることで、処理時間を短縮することができます。

そして、決済サービスのAPIリクエストを送信する関数が全て終了したあとに、 結果をDBに保存する関数 にて 決済サービスのAPIリクエストを送信する関数 が全て成功したかどうかを確認し、結果をDBに保存させます。

ソースコード

ソースコードだと以下のようになりました。

app/
├── handlers/
│   ├── getPaymentTargetUsers.js // [NEW] 決済対象のユーザーを取得する関数
│   ├── paymentMockServer.js     // 決済サービスのモックサーバー
│   ├── savePaymentResult.js     // [NEW] 結果をDBに保存する関数
│   └── sendPaymentRequest.js    // [NEW] 決済サービスのAPIリクエストを送信する関数
└── serverless.yml

getPaymentTargetUsers.js

決済対象のユーザーを取得する関数です。

const TEST_USER_COUNT = 100;

module.exports.handler = async () => {
  const users = Array.from({ length: TEST_USER_COUNT }, (_, i) => ({
    userId: i + 1,
  }));
  
  return {
    users,
  };
};

sendPaymentRequest.js

決済サービスにAPIリクエストを送信する関数です。

const axios = require("axios");

const PAYMENT_API_URL = "https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/api/v1/payments";

module.exports.handler = async (event) => {
  const { userId } = event;

  await axios.post(PAYMENT_API_URL, { userId });
  return { status: "success" };
};

SaveBatchResult.js
バッチの結果を保存する関数です。

const AWS = require("aws-sdk");
const { v4: uuidv4 } = require("uuid");

const dynamoDb = new AWS.DynamoDB.DocumentClient({
  region: "ap-northeast-1",
});

module.exports.handler = async (event) => {
  const hasError = event.some((result) => result.status === "error");
  const status = hasError ? "error" : "success";

  const params = {
    TableName: "MonthlyPaymentBatchResults",
    Item: {
      Id: uuidv4(),
      Status: status,
      Timestamp: new Date().toISOString(),
    },
  };

  try {
    await dynamoDb.put(params).promise();

    return {
      statusCode: 200,
      body: JSON.stringify({ message: "Batch result saved successfully" }),
    };
  } catch (error) {
    return {
      statusCode: 500,
      body: JSON.stringify({ error: "Failed to save batch result" }),
    };
  }
};

serverless.yml

serverlessの設定ファイルです。

service: test-app
provider:
  name: aws
  runtime: nodejs18.x
  region: ap-northeast-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - dynamodb:PutItem
          Resource:
            - arn:aws:dynamodb:ap-northeast-1:000000000000:table/MonthlyPaymentBatchResults
        - Effect: Allow
          Action:
            - states:StartExecution
          Resource:
            - "arn:aws:states:ap-northeast-1:000000000000:stateMachine:PaymentBatchStateMachine"
functions:
  getPaymentTargetUsers:
    handler: handlers/getPaymentTargetUsers.handler
  sendPaymentRequest:
    handler: handlers/sendPaymentRequest.handler
  saveBatchResult:
    handler: handlers/saveBatchResult.handler
  paymentMockServer:
    handler: handlers/paymentMockServer.handler
    events:
      - http:
          path: api/v1/payments
          method: post
stepFunctions:
  stateMachines:
    paymentBatch:
      name: PaymentBatchStateMachine
      definition:
        StartAt: getPaymentTargetUsers
        States:
          getPaymentTargetUsers:
            Type: Task
            Resource:
              Fn::GetAtt: [getPaymentTargetUsers, Arn]
            Next: ProcessPaymentTargetUsers
          ProcessPaymentTargetUsers:
            Type: Map
            ItemsPath: "$.users"
            Iterator:
              StartAt: SendPaymentRequest
              States:
                SendPaymentRequest:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [sendPaymentRequest, Arn]
                  Catch:
                    - ErrorEquals: ["States.ALL"]
                      Next: Error
                  Next: Success
                Success:
                  Type: Pass
                  Result: { "status": "success" }
                  End: true
                Error:
                  Type: Pass
                  Result: { "status": "error" }
                  End: true
            Next: SaveBatchResult
          SaveBatchResult:
            Type: Task
            Resource:
              Fn::GetAtt: [saveBatchResult, Arn]
            Next: Finished
          Finished:
            Type: Succeed
resources:
  Resources:
    MonthlyPaymentBatchResultsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: MonthlyPaymentBatchResults
        AttributeDefinitions:
          - AttributeName: Id
            AttributeType: S
        KeySchema:
          - AttributeName: Id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
plugins:
  - serverless-step-functions

AWS Step Functionsを利用するために serverless-step-functions を導入します。

$ yarn add --dev serverless-step-functions

plugins プロパティに AWS Step Functions のプラグイン(serverless-step-functions)を指定し、 stepFunctions プロパティにステートマシンを定義していきます。

plugins:
  - serverless-step-functions

stateMachines は振る舞いを定義するプロパティです。 ステートマシンはワークフローと呼ばれます。

PaymentBatchStateMachine という名前で、複数のユーザーに対して決済を行うワークフローを定義を設定しています。

States は、ステートマシンを構成するすべてのステートを定義します。
各ステートは、その名前をキーとして定義され、その中にタイプや動作、次に進む条件を記述します。

今回の例では各ステートは以下のように定義されています。

getPaymentTargetUsers

決済対象のユーザーを取得するステートです。

getPaymentTargetUsers:
  Type: Task
  Resource:
    Fn::GetAtt: [getPaymentTargetUsers, Arn]
  Next: ProcessPaymentTargetUsers

Type にはステートの種類を指定します。 Task を設定して Lambda関数を呼び出します。

Resource には Lambda関数を指定します。
Fn::GetAtt: [getPaymentTargetUsers, Arn] により、 getPaymentTargetUsers 関数のARNを取得して利用します。

Next には 次に実行するステートを指定します。ここでは ProcessPaymentTargetUsers に進みます。

ProcessPaymentTargetUsers

並列で処理を行うためのステートです。

ProcessPaymentTargetUsers:
 Type: Map
   ItemsPath: "$.users"
   Iterator:
     StartAt: SendPaymentRequest
     States:
       SendPaymentRequest:
         Type: Task
         Resource:
           Fn::GetAtt: [sendPaymentRequest, Arn]
         Catch:
           - ErrorEquals: ["States.ALL"]
             Next: Error
         Next: Success
       Success:
         Type: Pass
         Result: { "status": "success" }
         End: true
       Error:
         Type: Pass
         Result: { "status": "error" }
         End: true
    Next: SaveBatchResult

TypeMap を設定します。これにより、配列の各要素に対して並列で処理を行うことができます。

ItemsPath には、配列データのパスを指定します。ここでは $.users を指定しており、 getPaymentTargetUsers ステートから取得したユーザーリストを使用します。

Iterator は、配列の各要素に対しての処理を定義するプロパティです。この中に処理フローを記述し、各要素(ユーザー)ごとに SendPaymentRequest を実行します。

Resource には、呼び出す Lambda関数を指定します。
Fn::GetAtt: [sendPaymentRequest, Arn] により、 SendPaymentRequest 関数の ARN を取得して利用します。

Catch は、ステート内でエラーが発生した場合の処理を定義します。
ErrorEquals: ["States.ALL"] により、関数内で発生したエラーをキャッチし、 Error ステートに進みます。

エラーが発生しなかった場合は Next: Success により Success ステートに進みます。

Success, Error

Success:
  Type: Pass
  Result: { "status": "success" }
  End: true
Error:
  Type: Pass
  Result: { "status": "error" }
  End: true

TypePass を設定します。これは入力を出力に渡すだけなシンプルなTypeです。

各ユーザーの決済APIに成功した場合は Success ステートの { "status": "success" } を出力、失敗した場合は Error ステートの Result: { "status": "error" } を出力します。

この出力を最後のバッチの結果を保存する処理にて使用します。

後続の処理で出力された全てのステータスを見てバッチ全体が成功か失敗かを判断させます。

Endtrue にすることで、ステートの処理を終了させます。

Next: SaveBatchResult により、 SaveBatchResult ステートに進みます。

SaveBatchResult

バッチの結果を保存する関数を呼び出します。

SaveBatchResult:
  Type: Task
  Resource:
    Fn::GetAtt: [saveBatchResult, Arn]
  Next: Finished

Type には Task を設定して 全体のバッチ処理結果を保存するLambda関数を呼び出します。

Resource には Lambda関数を指定します。
Fn::GetAtt: [saveBatchResult, Arn] により、 saveBatchResult 関数の ARN を取得して利用します。

Next には、処理が完了した後に進むステートを指定します。ここでは Finished に進みます。

Finished

Finished:
  Type: Succeed

TypeSucceed を設定します。
これにより、すべての処理が正常に完了したことを示します。

serverless deploy コマンドを実行してデプロイします。

デプロイをすると AWS のコンソールの Step Functions にて、ステートマシンが作成されていることが確認できます。

デプロイされたステートマシンの詳細の定義タブを見ると、定義したステートマシンの内容が確認できます。

デプロイが確認できたので実行しています。コンソールから直接実行できますが、今回は aws cli を使って実行してみます。

  aws stepfunctions start-execution \
        --state-machine-arn arn:aws:states:ap-northeast-1:000000000000:stateMachine:PaymentBatchStateMachine

実行してみると、以下のような結果になりました。

請求対象のユーザー100件に対して、決済サービスにリクエストを並列で送り、全件成功した場合は成功、1度でも失敗した場合は失敗として DynamoDB に保存した結果

回数 結果 実行時間
1回目 error 8,351 ms
2回目 success 7,522 ms
3回目 success 6,354 ms
4回目 error 5,634 ms
5回目 success 6,060 ms

この結果を見ると、並列で処理すると1回あたりの処理時間は6秒程度であることがわかります。

結果も全てのユーザーに対しての処理が成功した場合は成功(success)、1度でも失敗した場合は失敗(error)として DynamoDB に保存されていることが確認できました。

改めて1つの関数で完結させる処理と AWS Step Functions を使った処理を比較してみると、並列で処理を行うことで処理時間を短縮することができました。

実行回数 1つのLambda関数でループ処理 (実行時間) AWS Step Functionsで並列処理 (実行時間)
1回目 54,650 ms 8,351 ms
2回目 53,802 ms 7,522 ms
3回目 51,474 ms 6,354 ms
4回目 54,095 ms 5,634 ms
5回目 53,560 ms 6,060 ms

今回のケースで検討すべきこと

今回の処理では、決済対象のユーザーに対して決済APIを並列で送信する処理を行いました。

モックサーバーを利用しましたが、実際に並列でAPIを投げる場合は、API側のレートリミットに注意が必要です。

終わりに

今回は AWS Step Functions を Serverless 環境で動かしてみました。

AWS Step Functions は今回紹介した内容以外にも豊富な機能を提供しています。例えば、関数の失敗時にリトライを自動的に行う仕組みや直接外部のAPIを呼び出すことも可能(外部APIを処理するためだけのLambdaを作らなくていい)です。

これらの機能を活用することで、より高度で柔軟なワークフローを構築することができます。

ぜひ、様々なユースケースで試してみてください!

ソースコードを公開しました。
https://github.com/ryy/serverless-step-functions-example

MOSH

Discussion