😎

AWSのAPI GatewayとStepFunctionsのMAPステートだけで時間のかかる処理を実行しながら進捗を取得する方法

2024/01/30に公開

概要

AWSのAPI Gatewayの最長のタイムアウトは29秒ですが、それ以上に時間がかかる処理があった場合に、リクエストがタイムアウトしてしまい正常なステータスのレスポンスを返すことができません。そこでAPI Gatewayから直接StepFunctionsに統合し、StepFunctionsのMapステートの機能を活用することで、フロントからAPI Gatewayを経由してStepFunctionsの進捗情報を定期的に取得できないか試してみたところ、うまくいきましたのでご紹介します。
Mapステートは、入力データのリストまたは配列に対して処理を繰り返したいときに分散して並列実行できる機能です。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/tutorial-get-started-distributed-map.html

今回想定するユースケース

1万件を超えるユーザー情報のCSVファイルをアップロードし、そのCSVファイルの情報を元にマスターデータベースを更新するユースケースを想定します。処理時間は30秒を超えます。

クラウドアーキテクチャ全体

CSVのアップロード先にS3、マスターデータベースにAuroraを選定しています。
AuroraはVPCのプライベートサブネットにあるため、Lambdaは同じVPCで起動します。

今回説明をしない部分

  • StepFunctionsのロール(AWSコンソールが自動で作成したIAMロールをそのまま使います)
  • S3,VPC,Lambda,Aurora,Amplifyの設定詳細
  • CSVファイルをアップロードする方法(既にS3にファイルがアップロードされている前提で進めます)
  • Lambdaの中身(正常終了すれば今回の実験ではOKです)
  • フロントの実装方法

フロント表示

まずはざっくり実装イメージとして、どのようなアウトプットが完成したのかフロント画面の紹介です。
実装後は以下画像のように、開始時間、終了時間、成功件数、トータル件数などがリアルタイムで取得できるのでフロントに表示することができました。Vue.jsのフレームワークであるQuasarを使ってSPAモードで実装しました。

開始直後

処理中

完了後

StepFunctionsの定義

StepFunctionsのMapステートの機能としてS3のバケット名とオブジェクトキーを指定するだけで、Map内に入力を渡してくれるので大変便利です。

デザイン

コード

少し長いコードなので先にコードを示して、あとからポイントを説明します。

{
  "Comment": "CSVユーザー一括更新ワークフロー",
  "StartAt": "CSV10行ずつ並列実行",
  "States": {
    "CSV10行ずつ並列実行": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "ユーザー更新",
        "States": {
          "ユーザー更新": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "arn:aws:lambda:<region>:<account>:function:updateUser:$LATEST"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException",
                  "Lambda.TooManyRequestsException"
                ],
                "IntervalSeconds": 1,
                "MaxAttempts": 3,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "End": true,
      "Label": "CSV10行ずつ並列実行",
      "MaxConcurrency": 100,
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "InputType": "CSV",
          "CSVHeaderLocation": "FIRST_ROW"
        },
        "Parameters": {
          "Bucket.$": "$.bucketName",
          "Key.$": "$.objectKey"
        }
      },
      "ItemBatcher": {
        "MaxItemsPerBatch": 10
      }
    }
  }
}

ポイント1: MaxItemsPerBatch

1回のLambda実行で最大10アイテムずつ処理するようにMapの MaxItemsPerBatch を10にしています。

ポイント2: MaxConcurrency

Lambdaの同時実行数を100に制限するためにMapの MaxConcurrency を100にしました。今回は30秒以上かかる処理にしたかったことと、途中の進捗を取得するのが目的であったため数を小さくしていますが、通常は大規模なデータを高速に処理するために使うケースが多いと思いますので、その場合は MaxItemsPerBatchMaxConcurrency を大きくすることで処理を高速にすることができます。なお MaxConcurrency はAWSアカウント全体の同時実行数のクォータが適用されるため、他のLambdaに影響しないように調整が必要になりますね。

ポイント3: バケット名とオブジェクトキーに入力値を使う

以下抜粋したコードになりますが、これはAPI Gatewayから渡される入力値を使う設定にしています。

"Parameters": {
  "Bucket.$": "$.bucketName",
  "Key.$": "$.objectKey"
}

API Gateway(REST API)の設定

今回の肝となる設定です。(WAFを設定したかったためREST APIを選択しています。)
全部で3つのAPIを作成し、それぞれStepFunctionsのアクション「StartExecution」「ListMapRuns」「DescribeMapRun」と統合します。ひとつずつ説明します。

StartExecution

https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html
StepFunctionsのステートマシンを実行するためのアクションです。実行には stateMachineArn が必須です。

リクエスト例

{
   "input": "string",
   "name": "string",
   "stateMachineArn": "string",
   "traceHeader": "string"
}

レスポンス例

{
  "executionArn": "arn:aws:states:<region>:<account>:execution:<ステートマシン名>:8f5e86ca-811e-4679-abed-57284f58a12e",
  "startDate":1.706027027934E9
}

API Gatewayの設定

基本設定(残りはデフォルト値)

項目 設定値
リソースとメソッド /state-machine/start-execution (POST)
統合タイプ AWSのサービス
AWSのサービス Step Functions
HTTPメソッド POST
アクション名 StartExecution
実行ロール AWS管理ポリシー「AWSStepFunctionsFullAccess」を付与したロール

リクエスト本文

{"s3ObjectKey": "user.csv"}

リクエストマッピングテンプレート(application/json)

#set($inputRoot = $input.path('$'))
#set($bucketName = '"sample-bucket-f4dydcvc5bik"')
#set($objectKey = '"' + $inputRoot.s3ObjectKey + '"')
#set($payloadMap = {
  '"bucketName"' : $bucketName,
  '"objectKey"' : $objectKey
})
#set($payload = $payloadMap.toString().replace("=", ":"))
{
  "input": "$util.escapeJavaScript($payload)",
  "stateMachineArn": "arn:aws:states:<region>:<account>:stateMachine:<ステートマシン名>"
}

レスポンスマッピングテンプレート(application/json)

ユーザーへのレスポンスにはARNを直接見せたくないため、executionArnのうち固有文字列部分だけを返すための設定です。

#set($executionId = $input.json("executionArn").replaceAll("arn:aws:states:<region>:<account>:execution:<ステートマシン名>:", ""))
{"executionId":$executionId}

レスポンス

{"executionId": "8f5e86ca-811e-4679-abed-57284f58a12e"}

ListMapRuns

https://docs.aws.amazon.com/step-functions/latest/apireference/API_ListMapRuns.html
特定のステートマシンの実行によって開始されたすべてのマップ実行をリストします。実行には executionArn が必須です。

リクエスト例

{
   "executionArn": "string",
   "maxResults": number,
   "nextToken": "string"
}

レスポンス例

{
  "mapRuns": [
    {
      "executionArn": "arn:aws:states:<region>:<account>:execution:<ステートマシン名>:8f5e86ca-811e-4679-abed-57284f58a12e",
      "mapRunArn": "arn:aws:states:<region>:<account>:mapRun:<ステートマシン名>/<Mapの状態名>:3a15d2c7-0ec8-4b58-8664-1d6c9d632815",
      "startDate": 1.706026779348e9,
      "stateMachineArn": "arn:aws:states:<region>:<account>:stateMachine:<ステートマシン名>/<Mapの状態名>"
    }
  ]
}

API Gatewayの設定

基本設定(残りはデフォルト値)

項目 設定値
リソースとメソッド /state-machine/list-map-runs (POST)
統合タイプ AWSのサービス
AWSのサービス Step Functions
HTTPメソッド POST
アクション名 ListMapRuns
実行ロール AWS管理ポリシー「AWSStepFunctionsFullAccess」を付与したロール

リクエスト本文

{"executionId": "8f5e86ca-811e-4679-abed-57284f58a12e"}

リクエストマッピングテンプレート(application/json)

#set($executionId = $input.path('$.executionId'))
{
   "executionArn": "arn:aws:states:<region>:<account>:execution:<ステートマシン名>:$executionId"
}

レスポンスマッピングテンプレート(application/json)

ユーザーへのレスポンスにはARNを直接見せたくないため、mapRunArnのうち固有文字列部分だけを返すための設定です。

#set($mapRunId = $input.json("mapRuns[0].mapRunArn").replaceAll("arn:aws:states:<region>:<account>:mapRun:<ステートマシン名>/<Mapの状態名>:", ""))
{"mapRunId":$mapRunId}

レスポンス

{"mapRunId": "3a15d2c7-0ec8-4b58-8664-1d6c9d632815"}

DescribeMapRun

https://docs.aws.amazon.com/step-functions/latest/apireference/API_DescribeMapRun.html
マップ実行の構成、進行状況、結果に関する情報を提供します。マップ実行を再ドライブした場合、そのマップ実行の再ドライブに関する情報も返します。

リクエスト例

{
   "mapRunArn": "string"
}

レスポンス例

実行中

{
  "executionArn": "arn:aws:states:<region>:<account>:execution:<ステートマシン名>:8f5e86ca-811e-4679-abed-57284f58a12e",
  "executionCounts": {
    "aborted": 0,
    "failed": 0,
    "failuresNotRedrivable": 0,
    "pending": 273,
    "pendingRedrive": 0,
    "resultsWritten": 0,
    "running": 98,
    "succeeded": 1503,
    "timedOut": 0,
    "total": 1874
  },
  "itemCounts": {
    "aborted": 0,
    "failed": 0,
    "failuresNotRedrivable": 0,
    "pending": 2728,
    "pendingRedrive": 0,
    "resultsWritten": 0,
    "running": 980,
    "succeeded": 15030,
    "timedOut": 0,
    "total": 18738
  },
  "mapRunArn": "arn:aws:states:<region>:<account>:mapRun:<ステートマシン名>/<Mapの状態名>:3a15d2c7-0ec8-4b58-8664-1d6c9d632815",
  "maxConcurrency": 100,
  "redriveCount": 0,
  "startDate": 1.706541933885e9,
  "status": "RUNNING",
  "toleratedFailureCount": 0,
  "toleratedFailurePercentage": 0.0
}

完了後

{
  "executionArn": "arn:aws:states:<region>:<account>:execution:<ステートマシン名>:8f5e86ca-811e-4679-abed-57284f58a12e",
  "executionCounts": {
    "aborted": 0,
    "failed": 0,
    "failuresNotRedrivable": 0,
    "pending": 0,
    "pendingRedrive": 0,
    "resultsWritten": 1874,
    "running": 0,
    "succeeded": 1874,
    "timedOut": 0,
    "total": 1874
  },
  "itemCounts": {
    "aborted": 0,
    "failed": 0,
    "failuresNotRedrivable": 0,
    "pending": 0,
    "pendingRedrive": 0,
    "resultsWritten": 18738,
    "running": 0,
    "succeeded": 18738,
    "timedOut": 0,
    "total": 18738
  },
  "mapRunArn": "arn:aws:states:<region>:<account>:mapRun:<ステートマシン名>/<Mapの状態名>:3a15d2c7-0ec8-4b58-8664-1d6c9d632815",
  "maxConcurrency": 100,
  "redriveCount": 0,
  "startDate": 1.706541933885e9,
  "status": "SUCCEEDED",
  "stopDate": 1.70654199684e9,
  "toleratedFailureCount": 0,
  "toleratedFailurePercentage": 0.0
}

API Gatewayの設定

基本設定(残りはデフォルト値)

項目 設定値
リソースとメソッド /state-machine/describe-map-run (POST)
統合タイプ AWSのサービス
AWSのサービス Step Functions
HTTPメソッド POST
アクション名 DescribeMapRun
実行ロール AWS管理ポリシー「AWSStepFunctionsFullAccess」を付与したロール

リクエスト本文

{"mapRunId": "3a15d2c7-0ec8-4b58-8664-1d6c9d632815"}

リクエストマッピングテンプレート(application/json)

#set($mapRunId = $input.path('$.mapRunId'))
{
   "mapRunArn": "arn:aws:states:<region>:<account>:mapRun:<ステートマシン名>/<Mapの状態名>:$mapRunId"
}

レスポンスマッピングテンプレート(application/json)

{
  "itemCounts": $input.json("itemCounts"),
  "startDate": $input.json("startDate"),
  "status": $input.json("status"),
  "stopDate": $input.json("stopDate"),
  "toleratedFailureCount": $input.json("toleratedFailureCount"),
  "toleratedFailurePercentage": $input.json("toleratedFailurePercentage")
}

レスポンス

{
  "itemCounts": {
    "aborted": 0,
    "failed": 0,
    "failuresNotRedrivable": 0,
    "pending": 0,
    "pendingRedrive": 0,
    "resultsWritten": 18738,
    "running": 0,
    "succeeded": 18738,
    "timedOut": 0,
    "total": 18738
  },
  "startDate": 1.706541933885e9,
  "status": "SUCCEEDED",
  "stopDate": 1.70654199684e9,
  "toleratedFailureCount": 0,
  "toleratedFailurePercentage": 0.0
}

以上でAPI Gatewayの設定は終わりです。あとはフロントから順番にAPIを叩きます。最後のDescribeMapRunは、処理が完了するまで一定間隔を開けてループ実行します。
次でシーケンス図にまとめました。

シーケンス図

フロントがListMapRunsを実行する前に1秒待機しているのは、ステートマシン実行直後はまだMapが実行されていないためで、実行される前にリクエストすると {"mapRuns":[]} のようなレスポンスになります。より丁寧に実装するには、レスポンスが空だった場合は時間をあけてリトライを実装したほうがいいと思います。

ブラウザの開発コンソールで見たAPIリクエストの流れ

start-execution, list-map-runs, describe-map-runの順でリクエストが投げられdescribe-map-runが一定間隔をあけてループしている様子が分かります。また処理全体では1分を超えますが、一つ一つのリクエストの時間を見ると50〜160ミリ秒で返ってきていることがわかります。時間がかかる場合はループの数が増えていくだけのため、処理全体で30秒を超えてもタイムアウトすることはありません。

フロントの実装


フロントはAmplifyホスティングでSPAをホスティングする予定ですが、今回はローカルでお手軽に試してみました。
実装方法は今回は主題ではないので、一部抜粋のみにします。ループにはwhileを、待機にはsetTimeout()を使いました。setTimeoutのコールバックの中でPromiseをresolveする作りにしました。以下はサンプルコードです。

if (status.value == 'RUNNING') {
  while (status.value == 'RUNNING') {
    progress = await describeMapRun(
      mapRunId,
      5000
    );
    totalCount.value = progress.total;
    succeededCount.value = progress.succeeded;
    status.value = progress.status;
  }
}
type MapRun = {
  status: 'RUNNING' | 'SUCCEEDED';
  startDateTimestamp: number;
  endDateTimestamp: number | null;
  succeeded: number;
  total: number;
};
const describeMapRun = async (
  mapRunId: string,
  ms: number
): Promise<MapRun> => {
  return new Promise(function (resolve) {
    setTimeout(async () => {
      try {
        const res = await api.post('/state-machine/describe-map-run', {
          mapRunId: mapRunId,
        });
        resolve({
          status: res.data.status,
          startDateTimestamp: res.data.startDate,
          endDateTimestamp: res.data.stopDate ? res.data.stopDate : null,
          succeeded: res.data.itemCounts.succeeded,
          total: res.data.itemCounts.total,
        });
      } catch (e: any) {
        console.error(e);
        throw new Error();
      }
    }, ms);
  });
};

まとめ

サーバーレスアーキテクチャのみでここまで細かいリアルタイムな情報が取れるのが感動でした。StepFunctionsのMapステートの機能であるS3と簡単に統合する機能などは、もし知らなかったらLambdaなどでソースコードを書いてSDKを使って自分でS3からCSVファイルをダウンロードするような無駄な実装をしてしまいそうです。知らないって恐ろしいですね。今回は色々実験しながらだったのでAWSコンソールで作成したのですが、そのうちTerraformなどでコード化する予定です。
ステートマシンをAWSのコンソール画面から作ろうとすると、たくさんのテンプレートが用意されていて、眺めているだけで「え?こんなこともできるの?」という驚きがありますので、ぜひ見てみてください。

レスキューナウテックブログ

Discussion