🎈
Amazon SQSに溜めてLambda/boto3でまとめて取り出したい
やりたいこと
SQSにメッセージを溜めておいて、ある時点でLambdaでまとめて取り出したいです。例えばEventBridgeで30分ごとにLambdaを呼んで処理させる、とかです。
準備
SQS作成
fifoのデフォルトで作成します。名前はmyqueue.fifo
とします。URLが得られるのでqueue_url
として使います。
queue_url = "https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo"
Lambda/boto3
Lambdaでやっても同じなのですが、今回はboto3でやります。
python 3.9
boto3 1.26.12
やってみる
メッセージを送信する
- 30個のメッセージを送ります
- メッセージは Index: 0, EventTime: 2022-12-31 12:00:00 など
- MessageDeduplicationIdをずらすためにepochナノ秒を入れています(これがいいのかは不明)
- MessageGroupIdはGroup1で固定
-
print(response)
は別になくてもよいです(最後の送信を確認しているだけ)
sqs_send.py
import time
from datetime import datetime
import boto3
client = boto3.client('sqs')
url_fifo = 'https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo'
for i in range(30):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response = client.send_message(
QueueUrl=url_fifo,
MessageBody=f'Index: {i}, EventTime: {now}',
MessageDeduplicationId=str(time.time_ns()),
MessageGroupId='Group1'
)
print(response)
送信すると、コンソール上ではMessages available が30になっています。
メッセージを受信して、削除する
receive_messageで取り出すとき、1回のAPI実行で取得できるメッセージは「10個が最大」です。ちょっと困りますね。
キューからメッセージを取り出すと、例えばこういうものが得られます。Bodyが元のメッセージで、MD5OfBody、MessageId、ReceiptHandleが追加されています。
{
'Body': 'Index: 0, EventTime: 2022-12-29 14:57:29',
'MD5OfBody': '5f47b1dc59543e49a624f055fe0c26bc',
'MessageId': '0cbd8441-ce59-4751-a17e-8e9c60af0360',
'ReceiptHandle': 'AQEBT5CpnNHQ8dpL7wSVxHKBJ3dkoMevHDufpXTiPc5PLjnjoTUTLoXIYYR03oEp0L5SPJFuds1q(略)OZlGJlepUsR+HPDEJvohyotp3LeR92Wkkpdp/m2b2M8M6JsGUXIdkGin6x6pTrZeHjv8TYqRE='
},
- 上のサンプルで30個のメッセージを作成してから、下の取得削除のスクリプトを動かすと、3回ループして全部消せるはずです。
- 最初に10個取り出したあと、削除しないと次の10個は取り出せません
- 可視性タイムアウトはデフォルトの30秒にしていますが、すぐ消しているのであまり関係ないです
- receive_messageの直後にdeleteしていますが、各メッセージのもつ
ReceiptHandle
を入力する必要があり、responseから取り出しています。このReceiptHandle
はすぐに(数十秒?)で期限が切れてしまうので手動でやるときは注意が必要です。 -
is_empty()
は、キューが空であることを確認しています。(AWSドキュメント)
sqs-receive.py
import boto3
def is_empty(client, queue_url):
attr1 = "ApproximateNumberOfMessages"
attr2 = "ApproximateNumberOfMessagesNotVisible"
attr3 = "ApproximateNumberOfMessagesDelayed"
response = client.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=[
attr1,
attr2,
attr3,
],
)
r = response["Attributes"]
if r[attr1] == "0" and r[attr2] == "0" and r[attr3] == "0":
print("Empty")
return True
else:
print("Not empty")
print(attr1, r[attr1])
print(attr2, r[attr2])
print(attr3, r[attr3])
return False
client = boto3.client("sqs")
queue_url = "https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo"
while True:
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
)
try:
for i in response["Messages"]:
print(i["Body"])
ReceiptHandle = i["ReceiptHandle"]
client.delete_message(QueueUrl=queue_url, ReceiptHandle=ReceiptHandle)
except KeyError:
pass
if is_empty(client, queue_url):
break
実行結果
- 正しく消せるのですが、
is_empty()
の反応が遅く、空回りしていることが分かります。 - 数秒で30個は取り出せるので、Lambdaの1回実行の範囲で1000個くらいのメッセージは楽に取り出せるのではないでしょうか。1万だと真面目に検討しないといけないかもしれません。
# python sqs-receive.py
Index: 0, EventTime: 2022-12-29 16:08:25
Index: 1, EventTime: 2022-12-29 16:08:25
Index: 2, EventTime: 2022-12-29 16:08:26
Index: 3, EventTime: 2022-12-29 16:08:26
Index: 4, EventTime: 2022-12-29 16:08:26
Index: 5, EventTime: 2022-12-29 16:08:26
Index: 6, EventTime: 2022-12-29 16:08:26
Index: 7, EventTime: 2022-12-29 16:08:26
Index: 8, EventTime: 2022-12-29 16:08:26
Index: 9, EventTime: 2022-12-29 16:08:26
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Index: 10, EventTime: 2022-12-29 16:08:26
Index: 11, EventTime: 2022-12-29 16:08:26
Index: 12, EventTime: 2022-12-29 16:08:26
Index: 13, EventTime: 2022-12-29 16:08:26
Index: 14, EventTime: 2022-12-29 16:08:26
Index: 15, EventTime: 2022-12-29 16:08:26
Index: 16, EventTime: 2022-12-29 16:08:26
Index: 17, EventTime: 2022-12-29 16:08:26
Index: 18, EventTime: 2022-12-29 16:08:26
Index: 19, EventTime: 2022-12-29 16:08:26
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Index: 20, EventTime: 2022-12-29 16:08:26
Index: 21, EventTime: 2022-12-29 16:08:26
Index: 22, EventTime: 2022-12-29 16:08:26
Index: 23, EventTime: 2022-12-29 16:08:26
Index: 24, EventTime: 2022-12-29 16:08:26
Index: 25, EventTime: 2022-12-29 16:08:26
Index: 26, EventTime: 2022-12-29 16:08:26
Index: 27, EventTime: 2022-12-29 16:08:26
Index: 28, EventTime: 2022-12-29 16:08:26
Index: 29, EventTime: 2022-12-29 16:08:26
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Not empty
ApproximateNumberOfMessages 20
ApproximateNumberOfMessagesNotVisible 0
ApproximateNumberOfMessagesDelayed 0
Empty
まとめ
- SQSでメッセージを溜めてboto3で取り出す方法を紹介しました
- SQSほとんど使ったこと無く、難しかったです
- パラメータは適当にデフォルトのままだったので、うまく使えるといいのかも?
Discussion