AWS CDK で Step Functions の並列分散処理を実装してみるときに気にしたところ
この記事は AWS CDK Advent Calendar 2024 19 日目の記事です。
前提
AWS Step Functions を使用して並列分散処理を実装する機会がありました。その際に、AWS CDK を使用して実装を行い、いくつかの注意点や工夫が必要でした。
前提となる技術スタックは以下の通りです。
- AWS CDK v2
- TypeScript
- Node.js 22.x
- AWS Step Functions
- AWS Lambda
- Amazon S3
今回は後述する課題を解決するためにやったことを、本エントリで説明用に実装したコードを交えて説明します。
課題
大量のデータを処理する際に、処理時間を短縮するために並列処理を実装する必要がありました。具体的には以下のような要件を考える必要がありました。
- 100 個程度の CSV ファイルを生成し、それぞれのファイルに対して共通した処理を行う
- 処理結果を別の CSV ファイルとして保存する
- ステート間でデータを受け渡す際に、データサイズが 256KB を超える可能性があるため、S3 に保存したファイルを経由してデータを受け渡す
- 一部の並列処理が失敗しても、全体の処理を継続して最終的な結果を判定する
これらの要件を満たすために、Step Functions の Distributed Map を使用することにしました。
また、AWS CDK でも Distributed Map がサポートされており、非常に書きやすいと感じたため採用しました。
全体構成
実装したアーキテクチャの全体像は以下のようになっています。
-
Step Functions
- Distributed Map を使用して共通の処理を並列化できるようにする
- エラーハンドリングで並列処理中に一部のステートが失敗しても全体が落ちないようにする
-
Lambda 関数
- データ生成用 Lambda: 100 個の CSV ファイルを生成
- データ処理用 Lambda: 各 CSV ファイルを読み込んで処理を実行
-
S3
- ステート間のデータ受け渡しに使用
- 処理前後のデータを保存
Distributed Mapを利用した理由としては、既存の Map ステートだと同時実行の並列処理が40個まが上限となっており、40個以上対応したい場合は必然的に Distributed Mapを使うことになります。
Distributed Mapを使うと、最大 10,000 回の並列のステートを維持できるため、かなりの数を並列でまかなうことができます。
さらに、1 つのステートマシンの実行履歴の最大イベント数は 25,000 件が上限となっており、それ以上をこえると、実行自体が失敗します。
Map ステート内でループを繰り返すような処理がある場合は、そのたびに実行履歴が記録されていくため簡単に 25,000 件を超えてしまいかねないです。
私の別の環境でも実行履歴を超えてしまいエラーとなってしまいました(そのときのマネコン上のエラーメッセージをとりそこねてしまいました。。)
これらの理由から、Distributed Map を使って実装を進めることにしました。
コード
コードはアップロードしておりますので、デプロイして確認できます。
まず、Step Functions のスタック定義です。
import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda-nodejs";
import * as logs from "aws-cdk-lib/aws-logs";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Construct } from "constructs";
export class AwsCdkParrallelStateMachineBySfnSampleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// S3バケットの作成
const bucket = new s3.Bucket(this, "DataBucket", {
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
// Lambda関数の作成
const generateDataFunction = new lambda.NodejsFunction(
this,
"GenerateDataFunction",
{
entry: "../server/src/generateData.ts",
handler: "handler",
runtime: cdk.aws_lambda.Runtime.NODEJS_22_X,
timeout: cdk.Duration.minutes(15),
memorySize: 1024,
environment: {
BUCKET_NAME: bucket.bucketName,
},
bundling: {
externalModules: ["aws-sdk"],
},
}
);
const processDataFunction = new lambda.NodejsFunction(
this,
"ProcessDataFunction",
{
entry: "../server/src/processData.ts",
handler: "handler",
runtime: cdk.aws_lambda.Runtime.NODEJS_22_X,
environment: {
BUCKET_NAME: bucket.bucketName,
},
bundling: {
externalModules: ["aws-sdk"],
},
}
);
// S3バケットへのアクセス権限を付与
bucket.grantReadWrite(generateDataFunction);
bucket.grantReadWrite(processDataFunction);
// Step Functionsのログ設定
const logGroup = new logs.LogGroup(this, "StateMachineLogGroup", {
logGroupName: "/aws/stepfunctions/ParallelProcessing",
retention: logs.RetentionDays.ONE_WEEK,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
// エラーハンドリング用のステート
const handleMapError = new sfn.Pass(this, "HandleMapError", {
parameters: {
"error.$": "$.error",
"cause.$": "$.cause",
},
});
// データ生成タスク
const generateDataTask = new tasks.LambdaInvoke(this, "GenerateData", {
lambdaFunction: generateDataFunction,
outputPath: "$.Payload",
payload: sfn.TaskInput.fromObject({
jobId: sfn.JsonPath.format(
"{}",
sfn.JsonPath.stringAt("$$.Execution.StartTime")
),
}),
});
// データ処理タスク
const processDataTask = new tasks.LambdaInvoke(this, "ProcessData", {
lambdaFunction: processDataFunction,
outputPath: "$.Payload",
}).addCatch(handleMapError, {
resultPath: "$.error",
});
// Distributed Mapの定義
const processMap = new sfn.DistributedMap(this, "ProcessMap", {
maxConcurrency: 100,
itemsPath: "$.items",
itemSelector: {
"jobId.$": "$.jobId",
"index.$": "$$.Map.Item.Value.index",
"inputLocation.$": "$$.Map.Item.Value.location",
},
resultPath: "$.mapResults",
});
processMap.itemProcessor(processDataTask);
// 成功・失敗の判定ステート
const success = new sfn.Succeed(this, "Success");
const failed = new sfn.Fail(this, "Failed", {
error: "MapStateError",
cause: "Error in map state execution",
});
// エラーチェックの条件分岐
const checkError = new sfn.Choice(this, "CheckError")
.when(sfn.Condition.isPresent("$.error"), failed)
.otherwise(success);
// ステートマシンの定義
const stateMachine = new sfn.StateMachine(
this,
"ParallelProcessingStateMachine",
{
definition: generateDataTask.next(processMap).next(checkError),
logs: {
destination: logGroup,
level: sfn.LogLevel.ALL,
includeExecutionData: true,
},
tracingEnabled: true,
timeout: cdk.Duration.hours(1),
}
);
// 出力
new cdk.CfnOutput(this, "StateMachineArn", {
value: stateMachine.stateMachineArn,
description: "State Machine ARN",
});
new cdk.CfnOutput(this, "BucketName", {
value: bucket.bucketName,
description: "S3 Bucket Name",
});
}
}
テスト的に今回は実装しているため、データを生成する Lambda 関数は以下のように実装しました。
今回は簡単のために、このように実装していますが、本来の本番運用する場合は、ここでDynamoDBなどのデータストアからデータを取得したりして適切なフィルタリング後に後続の Distributed Map にデータを渡すと思います。
import { PutObjectCommand, S3Client } from "@aws-sdk/client-s3";
import { Context } from "aws-lambda";
const s3Client = new S3Client({});
interface GenerateDataEvent {
jobId?: string;
}
export const handler = async (event: GenerateDataEvent, _context: Context) => {
const jobId = event.jobId || new Date().getTime().toString();
const fileCount = 100;
const results = [];
// 100個のファイルを生成
for (let index = 0; index < fileCount; index++) {
// サンプルデータの生成
const data = Array.from({ length: 1000 }, (_, i) => ({
id: `${index}-${i}`,
value: Math.random(),
timestamp: new Date().toISOString(),
}));
// CSVデータの作成
const csvContent = data
.map((row) => `${row.id},${row.value},${row.timestamp}`)
.join("\n");
// S3にアップロード
const bucketName = process.env.BUCKET_NAME;
const key = `jobs/${jobId}/data/${index}.csv`;
await s3Client.send(
new PutObjectCommand({
Bucket: bucketName,
Key: key,
Body: csvContent,
ContentType: "text/csv",
})
);
results.push({
index,
location: {
bucket: bucketName,
key,
},
});
}
return {
statusCode: 200,
jobId,
items: results,
};
};
Distributed Map で各並列のステートでデータ処理してS3に書き込みにいく Lambda のコードです。
あまり難しことはしておらず、S3 のファイル内の読み込んでデータを2倍にして書き込むだけです。
import {
S3Client,
GetObjectCommand,
PutObjectCommand,
} from "@aws-sdk/client-s3";
import { Context } from "aws-lambda";
const s3Client = new S3Client({});
export const handler = async (event: any, context: Context) => {
const { jobId, inputLocation } = event;
// S3からデータを読み込む
const getResponse = await s3Client.send(
new GetObjectCommand({
Bucket: inputLocation.bucket,
Key: inputLocation.key,
})
);
const csvContent = await getResponse.Body?.transformToString();
if (!csvContent) {
throw new Error("No content found");
}
// CSVデータを処理
const rows = csvContent.split("\n").map((row) => {
const [id, value, timestamp] = row.split(",");
return {
id,
value: parseFloat(value),
timestamp,
};
});
// データ処理(この例では単純に値を2倍にする)
const processedRows = rows.map((row) => ({
...row,
value: row.value * 2,
}));
// 処理結果をCSVに変換
const processedCsvContent = processedRows
.map((row) => `${row.id},${row.value},${row.timestamp}`)
.join("\n");
// 処理結果をS3に保存
const outputKey = inputLocation.key.replace("/data/", "/processed/");
await s3Client.send(
new PutObjectCommand({
Bucket: inputLocation.bucket,
Key: outputKey,
Body: processedCsvContent,
ContentType: "text/csv",
})
);
return {
statusCode: 200,
jobId,
outputLocation: {
bucket: inputLocation.bucket,
key: outputKey,
},
};
};
解説
実装において、特に注意を払った点がいくつかあります。
1. データの受け渡し
Step Functions のステート間でデータを受け渡す際、ペイロードサイズの制限(256KB)に注意が必要です。この制限を超えないように、以下の工夫を行いました。
- S3 を使用してデータを受け渡し
- ステート間では、S3 のバケット名とキーのみを受け渡し
- 処理結果も同様に S3 に保存
この方法は、AWS の re:Post 上でも公式見解としてもデータ受け渡しのサイズが 256KB を超える場合は S3 にデータを吐き出して取り回すことが推奨のソリューションである言及があります。
そのため、この方法は Step Functions の制約として覚えておいたほうがよいかとおもいます。
2. エラーハンドリング
分散処理では、一部の処理が失敗しても全体の処理を継続するひつようがありました。
以下のブログを参考にしました。
CDK の実装では以下の方針をとりました。
- Lambda 失敗した場合をキャッチする Catch 句を使用して、並列処理内のエラーをハンドリング
- 成否の判定のステートで Catch 句でキャッチしたエラーの判定をおこなって全体を失敗させるか判断する
const processDataTask = new tasks.LambdaInvoke(this, "ProcessData", {
lambdaFunction: processDataFunction,
outputPath: "$.Payload",
}).addCatch(handleMapError, {
resultPath: "$.error",
});
-- 中略 --
// 成功・失敗の判定ステート
const success = new sfn.Succeed(this, "Success");
const failed = new sfn.Fail(this, "Failed", {
error: "MapStateError",
cause: "Error in map state execution",
});
// エラーチェックの条件分岐
const checkError = new sfn.Choice(this, "CheckError")
.when(sfn.Condition.isPresent("$.error"), failed)
.otherwise(success);
3. 並列で実行されるステートの同時実行数
const processMap = new sfn.DistributedMap(this, "ProcessMap", {
maxConcurrency: 100,
itemsPath: "$.items",
itemSelector: {
"jobId.$": "$.jobId",
"index.$": "$$.Map.Item.Value.index",
"inputLocation.$": "$$.Map.Item.Value.location",
},
resultPath: "$.mapResults",
});
あまり他のパラメータについては考えられていないですが、並列で生成さえる同時実行のステートが100を超えないように指定しています。
ここは各要件で指定された同時実行数を指定して、増えすぎて Lambda から処理されるデータの出力や他の API を叩く場合の負荷を見積もって決める必要があるかとおもいます。
4. Step Functions のステートの流れのコーディング
ステートマシン自体の定義は以下でやっています。
definition という部分でステートがどのような順番で実行されていくかを定義できます。
なお、next というメソッドの引数に次に実行するステートを渡すことでチェーン上に記載していくことができて、JSON 形式で書かれるようにステート構文よりは理解しやすいかとおもいます。
const stateMachine = new sfn.StateMachine(
this,
"ParallelProcessingStateMachine",
{
definition: generateDataTask.next(processMap).next(checkError),
logs: {
destination: logGroup,
level: sfn.LogLevel.ALL,
includeExecutionData: true,
},
tracingEnabled: true,
timeout: cdk.Duration.hours(1),
}
);
Distributed Map のステートを使ってデータを並列で出力する処理は以下です。
processMap は Distributed Map ステートのインスタンスです。
一方で、processDataTask は Lambda 関数を実行するタスクになっています。
processMap.itemProcessor(processDataTask) の部分については、Distributed Map の各イテレーションで実行される処理として processDataTask が設定されます。
// データ処理タスク
const processDataTask = new tasks.LambdaInvoke(this, "ProcessData", {
lambdaFunction: processDataFunction,
outputPath: "$.Payload",
}).addCatch(handleMapError, {
resultPath: "$.error",
});
// Distriuted Map の定義
const processMap = new sfn.DistributedMap(this, "ProcessMap", {
maxConcurrency: 100,
itemsPath: "$.items",
itemSelector: {
"jobId.$": "$.jobId",
"index.$": "$$.Map.Item.Value.index",
"inputLocation.$": "$$.Map.Item.Value.location",
},
resultPath: "$.mapResults",
});
processMap.itemProcessor(processDataTask);
なお、$.items から取得した配列の各要素のオブジェクトに対して、同一の処理を並列実行します。
各実行では、以下のパラメータが Lambda 関数に event として渡されます。
- jobId: 実行全体を識別するID
- index: 処理対象のインデックス
- inputLocation: 処理対象のS3ファイルの場所情報
この設定により、100個のCSVファイルを最大100の並列度で同時に処理することが可能になります。
実行結果
Step Functions のマネコンからテスト実行します。
S3 に100ファイル分のデータが出力されていました。
さいごに
AWS CDK を使用して Step Functions を実装してみて大規模な並列処理を Step Functions で実装する場合に Discributed Map を使うのは非常に簡単に実装できると思いました。
先日のアップデートで Step Functions のステート間で変数を使えるようになりました。
しかし、こちらの変数も割り当てられるデータサイズは 256 KB までとこれまでと同様のため、そういった場合が考えられる場合は S3 へのデータ出力は必要になります。
同じような処理を ECS Fargate で実装できるかとおもいます。
一方で、Step Functions を AWS CDK で書く利点としてはコード管理できることやステートの流れをメソッドチェーンのようにして理解しやすくするメリットが享受できるところだとかんじています。
さらに、Step Functions のマネコンのグラフビューで複雑な処理を視覚的にどこで成功/失敗したのかわかりやすいところも保守する上でのメリットになりそうです。
Discussion