AWS Step FunctionsをServerlessで動かしてみた
この記事は MOSH Advent Calendar 2024 の12日目の記事です。
はじめに
こんにちは! MOSH プロダクティビティチーム所属の鳴瀬です。
MOSH では AWS Lambda を利用するサーバーレスアーキテクチャを採用しています。
また、サーバーレスアーキテクチャを実現するためのサーバーレス管理フレームワークである Serverless を利用しています。
今回はそんな MOSH を支えている AWS Lambda の1つの機能である AWS Step Functions を Serverless で動かしてみました。
AWS Step Functions とは
AWS Step Functions は、様々なAWSのサービス同士を簡単に組み合わせて、ビジネスワークフローを作成するためのサービスです。
例えば、上記画像の場合は以下のような処理が行われます。
- 開始(Start): フローが開始されます。
-
選択状態(Choice state): 条件に応じてフローが分岐します。この
Choose your path...
ステップで、$.condition
が3P
に等しいかを確認します。 -
条件分岐による処理:
-
条件が
3P
の場合:- HTTPエンドポイント呼び出し(Call third-party API): サードパーティのAPIを呼び出します。
- Textract: テキスト抽出(Extract text): 呼び出した結果に基づいて、Amazon Textract を使いドキュメントからテキストを抽出します。
-
デフォルトのパス(Default):
- Lambda関数の呼び出し(Retrieve data): Lambda関数を使用してデータを取得します。
- Glueジョブの開始(Start data processing): Amazon Glue を使用してデータ処理を開始します。
-
条件が
- 終了(End): 全ての処理が終了します。
実際に動かしてみる
ユースケースを決める
今回は 「毎月月初に自社のサービスの利用ユーザー対してシステム利用料の決済を行う」 というユースケースを想定して実装してみます。
決済処理の中では、外部の決済サービスに対してAPIリクエストを送信し、それが対象のユーザーの数だけ行われ、最終的に全て成功したかどうかをDBに保存します。
まずは AWS Step Functions を使わない場合の処理を考えてみます。
1つの関数で完結させてみます。
ソースコードだと以下のようになりました。
app/
├── handlers/
│ ├── paymentBatch.js
│ └── paymentMockServer.js
└── serverless.yml
ソースコード
serverless.yml
serverlessの設定ファイルです。
関数にはバッチ処理を行う paymentBatch
と、決済サービスのモックサーバー paymentMockServer
を定義しています。
バッチの結果を保存するための DB は DynamoDB を使います。
MonthlyPaymentBatchResults
というテーブルを作成し、バッチの結果を保存します。
service: test-app
provider:
name: aws
runtime: nodejs18.x
region: ap-northeast-1
iam:
role:
statements:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource:
- arn:aws:dynamodb:ap-northeast-1:000000000000:table/MonthlyPaymentBatchResults
functions:
paymentBatch:
handler: handlers/paymentBatch.handler
timeout: 900
paymentMockServer:
handler: handlers/paymentMockServer.handler
events:
- http:
path: api/v1/payments
method: post
resources:
Resources:
MonthlyPaymentBatchResultsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: MonthlyPaymentBatchResults
AttributeDefinitions:
- AttributeName: Id
AttributeType: S
KeySchema:
- AttributeName: Id
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
paymentBatch.js
バッチ処理を行う関数です。
const TEST_USER_COUNT = 100;
にて決済対象のユーザー数を設定しています。
100回のループ処理の中で、決済サービスに対してAPIリクエストを送信し、全てのAPIリクエストに全件成功した場合はバッチを成功、1度でも失敗した場合は失敗として DynamoDB に保存します。
const AWS = require("aws-sdk");
const axios = require("axios");
const { v4: uuidv4 } = require("uuid");
const dynamoDb = new AWS.DynamoDB.DocumentClient({
region: "ap-northeast-1",
});
// 請求対象のユーザー数
const TEST_USER_COUNT = 100;
const PAYMENT_API_URL = "https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/api/v1/payments";
module.exports.handler = async () => {
const startTime = Date.now();
let status = "success";
const users = Array.from({ length: TEST_USER_COUNT }, (_, i) => ({
id: i + 1,
name: `User-${i + 1}`,
}));
for (const user of users) {
try {
await axios.post(PAYMENT_API_URL);
} catch (error) {
status = "error";
}
}
const params = {
TableName: 'MonthlyPaymentBatchResults',
Item: {
Id: uuidv4(),
Status: status,
Timestamp: new Date().toISOString(),
},
};
await dynamoDb.put(params).promise();
const endTime = Date.now();
return {
statusCode: 200,
body: JSON.stringify({
result: status,
executionTime: `${endTime - startTime} ms`,
}),
};
};
paymentMockServer.js
外部決済サービスのモックサーバーです。
最大1秒のランダムな時間でレスポンスを返します。
また、ランダムで成功または失敗を返します。
module.exports.handler = async (event) => {
const paymentId = `payment_${Math.random().toString(36).slice(-10)}`;
// 最大1秒待機
await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000));
if (Math.random() < 0.005) {
return {
statusCode: 500,
body: JSON.stringify({ error: "Mock server error" }),
};
}
return {
statusCode: 200,
body: JSON.stringify({ paymentId }),
};
};
実行する
serverless invoke -f paymentBatch
結果
実行してみると、以下のような結果になりました。
100回のループ処理の中で、決済サービスに対してAPIリクエストを送信し、全てのAPIリクエストに全件成功した場合はバッチを成功、1度でも失敗した場合は失敗として DynamoDB に保存した結果
回数 | 結果 | 実行時間 |
---|---|---|
1回目 | error | 54,650 ms |
2回目 | success | 53,802 ms |
3回目 | success | 51,474 ms |
4回目 | error | 54,095 ms |
5回目 | success | 53,560 ms |
この結果を見ると、100ユーザーに対してAPIリクエストを送信した場合にかかる時間は50秒程度であることがわかります。
今回はテストデータで100ユーザーを対象にしていますが、実際のサービスでは日々ユーザー数は増加していくので、ユーザーが増加するにつれてバッチ処理にかかる時間も増加していくことが予想されます。
AWS Step Functionsを使って動かしてみる
次に、 AWS Step Functions を使った場合の処理を考えてみます。
1つの関数でバッチを完結させる処理では、以下の処理を行っていました。
- 決済対象のユーザー一覧を取得
- 決済対象のユーザー一覧に対してループ処理の中で決済サービスに対してAPIリクエストを送信する
- 全件成功した場合は成功、1件でも失敗した場合は失敗として DynamoDB に保存
関数を分割する
これらの処理を関数として分割し、AWS Step Functions を使って以下のように実現してみます。
- 決済対象のユーザーを取得する関数
- 決済サービスのAPIリクエストを送信する関数
- 結果をDBに保存する関数
決済対象のユーザーを取得する関数
にて、決済対象ユーザー一覧を取得したあとに、 決済サービスのAPIリクエストを送信する関数
を並列で実行させます。
決済サービスのAPIリクエストを送信する関数
を並列で実行させることで、処理時間を短縮することができます。
そして、決済サービスのAPIリクエストを送信する関数
が全て終了したあとに、 結果をDBに保存する関数
にて 決済サービスのAPIリクエストを送信する関数
が全て成功したかどうかを確認し、結果をDBに保存させます。
ソースコード
ソースコードだと以下のようになりました。
app/
├── handlers/
│ ├── getPaymentTargetUsers.js // [NEW] 決済対象のユーザーを取得する関数
│ ├── paymentMockServer.js // 決済サービスのモックサーバー
│ ├── savePaymentResult.js // [NEW] 結果をDBに保存する関数
│ └── sendPaymentRequest.js // [NEW] 決済サービスのAPIリクエストを送信する関数
└── serverless.yml
getPaymentTargetUsers.js
決済対象のユーザーを取得する関数です。
const TEST_USER_COUNT = 100;
module.exports.handler = async () => {
const users = Array.from({ length: TEST_USER_COUNT }, (_, i) => ({
userId: i + 1,
}));
return {
users,
};
};
sendPaymentRequest.js
決済サービスにAPIリクエストを送信する関数です。
const axios = require("axios");
const PAYMENT_API_URL = "https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/api/v1/payments";
module.exports.handler = async (event) => {
const { userId } = event;
await axios.post(PAYMENT_API_URL, { userId });
return { status: "success" };
};
SaveBatchResult.js
バッチの結果を保存する関数です。
const AWS = require("aws-sdk");
const { v4: uuidv4 } = require("uuid");
const dynamoDb = new AWS.DynamoDB.DocumentClient({
region: "ap-northeast-1",
});
module.exports.handler = async (event) => {
const hasError = event.some((result) => result.status === "error");
const status = hasError ? "error" : "success";
const params = {
TableName: "MonthlyPaymentBatchResults",
Item: {
Id: uuidv4(),
Status: status,
Timestamp: new Date().toISOString(),
},
};
try {
await dynamoDb.put(params).promise();
return {
statusCode: 200,
body: JSON.stringify({ message: "Batch result saved successfully" }),
};
} catch (error) {
return {
statusCode: 500,
body: JSON.stringify({ error: "Failed to save batch result" }),
};
}
};
serverless.yml
serverlessの設定ファイルです。
service: test-app
provider:
name: aws
runtime: nodejs18.x
region: ap-northeast-1
iam:
role:
statements:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource:
- arn:aws:dynamodb:ap-northeast-1:000000000000:table/MonthlyPaymentBatchResults
- Effect: Allow
Action:
- states:StartExecution
Resource:
- "arn:aws:states:ap-northeast-1:000000000000:stateMachine:PaymentBatchStateMachine"
functions:
getPaymentTargetUsers:
handler: handlers/getPaymentTargetUsers.handler
sendPaymentRequest:
handler: handlers/sendPaymentRequest.handler
saveBatchResult:
handler: handlers/saveBatchResult.handler
paymentMockServer:
handler: handlers/paymentMockServer.handler
events:
- http:
path: api/v1/payments
method: post
stepFunctions:
stateMachines:
paymentBatch:
name: PaymentBatchStateMachine
definition:
StartAt: getPaymentTargetUsers
States:
getPaymentTargetUsers:
Type: Task
Resource:
Fn::GetAtt: [getPaymentTargetUsers, Arn]
Next: ProcessPaymentTargetUsers
ProcessPaymentTargetUsers:
Type: Map
ItemsPath: "$.users"
Iterator:
StartAt: SendPaymentRequest
States:
SendPaymentRequest:
Type: Task
Resource:
Fn::GetAtt: [sendPaymentRequest, Arn]
Catch:
- ErrorEquals: ["States.ALL"]
Next: Error
Next: Success
Success:
Type: Pass
Result: { "status": "success" }
End: true
Error:
Type: Pass
Result: { "status": "error" }
End: true
Next: SaveBatchResult
SaveBatchResult:
Type: Task
Resource:
Fn::GetAtt: [saveBatchResult, Arn]
Next: Finished
Finished:
Type: Succeed
resources:
Resources:
MonthlyPaymentBatchResultsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: MonthlyPaymentBatchResults
AttributeDefinitions:
- AttributeName: Id
AttributeType: S
KeySchema:
- AttributeName: Id
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
plugins:
- serverless-step-functions
AWS Step Functionsを利用するために serverless-step-functions を導入します。
$ yarn add --dev serverless-step-functions
plugins
プロパティに AWS Step Functions のプラグイン(serverless-step-functions
)を指定し、 stepFunctions
プロパティにステートマシンを定義していきます。
plugins:
- serverless-step-functions
stateMachines
は振る舞いを定義するプロパティです。 ステートマシンはワークフローと呼ばれます。
PaymentBatchStateMachine
という名前で、複数のユーザーに対して決済を行うワークフローを定義を設定しています。
States
は、ステートマシンを構成するすべてのステートを定義します。
各ステートは、その名前をキーとして定義され、その中にタイプや動作、次に進む条件を記述します。
今回の例では各ステートは以下のように定義されています。
getPaymentTargetUsers
決済対象のユーザーを取得するステートです。
getPaymentTargetUsers:
Type: Task
Resource:
Fn::GetAtt: [getPaymentTargetUsers, Arn]
Next: ProcessPaymentTargetUsers
Type
にはステートの種類を指定します。 Task
を設定して Lambda関数を呼び出します。
Resource
には Lambda関数を指定します。
Fn::GetAtt: [getPaymentTargetUsers, Arn]
により、 getPaymentTargetUsers
関数のARNを取得して利用します。
Next
には 次に実行するステートを指定します。ここでは ProcessPaymentTargetUsers
に進みます。
ProcessPaymentTargetUsers
並列で処理を行うためのステートです。
ProcessPaymentTargetUsers:
Type: Map
ItemsPath: "$.users"
Iterator:
StartAt: SendPaymentRequest
States:
SendPaymentRequest:
Type: Task
Resource:
Fn::GetAtt: [sendPaymentRequest, Arn]
Catch:
- ErrorEquals: ["States.ALL"]
Next: Error
Next: Success
Success:
Type: Pass
Result: { "status": "success" }
End: true
Error:
Type: Pass
Result: { "status": "error" }
End: true
Next: SaveBatchResult
Type
に Map
を設定します。これにより、配列の各要素に対して並列で処理を行うことができます。
ItemsPath
には、配列データのパスを指定します。ここでは $.users
を指定しており、 getPaymentTargetUsers
ステートから取得したユーザーリストを使用します。
Iterator
は、配列の各要素に対しての処理を定義するプロパティです。この中に処理フローを記述し、各要素(ユーザー)ごとに SendPaymentRequest
を実行します。
Resource
には、呼び出す Lambda関数を指定します。
Fn::GetAtt: [sendPaymentRequest, Arn]
により、 SendPaymentRequest
関数の ARN を取得して利用します。
Catch
は、ステート内でエラーが発生した場合の処理を定義します。
ErrorEquals: ["States.ALL"]
により、関数内で発生したエラーをキャッチし、 Error
ステートに進みます。
エラーが発生しなかった場合は Next: Success
により Success
ステートに進みます。
Success, Error
Success:
Type: Pass
Result: { "status": "success" }
End: true
Error:
Type: Pass
Result: { "status": "error" }
End: true
Type
に Pass
を設定します。これは入力を出力に渡すだけなシンプルなTypeです。
各ユーザーの決済APIに成功した場合は Success
ステートの { "status": "success" }
を出力、失敗した場合は Error
ステートの Result: { "status": "error" }
を出力します。
この出力を最後のバッチの結果を保存する処理にて使用します。
後続の処理で出力された全てのステータスを見てバッチ全体が成功か失敗かを判断させます。
End
を true
にすることで、ステートの処理を終了させます。
Next: SaveBatchResult
により、 SaveBatchResult
ステートに進みます。
SaveBatchResult
バッチの結果を保存する関数を呼び出します。
SaveBatchResult:
Type: Task
Resource:
Fn::GetAtt: [saveBatchResult, Arn]
Next: Finished
Type
には Task
を設定して 全体のバッチ処理結果を保存するLambda関数を呼び出します。
Resource
には Lambda関数を指定します。
Fn::GetAtt: [saveBatchResult, Arn]
により、 saveBatchResult
関数の ARN を取得して利用します。
Next
には、処理が完了した後に進むステートを指定します。ここでは Finished
に進みます。
Finished
Finished:
Type: Succeed
Type
に Succeed
を設定します。
これにより、すべての処理が正常に完了したことを示します。
serverless deploy
コマンドを実行してデプロイします。
デプロイをすると AWS のコンソールの Step Functions にて、ステートマシンが作成されていることが確認できます。
デプロイされたステートマシンの詳細の定義タブを見ると、定義したステートマシンの内容が確認できます。
デプロイが確認できたので実行しています。コンソールから直接実行できますが、今回は aws cli を使って実行してみます。
aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:ap-northeast-1:000000000000:stateMachine:PaymentBatchStateMachine
実行してみると、以下のような結果になりました。
請求対象のユーザー100件に対して、決済サービスにリクエストを並列で送り、全件成功した場合は成功、1度でも失敗した場合は失敗として DynamoDB に保存した結果
回数 | 結果 | 実行時間 |
---|---|---|
1回目 | error | 8,351 ms |
2回目 | success | 7,522 ms |
3回目 | success | 6,354 ms |
4回目 | error | 5,634 ms |
5回目 | success | 6,060 ms |
この結果を見ると、並列で処理すると1回あたりの処理時間は6秒程度であることがわかります。
結果も全てのユーザーに対しての処理が成功した場合は成功(success)、1度でも失敗した場合は失敗(error)として DynamoDB に保存されていることが確認できました。
改めて1つの関数で完結させる処理と AWS Step Functions を使った処理を比較してみると、並列で処理を行うことで処理時間を短縮することができました。
実行回数 | 1つのLambda関数でループ処理 (実行時間) | AWS Step Functionsで並列処理 (実行時間) |
---|---|---|
1回目 | 54,650 ms | 8,351 ms |
2回目 | 53,802 ms | 7,522 ms |
3回目 | 51,474 ms | 6,354 ms |
4回目 | 54,095 ms | 5,634 ms |
5回目 | 53,560 ms | 6,060 ms |
今回のケースで検討すべきこと
今回の処理では、決済対象のユーザーに対して決済APIを並列で送信する処理を行いました。
モックサーバーを利用しましたが、実際に並列でAPIを投げる場合は、API側のレートリミットに注意が必要です。
終わりに
今回は AWS Step Functions を Serverless 環境で動かしてみました。
AWS Step Functions は今回紹介した内容以外にも豊富な機能を提供しています。例えば、関数の失敗時にリトライを自動的に行う仕組みや直接外部のAPIを呼び出すことも可能(外部APIを処理するためだけのLambdaを作らなくていい)です。
これらの機能を活用することで、より高度で柔軟なワークフローを構築することができます。
ぜひ、様々なユースケースで試してみてください!
ソースコードを公開しました。
Discussion