🔀

動かして理解する Step Functions の入出力

2024/09/11に公開

こんにちは、初めましての方は初めまして。株式会社 Fusic の瓦です。エアコンとパソコンを使いすぎたせいか今年の 8 月の電気代が 5 桁台に突入して驚愕しています。

AWS Step Functions (以降 SFn) は色々なサービスを組み合わせたワークフローを定義することが出来るサービスです。例えば S3 に何かしらの数値データがアップロードされた際に、最大値を計算する処理、最小値を計算する処理、平均値を計算する処理を一緒に行うことが出来ます(あまりいい例が思いつかなかったのですが、要するに並列に処理を行うことが出来るということです)また、EC2 を起動し、EC2 に関するデータを Lambda で取得し、それを SNS で通知するような一連のワークフローも定義できます。このように、SFn は小さいサービスの処理をまとめて一つの機能を実現したいときに役立つサービスです。

SFn ではタスクをつなげることで、どのタスクからどのタスクへデータを渡すのかを定義できますが、入力と出力を表すフィールドが複数存在しており、使用する際にいつも混乱してしまいます。そこでこの記事では SFn のタスクへの入出力について、実際の実行結果を見ながら理解していきたいと思います。(とりあえず SFn のデータフローだけ確かめたい人は一番下の追記の部分まで飛ばしてください)

入出力を表すパラメータについて

SFn のフィールドについては公式ドキュメントの図が分かりやすいです。

InputPathParameters を通じて入力はタスクへと渡され、タスクの出力は ResultSelectorResultPathOutputPath を通って次への入力となります。各フィールドの役割は以下になります。

フィールド名 説明
InputPath 入力される JSON のどの部分を使用するのかを定義
Parameters InputPath で選択したデータをタスクに渡す変換方法を定義
ResultSelector タスクの出力からどの部分を使用するのかを定義
ResultPath 入力される JSON と ResultSelector からの出力の組み合わせ型を定義
OutputPath ResultPath の出力のどの部分を使用するのかを定義

まあ言葉で書いても分かりづらいと思うので実際に実行して見ていきましょう。

SFn を実際に実行してみる

今回は簡単に入出力を確かめてみるために、データを受け取って内容を標準出力する一つの Lambda だけの SFn を作成しました。

ではそれぞれのフィールドを変えて確かめてみましょう。以下では SFn の入力は固定で

{
    "data": {
        "text": "This is test!",
        "num": 500,
        "animal": [
            {
                "name": "cat",
                "num": 1
            },
            {
                "name": "dog",
                "num": 3
            }
        ]
    }
}

とします。また Lambda の中身は

def lambda_handler(event, context):
    print(event)
    return {
        "message": "Hello, I am Lambda!",
        "num": 800
    }

としています。また、確かめたいフィールド以外は基本的に何もしない(受け取ったデータをそのまま流す)ようにしています。

InputPath を確かめる

InputPath を

"InputPath": "$.data"

として実行してみます。


コンソール上での表示

CloudWatch Logs では

{'text': 'This is test!', 'num': 500, 'animal': [{'name': 'cat', 'num': 1}, {'name': 'dog', 'num': 3}]}

と出力されています。コンソールでの入力では分かりませんが、Lambda での標準出力を見ると SFn の入力の data が渡されていることが分かります。このように InputPath は入力データのフィルタリングとして使用できます。

Parameters を確かめる

Parameters を

"Parameters": {
    "Payload.$": "$.data.animal",
    "FunctionName": "<Lambda's ARN>"
}

とします(今回は Lambda を使用している関係でフィールドが固定となっています。これについて詳しくはドキュメントを参照してください[1]

実行した際の CloudWatch Logs を見ると

[{'name': 'cat', 'num': 1}, {'name': 'dog', 'num': 3}]

と出力されています。これはデータを上の形式に変換した上で Payload にあたる部分を Lambda への入力としていることを示します。このように Parameters は入力データの整形として使用できます。

ResultSelector を確かめる

ResultSelector を

"ResultSelector": {
    "body.$": "$.Payload"
}

として実行してみます。


コンソール上での表示

ここでは body の値を Lambda 関数の出力としています($.Payload は Lambda 関数の出力を表します)この結果から分かるように、ResultSelector ではタスクの出力の整形として使用できます。

ResultPath を確かめる

ResultPath を

"ResultPath": "$.result"

として実行してみます。ここでは分かりやすいように ResultSelector は上で試したままにしています。


コンソール上での表示

この結果から分かるように、ResultPath は元の入力にタスクの出力を合わせることが出来ます。ResultPath に "$" と指定したり、そもそも何も指定しないと入力は破棄され、タスクの出力のみとなります。

OutputPath を確かめる

OutputPath を

"OutputPath": "$.data.animal[0]"

として実行してみます。ここでは分かりやすいように ResultSelector と ResultPath は上で試したままにしています。


コンソール上での表示

この結果から分かるように、OutputPath は最終的な出力のフィルタリングとして使用できます。また、この結果から分かるように配列の要素を 0-index で指定することも出来ます。

まとめ

この記事では SFn の入出力について、実際に SFn を実行して確かめてみました。タスクによっては入力や出力が決まっているものも多く、そのようなデータの整合性を合わせるためには各フィールドが何をするためのものなのかを把握しておけばそれぞれの入出力の形式に合わせてデータの整形を行うことが出来ます。上の実行結果から分かるように、InputPath でデータのフィルタリングを行って Parameters でタスクの入力形式へと整形し、ResultSelector でタスクの出力を整形して ResultPath で次のタスクへの入力にしたい部分とまとめた後に OutputPath で次のタスクへの入力をフィルタリングするという使い方をすると考えるといいのではと思います。

最後に宣伝になりますが、機械学習でビジネスの成長を加速するために、Fusic の機械学習チームがお手伝いたします。機械学習のPoCから運用まで、すべての場面でサポートした実績があります。もし、困っている方がいましたら、ぜひ Fusic にご相談ください。お問い合わせからでも気軽にご連絡いただけます。また Twitter の DM でのメッセージも大歓迎です。

追記

この記事を書き終わってから知ったのですが、SFn にはデータフローシミュレーターというもの[2]があり、これを使えばデータがどのように変形していくかを見ることが出来ます。頑張ってデータフローを考えるよりもシミュレーターで試してみる方が楽だと思うので、実際に SFn を使う場合はシミュレーターの利用を検討するといいと思います。

脚注
  1. https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/connect-lambda.html ↩︎

  2. https://console.aws.amazon.com/states/home?region=us-east-1#/simulator ↩︎

GitHubで編集を提案
Fusic 技術ブログ

Discussion