Go実装のAWS LambdaでSQSを使う
はじめに
AWS LambdaのランタイムとしてGoを採用した上でAmazon SQS (Simple Queue Service) を使う機会があったので、その際のコードの書き方を簡単にまとめました。
トリガーの設定等、コード以外の部分は別の資料を参照してください。
また、記事の最後に「GoでLambdaってどうなん?」という話を書いたりもしています。
SQSへのメッセージ挿入をトリガーにLambdaを起動する
まず、実行したい処理を関数として実装します。今回は Handler
という名前です。
main関数内で lambda.Start(Handler)
とすることでLambdaで実行可能となります。
このとき、SQSトリガーを設定した上でHandler関数の引数を context.Context
、events.SQSEvent
とすることで、SQSに挿入されたメッセージをGoのプログラムで受け取れます。
func Handler(ctx context.Context, sqsEvent events.SQSEvent) error
imports (
"context"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
// 何かしらの処理
}
func main() {
lambda.Start(Handler)
}
SQSからメッセージを取り出す
メッセージ本体は sqsEvent.Records[].Body
に保存されています。
JSONで受け取って以下のような Input
という構造体にマッピングする例を示します。
type struct Input {
ID int64
Name string
}
for _, record := range sqsEvent.Records {
b := []byte(record.Body)
var input Input
err = json.Unmarshal(b, &input)
if err != nil {
return err
}
// 何かしらの処理
}
SQSにメッセージを送信する
今度は、トリガーとなるSQSキューとは別のキューにメッセージを送信したいときの書き方です。
準備
キューURLの用意と、SQSクライアントの作成をします。
キューURLを環境変数として渡す場合の例を書きます。
// 環境変数からキューURLを取得
queueURL := os.Getenv("QUEUE_URL")
// AWSのセッションを作成
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
}))
// SQSのクライアントを作成
sqsSvc := sqs.New(sess)
キューURLは長いので、キュー名を渡しておくこともできます。
この場合、プログラム内でキュー名からキューURLを取得します。
キュー名で直接送ることはできないようです。キュー名は特定AWSアカウントの特定リージョン内でしかユニークでないためです。
// 環境変数からキューURLを取得
queueName := os.Getenv("QUEUE_NAME")
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
}))
sqsSvc := sqs.New(sess)
// キュー名からキューURLを取得
result, err := sqsSvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
return err
}
queueURL := result.QueueUrl
SQSにメッセージを1件送信する
以下は、送りたいメッセージ msg Message
(構造体) をJSON文字列化してキューに送信する例です。
まず、sqs.SendMessageInput
を作成します。
このとき、この構造体の MessageBody
フィールドに送りたいメッセージを、QueueUrl
フィールドに送り先となるキューのURLを指定します。
その後、これを引数に sqsSvc.SendMessage()
すると送信できます。 sqsSvc
と queueURL
は準備のときに作成しておいたものです。
// この処理の前に msg Message という構造体が作られているとする
// メッセージをJSON化
msgJson, err := json.Marshal(msg)
if err != nil {
return err
}
// SQSに送信 (sqsSvc, queueURLは準備のときに作成したもの)
_, err = sqsSvc.SendMessage(&sqs.SendMessageInput{
MessageBody: aws.String(string(msgJson)),
QueueUrl: queueURL,
})
if err != nil {
return err
}
SQSにメッセージをバッチとしてまとめて送信する
メッセージがスライスで与えられ、1回のリクエストでまとめて送りたいという状況も考えられます。
こちらの方が速度的にも有利です。
すでにJSON文字列化されたメッセージのスライス msgJsons []string
が与えられたとして、それらをバッチ送信するコードが以下です。
1件だけ送信する場合とほとんど同じで、sqs.SendMessageBatchInput
を引数として sqsSvc.SendMessageBatch()
を実行することでバッチ送信ができます。
違う部分としては、sqs.SendMessageBatchInput
のフィールドに MessageBody
ではなく、Entries
を持ちます。ざっくり言うと、メッセージが複数なので、スライスになっているだけと考えて問題ありません。
// この処理の前に msgJsons []string というスライスが作られているとする
var entries []*sqs.SendMessageBatchRequestEntry
entries = make([]*sqs.SendMessageBatchRequestEntry, len(msgJsons))
for i, msgJson := range msgJsons {
entries[i] = &sqs.SendMessageBatchRequestEntry{
Id: aws.String(strconv.Itoa(i)),
MessageBody: aws.String(msgJson),
}
}
// メッセージを送信 (sqsSvc, queueURLは準備のときに作成したもの)
return sqsSvc.SendMessageBatch(&sqs.SendMessageBatchInput{
Entries: entries,
QueueUrl: queueUrl,
})
なお、sqsSvc.SendMessageBatch
の戻り値である sqs.SendMessageBatchOutput
を使うことで、バッチ送信した際に一部だけ失敗した場合のハンドリングが可能ですが、本記事では扱いません。
SQSメッセージをトリガーにLambdaを起動し、他のSQSにメッセージを送るコードの全体像
- 準備する
- メッセージを受け取る (
Input
にマッピング) - 何かしらの処理をする (
Input
を受け取ってOutput
を返す) - メッセージを送る (
Output
をJSON化して送信)
という一連の処理の全体像が以下です。
メッセージの送信は、バッチ送信ではなく、1件ずつ送る方法で書いています。
imports (
"context"
"json"
"os"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
// Input SQSから受け取るデータ
type struct Input {
ID int64 `json:"id"`
Name string `json:"name"`
}
// Output 次のSQSに送るデータ
type struct Output {
ID int64
Email string
}
// doSomething 何かしらの処理をする関数
func doSomething(input Input) Output {
// 何かする
}
func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
// 環境変数からキューURLを取得
queueURL := os.Getenv("QUEUE_URL")
// AWSのセッションを作成
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
}))
// SQSのクライアントを作成
sqsSvc := sqs.New(sess)
// レコード (メッセージ) 単位でループ
for _, record := range sqsEvent.Records {
// インプットメッセージを受け取って構造体にマッピング
b := []byte(record.Body)
var input Input
err = json.Unmarshal(b, &input)
if err != nil {
return err
}
// 何かしらの処理
output := doSomething
// アウトプットメッセージをJSON化
outputJson, err := json.Marshal(output)
if err != nil {
return err
}
// 別のSQSに送信
_, err = sqsSvc.SendMessage(&sqs.SendMessageInput{
MessageBody: aws.String(string(outputJson)),
QueueUrl: queueURL,
})
if err != nil {
return err
}
}
}
func main() {
lambda.Start(Handler)
}
GoでLambdaってどうなん?
LambdaといえばNode.jsかPythonで書くのが一般的だと思いますが、Goで実装するのも悪くないです。
やはり静的型付け言語は書きやすいです。
とはいえ、Lambdaを使うタイミングではインスタントに素早く実装したいことも多いと思うので、自分が慣れた言語を使えばいいと思います。
Goを使うメリットとして、ランタイムのバージョンがひとつしかないことが挙げられます。2022年10月現在、LambdaのGoのランタイムは Go 1.x
のみです。
そのため、頻繁にアップデートする必要がありません。Goの後方互換性のおかげですね。
参考までに、現在Node.jsは12, 14, 16、Pythonは3.7, 3.8, 3.9のランタイムが提供されており、マイナーバージョンが更新される度に別のランタイムとして扱われます。
デメリットは、各種サポートがメジャー言語から若干遅れることです。
弊社ではモニタリングにDatadogを使っています。DatadogでLambdaのログを収集する場合、LambdaのExtentionを使用すると簡単です。しかしながら、現在Node.js版とPython版しか提供されていません。
Discussion