📝
Step Functions の Map ステートで CSV ファイルを一行ずつ処理してみた
Map workflow state - AWS Step Functions
S3 に保存してある CSV ファイルを Map ステートで並列に処理する簡単なサンプルです。
S3 バケットに CSV ファイルをアップロード
以下のような CSV ファイルを S3 バケットにアップロードしました。
test01.csv
id,name
1,test1
2,test2
3,test3
Lambda 関数作成
Map ステートから呼び出して CSV ファイルの一行を出力する関数を作成しました。
ランタイムは Node.js 22.x です。
index.mjs
export const handler = async (event) => {
console.log(JSON.stringify(event, null, 2))
const response = {
statusCode: 200,
body: JSON.stringify('Hello from Lambda!'),
};
return response;
};
ステートマシン作成
以下の定義で作成しました。
{
"Comment": "A description of my state machine",
"StartAt": "ProcessFiles",
"States": {
"ProcessFiles": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "Lambda Invoke",
"States": {
"Lambda Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:012345678901:function:test:$LATEST",
"Payload.$": "$"
},
"End": true
}
}
},
"End": true,
"Label": "ProcessFiles",
"MaxConcurrency": 1000,
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "CSV",
"CSVHeaderLocation": "FIRST_ROW",
"CSVDelimiter": "COMMA"
},
"Parameters": {
"Bucket": "your-s3-bucket-name",
"Key": "test01.csv"
}
}
}
}
}
FunctionName
は作成済みの関数 ARN に置換してください。
Bucket
は CSV ファイルをアップロードした S3 バケット名に置換してください。
実行結果
input なしでステートマシンを実行すると、以下のような Lambda の実行ログが出力されます。
{
"name": "test1",
"id": "1"
}
{
"name": "test2",
"id": "2"
}
{
"name": "test3",
"id": "3"
}
CSV ファイルが 1 行ずつ処理されたことを確認できました。
ファイルパスを input で動的に指定する方法
上記定義の場合、S3 バケットに保存した CSV ファイル名をハードコードしていますが、以下のように input で動的に指定することも可能です。
{
"Comment": "A description of my state machine",
"StartAt": "ProcessFiles",
"States": {
"ProcessFiles": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "Lambda Invoke",
"States": {
"Lambda Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:012345678901:function:test:$LATEST",
"Payload.$": "$"
},
"End": true
}
}
},
"End": true,
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "CSV",
"CSVHeaderLocation": "FIRST_ROW",
"CSVDelimiter": "COMMA"
},
"Parameters": {
"Bucket": "your-s3-bucket-name",
"Key.$": "$.key"
}
}
}
}
}
Key
の部分の input の変数を参照するように変更してあります。
この状態で以下の input を定義してステートマシンを実行すると、上述の実行結果と同様の結果を得られます。
{
"key": "test01.csv"
}
Lambda 関数からの実行例
上記ステートマシンを Node.js 22.x の Lambda 関数から呼び出すコード例です。
index.js
const { SFNClient, StartExecutionCommand } = require("@aws-sdk/client-sfn");
exports.handler = async function (event, context) {
const client = new SFNClient();
const input = {
stateMachineArn: "your-statemachine-arn",
input: JSON.stringify({
key: "test01.csv"
})
};
const command = new StartExecutionCommand(input);
const response = await client.send(command);
return response
};
stateMachineArn
はステートマシンの ARN に置換してください。
上記の Lambda と ステートマシンを使用して以下のようなフローも実装可能です。
- S3 バケットに CSV ファイルをアップロード
- S3 イベント通知でステートマシンを実行する Lambda A を起動
- ステートマシンの Map ステートから CSV ファイルを処理する Lambda B を起動
- Lambda B で任意の処理を実行
まとめ
今回は Step Functions の Map ステートで CSV ファイルを一行ずつ処理してみました。
どなたかの参考になれば幸いです。
Discussion