AWS StepFunctions入門
今まで AWS Lambda でバッチ処理してきたんですが、限界(最大実行時間15分)が来たのでバッチ処理を AWS StepFunctions に移行することにしました。
で、初めて使うサービスなので使い方を整理することにしました。
概要
ユースケース
以下のユースケースを想定してStepFunctionsでバッチ処理を作成していきます。
全テナントに対して外部サービスから顧客データを取得してアプリDBに保存する
使用するAWSサービス
サービス | 説明 |
---|---|
AWS Step Functions | サーバーレスなワークフローオーケストレーションサービス。要は複数の処理(Lambdaなど)を繋ぎ合わせて、順番や分岐を制御できるサービス。ローコードで作るプログラムの作業手順書のイメージ |
AWS Lambda | 処理担当(Python使用) |
Amazon SNS | 特定の処理で失敗した際に管理者に通知をするために使用します。 |
詳細
サービス | 名前 | 説明 |
---|---|---|
StepFunctions | BatchTenantSync | バッチ管理用のStepFunctions |
Lambda | BatchTenantSync_LogStartBatch | バッチ開始時に、バッチID・開始時刻・バッチ種別をCloudWatch Logsに記録します。 |
Lambda | BatchTenantSync_GetTenantList | 処理対象となるテナント一覧をアプリケーションDBから取得します。 |
Lambda | BatchTenantSync_FetchCustomerDataList | 各テナント単位で外部サービスに接続し、対象顧客のデータ一覧を取得します。 |
Lambda | BatchTenantSync_SaveCustomerData | 取得した顧客データ一覧を整形し、アプリケーションDBに保存します。 |
Lambda | BatchTenantSync_SummarizeBatch | 全テナントの処理結果(成功・失敗件数)を集計し、バッチ全体のサマリーを作成します。 |
Lambda | BatchTenantSync_LogEndBatch | バッチ終了時に、バッチID・終了時刻・成功/失敗結果をCloudWatch Logsに記録します。 |
SNS | batch-alert | 通知用のSNS。 |
バッチ イメージ図
作成するバッチのイメージ
Step1: 開始ログと終了ログを出そう!
では、実際にStepFunctionsを作成していきます。
「BatchTenantSync」 というステートマシンを作成してください。
ステートマシンのタイプは 標準 です。
また、ステートマシンで使用するLambdaを作成します。
BatchTenantSync_LogStartBatch コード
import logging
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
timestamp = datetime.now(timezone.utc).isoformat()
def lambda_handler(event, context):
batch_id = event.get("batchId")
batch_type = event.get("batchType")
start_time = event.get("startTime") or timestamp
log_entry = {
"batchId": batch_id,
"batchType": batch_type,
"status": "STARTED",
"startTime": start_time,
"timestamp": timestamp
}
logger.info(log_entry)
return {
"batchId": batch_id,
"status": "STARTED",
"startTime": start_time,
"loggedAt": timestamp
}
BatchTenantSync_LogEndBatch コード
import logging
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
def lambda_handler(event, context):
batch_id = event.get("batchId")
status = event.get("status")
batch_type = event.get("batchType")
start_time = event.get("startTime")
end_time = timestamp
duration_seconds = _calculate_duration(start_time, end_time) if start_time else None
log_entry = {
"batchId": batch_id,
"status": status,
"batchType": batch_type,
"startTime": start_time,
"endTime": end_time,
"durationSeconds": duration_seconds,
"timestamp": timestamp
}
logger.info(log_entry)
return {
"batchId": batch_id,
"status": status,
"loggedAt": timestamp
}
def _calculate_duration(start, end):
try:
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
return int((end_dt - start_dt).total_seconds())
except Exception:
return None
Step Functionsのコードを書く
実際に作成するならGUIで作成してASL (Amazon States Language) で調整することになると思います。
今回は Step Funcations の確認のためGUIでは作成せず、最初からASLで作成します
では実際に下記のコードで作成します。
{
"Comment": "テナント一覧を取得し、各テナントに対して外部サービスから顧客データを取得・保存するバッチ処理を行うステートマシン。エラー発生時はSNS通知+FailStateでバッチを異常終了させる設計。",
"StartAt": "SetBatchMeta",
"States": {
"SetBatchMeta": {
"Type": "Pass",
"Comment": "バッチ共通メタ情報(batchId、startTime、batchType)を作成する",
"Parameters": {
"batchId.$": "$$.Execution.Name",
"startTime.$": "$$.Execution.StartTime",
"batchType.$": "$$.StateMachine.Name"
},
"ResultPath": "$.meta",
"Next": "LogBatchStart"
},
"LogBatchStart": {
"Type": "Task",
"Comment": "バッチ開始ログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT_ID:function:BatchTenantSync_LogStartBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType"
},
"ResultPath": null,
"Next": "LogBatchSuccess"
},
"LogBatchSuccess": {
"Type": "Task",
"Comment": "バッチ正常終了時にログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT_ID:function:BatchTenantSync_LogEndBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"status": "SUCCEEDED"
},
"ResultPath": null,
"Next": "SuccessState"
},
"SuccessState": {
"Type": "Succeed",
"Comment": "バッチ処理成功。正常終了する"
}
}
}
上記ASLの解説
上記のASLを見ながら、どんなことが書いてあるのかを見ていきます。
StartAt
このステートマシンの開始ステートを指定します。
ここで指定されたステートから処理がスタートします。
States
ステートの集合です。
つまり、ASLは以下のような形で「処理の流れ」を順番に並べていきます。
{
"StartAt": "最初のステート名",
"States": {
"ステート名1": { ... },
"ステート名2": { ... },
"ステート名3": { ... }
}
}
ステートの中身
では、ステートの中身がどういうものかを確認します。
ざっくり以下のイメージです。
{
"Type": "何をするステートなのかを決めます(Lambdaを呼び出す、条件分岐する、一定時間待機、など)",
"Parameters": "ステートへの入力を決めます",
"ResultPath": "ステートからの出力を決めます",
"Next": "次のステートを指定します"
}
Type
Typeを指定することで、何をするステートなのかを指定します。
以下のようなタイプがあります。
Type | 役割 | ユースケース |
---|---|---|
Task | Lambdaや外部サービスを呼び出すステート | API実行、データ取得など |
Pass | 何もせずデータをそのまま or 加工して次に渡す | メタ情報付与、Input整形 |
Choice | 条件分岐を行うステート | 成功/失敗に応じて処理を分岐したい時 |
Wait | 一定時間待つステート | リトライ間隔を空ける、スケジュール調整 |
Map | 配列に対して並列 or 逐次で繰り返し処理するステート | テナント一覧などループ処理したい時 |
Succeed | 正常終了を明示的に宣言する | 処理が成功した後にワークフローを終了させたい時 |
Fail | 異常終了を明示的に宣言する | 処理に致命的な失敗があった場合、ワークフロー全体を止めたい時 |
今回作成するバッチ全体では以下のようなタイプを組み合わせてバッチを作成します。
Pass(バッチメタ作成)
→ Task(テナント一覧取得)
→ Map(各テナント処理)
├ Task(顧客データ取得)
└ Task(顧客データ保存)
→ Task(結果サマリー)
→ Choice(失敗数でSNS通知するか判断)
→ Succeed(正常終了) or Fail(異常終了)
Parameters, ResultPath
ステート同士で受け渡しをするペイロードの扱いについて。
ここら辺は説明すると長いので割愛。
ざっくり次のイメージ
- Parameters でステート内で呼び出すもの(Lambdaとか)に渡す入力データを選択
- ResultPath で出力結果をペイロードに加える
Next
次のステートを指示します。
バッチ実行
とりあえず上記だけ抑えれば、StepFunctionsでバッチは作成できます。
実際に実行してみましょう。
次のような画像になってバッチ実行が成功するはず
これで簡単なバッチ実行を確認できました。
次は少し複雑なバッチを作成・実行してみましょう
Step2: 少し複雑なバッチ
少し複雑なバッチを作成・実行します。
完成イメージはこんな感じ
Lambda関数
BatchTenantSync_GetTenantList
処理対象となるテナント一覧を取得する
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
event = {
"batchId": str, # 必須 - バッチ実行ID
"startTime": str, # 必須 - バッチ開始時刻 (ISO8601形式)
"batchType": str # 必須 - バッチ種別
}
"""
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
tenants = [
{"tenantId": f"tenant-{i:03}"} for i in range(1, 16)
]
logger.info({
"batchType": batch_type,
"batchId": batch_id,
"status": "STARTED",
"startTime": start_time,
"message": "Fetched tenant list",
"tenantCount": len(tenants)
})
return {
"tenants": tenants
}
BatchTenantSync_FetchCustomerDataList
指定されたテナントの外部サービスから顧客データ一覧を取得する
import uuid
import logging
from datetime import datetime, timezone, timedelta
import random
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
tenant_id = event.get("tenantId")
logger.info({
"batchId": batch_id,
"batchType": batch_type,
"startTime": start_time,
"tenantId": tenant_id,
"message": "Fetching external data START"
})
# 疑似的に複数顧客データを生成
num_customers = random.randint(5, 10)
customer_data_list = []
for _ in range(num_customers):
customer = {
"customerId": f"cust-{uuid.uuid4()}",
"customerName": f"Customer-{uuid.uuid4().hex[:6]}",
"createdAt": random_created_at_within_6_months()
}
customer_data_list.append(customer)
logger.info({
"batchId": batch_id,
"tenantId": tenant_id,
"message": f"Fetching external data COMPLETE ({num_customers} customers)"
})
return {
"customerDataList": customer_data_list
}
# createdAtが同じだと味気ないので、幅をもたせる。
def random_created_at_within_6_months():
now = datetime.now(timezone.utc)
six_months_ago = now - timedelta(days=180)
random_seconds = random.randint(0, int((now - six_months_ago).total_seconds()))
random_timestamp = six_months_ago + timedelta(seconds=random_seconds)
return random_timestamp.isoformat(timespec="seconds")
BatchTenantSync_SaveCustomerData
取得した顧客データをアプリDB用に整形し保存する
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
tenant_id = event.get("tenantId")
customer_data_list = event["customerDataList"]
logger.info({
"batchId": batch_id,
"batchType": batch_type,
"startTime": start_time,
"tenantId": tenant_id,
"message": f"Saving {len(customer_data_list)} customers to RDS START"
})
save_results = []
for customer in customer_data_list:
result = {
"customerId": customer["customerId"],
"status": "saved"
}
save_results.append(result)
logger.info({
"batchId": batch_id,
"batchType": batch_type,
"startTime": start_time,
"tenantId": tenant_id,
"message": f"Saving {len(customer_data_list)} customers to RDS COMPLETE"
})
return {
"saveResults": save_results
}
BatchTenantSync_SummarizeBatch
各テナントの処理結果を集計し、ログに出力する
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
process_outputs = event["tenantResults"]
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
total = 0 # 顧客件数ベース
for result in process_outputs:
save_results = result.get("saveResult", {}).get("saveResults", [])
for save_result in save_results:
total += 1
summary = {
"batchId": batch_id,
"startTime": start_time,
"batchType": batch_type,
"summary": {
"total": total
}
}
logger.info({
"batchId": batch_id,
"batchType": batch_type,
"startTime": start_time,
"message": "Batch summary complete",
"summary": summary
})
return summary
ASL
{
"Comment": "テナント一覧を取得し、各テナントに対して外部サービスから顧客データを取得・保存するバッチ処理を行うステートマシン。エラー発生時はSNS通知+FailStateでバッチを異常終了させる設計。",
"StartAt": "SetBatchMeta",
"States": {
"SetBatchMeta": {
"Type": "Pass",
"Comment": "バッチ共通メタ情報(batchId、startTime、batchType)を作成する",
"Parameters": {
"batchId.$": "$$.Execution.Name",
"startTime.$": "$$.Execution.StartTime",
"batchType.$": "$$.StateMachine.Name"
},
"ResultPath": "$.meta",
"Next": "LogBatchStart"
},
"LogBatchStart": {
"Type": "Task",
"Comment": "バッチ開始ログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_LogStartBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType"
},
"ResultPath": null,
"Next": "GetTenantList"
},
"GetTenantList": {
"Type": "Task",
"Comment": "処理対象となるテナント一覧を取得する",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType"
},
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_GetTenantList",
"ResultPath": "$.tenantResult",
"Next": "ProcessTenants"
},
"ProcessTenants": {
"Type": "Map",
"Comment": "各テナントごとにデータ取得〜保存を並列処理する",
"Parameters": {
"meta.$": "$.meta",
"tenantId.$": "$$.Map.Item.Value.tenantId"
},
"ItemsPath": "$.tenantResult.tenants",
"MaxConcurrency": 3,
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "FetchCustomerDataList",
"States": {
"FetchCustomerDataList": {
"Type": "Task",
"Comment": "指定されたテナントの外部サービスから顧客データ一覧を取得する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_FetchCustomerDataList",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantId.$": "$.tenantId"
},
"ResultPath": "$.fetchResult",
"Next": "SaveCustomerData"
},
"SaveCustomerData": {
"Type": "Task",
"Comment": "取得した顧客データをアプリDB用に整形し保存する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_SaveCustomerData",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantId.$": "$.tenantId",
"customerDataList.$": "$.fetchResult.customerDataList"
},
"ResultPath": "$.saveResult",
"End": true
}
}
},
"ResultPath": "$.processTenantsResult",
"Next": "tenantBatchResults"
},
"tenantBatchResults": {
"Type": "Pass",
"Comment": "各テナントの処理結果をまとめて、サマリー用の入力データを整形する",
"Parameters": {
"meta.$": "$.meta",
"tenantResults.$": "$.processTenantsResult"
},
"ResultPath": "$",
"Next": "SummarizeBatch"
},
"SummarizeBatch": {
"Type": "Task",
"Comment": "各テナントの処理結果を集計し、ログに出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_SummarizeBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantResults.$": "$.tenantResults"
},
"ResultPath": "$.batchSummary",
"Next": "LogBatchSuccess"
},
"LogBatchSuccess": {
"Type": "Task",
"Comment": "バッチ正常終了時にログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_LogEndBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"status": "SUCCEEDED"
},
"ResultPath": null,
"Next": "SuccessState"
},
"SuccessState": {
"Type": "Succeed",
"Comment": "バッチ処理成功。正常終了する"
}
}
}
Type: Map
ポイントは下記の部分
Type: Map
によってリストに対して繰り返し処理を行うことを示しています。
"ProcessTenants": {
"Type": "Map",
"Comment": "各テナントごとにデータ取得〜保存を並列処理する",
"Parameters": {
"meta.$": "$.meta",
"tenantId.$": "$$.Map.Item.Value.tenantId"
},
"ItemsPath": "$.tenantResult.tenants",
"MaxConcurrency": 3,
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "FetchCustomerDataList",
"ItemsPath": "$.tenantResult.tenants"
- 繰り返し対象となるリストの場所を指定しています。
- 今回は
tenantResult.tenants
という配列([{ "tenantId": "tenant-001", ...}]
)が対象 - 配列の各要素が反復処理されます
"MaxConcurrency": 3
- 並列実行できる最大数を制限する設定
- 例えば100テナントあっても最大3つまでしか同時に処理されません
- APIリミットやDB負荷、Lambdaの同時実行数制限を回避するために設定する
"ItemProcessor": { ... }
-
1回の繰り返し処理で実行される構成を定義
-
"Mode": "INLINE"
- このMapステートの中に小さなステートマシンをネストする方式
-
"StartAt": "FetchCustomerDataList"
- この小さなステートマシン内で最初に実行されるステート
バッチ実行
これで実行してみましょう。
バッチが成功するはずです。
Step3: リトライ&エラーハンドリング
次はリトライと失敗時の挙動について確認します。
バッチの構成として最初の GetTenantList
で失敗したら後の処理は全部スキップして失敗として扱いたいです。
なのでそのための構成に変更します。
イメージはこんな感じ
Lambda関数
確認のためわざとエラーを発生させます。
BatchTenantSync_GetTenantList
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
event = {
"batchId": str, # 必須 - バッチ実行ID
"startTime": str, # 必須 - バッチ開始時刻 (ISO8601形式)
"batchType": str # 必須 - バッチ種別
}
"""
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
+ # 意図的にエラーを発生させる
+ if True:
+ error_message = "Database connection failed: timeout after 5 seconds."
+ logger.error({
+ "batchId": batch_id,
+ "batchType": batch_type,
+ "status": "FAILED",
+ "startTime": start_time,
+ "errorType": "DatabaseConnectionError",
+ "errorMessage": error_message
+ })
+ raise Exception(error_message)
tenants = [
{"tenantId": f"tenant-{i:03}"} for i in range(1, 16)
]
logger.info({
"batchType": batch_type,
"batchId": batch_id,
"status": "STARTED",
"startTime": start_time,
"message": "Fetched tenant list",
"tenantCount": len(tenants)
})
return {
"tenants": tenants
}
ASL
GetTenantList
にリトライ設定とエラー時の対応について追記。
さらにエラー時にSNSを飛ばすようのステートも追加。
{
"Comment": "テナント一覧を取得し、各テナントに対して外部サービスから顧客データを取得・保存するバッチ処理を行うステートマシン。エラー発生時はSNS通知+FailStateでバッチを異常終了させる設計。",
"StartAt": "SetBatchMeta",
"States": {
"SetBatchMeta": {
"Type": "Pass",
"Comment": "バッチ共通メタ情報(batchId、startTime、batchType)を作成する",
"Parameters": {
"batchId.$": "$$.Execution.Name",
"startTime.$": "$$.Execution.StartTime",
"batchType.$": "$$.StateMachine.Name"
},
"ResultPath": "$.meta",
"Next": "LogBatchStart"
},
"LogBatchStart": {
"Type": "Task",
"Comment": "バッチ開始ログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:1234456789012:function:BatchTenantSync_LogStartBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType"
},
"ResultPath": null,
"Next": "GetTenantList"
},
"GetTenantList": {
"Type": "Task",
"Comment": "処理対象となるテナント一覧を取得する",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType"
},
"Resource": "arn:aws:lambda:ap-northeast-1:1234456789012:function:BatchTenantSync_GetTenantList",
"ResultPath": "$.tenantResult",
"Next": "ProcessTenants",
+ "Retry": [
+ {
+ "ErrorEquals": ["States.ALL"],
+ "IntervalSeconds": 5,
+ "MaxAttempts": 2,
+ "BackoffRate": 2.0
+ }
+ ],
+ "Catch": [
+ {
+ "ErrorEquals": ["States.ALL"],
+ "ResultPath": "$.errorInfo",
+ "Next": "NotifyFailure"
+ }
+ ]
},
"ProcessTenants": {
"Type": "Map",
"Comment": "各テナントごとにデータ取得〜保存を並列処理する",
"Parameters": {
"meta.$": "$.meta",
"tenantId.$": "$$.Map.Item.Value.tenantId"
},
"ItemsPath": "$.tenantResult.tenants",
"MaxConcurrency": 3,
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "FetchCustomerDataList",
"States": {
"FetchCustomerDataList": {
"Type": "Task",
"Comment": "指定されたテナントの外部サービスから顧客データ一覧を取得する",
"Resource": "arn:aws:lambda:ap-northeast-1:1234456789012:function:BatchTenantSync_FetchCustomerDataList",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantId.$": "$.tenantId"
},
"ResultPath": "$.fetchResult",
"Next": "SaveCustomerData"
},
"SaveCustomerData": {
"Type": "Task",
"Comment": "取得した顧客データをアプリDB用に整形し保存する",
"Resource": "arn:aws:lambda:ap-northeast-1:1234456789012:function:BatchTenantSync_SaveCustomerData",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantId.$": "$.tenantId",
"customerDataList.$": "$.fetchResult.customerDataList"
},
"ResultPath": "$.saveResult",
"End": true
}
}
},
"ResultPath": "$.processTenantsResult",
"Next": "tenantBatchResults"
},
"tenantBatchResults": {
"Type": "Pass",
"Comment": "各テナントの処理結果をまとめて、サマリー用の入力データを整形する",
"Parameters": {
"meta.$": "$.meta",
"tenantResults.$": "$.processTenantsResult"
},
"ResultPath": "$",
"Next": "SummarizeBatch"
},
"SummarizeBatch": {
"Type": "Task",
"Comment": "各テナントの処理結果を集計し、ログに出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:1234456789012:function:BatchTenantSync_SummarizeBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantResults.$": "$.tenantResults"
},
"ResultPath": "$.batchSummary",
"Next": "LogBatchSuccess"
},
"LogBatchSuccess": {
"Type": "Task",
"Comment": "バッチ正常終了時にログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:1234456789012:function:BatchTenantSync_LogEndBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"status": "SUCCEEDED"
},
"ResultPath": null,
"Next": "SuccessState"
},
"SuccessState": {
"Type": "Succeed",
"Comment": "バッチ処理成功。正常終了する"
},
+ "NotifyFailure": {
+ "Type": "Task",
+ "Comment": "バッチ失敗通知(SNS)を送信する",
+ "Resource": "arn:aws:states:::sns:publish",
+ "Parameters": {
+ "TopicArn": "arn:aws:sns:ap-northeast-1:1234456789012:batch-alert",
+ "Message": {
+ "default.$": "States.Format('【バッチ失敗通知】\nバッチ種別: {}\nバッチID: {}\nバッチ開始時刻: {}\nエラー内容: {}\n対応をお願いします。', $.meta.batchType, $.meta.batchId, $.meta.startTime, $.errorInfo.Cause)"
+ },
+ "MessageStructure": "json",
+ "Subject": "【緊急】バッチ処理失敗"
+ },
+ "ResultPath": null,
+ "Next": "LogBatchFailure"
+ },
+ "LogBatchFailure": {
+ "Type": "Task",
+ "Comment": "バッチ異常終了時にログを出力する",
+ "Resource": "arn:aws:lambda:ap-northeast-:1234456789012:function:BatchTenantSync_LogEndBatch",
+ "Parameters": {
+ "batchId.$": "$.meta.batchId",
+ "startTime.$": "$.meta.startTime",
+ "batchType.$": "$.meta.batchType",
+ "status": "FAILED",
+ "errorType.$": "$.errorInfo.Error",
+ "errorMessage.$": "$.errorInfo.Cause"
+ },
+ "ResultPath": null,
+ "Next": "FailState"
+ },
+ "FailState": {
+ "Type": "Fail",
+ "Comment": "バッチ異常終了(ステータスFAILEDで停止する)",
+ "Error": "BatchFailed",
+ "Cause": "See CloudWatch Logs for error details."
+ }
}
}
バッチ実行
バッチ処理が失敗し、ちゃんと次のような通知が飛んでくればOKです。
【緊急】バッチ処理失敗
【バッチ失敗通知】
バッチ種別: BatchTenantSync
バッチID: 9a4321af-e517-447d-b3da-6d354dd3b9c9
バッチ開始時刻: 2025-04-29T07:45:19.588Z
エラー内容: {"errorMessage": "Database connection failed: timeout after 5 seconds.", "errorType": "Exception", "requestId": "57b07d1f-81bd-4c4f-9276-7d82ed0bf9ca", "stackTrace": [" File \"/var/task/lambda_function.py\", line 30, in lambda_handler\n raise Exception(error_message)\n"]}
対応をお願いします。
これでバッチ処理に失敗した場合の対応もOKです。
Step4: 失敗数をカウントしてみる
Step3
では、大元が失敗したらバッチを失敗させて通知を送信する、ということをやりました。
次は「各テナント内で保存に失敗し、その失敗数をカウントする。かつ失敗が多い場合は通知を送信する」というのをやりたいと思います。
イメージはこんな感じ
Lambda関数
BatchTenantSync_GetTenantList
エラーが不要になったためコメントアウトします。
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
event = {
"batchId": str, # 必須 - バッチ実行ID
"startTime": str, # 必須 - バッチ開始時刻 (ISO8601形式)
"batchType": str # 必須 - バッチ種別
}
"""
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
# 意図的にエラーを発生させる
+ # if True:
+ # error_message = "Database connection failed: timeout after 5 seconds."
+ # logger.error({
+ # "batchId": batch_id,
+ # "batchType": batch_type,
+ # "status": "FAILED",
+ # "startTime": start_time,
+ # "errorType": "DatabaseConnectionError",
+ # "errorMessage": error_message
+ # })
+ # raise Exception(error_message)
tenants = [
{"tenantId": f"tenant-{i:03}"} for i in range(1, 16)
]
logger.info({
"batchType": batch_type,
"batchId": batch_id,
"status": "STARTED",
"startTime": start_time,
"message": "Fetched tenant list",
"tenantCount": len(tenants)
})
return {
"tenants": tenants
}
BatchTenantSync_SaveCustomerData
確率で保存処理を失敗させるようにします。
import logging
+ import random
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
tenant_id = event.get("tenantId")
customer_data_list = event["customerDataList"]
logger.info({
"batchId": batch_id,
"batchType": batch_type,
"startTime": start_time,
"tenantId": tenant_id,
"message": f"Saving {len(customer_data_list)} customers to RDS START"
})
save_results = []
for customer in customer_data_list:
+ # 20%くらいの確率で失敗させる
+ if random.random() < 0.2:
+ result = {
+ "customerId": customer["customerId"],
+ "status": "failed",
+ "errorMessage": "Simulated DB error"
+ }
+ else:
+ result = {
+ "customerId": customer["customerId"],
+ "status": "saved"
+ }
save_results.append(result)
logger.info({
"batchId": batch_id,
"batchType": batch_type,
"startTime": start_time,
"tenantId": tenant_id,
+ "message": f"Saving to RDS COMPLETE. Success: {sum(1 for r in save_results if r['status'] == 'saved')}, Failed: {sum(1 for r in save_results if r['status'] == 'failed')}"
})
return {
"saveResults": save_results
}
BatchTenantSync_SummarizeBatch
成功と失敗の件数をカウントするように修正
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
process_outputs = event["tenantResults"]
batch_id = event.get("batchId")
start_time = event.get("startTime")
batch_type = event.get("batchType")
total = 0 # 顧客件数ベース
+ success = 0
+ failed = 0
for result in process_outputs:
save_results = result.get("saveResult", {}).get("saveResults", [])
for save_result in save_results:
total += 1
+ if save_result.get("status") == "saved":
+ success += 1
+ else:
+ failed += 1
summary = {
"batchId": batch_id,
"startTime": start_time,
"batchType": batch_type,
"summary": {
"total": total,
+ "success": success,
+ "failed": failed
}
}
logger.info({
"batchId": batch_id,
"batchType": batch_type,
"startTime": start_time,
"message": "Batch summary complete",
"summary": summary
})
return summary
ASL
"Type": "Choice"
に失敗カウントを渡して十件以上ならSNS通知ステートに飛ぶようにしています。
{
"Comment": "テナント一覧を取得し、各テナントに対して外部サービスから顧客データを取得・保存するバッチ処理を行うステートマシン。エラー発生時はSNS通知+FailStateでバッチを異常終了させる設計。",
"StartAt": "SetBatchMeta",
"States": {
"SetBatchMeta": {
"Type": "Pass",
"Comment": "バッチ共通メタ情報(batchId、startTime、batchType)を作成する",
"Parameters": {
"batchId.$": "$$.Execution.Name",
"startTime.$": "$$.Execution.StartTime",
"batchType.$": "$$.StateMachine.Name"
},
"ResultPath": "$.meta",
"Next": "LogBatchStart"
},
"LogBatchStart": {
"Type": "Task",
"Comment": "バッチ開始ログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_LogStartBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType"
},
"ResultPath": null,
"Next": "GetTenantList"
},
"GetTenantList": {
"Type": "Task",
"Comment": "処理対象となるテナント一覧を取得する",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType"
},
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_GetTenantList",
"ResultPath": "$.tenantResult",
"Next": "ProcessTenants",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 5,
"MaxAttempts": 2,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.errorInfo",
"Next": "NotifyFailure"
}
]
},
"ProcessTenants": {
"Type": "Map",
"Comment": "各テナントごとにデータ取得〜保存を並列処理する",
"Parameters": {
"meta.$": "$.meta",
"tenantId.$": "$$.Map.Item.Value.tenantId"
},
"ItemsPath": "$.tenantResult.tenants",
"MaxConcurrency": 3,
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "FetchCustomerDataList",
"States": {
"FetchCustomerDataList": {
"Type": "Task",
"Comment": "指定されたテナントの外部サービスから顧客データ一覧を取得する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_FetchCustomerDataList",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantId.$": "$.tenantId"
},
"ResultPath": "$.fetchResult",
"Next": "SaveCustomerData"
},
"SaveCustomerData": {
"Type": "Task",
"Comment": "取得した顧客データをアプリDB用に整形し保存する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_SaveCustomerData",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantId.$": "$.tenantId",
"customerDataList.$": "$.fetchResult.customerDataList"
},
"ResultPath": "$.saveResult",
"End": true
}
}
},
"ResultPath": "$.processTenantsResult",
"Next": "tenantBatchResults"
},
"tenantBatchResults": {
"Type": "Pass",
"Comment": "各テナントの処理結果をまとめて、サマリー用の入力データを整形する",
"Parameters": {
"meta.$": "$.meta",
"tenantResults.$": "$.processTenantsResult"
},
"ResultPath": "$",
"Next": "SummarizeBatch"
},
"SummarizeBatch": {
"Type": "Task",
"Comment": "各テナントの処理結果を集計し、ログに出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_SummarizeBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"tenantResults.$": "$.tenantResults"
},
"ResultPath": "$.batchSummary",
+ "Next": "CheckFailedCustomers"
},
+ "CheckFailedCustomers": {
+ "Type": "Choice",
+ "Comment": "バッチ全体で保存失敗数が5件以上なら警告通知を送信する",
+ "Choices": [
+ {
+ "Variable": "$.batchSummary.summary.failed",
+ "NumericGreaterThanEquals": 10,
+ "Next": "NotifyPartialFailure"
+ }
+ ],
+ "Default": "LogBatchSuccess"
+ },
+ "NotifyPartialFailure": {
+ "Type": "Task",
+ "Comment": "保存失敗数が10件以上だったので警告通知を飛ばす",
+ "Resource": "arn:aws:states:::sns:publish",
+ "Parameters": {
+ "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:batch-alert",
+ "Message": {
+ "default.$": "States.Format('【バッチ処理警告】\nバッチ種別: {}\nバッチID: {}\n総件数: {}\n成功件数: {}\n失敗件数: {}\n一部データ保存に失敗しました。内容を確認してください。', $.meta.batchType, $.batchSummary.batchId, $.batchSummary.summary.total, $.batchSummary.summary.success, $.batchSummary.summary.failed)"
+ },
+ "MessageStructure": "json",
+ "Subject": "【警告】バッチ一部失敗"
+ },
+ "ResultPath": null,
+ "Next": "LogBatchSuccess"
+ },
"LogBatchSuccess": {
"Type": "Task",
"Comment": "バッチ正常終了時にログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_LogEndBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"status": "SUCCEEDED"
},
"ResultPath": null,
"Next": "SuccessState"
},
"SuccessState": {
"Type": "Succeed",
"Comment": "バッチ処理成功。正常終了する"
},
"NotifyFailure": {
"Type": "Task",
"Comment": "バッチ失敗通知(SNS)を送信する",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:batch-alert",
"Message": {
"default.$": "States.Format('【バッチ失敗通知】\nバッチ種別: {}\nバッチID: {}\nバッチ開始時刻: {}\nエラー内容: {}\n対応をお願いします。', $.meta.batchType, $.meta.batchId, $.meta.startTime, $.errorInfo.Cause)"
},
"MessageStructure": "json",
"Subject": "【緊急】バッチ処理失敗"
},
"ResultPath": null,
"Next": "LogBatchFailure"
},
"LogBatchFailure": {
"Type": "Task",
"Comment": "バッチ異常終了時にログを出力する",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:BatchTenantSync_LogEndBatch",
"Parameters": {
"batchId.$": "$.meta.batchId",
"startTime.$": "$.meta.startTime",
"batchType.$": "$.meta.batchType",
"status": "FAILED",
"errorType.$": "$.errorInfo.Error",
"errorMessage.$": "$.errorInfo.Cause"
},
"ResultPath": null,
"Next": "FailState"
},
"FailState": {
"Type": "Fail",
"Comment": "バッチ異常終了(ステータスFAILEDで停止する)",
"Error": "BatchFailed",
"Cause": "See CloudWatch Logs for error details."
}
}
}
バッチ実行
20%で失敗なので多分通知が飛ぶはず。
次のような通知が飛べばOKです。
【警告】バッチ一部失敗
【バッチ処理警告】
バッチ種別: BatchTenantSync
バッチID: 208c98e5-2890-4244-ab09-871a5c91af89
総件数: 116
成功件数: 82
失敗件数: 34
一部データ保存に失敗しました。内容を確認してください。
終わり
これでStepFunctionsはなんとなくわかりました。
もしこの記事が誰かの助けになれば嬉しいです。
最後に
- Lambdaの同時実行数上限(1000)に注意
- リージョン内での、同時に動いている数なので、
MaxConcurrency
で制限かけないと割と簡単に爆発する-
MaxConcurrency: 10
あたりから様子見すると良い
-
- バッチ内のLambdaが落ちるならまだマシで、別システムのLambdaを巻き込むと詰む
- リージョン内での、同時に動いている数なので、
- StepFunctionsを使ったからといって、Lambdaの15分以内の時間制限は変わらないから気をつけよう!
- 1つの処理が15分以上かかるなら ECS を採用した方が良い
- ステートのInput/Outputのペイロードは、最大256KB
- そのため大きなデータをやり取りしたい場合は S3かDynamoDB に保存して、キーのみをペイロードで受け渡しするのがベター
- あと入出力に関しては
Parameters
,ResultPath
以外にもある-
InputPath
,ResultSelector
,OutputPath
がある。これらを使うとさらに柔軟にハンドリングできる
-
- DynamoDBなどでバッチ処理の状態を管理して、バッチが実行中かどうかを管理できるとプロっぽい
- 部分失敗した情報(失敗リスト)を S3かDynamoDB に保存してあとでリトライできる設計にすると楽
StepFunctions料金
あと料金についても
USD 0.025/1,000 回の状態遷移
バッチ全体としては下の感じなので、100テナントあるとすると一回の実行で大体 208 ステート
1. SetBatchMeta → メタ情報定義(batchIdなど)
2. LogBatchStart → バッチ開始ログ
3. GetTenantList → テナント一覧取得(失敗時はSNS通知)
4. ProcessTenants (Map) → テナントごとのデータ取得・保存(並列)
- FetchCustomerDataList → 外部データ取得
- SaveCustomerData → DB保存
5. tenantBatchResults → Mapの結果を整形
6. SummarizeBatch → 処理集計
7. CheckFailedCustomers → 失敗数によって通知分岐
8. NotifyPartialFailure → 部分失敗の警告通知(オプション)
9. LogBatchSuccess → 終了ログ出力
10. SuccessState → 成功ステート
なので1バッチ実行あたりの料金は
(208 / 1000) * $0.025 = $0.052 > 約 0.8 円
Discussion