API Gateway LamdaのリクエストをトリガーにしてAWS SQSに紐付けたLambdaを動作させたいが動かないのなぜ
結論
Typescript側の実装の問題であった😇
こういうことがしたい
WebHook経由で処理すべきデータをキューにためておいて、キューへのsendmessage時に実行させる。
リトライや実行順、冪等性の管理を兼ねてfifoのSQSを経由させて処理をしたい
今遭遇している問題は、reserverからSQSにsendMessageをしても後続のfunctionが発火しない。
実行時ログを見る限りSQSのPushが正常に終わっていない?ように見えてる。
sendMessage投げてるコード片
const body = JSON.stringify(message);
const params = {...clientParams, MessageBody: body} as unknown as AWS.SQS.SendMessageRequest
return await sqsClient.sendMessage(params, (err, data) => {
if (err) {
console.log(err.stack)
} else {
console.log(data)
}
}).promise()
cloudwatchのログはこんな感じでログがでていない。
送信完了していない?
START RequestId: xxxxxxxxxx-4c14-4c91-92fb-xxxxxxxxxx Version: $LATEST
END RequestId: xxxxxxxxxx-4c14-4c91-92fb-xxxxxxxxxx
REPORT RequestId: xxxxxxxxxx-4c14-4c91-92fb-xxxxxxxxxx Duration: 258.12 ms Billed Duration: 259 ms Memory Size: 128 MB Max Memory Used: 83 MB Init Duration: 449.22 ms
実装的にはここを参考。
実際プロジェクトで見ているものは
"node_modules/aws-sdk": {
"version": "2.1197.0",
なので、2系で問題なし。
const res = await sqsClient.sendMessage(params).promise()
console.log(res)
return res
これでもログでない。うーん。
呼び出し元がこうなっている
const promises = array.map(async (to) => {
const message = { ... }
return sendQueue(client, message)
})
Promise.all(promises)
これで通るはずではあるんだが
あ、Promise.all自体が返してくるの、promiseでは
Promise.all([p1, p2, p3]).then(values => {
console.log(values); // [3, 1337, "foo"]
});
とある。これハンドリングがスッポ抜けてる系かもしれない。
だった。
await Promise.all()に変更したところ、正常にログがでるようになった
2022-09-06T08:54:49.588Z xxxxxxxxxxxx-f421-427c-9264-xxxxxxxxxxxx INFO error: The request must contain the parameter MessageGroupId.
エラー落ちですね😇
FIFOキューを使っている場合、MessageDeduplicationId だけでなく、MessageGroupIdも必須になっている。
MessageDeduplicationIdは重複したメッセージ送信の排除に使う。排除と言っておきながら、受付自体はされるので、冪等性の評価は別で実施する必要がありそう。
同じメッセージ重複除外 ID で送信されたメッセージは全て正常に受け入れられますが、5 分間の重複除外間隔中に配信されません。
MessageGroupIdは、FIFOの制御のために使われる。これが異なる場合は直列に処理されない模様。
同じメッセージグループに属するメッセージは、そのメッセージグループに関連して厳密な順序で常に 1 つずつ処理されます (ただし、異なるメッセージグループに属するメッセージの順序が一致しない場合があります)。
今回の要件だと、とあるキーに関連する情報の新規作成、更新を行うもの。なので、処理単位ではこんな感じにした
MessageGroupId: キー値
MessageDeduplicationId: {データ更新日:ymmddhh}-{ キー値 }
SQSのsendMessageはこれで通るようになって、実際にSQSのキューにメッセージが格納されたことを確認できている。