🚀

[AWS] コードを触りながら AWS Step Functionsの挙動を確認する①(Task/Choice)

2022/12/23に公開約11,000字


はじめに

ご覧いただきありがとうございます。阿河です。

業務の中でAWS Step Functionsを触る機会がありました。

GUIベースでワークフローを組み立てられる機能は便利ですが、どういう仕組みで動いているのかが掴みづらいなと思いました。

そのためStatement言語で記述を行いながら、ワークフローを作成していきたいと思います。

対象者

  • AWSを運用中の方
  • ワークフロー制御を行ってみたい方

概要

※本記事の対象

  1. 事前準備
  2. Step Functionsを実行するトリガーを用意する
  3. AWS Step Functions側の実装
  4. 検証①

本記事では、以下のフローを検証します。

  • S3にCSVファイルを置く。Lambdaを経由してステートマシンにトリガーをかける。
  • DynamoDBに該当CSVファイルに関するレコードがあるかジャッジする。なければ1を返す。
  • choiceで分岐。1なら「Debug」に移行。それ以外なら「Fail」に移行。

AWS Step Functionsの挙動理解のために、あえてWorkflow Studioを利用した作成は行わない方向でいきます。

また次回以降の記事では今回作成した構成にGlueジョブを組み合わせたり、Parallel構成を試す予定なので、ご興味あれば参照いただけると嬉しいです。

1. 事前準備

S3バケットとCSVデータの準備

https://zenn.dev/megazone_japan/articles/b2680244e49c43

以前のブログで利用したフォルダ構造とCSVデータを流用します。
適宜 環境に合わせて、御用意ください。

cf.CSVファイル

日本語でカラムを追加しました。

cf. S3バケット内のフォルダ構造

私は本検証の中で、「バケット名/branch_officeA/12/1/」直下にファイルを保存しています。

AWS Step Functionsのステートマシンを作成

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html
AWS Step Functionsは「ステートマシン」と「タスク」に基づいています。
そのうちステートマシンは「ワークフロー」を表しています。

ステートマシンを作成します。

「コードでワークフローを記述」を選択

⇒タイプ: 標準
⇒ステートマシン名: 任意の名前
⇒実行ロール: 新しいロールの作成
⇒ログ記録: OFF
⇒X-Rayトレースの有効化: オフ

ステートマシンが作成されました。

また実行ロールの内容は以下の通りです。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "xray:PutTraceSegments",
                "xray:PutTelemetryRecords",
                "xray:GetSamplingRules",
                "xray:GetSamplingTargets"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

X-Rayに関する権限が付与されています。
Lambda実行用に以下のポリシーをロールに追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

ステートマシンの画面で「編集」ボタンを選択します。

左側のコードブロックにコードを書くと、右側に自動でワークフロー図が生成されます。

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-pass-state.html

「"Type": "Pass"」は、何も作業せずに入力を出力に渡します。


ひとまず実行してみましょう。


ワークフローは成功して、"World"と表示されました。

DynamoDBテーブルの作成

今回は処理を行うにあたり、DynamoDBのテーブルをチェックする仕様にします。

テーブル名: 任意の名前
パーティションキー: file_name
テーブル設定: デフォルト設定

上記の設定で、DynamoDBテーブルを作成します。

2. AWS Step Functionsを実行するトリガーを用意する

S3イベント通知の設定/トリガー用Lambdaコード作成

S3イベントからEventBridgeを使ってStep Functionsを実行のトリガーをかけることも可能ですが、今回はS3イベント通知⇒Lambda⇒Step Functionsを実行します。

まずトリガー用のLambda関数を作成します。

  • 関数名: Trigger_StateMachine
  • ランタイム: Python3.9

Lambdaの実行ロールには、AWS LambdaBasicExecutionRoleと合わせて、下記ポリシーを付与します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "states:StartExecution",
            "Resource": "*"
        }
    ]
}

コードは以下の通りです。

import boto3
import json
from urllib.parse import unquote_plus

client = boto3.client('stepfunctions')


def lambda_handler(event, context):
  
  bucket = event['Records'][0]['s3']['bucket']['name']
  object = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
  
  s3_params = {
    "bucket": bucket,
    "key": object
  }
  
  res = client.start_execution(
    input = json.dumps(s3_params),
    stateMachineArn = 'xxxxxxxxxxxxxxxxxxx'
  )

ここで必要な要素は以下の通りです。

  • S3バケットとオブジェクトの読み込み

https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/userguide/notification-content-structure.html
S3から受け取るイベント情報の構造は、下記の通りです。

{  
   "Records":[  
      {  
         "eventVersion":"2.2",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
         "eventName":"event-type",
         "userIdentity":{  
            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
         },
         "requestParameters":{  
            "sourceIPAddress":"ip-address-where-request-came-from"
         },
         "responseElements":{  
            "x-amz-request-id":"Amazon S3 generated request ID",
            "x-amz-id-2":"Amazon S3 host that processed the request"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"ID found in the bucket notification configuration",
            "bucket":{  
               "name":"bucket-name",
               "ownerIdentity":{  
                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
               },
               "arn":"bucket-ARN"
            },
            "object":{  
               "key":"object-key",
               "size":"object-size in bytes",
               "eTag":"object eTag",
               "versionId":"object version if bucket is versioning-enabled, otherwise null",
               "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
            }
         },
         "glacierEventData": {
            "restoreEventData": {
               "lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
               "lifecycleRestoreStorageClass": "Source storage class for restore"
            }
         }
      }
   ]
}

イベント構造に従って、バケット名とオブジェクト名の情報を読み取ります。

inputには、実行用のJSON入力データを含む文字列を指定。
またstateMachineArnには、ステートマシンのARNを指定します。

S3イベント通知の設定

イベント通知の送信先にLambdaを指定します。

イベント名: 任意の名前
イベントタイプ: PUT
送信先: Lambda関数

3. AWS Step Functions側の実装

DynamoDBチェック用のLambda関数作成

DynamoDBのレコードをチェックするLambda関数を作成します。

  • 関数名: Check_DB
  • ランタイム: Python3.9
  • IAMロール(S3およびDynamoDBのアクセス権限)

IAMロールの権限は要件に応じて絞ってください。

DynamoDBチェック用のLambdaコード作成

  • S3自動通知をトリガーとして情報を受け取る
  • DynamoDBに対象CSVファイルに関するレコードがなければ、DynamoDBに情報を書き込む。1を出力する

上記を踏まえてLambdaコードを書きます。

import boto3

s3 = boto3.client('s3')


dynamo = boto3.resource('dynamodb')
dynamo_table = dynamo.Table('csv-transform-db')


def lambda_handler(event, context):
    
    bucket = event['bucket']
    filename = event['key']
    
    try:
        
        add_item = {
            'file_name': filename.split('/')[3],
            'file_exist': 'exist',
            'fix1': 'false'
        }
        response = dynamo_table.put_item(
            Item=add_item,
            ConditionExpression='attribute_not_exists(file_name)'
        )
        
        return 1
    
    except Exception:
        print('already exist')

ここでやっていることは以下の通りです。

  • S3バケットとオブジェクト名の取得

前セクションで作成したトリガー用Lambdaから、Step Functionsに以下の入力が渡されます。

s3_params = {
    "bucket": bucket,
    "key": object
    }

s3_paramsはjson.dumpsでjson形式に変換されています。
入力値がLambdaに渡される。

これにより対象バケット名とオブジェクト名を抽出します。

  • DynamoDBへの書き込み

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.put_item

boto3の「DynamoDB.Table.put_item()」を使って、DynamoDBに書き込みを行います。ただし「ConditionExpression」を指定しているため、DynamoDBに既にレコードがある場合は処理が行われません。

  • 返り値
    DynamoDBへの書き込みが成功したら、1を返します。
    この1の値によって、ステートマシンの次のState(Choice)の際に次のStateに進むことができます。

ステートマシンの編集

ステートマシンのJSONを編集します。

{
  "StartAt": "Check_DB1",
  "States":{
    "Check_DB1": {
      "Type": "Task",
	      "Resource":"arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:Check_DB",
      "ResultPath": "$.result",
      "Next": "Choice1"
    },
    "Choice1": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.result",
        "NumericEquals": 1,
        "Next": "Debug"
      },
      {
        "Variable": "$.result",
        "NumericEquals": 0,
        "Next": "Fail"
      } 
    ],
    "Default": "Fail"
    },
    "Debug": {
      "Type": "Pass",
      "Result": "OK",
      "End": true
    },
    "Fail": {
      "Type": "Fail",
      "Cause":"Exist"
    }
  }
}

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/concepts-states.html

複雑そうに見えるかもしれませんが、シンプルなフォーマットに沿って記述していくだけです。各Stateに必要なフィールドは、Stateごとに説明の記載がありますので御確認ください。

"States"は、「Check_DB1」「Choice1」「Debug」「Fail」を定義しました。
状態(State)はステートマシンの要素です。

  • 「Check_DB1」(Type:Task)では、先ほど作成したLambda関数を実行します。DynamoDBにレコードが存在しないならば、Lambdaから1の返答が返ってきます。処理が終わったら「Choice1」に移行します。
  • 「Choice1」(Type:Choice)では、渡された値に基づいて処理を分岐させます。値が1なら「Debug」、値がそれ以外なら「Fail」に移行します。
  • 「Debug」(Type:Pass)では、シンプルにデバッグのみ行います。"OK"と表示します。
  • 「Fail」(Type:Fail)では、"Exist"と表示します。

JSONを書いてフォーマットが問題ないなら、右側にフロー図が表示されます。

4. 検証①

準備ができたので、S3バケット内にCSVファイルをアップロードしてみましょう。

アップロード後に、Step Functions側の画面を確認します。

以下フロー図です。

実行は成功しました。

Check_DB1で「1」が渡されたことにより、Choice1からDebugに分岐しています。

結果的にDebugの結果である"OK"という文字列が出力されています。

またCheckDBの処理の際に、DynamoDB側にも書き込みがされています。

ではもう一度S3バケットに同じファイルをアップロードし直してみます。


今度は処理に失敗しています。

DynamoDBのレコードが存在するため、Check_DB1は1を返しません。
よってChoice1の際に、Failに移行をしてしまいます。
これは想定通りの動きです。

今回は以上です。

さいごに

今回はStep Functionsの基本的な挙動を確認していきました。
次回の記事は今回作成したワークフローにGlueジョブを加えていきます。

御覧いただき ありがとうございました!

Discussion

ログインするとコメントできます