📝

Powertools for AWS (Python) を用いたSQS トリガーのバッチ処理の実装

2023/11/02に公開

こんにちは。CTO の hayata-yamamoto です。

弊社では、Serverless Framework を用いて Lambda アプリケーションを構築しています。APIサーバーや非同期処理のワーカーなどもなるべく Lambda で完結するものは完結するように書いており、その際に、Powertools for AWS (Python) というライブラリを多用しています。

https://docs.powertools.aws.dev/lambda/python/latest/

このライブラリは大変便利で、筆者個人としては「もっと早く知っておけば良かった」と思うことが多い一方で、普段からかなり Lambda 関数と接していないと「何がそんなに嬉しいのか」を実感持ってわかりにくいなと感じていました。

もちろん、ライブラリを使わなくても実装することはできるのですが、勘所が伝われば「なぜこのツールを採用しているのか」を理解しやすくなるなと思い、本記事を執筆しています。

本記事では、ライブラリ自体の解説についてはほどほどに、実際にどのように弊社で運用されており、何が解決されているのかを共有します。今回は、わかりやすさと簡潔さのために、Lambda 関数を用いたバッチ処理にフォーカスを当てて解説しますが、その他にもできることは多数あります。本ライブラリに親しみを持つ一つのきっかけとなりましたら幸いです。

Powertools記事

Powertools for AWS (Python) とはどんなライブラリなのか

公式によりますと

Powertools for AWS Lambda (Python) is a developer toolkit to implement Serverless best practices and increase developer velocity.

と説明されています。ざっと訳してみると、「サーバーレスのベストプラクティスを実装するためのツールで、開発者の開発速度を上げてくれる」ツールとなります。ツールキットとあるように、本ライブラリはフレームワークというよりも、便利ツールがまとまったものと解釈しておく方が親しみを持ちやすいかと思います。

例えば、現在すでに Lambda 関数を運用している際に本ツールが導入できないか?と言えばそんなこともなく、関数内のいくつかの部分を書き換えればロジックを変更せずに導入することも可能です。(もちろん、いくつかお作法はありますが)

いったいどんな時に Lambda でバッチ処理をする必要があるのか?

「Lambda関数を用いてバッチ処理をかける方法」に入っていく前に、「Lambda関数でバッチ処理を実行するときはいつなのか」を少しお話ししましょう。イメージつきますでしょうか?

いくつかのパターンや可能性があるのですが、おそらく一番よく使われており、導入も簡単なパターンは SQS と Lambda を併用して用いるものでしょう。アーキテクチャを簡単に図式化すると以下のようなイメージとなります。

SQSのキューにはいくつか種類がありますが、どちらにせよキューに溜まってきたメッセージを Lambda 関数内で何某か処理するような仕組みを考えるイメージです。Lambda だけを用いて、非同期呼び出ししても内部的には似たような挙動をしますが、ここではあえて SQS を採用する必要があるケースを想定します。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html

この時の、Lambda の挙動は以下のようになります。細かいことを言うとさまざまな仕様がありますが、大枠はこんな感じの理解で良いです。

  1. 対象のSQSキューをLambdaがポーリングする
  2. メッセージが存在していたら、あらかじめ設定した取得数分だけメッセージを取得する(少ない場合は取得できるだけ)
  3. Lambda内で処理を実行する
  4. メッセージを消す、もしくは残す

この際に、「複数取得したメッセージを一回の処理で捌く処理」がバッチ処理となるわけです。もちろん、Lambdaの取得設定を調整して、単一のメッセージだけ対応するようにもできますが、仕様上はN件のメッセージを同時に捌けます。SQS と Lambda の親和性はとてもよく、サーバーの管理をすることなくキューイングされたメッセージを”よしなに”スケールしながら対処してくれる機構が出来上がります。

例えば、弊社では Kinesis data stream を使うほど大量に、短い時間にリクエストがくるわけではないが、それなりに同時にリクエストが発生する見込みがあり、Lambda内部の処理が少し時間のかかるものである時などに、SQSを緩衝材として採用したりしています。Lambda が勝手に並列化することによって、処理がファンアウトされ、多少時間のかかる処理を入れていても、それなりにパフォーマンスが出る、という設計になりますし、Lambda 及び SQS の設定値を調整することでスケールの度合いもある程度調整できる状態になります。

バッチ処理は他に Kinesis data stream や DynamoDB Streams などがありますが、ベースの考え方は大体一緒です。(細かい仕様の違いはありますが)

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-kinesis.html
https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.Lambda.html

Powertools for AWS (Python) を使うと何が嬉しいのか?

ここからは、実際の便益について話をしていきます。ざっくり言えば、以下の3点ほど嬉しいポイントがあります。

例えば、以下のような簡単な関数を考えてみましょう

def do_something(record: dict) -> None: 
    ...

def handler(event: dict, context: dict) -> dict: 
    print(event)
    for record in event["Records"]: 
        do_something(record) 
    return event

このとき、SQS の標準キューイベントは以下のようなオブジェクトになります(参考

イベントオブジェクト
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}

正直、SQS のイベントを覚えていられないですし、実際ほとんどのケースで body ばかり扱います。しかし、例えば部分応答を自分で対応しようとすると、SQSのイベントも理解して Lambda 関数を書かなくてはいけなくなります。

これが、Powertools を用いると以下のようになります。

from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)  


def do_something(record: SQSRecord) -> None:  
    # 単一のレコードに対する処理を行う関数
    ...


def handler(event: dict, context: LambdaContext) -> dict:
    # Lambdaが呼び出す関数
    return process_partial_response(  
        event=event,
        record_handler=do_something,
        processor=processor,
        context=context,
    )

初見だと少し難しく感じるかもしれませんが、先ほど for文で書いていた部分を record_handler として Powertools の process_partial_response に引き渡しただけです。この実装ですでに、Lambda の部分応答機能に対応しています。部分応答とは、取得したメッセージのうち一部でエラーが発生した場合に、エラーが発生したメッセージだけをキューに残してそれ以外は削除する機能です。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting

例えば、 do_something の中に、try/except を用意してエラーが発生する可能性のある処理を実装しておくと、処理に失敗した際に、Powertools の BatchProcessor が対応するメッセージIDをレスポンスに含めて返却してくれるようになります。

def do_something(record: SQSRecord) -> None: 
    try: 
        ... 
    except Exception as e : 
        print(e)
        raise 

もし Powertools を使用しない場合、以下のような感じで実装することになります。

def handler(event: dict, context: dict) -> dict:
    print(event) 
    batch_item_failures = []
    sqs_batch_response = {}

    for record in event["Records"]:
        try:
	    # process message
        except Exception as e:
	    batch_item_failures.append({"itemIdentifier": record['messageId']})

    sqs_batch_response["batchItemFailures"] = batch_item_failures
    return sqs_batch_response

もちろん実装自体は難しくはないのですが、Lambda 関数だけ見ている時に、なぜこの実装を入れていたのかを忘れやすく、ミスを起こしかねません。

各メンバーのAWSに対する習熟度にかなり実装が依存してしまうのもあり、弊社ではPowertoolsを使って、ある程度仕様を知らなくてもコード上から意図が汲み取れるようにしています。

おわりに

本記事では、Powertools for AWS (Python) を用いたバッチ処理の実装について紹介しました。
この記事や弊社に少しでもご興味持っていただけたら、是非カジュアル面談などにお越しください!

トドケールテックブログ

Discussion