🐧

Step FunctionsのMap処理後のステップに、実行結果の一部と任意の文字列を入力したい

2022/01/27に公開

はじめに

こんにちは、クラスメソッド サービスグロースチームの筧です。

Step Functions の Map 処理後のステップに、「実行結果の一部」と「任意の文字列」を渡したい要件があり調べました。「実行結果の一部」のみを渡すのは簡単でしたが、「任意の文字列」とあわせて表示するのに少し苦労したので、調査結果を汎化してご紹介します。

前提

  • Serverless Framework を利用してデプロイしています。

https://www.serverless.com/

やりたいこと

ステートマシンです。

以下を実現したいです。

  • Hello ステップには Lambda 関数が紐付けられており、成功有無で分岐
  • 成功した場合は、Done ステップに分岐して、「入力の account_id の辞書」 と「"static_value": "Success" という任意の文字列からなる辞書」を出力
  • 失敗した場合は、Error ステップに分岐して、「入力の account_id の辞書」と「"static_value": "Failed" という任意の文字列からなる辞書」を出力
Start の入力
[
  {
    "account_id": "123456789012-1",
    "account_name": "hogehoge1"
  },
  {
    "account_id": "123456789012-2",
    "account_name": "hogehoge2"
  },
  {
    "account_id": "123456789012-3",
    "account_name": "hogehoge3"
  }
]
  • 最終的に、IsComplete ステップに以下のような入力を渡す
IsComplete ステップの入力
[
  {
    "account_id": "123456789012-1",
    "static_value": "Success"
  },
  {
    "account_id": "123456789012-2",
    "static_value": "Success"
  },
  {
    "account_id": "123456789012-3",
    "static_value": "Success"
  }
]

なお、Map処理中にエラーが発生しても、予めステートマシン全体が中断しないようにしています。

https://dev.classmethod.jp/articles/step-functions-map-behavior/

結論

  • 出力関連のフィールド(ResultPath, ResultSelector)では、「任意の文字列」を渡すことができないように見受けられるので、ステートマシンの定義だけでは、「実行結果の一部」と「任意の文字列」をあわせて、次のステップに渡すのは難しそう
  • ステートマシンの入力では「任意の文字列」を渡すことができるので、Lambdaで「任意の文字列」を取得しつつ、「実行結果の一部」とあわせて return することで、要件を実現することができた

以下が要件を実現したコードです。

includes/state-machines.yml
stateMachines:
  MapTestFunc:
    name: MapTestFunc-${self:provider.stage}
    definition:
      StartAt: MapTest
      States:
        MapTest:
          Type: Map
          InputPath: "$"
          Next: IsComplete
          Iterator:
            StartAt: Hello
            States:
              Hello:
                Type: Task
                Parameters:
                  param.$: $
                  Execution.$: $$.Execution
                Resource:
                  Fn::GetAtt: [hello, Arn]
                ResultPath: $.result-info
                Catch:
                  - ErrorEquals:
                      - States.ALL
                    ResultPath: "$.error-info"
                    Next: Error
                Next: Done
              Done:
                Type: Task
                Parameters:
                  param.$: $
                  Execution.$: $$.Execution
                  StaticValue: Success
                Resource:
                  Fn::GetAtt: [map_result, Arn]
                End: true
              Error:
                Type: Task
                Parameters:
                  param.$: $
                  Execution.$: $$.Execution
                  StaticValue: Failed
                Resource:
                  Fn::GetAtt: [map_result, Arn]
                End: true
        IsComplete:
          Type: Pass
          End: true

src/handlers/hello.py
import logging
import random

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    """
    hello エントリーポイント
    """
    raise
    resp = str(random.uniform(100, 200))
    return resp


def map_result(event, context):
    account_id = str(event["param"]["account_id"])
    static_value = str(event["StaticValue"])
    return {"account_id": account_id, "static_value": static_value}

解説

Map処理の1つ目が成功した場合のフローを見ていきます。

Hello ステップの入力

  • Hello ステップの出力。
  • Hello ステップに紐づく Lambda関数(src/handlers/hello.handler) が成功して、結果が result-info に出力されます。

Done ステップの入力

  • グラフインスペクターのステップ入力には表示されていませんが、Done ステップのステートマシンの定義で、StaticValue: Success という「任意の文字列からなる辞書」を、Done ステップに紐づく Lambda関数(src/handlers/hello.map_result) に渡しています。

Done ステップの出力

  • Done ステップに紐づく Lambda関数(src/handlers/hello.map_result) で return に「実行結果の一部」と「任意の文字列」を出力しています。

IsComplete ステップの入力

  • 各Map処理の最終出力が配列で渡されています。
  • 「実行結果の一部」と「任意の文字列」も出力できています!

補足

失敗した場合のフローを確認したい場合は、src/handlers/hello.py 内で raise を差し込んで確認してください。

src/handlers/hello.handler
def handler(event, context):
    """
    hello エントリーポイント
    """
    raise
    resp = str(random.uniform(100, 200))
    return resp

結論に至るまでの調査概要

結論に至るまでの調査内容を書きます。

ステートマシンの定義上で、ステップの出力には、任意の文字列を加えることはできない模様

  • ResultPath は出力に渡す組み合わせを制御できるが、任意の文字列は加えることができなかった

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-resultpath.html

  • ResultSelector は出力の一部を表示するなどの制御はできるが、任意の文字列は加えることができなかった

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector

ステートマシンの定義上で、ステップの入力では、前のステップからの入力に加えて、静的な値を入力可能

  • Parametersフィールドでは、ステップの入力に静的な値を加えることができる
  • Lambda関数で静的な値を返してあげることで要件を達成できた

それぞれの値は、ステートマシンの定義に含める静的な値、または入力からパスまたはコンテキストオブジェクトで選択した静的な値を使用できます。

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-inputpath-params.html

あとがき

ステートマシンの出力関係のフィールドで、静的な値も出力できると、今回のように Lambda を挟まなくてよくなるので嬉しいなと思いました!もし、既に出来る方法があったらごめんないm(_ _)m

それではまた!

Discussion