Lambdaの処理をSNS経由でSlackに通知する
実装時に参考になりそうなものがないかなと探したけどなさそうだったのでメモがてらに
Lambdaは実行結果を別のLambdaやSQS,SNSなどAWSのいくつかの処理にリレーするという機能があります。同期実行では使えない(基本的に呼び出し元に返す)ですが、非同期実行では使えます。
私がよく使う例だと、CloudWatch Eventsでバッチ処理のように定時で動かす処理の結果、エラーや異常が発生した時だけSNS経由で通知すると言った処理が実装できます。Lambdaの処理中からWebhookなどを叩いてもいいですが、(Goで処理を記述することが多いので)エラーハンドリングできていない想定外の部分でのエラーやタイムアウト・Lambda環境のエラーといった問題が発生した時にも通知しようと考えるとと通知処理を切り分ける必要が出てくるためこのような実装が想定される。
というわけで以下のようなフローを想定します。
Lambda(メイン処理)失敗→SNS→Lambda(通知用)
実装
メイン処理
実際にどんなデータが流れてくるのか確認も含めてこんなテストコードを用意しました
package main
import (
"context"
"fmt"
"os"
"github.com/aws/aws-lambda-go/lambda"
)
func HandleRequest(ctx context.Context) (string, error) {
if os.Getenv("MODE") == "ERROR" {
return "Hello Error!", fmt.Errorf("Error!!")
}
return "Hello World!", nil
}
func main() {
lambda.Start(HandleRequest)
}
環境変数を設定してあげればエラーが出るわけですね。これをビルドしてzipで固めてLambdaにあげます。ハンドラなどの設定も忘れずに
ためしに実行してみます。まず環境変数を設定しない場合
こんな感じで成功します。次に環境変数MODE=ERROR
の場合
ちゃんと失敗してます。
SNSトピック作成
次にSNSトピックを作成します。
今回はスタンダードでやります。今回のように通知だけできればいい場合はFIFOにする必要性は薄いですが、ユースケースによっては厳密さやスピードが必要なケースもあるでしょう(厳密な知見はない……)
その他オプションは適宜設定してください。暗号化やVPCなどは状況によっては必要でしょう。
次にSNSトピックに失敗時の通知を飛ばす設定をします。
関数の概要から送信先を追加で、連携先のSNSトピックはさっき設定した名前を選択します。ここで権限もつけてくれるのですが、権限をつけると表示される前に保存ボタンを押すと失敗します。ワンテンポおいた方がいいです(これ割とどうにかして欲しい)
こんな感じでSNSトピックと連携させます。こんな感じになります。
通知用Lambda作成
最後に通知用Lambdaを用意します。
SNSから受け取って処理をするLambdaは実は設計図が用意されているのでそれを使わせてもらいます。
Node.jsでいいでしょう。関数作成時にSNSトリガーを設定できるのでさっき作ったSNSを指定しておきます。
設計図の出力したコードに適当なSlack通知のWebhookに適切な情報を渡すようにしてあげます。Incoming Webhookのあれこれは別途調べてください。
地味な罠でhandlerに渡される関数が非同期だとhttpアクセスが終わらないまま終了したりします。非同期で完了するまで待ってあげるか同期処理に書き換えてください。
console.log('Loading function');
var https = require('https');
exports.handler = function(event, context){
//console.log('Received event:', JSON.stringify(event, null, 2));
const message = event.Records[0].Sns.Message;
console.log('From SNS:', message);
var host = "hooks.slack.com"
var path = "slackからとってきたINCAMING WEBHOOKのPATH情報"
let options = {
hostname: host,
path: path,
port: 443,
method: 'POST',
headers: {
'Content-Type' : 'application/json'
}
};
let payload = { text: message, username:"test_notification", blocks:[{
type: "section",
text: {
type: "plain_text",
text: message
}
}]};
console.log('payload : ', payload);
const data = JSON.stringify(payload);
let req = https.request(options, (res)=>{
res.setEncoding('utf8');
res.on('data', (chunk)=>{
});
});
req.on('error', (e)=>{
console.log('missingpoint : ' + e.message);
});
req.write(data);
req.end();
return message;
};
あとはLambdaのテストを使います。SNSトピックから渡されるテンプレートがあるのでそのまま使いましょう。
成功すればこんな感じで通知がきます。細かい部分はお好みで合わせていきましょう。
サブスクリプションの開始
一旦SNSトピックに戻ってサブスクリプションを生成します。
さっき作ったばかりのSNSトピックからサブスクリプションの作成を選び、今作った通知用Lambdaに対して発行したメッセージを流すように設定します。
サブスクリプションを発行したらいよいよそれぞれの連携をテストします。
連携テスト
まずSNSトピックでメッセージの発行を選び雑にメッセージを送ってみます。
雑ですね。オプションなどはスルーで可。
うまくいけば入力情報がそのままSlackに飛ばされます。
この状況でログをとっていれば(あとLambdaのテストのテンプレートを見れば)SNSトピックからLambdaに飛ばされる情報がどうなっているのかある程度推測がつきます(ドキュメントにまとまってなかったぽいんですけどどこにあるんでしょう。知ってる人いたら教えてください)。詳しくは後述します。
ではいよいよ処理の失敗を通知しましょう。
実動作テスト
先に言っておくと、この処理系は最初のLambdaが 非同期処理 じゃないと使えないので、そのままテストを押してもダメです
そして現在Lambdaは単体で非同期実行できる環境を持っていません。
適当なトリガーで起動させます。APIGatewayで然るべき設定をしてあげるか(ドキュメント)CloudWatch Eventで定期実行するかあたりがわかりやすいかと思います。
(余談ですがAPIGatewayと通知の仕組みをうまく組み立てればバックエンド非同期実行APIみたいなものも作れると思います)
まあテストするだけならCloudWatch Eventで1分おきに起動にしてちょっと放置しておけばいいでしょう(その際、非同期実行の自動再実行機能の回数を0にしておいたほうがいいでしょう。デフォルトだと3回エラーで失敗すると通知が飛んできます。なお通知は一回分(最後の失敗分)が飛んでくるようです)
そしてテストしたらエラーメッセージを含んだちょっと複雑なJSONがそのまま流れてきたと思います。
SNSから渡されるメッセージ
ドキュメントによるとこうだそうです
{
"Records": [
{
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
"EventSource": "aws:sns",
"Sns": {
"SignatureVersion": "1",
"Timestamp": "2019-01-02T12:45:07.000Z",
"Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
"SigningCertUrl": "https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
"Message": "Hello from SNS!",
"MessageAttributes": {
"Test": {
"Type": "String",
"Value": "TestString"
},
"TestBinary": {
"Type": "Binary",
"Value": "TestBinary"
}
},
"Type": "Notification",
"UnsubscribeUrl": "https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-2:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
"TopicArn":"arn:aws:sns:us-east-2:123456789012:sns-lambda",
"Subject": "TestInvoke"
}
}
]
}
Lambdaのテンプレートからもわかりますが、このJSONの Records[0].Sns.Message
がSNSに渡される出力です。ちなみにLambdaの結果はJSONなのでここにはJSON Stringが入ります(つまりデコードが必要)。
const message = event.Records[0].Sns.Message;
const errorMessage = JSON.parse(message).responsePayload.errorMessage;
とすることで本処理で出したエラーメッセージをそのまま持ってこれます。responsePayload
以外にもrequestContext
から呼び出し元の本処理LambdaのARNが持ってこれたりするので通知時に情報を乗っけて送るといろいろわかって便利です。
またトップ階層にある Records
の配列が複数の要素を入れてくるケースがあるのかは謎です。SNSのキューに大量にメッセージが流れ込めばあり得るのかもしれないですが現状そんなに大量に流すテストをしていないので確認してません。テンプレートでは配列の要素を1つに絞っているのでそもそも一つしか流れてこないかもしれません。
今回は失敗時のみですが、Lambdaから成功時の送信先で設定することで同様のこともできます。この時は同じようにSNSのMessageに結果の構造体のJSON Stringが入ってくるのでそれを解析します。成功時のフォーマットはこっちで勝手に決めたものが流れてくるので定義をうまく運用したいところです。
もし通知用のLambdaを使いまわす場合はMessageのJSONを解析した結果で場合分けを取ることになりますが、その辺り上手い知見があればぜひ参考にしたいところです。
なおMessage以外のSNSからのペイロードの詳細に関しては正直わからなかったので解説できません。ドキュメントが見つからない…… コメントにて教えていただきましたが、API Gatewayに流す場合に解説がありました。こちらです
最後に
ざっくりメリットを書き出すとこんな感じでしょうか。
- SNSを一旦噛ませることで抽象化レイヤーを作れる
- 通知処理を別に回すことで本処理の責務が明確化される
- 通知関係の変更が必要になった場合対応コストが大幅に安くなる
- Lambdaが想定外のエラーで死んだときに通知できる
- (取れれば)SNSを含むメタ情報を持ち込んで処理に活かせる
私が解決したかったのは主に3つ目と4つ目でした。ただ実際のところ他の手段でも解決できるといえばできるのでケースバイケースではあると思います。例えば
- LambdaからSNSを経由する必要はあるのか?(LambdaからLambdaに結果を直接流すこともできる)
- Lambdaから直接Lambdaを呼び出すのが心理的に抵抗がある
- CloudWatchの結果を監視するのではだめか?
- キーワードで全部監視するほどじゃない場合もある
- 成功時の通知などは能動的な通知処理を実装した方がいいと思う
- 複数のLambdaからSNS経由で通知処理に流れてくるのはかえって煩雑化するのではないか?
- これは否定しない。集中といいつつ場合分けが増えすぎて集中できてないパターンはある
と言ったあたりは議論の対象だと思います。事実として業務で使ってる通知用Lambdaはifだらけなので……
おまけでコード類をgithubにあげてます。
Discussion
SNS からのペイロードの詳細ですが、SNS → Lambda ではなく SNS → HTTP Endpoint の方に説明あります。
なるほどこっちにありましたか。ありがとうございます!