🍣

GolangでSQSのメッセージを取得する

2023/12/10に公開

キューの取得

  • 特に考えることはなく、config.LoadDefaultConfig() と sqs.NewFromConfig() で取得できる
package main

import (
	"context"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"log"
)

// main loads the default AWS configuration, builds an SQS client from it and
// logs the resulting client value. It exits via log.Fatalf if the
// configuration cannot be loaded.
func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	client := sqs.NewFromConfig(cfg)
	log.Printf("%v\n", client)
}

メッセージの取得

  • ReceiveMessage()で取得
  • 一度に最大10件取得できる
package main

import (
	"context"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"log"
	"os"
)

// maxNumberOfMessages is the number of messages requested from the queue in a
// single receive call.
const maxNumberOfMessages = 10

// queue pairs an SQS client with the URL of the queue it operates on.
type queue struct {
	// client is the SQS API client used to issue requests.
	client *sqs.Client
	// url is the queue URL sent as QueueUrl with each request.
	url    string
}

// fetchMessages receives up to maxNumberOfMessages messages from the queue in
// a single ReceiveMessage call, requesting all message attributes.
//
// It returns the received messages (possibly none). On failure it returns a
// nil slice together with the error reported by the SQS client.
func (q *queue) fetchMessages(ctx context.Context) ([]types.Message, error) {
	out, err := q.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		// "All" selects every message attribute. The literal is used instead
		// of types.QueueAttributeNameAll, which names a queue attribute and
		// merely happens to have the same string value.
		MessageAttributeNames: []string{"All"},
		QueueUrl:              &q.url,
		MaxNumberOfMessages:   maxNumberOfMessages,
	})
	if err != nil {
		return nil, err
	}

	return out.Messages, nil
}

// newQueue loads the default AWS configuration and returns a queue bound to
// url, backed by an SQS client created from that configuration.
//
// If the configuration cannot be loaded it returns a nil queue and the error,
// rather than a half-initialized queue whose client is nil.
func newQueue(ctx context.Context, url string) (*queue, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}

	return &queue{client: sqs.NewFromConfig(cfg), url: url}, nil
}

// main receives one batch of messages from the queue named by the QUEUE_URL
// environment variable and logs them. Any failure terminates the program via
// the log package.
func main() {
	ctx := context.Background()

	queueURL := os.Getenv("QUEUE_URL")
	if queueURL == "" {
		// Fail fast with a clear message instead of issuing a request with an
		// empty queue URL.
		log.Fatal("QUEUE_URL is not set")
	}

	// Named q so the variable does not shadow the queue type.
	q, err := newQueue(ctx, queueURL)
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	messages, err := q.fetchMessages(ctx)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Printf("%v\n", messages)
}
  • 権限
{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Sid": "",
           "Effect": "Allow",
           "Action": [
              "sqs:ReceiveMessage"
           ],
           "Resource": "*"
       }
   ]
}

メッセージの削除

  • DeleteMessage()で1件のメッセージを削除
  • メッセージを削除するにはメッセージのReceiptHandleが必要
    • ReceiptHandleを知るにはメッセージを取得する必要がある
    • つまり、メッセージを削除するには予めメッセージを取得する必要がある
      • ReceiptHandleの例: MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+CwLj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QEauMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
  • DeleteMessageBatch()で最大10件のメッセージの削除
    • DeleteMessageBatch()を使用する場合はReceiptHandle以外にIdが必要
    • 今回はMessageに含まれるMessageのIdを利用している
package main

import (
	"context"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"log"
	"os"
)

// maxNumberOfMessages is the number of messages requested from the queue in a
// single receive call.
const maxNumberOfMessages = 10

// queue pairs an SQS client with the URL of the queue it operates on.
type queue struct {
	// client is the SQS API client used to issue requests.
	client *sqs.Client
	// url is the queue URL sent as QueueUrl with each request.
	url    string
}

// fetchMessages receives up to maxNumberOfMessages messages from the queue in
// a single ReceiveMessage call, requesting all message attributes.
//
// It returns the received messages (possibly none). On failure it returns a
// nil slice together with the error reported by the SQS client.
func (q *queue) fetchMessages(ctx context.Context) ([]types.Message, error) {
	out, err := q.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		// "All" selects every message attribute. The literal is used instead
		// of types.QueueAttributeNameAll, which names a queue attribute and
		// merely happens to have the same string value.
		MessageAttributeNames: []string{"All"},
		QueueUrl:              &q.url,
		MaxNumberOfMessages:   maxNumberOfMessages,
	})
	if err != nil {
		return nil, err
	}

	return out.Messages, nil
}

// deleteMessages removes the given messages from the queue using
// DeleteMessageBatch. Each message's MessageId is used as the batch entry Id
// and its ReceiptHandle identifies the receipt to delete.
//
// An empty messages slice is a no-op: SQS rejects a batch request that has no
// entries, so no request is sent. Slices longer than the per-request limit of
// 10 entries are split into consecutive batches; the first request-level error
// aborts the remaining batches and is returned.
//
// NOTE(review): only the request-level error is checked; per-entry failures
// reported in the batch result are not inspected here.
func (q *queue) deleteMessages(ctx context.Context, messages []types.Message) error {
	// DeleteMessageBatch accepts at most 10 entries per request.
	const maxBatchEntries = 10

	for len(messages) > 0 {
		n := len(messages)
		if n > maxBatchEntries {
			n = maxBatchEntries
		}

		entries := make([]types.DeleteMessageBatchRequestEntry, 0, n)
		for _, m := range messages[:n] {
			entries = append(entries, types.DeleteMessageBatchRequestEntry{
				Id:            m.MessageId,
				ReceiptHandle: m.ReceiptHandle,
			})
		}

		_, err := q.client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
			Entries:  entries,
			QueueUrl: &q.url,
		})
		if err != nil {
			return err
		}

		// Advance the local slice header only; the caller's slice is untouched.
		messages = messages[n:]
	}

	return nil
}

// newQueue loads the default AWS configuration and returns a queue bound to
// url, backed by an SQS client created from that configuration.
//
// If the configuration cannot be loaded it returns a nil queue and the error,
// rather than a half-initialized queue whose client is nil.
func newQueue(ctx context.Context, url string) (*queue, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}

	return &queue{client: sqs.NewFromConfig(cfg), url: url}, nil
}

// main receives one batch of messages from the queue named by the QUEUE_URL
// environment variable, logs them, and then deletes them from the queue. Any
// failure terminates the program via the log package.
func main() {
	ctx := context.Background()

	queueURL := os.Getenv("QUEUE_URL")
	if queueURL == "" {
		// Fail fast with a clear message instead of issuing a request with an
		// empty queue URL.
		log.Fatal("QUEUE_URL is not set")
	}

	// Named q so the variable does not shadow the queue type.
	q, err := newQueue(ctx, queueURL)
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	messages, err := q.fetchMessages(ctx)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Printf("%v\n", messages)

	// log.Fatalf does not return, so no else branch is needed afterwards.
	if err := q.deleteMessages(ctx, messages); err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Println("Messages deleted.")
}
  • 権限
{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Sid": "",
           "Effect": "Allow",
           "Action": [
              "sqs:ReceiveMessage",
              "sqs:DeleteMessage"
           ],
           "Resource": "*"
       }
   ]
}

メッセージの全件取得と全件削除

  • メッセージは一度に最大10件取得できます
  • 新しく11件以降のメッセージを取得するには先に取得したメッセージを削除する必要があります
  • よって、全件取得するにも全件削除するにも都度10件取得と10件削除を繰り返す必要があります
package main

import (
	"context"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"log"
	"os"
)

// maxNumberOfMessages is the number of messages requested from the queue in a
// single receive call.
const maxNumberOfMessages = 10

// queue pairs an SQS client with the URL of the queue it operates on.
type queue struct {
	// client is the SQS API client used to issue requests.
	client *sqs.Client
	// url is the queue URL sent as QueueUrl with each request.
	url    string
}

// fetchMessages receives up to maxNumberOfMessages messages from the queue in
// a single ReceiveMessage call, requesting all message attributes.
//
// It returns the received messages (possibly none). On failure it returns a
// nil slice together with the error reported by the SQS client.
func (q *queue) fetchMessages(ctx context.Context) ([]types.Message, error) {
	out, err := q.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		// "All" selects every message attribute. The literal is used instead
		// of types.QueueAttributeNameAll, which names a queue attribute and
		// merely happens to have the same string value.
		MessageAttributeNames: []string{"All"},
		QueueUrl:              &q.url,
		MaxNumberOfMessages:   maxNumberOfMessages,
	})
	if err != nil {
		return nil, err
	}

	return out.Messages, nil
}

// deleteMessages removes the given messages from the queue using
// DeleteMessageBatch. Each message's MessageId is used as the batch entry Id
// and its ReceiptHandle identifies the receipt to delete.
//
// An empty messages slice is a no-op: SQS rejects a batch request that has no
// entries, so no request is sent. Slices longer than the per-request limit of
// 10 entries are split into consecutive batches; the first request-level error
// aborts the remaining batches and is returned.
//
// NOTE(review): only the request-level error is checked; per-entry failures
// reported in the batch result are not inspected here.
func (q *queue) deleteMessages(ctx context.Context, messages []types.Message) error {
	// DeleteMessageBatch accepts at most 10 entries per request.
	const maxBatchEntries = 10

	for len(messages) > 0 {
		n := len(messages)
		if n > maxBatchEntries {
			n = maxBatchEntries
		}

		entries := make([]types.DeleteMessageBatchRequestEntry, 0, n)
		for _, m := range messages[:n] {
			entries = append(entries, types.DeleteMessageBatchRequestEntry{
				Id:            m.MessageId,
				ReceiptHandle: m.ReceiptHandle,
			})
		}

		_, err := q.client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
			Entries:  entries,
			QueueUrl: &q.url,
		})
		if err != nil {
			return err
		}

		// Advance the local slice header only; the caller's slice is untouched.
		messages = messages[n:]
	}

	return nil
}

// fetchMessagesAll drains the queue: it repeatedly receives a batch of
// messages, records them, and deletes them from the queue, until a receive
// call returns no messages.
//
// The loop stops on an empty batch rather than on a batch smaller than
// maxNumberOfMessages, because a single receive may return fewer messages than
// requested while more remain in the queue. deleteMessages is never called
// with an empty batch.
//
// On error, the messages collected so far are returned together with the
// error. Batches from earlier iterations have already been deleted from the
// queue, so dropping them here would lose them. If the failure came from
// deleteMessages, the most recent batch is included in the result but may
// still be present in the queue.
func (q *queue) fetchMessagesAll(ctx context.Context) ([]types.Message, error) {
	var messagesAll []types.Message

	for {
		messages, err := q.fetchMessages(ctx)
		if err != nil {
			return messagesAll, err
		}
		if len(messages) == 0 {
			return messagesAll, nil
		}
		messagesAll = append(messagesAll, messages...)

		if err := q.deleteMessages(ctx, messages); err != nil {
			return messagesAll, err
		}
	}
}

// newQueue loads the default AWS configuration and returns a queue bound to
// url, backed by an SQS client created from that configuration.
//
// If the configuration cannot be loaded it returns a nil queue and the error,
// rather than a half-initialized queue whose client is nil.
func newQueue(ctx context.Context, url string) (*queue, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}

	return &queue{client: sqs.NewFromConfig(cfg), url: url}, nil
}

// main drains the queue named by the QUEUE_URL environment variable with
// fetchMessagesAll and logs every message that was received. Any failure
// terminates the program via the log package.
func main() {
	ctx := context.Background()

	queueURL := os.Getenv("QUEUE_URL")
	if queueURL == "" {
		// Fail fast with a clear message instead of issuing a request with an
		// empty queue URL.
		log.Fatal("QUEUE_URL is not set")
	}

	// Named q so the variable does not shadow the queue type.
	q, err := newQueue(ctx, queueURL)
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	messages, err := q.fetchMessagesAll(ctx)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Printf("%v\n", messages)
}

Discussion