🐾

AWS Step Functions で同時実行数制御を行う方法

2024/04/14に公開

🤔 どんな人向けの記事か

☑︎ これから AWS Step Functions で定期バッチのワークフローを構築したい人
☑︎ AWS Step Functions の同時実行数制御を行いたい人

🐈‍⬛ 背景

AWS Step Functions(以下、Step Functions)のユースケースの一つとして、EventBridge Schedulerと組み合わせた定期バッチの実行があります。
EventBridge自体の実行保証は、1回以上となっています。そのため、スケジュールしたイベントが複数回発行されるケースが存在します。また、定期バッチを行う場合、特定のタイミングでバッチの実行時間が長くなり、実行中に次のバッチが開始されることがあります。

基本的には、冪等性を保つことで重複実行されても問題ない実装をアプリケーション側で行うことが重要ですが、冪等性のある実装でも、実行コストが高いタスクのため重複実行は避けたいケースや冪等性を保証することが困難なケースも存在します。

現在、AWS Step Functionsには同時実行数制御の仕組みが組み込まれていません。同時実行数制御について紹介する記事では、AWS Lambdaを利用して同時実行数を制御し、重複実行を回避する方法が紹介されているのを多くみました。
本記事では、Lambdaを使用せずに同時実行数を制御する方法を紹介します。

💡 結論

Step Functions の API List Executions と組み込み関数の States.ArrayLength を組み合わせて同時実行数を制御できます。

🖋️ 解説

具体的にStep Functionsで同時実行数を制御する方法について解説し、どのようなユースケースで利用できるかのパターンを紹介します。

Step Functions の API List Executions

Step Functions はステップ内で、AWS サービスのAPIにリクエストし、実行することができます。今回は、Step Functions のAPIの一つである、List Executions を利用します。 List Executionsはリクエストのペイロードとして、State MachineのARNやStatusのフィルターを設定できます。今回は、自身の State Machine ARNを設定し、Statusは実行中(RUNNING)とします。

以下は、該当のStateの部分を一部抜粋しています。

"GetRunningStatesNum": {
  "Type": "Task",
  "Parameters": {
    "StateMachineArn.$": "$$.StateMachine.Id",
    "StatusFilter": "RUNNING"
  },
  "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions",
  "Next": "CheckConcurrentExecutionNum"
}

※ 本記事の主題とは離れますが、API側のパラメーターのフォーマットが camelCaseになっている場合でも、Step Functions から実行する場合には、PascalCaseで設定する必要があります。

States.ArrayLength

States.ArrayLengthは、Step Functionsの組み込み関数の一つで、名前の通り配列の要素数を取得します。
以下の例では、前述の ListExecutions の結果から実行中の数を取得しています。

"ResultSelector": {
  "execution_num.$": "States.ArrayLength($.Executions)"
},

📄 ユースケースの紹介

重複実行の場合は失敗させる

最も単純な同時実行数の制御方法は、重複実行がある場合にはその実行を失敗させるというものです。

具体的な実装手順:

  1. List Executionsを利用して、現在実行中のタスクの一覧を取得
  2. List Executionsの結果をStates.ArrayLengthで実行数を取得
  3. 実行数が同時実行数(下記の例では1)を超えている場合、Failを実行


StateMachineの構造


同時実行制御を入れたStateMachineの同時実行例(左: 1回目, 右: 2回目)

States の定義
{
  "Comment": "A description of my state machine",
  "StartAt": "GetRunningStatesNum",
  "States": {
    "GetRunningStatesNum": {
      "Type": "Task",
      "Parameters": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "StatusFilter": "RUNNING"
      },
      "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions",
      "ResultSelector": {
        "execution_num.$": "States.ArrayLength($.Executions)"
      },
      "Next": "CheckConcurrentExecutionNum"
    },
    "CheckConcurrentExecutionNum": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.execution_num",
          "NumericGreaterThanEquals": 2,
          "Next": "Fail"
        }
      ],
      "Default": "Wait"
    },
    "Fail": {
      "Type": "Fail"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 15,
      "End": true
    }
  }
}

重複実行の場合は時間をおいて再度実行する

先ほどの例では、重複実行がある場合にはその実行を失敗させていました。ユースケースによっては、一定時間をおいて再実行したいケースもあると思います。そのようなケースの場合は、Waitステートを使用して一定時間待機した後に再度List Executionsを実行することで、ステートマシン内でタスクの再実行が可能になります。

具体的な実装手順:

  1. List Executionsを利用して、現在実行中のタスクの一覧を取得
  2. List Executionsの結果をStates.ArrayLengthで実行数を取得
  3. 実行数が同時実行数(下記の例では1)を超えている場合、一定時間待機(下記の例では、60秒)し、再度実行

※ このような利用の仕方をする場合は、無限ループしないための仕組みが別途必須になると思います。


StateMachineの構造


同時実行制御を入れたStateMachineの同時実行例(左: 1回目, 右: 2回目)

Statesの定義
{
  "Comment": "A description of my state machine",
  "StartAt": "GetRunningStatesNum",
  "States": {
    "GetRunningStatesNum": {
      "Type": "Task",
      "Parameters": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "StatusFilter": "RUNNING"
      },
      "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions",
      "ResultSelector": {
        "execution_num.$": "States.ArrayLength($.Executions)"
      },
      "Next": "CheckConcurrentExecutionNum"
    },
    "CheckConcurrentExecutionNum": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.execution_num",
          "NumericGreaterThanEquals": 2,
          "Next": "WaitingForRetry"
        }
      ],
      "Default": "Wait"
    },
    "WaitingForRetry": {
      "Type": "Wait",
      "Seconds": 60,
      "Next": "GetRunningStatesNum"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 15,
      "End": true
    }
  }
}

🚀 まとめ

本記事では、Step Functions で同時実行数を制御し、重複実行を回避する方法を紹介しました。 Lambdaは簡単に実装が可能ですが、Lambdaの中でAWSのAPIを利用している場合は、Lambdaを使わずに Step Functionsから直接APIを利用することで同じ状態を簡単に表現できるかどうかを一度考えてみても良いと思います。

📚 参考

Discussion