🍣

AWS Step Functions で実現する並列処理パターン: パラレル・Map ステートの組み合わせによる大規模データ処理

2025/01/06に公開

はじめに

本記事は AWS Step Functions を使用した並列処理パターンについて、パラレルステートと Mapステートを組み合わせた処理フローの解説をします。

実装するユースケース

今回実装するのは以下のようなデータ処理フローです。

  1. パラレルステートで処理を2つに分岐:
  • 一方はリストデータの並列処理を担当。
  • もう一方は、S3から取得したCSVファイルを処理。
  1. 各分岐で Map ステートを使用し、データを並列処理。
  2. 最後にすべての処理結果を集約。

Step Functions ステートマシンの設計

全体のアーキテクチャ

イメージ

定義

{
  "Comment": "並列処理パターンのデモ",
  "StartAt": "ParallelProcessing",
  "States": {
    "ParallelProcessing": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "ProcessListData",
          "States": {
            "ProcessListData": {
              "Type": "Map",
              "MaxConcurrency": 10,
              "ToleratedFailurePercentage": 100,
              "Iterator": {
                "StartAt": "何かデータ処理",
                "States": {
                  "何かデータ処理": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::lambda:invoke",
                    "ResultPath": "$.taskResult",
                    "Catch": [
                      {
                        "ErrorEquals": [
                          "States.ALL"
                        ],
                        "ResultPath": "$.taskError",
                        "Next": "ProcessError"
                      }
                    ],
                    "Next": "ProcessSuccess"
                  },
                  "ProcessError": {
                    "Type": "Pass",
                    "End": true,
                    "Result": false
                  },
                  "ProcessSuccess": {
                    "Type": "Pass",
                    "End": true,
                    "Result": true
                  }
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "ProcessS3Files",
          "States": {
            "ProcessS3Files": {
              "Type": "Map",
              "ToleratedFailurePercentage": 100,
              "Iterator": {
                "StartAt": "何かS3データの処理",
                "States": {
                  "何かS3データの処理": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::lambda:invoke",
                    "OutputPath": "$.Payload",
                    "Parameters": {
                      "Payload.$": "$"
                    },
                    "Retry": [
                      {
                        "ErrorEquals": [
                          "Lambda.ServiceException",
                          "Lambda.AWSLambdaException",
                          "Lambda.SdkClientException",
                          "Lambda.TooManyRequestsException"
                        ],
                        "IntervalSeconds": 1,
                        "MaxAttempts": 3,
                        "BackoffRate": 2,
                        "JitterStrategy": "FULL"
                      }
                    ],
                    "Next": "S3ProcessSuccess",
                    "Catch": [
                      {
                        "ErrorEquals": [
                          "States.ALL"
                        ],
                        "Next": "S3ProcessError"
                      }
                    ]
                  },
                  "S3ProcessError": {
                    "Type": "Pass",
                    "End": true,
                    "Result": false
                  },
                  "S3ProcessSuccess": {
                    "Type": "Pass",
                    "End": true,
                    "Result": true
                  }
                },
                "ProcessorConfig": {
                  "Mode": "DISTRIBUTED",
                  "ExecutionType": "EXPRESS"
                }
              },
              "End": true,
              "Label": "ProcessS3Files",
              "MaxConcurrency": 10,
              "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {
                  "InputType": "CSV",
                  "CSVHeaderLocation": "FIRST_ROW"
                },
                "Parameters": {
                  "Bucket.$": "$.bucket",
                  "Key.$": "$.key"
                }
              },
              "ItemBatcher": {
                "MaxItemsPerBatch": 100
              }
            }
          }
        }
      ],
      "ResultSelector": {
        "mapResults.$": "$[*][*]"
      },
      "ResultPath": "$.parallelResults",
      "Next": "エラー判定"
    },
    "エラー判定": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Next": "エラーあり?",
      "OutputPath": "$.Payload"
    },
    "エラーあり?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.errorCount",
          "NumericGreaterThan": 0,
          "Next": "Fail"
        }
      ],
      "Default": "Success"
    },
    "Fail": {
      "Type": "Fail"
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}

ステートマシンの各ステートの解説

1. ParallelProcessing (パラレルステート)
役割: 処理を2つの分岐に並列で分けて実行します。

  • ProcessListData: リスト形式のデータの並列処理
  • ProcessS3Files: S3に保存されたCSVデータの処理
    結果の集約:
  • ResultSelector を使い、各処理結果を$.mapResults として一次元配列化
  • ResultPath を使い、一次元化した処理結果$.mapResults$.parallelResults に格納
{
"ParallelProcessing": {
  "Type": "Parallel",
  "Branches": [
    ・・・略・・・
  ],
  "ResultSelector": {
    "mapResults.$": "$[*][*]"
  },
  "ResultPath": "$.parallelResults",
  "Next": "エラー判定"
},

2. ProcessListData (Mapステート)
役割: リストの各データを並列で処理します。

  • MaxConcurrency: 同時実行数を10に設定
  • ToleratedFailurePercentage: 許容される失敗率を100%に設定
  • Iterator: 各データを処理するためのステートマシン
    • 何かデータ処理: データを処理するLambda関数を呼び出し
    • ProcessSuccess: 処理成功時にtrueを返して終了
    • ProcessError: エラー発生時にfalseを返して処理を終了
{
"ProcessListData": {
  "Type": "Map",
  "MaxConcurrency": 10,
  "ToleratedFailurePercentage": 100,
  "Iterator": {
    ・・・略・・・
  },
  "End": true
}

3. ProcessS3Files (Mapステート)
役割: S3から取得したCSVファイルを並列で処理します。

  • MaxConcurrency: 同時実行数を10に設定
  • ItemBatcher:
    • MaxItemsPerBatch: 最大100アイテムを1バッチとして処理
  • ToleratedFailurePercentage: 許容される失敗率を100%に設定
  • ProcessorConfig:
    • ExecutionType: EXPRESS を指定して高速処理化
  • ItemReader: S3からCSVファイルを取得
    • ReaderConfig: CSV形式のデータを扱う際、ヘッダーの位置などを指定
    • Parameters: 対象のバケットとキー(ファイルパス)を指定
  • Iterator: 各S3オブジェクトを処理するためのステートマシン
    • 何かS3データの処理: S3ファイルを処理するLambda関数を呼び出し
    • S3ProcessSuccess: 処理成功時にtrueを返して終了
    • S3ProcessError: エラー発生時にfalseを返して処理を終了
"ProcessS3Files": {
  "Type": "Map",
  "ToleratedFailurePercentage": 100,
  "Iterator": {
    ・・・略・・・
    },
    "ProcessorConfig": {
      "Mode": "DISTRIBUTED",
      "ExecutionType": "EXPRESS"
    }
  },
  "End": true,
  "Label": "ProcessS3Files",
  "MaxConcurrency": 10,
  "ItemReader": {
    "Resource": "arn:aws:states:::s3:getObject",
    "ReaderConfig": {
      "InputType": "CSV",
      "CSVHeaderLocation": "FIRST_ROW"
    },
    "Parameters": {
      "Bucket.$": "$.bucket",
      "Key.$": "$.key"
    }
  },
  "ItemBatcher": {
    "MaxItemsPerBatch": 100
  }
}

4. エラー判定 (Taskステート)
役割: パラレルステートからの結果を分析し、エラーの有無を判定します。

  • Resource: エラー解析用のLambda関数を呼び出し、処理結果を分析
    Lambda関数への引数として、パラレルステートの出力データ($.parallelResults.mapResults)を受け渡し
  • Lambda関数の戻り値として、次のステートでエラーの有無を判定するために$.errorCountを含めておきます。
"エラー判定": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Next": "エラーあり?",
  "OutputPath": "$.Payload"
},

5. エラーあり? (Choiceステート)
役割: エラー判定処理の結果に基づき、エラーが発生しているかを判断します。

  • $.errorCount > 0 の場合、Failステートへ遷移
  • デフォルト(エラーがない場合)は、Successステートへ遷移
"エラーあり?": {
  "Type": "Choice",
  "Choices": [
    {
      "Variable": "$.errorCount",
      "NumericGreaterThan": 0,
      "Next": "Fail"
    }
  ],
  "Default": "Success"
},

6. Fail / Success (終端ステート)

  • Fail: 処理全体を失敗として終了
  • Success: 処理全体を成功として終了

実装のポイント

1. パラレルステートの効果的な使用

パラレルステートを使用することで、独立した処理を同時に実行できます。
今回のケースでは以下の2つの処理を並列で実行します:

  • リストデータ処理
  • S3 ファイル処理

これらの処理が互いに依存関係なく並列で実行されます。
これにより、全体の処理時間を大幅に短縮し、効率的なデータ処理が可能となります。

2. Map ステートによる並列処理

同時実行数(MaxConcurrency)の設定:
MaxConcurrencyは、Mapステートで同時に実行できる並列処理の最大数を制御するパラメータです。

設定例

{
  "Type": "Map",
  "MaxConcurrency": 10,
  "Iterator": {
    // イテレーターの設定
  }
}
  • MaxConcurrency: 指定した数分並列に処理が実行されます。10の場合、10個、100の場合100個の処理が同時に実行されます。
  • 100個のデータがあり、MaxConcurrencyが20の場合、まず20個が同時に処理され、最大20個同時実行数を維持しながら残りのデータ処理を順に実行されていきます。
  • 設定しない場合、Mapの処理モードによって違いがあります。
    インラインモードでは最大値の40、分散モードでは10,000となります。

Lambdaの同時実行数の制限や、処理内で行うDB操作の接続制限や負荷などを考慮して設定すると良いでしょう。

分散処理モード(ProcessorConfig):
ProcessorConfigModeDISTRIBUTEDに設定することで、Map ステートの処理を分散実行できます。これにより、大規模なデータセットの処理を効率的に行うことが可能になります。

{
  "ProcessorConfig": {
    "Mode": "DISTRIBUTED",
    "ExecutionType": "EXPRESS"
  }
}

バッチ処理の項目数制限(ItemBatcher)の設定:
Mapステートで大量のデータを処理する際に、データを適切なサイズのバッチに分割して処理する機能です。
バッチ処理の設定をしない場合は1項目ずつ処理が実行されます。
大量データ扱う際には、バッチ処理化してある程度まとめて処理することで効率化がはかれ、処理全体の速度向上が期待できると思います。

設定例

{
  "Type": "Map",
  "ItemBatcher": {
    "MaxItemsPerBatch": 100
  }
}
  • MaxItemsPerBatch: 1バッチ処理で扱う最大の項目数。1000件のCSVデータを入力とした場合、100件ずつ分割されてMap内の処理を実行します。
  • MaxInputBytesPerBatch: 今回の例では設定していませんが、バイト数でも制御が可能です。最大は256KBです。MaxItemsPerBatchのみ設定した場合でも、項目数の合計バイト数が256KBを超える場合は自動的に256KBに収まる項目数に調整されます。

3. エラーハンドリング

エラーハンドリングは以下の層で実装しています:

1. 個別タスク

  • Catch ブロックで個々のタスクのエラーを捕捉

2. Map ステート

  • Result を使用して処理結果を統一化
  • エラーが発生しても他のタスクの実行を継続させる

Mapステートの結果は、並列で実行した子ワークフロー結果が配列となって出力されます。
そのため、後続処理で扱いやすくするためには、Mapステート内で最後に実行される処理については、同じ構造にすると良いでしょう。
今回の例では単純にPassステートに成功したらtrue、失敗したらfalseを返すようにResultで定義しています。
Mapステートでは、Map内のエラーを許容するしきい値を設定することができます。このしきい値を100%にすることで処理が途中で止まらないようにしています。

3. パラレルステート

  • 分岐処理全体の結果を集約
    パラレルステートの結果は、分岐したそれぞれの結果を配列として返します。
    今回の例ではパラレルステート内ではMapステートの結果が返されているため、二次元配列の形で結果が出力されます。
    ResultSelectorを、以下のように定義することで、一元配列に変換が可能です。
{
  "mapResults.$": "$[*][*]"
}
# 出力の変換イメージ
## mapAの結果
[true, true, true]

## mapBの結果
[false, false, true]

## パラレルの結果(ResultSelector未使用)
[
  [true, true, true],
  [false, false, true]
]

## パラレルの結果(ResultSelectorを使用した場合)
{
  "mapResults": [true, true, true, false, false, true]
}

さらに、今回の例ではReslultPathを使用してパラレルステートに渡された入力値を引き継がせて、パラレルステートの結果を結合して出力しています。
Step Functions実行時に入力として渡された値を後続処理にも入力として使いたい場合に便利です。

## パラレルの結果(ResultPathを使用した場合)
{
  "元の入力1": "xxx",
  "元の入力2": "yyy",
  "元の入力3": "zzz",
  ・・・
  "parallelResults": {
    "mapResults": [true, true, true, false, false, true]
  }
}

実装上の課題と注意点

1. データの処理順序について

Map ステートやパラレルステートを使用する際、処理結果の順序は入力データの順序と必ずしも一致しません。これは並列処理の特性によるものです。
また、Mapステート内の各処理においては渡されたデータの塊が全体の何番目のデータなのかは判別できません。データ内に順番を特定できる要素を持つなどすれば判別可能です。

影響と対策

以下のような対策は取れますが、どれも決め手にかけます。

  • Map ステートの MaxConcurrency を 1 に設定。(ただし並列処理の利点が失われます)
  • 入力データにシーケンス番号や順序を示す識別子を付与。
  • 処理結果に順序情報を含めて、後続処理で並び替えを実施。

データ取り込み系の場合、以下のような方法が考えられます。

  1. 取り込み対象となるデータを抽出、バリデーションや加工などは並列で処理。
  2. 後続の処理で集約して並び替えて順番に登録更新を実施。

処理が細切れになり煩雑化したり、各処理の初期処理で時間がかかることもあるので、従来通り順番に処理することも再検討するのが良いでしょう。

2. Map ステート内でエラーが発生した場合、どの子ワークフローでエラーが発生したかを見つけるのが困難

今回の例では、Map ステート内の子ワークフローでエラーが発生した場合でも、Passステートでエラーがあった事としてfalseを返しますが、ステート自体が失敗扱いではないため、後から実行のステータスを確認する際にはすべて成功扱いとなってしまいます。
プログラムで言うところのCatchを握りつぶしている状態です。

まとめ

Step Functions のパラレルステートと Mapステートを組み合わせることで、大量データを扱うケースや複数処理を同時実行する際のデータ処理を効率的に実装できることができます。

参考資料

レスキューナウテックブログ

Discussion