🔦

RabbitMQのTopic Exchangeの使い方を学ぶ

2025/02/21に公開

概要

RabbitMQのTOPICエクスチェンジを使ったメッセージングの設計できるようにするために使い方を纏める。以下経緯。

  1. DDDの適用方法について学習
  2. CQRS+ESという設計手法が色々なところでちらつくのでやってみることに
  3. Event Sourcingに利用するメッセージブローカーとしてBing CopilotからRabbitMQを紹介される
  4. 組み込む過程で手を加えようとしたときに理解が足りていないことがわかる
  5. チュートリアルを実施して理解を深めることに
  6. TOPIC Exchangeをアプリケーションに組み込んでみると手の止まる部分があり、まだ理解が足りていないと感じる
  7. 整理する

RabbitMQ作業環境の用意

docker composeで2つのサービスを起動して、python-devコンテナにvscodeをアタッチしてコーディングとpythonスクリプトの実行を行う。

./compose.yml:

https://github.com/rifumi/try_rabbitmq_pyenv/blob/main/compose.yml

./app/Dockerfile

https://github.com/rifumi/try_rabbitmq_pyenv/blob/main/app/Dockerfile

./app/requirements.txt

https://github.com/rifumi/try_rabbitmq_pyenv/blob/main/app/requirements.txt

./app/.vscode/settings.json

https://github.com/rifumi/try_rabbitmq_pyenv/blob/main/app/.vscode/settings.json

./.env

./compose.yml内で参照する環境変数を設定

RABBITMQ_DEFAULT_USER="user"
RABBITMQ_DEFAULT_PASS="pass"
RABBITMQ_DEFAULT_VHOST="vhost"
RABBITMQ_DEFAULT_HOST="host"

作業のルートパス

ホスト側の./appをコンテナの/appにvolume mountしているのでコンテナを起動してvscodeアタッチしたら/app以下で作業する。利用するリポジトリでは/app/try_rabbitmqをvscodeのワークスペースと想定している。

RabbitMQ->Get Started->Python->Topicsのノート

RabbitMQのPYTHON>TOPICチュートリアルに記載された説明文をダイアグラムを使って表現する

(github上の)ファイル名:CONSTRUCTION_FLOW.mmd

ノードの頭は実装の順番を表している。共通部分はn.ProducerPn.ConsumerCn.。最初に共通部分の処理が必要で、その後にPn又はCnの処理が行われる。

ダイアグラムの元にしたTOPICチュートリアルの文章

ここの説明文がわかりやすいと思い読み進めていたら何故かサンプルコードはこの説明とは別の動きが記述されていた。のが今回の記事が発生したきっかけ(これ概要に書いた方がよかったのだろうか?)。

In this example, we're going to send messages which all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe a celerity, second a colour and third a species: <celerity>.<colour>.<species>.

We created three bindings: Q1 is bound with binding key *.orange.* and Q2 with *.*.rabbit and lazy.#.

These bindings can be summarised as:

Q1 is interested in all the orange animals.

Q2 wants to hear everything about rabbits, and everything about lazy animals.

A message with a routing key set to quick.orange.rabbit will be delivered to both queues. Message lazy.orange.elephant also will go to both of them. On the other hand quick.orange.fox will only go to the first queue, and lazy.brown.fox only to the second. lazy.pink.rabbit will be delivered to the second queue only once, even though it matches two bindings. quick.brown.fox doesn't match any binding so it will be discarded.

What happens if we break our contract and send a message with one or four words, like orange or quick.orange.new.rabbit? Well, these messages won't match any bindings and will be lost.

On the other hand lazy.orange.new.rabbit, even though it has four words, will match the last binding and will be delivered to the second queue.

引用元.

翻訳された文章(Google翻訳)

この例では、動物についてすべて説明するメッセージを送信します。メッセージは、3 つの単語 (2 つのドット) で構成されるルーティング キーを使用して送信されます。ルーティング キーの最初の単語は、速さ、2 番目は色、3 番目は種を表します: <celerity>.<colour>.<species>

3 つのバインディングを作成しました: Q1 はバインディング キー *.orange.* にバインドされ、Q2 は *.*.rabbitlazy.# にバインドされています。

これらのバインディングは次のようにまとめることができます:

Q1 はオレンジ色の動物すべてに興味があります。

Q2 はウサギに関するすべての情報と、怠け者の動物に関するすべての情報を聞きたいと考えています。

ルーティング キーが quick.orange.rabbit に設定されたメッセージは、両方のキューに配信されます。メッセージ lazy.orange.elephant も、両方のキューに送信されます。一方、quick.orange.fox は最初のキューにのみ送信され、lazy.brown.fox は 2 番目のキューにのみ送信されます。 lazy.pink.rabbit は 2 つのバインディングに一致しますが、2 番目のキューに配信されるのは 1 回だけです。quick.brown.fox はどのバインディングにも一致しないため、破棄されます。

契約を破って、orangequick.orange.new.rabbit のように 1 語または 4 語のメッセージを送信するとどうなるでしょうか。これらのメッセージはどのバインディングにも一致せず、失われます。

一方、lazy.orange.new.rabbit は 4 語ありますが、最後のバインディングに一致し、2 番目のキューに配信されます。

図解(省エネのために)

(github上の)ファイル名:RabbitMQ.Tutorial.Python.Topic.mmd

纏めた図を見ることで流れを把握した。

※mermaid.jsのダイアグラム上でノードに表記されるはずの`*`が消える件

*.orange.*とか*.*.rabbitとかgithubのプレビューともmermaid.liveとも結果が異なりアスタリスクがダイアグラムのノードに表示できない。エスケープしてもだめ。HTMLエンティティでも埋め込めない。アスタリスク2連にすると2連がそのまま表示される。ので諦める。先頭がアスタリスクの場合のみの現象かもしれない。

図をpython+pikaで表現

ディレクトリ構造

# tree
.
├── tutorial_topic
│   ├── __init__.py
│   ├── emit_animal_topic.py
│   ├── receive_orange_animals.py
│   ├── receive_rabbit_or_lazy_animals.py
│   └── start_consuming_target.py
└── utils
    ├── __init__.py
    └── get_connection.py

共通部分のコード:

https://github.com/rifumi/try_rabbitmq/blob/main/utils/get_connection.py

Consumerのコード:

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/start_consuming_target.py

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/receive_orange_animals.py

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/receive_rabbit_or_lazy_animals.py

Producerのコード:

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/emit_animal_topic.py

実行

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/README.md

or

1. orangeの動物に関心を持つConsumerの起動 (Terminal1)

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/receive_orange_animals.py

python receive_orange_animals.py

2. rabbitもしくはlazy animalsに関心を持つConsumerの起動 (Terminal2)

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/receive_rabbit_or_lazy_animals.py

python receive_rabbit_or_lazy_animals.py

3. No.1〜9のメッセージを発行するPublisherの起動(Terminal3)

https://github.com/rifumi/try_rabbitmq/blob/main/tutorial_topic/emit_animal_topic.py

python emit_animal_topic.py

実行結果を確認する

Message発行処理(Publisher)の出力:no.0, 6, 7, 8のMessageは失われる

emitted messages
# emit_animal_topic.pyの実行結果
 To: celerity.color.animal
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 0
}

 To: quick.orange.rabbit
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 1
}

 To: lazy.orange.elephant
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 2
}

 To: quick.orange.fox
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 3
}

 To: lazy.brown.fox
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 4
}

 To: lazy.pink.rabbit
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 5
}

 To: quick.brown.fox
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 6
}

 To: orange
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 7
}

 To: quick.orange.new.rabbit
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 8
}

 To: lazy.orange.new.rabbit
 Sent: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 9
}

orangeの動物に関するMessage消費処理(Consumer)の出力:no. 1, 2, 3の Message の受信を確認

received `orange` animal messages
# receive_orange_animals.pyの実行結果
binding_keys(queue_bind(routing_key=<arg>)): ['*.orange.*']
 [*] Waiting for logs. To exit press CTRL+C
 from: quick.orange.rabbit
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 1
}

 from: lazy.orange.elephant
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 2
}

 from: quick.orange.fox
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 3
}

rabbitまたはlazyな動物に関するMessage消費処理(Consumer)の出力:no. 1, 2, 4, 5, 9の Message の受信を確認

received `rabbit` or `lazy` animal messages
# receive_rabbit_or_lazy_animals.pyの実行結果
binding_keys(queue_bind(routing_key=<arg>)): ['*.*.rabbit', 'lazy.#']
 [*] Waiting for logs. To exit press CTRL+C
 from: quick.orange.rabbit
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 1
}

 from: lazy.orange.elephant
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 2
}

 from: lazy.brown.fox
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 4
}

 from: lazy.pink.rabbit
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 5
}

 from: lazy.orange.new.rabbit
 Receive: {
  "my_desire": [
    "I want some money!",
    "More pizza!"
  ],
  "exchange_name": "topic_animals",
  "no.": 9
}

適用例を考える

EventとQueue、Consumerを1対1にする場合(ex1)

メリット:

  • Queueに対してConsumerが1つしか紐付かないため、処理を把握しやすい

デメリット:

  • Eventが増えるとQueue, Consumerのインスタンスも増えるので、下記が予想される
    • メモリを圧迫する
    • 変更量が多くなる

考察:

スペックが不安なら拡張すればいいと思えなければ採用は難しい

QueueとEntityを1対1にする場合(ex2)

メリット:

  • 前述の設計と比較してメモリ容量が少なく済む

デメリット:

  • Consumerでイベント判定と処理分岐が必要になる->実装が複雑になる->バグ作り込みの可能性を高める
  • 仕事量が多くなると処理の遅延の可能性がある(かもしれない)

考察:

よほど大規模な負荷が想定されなければこちらのパターンの選択が現実的と思われる

ex2を深堀りする

前述のダイアグラム内のエンティティをそれっぽい名前に寄せる:

ex2の実装と実行結果の確認

コード

Consumer
usercircleのConsumerの間で共有するメッセージ受信処理:

https://github.com/rifumi/try_rabbitmq/blob/main/ex/start_consuming_target.py

userメッセージ受信処理:

https://github.com/rifumi/try_rabbitmq/blob/main/ex/receive_user_events.py

circleメッセージ受信処理:

https://github.com/rifumi/try_rabbitmq/blob/main/ex/receive_circle_events.py

Producer

https://github.com/rifumi/try_rabbitmq/blob/main/ex/emit_entity_topic.py

実行

https://github.com/rifumi/try_rabbitmq/blob/main/ex/README.md

or

  1. userメッセージの受信処理を起動
python receive_user_events.py
  1. circleメッセージの受信処理を起動
python receive_circle_events.py
  1. メッセージの発行
python emit_entity_topic.py

実行結果

発行されたメッセージ:
emitted messages
# emit
 To: entity.event
 Sent: {
  "args": {
    "value": "blank"
  },
  "exchange_name": "topic_entity_events",
  "no.": 0
}

 To: user.created
 Sent: {
  "args": {
    "userId": "12345",
    "userName": "John Doe",
    "email": "john.doe@example.com",
    "timestamp": "2025-02-18T04:56:07Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 1
}

 To: user.updated
 Sent: {
  "args": {
    "userId": "12345",
    "updatedFields": {
      "userName": "John Doe",
      "email": "john.doe@newdomain.com"
    },
    "timestamp": "2025-02-18T05:00:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 2
}

 To: user.deleted
 Sent: {
  "args": {
    "userId": "12345",
    "timestamp": "2025-02-18T05:05:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 3
}

 To: circle.created
 Sent: {
  "args": {
    "circleId": "67890",
    "circleName": "Tech Enthusiasts",
    "createdBy": "12345",
    "timestamp": "2025-02-18T06:00:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 4
}

 To: circle.updated
 Sent: {
  "args": {
    "circleId": "67890",
    "updatedFields": {
      "circleName": "Tech Innovators"
    },
    "timestamp": "2025-02-18T06:05:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 5
}

 To: circle.deleted
 Sent: {
  "args": {
    "circleId": "67890",
    "timestamp": "2025-02-18T06:10:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 6
}
userメッセージ受信 no.1, 2, 3を受信できている
received `user` messages
# receive user
binding_keys(queue_bind(routing_key=<arg>)): ['user.#']
 [*] Waiting for logs. To exit press CTRL+C
 from: user.created
 Receive: {
  "args": {
    "userId": "12345",
    "userName": "John Doe",
    "email": "john.doe@example.com",
    "timestamp": "2025-02-18T04:56:07Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 1
}

 from: user.updated
 Receive: {
  "args": {
    "userId": "12345",
    "updatedFields": {
      "userName": "John Doe",
      "email": "john.doe@newdomain.com"
    },
    "timestamp": "2025-02-18T05:00:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 2
}

 from: user.deleted
 Receive: {
  "args": {
    "userId": "12345",
    "timestamp": "2025-02-18T05:05:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 3
}
circleメッセージ受信 no.4, 5, 6を受信できている
received `circle` messages
# receive circle
binding_keys(queue_bind(routing_key=<arg>)): ['circle.#']
 [*] Waiting for logs. To exit press CTRL+C
 from: circle.created
 Receive: {
  "args": {
    "circleId": "67890",
    "circleName": "Tech Enthusiasts",
    "createdBy": "12345",
    "timestamp": "2025-02-18T06:00:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 4
}

 from: circle.updated
 Receive: {
  "args": {
    "circleId": "67890",
    "updatedFields": {
      "circleName": "Tech Innovators"
    },
    "timestamp": "2025-02-18T06:05:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 5
}

 from: circle.deleted
 Receive: {
  "args": {
    "circleId": "67890",
    "timestamp": "2025-02-18T06:10:00Z"
  },
  "exchange_name": "topic_entity_events",
  "no.": 6
}
なぜ<Entity>.*ではなく<Entity>.#にしたのか

<Entity>.<something>.<Event>を発行したくなるかもしれないと感じたから。というのが理由ではあるものの、リポジトリの設計を集約エンティティに対して行っていれば<Entity>.<Event>で済むのでは?とも感じた。つまり設計でミスしなければいいのでは?と。とはいえ設計ミスが発生しないことを前提にするのも無理があるんじゃないか?という思いもあり。ミスをカバーできると考え#にした。
この許容により実装レベルで設計外のメッセージが使われてしまう余地もあるので諸々含めて自信があれば<Entity>.*のような厳格な形式を用いるべきとは思う。

まとめ

ここまでの作業をこなせばRabbitMQのTOPIC Exchangeを使ったメッセージ送受信が行える予定。
次のステップは、このメッセージング機能のシステムの開発への組込みの実践になる(個人的には)。
CQRS+ES採用システム自体を構築するのは手探り状態なので組み込んだときの使い勝手をどう感じるのかは不明。一般的には下記のような4プロセス(?)の構成になるらしい(マルチスレッドではないよな)。

  • コマンドハンドラ(Writeサービス): FastAPI(Python)とか
  • イベントストア(データベース): MariaDB, PostgreSQL, MongoDB(NoSQL)とか
  • クエリハンドラ(Readサービス): FastAPI(Python)とか
  • イベントバス: RabbitMQ

Event Sourcingでjson記録するからと言ってNoSQLでなければならないわけでもないらしい。とはいえパフォーマンスにどう影響があるのか。MongoDBだと速いとか省メモリとかメリットがあったりRDBだと遅かったりとかあるのか。検証してみたいものの、できる機会は訪れるのだろうか?探せば検証記事が見つかるような気もするけど数値で見るだけでなく体験したい…。

Discussion