SNS + SQS + Lambdaを使ってWebhookを開発した話
ハコベルのサーバーサイドエンジニアの飯盛です。
ハコベルのシステムに、様々なイベントを外部に通知するWebhookの機構を開発したのでその実現方法をシェアします。
Webhookのようなイベントを通知する仕組みの構築を検討されている方の参考になれば幸いです!
背景
ハコベルでは、ハコベル運送手配とハコベル運送手配PLUSという2つの求荷求車サービスを提供しています。(リポジトリやインフラも全く別です)
それらを利用しているユーザーの中には、ハコベルで何らかの操作を行った後、同じ情報を自社の運行管理システムに手動で入力している会社がありました。
このような二重入力の手間があることから、業務の非効率化やミスが発生する可能性が高くなっていました。
この非効率を解消するために、ハコベルの各サービスで登録や変更があった場合外部のシステムに自動で連携されるようにしたいというモチベーションがありました。
何をやったか
構成
前述のモチベーションから、hacobell-webhooksというハコベルの登録と更新を外部に通知するWebhookシステムを作りました。ユーザーには情報連携用のAPIを設置してもらい、登録・更新イベントの度にそのAPIにPOSTリクエストを送信するような実装をしました。
Webhookの部分は以下のように、AWS SNS, SQS, Lamndaを使って構築しました。外部システムに送るリクエストボディの形はOpenAPIで事前に外部システムの開発チームと定義し、それに添う形でリクエストを送信します。
送信の流れ
以下のような流れでWebhook通知を送ります。
-
ハコベルの各サービスから非同期(Sidekiq)でSNSトピックにメッセージを送る
def publish(payload, message_group_id) Aws::SNS::Client.new.publish({ topic_arn: sns_topic_arn, message: { default: payload }.to_json, subject: 'subject', message_structure: 'json', message_group_id: message_group_id, }) end
-
SNS topic をサブスクライブしている SQS の キューに対してメッセージを配信する
-
Lambdaがキューを消費する
-
Lambda関数内で、1で送られてきたメッセージからOpenAPIで定義したスキーマに変換して、POSTリクエストを送る
package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "net/http" "os" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) error { fmt.Println("Received SNS event:") for _, record := range sqsEvent.Records { bodyJson := message.Body var body struct{ Message string } err := json.Unmarshal([]byte(bodyJson), &body) if err != nil { reporter.SendErrorToSentry(err, &message) log.Error().Err(err).Msg("Error Unmarshal SQSJson") return nil } // 送信するリクエストの形に変換 requestBody := convert(bodyJson) data := []byte(requestBody) resp, err := http.Post(TargetUrl, "application/json", bytes.NewBuffer(data)) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("HTTP POST request failed with status code %d", resp.StatusCode) } fmt.Println("POST request successful") } return nil } func main() { lambda.Start(handler) }
※ サンプルコードはわかりやすさのため簡潔にしています。
この構成にした意図としては
- ハコベルの各サービスから非同期でイベント通知を送ることで各システム間を疎結合に出来る
- 送信対象が増えても、SNSを挟めばファンアウト(同一イベントで複数キューを積むこと)ができる
などです。
Lambda部分は自動生成と相性が良い、チームの技術スタックとあっているなどの理由でGolangで書きました。
よかったこと
自動生成にできるだけ頼って、コードを書く量を減らせた
以下の図のうち、
- ハコベル運送手配(Ruby) → SNSのメッセージ(json)
- ハコベル運送手配PLUS(Ruby) → SNSのメッセージ(json)
- SNSのメッセージ(json)→ Lambda(Golang)
- Lambda(Golang) → OpenAPI(json)
の4箇所でデータのやりとりが発生します。
これらすべての箇所でデータの変換を一から実装するのは中々骨が折れるので、できるだけコードの自動生成に頼りました。
そこで大変お世話になったのはJSON Type Definition とOpenAPIです。
どちらもスキーマ定義の規格で、これに従ってスキーマを書けば様々な言語でコードの自動生成ができます。
前述の図のうち①〜③の部分は単なるJSONのやり取りなのでJSON Type Definition, ④の部分はHTTPリクエストなのでOpenAPIを使って定義しました。
JSON Type Definitionの自動生成にはjtd-codegen, OpenAPIの自動生成にはoapi-codegen というライブラリを使用しました。
これによりRubyでは送信するjsonを生成するクラス、Goでは受信するメッセージと送信するリクエストボディの構造体が生成されて、整形やバリデーションが可能になりました。
JSON Type Definitionの場合の例を説明します。
Rubyからjsonを送る場合
まず例として、以下のような定義ファイルorder.jtd.json
を書きます
{
"properties": {
"id": { "type": "int32" },
"name": { "type": "string" },
"notes": { "type": "string", "nullable": true }
}
}
jtd-codegenを使って以下のようなコマンドを実行すると
jtd-codegen order.jtd.json --ruby-out app --ruby-module Webhook
以下のようなファイルwebhook.rb
が生成されます
# Code generated by jtd-codegen for Ruby v0.1.1
require 'json'
require 'time'
module Webhook
class Order
attr_accessor :id
attr_accessor :name
attr_accessor :notes
def self.from_json_data(data)
out = Order.new
out.id = Webhook::from_json_data(Integer, data["id"])
out.name = Webhook::from_json_data(String, data["name"])
out.notes = Webhook::from_json_data(String, data["notes"])
out
end
def to_json_data
data = {}
data["id"] = Webhook::to_json_data(id)
data["name"] = Webhook::to_json_data(name)
data["notes"] = Webhook::to_json_data(notes)
data
end
end
private
def self.from_json_data(type, data)
if data.nil? || [Object, TrueClass, Integer, Float, String].include?(type)
data
elsif type == DateTime
DateTime.rfc3339(data)
elsif type.is_a?(Array)
data.map { |elem| from_json_data(type.first, elem) }
elsif type.is_a?(Hash)
data.transform_values { |elem| from_json_data(type.values.first, elem) }
else
type.from_json_data(data)
end
end
def self.to_json_data(data)
if data.nil? || [TrueClass, FalseClass, Integer, Float, String].include?(data.class)
data
elsif data.is_a?(DateTime)
data.rfc3339
elsif data.is_a?(Array)
data.map { |elem| to_json_data(elem) }
elsif data.is_a?(Hash)
data.transform_values { |elem| to_json_data(elem) }
else
data.to_json_data
end
end
end
このクラスを使って、以下のように書けば、
def order_json
data = Webhook::Order.new
data.id = 1
data.name = 'テスト'
data.notes = 'テストです'
data.to_json_data
end
以下のようなjsonが生成できます!
{
"id": 1,
"name": "テスト",
"notes": "テストです"
}
Golangからjsonを受け取る場合
先程と同様の定義ファイルを使って
{
"properties": {
"id": { "type": "int32" },
"name": { "type": "string" },
"notes": { "type": "string", "nullable": true }
}
}
Goの生成コマンドを実行します
jtd-codegen order.jtd.json --go-out . --go-package webhook
そうすると、以下のようなwebhook.go
ファイルが生成されます
// Code generated by jtd-codegen for Go v0.2.1. DO NOT EDIT.
package webhooks
type Order struct {
ID int32 `json:"id"`
Name string `json:"name"`
Notes *string `json:"notes"`
}
これを使って、jsonをアンマーシャルすれば、structとして使えるようになります!
import (
"encoding/json"
"fmt"
)
func convert(json string) {
// JSONデータをOrder構造体にアンマーシャル
var order Order
err := json.Unmarshal([]byte(jsonData), &order)
if err != nil {
fmt.Println("error: ", err)
return
}
}
拡張性高い構成にできた
またインフラの構築にはAWS-SAM を使ってインフラをコード上で管理していました。それによりインフラの管理が簡単になり、追加や修正がし易い構成になっています。
以下の様なtemplate.yamlを記述してsam deploy
を実行すれば、最低限のSNS+SQS+Lambdaの構成が出来ます
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: AWS SAM SNS, SQS, and Lambda Example
Resources:
MySNSTopic:
Type: 'AWS::SNS::Topic'
Properties:
DisplayName: MySNSTopic
Subscription:
- Protocol: sqs
Endpoint: !GetAtt MySQSQueue.Arn
MySQSQueue:
Type: 'AWS::SQS::Queue'
Properties:
QueueName: MySQSQueue
MyLambdaFunction:
Type: 'AWS::Serverless::Function'
Properties:
Handler: hello-world
Runtime: go1.x
CodeUri: ./
FunctionName: MyLambdaFunction
MemorySize: 128
Timeout: 5
Environment:
Variables:
SQS_QUEUE_URL: !GetAtt MySQSQueue.QueueUrl
Events:
MySNSTopicEvent:
Type: SNS
Properties:
Topic: !Ref MySNSTopic
以下の図のように、ハコベル側で連携したいサービスが増えた場合や、連携したい他の会社のシステムが発生した場合でも、できるだけ変更しやすいインフラ構成や実装にできたと思います。
例えばハコベル側で連携したいサービスが増えた場合
- JSON Type Definitionのスキーマ定義に追加する
- 増やすサービス側でSNSにメッセージを送る処理を足す
- Lambda側で増えたサービスからのリクエストを変換する処理を足す
ステップを踏めばよさそうですし、連携したいユーザーが増えた場合では
- OpenAPIに新しい連携先のスキーマを追加
- 新しいLambda関数の追加
とすれば増やせます。
自動生成しているので、スキーマから直していけばコードの修正は最小限に済ませられます👏
辛かったこと
ローカルだけで疎通確認しながら開発するのが難しい
lambdaだけの実行であればsam local invokeというコマンドを使って、ローカルでlambda関数を動かすことができます。ですが今回のように、別サーバーで動いているアプリケーションからSNSにメッセージを送る→それをトリガーにlambdaを動かす、というような疎通確認をすべてローカルで完結するのは難しかったです。
localstack というものを使えばDockerでAWS環境をエミュレートできるみたいですが、これの構築自体にそこそこ手間がかかりそうだったのでそこまではやりませんでした。
かわりに開発環境用のAWSアカウントを用意して、疎通確認したい場合はそこにデプロイしていました。samなのでデプロイコマンドを叩けばすぐ実行はできるものの、毎回その作業をやるのは少し辛かったです。
処理に失敗したときの考慮
lambdaの処理に失敗するパターンとして
- lambdaのコード内にバグがありエラーが発生する
- webhookの送信先でエラーが発生する
が挙げられます。このうち1の場合はバグとして検知したいので、エラーを起こしてハコベルのエンジニアが検知する必要があります。
2の場合はwebhookの送信先で500エラーなどが発生し、webhookの取り込みに失敗する場合です。この場合はlambda起因のエラーではなく再度通知を送る必要があるため、処理のリトライが必要です。
その機構の実現のために、SQSのvisibility timeout(可視性タイムアウト) を設定して対策しました。これによりlambdaがメッセージを受信できなくなる期間を設定できるので、一定時間後にlambdaの処理をリトライできます。
リトライ期間の設定にはExponential Backoffを使いました。リトライ回数を増やす度にリトライ期間を指数関数的に増やすことで、無駄なリトライ処理を行わないようにしました。
参考: https://qiita.com/po3rin/items/c80dea298f16a2625dbe
まとめ
AWS SNS, SQS, Lambdaを使ったWebhookシステムの構成を紹介しました。また、OpenAPIとJSON Type Definitionを使ってスキーマ駆動で開発することで、保守しやすい仕組みを作ることが出来ました。
是非参考にしてみてください!
「物流の次を発明する」をミッションに物流のシェアリングプラットフォームを運営する、ハコベル株式会社 開発チームのテックブログです! 【エンジニア積極採用中】t.hacobell.com/blog/career
Discussion