⏮️

SQSのメッセージをStep FunctionsのMapState内で扱いたい

に公開

背景

株式会社DGビジネステクノロジー NaviPlusレビューの飯塚です。

以下のような構成でStep Functionsを書いていたとき、Iterate内でSQSメッセージの内容を取得するのに、少しだけ困ったのでメモしておきます。

SQS -> Lambda(Go)-> Step Functions(Start -> MapState -> Iterate)

GoのLambdaをSQSトリガーに設定し、以下のようにStepFunctionsを起動させています。
InputにSQSから受け取ったメッセージの内容をJSON形式で入れているイメージです。

.
.
.
result, err := sfnClient.StartExecution(ctx, &sfn.StartExecutionInput{
    StateMachineArn: aws.String(stateMachineArn),
    Input:           aws.String(string(inputJSON)),
    Name:            aws.String(executionName),
})

このInputの中身を使ってMapState内でHTTPリクエストをしたかったのですが、以下のエラーになってしまいました。エラー内容はシンプルに、「inputにexample1はないよ」ということでした。

An error occurred while executing the state '<MapStateName>' (entered at the event id #n). The JSONata expression '$states.input.example1' specified for the field 'Arguments/RequestBody/Example1' returned nothing (undefined).

理由は明確ではないのですが、ログを見るとデータの持ち方は以下のようになっていました。
つまりMapStateの場合、

  • Variablesの定義はMapStateStarted時点でなされる
  • しかし、その時点ではinput情報は持っていない
  • なので、MapState上ではinput情報をVariablesに定義できない

と推測しています。(このへん詳しい方いらっしゃればコメントいただけると嬉しいです。)

MapStateEntered // この時点ではinput情報を持っている
↓
MapStateStarted // この時点ではinputの情報はなく、Itemsの数の情報のみ
↓
MapIterationStarted

前置きが長くなりましたが、どうやって渡せばいいのかというのを以下にまとめておきます。

やりかた

結論、Pass Stateを挟んで、その中でVariablesを定義しましょう。

※「Map state input and output fields in Step Functions」というそれっぽい公式ドキュメントがあり、この方法でも同じようなことは実現できそうですが、一部ItemsPathだけJSONPath向けの内容のようです。

以下のイメージです。

SQS -> Lambda(Go)-> Step Functions(Start -> Pass -> MapState -> Iterate)
  • Pass Stateの変数(Variables)タブで以下のように定義する
{
  "example1": "{% $states.input.example1 %}",
  "example2": "{% $states.input.example2 %}"
}
  • ASLでは以下のように定義される
"Assign": {
  "example1": "{% $states.input.example1 %}",
  "example2": "{% $states.input.example2 %}"
}
  • Map StateのIterate内で {% $example %}で呼び出す
    • HTTP Endpoint Stateなら、リクエスト本文などで以下のように呼び出せる
{
  "example1": "{% $example1 %}",
  "example2": "{% $example2 %}"
}


Variables利用イメージ

DGBTテックブログ

Discussion