Step FunctionsのMap Stateによる動的並列処理の簡単な実装
(単なる偶然ですが)ちょうど3年前の9/18、dynamic parallelism in workflowsがStep Functionsでサポートされました。それ以前は、index
参照したりChoice
やWait
でループ処理したり並列実行するためにもう一つワークフローを起動したり、Step Functions(以下、SFn)による動的な並列処理実装はまあまあ手間がかかりました。
今回は並列処理したい量に関わらず、一つのワークフローと一つのLambda関数で動的並列処理する実装例を試しました。
Map Stateサンプル
AWSドキュメントとしてはマップ状態を使用して Lambda を複数回呼び出すに記載されてますが、VSCode - AWSToolkit
を使うとエディタに直接、ステートマシンのサンプルコードを読み込めるので学習に便利です。
今回試したステートマシンはこちら。
{
"StartAt": "Invoke Lambda function",
"States": {
"Invoke Lambda function": {
"Type": "Map",
"MaxConcurrency": 10,
"Next": "Pass",
"Iterator": {
"StartAt": "Iterator function",
"States": {
"Iterator function": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:000123456789:function:stepfunc_sample",
"End": true
}
}
}
},
"Pass": {
"Type": "Pass",
"End": true
}
}
}
MaxConcurrency
SFnから並列実行する上限ですが、実運用ではまず考慮が必要になるパラメータになります。
その理由はSFnによる並列実行数: MaxConcurrency > Lambda関数の同時実行数: Concurrent executions
の場合はワークフロー実行エラーになるためです。
逆にリージョンあたりLambda関数の同時実行数の範囲でSFn並列数が制御されていれば、MaxConcurrency
ごとにIteratorがループ処理されるためワークフローは正常終了できます。
SFnからLambda関数を呼び出す場合はLambda関数の同時実行数を予約しておくことがお薦めです。同時実行数の確保にはReserved
とProvisioned
の2種類がありますが、大抵のユースケースはReserved
で十分なはずです。
また、Iteratorの要素数をMaxConcurrency
で区切って実行するため、並列処理数が多い時はその分、SFnの実行時間は長くなるので費用対効果と同時実行数は応相談です。
ステートマシン入力
SFnは1回あたりの処理データ量が少ない場合に使い勝手が良いサービスですが、SFnのペイロード上限は256KB程度(正確には262,144 バイト超
)なので処理データをステート間で渡すことは不可能な場合も多いはずです。
処理データがS3にある場合、ペイロード対策としてオブジェクトARNをワークフローの入力としてLambda関数で並列処理できます。
今回の検証で使ったステートマシン入力を生成するコードはこちら。
def list_s3obj(bucket, key):
s3 = get_boto3Client('s3')
objs = []
nextToken = {}
while True:
res = s3.list_objects_v2(
Bucket=bucket,
# Delimiter='string',
# EncodingType='url',
MaxKeys=1000,
Prefix=key,
# ContinuationToken='string',
**nextToken,
# FetchOwner=True|False,
# StartAfter='string',
# RequestPayer='requester',
# ExpectedBucketOwner='string'
)
for obj in res['Contents']:
if obj['Size'] == 0:
continue
else:
obj_name = obj['Key']
data = {
"key": obj_name,
"bucket": f"arn:aws:s3:::{bucket}"
}
objs.append(data)
if res['IsTruncated']:
nextToken['ContinuationToken'] = res['NextContinuationToken']
else:
break
return (objs)
以下のような形式で入力パラメータを返します。
[
{'key': 'today/random_1.log', 'bucket': 'arn:aws:s3:::{bucket}'},
{'key': 'today/random_10.log', 'bucket': 'arn:aws:s3:::{bucket}'},
#略.....
]
ステートマシン実行
今回はローカルPCからコード検証していたので、ワークフローを呼び出す際は次のように実行します。
if __name__ == '__main__':
s3_objs = list_s3obj({bucket}, 'today/')
sfn = get_boto3Client('stepfunctions')
suffix = generate_randomStr(5)
response = sfn.start_execution(
stateMachineArn='arn:aws:states:ap-northeast-1:000123456789:stateMachine:MyState_Sample',
name=f"FromLocal_{dt}_{suffix}",
input=json.dumps(s3_objs),
# traceHeader='string'
)
本筋ではないですが、検証環境の補足です↓
get_boto3Client()
MFA設定したプロファイル情報を使うため、個人的にはbroamski/aws-mfaをよく使っています。それを前提にfrom boto3.session import Session
からプロファイル名を指定しています。
def get_boto3Client(serviceName):
session = Session(profile_name={profile_name})
client = session.client(serviceName)
return client
start_execution(input, name)
SFnにおける入力はJSON形式である必要があるため、input
ではjson.dumps()
する必要があります。また、name
が重複しているとワークフロー呼び出しが失敗するためsuffix
のようにランダム文字列を付与して対処しました。
def generate_randomStr(length):
strings = string.digits + string.ascii_lowercase + string.ascii_uppercase
randomStr = ''.join([random.choice(strings) for i in range(length)])
return randomStr
実行結果
AWSコンソールで確認できる様子はこちら。
-
ステップ入力
-
イテレーションの詳細
-
イテレーション出力(次ステップの入力)
Lambda関数コード
今回はランダムな整数を記述したファイルをS3に保存して「1ファイルずつLambda関数で読み取って中身の整数を2乗する」という処理を並列実行しています。
サンプルデータ生成のコードは省略しますが、random()
関数で雑に生成したので1ファイル1数値ではなくファイルによっては複数の値を保持しているためそれに対応するコードも必要になっています。テストデータだろうと、入力データの準備は大切ですね・・・
import boto3
s3 = boto3.client('s3')
def fetch_data(byteData):
# 改行コードで値を分割
# 末尾にも改行コードがあるため最終要素の空値はフィルタする
list_data = list(filter(None, byteData.decode('utf-8').split('\n')))
list_data = list(map(int, list_data))
return list_data
def lambda_handler(event, context):
print(event)
bucket = event['bucket'].split(':')[-1]
filename = event['key']
directory = f"/tmp/{filename}"
s3 = boto3.client('s3')
response = s3.get_object(
Bucket=bucket,
# IfMatch='string',
# IfModifiedSince=datetime(2015, 1, 1),
# IfNoneMatch='string',
# IfUnmodifiedSince=datetime(2015, 1, 1),
Key=filename,
# Range='string',
# ResponseCacheControl='string',
# ResponseContentDisposition='string',
# ResponseContentEncoding='string',
# ResponseContentLanguage='string',
# ResponseContentType='string',
# ResponseExpires=datetime(2015, 1, 1),
# VersionId='string',
# SSECustomerAlgorithm='string',
# SSECustomerKey='string',
# RequestPayer='requester',
# PartNumber=123,
# ExpectedBucketOwner='string',
# ChecksumMode='ENABLED'
)
data = response['Body'].read()
# type(data)
# <class 'bytes'>
# data
# ex.) b'57\n'
listed_data = fetch_data(data)
result = 0
for i in listed_data:
result += i ** 2
return result
最後に
動的並列処理という堅苦しい言い回しはともかく、Lambda関数のユースケースを拡張するSFnの機能としてMap Stateはとても強力でした。同時実行数やLambdaの実行時間に気をつければ、簡単に並列処理を実装してスケールできます。
DSLを覚える必要があるのは学習障壁ではありますが、個人的にはUMLを自分で作成する手間と大して変わりはないのでシーケンス図のないバッチ処理運用を避けられると思えば良い動機づけになりそう、と考えています。
実運用に当たっては、Step Functions のベストプラクティスを一読しましょう。TimeoutSeconds
やRetry - ErrorEquals
、Catch - ErrorEquals
句によってエラーハンドリングを定義できます。
Discussion