[AWS] コードを触りながら AWS Step Functionsの挙動を確認する①(Task/Choice)
はじめに
ご覧いただきありがとうございます。阿河です。
業務の中でAWS Step Functionsを触る機会がありました。
GUIベースでワークフローを組み立てられる機能は便利ですが、どういう仕組みで動いているのかが掴みづらいなと思いました。
そのためStatement言語で記述を行いながら、ワークフローを作成していきたいと思います。
対象者
- AWSを運用中の方
- ワークフロー制御を行ってみたい方
概要
※本記事の対象
- 事前準備
- Step Functionsを実行するトリガーを用意する
- AWS Step Functions側の実装
- 検証①
本記事では、以下のフローを検証します。
- S3にCSVファイルを置く。Lambdaを経由してステートマシンにトリガーをかける。
- DynamoDBに該当CSVファイルに関するレコードがあるかジャッジする。なければ1を返す。
- choiceで分岐。1なら「Debug」に移行。それ以外なら「Fail」に移行。
AWS Step Functionsの挙動理解のために、あえてWorkflow Studioを利用した作成は行わない方向でいきます。
また次回以降の記事では今回作成した構成にGlueジョブを組み合わせたり、Parallel構成を試す予定なので、ご興味あれば参照いただけると嬉しいです。
1. 事前準備
S3バケットとCSVデータの準備
以前のブログで利用したフォルダ構造とCSVデータを流用します。
適宜 環境に合わせて、御用意ください。
cf.CSVファイル
日本語でカラムを追加しました。
cf. S3バケット内のフォルダ構造
私は本検証の中で、「バケット名/branch_officeA/12/1/」直下にファイルを保存しています。
AWS Step Functionsのステートマシンを作成
そのうちステートマシンは「ワークフロー」を表しています。
ステートマシンを作成します。
「コードでワークフローを記述」を選択
⇒タイプ: 標準
⇒ステートマシン名: 任意の名前
⇒実行ロール: 新しいロールの作成
⇒ログ記録: OFF
⇒X-Rayトレースの有効化: オフ
ステートマシンが作成されました。
また実行ロールの内容は以下の通りです。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"xray:PutTraceSegments",
"xray:PutTelemetryRecords",
"xray:GetSamplingRules",
"xray:GetSamplingTargets"
],
"Resource": [
"*"
]
}
]
}
X-Rayに関する権限が付与されています。
Lambda実行用に以下のポリシーをロールに追加します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"*"
]
}
]
}
ステートマシンの画面で「編集」ボタンを選択します。
左側のコードブロックにコードを書くと、右側に自動でワークフロー図が生成されます。
「"Type": "Pass"」は、何も作業せずに入力を出力に渡します。
ひとまず実行してみましょう。
ワークフローは成功して、"World"と表示されました。
DynamoDBテーブルの作成
今回は処理を行うにあたり、DynamoDBのテーブルをチェックする仕様にします。
テーブル名: 任意の名前
パーティションキー: file_name
テーブル設定: デフォルト設定
上記の設定で、DynamoDBテーブルを作成します。
2. AWS Step Functionsを実行するトリガーを用意する
S3イベント通知の設定/トリガー用Lambdaコード作成
S3イベントからEventBridgeを使ってStep Functionsを実行のトリガーをかけることも可能ですが、今回はS3イベント通知⇒Lambda⇒Step Functionsを実行します。
まずトリガー用のLambda関数を作成します。
- 関数名: Trigger_StateMachine
- ランタイム: Python3.9
Lambdaの実行ロールには、AWS LambdaBasicExecutionRoleと合わせて、下記ポリシーを付与します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "states:StartExecution",
"Resource": "*"
}
]
}
コードは以下の通りです。
import boto3
import json
from urllib.parse import unquote_plus
client = boto3.client('stepfunctions')
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
object = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
s3_params = {
"bucket": bucket,
"key": object
}
res = client.start_execution(
input = json.dumps(s3_params),
stateMachineArn = 'xxxxxxxxxxxxxxxxxxx'
)
ここで必要な要素は以下の通りです。
- S3バケットとオブジェクトの読み込み
S3から受け取るイベント情報の構造は、下記の通りです。
{
"Records":[
{
"eventVersion":"2.2",
"eventSource":"aws:s3",
"awsRegion":"us-west-2",
"eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
"eventName":"event-type",
"userIdentity":{
"principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
},
"requestParameters":{
"sourceIPAddress":"ip-address-where-request-came-from"
},
"responseElements":{
"x-amz-request-id":"Amazon S3 generated request ID",
"x-amz-id-2":"Amazon S3 host that processed the request"
},
"s3":{
"s3SchemaVersion":"1.0",
"configurationId":"ID found in the bucket notification configuration",
"bucket":{
"name":"bucket-name",
"ownerIdentity":{
"principalId":"Amazon-customer-ID-of-the-bucket-owner"
},
"arn":"bucket-ARN"
},
"object":{
"key":"object-key",
"size":"object-size in bytes",
"eTag":"object eTag",
"versionId":"object version if bucket is versioning-enabled, otherwise null",
"sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
}
},
"glacierEventData": {
"restoreEventData": {
"lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
"lifecycleRestoreStorageClass": "Source storage class for restore"
}
}
}
]
}
イベント構造に従って、バケット名とオブジェクト名の情報を読み取ります。
- Step Functionsの実行
https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html
inputには、実行用のJSON入力データを含む文字列を指定。
またstateMachineArnには、ステートマシンのARNを指定します。
S3イベント通知の設定
イベント通知の送信先にLambdaを指定します。
イベント名: 任意の名前
イベントタイプ: PUT
送信先: Lambda関数
3. AWS Step Functions側の実装
DynamoDBチェック用のLambda関数作成
DynamoDBのレコードをチェックするLambda関数を作成します。
- 関数名: Check_DB
- ランタイム: Python3.9
- IAMロール(S3およびDynamoDBのアクセス権限)
IAMロールの権限は要件に応じて絞ってください。
DynamoDBチェック用のLambdaコード作成
- S3自動通知をトリガーとして情報を受け取る
- DynamoDBに対象CSVファイルに関するレコードがなければ、DynamoDBに情報を書き込む。1を出力する
上記を踏まえてLambdaコードを書きます。
import boto3
s3 = boto3.client('s3')
dynamo = boto3.resource('dynamodb')
dynamo_table = dynamo.Table('csv-transform-db')
def lambda_handler(event, context):
bucket = event['bucket']
filename = event['key']
try:
add_item = {
'file_name': filename.split('/')[3],
'file_exist': 'exist',
'fix1': 'false'
}
response = dynamo_table.put_item(
Item=add_item,
ConditionExpression='attribute_not_exists(file_name)'
)
return 1
except Exception:
print('already exist')
ここでやっていることは以下の通りです。
- S3バケットとオブジェクト名の取得
前セクションで作成したトリガー用Lambdaから、Step Functionsに以下の入力が渡されます。
s3_params = {
"bucket": bucket,
"key": object
}
s3_paramsはjson.dumpsでjson形式に変換されています。
入力値がLambdaに渡される。
これにより対象バケット名とオブジェクト名を抽出します。
- DynamoDBへの書き込み
boto3の「DynamoDB.Table.put_item()」を使って、DynamoDBに書き込みを行います。ただし「ConditionExpression」を指定しているため、DynamoDBに既にレコードがある場合は処理が行われません。
- 返り値
DynamoDBへの書き込みが成功したら、1を返します。
この1の値によって、ステートマシンの次のState(Choice)の際に次のStateに進むことができます。
ステートマシンの編集
ステートマシンのJSONを編集します。
{
"StartAt": "Check_DB1",
"States":{
"Check_DB1": {
"Type": "Task",
"Resource":"arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:Check_DB",
"ResultPath": "$.result",
"Next": "Choice1"
},
"Choice1": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.result",
"NumericEquals": 1,
"Next": "Debug"
},
{
"Variable": "$.result",
"NumericEquals": 0,
"Next": "Fail"
}
],
"Default": "Fail"
},
"Debug": {
"Type": "Pass",
"Result": "OK",
"End": true
},
"Fail": {
"Type": "Fail",
"Cause":"Exist"
}
}
}
複雑そうに見えるかもしれませんが、シンプルなフォーマットに沿って記述していくだけです。各Stateに必要なフィールドは、Stateごとに説明の記載がありますので御確認ください。
"States"は、「Check_DB1」「Choice1」「Debug」「Fail」を定義しました。
状態(State)はステートマシンの要素です。
- 「Check_DB1」(Type:Task)では、先ほど作成したLambda関数を実行します。DynamoDBにレコードが存在しないならば、Lambdaから1の返答が返ってきます。処理が終わったら「Choice1」に移行します。
- 「Choice1」(Type:Choice)では、渡された値に基づいて処理を分岐させます。値が1なら「Debug」、値がそれ以外なら「Fail」に移行します。
- 「Debug」(Type:Pass)では、シンプルにデバッグのみ行います。"OK"と表示します。
- 「Fail」(Type:Fail)では、"Exist"と表示します。
JSONを書いてフォーマットが問題ないなら、右側にフロー図が表示されます。
4. 検証①
準備ができたので、S3バケット内にCSVファイルをアップロードしてみましょう。
アップロード後に、Step Functions側の画面を確認します。
以下フロー図です。
実行は成功しました。
Check_DB1で「1」が渡されたことにより、Choice1からDebugに分岐しています。
結果的にDebugの結果である"OK"という文字列が出力されています。
またCheckDBの処理の際に、DynamoDB側にも書き込みがされています。
ではもう一度S3バケットに同じファイルをアップロードし直してみます。
今度は処理に失敗しています。
DynamoDBのレコードが存在するため、Check_DB1は1を返しません。
よってChoice1の際に、Failに移行をしてしまいます。
これは想定通りの動きです。
今回は以上です。
さいごに
今回はStep Functionsの基本的な挙動を確認していきました。
次回の記事は今回作成したワークフローにGlueジョブを加えていきます。
御覧いただき ありがとうございました!
Discussion