🐭

Go実装のAWS LambdaでSQSを使う

2022/10/23に公開約5,800字

はじめに

AWS LambdaのランタイムとしてGoを採用した上でAmazon SQS (Simple Queue Service) を使う機会があったので、その際のコードの書き方を簡単にまとめました。

トリガーの設定等、コード以外の部分は別の資料を参照してください。

また、記事の最後に「GoでLambdaってどうなん?」という話を書いたりもしています。

SQSへのメッセージ挿入をトリガーにLambdaを起動する

まず、実行したい処理を関数として実装します。今回は Handler という名前です。
main関数内で lambda.Start(Handler) とすることでLambdaで実行可能となります。

このとき、SQSトリガーを設定した上でHandler関数の引数を context.Contextevents.SQSEvent とすることで、SQSに挿入されたメッセージをGoのプログラムで受け取れます。
func Handler(ctx context.Context, sqsEvent events.SQSEvent) error

imports (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	// 何かしらの処理
}

func main() {
	lambda.Start(Handler)
}

SQSからメッセージを取り出す

メッセージ本体は sqsEvent.Records[].Body に保存されています。
JSONで受け取って以下のような Input という構造体にマッピングする例を示します。

type struct Input {
	ID   int64
	Name string
}
	for _, record := range sqsEvent.Records {
		b := []byte(record.Body)
		var input Input
		err = json.Unmarshal(b, &input)
		if err != nil {
			return err
		}
		// 何かしらの処理
	}

SQSにメッセージを送信する

今度は、トリガーとなるSQSキューとは別のキューにメッセージを送信したいときの書き方です。

準備

キューURLの用意と、SQSクライアントの作成をします。

キューURLを環境変数として渡す場合の例を書きます。

	// 環境変数からキューURLを取得
	queueURL := os.Getenv("QUEUE_URL")
	// AWSのセッションを作成
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("ap-northeast-1"),
	}))
	// SQSのクライアントを作成
	sqsSvc := sqs.New(sess)

キューURLは長いので、キュー名を渡しておくこともできます。
この場合、プログラム内でキュー名からキューURLを取得します。
キュー名で直接送ることはできないようです。キュー名は特定AWSアカウントの特定リージョン内でしかユニークでないためです。

	// 環境変数からキューURLを取得
	queueName := os.Getenv("QUEUE_NAME")
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("ap-northeast-1"),
	}))
	sqsSvc := sqs.New(sess)
	// キュー名からキューURLを取得
	result, err := sqsSvc.GetQueueUrl(&sqs.GetQueueUrlInput{
		QueueName: aws.String(queueName),
	})
	if err != nil {
		return err
	}
	queueURL := result.QueueUrl

SQSにメッセージを1件送信する

以下は、送りたいメッセージ msg Message (構造体) をJSON文字列化してキューに送信する例です。

まず、sqs.SendMessageInput を作成します。
このとき、この構造体の MessageBody フィールドに送りたいメッセージを、QueueUrl フィールドに送り先となるキューのURLを指定します。

その後、これを引数に sqsSvc.SendMessage() すると送信できます。 sqsSvcqueueURL は準備のときに作成しておいたものです。

	// この処理の前に msg Message という構造体が作られているとする
	
	// メッセージをJSON化
	msgJson, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	// SQSに送信 (sqsSvc, queueURLは準備のときに作成したもの)
	_, err = sqsSvc.SendMessage(&sqs.SendMessageInput{
		MessageBody: aws.String(string(msgJson)),
		QueueUrl:    queueURL,
	})
	if err != nil {
		return err
	}

SQSにメッセージをバッチとしてまとめて送信する

メッセージがスライスで与えられ、1回のリクエストでまとめて送りたいという状況も考えられます。
こちらの方が速度的にも有利です。

すでにJSON文字列化されたメッセージのスライス msgJsons []string が与えられたとして、それらをバッチ送信するコードが以下です。

1件だけ送信する場合とほとんど同じで、sqs.SendMessageBatchInput を引数として sqsSvc.SendMessageBatch() を実行することでバッチ送信ができます。

違う部分としては、sqs.SendMessageBatchInput のフィールドに MessageBody ではなく、Entries を持ちます。ざっくり言うと、メッセージが複数なので、スライスになっているだけと考えて問題ありません。

	// この処理の前に msgJsons []string というスライスが作られているとする
	
	var entries []*sqs.SendMessageBatchRequestEntry
	entries = make([]*sqs.SendMessageBatchRequestEntry, len(msgJsons))

	for i, msgJson := range msgJsons {
		entries[i] = &sqs.SendMessageBatchRequestEntry{
			Id:          aws.String(strconv.Itoa(i)),
			MessageBody: aws.String(msgJson),
		}
	}
	// メッセージを送信 (sqsSvc, queueURLは準備のときに作成したもの)
	return sqsSvc.SendMessageBatch(&sqs.SendMessageBatchInput{
		Entries:  entries,
		QueueUrl: queueUrl,
	})

なお、sqsSvc.SendMessageBatch の戻り値である sqs.SendMessageBatchOutput を使うことで、バッチ送信した際に一部だけ失敗した場合のハンドリングが可能ですが、本記事では扱いません。

SQSメッセージをトリガーにLambdaを起動し、他のSQSにメッセージを送るコードの全体像

  1. 準備する
  2. メッセージを受け取る (Input にマッピング)
  3. 何かしらの処理をする (Input を受け取って Output を返す)
  4. メッセージを送る (Output をJSON化して送信)

という一連の処理の全体像が以下です。

メッセージの送信は、バッチ送信ではなく、1件ずつ送る方法で書いています。

imports (
	"context"
	"json"
	"os"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// Input SQSから受け取るデータ
type struct Input {
	ID   int64  `json:"id"`
	Name string `json:"name"`
}

// Output 次のSQSに送るデータ
type struct Output {
	ID    int64
	Email string
}

// doSomething 何かしらの処理をする関数
func doSomething(input Input) Output {
	// 何かする
}

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	// 環境変数からキューURLを取得
	queueURL := os.Getenv("QUEUE_URL")
	// AWSのセッションを作成
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("ap-northeast-1"),
	}))
	// SQSのクライアントを作成
	sqsSvc := sqs.New(sess)

	// レコード (メッセージ) 単位でループ
	for _, record := range sqsEvent.Records {
		// インプットメッセージを受け取って構造体にマッピング
		b := []byte(record.Body)
		var input Input
		err = json.Unmarshal(b, &input)
		if err != nil {
			return err
		}
		
		// 何かしらの処理
		output := doSomething

		// アウトプットメッセージをJSON化
		outputJson, err := json.Marshal(output)
		if err != nil {
			return err
		}
		// 別のSQSに送信
		_, err = sqsSvc.SendMessage(&sqs.SendMessageInput{
			MessageBody: aws.String(string(outputJson)),
			QueueUrl:    queueURL,
		})
		if err != nil {
			return err
		}
	}
}

func main() {
	lambda.Start(Handler)
}

GoでLambdaってどうなん?

LambdaといえばNode.jsかPythonで書くのが一般的だと思いますが、Goで実装するのも悪くないです。
やはり静的型付け言語は書きやすいです。
とはいえ、Lambdaを使うタイミングではインスタントに素早く実装したいことも多いと思うので、自分が慣れた言語を使えばいいと思います。

Goを使うメリットとして、ランタイムのバージョンがひとつしかないことが挙げられます。2022年10月現在、LambdaのGoのランタイムは Go 1.x のみです。
そのため、頻繁にアップデートする必要がありません。Goの後方互換性のおかげですね。

参考までに、現在Node.jsは12, 14, 16、Pythonは3.7, 3.8, 3.9のランタイムが提供されており、マイナーバージョンが更新される度に別のランタイムとして扱われます。

デメリットは、各種サポートがメジャー言語から若干遅れることです。
弊社ではモニタリングにDatadogを使っています。DatadogでLambdaのログを収集する場合、LambdaのExtentionを使用すると簡単です。しかしながら、現在Node.js版とPython版しか提供されていません。

Discussion

ログインするとコメントできます