SQSとLambdaのイベントソースマッピングを試してみる
SQSをソースにしてLambdaにマッピングして処理させる、ということを試したものの、よくわからないという感触です。FIFOは比較的すっきりしているのですが、Standardは難しいです(制御できないという意味で)
準備
Lambdaを作る
Console -> Lambda Create function -> Python3.9 -> 作成します。関数名は適当に。
Codeは下記に置き換えて、Lambdaをデプロイします。
def lambda_handler(event, context):
for i in event['Records']:
print(i['body'])
LambdaのRoleにAmazonSQSFullAccess
を追加します。
これで、受け取ったeventのbody
部分をCloudWatch Logsに書き込むだけのLambdaができました。
SQSキューを作る
SQS Console -> Create Queue -> Standard -> Name は適当にmyqueue.fifo
とmyqueue
で。あとは全てデフォルトで。URL(https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo
)はあとで使います。
マッピングする
Add triggerから追加します。(関数名をタイポしているけどドンマイで)
バッチウィンドウは10、バッチサイズは0で(ここら辺が調整必要かも?)
boto3でメッセージをSQSに送る
それぞれfifoキューとstandardキューにメッセージを送ります。Index: 0, EventTime: 2022-12-31 12:00:00
といった形式で、これbody
です。Indexが0から9の10個になるようにします。
import time
from datetime import datetime
import boto3
client = boto3.client('sqs')
url_fifo = 'https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue.fifo'
for i in range(10):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response = client.send_message(
QueueUrl=url_fifo,
MessageBody=f'Index: {i}, EventTime: {now}',
MessageDeduplicationId=str(time.time_ns()),
MessageGroupId='Group1'
)
from datetime import datetime
import boto3
client = boto3.client('sqs')
url_standard = 'https://sqs.ap-northeast-1.amazonaws.com/111122223333/myqueue'
for i in range(10):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response = client.send_message(
QueueUrl=url_standard,
MessageBody=f'Index: {i}, EventTime: {now}',
)
結果
Lambdaは常にポーリングし続けていて、最大5つクライアントが実行可能だそうです。
FIFO
- 1つのLambda実行では10個全て取得できないが、何回かに分けて順番に取り出している。特に違和感はない
- 1秒以内で終わっている
Standard
- 10個が順番バラバラで取得される
- ある時間で何個かのLambdaが実行されたあと、可視性タイムアウトの時間だけ待って再実行している(下の図の線。このときは可視性タイムアウトをデフォルト30秒から20秒に変更していた。)
よくわからなかったこと
(私の手順が間違っていたのかもしれないですが、こういうことがありましたというメモ)
- CloudWatchログの時刻よりも遅れて(数分〜数時間?)コンソールに現れるようになったときがあった
- データが一部消えて、10個のはずが欠けてしまうようなことがあった。Standardでのみ。
空振りを気にしたほうが良さそう
Empty Receiveでも課金されるので、コンスタントにメッセージが来るときでないと無駄課金されるかもしれません。
不定期だったり溜めて取り出したいなら以前書いたこのやり方のほうがよいかなと思いました。
まとめ
SQSのイベントソースマッピングを試しました。使うならFIFOのほうが使いやすそうです。
TODO
- パラメータをもう少し調整してみたい
- フィルターを試してみたい
- EventBridge Pipesだとどうなるのかやってみたい
参考にしたもの
Discussion