🔥

Step FunctionsのMap Stateによる動的並列処理の簡単な実装

2022/09/18に公開

(単なる偶然ですが)ちょうど3年前の9/18、dynamic parallelism in workflowsがStep Functionsでサポートされました。それ以前は、index参照したりChoiceWaitでループ処理したり並列実行するためにもう一つワークフローを起動したり、Step Functions(以下、SFn)による動的な並列処理実装はまあまあ手間がかかりました。
https://aws.amazon.com/about-aws/whats-new/2019/09/aws-step-functions-adds-support-for-dynamic-parallelism-in-workflows/

今回は並列処理したい量に関わらず、一つのワークフローと一つのLambda関数で動的並列処理する実装例を試しました。

Map Stateサンプル

AWSドキュメントとしてはマップ状態を使用して Lambda を複数回呼び出すに記載されてますが、VSCode - AWSToolkitを使うとエディタに直接、ステートマシンのサンプルコードを読み込めるので学習に便利です。
https://marketplace.visualstudio.com/items?itemName=AmazonWebServices.aws-toolkit-vscode

今回試したステートマシンはこちら。

{
    "StartAt": "Invoke Lambda function",
    "States": {
      "Invoke Lambda function": {
        "Type": "Map",
        "MaxConcurrency": 10,
        "Next": "Pass",
        "Iterator": {
          "StartAt": "Iterator function",
          "States": {
            "Iterator function": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:000123456789:function:stepfunc_sample",
              "End": true
            }
          }
        }
      },
      "Pass": {
        "Type": "Pass",
        "End": true
      }
    }
}

MaxConcurrency

SFnから並列実行する上限ですが、実運用ではまず考慮が必要になるパラメータになります。
その理由はSFnによる並列実行数: MaxConcurrency > Lambda関数の同時実行数: Concurrent executionsの場合はワークフロー実行エラーになるためです。

逆にリージョンあたりLambda関数の同時実行数の範囲でSFn並列数が制御されていれば、MaxConcurrencyごとにIteratorがループ処理されるためワークフローは正常終了できます。

SFnからLambda関数を呼び出す場合はLambda関数の同時実行数を予約しておくことがお薦めです。同時実行数の確保にはReservedProvisionedの2種類がありますが、大抵のユースケースはReservedで十分なはずです。

また、Iteratorの要素数をMaxConcurrencyで区切って実行するため、並列処理数が多い時はその分、SFnの実行時間は長くなるので費用対効果と同時実行数は応相談です。

https://docs.aws.amazon.com/lambda/latest/dg/configuration-concurrency.html

ステートマシン入力

SFnは1回あたりの処理データ量が少ない場合に使い勝手が良いサービスですが、SFnのペイロード上限は256KB程度(正確には262,144 バイト超)なので処理データをステート間で渡すことは不可能な場合も多いはずです。

処理データがS3にある場合、ペイロード対策としてオブジェクトARNをワークフローの入力としてLambda関数で並列処理できます。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/avoid-exec-failures.html

今回の検証で使ったステートマシン入力を生成するコードはこちら。

def list_s3obj(bucket, key):
    s3 = get_boto3Client('s3')

    objs = []
    nextToken = {}
    while True:
        res = s3.list_objects_v2(
            Bucket=bucket,
            # Delimiter='string',
            # EncodingType='url',
            MaxKeys=1000,
            Prefix=key,
            # ContinuationToken='string',
            **nextToken,
            # FetchOwner=True|False,
            # StartAfter='string',
            # RequestPayer='requester',
            # ExpectedBucketOwner='string'
        )

        for obj in res['Contents']:
            if obj['Size'] == 0:
                continue
            else:
                obj_name = obj['Key']
                data = {
                    "key": obj_name,
                    "bucket": f"arn:aws:s3:::{bucket}"
                }
                objs.append(data)
    
        if res['IsTruncated']:
            nextToken['ContinuationToken'] = res['NextContinuationToken']
        else:
            break
                
    return (objs)

以下のような形式で入力パラメータを返します。

[
  {'key': 'today/random_1.log', 'bucket': 'arn:aws:s3:::{bucket}'},
  {'key': 'today/random_10.log', 'bucket': 'arn:aws:s3:::{bucket}'},
  #略..... 
]

ステートマシン実行

今回はローカルPCからコード検証していたので、ワークフローを呼び出す際は次のように実行します。

if __name__ == '__main__':
    s3_objs = list_s3obj({bucket}, 'today/')

    sfn = get_boto3Client('stepfunctions')
    suffix = generate_randomStr(5)
    response = sfn.start_execution(
        stateMachineArn='arn:aws:states:ap-northeast-1:000123456789:stateMachine:MyState_Sample',
        name=f"FromLocal_{dt}_{suffix}",
        input=json.dumps(s3_objs),
        # traceHeader='string'
    )

本筋ではないですが、検証環境の補足です↓

get_boto3Client()

MFA設定したプロファイル情報を使うため、個人的にはbroamski/aws-mfaをよく使っています。それを前提にfrom boto3.session import Sessionからプロファイル名を指定しています。

def get_boto3Client(serviceName):
    session = Session(profile_name={profile_name})
    client = session.client(serviceName)

    return client

start_execution(input, name)

SFnにおける入力はJSON形式である必要があるため、inputではjson.dumps()する必要があります。また、nameが重複しているとワークフロー呼び出しが失敗するためsuffixのようにランダム文字列を付与して対処しました。

def generate_randomStr(length):
    strings = string.digits + string.ascii_lowercase + string.ascii_uppercase
    randomStr = ''.join([random.choice(strings) for i in range(length)])    
    return randomStr

実行結果

AWSコンソールで確認できる様子はこちら。

  • ステップ入力

  • イテレーションの詳細

  • イテレーション出力(次ステップの入力)

Lambda関数コード

今回はランダムな整数を記述したファイルをS3に保存して「1ファイルずつLambda関数で読み取って中身の整数を2乗する」という処理を並列実行しています。

サンプルデータ生成のコードは省略しますが、random()関数で雑に生成したので1ファイル1数値ではなくファイルによっては複数の値を保持しているためそれに対応するコードも必要になっています。テストデータだろうと、入力データの準備は大切ですね・・・

import boto3

s3 = boto3.client('s3')


def fetch_data(byteData):
    # 改行コードで値を分割
    # 末尾にも改行コードがあるため最終要素の空値はフィルタする
    list_data = list(filter(None, byteData.decode('utf-8').split('\n')))
    list_data = list(map(int, list_data))
    return list_data

def lambda_handler(event, context):
    print(event)

    bucket = event['bucket'].split(':')[-1]
    filename = event['key']
    directory = f"/tmp/{filename}"

    s3 = boto3.client('s3')
    response = s3.get_object(
        Bucket=bucket,
        # IfMatch='string',
        # IfModifiedSince=datetime(2015, 1, 1),
        # IfNoneMatch='string',
        # IfUnmodifiedSince=datetime(2015, 1, 1),
        Key=filename,
        # Range='string',
        # ResponseCacheControl='string',
        # ResponseContentDisposition='string',
        # ResponseContentEncoding='string',
        # ResponseContentLanguage='string',
        # ResponseContentType='string',
        # ResponseExpires=datetime(2015, 1, 1),
        # VersionId='string',
        # SSECustomerAlgorithm='string',
        # SSECustomerKey='string',
        # RequestPayer='requester',
        # PartNumber=123,
        # ExpectedBucketOwner='string',
        # ChecksumMode='ENABLED'
    )

    data = response['Body'].read()
    # type(data)
    # <class 'bytes'> 
    # data
    # ex.) b'57\n'
    
    listed_data = fetch_data(data)

    result = 0
    for i in listed_data:
        result += i ** 2
        
    return result

最後に

動的並列処理という堅苦しい言い回しはともかく、Lambda関数のユースケースを拡張するSFnの機能としてMap Stateはとても強力でした。同時実行数やLambdaの実行時間に気をつければ、簡単に並列処理を実装してスケールできます。

DSLを覚える必要があるのは学習障壁ではありますが、個人的にはUMLを自分で作成する手間と大して変わりはないのでシーケンス図のないバッチ処理運用を避けられると思えば良い動機づけになりそう、と考えています。

実運用に当たっては、Step Functions のベストプラクティスを一読しましょう。TimeoutSecondsRetry - ErrorEqualsCatch - ErrorEquals句によってエラーハンドリングを定義できます。

Discussion