🔌

メッセージキューを用いた Lambda 関数間の接続

2022/04/10に公開

はじめに

a. 記事概要

Amazon SNS および Amazon SQS を用いて、サーバーレスアプリケーションにおける Lambda 関数間のコネクタを実装する手順について解説します。

b. 本記事で解説しないこと

各種 AWS リソースの詳細、および開発手順については説明を割愛します。

  • Lambda 関数
  • SNS トピック
  • SQS キュー
  • S3 バケット ... etc.

目次

1. サーバーレスコンポーネント間の接続

1-1. Step Functions によるワークフローの実装

Lambda 関数ECS タスク のようなサーバーレスなコンピューティング環境を利用してアプリケーションを開発されている方は多いかと思います。機能を適切に分割した上でそれぞれのタスクを実装し、Step Functions のようなワークフローエンジンから呼び出すことで、保守性の高いサーバーレスアプリケーションを簡単に構築することができます。

こちらの記事 では、『入力された地名に関する様々な地理情報を取得するアプリケーション』を Lambda と Step Functions で実装し、その実行状態を AWS X-Ray で可視化するデモについてご紹介しました。

1-2. 今回実現したい要件

Step Functions ではステートマシンを実行すると、[ Start ] を起点に事前定義された順序で各ステップの実行が進行します。ワークフロー途中のステップを起点とする実行はサポートされていません。(本記事執筆時点: 2022/04/09)

しかしアプリケーションによっては、このような途中からの実行を実現したい場合があります。これに対するアプローチはいくつか考えられますが、今回は記事タイトルの通り、Lambda 関数間に Pub/Sub 型のコネクタを挟む方式について解説します。

2. デモアプリケーションについて

単一の前処理用 Lambda 関数 (Preprocessor) および複数のサービス用 Lambda 関数 (Services) を用意し、その間に SNS トピックと SQS キューからなるコネクタ (Connector) を配置します。また、サービス用 Lambda の実行と出力を確認しやすいよう、通知用の SNS トピック (Notifier) を後段に配置しています。

前処理用 Lambda 関数では、リクエスト ID の発行および JSON ファイルの S3 アップロードを行います。サービス用 Lambda 関数では、リクエスト ID に基づいてアップロードされた JSON ファイルをダウンロードし、適当な処理を加えてこれを出力とします。リクエスト ID の連携はコネクタを介して行います。

3. 実施手順

3-1. Lambda 関数

各種 Lambda 関数を開発します。以下はサンプルです。

前処理用 (Preprocessor)

lambda_function.py
# coding: utf-8
import os
import json
import hashlib
from datetime import datetime
import boto3

s3 = boto3.resource("s3")
sns = boto3.resource("sns")

def lambda_handler(event, context): 

    service_name = "preprocessor"
    s3_bucket = os.environ["S3_BUCKET"]
    s3_prefix = os.environ["S3_PREFIX"]
    sns_topic_arn = os.environ["SNS_TOPIC_ARN_CONNECTOR"]
    print(sns_topic_arn)

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    request_id = hashlib.md5(timestamp.encode()).hexdigest()
    
    # Upload sample data
    s3_key = os.path.join(s3_prefix, service_name, f"{request_id}.json")
    s3_obj = s3.Object(bucket_name=s3_bucket, key=s3_key)
    data = {"key1": "val1", "key2": "val2", "key3": "val3", }
    print(s3_obj.put(Body=bytes(json.dumps(data).encode('UTF-8'))))

    # Publish to SNS Topic
    sns_topic = sns.Topic(sns_topic_arn)
    response_body = {"request_id": request_id}
    print(sns_topic.publish(Message=json.dumps(response_body)))
    
    return {'statusCode': 200, 'body': response_body}

サービス用 (Services)

lambda_function.py
# coding: utf-8
import os
import json
import boto3

s3 = boto3.resource("s3")
sns = boto3.resource("sns")
sqs = boto3.resource("sqs")
sqs_client = boto3.client("sqs")

def lambda_handler(event, context): 

    previous_service_name = "preprocessor"
    service_name = "service01"
    s3_bucket = os.environ["S3_BUCKET"]
    s3_prefix = os.environ["S3_PREFIX"]
    sns_topic_arn = os.environ["SNS_TOPIC_ARN_NOTIFIER"]
    records = event["Records"]
    consumed_request_ids = list()
    
    for record in records:
        
        record_body = json.loads(record["body"])
        message = json.loads(record_body["Message"])
        request_id = message["request_id"]
        consumed_request_ids.append(request_id)
    
        # Download data and add some changes
        s3_key = os.path.join(s3_prefix, previous_service_name, f"{request_id}.json")
        s3_obj = s3.Object(bucket_name=s3_bucket, key=s3_key)
        data = json.loads(s3_obj.get()['Body'].read().decode('utf-8'))
        data["key1"] = None
    
        # Upload edited data
        s3_key_new = os.path.join(s3_prefix, service_name, f"{request_id}.json")
        s3_obj_new = s3.Object(bucket_name=s3_bucket, key=s3_key_new)
        print(s3_obj_new.put(Body=bytes(json.dumps(data).encode('UTF-8'))))
        
        # Publish to SNS Topic
        sns_topic = sns.Topic(sns_topic_arn)
        print(sns_topic.publish(Message=json.dumps({"request_id": request_id})))

    return {
        'statusCode': 200, 
        'body': {"consumed_request_ids": consumed_request_ids}
    }

3-2. コネクタ

コネクタに必要なリソースの作成、および設定を行います。

SNS トピック

適当なリソース名で、コネクタ毎に SNS トピックを作成します。ARN は前処理用 Lambda 関数 (Preprocessor) の環境変数に設定しておきます。

SQS キュー

適当なリソース名で、サービス毎に SQS キューを作成します。

[ SNS subscriptions ] タブにて [ Subscribe to Amazon SNS Topic ] を押下し、SNS トピックの購読を設定します。

[ Lambda triggers ] タブにて [ Configure Lambda function trigger ] を押下し、Lambda 関数トリガーを設定します。

3-3. 動作確認

Preprocessor の [ Test ] から、前処理用 Lambda 関数を実行します。サンプルコードではイベント変数は利用していないので、任意のもので問題ありません。

発行されたリクエスト ID は 26d84a1ffe20c326499614a49b48dc8f となっています。

SNS トピックからのメール通知を確認すると、リクエスト ID がメッセージキューを通して連携できていることが分かります。

S3 バケットには、Lambda 関数 (service01) によって変更された JSON データが、所定の S3 ロケーションにアップロードされています。

4. 落穂拾い

4-1. Amazon SQS と AWS Step Functions の使い分け

SQS と Step Functions の使い分けについて、公式には以下のような記述があります。Step Functions では X-Ray との連携がネイティブでサポートされていますが、SQS の場合トラッキングの仕組みを導入するには別途開発する必要があります。

Q: AWS Step Functions とAmazon SQS はどのように使い分けますか?

高度にスケーラブルで監査可能なアプリケーションの開発においてサービスコンポーネントを調整する必要がある場合は、AWS Step Functions を使用してください。サービス間でメッセージを送信、保存、および受信するために、信頼性が高く、高度にスケーラブルなホストキューが必要な場合は、Amazon Simple Queue Service (Amazon SQS) を使用することをお勧めします。Step Functions では、アプリケーション内のすべてのタスクとイベントのトラッキングが行われます。 Amazon SQS では、アプリケーションレベルのトラッキングを独自に実装する必要があります。 (... 省略 ...)

(出典: AWS Step Functions のよくある質問)

4-2. Lambda 関数による SQS キューのポーリング

SQS キューは Pull 型で動作します。すなわち、メッセージ受信をトリガーとしてキュー側から Push するのではなく、メッセージ保持期間内の任意のタイミングでコンシューマー側からメッセージを取得する方式です。

ただし Lambda 関数トリガーを設定した場合、Lambda サービスがコンシューマーの代わりにキューに対してポーリングを行います。これにより、コネクタの外側から見ればあたかも Push 型で動作しているように捉えることができます。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html

さいごに

本記事では、サーバーレスアプリケーションにおけるコンポーネント間接続として、Pub/Sub 型のコネクタを実装する方法について紹介しました。いかがでしたでしょうか。

アプリケーション要件によっては、Step Functions の条件分岐 (Choice ステート) を用いてスキップを実装する、ステートマシンをネストする なども考えられます。またサーバーレスにこだわらなければ、ワークフローエンジンとして、ワークフロー途中からの実行をサポートしている Apache Airflow (あるいは Amazon MWAA) を利用することも検討するべきかもしれません。

今回はメッセージキューサービスとして Amaozn SQS に焦点を当ててみましたが、機会があれば Amazon KinesisAmazon MQ についても扱ってみたいと思います。

最後までご覧いただき、ありがとうございました。

参考

Discussion