🎈

Amazon SQSに溜めてLambda/boto3でまとめて取り出したい

2022/12/29に公開約6,300字

やりたいこと

SQSにメッセージを溜めておいて、ある時点でLambdaでまとめて取り出したいです。例えばEventBridgeで30分ごとにLambdaを呼んで処理させる、とかです。

準備

SQS作成

fifoのデフォルトで作成します。名前はmyqueue.fifoとします。URLが得られるのでqueue_urlとして使います。

queue_url = "https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo"

Lambda/boto3

Lambdaでやっても同じなのですが、今回はboto3でやります。
python 3.9
boto3 1.26.12

やってみる

メッセージを送信する

  • 30個のメッセージを送ります
  • メッセージは Index: 0, EventTime: 2022-12-31 12:00:00 など
  • MessageDeduplicationIdをずらすためにepochナノ秒を入れています(これがいいのかは不明)
  • MessageGroupIdはGroup1で固定
  • print(response)は別になくてもよいです(最後の送信を確認しているだけ)
sqs_send.py
import time
from datetime import datetime

import boto3

client = boto3.client('sqs')
url_fifo = 'https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo'

for i in range(30):
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    response = client.send_message(
        QueueUrl=url_fifo,
        MessageBody=f'Index: {i}, EventTime: {now}',
        MessageDeduplicationId=str(time.time_ns()),
        MessageGroupId='Group1'
    )

print(response)

送信すると、コンソール上ではMessages available が30になっています。

メッセージを受信して、削除する

receive_messageで取り出すとき、1回のAPI実行で取得できるメッセージは「10個が最大」です。ちょっと困りますね。

キューからメッセージを取り出すと、例えばこういうものが得られます。Bodyが元のメッセージで、MD5OfBody、MessageId、ReceiptHandleが追加されています。

{
'Body': 'Index: 0, EventTime: 2022-12-29 14:57:29',
'MD5OfBody': '5f47b1dc59543e49a624f055fe0c26bc',
'MessageId': '0cbd8441-ce59-4751-a17e-8e9c60af0360',
'ReceiptHandle': 'AQEBT5CpnNHQ8dpL7wSVxHKBJ3dkoMevHDufpXTiPc5PLjnjoTUTLoXIYYR03oEp0L5SPJFuds1q(略)OZlGJlepUsR+HPDEJvohyotp3LeR92Wkkpdp/m2b2M8M6JsGUXIdkGin6x6pTrZeHjv8TYqRE='
},
  • 上のサンプルで30個のメッセージを作成してから、下の取得削除のスクリプトを動かすと、3回ループして全部消せるはずです。
  • 最初に10個取り出したあと、削除しないと次の10個は取り出せません
    • 可視性タイムアウトはデフォルトの30秒にしていますが、すぐ消しているのであまり関係ないです
  • receive_messageの直後にdeleteしていますが、各メッセージのもつReceiptHandleを入力する必要があり、responseから取り出しています。このReceiptHandleはすぐに(数十秒?)で期限が切れてしまうので手動でやるときは注意が必要です。
  • is_empty()は、キューが空であることを確認しています。(AWSドキュメント)
sqs-receive.py
import boto3


def is_empty(client, queue_url):
    attr1 = "ApproximateNumberOfMessages"
    attr2 = "ApproximateNumberOfMessagesNotVisible"
    attr3 = "ApproximateNumberOfMessagesDelayed"

    response = client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            attr1,
            attr2,
            attr3,
        ],
    )
    r = response["Attributes"]
    if r[attr1] == "0" and r[attr2] == "0" and r[attr3] == "0":
        print("Empty")
        return True
    else:
        print("Not empty")
        print(attr1, r[attr1])
        print(attr2, r[attr2])
        print(attr3, r[attr3])
        return False


client = boto3.client("sqs")
queue_url = "https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo"


while True:
    response = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
    )

    try:
        for i in response["Messages"]:
            print(i["Body"])
            ReceiptHandle = i["ReceiptHandle"]
            client.delete_message(QueueUrl=queue_url, ReceiptHandle=ReceiptHandle)
    except KeyError:
        pass

    if is_empty(client, queue_url):
        break

実行結果

  • 正しく消せるのですが、is_empty()の反応が遅く、空回りしていることが分かります。
  • 数秒で30個は取り出せるので、Lambdaの1回実行の範囲で1000個くらいのメッセージは楽に取り出せるのではないでしょうか。1万だと真面目に検討しないといけないかもしれません。
# python sqs-receive.py
Index: 0, EventTime: 2022-12-29 16:08:25
Index: 1, EventTime: 2022-12-29 16:08:25
Index: 2, EventTime: 2022-12-29 16:08:26
Index: 3, EventTime: 2022-12-29 16:08:26
Index: 4, EventTime: 2022-12-29 16:08:26
Index: 5, EventTime: 2022-12-29 16:08:26
Index: 6, EventTime: 2022-12-29 16:08:26
Index: 7, EventTime: 2022-12-29 16:08:26
Index: 8, EventTime: 2022-12-29 16:08:26
Index: 9, EventTime: 2022-12-29 16:08:26
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Index: 10, EventTime: 2022-12-29 16:08:26
Index: 11, EventTime: 2022-12-29 16:08:26
Index: 12, EventTime: 2022-12-29 16:08:26
Index: 13, EventTime: 2022-12-29 16:08:26
Index: 14, EventTime: 2022-12-29 16:08:26
Index: 15, EventTime: 2022-12-29 16:08:26
Index: 16, EventTime: 2022-12-29 16:08:26
Index: 17, EventTime: 2022-12-29 16:08:26
Index: 18, EventTime: 2022-12-29 16:08:26
Index: 19, EventTime: 2022-12-29 16:08:26
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Index: 20, EventTime: 2022-12-29 16:08:26
Index: 21, EventTime: 2022-12-29 16:08:26
Index: 22, EventTime: 2022-12-29 16:08:26
Index: 23, EventTime: 2022-12-29 16:08:26
Index: 24, EventTime: 2022-12-29 16:08:26
Index: 25, EventTime: 2022-12-29 16:08:26
Index: 26, EventTime: 2022-12-29 16:08:26
Index: 27, EventTime: 2022-12-29 16:08:26
Index: 28, EventTime: 2022-12-29 16:08:26
Index: 29, EventTime: 2022-12-29 16:08:26
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Empty

まとめ

  • SQSでメッセージを溜めてboto3で取り出す方法を紹介しました
  • SQSほとんど使ったこと無く、難しかったです
  • パラメータは適当にデフォルトのままだったので、うまく使えるといいのかも?

Discussion

ログインするとコメントできます