⚡
【Go】RabbitMQを使用したメッセージング【RabbitMQ】
AMQP 0.9.1 プロトコル
詳しくは以下RabbitMQチュートリアルにまとまっています。
概要
AMQP 0.9.1では、メッセージは Publisher
とConsumer
の間でやり取りされます。
-
Publisher
: メッセージを作成してキューに送信する -
Consumer
: そのキューからメッセージを受信する
Publisher
とConsumer
が同時にキューにアクセスすることができるため、複数のクライアントが同時にメッセージを送受信できます。
Exchenge
加えて、Exchenge
と呼ばれる概念も登場します。このメッセージ交換手(交換機?)は言わばルーティングのような役割を持っています。Publisher
が送信したメッセージはまず Exchenge
が受け取り、どのキューに紐づけるかを決定します。
Exchangeはプロデューサとキューの間にあるため、プロデューサはキューの存在や位置を気にせずにメッセージを送信できます。これにより、システム全体が柔軟性とスケーラビリティを獲得し、コンポーネント間の疎結合が実現されます。
ちなみに、RabbitMQでは、いくつかのExchangeタイプ(Direct, Fanout, Topic, Headers)が提供されており、それぞれが異なる配信ポリシーを実現します。アプリケーションの要件に応じて、メッセージを適切なキューにルーティングする方法を選択できます。
特に、Fanoutの場合、送信されたメッセージをすべてのバインドされたキューにコピーすることができます。これにより、同じメッセージを複数のコンシューマに配信することが容易になります。
実装
Consumer
の作成とExchenge
の設定
listener-service/consumer
import (
...
amqp "github.com/rabbitmq/amqp091-go"
)
type Consumer struct {
conn *amqp.Connection
queueName string //
}
func NewConsumer(conn *amqp.Connection) (Consumer, error) {
consumer := Consumer{
conn: conn,
}
err := consumer.setup()
if err != nil {
return Consumer{}, err
}
return consumer, nil
}
func (consumer *Consumer) setup() error {
channel, err := consumer.conn.Channel()
if err != nil {
return err
}
return declareExchange(channel)
}
Publisher
の作成とメッセージ送信
※ もしExchenge
がなければ新たに作成する必要があるため、送信側でもExchange
を設定。
hoge-service/publisher
import (
...
amqp "github.com/rabbitmq/amqp091-go"
)
type Publisher struct {
conn *amqp.Connection
}
func NewEventPublisher(conn *amqp.Connection) (Publisher, error) {
p := Publisher{
conn: conn,
}
err := p.setup()
if err != nil {
return Publisher{}, err
}
return p, nil
}
func (p *Publisher) setup() error {
channel, err := p.conn.Channel()
if err != nil {
return err
}
defer channel.Close()
return declareExchange(channel)
}
func (p *Publisher) Push(event string, severity string) error {
channel, err := p.conn.Channel()
if err != nil {
return err
}
defer channel.Close()
log.Println("Pushing event to queue channel")
err = channel.Publish(
"logs_topic", // exchange
severity, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(event),
},
)
if err != nil {
return err
}
return nil
}
キューの購読
listener-service/consumer
type Payload struct {
Name string `json:"name"`
Data string `json:"data"`
}
func (consumer *Consumer) Listen(topics []string) error {
ch, err := consumer.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
q, err := declareRandomQueue(ch)
if err != nil {
return err
}
// bind the queue to the topics
for _, s := range topics {
ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false, // no-wait
nil, // arguments
)
}
messages, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
return err
}
forever := make(chan bool) // this is a blocking channel
go func() {
for d := range messages {
log.Printf("Received a message: %s", d.Body)
var payload Payload
_ = json.Unmarshal(d.Body, &payload)
go handlePayload(payload)
}
}()
fmt.Printf("Waiting for messages [Exchange, Queue] [logs_topic, %s]]", q.Name)
<-forever
return nil
}
func handlePayload(payload Payload) {
fmt.Println("Handling payload: ", payload)
switch payload.Name {
case "log", "event":
err := logEvent(payload)
if err != nil {
fmt.Println("Error logging event: ", err)
}
case "auth":
// do something
case ...
// another thing
default:
err := logEvent(payload)
if err != nil {
fmt.Println("Error logging event: ", err)
}
}
}
Discussion