📝

Step Functions の Map ステートで CSV ファイルを一行ずつ処理してみた

2025/03/06に公開

Map workflow state - AWS Step Functions

S3 に保存してある CSV ファイルを Map ステートで並列に処理する簡単なサンプルです。

S3 バケットに CSV ファイルをアップロード

以下のような CSV ファイルを S3 バケットにアップロードしました。

test01.csv
id,name
1,test1
2,test2
3,test3

Lambda 関数作成

Map ステートから呼び出して CSV ファイルの一行を出力する関数を作成しました。
ランタイムは Node.js 22.x です。

index.mjs
export const handler = async (event) => {

  console.log(JSON.stringify(event, null, 2))
  const response = {
    statusCode: 200,
    body: JSON.stringify('Hello from Lambda!'),
  };
  return response;
};

ステートマシン作成

以下の定義で作成しました。

{
  "Comment": "A description of my state machine",
  "StartAt": "ProcessFiles",
  "States": {
    "ProcessFiles": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "Lambda Invoke",
        "States": {
          "Lambda Invoke": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:ap-northeast-1:012345678901:function:test:$LATEST",
              "Payload.$": "$"
            },
            "End": true
          }
        }
      },
      "End": true,
      "Label": "ProcessFiles",
      "MaxConcurrency": 1000,
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "InputType": "CSV",
          "CSVHeaderLocation": "FIRST_ROW",
          "CSVDelimiter": "COMMA"
        },
        "Parameters": {
          "Bucket": "your-s3-bucket-name",
          "Key": "test01.csv"
        }
      }
    }
  }
}

FunctionName は作成済みの関数 ARN に置換してください。
Bucket は CSV ファイルをアップロードした S3 バケット名に置換してください。

実行結果

input なしでステートマシンを実行すると、以下のような Lambda の実行ログが出力されます。

{
    "name": "test1",
    "id": "1"
}
{
    "name": "test2",
    "id": "2"
}
{
    "name": "test3",
    "id": "3"
}

CSV ファイルが 1 行ずつ処理されたことを確認できました。

ファイルパスを input で動的に指定する方法

上記定義の場合、S3 バケットに保存した CSV ファイル名をハードコードしていますが、以下のように input で動的に指定することも可能です。

{
  "Comment": "A description of my state machine",
  "StartAt": "ProcessFiles",
  "States": {
    "ProcessFiles": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "Lambda Invoke",
        "States": {
          "Lambda Invoke": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:ap-northeast-1:012345678901:function:test:$LATEST",
              "Payload.$": "$"
            },
            "End": true
          }
        }
      },
      "End": true,
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "InputType": "CSV",
          "CSVHeaderLocation": "FIRST_ROW",
          "CSVDelimiter": "COMMA"
        },
        "Parameters": {
          "Bucket": "your-s3-bucket-name",
          "Key.$": "$.key"
        }
      }
    }
  }
}

Key の部分の input の変数を参照するように変更してあります。
この状態で以下の input を定義してステートマシンを実行すると、上述の実行結果と同様の結果を得られます。

{
  "key": "test01.csv"
}

Lambda 関数からの実行例

上記ステートマシンを Node.js 22.x の Lambda 関数から呼び出すコード例です。

index.js
const { SFNClient, StartExecutionCommand } = require("@aws-sdk/client-sfn");

exports.handler = async function (event, context) {
  const client = new SFNClient();

  const input = {
    stateMachineArn: "your-statemachine-arn",
    input: JSON.stringify({
      key: "test01.csv"
    })
  };
  const command = new StartExecutionCommand(input);
  const response = await client.send(command);

  return response
};

stateMachineArn はステートマシンの ARN に置換してください。

上記の Lambda と ステートマシンを使用して以下のようなフローも実装可能です。

  1. S3 バケットに CSV ファイルをアップロード
  2. S3 イベント通知でステートマシンを実行する Lambda A を起動
  3. ステートマシンの Map ステートから CSV ファイルを処理する Lambda B を起動
  4. Lambda B で任意の処理を実行

まとめ

今回は Step Functions の Map ステートで CSV ファイルを一行ずつ処理してみました。
どなたかの参考になれば幸いです。

参考資料

Discussion