🍣
GolangでSQSのメッセージを取得する
キューの取得
- 特に考えることはなく、config.LoadDefaultConfig() と sqs.NewFromConfig() でSQSクライアントを取得できる
package main
import (
"context"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"log"
)
// main loads the default AWS configuration, builds an SQS client from it and
// logs the client. It exits through log.Fatalf when the configuration cannot
// be loaded.
func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	client := sqs.NewFromConfig(cfg)
	log.Printf("%v\n", client)
}
メッセージの取得
- ReceiveMessage()で取得
- 一度に最大10件取得できる
package main
import (
"context"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"log"
"os"
)
// maxNumberOfMessages is the MaxNumberOfMessages value that fetchMessages
// sends with each ReceiveMessage request.
const maxNumberOfMessages = 10
// queue pairs an SQS client with the URL of the queue its methods address.
type queue struct {
	url    string      // sent as QueueUrl on every request
	client *sqs.Client // client used to issue the requests
}
// fetchMessages performs one ReceiveMessage call against q.url and returns
// the messages carried in the response.
//
// The request asks for up to maxNumberOfMessages messages and sets
// MessageAttributeNames to the string value of types.QueueAttributeNameAll.
// When the call fails, an empty non-nil slice is returned with the error.
func (q *queue) fetchMessages(ctx context.Context) ([]types.Message, error) {
	attributeNames := []string{string(types.QueueAttributeNameAll)}
	input := &sqs.ReceiveMessageInput{
		QueueUrl:              &q.url,
		MaxNumberOfMessages:   maxNumberOfMessages,
		MessageAttributeNames: attributeNames,
	}

	out, err := q.client.ReceiveMessage(ctx, input)
	if err != nil {
		return []types.Message{}, err
	}
	return out.Messages, nil
}
// newQueue loads the default AWS configuration and returns a queue for url
// whose client is created from that configuration.
//
// If the configuration cannot be loaded, the error is returned together with
// an empty, non-nil *queue.
func newQueue(ctx context.Context, url string) (*queue, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return &queue{}, err
	}

	return &queue{
		client: sqs.NewFromConfig(cfg),
		url:    url,
	}, nil
}
// main reads the queue URL from the QUEUE_URL environment variable, receives
// one batch of messages from that queue and logs them. Any error terminates
// the program through log.Fatalf.
func main() {
	ctx := context.Background()

	// Named q rather than queue so the local does not shadow the type.
	q, err := newQueue(ctx, os.Getenv("QUEUE_URL"))
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	received, err := q.fetchMessages(ctx)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Printf("%v\n", received)
}
- 権限
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage"
],
"Resource": "*"
}
]
}
メッセージの削除
- DeleteMessage()で1件のメッセージを削除
- メッセージを削除するにはメッセージのReceiptHandleが必要
- ReceiptHandleを知るにはメッセージを取得する必要がある
- つまり、メッセージを削除するには予めメッセージを取得する必要がある
- 例
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+CwLj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QEauMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
- DeleteMessageBatch()で最大10件のメッセージの削除
- DeleteMessageBatch()を使用する場合はReceiptHandle以外にIdが必要
- 今回は取得したMessageに含まれるMessageIdをIdとして利用している
package main
import (
"context"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"log"
"os"
)
// maxNumberOfMessages is the MaxNumberOfMessages value that fetchMessages
// sends with each ReceiveMessage request.
const maxNumberOfMessages = 10
// queue pairs an SQS client with the URL of the queue its methods address.
type queue struct {
	url    string      // sent as QueueUrl on every request
	client *sqs.Client // client used to issue the requests
}
// fetchMessages performs one ReceiveMessage call against q.url and returns
// the messages carried in the response.
//
// The request asks for up to maxNumberOfMessages messages and sets
// MessageAttributeNames to the string value of types.QueueAttributeNameAll.
// When the call fails, an empty non-nil slice is returned with the error.
func (q *queue) fetchMessages(ctx context.Context) ([]types.Message, error) {
	request := sqs.ReceiveMessageInput{
		QueueUrl:              &q.url,
		MaxNumberOfMessages:   maxNumberOfMessages,
		MessageAttributeNames: []string{string(types.QueueAttributeNameAll)},
	}

	response, err := q.client.ReceiveMessage(ctx, &request)
	if err != nil {
		return []types.Message{}, err
	}
	return response.Messages, nil
}
// deleteMessages removes the given messages from the queue at q.url with a
// single DeleteMessageBatch request.
//
// Each batch entry uses the message's MessageId as its Id and the message's
// ReceiptHandle as its ReceiptHandle. An empty messages slice is a no-op that
// returns nil. Otherwise the error from DeleteMessageBatch is returned as-is.
//
// NOTE(review): the response is discarded, so per-entry failures reported by
// the service (presumably via the output's Failed list) are not surfaced.
func (q *queue) deleteMessages(ctx context.Context, messages []types.Message) error {
	// SQS rejects a batch with no entries (EmptyBatchRequest); having nothing
	// to delete, e.g. after an empty receive, is not an error for callers.
	if len(messages) == 0 {
		return nil
	}

	entries := make([]types.DeleteMessageBatchRequestEntry, 0, len(messages))
	for i := range messages {
		entries = append(entries, types.DeleteMessageBatchRequestEntry{
			Id:            messages[i].MessageId,
			ReceiptHandle: messages[i].ReceiptHandle,
		})
	}

	_, err := q.client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
		Entries:  entries,
		QueueUrl: &q.url,
	})
	return err
}
// newQueue loads the default AWS configuration and returns a queue for url
// whose client is created from that configuration.
//
// If the configuration cannot be loaded, the error is returned together with
// an empty, non-nil *queue.
func newQueue(ctx context.Context, url string) (*queue, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return &queue{}, err
	}

	return &queue{
		client: sqs.NewFromConfig(cfg),
		url:    url,
	}, nil
}
// main reads the queue URL from the QUEUE_URL environment variable, receives
// one batch of messages, logs them and then deletes that batch from the
// queue. Any error terminates the program through log.Fatalf.
func main() {
	ctx := context.Background()

	// Named q rather than queue so the local does not shadow the type.
	q, err := newQueue(ctx, os.Getenv("QUEUE_URL"))
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	received, err := q.fetchMessages(ctx)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Printf("%v\n", received)

	// log.Fatalf does not return, so the success message needs no else branch.
	if err := q.deleteMessages(ctx, received); err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Println("Messages deleted.")
}
- 権限
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage"
],
"Resource": "*"
}
]
}
メッセージの全件取得と全件削除
- メッセージは一度に最大10件取得できます
- 新しく11件目以降のメッセージを取得するには先に取得したメッセージを削除する必要があります(受信済みのメッセージは削除しない限り、可視性タイムアウトが過ぎると再び受信対象になるためです)
- よって、全件取得するにも全件削除するにも都度10件取得と10件削除を繰り返す必要があります
package main
import (
"context"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"log"
"os"
)
// maxNumberOfMessages is the MaxNumberOfMessages value that fetchMessages
// sends with each ReceiveMessage request.
const maxNumberOfMessages = 10
// queue pairs an SQS client with the URL of the queue its methods address.
type queue struct {
	url    string      // sent as QueueUrl on every request
	client *sqs.Client // client used to issue the requests
}
// fetchMessages performs one ReceiveMessage call against q.url and returns
// the messages carried in the response.
//
// The request asks for up to maxNumberOfMessages messages and sets
// MessageAttributeNames to the string value of types.QueueAttributeNameAll.
// When the call fails, an empty non-nil slice is returned with the error.
func (q *queue) fetchMessages(ctx context.Context) ([]types.Message, error) {
	attributeNames := []string{string(types.QueueAttributeNameAll)}
	input := &sqs.ReceiveMessageInput{
		QueueUrl:              &q.url,
		MaxNumberOfMessages:   maxNumberOfMessages,
		MessageAttributeNames: attributeNames,
	}

	out, err := q.client.ReceiveMessage(ctx, input)
	if err != nil {
		return []types.Message{}, err
	}
	return out.Messages, nil
}
// deleteMessages removes the given messages from the queue at q.url with a
// single DeleteMessageBatch request.
//
// Each batch entry uses the message's MessageId as its Id and the message's
// ReceiptHandle as its ReceiptHandle. An empty messages slice is a no-op that
// returns nil. Otherwise the error from DeleteMessageBatch is returned as-is.
//
// NOTE(review): the response is discarded, so per-entry failures reported by
// the service (presumably via the output's Failed list) are not surfaced.
func (q *queue) deleteMessages(ctx context.Context, messages []types.Message) error {
	// SQS rejects a batch with no entries (EmptyBatchRequest); having nothing
	// to delete, e.g. after an empty receive, is not an error for callers.
	if len(messages) == 0 {
		return nil
	}

	entries := make([]types.DeleteMessageBatchRequestEntry, 0, len(messages))
	for i := range messages {
		entries = append(entries, types.DeleteMessageBatchRequestEntry{
			Id:            messages[i].MessageId,
			ReceiptHandle: messages[i].ReceiptHandle,
		})
	}

	_, err := q.client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
		Entries:  entries,
		QueueUrl: &q.url,
	})
	return err
}
// fetchMessagesAll drains the queue: it repeatedly receives a batch with
// fetchMessages, deletes that batch with deleteMessages, and accumulates the
// deleted messages until a receive comes back empty.
//
// It returns every message that was received and then deleted. If a receive
// or delete call fails, the messages collected up to that point are returned
// together with the error, because they have already been removed from the
// queue and would otherwise be lost. A batch whose delete fails is not
// included in the result.
//
// The loop stops only on an empty receive, not on a short batch: a
// short-polling ReceiveMessage may return fewer messages than requested while
// more remain in the queue. An empty receive is likewise only a best-effort
// signal that the queue is drained.
func (q *queue) fetchMessagesAll(ctx context.Context) ([]types.Message, error) {
	var all []types.Message
	for {
		batch, err := q.fetchMessages(ctx)
		if err != nil {
			return all, err
		}
		// Stop before deleting: a DeleteMessageBatch request with no entries
		// is rejected by SQS, which previously turned an empty queue into an
		// error.
		if len(batch) == 0 {
			return all, nil
		}
		if err := q.deleteMessages(ctx, batch); err != nil {
			return all, err
		}
		// Append only after the delete succeeded so the result never holds
		// messages that are still in the queue.
		all = append(all, batch...)
	}
}
// newQueue loads the default AWS configuration and returns a queue for url
// whose client is created from that configuration.
//
// If the configuration cannot be loaded, the error is returned together with
// an empty, non-nil *queue.
func newQueue(ctx context.Context, url string) (*queue, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return &queue{}, err
	}

	return &queue{
		client: sqs.NewFromConfig(cfg),
		url:    url,
	}, nil
}
// main reads the queue URL from the QUEUE_URL environment variable, calls
// fetchMessagesAll on that queue and logs the messages it returns. Any error
// terminates the program through log.Fatalf.
func main() {
	ctx := context.Background()

	// Named q rather than queue so the local does not shadow the type.
	q, err := newQueue(ctx, os.Getenv("QUEUE_URL"))
	if err != nil {
		log.Fatalf("%v\n", err)
	}

	drained, err := q.fetchMessagesAll(ctx)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Printf("%v\n", drained)
}
Discussion