Open9

AWS公式のStep Functions セルフワークショップをやってみた

ふじしろふじしろ

「サービスの統合」について

  • タスクステートメントでリクエストを行い、即座にレスポンスを受け取ってステップを進めるのが「リクエストレスポンス統合パターン」
  • 他のタスクを呼び出して同期的に完了を待ってからタスクを終了するのを ジョブの実行(.sync) と呼び、同期タスクを設定するパターンを「sync統合パターン」と呼ぶ
    • タイトルが「ジョブの実行」(Run a Job)になってるのが気になる。「サービスの統合」について話していて、リクエストレスポンスはそのまま統合パターン名なのに、なんでこっちのタイトルが「同期統合パターン」じゃないんだ...
  • コールバック待ち(.waitForTaskToken) を使用して外部プロセスが完了するまでワークフローを一時停止するパターンを「タスクトークンサービス統合パターン」と呼ぶ。最大1年間一時停止できる

整理すると

  • 即実行・即完了で同期せずにステップを進める:リクエストレスポンス統合パターン
  • AWS内のジョブについて完了を待ってから同期的にステップを進める:Sync統合パターン
  • AWS外のプロセスについて、トークンを用いたコールバックによりプロセスを同期させてステップを進める:タスクトークンサービス統合パターン

リクエストレスポンスパターン

  • デフォルトはこのパターン。

sync統合パターンについて

  • 他のジョブを呼び出す場合に勝手にこのパターンになるわけではなく、ASL定義のSubmit Batch JobのResource定義の末尾に.syncを追加する必要がある。
    • 追加しないとリクエストレスポンス統合パターンとなり、ジョブの完了を待たずにステップが進む

例:初期状態(リクエストレスポンス統合パターン)

    "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob",

例:sync統合パターン

    "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync", ←末尾に".sync"が追加されている

タスクトークンサービス統合パターンについて

  • トークンを渡した相手から回答が来るのを待ち、その結果に応じて次の作業へと進むような処理フローに使えそう。ワークでは、StepFunction → SQS でコールバックを待ち、SQSを処理するlambdaから結果を受け取っていた。
    • 自分の理解としては、.syncは、処理の開始をStepFunction側が指示するのに対して、waitForTaskTokenでは処理の開始から終了まで他のサービスに任せるイメージ。
  • TaskTokenを対象に渡し、対象からsendTaskSuccess() or sendTaskFailuer()で結果が返ってくるのを待ってから次のステップに進んでいる
  • Step Functions API - SendTaskSuccessSendTaskFailure
  • ドキュメント:タスクトークンによるコールバックの待機 (.waitForTaskToken)
    コールバック待ちをサポートする最適化されたサービスのリストについては、 Step Functionsの最適化された統合 を参照

メモ:実行結果のeventViewについて

  • truncated は通常falseだが、cloudwatch logの制限を超えるとtrueになる

メモ:Amazon Comprehendについて

ふじしろふじしろ

エラーの処理

  • どのステートにおいても様々な理由でランタイムエラーが発生する可能性がある
    • ステートマシン定義エラー
    • タスクの失敗(lambda等)
    • 一時的な問題(ネットワーク等)
  • Step Functionsはデフォルトですべてのステートマシン実行を失敗させる

関連ドキュメント

失敗時のリトライ

ASL定義の中でStatesの中に以下を追加すればOK

"Retry": [
        {
          "ErrorEquals": [
            "{エラー名と一致させるための非空の文字列配列。必須。}"
          ],
          "IntervalSeconds": {初回リトライ前の待機秒数。整数。オプション},
          "MaxAttempts": {最大リトライ回数。整数。オプション。0ならリトライされない},
          "BackoffRate": {各リトライごとに再リトライ間隔を増加させる際の乗数。デフォルトは2}
        }
      ],

エラーのキャッチ

  • Task、Map、およびParallelステートのCatchフィールドを利用する。
  • ASL定義の中にRetryとCatchフィールドが両方あった場合は、まずリトライ実施が行われ最終的にCatchが実行される。
  • 組み込みエラータイプ一覧(詳細):
    • States.ALL - 既知のエラー名に一致するワイルドカード
    • States.DataLimitExceeded - 出力がクォータを超えている
    • States.Runtime - ランタイム例外が処理できなかった
    • States.HeartbeatTimeout - Taskステートがハートビートの送信に失敗した
    • States.Timeout - Taskステートがタイムアウトした
    • States.TaskFailed - Taskステートが実行中に失敗した
    • States.Permissions - Taskステートに十分な権限がなかった

例:複数定義できる。
リトライ同様にErrorEquals で対象のエラーを特定するが、その後Nextで次のステートを指定する。

  "States": {
    "StartExecution": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:FailureFunction",
      "TimeoutSeconds": 5,
      // ハンドラー
      "Catch": [
        {
          "ErrorEquals": [
            "CustomError"
          ],
          "Next": "CustomErrorFallback"
        },
        {
          "ErrorEquals": [
            "States.Timeout"
          ],
          "Next": "TimeoutFallback"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "CatchAllFallback"
        }
      ],
      "End": true
    },
    // ハンドラーから指定されるステップたち
    "CustomErrorFallback": {
      "Type": "Pass",
      "Output": "This is a fallback from a custom Lambda function exception",
      "End": true
    },
    "TimeoutFallback": {
      "Type": "Pass",
      "Output": "This is a fallback from a timeout error",
      "End": true
    },
    "CatchAllFallback": {
      "Type": "Pass",
      "Output": "This is a fallback from any error",
      "End": true
    }

障害からの再駆動(redrive)

  • 再駆動/再実行(redrive)によりワークフロー全体ではなく、失敗した時点からステップを再開できる。
    • StepFunctionsの費用は、ステート遷移の数に基づいて請求するので、これによりコストが削減できる。
  • やり方はコンソールからRedrive from failureを選択して再駆動させること。
  • 再駆動を行った場合、同じState Viewタブで最初の実行と再駆動の両方を確認できる
    • 失敗した際の試行には赤い四角、成功した再駆動には緑の四角が表示される。

プログラムによる再駆動(redrive)

※redriveの訳は再駆動再実行で揺れている模様。注意。

  • 再駆動はリトライやキャッチと違ってStepFunctionsのASLだけでは実行できない。
  • しかし自動化したい場合は別のプログラムから再駆動させることができる。
  • 具体的にはStates.RedriveExecution API を使用してワークフローを再駆動させることができる。
  • ワークでは以下の構成により実現していた。
    1. StepFunctionsの失敗した実行イベントをEventBridgeがSQSに送信。
    2. lambdaが検知し、States.RedriveExcecutionを実行して再駆動
ふじしろふじしろ

バージョンとエイリアス

  • バージョン - ステートマシンの番号付けされた不変のスナップショット
  • エイリアス - 最大2つのステートマシンバージョンを指すポインタ(新しいワークフローへの負荷分散に役立ちます)
  • バージョンは同じステートマシンをバージョン管理できる機能っぽい
    • CLIからバージョンを指定(ARNで指定する)して実行することも可能
  • そしてエイリアスは、バージョンにエイリアスを設定することでバージョンごとのARNを参照することなく実行を切り替えることができる。

エイリアスの用途などについてはコンソール上の説明がわかりやすい

エイリアスの仕組み

エイリアスは、トラフィックを同じステートマシンの 1 つまたは 2 つのバージョンにルーティングするポインタです。エイリアスを使用して、アプリケーションコードを更新して新しいステートマシンの ARN を参照することなく、ブルー/グリーンデプロイ戦略、canary デプロイ戦略、およびローリングデプロイ戦略を実装できます。 詳細はこちら

ふじしろふじしろ

コスト最適化(Expressワークフローについて)

  • ワークフロー内に別のワークフローをネストする(モジュール化する)ことができる。
  • 通常のワークフロー(Standardワークフロー)とネストされたワークフロー(Expressワークフロー)は料金体系が異なり、うまく利用することでコストを最適化できる。
カテゴリ 標準ワークフロー (Standard Workflows) Expressワークフロー (Express Workflows)
最大実行時間 1年間 5分間
料金 ステート遷移の回数に基づいて課金されます。ステート遷移は、実行内のステップが完了するたびにカウントされます。 実行回数、実行時間、メモリ消費量に基づいて課金されます。
実行履歴 実行はStep Functions APIで一覧表示および詳細な説明ができ、コンソールから視覚的にデバッグできます。実行履歴データは90日後に削除されます。CloudWatch Logs経由でログ記録を有効にできます。 5分間の期間内で生成できる数のエントリを上限とした、無制限の実行履歴。ログ記録を有効にすることで、CloudWatch LogsまたはStep Functionsコンソールで実行を調査できます。
実行のセマンティクス 厳密に1回 (Exactly-once) のワークフロー実行。 非同期Expressワークフロー: 少なくとも1回 (At-least-once) のワークフロー実行。
同期Expressワークフロー: 最大1回 (At-most-once) のワークフロー実行。
サービス統合 すべてのサービス統合とパターンをサポート。 すべてのサービス統合をサポートしますが、ジョブ実行 (.sync) およびコールバック (.waitForTaskToken) のサービス統合パターンはサポートしません。
分散マップ サポート 非サポート
アクティビティ サポート 非サポート
ふじしろふじしろ

データの並列処理(Parallel stateの使い方及びAPI Gatewayとの統合)

  • Parallel state:データの並列処理
  • API Gatewayからステートマシンを呼び出すことができる
  • つまりAPIでステートマシンの処理を呼び出せる
  • ワークでは、API Gateway → Step Functions → lambda というフローを実行
  • lambdaで入力値に対する合計、平均、最大最小を求める関数を実装し、StepFunctionsのParallel stateを用いてこれらを並列で処理した。

データ配列の処理(Map state及びChoices stateの使い方)

  • Map State: データの反復処理
    • データをデータ構造に関係なく順次処理してくれる
    • データに対する何かしらの操作はMapの中のステップで実行する
  • Choices state: ワークフロー内の条件ロジック
    • 複数のルールをセットし、各ルールに当てはまった場合にどの後続処理につなげるかを定義できる
ふじしろふじしろ

IaCを使用したステートマシンの作成

  • aws cdk でトライ
  • code-server 上で操作
  • web から叩ける hello stepfunction 関数を作る感じ。
  • 最小構成だとこのくらいでかけるのか

ユースケース

具体的なユースケースに応じた構築例ががいくつも紹介されてるので、後から参照するのにも良さそう。

Human approval

  • よくありそうなユースケースなのでやってみる
  • 学習内容コピペ
    • 有効なEメールアドレスを使用してSNSトピックをサブスクライブします。
    • ワークフローを一時停止し、Eメールによる人の承認を待つwaitForTaskToken統合パターンを使用してステートマシンを実行します。
    • Eメールを受信し、承認リンクを選択して、ワークフローを続行するためにSendTasksSuccessをステートマシンに送信します。
    • ステートマシンの実行が完了したことを確認します。
  • インフラ構成/処理内容言語化
    • StepFunctionsがコールバック待ち(waitForTaskToken)で承認依頼メールを送信するlamda関数を呼び出す
    • メール送信lambda関数は環境変数に設定されたSNS_TOPICのARNに向けて承認確認メールを送信する
      • メールの中には、承認用リンクと差し戻し用リンク(ApiGatewayのAPIエンドポイント)が掲載されている
    • 承認確認メールを受け取った承認者は承認用/差し戻し用リンクを押下する
    • ApiGatewayのAPIからlambda関数が実行され、リクエストに付与されたクエリパラメータの承認/差し戻し情報とともにStepFunctionsにコールバックする。
    • StepFunctionsはコールバックを受けて、一緒に受け取った承認/差し戻し情報から、最終的な結果を判断する
ふじしろふじしろ

リソースのクリーンアップ(重要)

  • このワークショップ上で作成したリソースのクリーンアップ手順を説明してくれているセクション
  • 不要なリソースを残さないために重要だし、若干苦手意識があるので手順を明記してくれているのは非常にありがたい。