【Go】RabbitMQを使用したメッセージング【RabbitMQ】

2023/03/15に公開

AMQP 0.9.1 プロトコル

詳しくは以下RabbitMQチュートリアルにまとまっています。

https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model

概要

AMQP 0.9.1では、メッセージは PublisherConsumerの間でやり取りされます。

  • Publisher: メッセージを作成してキューに送信する
  • Consumer: そのキューからメッセージを受信する

PublisherConsumerが同時にキューにアクセスすることができるため、複数のクライアントが同時にメッセージを送受信できます。

Exchenge

加えて、Exchengeと呼ばれる概念も登場します。このメッセージ交換手(交換機?)は言わばルーティングのような役割を持っています。Publisherが送信したメッセージはまず Exchengeが受け取り、どのキューに紐づけるかを決定します。

Exchangeはプロデューサとキューの間にあるため、プロデューサはキューの存在や位置を気にせずにメッセージを送信できます。これにより、システム全体が柔軟性とスケーラビリティを獲得し、コンポーネント間の疎結合が実現されます。

ちなみに、RabbitMQでは、いくつかのExchangeタイプ(Direct, Fanout, Topic, Headers)が提供されており、それぞれが異なる配信ポリシーを実現します。アプリケーションの要件に応じて、メッセージを適切なキューにルーティングする方法を選択できます。

特に、Fanoutの場合、送信されたメッセージをすべてのバインドされたキューにコピーすることができます。これにより、同じメッセージを複数のコンシューマに配信することが容易になります。

実装

https://github.com/rabbitmq/amqp091-go

Consumerの作成とExchengeの設定

listener-service/consumer
import (
	...
	amqp "github.com/rabbitmq/amqp091-go"
)

type Consumer struct {
	conn *amqp.Connection
	queueName string // 
}

func NewConsumer(conn *amqp.Connection) (Consumer, error) {
	consumer := Consumer{
		conn: conn,
	}

	err := consumer.setup()
	if err != nil {
		return Consumer{}, err
	}

	return consumer, nil
}


func (consumer *Consumer) setup() error {
	channel, err := consumer.conn.Channel()
	if err != nil {
		return err
	}

	return declareExchange(channel)
}

Publisherの作成とメッセージ送信

※ もしExchengeがなければ新たに作成する必要があるため、送信側でもExchangeを設定。

hoge-service/publisher
import (
	...
	amqp "github.com/rabbitmq/amqp091-go"
)

type Publisher struct {
	conn *amqp.Connection
}

func NewEventPublisher(conn *amqp.Connection) (Publisher, error) {
	p := Publisher{
		conn: conn,
	}

	err := p.setup()
	if err != nil {
		return Publisher{}, err
	}

	return p, nil
}

func (p *Publisher) setup() error {
	channel, err := p.conn.Channel()
	if err != nil {
		return err
	}

	defer channel.Close()

	return declareExchange(channel)
}

func (p *Publisher) Push(event string, severity string) error {
	channel, err := p.conn.Channel()
	if err != nil {
		return err
	}

	defer channel.Close()

	log.Println("Pushing event to queue channel")

	err = channel.Publish(
		"logs_topic", // exchange
		severity,     // routing key
		false,        // mandatory
		false,        // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(event),
		},
	)
	if err != nil {
		return err
	}

	return nil
}

キューの購読

listener-service/consumer
type Payload struct {
	Name string `json:"name"`
	Data string `json:"data"`
}

func (consumer *Consumer) Listen(topics []string) error {
	ch, err := consumer.conn.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	q, err := declareRandomQueue(ch)
	if err != nil {
		return err
	}

	// bind the queue to the topics
	for _, s := range topics {
		ch.QueueBind(
			q.Name,       // queue name
			s, 	          // routing key
			"logs_topic", // exchange
			false,        // no-wait
			nil,          // arguments
		)
	}

	messages, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		return err
	}

	forever := make(chan bool) // this is a blocking channel
	go func() {
		for d := range messages {
			log.Printf("Received a message: %s", d.Body)
			var payload Payload
			_ = json.Unmarshal(d.Body, &payload)

			go handlePayload(payload)
		}
	}()

	fmt.Printf("Waiting for messages [Exchange, Queue] [logs_topic, %s]]", q.Name)

	<-forever
	return nil
}

func handlePayload(payload Payload) {
	fmt.Println("Handling payload: ", payload)
	switch payload.Name {
		case "log", "event":
			err := logEvent(payload)
			if err != nil {
				fmt.Println("Error logging event: ", err)
			}
		case "auth":
			// do something
		case ...
			// another thing

		default:
			err := logEvent(payload)
			if err != nil {
				fmt.Println("Error logging event: ", err)
			}
	}
}

Discussion