🗂

AWS Step Functions Distributed Map で並列処理

2023/03/27に公開

久々に業務で、AWS Step Functions触る機会があり、ついでに、Distributed Map使ってみる機会があったので、アウトプット。
※中身は別途自分で検証した結果です。


Step Functions Distributed Map とは

各所で紹介されているので、あらためて紹介しなくても良さそうだけど。念のため。
昨年のAWS re:Invent 2022のCTO Werner VogelsのKeynoteで発表されたStep Functionsで大規模な並列処理を可能にするアップデートですね。
今までも、Mapを駆使すれば、並列度あげられたと思いますが、より簡単に大規模な並列処理ができるようになったものです。

https://aws.amazon.com/jp/blogs/aws/step-functions-distributed-map-a-serverless-solution-for-large-scale-parallel-data-processing/

上記Blogには、今までのMap Stateとの違いも表にして載っていまして、以下になっています。

※Step Functions Distributed Map – A Serverless Solution for Large-Scale Parallel Data Processing より

大きいのはこの2つかなと思います。※上記表より抜粋

Original map state flow New distributed map flow
Parallel branches Original map state flow : Map iterations run in parallel, with an effective maximum concurrency of around 40 at a time. Can pass millions of items to multiple child executions, with concurrency of up to 10,000 executions at a time.
Input source Accepts only a JSON array as input. Accepts input as Amazon S3 object list, JSON arrays or files, csv files, or Amazon S3 inventory.

Parallel branches

最大の同時実行数ですが、従来のMapでは最大40同時実行でしたが、Distributed Mapでは最大10000同時実行になってます。また、child executions(子実行)という仕組みができていて、呼び出し元、親実行とは別の処理の扱いになっているのも変更点ですね。最大100万実行まで渡せるみたいですが、自分はそこまで使わないかな・・・。
従来のMapで並列増えると実行履歴見るの面倒だったりするので、子実行として、別に実行履歴見れるのは個人的には結構いいかなと思います。
あと、子実行の処理結果をS3に吐き出すことができ、そのファイル内に子実行での実行結果が入っているので、
それ読み取って後続処理を実行するということも当然できます。
あ、同時実行10000ですが、無論その中でLambdaを呼ぶような場合、東京リージョンの標準1000同時実行だと、スロットリング起きちゃうので、その辺は注意が必要じゃないかと。

Input source

Mapでは、前処理などからのJSON Objectだけですが、Distributed Mapだと、Amazon S3のObjectリスト、CSVファイル、JSON配列もしくはJSONファイル、S3 Inventoryに一気に増えています。
数万行のCSV処理をもとに同時並列処理っていうのは確かにケースとしてはありそうです。

試してみた

CSVの例はよく見かけたのと、
JSONファイルでやりたいなーと思ったので、JSONでやってみることにします。

JSONファイルを準備

JSONファイルは、以前作った日本の空港コード付きの空港および飛行場の一覧をJSONファイルにしたものを使います。

余談)IATAに入っている IATA3レターコード は、Amazon CloudFrontCloudflare など、CDNのエンドポイントのIDとして出てくるので、覚えておいて損はないです(流石に全部覚えておく必要まではないですが)

[
  {
    "ICAO": "RJAA",
    "IATA": "NRT",
    "NAME": "成田",
    "ANAME1": "成田国際",
    "ANAME2": "",
    "NNAME": "",
    "REGION": "千葉"
  },
  {
    "ICAO": "RJAF",
    "IATA": "MMJ",
    "NAME": "松本",
    "ANAME1": "",
    "ANAME2": "",
    "NNAME": "",
    "REGION": "長野"
  },
  ...
]

こちらをS3バケットにアップロードします。
なお、S3バケットを作ったリージョンと、Step FunctionsのStateマシンがあるリージョンは同じにしないとダメだそうです。

Lambda関数を準備(とりあえず仮)

子処理のLambdaのEventオブジェクトとして、配列内のJSONオブジェクトが渡ってくるので、
一旦、Eventオブジェクトの中身だけLogに吐き出すLambda関数を作ります。

export const handler = async(event) => {
    console.log(JSON.stringify(event, null, 2));
    const response = {
        body: event,
    };
    return response;
};

Workflow StudioでStateマシンを構築

今回はWorkflow Studio使って作ってみました。
作成後、jsonファイルに出力したものです。

{
  "Comment": "A description of my state machine",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "EXPRESS"
        },
        "StartAt": "LambdaInvokeChild",
        "States": {
          "LambdaInvokeChild": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:distributedMapChild:$LATEST"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException",
                  "Lambda.TooManyRequestsException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "Label": "Map",
      "MaxConcurrency": 100,
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "InputType": "JSON"
        },
        "Parameters": {
          "Bucket": "wada-distributed-map",
          "Key": "resource/airport.json"
        }
      },
      "Next": "Lambda Invoke"
    },
    "Lambda Invoke": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:distributedMapChild:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}

ちなみに、最初S3に結果ファイル吐き出させる忘れて、後から追加したんですが、
Roleの変更を忘れないようにしないと・・・です(つまり忘れたことがあります)
StateMachine作成時にRole作った場合、Lambdaの実行も関数固定になるので、
後からLambda追加する場合も同様ですね。
まあ、どのLambdaでもOKってしちゃってもいいかなと思うんです。最小権限付与ってことで。

正常終了すると、Map処理の結果がS3保存されています。

意図的にエラー発生させてみる

さて、S3に結果ファイルを吐き出すようにしましたが、
別にそのままMap実行のResult、JSONオブジェクトで貰えばいいんじゃ?っていう話もあったりします。
100件とかなら、いいかもしれませんが、1000単位だと、ログで見たとしても辛いかなというところです。

また、S3に結果を吐き出すと、成功と失敗で出力されるファイルが別々になります。
結果ファイル(SUCCEEDED_x.json)だけ読み取って後続処理を実行する。
失敗ファイル(FAILED_x.json)があったら、その中身を読み取って、SNS使って通知する・・・なんてこともできるんじゃないかと。

失敗時の挙動も確認ということで、先程のLambda関数に、とあるKeyに値が入っていたら、
意図的にエラーになるコードを組み込んでみます。

export const handler = async(event) => {
    console.log(JSON.stringify(event, null, 2));
    if (event.NNAME != '') {
        throw new Error('Error!');
    }
    const response = {
        body: event,
    };
    return response;
};

定義ファイルはこんな感じです。
Map実行において、エラーの閾値を
今回は100%にしています。つまり、とりあえずエラー出ても、Map実行自体は正常に終了するようにしています。
指定しないと、ある程度エラーが発生すると、Map自体がエラーになります。
今回は、Map自体は正常終了させたかったので、閾値100%にしました。
https://docs.aws.amazon.com/step-functions/latest/dg/maprun-fail-threshold.html

{
  "Comment": "A description of my state machine",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "EXPRESS"
        },
        "StartAt": "LambdaInvokeChild",
        "States": {
          "LambdaInvokeChild": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:distributedMapChild:$LATEST"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException",
                  "Lambda.TooManyRequestsException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "Label": "Map",
      "MaxConcurrency": 100,
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "InputType": "JSON"
        },
        "Parameters": {
          "Bucket": "wada-distributed-map",
          "Key": "resource/airport.json"
        }
      },
      "Next": "MapResults",
      "ResultWriter": {
        "Resource": "arn:aws:states:::s3:putObject",
        "Parameters": {
          "Bucket": "wada-distributed-map",
          "Prefix": "result/"
        }
      },
      "ToleratedFailurePercentage": 100
    },
    "MapResults": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:distributedMapResult:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}

さて、実行させると、処理自体は正常に終わりました。
S3を見ると、成功ファイルと失敗ファイルができています。

ちなみに業務だと・・・

上記した通り、前処理でJSONファイル作ってそのファイルを元に並列処理させようと思ったんですが、
JSONファイルおかなくてもいいじゃんってなって、従来と同じJSON配列で分散処理させようとしてます。
子実行内で呼び出すLambda関数内でJSON配列を作ってレスポンスにセットしてしています。
それらの結果がS3の結果ファイルに保持されているので、それを読み取って、処理をする
・・・なんてことをしてるところです(絶賛開発中)

まとめ

S3にあるファイル使って分散処理できるのって結構便利ですね(小並感)
今までMapで並列頑張ってたところ、置き換えたら色々便利になったりしないかなと思ったりしてます。

次回

今回は動作の確認だったので、次はAWS CDKを使って、もう少しアプリとして動作するものを作ってみようと思います。

Discussion