Closed15

「RabbitMQ」をPythonで試す

kun432kun432

公式サイト

https://www.rabbitmq.com/

RabbitMQ

全てをキューにまとめるブローカー

なぜRabbitMQなのか?

RabbitMQは信頼性が高く成熟したメッセージングおよびストリーミングブローカーであり、クラウド環境、オンプレミス、ローカルマシンのいずれでも簡単にデプロイできます。現在、世界中で数百万人に利用されています。

一般的な使用例

以下は、コミュニティや顧客からよく聞くRabbitMQの一般的な使用例です。これらの例は、RabbitMQが何であるか、そしてどのように役立つかをよりよく理解するのに役立つでしょう。

相互接続されたサービスの分離

たとえば、エンドユーザーに通知を送る必要があるバックエンドサービスを持っているとします。通知には2つのチャネルがあり、1つはメール、もう1つはモバイルアプリ用のプッシュ通知です。

バックエンドサービスは通知を2つのキューに発行します。1つはメール用、もう1つはプッシュ通知用です。メール管理プログラムとプッシュ通知管理プログラムは、それぞれ興味のあるキューに登録し、通知が届くとすぐに処理を開始します。

メリット

  • RabbitMQが負荷の急増を吸収します。
  • 通知管理プログラムのメンテナンス中でも、サービス全体を中断することなく運用できます。

リモートプロシージャコール(Remote Procedure Call)

あなたがコンサートホールを所有しているとします。チケットは複数のウェブサイトや物理的な端末(キオスク)を通じて販売されています。すべての注文は、チケットの在庫状況に応じて購入者がチケットを確保できるかどうかを判断するために、複雑な処理を経る必要があります。この処理では、ウェブサイトやキオスクができるだけ短い時間で注文結果を受け取ることを求められます。

注文は、RabbitMQのキューにcorrelation ID付きで発行されます。注文を送信した発行者は、別のキューにサブスクライブし、同じcorrelation IDを持つ回答を待ちます。

低遅延を実現するには、クラシックキュー(Classic Queue)が適していますが、安全性が若干低くなる可能性があります。この場合、発行者がリトライできる仕組みが必要です。もし注文が失われることが許されない場合、確認(Acknowledgment)クォーラムキュー(Quorum Queues)を組み合わせて、メッセージが確認された時点で安全であることを保証する方法が適しているでしょう。

このトポロジーは、注文処理を直列化し、「先着順」で処理することを可能にします。これにより、トランザクションを必要とせずに整合性を保つことができます。

メリット

  • RabbitMQのクライアントは、発行者(パブリッシャー)と受信者(コンシューマー)を同時に実行できます。
  • RabbitMQは、RPCコールの分散処理に利用できます。

ストリーミング

あなたは動画プラットフォームを運営しています。ユーザーが新しい動画をアップロードすると、その動画が安全に保存された後に、いくつかのタスクを実行する必要があります。例えば、アップロード後の分析を行う、動画の低画質版をトランスコードする、著者の新作を登録している他のユーザーに通知する、などです。

アップロードサービスは「新しい動画」というイベントをRabbitMQのストリームに追加します。複数のバックエンドアプリケーションがそのストリームをサブスクライブし、それぞれ独立して新しいイベントを読み取ることができます。ユーザーへの通知はすぐに行う必要がありますが、アップロード後の分析は待機して、1日に1回実行することも可能です。

メリット

  • ストリームは非常に効率的で、メッセージの複製を必要としません。
  • 複数のコンシューマーが同時に存在していても、コンシューマーはストリーム内で前後に移動することができます。

IoT

あなたは銀河全域で荷物配送サービスを提供しています。多数の宇宙ドローンが定期的に、ケプラー438bに設置されたサーバーに自身のステータスを報告する必要があります。しかし、ネットワーク接続はあまり良好ではありません…。

各宇宙ドローンは、ローカルのRabbitMQスタンドアロンノードを実行しており、上流のRabbitMQに接続可能になるまで報告をバッファに蓄積します。

惑星が整列したとき、ドローンのRabbitMQがすべての報告を上流のRabbitMQに転送します。

メリット

  • RabbitMQのデプロイメントは、ShovelsFederationなどの機能を活用することで、サービスのさまざまなニーズに応じて連鎖させることが可能です。
  • MQTTは、数百万の同時接続に適したプロトコルです。

ライセンスについて

RabbitMQは、2007年の初回リリース以来、無料でオープンソースのソフトウェアとして提供されています。加えて、Broadcomは商用のオファリングも提供しています。

無料でオープンソース

RabbitMQは、Apache License 2.0とMozilla Public License 2のデュアルライセンスの下で提供されています。そのため、RabbitMQを自由に使用および改変することが可能です。

もちろん、貢献は大歓迎です!バグ報告、パッチの提供、他のユーザーのサポート、ドキュメントの作成、あるいはプロジェクトを広める活動など、どのような形でも構いません。実際、プロジェクトを支援する最善の方法は貢献することです!詳しくは、Contributorsページをご覧ください。

商用サービス

Broadcomは、エンタープライズ向けの24時間365日サポートを提供しており、RabbitMQを開発しているエンジニアに直接アクセスできます。

さらに、RabbitMQには商用サービスも用意されており、基本的なすべての機能に加え、ウォームスタンバイレプリケーションクラスタ内データ圧縮といった高度な管理機能を利用できます。これらの機能は、大規模なワークロードに必須のものです。

商用サービスの一覧については、Tanzu RabbitMQとオープンソースRabbitMQディストリビューションの実行方法の表をご覧ください。

kun432kun432

Get Started

公式ドキュメントのGet Startedに従って進める。

https://www.rabbitmq.com/tutorials

様々な言語向けに用意されているが、今回はPythonで。

インストール

今回はローカルのMac上で。Homebrewなどでもインストールできるが、自分はDockerを使った。

作業ディレクトリを作成して、Python仮想環境を作成する。自分はmiseを使うが、適宜。

mkdir rabbitmq-test && cd rabbitmq-test
mise use python@3.12
cat << 'EOS' >> .mise.toml

[env]
_.python.venv = { path = ".venv", create = true }
EOS

RabbitMQのコンテナを起動

docker run -it --rm \
    --name rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    rabbitmq:4.0-management
kun432kun432

チュートリアルはまず大きく2つに分かれている

  • RabbitMQキュー
  • RabbitMQストリーム

最初にキューのチュートリアルを進める

RabbitMQキュー

キューのチュートリアルは7つの例で構成されている。

  1. "Hello World!"
  2. Work Queues
  3. Publish/Subscribe
  4. Routing
  5. Topics
  6. RPC
  7. Publisher Confirms

これらにRabbitMQのデフォルトプロトコルである「AMQP 0-9-1」の内容がカバーされているらしい。

順にやっていく。

kun432kun432

RabbitMQキュー チュートリアル: 1. "Hello, World"

https://www.rabbitmq.com/tutorials/tutorial-one-python

RabbitMQは、「メッセージブローカー」であり、データを受信して何かしらに転送する仕組みである。この時に扱われるデータを「メッセージ」という。

その他のRabbitMQのメッセージングで使用される用語は以下

  • プロデューサーProducer
    • メッセージの送信者
  • コンシュマーConsumer
    • メッセージの受信者
  • キューQueue
    • メッセージをためておくバッファ
    • プロデューサーがメッセージをキューに送り、コンシュマーがメッセージをキューから取り出す

最初にRabbitMQは「ブローカー」であるとあったけど、それを踏まえると多分こうなるのだと思う。

「ブローカー」は株式用語で売買取引の「仲介人」らしい。なるほど、RabbitMQはメッセージの送信者と受信者の間に立って仲介を行うということね。

なお、役割は分かれているけれども、それぞれが同一ホスト内に損刺ししても、違うホストに分かれていても構わない。実際に多くのアプリケーションは別々に分かれていることが多い。また、アプリケーションがプロデューサーとコンシュマーの両方になることもありうる。

ということで、上のプロデューサーがメッセージを"hello"キューに送信、コンシュマーがキューを受信して出力する"Hello, World"のサンプル。

Pythonではpikaを使う。

pip install pika
pip freeze | grep -i pika
出力
pika==1.3.2

プロデューサー(send.py)のコード

send.py
import pika

# RabbitMQサーバへ接続
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
# メッセージ送受信用のチャネルを開く
channel = connection.channel()

# キュー名の宣言
channel.queue_declare(queue='hello')

# メッセージをキューに送信(publish)
channel.basic_publish(
    # ルーティング時に使用する(今回は未使用)
    exchange='',
    # メッセージ送信先キュー名
    routing_key='hello',
    # 送信するメッセージ
    body='こんにちは!'
)
print(" [x] 'こんにちは!' を送信しました。")

# RabbitMQサーバとの接続をクローズ
connection.close()

コンシュマー(receive.py)のコード

receive.py
import pika, sys, os

def main():
    # RabbitMQサーバへ接続
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    # メッセージ送受信用のチャネルを開く
    channel = connection.channel()
    # キュー名の宣言
    channel.queue_declare(queue='hello')

    # メッセージ受診時に呼び出されるコールバック関数
    def callback(ch, method, properties, body):
        print(f" [x] '{body}' を受信しました。")

    # メッセージを受信(消費)
    channel.basic_consume(
        # 対象のキュー名
        queue='hello',
        # キュー受信時に呼び出すコールバック関数
        on_message_callback=callback,
        # メッセージ受信後、RabbitMQに自動的に確認(acknowledge)を送信->そのキューを削除
        auto_ack=True
    )
    print(' [*] メッセージを待っています。終了する場合は CTRL+C で終了します。')

    # メッセージ受信を開始(無限ループで待機)
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

それぞれ細かいところは一旦置いておいて、実行してみる。

コンシュマーを起動

python receive.py
出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。

ではプロデューサーからメッセージを送信してみる。

python send.py

コンシュマー側の出力は以下となる。

出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
 [x] 'b'\xe3\x81\x93\xe3\x82\x93\xe3\x81\xab\xe3\x81\xa1\xe3\x81\xaf\xef\xbc\x81'' を受信しました。

メッセージを複数回送信すると同じように出力される。

メッセージはバイナリデータに変換されるらしいので、コンシュマー側でデコードしてやればいいかな。

receive.py
(snip)
    def callback(ch, method, properties, body):
        # メッセージをUTF−8でデコード
        decoded_body = body.decode('utf-8')
        print(f" [x] '{decoded_body}' を受信しました。")
(snip)

再度実行するとこんな感じになる。

出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
 [x] 'こんにちは!' を受信しました。
 [x] 'こんにちは!' を受信しました。
 [x] 'こんにちは!' を受信しました。

なお、コンシュマー側での受信を止めて、プロデューサーがメッセージを送信し続けるとキューに溜まる。これを確認するにはRabbitMQのCLIを使える。

python send.py
python send.py
python send.py
python send.py
docker exec -ti rabbitmq rabbitmqctl list_queues
出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	4

コンシュマーを起動するとまとめて受信された。

python receive.py
出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
 [x] 'こんにちは!' を受信しました。
 [x] 'こんにちは!' を受信しました。
 [x] 'こんにちは!' を受信しました。
 [x] 'こんにちは!' を受信しました。
kun432kun432

RabbitMQキュー チュートリアル: 2. Work Queues

RabbitMQでは複数のコンシュマー(ワーカー)に処理を分散することができる。これにより、時間のかかるタスクや負荷の高いタスクを分散して効率的に処理することができる。この仕組みを、作業キュー(Work Queue)、またはタスクキュー(Task Queue)というらしい。

実際に試してみる。

上で使用したプロデューサー側スクリプト(send.py)を修正して、コマンドライン引数でメッセージを渡せる新しいスクリプト(new_task.py)を作成。

new_task.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='hello')

# コマンドライン引数でメッセージを取得
message = ' '.join(sys.argv[1:]) or "こんにちは!"

channel.basic_publish(
    exchange='',
    routing_key='hello',
    body=message
)
print(f" [x] '{messasge}' を送信しました。")

connection.close()

次にコンシュマー側スクリプト(receive.py)に、「時間のかかる処理」を擬似的に再現するためのスリープを入れた新しいスクリプト(worker.py)を作成。

worker.py
import pika, sys, os
import time
from datetime import datetime

def main():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        decoded_body = body.decode('utf-8')
        print("{} [x] '{}' を受信しました。".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), decoded_body))
        # 時間がかかる処理を擬似的に再現するためのスリープ
        time.sleep(len(decoded_body))
        print("{} [x] '{}' を完了しました。".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), decoded_body))

    channel.basic_consume(
        queue='hello',
        on_message_callback=callback,
        auto_ack=True
    )
    print(' [*] メッセージを待っています。終了する場合は CTRL+C で終了します。')

    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

ラウンドロビンのディスパッチ

ターミナルを複数開いてそれぞれでworker.pyを実行

ターミナル1
python worker.py
ターミナル1 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
ターミナル2
python worker.py
ターミナル2 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。

ではnew_task.pyでメッセージを連続で送信してみる。

python new_task.py "テスト1"
python new_task.py "テスト2"
python new_task.py "テスト3"
python new_task.py "テスト4"

それぞれのワーカーの出力

ターミナル1 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 16:24:53 [x] 'テスト1' を受信しました。
2025-01-26 16:24:57 [x] 'テスト1' を完了しました。
2025-01-26 16:24:57 [x] 'テスト3' を受信しました。
2025-01-26 16:25:01 [x] 'テスト3' を完了しました。
ターミナル2 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 16:24:54 [x] 'テスト2' を受信しました。
2025-01-26 16:24:58 [x] 'テスト2' を完了しました。
2025-01-26 16:24:58 [x] 'テスト4' を受信しました。
2025-01-26 16:25:02 [x] 'テスト4' を完了しました。

上記を見ると、

  • メッセージはそれぞれのワーカーに交互に処理されて、ラウンドロビンになっている
  • それぞれのワーカーは受信したメッセージの処理が終わってから次のメッセージを処理している

となっているのがわかる。つまり、処理・負荷の分散と並列処理が可能ということ。なお、分散ロジックはラウンドロビンがデフォルトとなっている。

メッセージの確認応答(Acknowledgment)

コンシュマーが受信したメッセージを、例えばとても時間がかかるものだった場合に、コンシュマーが処理途中で落ちてしまうと、デフォルトだとこのメッセージは処理が完了されず失われてしまうことになる。上記でやったような複数ワーカー構成の場合なら、別のワーカーでこれを代替できるようにしたくなる。

これを実現するのが「メッセージの確認応答」になる。「メッセージの確認応答」を有効にすると、ワーカーがメッセージを受信・処理完了後にack(nowledgment)=「処理完了」を返すと、RabbitMQがこのメッセージをキューから削除、つまり安全にロストなくメッセージの処理が行われるということになる。

上のコードではこれが自動で有効になっていた。この部分。

worker.py
(snip)
    channel.basic_consume(
        queue='hello',
        on_message_callback=callback,
        auto_ack=True  # メッセージ受信したら、確認応答を自動で返す、つまりタスク成功可否に関係ない
    )
(snip)

受信したメッセージが処理できたかどうかはワーカー側で判断すべきことであり、RabbitMQではこれは手動で行われることを想定している。

手動でackを返すように変更する。

worker.py
(snip)
    def callback(ch, method, properties, body):
        decoded_body = body.decode('utf-8')
        print("{} [x] '{}' を受信しました。".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), decoded_body))
        time.sleep(len(decoded_body))
        print("{} [x] '{}' を完了しました。".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), decoded_body))
        # 処理成功後に手動でackを返す
        ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_consume(
        queue='hello',
        on_message_callback=callback,
        auto_ack=False  # auto_ackを無効化。デフォルトは無効なので指定しなくてもよい。
    )
(snip)

では再度worker.pyを複数起動。

ターミナル1
python worker.py
ターミナル2
python worker.py

確認の簡単のため、長めのメッセージを渡してみる。

python new_task.py "テストテストテスト1"

1つ目のワーカーで処理された模様

ターミナル1 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 16:51:29 [x] 'テストテストテスト1' を受信しました。
2025-01-26 16:51:38 [x] 'テストテストテスト1' を完了しました。
ターミナル2 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。

再度送信

python new_task.py "テストテストテスト2"

ラウンドロビンなのでもう1つのワーカーで処理が開始される。

ターミナル2 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 16:52:38 [x] 'テストテストテスト2' を受信しました。

これを途中で止める。

ターミナル2 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 16:52:38 [x] 'テストテストテスト2' を受信しました。
^CInterrupted

するともう1つのワーカーの方で処理が行われる。

ターミナル1 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 16:51:29 [x] 'テストテストテスト1' を受信しました。
2025-01-26 16:51:38 [x] 'テストテストテスト1' を完了しました。
2025-01-26 16:52:41 [x] 'テストテストテスト2' を受信しました。
2025-01-26 16:52:51 [x] 'テストテストテスト2' を完了しました。

なお、ackが返されていないメッセージが溜まると再配信のループが行わるような感じでメモリを圧迫する。rabbitmqctlを使えばackが返されていないメッセージを確認することができる。

まず何もない状態。

docker exec -ti rabbitmq rabbitmqctl list_queues  name messages_ready messages_unacknowledged
出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages_ready	messages_unacknowledged
hello	0	0
  • messages_ready: まだ配信されていないメッセージ数。
  • messages_unacknowledged: 配信されたがACKされていないメッセージ数。

ワーカーを1つだけにして、メッセージを送信。以下の受信後・処理「中」の状態でrabbitmqctlで確認。

出力
2025-01-26 17:01:29 [x] 'テストテストテスト3' を受信しました。
docker exec -ti rabbitmq rabbitmqctl list_queues  name messages_ready messages_unacknowledged
出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages_ready	messages_unacknowledged
hello	0	1

処理が終わる前にワーカーを強制終了して、再度確認。

出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages_ready	messages_unacknowledged
hello	1	0

なるほど。以下に記載がある。

もしコンシューマーがACK(確認応答)を送信せずに終了(チャンネルが閉じられる、接続が切れる、またはTCP接続が失われるなど)した場合、RabbitMQはそのメッセージが完全には処理されていないと判断し、再度キューに戻します。同時に他のコンシューマーがオンラインであれば、そのメッセージは迅速に別のコンシューマーに再配信されます。この仕組みにより、ワーカーが時折終了するような場合でも、メッセージが失われることはありません。

このあたりを見ると、上記以外の場合で応答が返ってこない場合にはタイムアウトが適用される様子。

デフォルトでは、コンシューマーがACKを送信するまでのタイムアウトが30分に設定されています。このタイムアウトは、ACKを送信しない不具合のある(スタックした)コンシューマーを検出するのに役立ちます。タイムアウトの値は、「Delivery Acknowledgement Timeout」で説明されているように増加させることができます。

メッセージの耐久性

コンシュマーが異常終了した場合は上記ackでメッセージのロストを回避できる。ではRabbitMQサーバが強制終了した場合はどうなるか?

RabbitMQサーバは、サーバの停止時(シャットダウンやクラッシュ)に、デフォルトではキューやメッセージは保存されない、つまり全て失われる。サーバがクラッシュしても失われないようにするには、キューとメッセージの両方の耐久性を確保する必要がある。

まずキュー。RabbitMQサーバが再起動してもキューが残るようにするには、キューを「耐久性あり(durable)」として指定する。new_task.pyの以下の箇所を修正。

new_task.py
(snip)
channel.queue_declare(
    queue='hello',
    durable=True    # キューを「耐久性あり」に指定
)
(snip)

ただし、すでにあるキューに対しては変更できない。実行するとこうなる。

出力
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false'")

この場合には、

  • キュー名を別名にする
  • RabbitMQサーバを再起動(してキューを削除)後に再度指定する。

必要がある。ここまでの手順だとRabbitMQコンテナはフォアグラウンドで動いているはずなので、Ctrl+cで停止。

^C2025-01-26 08:27:47.461149+00:00 [notice] <0.64.0> SIGTERM received - shutting down
(snip)
2025-01-26 08:27:47.495199+00:00 [info] <0.555.0> Message store for directory '/var/lib/rabbitmq/mnesia/rabbit@3645d298b22d/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient' is stopped

再度RabbitMQを立ち上げる。確認の簡単のため、--rmは外して、ボリュームを永続化。ついでにバックグラウンドで実行。Dockerの場合はホスト名を指定してやる必要がある。(RabbitMQはキューディレクトリにホスト名を使用する。Dockerの場合はホスト名が毎回変わってしまうため。)

docker run -it -d \
    --name rabbitmq \
    -h "rabbitmq" \
    -p 5672:5672 \
    -p 15672:15672 \
    -v ./rabbitmq-data:/var/lib/rabbitmq/mnesia \
    rabbitmq:4.0-management

メッセージを送信

python new_task.py "テスト"

キューを確認してみる。

docker exec -ti rabbitmq rabbitmqctl list_queues
出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	1

コンテナを再起動。

docker restart rabbitmq

再度キューを確認。

docker exec -ti rabbitmq rabbitmqctl list_queues
出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	0

メッセージは消えてしまっているが、キューは残っていることがわかる。

コンテナでボリューム永続化してるなら当然、というふうに見えるが、以下のようにしてコンテナ再起動すると、上は残るが下は消える。

new_task.py
(snip)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='non_durable_hello')
(snip)

なお、この設定はプロデューサー・コンシュマーの両方で必要な点に注意。

次にメッセージの耐久性を確保する。メッセージの耐久性はpublish時にメッセージのプロパティでdelivery_mode=pika.DeliveryMode.Persistentを指定する。

new_task.py
(snip)
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body=message,
    # メッセージのプロパティで「耐久性あり」を指定
    properties=pika.BasicProperties(
       delivery_mode = pika.DeliveryMode.Persistent
    )
)
(snip)

ではメッセージを送信。

python new_task.py "テスト"

キューを確認。

docker exec -ti rabbitmq rabbitmqctl list_queues

耐久性なしのキューも表示されている。

出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	1
non_durable_hello	0

コンテナを再起動。

docker restart rabbitmq

再度キューを確認

docker exec -ti rabbitmq rabbitmqctl list_queues

耐久性なしのキューは削除され、耐久性ありのキュー・耐久性ありのメッセージは再起動後でも残っていることがわかる。

出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	1

なお、必ずしも完全に保証されるわけではない。

メッセージ永続化に関する注意点

メッセージを永続化としてマークすることは、メッセージが絶対に失われないことを完全に保証するものではありません。永続化設定により、RabbitMQはメッセージをディスクに保存するよう指示されますが、RabbitMQがメッセージを受け入れた後にディスクに保存されるまでの短いタイムウィンドウが存在します。この間にRabbitMQが停止した場合、メッセージが失われる可能性があります。

また、RabbitMQはすべてのメッセージに対してfsync(2)(ディスクへの即時書き込み)を実行するわけではありません。一部のメッセージはキャッシュに保存されるだけで、実際にはディスクに書き込まれていない場合があります。そのため、永続化による保証は完全ではありません。ただし、これでも単純なタスクキューには十分なレベルの信頼性を提供します。

もし、より強力な保証が必要であれば、Publisher Confirms(パブリッシャー確認)を使用することを検討してください。

公平なディスパッチ

ラウンドロビンは公平なように思えるが、以下のようなケースを考えてみる

  • 2つのワーカーが存在する
  • 奇数番目のメッセージは「重いタスク」(処理時間が長い)。
  • 偶数番目のメッセージは「軽いタスク」(処理時間が短い)。
  • 上記のメッセージを送信し続けると・・・
    • 1つ目のワーカーは重いタスクばかり処理し続けて、ずっと忙しい状態
    • もう1つのワーカーは軽いタスクだけを処理し続けて、ほぼ待機状態と同じ

つまり、偏りが発生することになる。実際にやってみるとこうなる。

python new_task.py "テストテストテストテストテスト"
python new_task.py "テスト"
python new_task.py "テストテストテストテストテスト"
python new_task.py "テスト"
python new_task.py "テストテストテストテストテスト"
python new_task.py "テスト"
ターミナル1 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 18:31:04 [x] 'テストテストテストテストテスト' を受信しました。
2025-01-26 18:31:19 [x] 'テストテストテストテストテスト' を完了しました。
2025-01-26 18:31:19 [x] 'テストテストテストテストテスト' を受信しました。
2025-01-26 18:31:34 [x] 'テストテストテストテストテスト' を完了しました。
2025-01-26 18:31:34 [x] 'テストテストテストテストテスト' を受信しました。
2025-01-26 18:31:49 [x] 'テストテストテストテストテスト' を完了しました。
ターミナル2 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 18:31:05 [x] 'テスト' を受信しました。
2025-01-26 18:31:08 [x] 'テスト' を完了しました。
2025-01-26 18:31:08 [x] 'テスト' を受信しました。
2025-01-26 18:31:11 [x] 'テスト' を完了しました。
2025-01-26 18:31:11 [x] 'テスト' を受信しました。
2025-01-26 18:31:14 [x] 'テスト' を完了しました。

これを回避するにはchannel.basic_qos()メソッドを使って、各ワーカーが同時に受け取れるメッセージ数を制限する、、、ということは、ワーカーはメッセージを処理しながら(それが未処理・処理中であったとしても)次のメッセージを受信している、ということなんだな。で、同時に受け取れるメッセージ数をprefetch_countで指定すると。なるほど、上の動きがよくわかる。

worker.py
    # ワーカーが同時受信可能なメッセージ数を1に制限
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue='hello',
        on_message_callback=callback,
        auto_ack=False
    )
    print(' [*] メッセージを待っています。終了する場合は CTRL+C で終了します。')

これで再度試してみると以下のようにある程度公平に分散されているのがわかる。

ターミナル1 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 18:42:19 [x] 'テストテストテストテストテスト' を受信しました。
2025-01-26 18:42:34 [x] 'テストテストテストテストテスト' を完了しました。
2025-01-26 18:42:34 [x] 'テスト' を受信しました。
2025-01-26 18:42:37 [x] 'テスト' を完了しました。
2025-01-26 18:42:37 [x] 'テストテストテストテストテスト' を受信しました。
2025-01-26 18:42:52 [x] 'テストテストテストテストテスト' を完了しました。
ターミナル2 出力
 [*] メッセージを待っています。終了する場合は CTRL+C で終了します。
2025-01-26 18:42:21 [x] 'テスト' を受信しました。
2025-01-26 18:42:24 [x] 'テスト' を完了しました。
2025-01-26 18:42:24 [x] 'テストテストテストテストテスト' を受信しました。
2025-01-26 18:42:39 [x] 'テストテストテストテストテスト' を完了しました。
2025-01-26 18:42:39 [x] 'テスト' を受信しました。
2025-01-26 18:42:42 [x] 'テスト' を完了しました。

ただし、当然処理完了を待って次の処理に進むので、以下あたりについても考慮が必要。

キューサイズに関する注意点

すべてのワーカーが忙しい場合、キューがいっぱいになる可能性があります。そのため、キューの状況を注意深く監視する必要があります。また、必要に応じてワーカーを追加したり、メッセージのTTL(有効期限)を設定することを検討してください。

kun432kun432

RabbitMQキュー チュートリアル: 3. Publish/Subscribe

https://www.rabbitmq.com/tutorials/tutorial-three-python

作業キュー(work queue)では、送信されたメッセージはどれか1つのワーカー(コンシュマー)に配信されていた。

Publish/Subscribeと呼ばれるパターンでは、メッセージはすべてのコンシュマーに配信される(注:以下はあくまでもイメージで、実際はひとつ下の図になる。)

昔なつかしいメルマガとかメーリングリストがまさにこれだと思うし、現在でよくあるユースケースとしてはログを複数箇所に送信して、冗長性を確保するとか、出力媒体(コンソールとファイル)を分けるなどの使い方が考えられる。

エクスチェンジ(Exchanges)

ここまでは、プロデューサーはキューを使ってメッセージの送受信、つまりプロデューサーはメッセージをキューに直接送信してきたが、RabbitMQの完全なメッセージングモデルでは、プロデューサーが直接キューにメッセージを送信せずに、エクスチェンジ(Exchange) に送信することになる。

エクスチェンジを使ったメッセージングの流れは以下。

  1. プロデューサーは、エクスチェンジにメッセージを送信
  2. エクスチェンジは、受け取ったメッセージを「どのキューに配信するか」を判断
  3. エクスチェンジは、メッセージを1つまたは複数のキューに配信
  4. キューからコンシュマーにメッセージを配信

これにより、

  • プロデューサーとコンシューマーの完全な疎結合で柔軟・拡張性が高いシステム設計が可能。
  • メッセージの振り分けロジックをエクスチェンジで一元管理可能。
  • 用途に応じた柔軟な配信と同時配信(Fanout、Direct、Topic、Headers)。

などのメリットが生まれる。

ということで実際にやってみる。今回はログシステム的なイメージですべてのキュー・コンシュマーにログを送信するのをやってみる。

プロデューサー側のスクリプト(emit_log.py)はこんな感じ

emit_log.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# エクスチェンジの宣言
channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout'
)

message = ' '.join(sys.argv[1:]) or "info: こんにちは!"

channel.basic_publish(
    exchange='logs',  # エクスチェンジを指定
    routing_key='',
    body=message,
)
print(f" [x] ログ '{message}' を送信しました。")

connection.close()

以前までのプロデューサー→直接キューに送信していたコードはこうなっていた。

send.py
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='こんにちは!'
)

エクスチェンジの指定はこのexchangeパラメータで行う。エクスチェンジの種類には以下の4つがある様子。

  • Topic: トピックパターンを使い、柔軟な条件で配信。
  • Fanout: すべてのキューにメッセージを配信(ブロードキャスト)。
  • Direct: ルーティングキーに基づき特定のキューにのみ配信。
  • Headers: メッセージヘッダーの条件に基づいて配信。

エクスチェンジの宣言は以下のように行う。

emit_log.py
channel.exchange_declare(
    exchange='logs',        # エクスチェンジの名前
    exchange_type='fanout'  # エクスチェンジの種類
)

そして送信時にこの宣言したエクスチェンジを指定する。今度はrouting_keyが空になっている。

emit_log.py
(snip)
channel.basic_publish(
    exchange='logs',  # エクスチェンジを指定
    routing_key='',
    body=message,
)
(snip)

exchangerouting_keyの関係性はこうみたい。

  1. exchangeの指定がない場合
    -「デフォルト」の「無名」エクスチェンジが使用される
    • routing_keyで指定されたキューに配送される
  2. exchangeの指定がある場合
    • 指定された名前のエクスチェンジが使用される

RabbitMQサーバでのエクスチェンジの一覧は以下で確認できる。

docker exec -ti rabbitmq rabbitmqctl list_exchanges
出力
Listing exchanges for vhost / ...
name	type
amq.direct	direct
amq.topic	topic
amq.fanout	fanout
amq.match	headers
	direct
amq.rabbitmq.trace	topic
amq.headers	headers

5つ目のやつが「デフォルト」の「無名」エクスチェンジなのではないかと思われる。

では、コンシュマーはないけれども、一旦ログを送信してみる。

python emit_log.py "ログ1"

再度エクスチェンジの一覧を見てみる。

出力
Listing exchanges for vhost / ...
name	type
amq.direct	direct
amq.topic	topic
amq.fanout	fanout
amq.match	headers
	direct
amq.rabbitmq.trace	topic
amq.headers	headers
logs	fanout

上で定義した"logs"という"fanout"型のエクスチェンジが作成されているのがわかる。

キューも見てみる。

docker exec -ti rabbitmq rabbitmqctl list_queues
出力
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	0
non_durable_hello	0

1つ前のチュートリアルで作成したキューがあるだけで、メッセージは見当たらない。上のプロデューサー側のスクリプトではキューの指定はないし、プロデューサーがキューを意識しなくていいのがエクスチェンジなので、当然と言えば当然か。

とはいえ、コンシュマーはキューが必要なはず、ということでコンシュマー側のスクリプト(receive_log.py

receive_log.py
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# エクスチェンジの宣言
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 一時的な名前付きキューを作成#
result = channel.queue_declare(
    queue='',      # 空文字列の場合、ランダムな名前付きキューが自動生成される
    exclusive=True # 現在の接続でのみ使用され、接続が閉じると削除される一時キューになる
)
# キュー名を取得する
queue_name = result.method.queue
print("[DEBUG] Queue: {queue_name}")

# キュー名とエクスチェンジを紐づけ
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] ログを待っています。終了する場合は CTRL+C で終了します。')

def callback(ch, method, properties, body):
    decoded_body = body.decode('utf-8')
    print(f" [x] {decoded_body}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

ここは説明を読んでいてもわかりにくかったので少し噛み砕いてみる。

一時キュー

通常、キュー名は、複数のプロデューサー(メッセージ送信者)とコンシューマー(メッセージ受信者)で同じキューを処理するために重要になる。これまでの例では、特定の「名前付き」キュー(helloとか)を使用していた。

ただ、今回のようなログシステムの場合

  • コンシュマーはすべてのログメッセージを受信する必要がある。キュー名で指定すると特定のメッセージだけを受信する設定になってしまう
  • ログの場合、過去の古いメッセージは不要で、今流れてくるメッセージを処理する必要がある。

となるため、必要なのは「一時的で、接続が閉じたら削除される」キューということになる。

結局のところ、エクスチェンジが入ってきて、プロデューサーとキューは疎結合になったけども、

  • キューとコンシューマーの結合は依然として密結合
    • キュー名が必要なため、結合度が変わらない。
  • キューに「名前」が必要な理由
    • キューがエクスチェンジやコンシューマーとやり取りする際に、一意の識別子としての役割を果たす。

ということなのだけど、ログシステムのようなブロードキャスト的な用途の場合には、 エクスチェンジ・キュー・コンシューマーの紐づけに対して、「名前」を明示的に持つ必要はない、のでこういうことになるって感じなんだろう。

これを実現しているのが以下の部分。

receive_log.py
(snip)
# 一時的な名前付きキューを作成#
result = channel.queue_declare(
    queue='',      # 空文字列の場合、ランダムな名前付きキューが自動生成される
    exclusive=True # 現在の接続でのみ使用され、接続が閉じると削除される一時キューになる
)
# ランダムに生成されたキュー名を取得する
queue_name = result.method.queue
(snip)

channel.queue_declare()でキュー名を指定しなければ(queue='')、まずランダムなキュー名のキューが作成される。これは通常、amq.gen-JzTY20BRgKO-HjmUJj0wLgというような名前になるらしい。そして、このキューはこの接続でだけ動作すればいいので、exclusive=Trueでコンシュマーからの
接続が終われば削除するようにする。

(エクスチェンジとキューの)バインディング

エクスチェンジとキューが作成できたので、これをchannel.queue_bind()で、紐づける。このひも付けをバインディングという。

receive_log.py
# キュー名とエクスチェンジを紐づけ
channel.queue_bind(exchange='logs', queue=queue_name)

一つ上でキュー名を取得しているのは、ランダムに生成される&キューとの紐づけで必要になるからだね。

このバインディングもrabbitmqctlコマンドで確認できる。

docker exec -ti rabbitmq rabbitmqctl list_bindings
出力
Listing bindings for vhost /...
source_name	source_kind	destination_name	destination_kind	routing_key	arguments
	exchange	hello	queue	hello	[]
	exchange	non_durable_hello	queue	non_durable_hello	[]

少し前に作成したキューが表示されている。なるほど、これらは「デフォルト」の「無名」エクスチェンジと紐づいているのだろう。

では実際に動かしてみる。 ワーカーを2つ起動。

ターミナル1
python receive_log.py
ターミナル1 出力
[DEBUG] Queue: amq.gen-ByfpokCJnrbbYKXKejZVQQ
 [*] ログを待っています。終了する場合は CTRL+C で終了します。
ターミナル2
python receive_log.py
ターミナル2 出力
[DEBUG] Queue: amq.gen-K1Us9NRxxwVlqdOMXafmXQ
 [*] ログを待っています。終了する場合は CTRL+C で終了します。

それぞれキュー名が異なっているのがわかる。

ではログを送信してみる。

python emit_log.py "ログ1"
python emit_log.py "ログ2"
python emit_log.py "ログ3"
ターミナル1 出力
[DEBUG] Queue: amq.gen-ByfpokCJnrbbYKXKejZVQQ
 [*] ログを待っています。終了する場合は CTRL+C で終了します。
 [x] ログ1
 [x] ログ2
 [x] ログ3
ターミナル2 出力
[DEBUG] Queue: amq.gen-K1Us9NRxxwVlqdOMXafmXQ
 [*] ログを待っています。終了する場合は CTRL+C で終了します。
 [x] ログ1
 [x] ログ2
 [x] ログ3

両方のワーカーにメッセージが送信されているのがわかる。

バインディングを見てみる。

docker exec -ti rabbitmq rabbitmqctl list_bindings
出力
Listing bindings for vhost /...
source_name	source_kind	destination_name	destination_kind	routing_key	arguments
	exchange	hello	queue	hello	[]
	exchange	amq.gen-K1Us9NRxxwVlqdOMXafmXQ	queue	amq.gen-K1Us9NRxxwVlqdOMXafmXQ	[]
	exchange	amq.gen-ByfpokCJnrbbYKXKejZVQQ	queue	amq.gen-ByfpokCJnrbbYKXKejZVQQ	[]
	exchange	non_durable_hello	queue	non_durable_hello	[]
logs	exchange	amq.gen-ByfpokCJnrbbYKXKejZVQQ	queue	amq.gen-ByfpokCJnrbbYKXKejZVQQ	[]
logs	exchange	amq.gen-K1Us9NRxxwVlqdOMXafmXQ	queue	amq.gen-K1Us9NRxxwVlqdOMXafmXQ	[]

最初にlogsとあるのが今のバインディング。他にもランダムな名前のキューとのバインディングが見えるが、これはコンシュマーを一旦落としたりして過去に使っていたもの。どうやら自動生成された一時キューやそのバインディングは少なくともサーバが起動している限り、しばらくは残るみたい。

ということで、今回のチュートリアルの構成をまとめるとこんな感じ。

kun432kun432

RabbitMQキュー チュートリアル: 4. Routing

https://www.rabbitmq.com/tutorials/tutorial-four-python

一つ上ではログをすべてのコンシュマーにブロードキャストで配信した。今度はこれに、特定のメッセージだけ=特定のログレベルの場合だけ別の処理を行う、例えば、

  • 重要なエラーメッセージはログファイルに保存
  • すべてのメッセージはコンソールに表示

という「ルーティング」をやってみる。図にするとこう。

このチュートリアルはちょっと説明的な感じで最後にまとまったコードだけ書いてあるので、途中家庭もなるべくコードで書いてみたいと思う。

バインディング

前回やった通り、バインディングは、エクスチェンジとキューの紐づけ、つまり、「このキューは、このエクスチェンジからメッセージを受け取りたい」という関係を定義する。

channel.queue_bind(
    exchange=exchange_name,  # メッセージを送信するエクスチェンジ
    queue=queue_name         # メッセージを受け取るキュー
)

ここにrouting_keyパラメータを追加すると、エクスチェンジとキューの間に「条件」を設定することができる。これを「バインディングキー」という(basic_publishにも同じ名前のパラメータがあるので、区別のため。)

channel.queue_bind(
    exchange=exchange_name,  # メッセージを送信するエクスチェンジ
    queue=queue_name,        # メッセージを受け取るキュー
    routing_key='black'      # バインディングキーの設定
)

このバインディングキーの役割はエクスチェンジタイプによって変わる。例えば1つ前にやったfanoutではバインディングキーは無視される(ブロードキャストなので条件を指定する意味がない)

Directエクスチェンジ

エクスチェンジの種類がdirectの場合、メッセージを送信時に指定したルーティングキーに基づいて特定のキューに振り分けることができる。

例えば、プロデューサーがメッセージ送信時に以下のようにルーティングキーを指定してエクスチェンジに送信する。

プロデューサー側
channel.basic_publish(
    exchange='direct_logs',   # 送信先のエクスチェンジ
    routing_key='orange',     # メッセージのルーティングキー
    body='...'                # メッセージの内容
)

コンシュマー側で、エクスチェンジとキューをバインディングする際に、バインディングキーで同じキーを指定する。

コンシュマー側
# エクスチェンジの宣言
channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct'  # エクスチェンジの種類を`direct`に指定
)

# キューの宣言
channel.queue_declare(queue='orange_queue')

# エクスチェンジとキューをバインディング
channel.queue_bind(
    exchange='direct_logs',  # メッセージを送信するエクスチェンジ
    queue="orange_queue",    # メッセージを受け取るキュー
    routing_key='orange'     # バインディングキーの設定
)

これで、ルーティングキーとバインディングキーが完全一致する場合、はこのエクスチェンジからの送信はこのキューに紐づけられることになる。もう少し複雑にするとこういう図は、

こういうコードになると思う。

channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct' 
)

channel.queue_declare(queue='orange_queue')
channel.queue_declare(queue='other_queue')

channel.queue_bind(
    exchange='direct_logs',
    queue="orange_queue"
    routing_key='orange'
)
channel.queue_bind(
    exchange='direct_logs',
    queue="other_queue",
    routing_key='green'
)
channel.queue_bind(
    exchange='direct_logs',
    queue="other_queue",
    routing_key='black'
)

で、定義されていないルーティングキューのメッセージが送信されてきた場合は破棄される。

(同じバインディングキーで)複数のバインディング

同じバインディングキーでで、複数のキューにバインディングすることも可能。例えば以下。

エクスチェンジの種類 fanout と似ているが、こちらの場合はルーティングキー・バインディングキーがblackである必要がある、つまり、そうでないものは破棄される。

こちらの場合のコードはこうなると思う。

channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct' 
)

channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')

channel.queue_bind(
    exchange='direct_logs',
    queue="q1"
    routing_key='black'
)
channel.queue_bind(
    exchange='direct_logs',
    queue="q2",
    routing_key='black'
)

ログの送信

上はコンシュマー側の処理だったが、送信の大元であるプロデューサー側の処理。これは上でもすでに少し書いた。今回の例ではログの送信なので、ルーティングキーはログレベルにするのが良さそう。

channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message
)

当然、コンシュマー側のバインディングもこれを踏まえたものにする必要がある。

キューのサブスクライブ

ということでコンシュマー側のバインディング。上で少し書いてみたように、ログレベルをコードに書いて個別にバインディングを書いてもいいのだが、引数で渡せるような感じで、動的に設定するようにする。

result = channel.queue_declare(
    queue='', exclusive=True
)
queue_name = result.method.queue

# severities(ログレベル)は起動時のコマンドライン引数から取得する
for severity in severities:
    # ログレベルごとに、ログレベルがルーティングキーに指定されたバインディングを設定
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
)

ここはRabbitMQそのものとは関係なくて、今回のユースケースの実装に関する部分ってだけ。

全部組み合わせてみる

ということで、最終的なコード

まずプロデューサー側(emit_log_direct.py

emit_log.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct'
)

severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "テストログ"

channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message,
)
print(f" [x] ログ '{severity}:{message}' を送信しました。")

connection.close()

コンシュマー側(recieve_log_direct.py

receive_log_direct.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct'
)

result = channel.queue_declare(
    queue='',
    exclusive=True
)

queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    )

print(' [*] ログを待っています。終了する場合は CTRL+C で終了します。')

def callback(ch, method, properties, body):
    decoded_body = body.decode('utf-8')
    print(f" [x] {method.routing_key}:{decoded_body}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

ではコンシュマーを複数立ち上げる。

1つ目は、warningerrorだけを受けてログに書き込むコンシュマー。(PYTHONUNBUFFERED=1はバックグラウンド実行でバッファリングされて出力がリアルタイムに表示されないため、バッファリングを無効化している。)

ターミナル1
PYTHONUNBUFFERED=1 python receive_log_direct.py warning error > logs_from_rabbit.log &
tail -f logs_from_rabbit.log

もう1つは、info / warning / error をすべて受けてコンソールに出力するコンシュマー

ターミナル2
python receive_log_direct.py info warning error

ではログを送信してみる。まず普通のログ。

python emit_log_direct.py info "infoログ"

この場合はコンソール出力のコンシュマー側にだけ出力される。

ターミナル1 出力
 [*] ログを待っています。終了する場合は CTRL+C で終了します。
ターミナル2 出力
 [*] ログを待っています。終了する場合は CTRL+C で終了します。
 [x] info:ログ1

エラー相当のログを送信してみる

python emit_log_direct.py error "errorログ"
python emit_log_direct.py warning "warningログ"
ターミナル1 出力
 [*] ログを待っています。終了する場合は CTRL+C で終了します。
 [x] error:errorログ
 [x] warning:warningログ
ターミナル2 出力
 [*] ログを待っています。終了する場合は CTRL+C で終了します。
 [x] info:ログ1
 [x] error:errorログ
 [x] warning:warningログ

こちらは両方に出力される。

kun432kun432

RabbitMQキュー チュートリアル: 5. Topics

https://www.rabbitmq.com/tutorials/tutorial-five-python

ここまでで、

  • fanoutエクスチェンジによるブロードキャスト配信
  • directエクスチェンジによる特定条件でのルーティング

をやってきたが、directエクスチェンジは「複数条件でのルーティングができない」という制限がある。

ログシステムの例だと、ログレベルだけでなく、ログの発生元ソース(auth、kernなど)も踏まえて、ログの振り分けを行いたい、というかそれができるのがsyslogである。

これを行えるようにするのがtopicエクスチェンジになる。

Topicエクスチェンジ

directエクスチェンジでは、routing_keyパラメータを使って、ルーティングキー・バインディングキーに指定した文字列の完全一致でキューへのルーティングを行った。

topicエクスチェンジでも同じようにrouting_keyパラメータを使うのだが、指定の仕方が変わる

  • ルーティングキー
    • ドットで区切られた単語のリストでなければならない(例: quick.orange.rabbit
    • 単語数に制限はないがrouting_key全体の長さは255バイト以内に収める必要がある
  • バインディングキー
    • 基本的には上と同じ
    • 以下の特殊文字が使用可能
      • *: 1個の単語にマッチ
      • #: 0個以上の単語にマッチ

実際の例。ルーティングキーは <速さ>.<色>.<動物の種類>という3つの単語からなる形式だとして、バインディングキーを以下のように設定する。

以下のルーティングキーの場合にどうなるか?

ルーティングキー マッチする条件 配送先 備考
quick.orange.rabbit *.orange.*
*.*.rabbit
Q1
Q2
lazy.orange.elephant *.orange.*
lazy.#
Q1
Q2
quick.orange.fox *.orange.* Q1
lazy.brown.fox lazy.# Q2
lazy.pink.rabbit *.*.rabbit
lazy.#
Q2 1度だけ配送
(同一キューへの複数条件にマッチ)
quick.brown.fox どれにもマッチしない 破棄
orange どれにもマッチしない 破棄 単語数が少ない
quick.orange.new.rabbit どれにもマッチしない 破棄 単語数が多い
lazy.orange.new.rabbit lazy.# Q2 単語数が多いが#によりマッチ

となる。

この指定方法は柔軟に使えるので、例えば以下のようなバインディングキーだと、他の種類のエクスチェンジと同じような動きになる

  • #: fanoutと同じ
  • *#も使わない: directと同じ

ということでコード。ログシステムの例で。まずプロデューサー側。

emit_log_topic.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.exchange_declare(
    exchange='topic_logs',
    exchange_type='topic' # topicタイプを指定
)

# ルーティングキーは <facility>.<severity> を想定
routing_key = sys.argv[1] if len(sys.argv) > 2 else "anonymous.info"
message = ' '.join(sys.argv[2:]) or "テストログ"

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message,
)
print(f" [x] ログ '{routing_key}:{message}' を送信しました。")

connection.close()

コンシュマー側

receive_log_topic.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='topic_logs',
    exchange_type='topic'
)

result = channel.queue_declare(
    queue='',
    exclusive=True
)

queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

print(' [*] ログを待っています。終了する場合は CTRL+C で終了します。')

def callback(ch, method, properties, body):
    decoded_body = body.decode('utf-8')
    print(f" [x] {method.routing_key}:{decoded_body}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

実際に動かしてみる。コンシュマーを4つ立ち上げる。

全てを受信する

ターミナル1
python receive_log_topic.py "#"

kern.*を受信

ターミナル2
python receive_log_topic.py "kern.*"

*.critを受信

ターミナル3
python receive_log_topic.py "*.critical"

kern.**.critの両方を受信

ターミナル4
python receive_log_topic.py "kern.*" "*.critical"

kern.criticalなログを送信

python emit_log_topic.py "kern.critical" "クリティカルなカーネルエラー"

すべてのターミナルに出力される

出力
 [x] kern.critical:クリティカルなカーネルエラー

kern.infoの場合

python emit_log_topic.py "kern.info" "普通のカーネルログ"

ターミナル1(#)、2(kern.*)、4(kern.* or*.crit)に出力される

出力
 [x] kern.info:普通のカーネルログ

auth.infoの場合

python emit_log_topic.py "auth.info" "普通の認証ログ"

ターミナル1(#)にだけ出力される

出力
 [x] auth.info:普通の認証ログ
kun432kun432

RabbitMQキュー チュートリアル: 6. RPC(Remote procedure call)

上でやった作業キュー(Work Queue)を使えば、

  • プロデューサーがタスクをキューに送る
  • タスクは複数のワーカー(コンシュマー)に分散して効率よく処理される
  • プロデューサーは結果を待つ必要はなく、非同期的に動作する。

が実現できるが、タスクを完了した後の「結果」を送信者が受け取る仕組みがない。

リモートプロシージャコール (RPC)を使うと、

  • クライアントが「リクエスト」としてキューに処理を送信
  • リモートコンピュータ上で処理を実行し結果を返す
  • クライアントは「レスポンス(結果)」が返ってくるまで待機、同期的に動作する

と流れになり、送信者が「結果」を受け取ることができる。

ということで、このチュートリアルでは、RabbitMQを使ったRPCシステムを構築する。

  • クライアントとスケーラブルなRPCサーバーを作成
  • 「n番目のフィボナッチ数」を返すダミーのRPCサービスを作成

最終的な構成図としてはこんな感じ。細かいところはあとで見ていく。

クライアントのインタフェース

RPCサービスの利用イメージをクライアントのコードで。FibonacciRpcClientというクラスになっていて、callメソッドで値を渡すと、結果を受け取るまで処理をブロックする。

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")

RPCについての注意もある。自分はRPCの知見がないのでわかっていないが、注意しておくのがよさそう。

RPCについての注意

RPCはコンピューティングにおいて非常に一般的なパターンですが、批判されることも少なくありません。問題は、プログラマーが関数呼び出しがローカルで行われているのか、それとも遅いRPCで行われているのかを意識していない場合に発生します。このような混乱は、予測不可能なシステムを生み出し、デバッグを不必要に複雑にします。ソフトウェアを簡略化する代わりに、誤用されたRPCはメンテナンス不可能なスパゲッティコードを生む可能性があります。

これを踏まえ、以下のアドバイスを検討してください:

  1. ローカル関数呼び出しかリモート関数呼び出しかを明確にする
    • どの関数呼び出しがローカルで、どの関数呼び出しがリモートであるのかが明確であることを確認してください。
  2. システムをドキュメント化する
    • コンポーネント間の依存関係を明確にし、記録してください。
  3. エラーケースを処理する
    • RPCサーバーが長時間ダウンしている場合、クライアントがどのように対応すべきかを設計してください。
  4. 疑問がある場合はRPCを避ける
    • 可能であれば、RPCのようなブロッキング動作を避け、非同期的なパイプラインを使用してください。この方法では、結果が次の計算ステージに非同期でプッシュされます。

コールバックキュー

作業キューの場合は送信者が結果を受け取れなかったが、RabbitMQでRPCを構成する場合、「コールバックキュー」を用意することで、クライアントが結果を受け取ることができる。ただし、サーバはそのコールバックキューがどれなのか?がわからないので、クライアントは「コールバックキュー」をリクエストのプロパティreply_toに設定する必要がある。

(snip)

# コールバックキューを作成
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

# リクエストを送信
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',  # クライアントからの送信先キュー
    properties=pika.BasicProperties(
        reply_to = callback_queue,    # サーバからの結果の返信先キュー
    ),
    body=request
)
(snip)

このメッセージに付与できるプロパティは14種類あるらしいが、よく使われるのは以下だけで、他はほとんど使われることがないらしい。

  • delivery_mode
    • メッセージを「永続的(persistent、値指定の場合は2)」または「一時的(transient、値指定の場合は2以外の値)」としてマークする。
    • 「作業キュー」のチュートリアルで実施済み
  • content_type
    • エンコーディングのMIMEタイプを記述するために使用。
    • 例えば、JSONエンコーディングの場合は application/jsonを設定するとよい。
  • reply_to
    • コールバックキューの名前を指定するために使用。
    • これにより、サーバーは応答メッセージをどのキューに送るべきかを知ることができる。
  • correlation_id
    • RPCのリクエストとレスポンスを関連付けるために使用。
    • これにより、複数のリクエストに対するレスポンスを区別できる。

Correlation id

上記でも出てきたCorrelation idについて。

1つ前のコードでは、RPCリクエストごとに一時的なコールバックキューを作成していた。

# コールバックキューを作成
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

リクエストごとに一意かつ一時的なものが用意されるので、管理上非常に良いと思うのだが、

  • クライアントがリクエストごとにコールバックキューを作成
  • かつ、クライアントが増えれば増えるほど、コールバックキュー作成のコストが発生

になるため、効率はあまり良くないと思われる。

これを回避する方法として、クライアントごとに1つのコールバックキューを作成するという方法がある。

同じクライアントは同じコールバックキューを使うし、クライアントが増えてもその数だけなので、たしかに効率は良さそうであるが、以下のケースではどうなるか?

複数のリクエストを送信した場合、リクエストの内容によっては順番が前後してしまうこともある。1つのクライアントが同じコールバックキューを使う場合、レスポンスがどのリクエストに対するものなのか?を判断できない。

そこで、correlation_idを使えば、

  • correlation_idに一意の値を設定してリクエスト
  • コールバックキューにメッセージが返されたら、correlation_idプロパティを確認
  • リクエストのcorrelation_id同じ値を持っていれば、そのリクエストに対するレスポンスと判断

することで、リクエストに対する正しいレスポンスを特定できるということになる。

ここにも注意書きがあるので参考まで。

「なぜコールバックキュー内の未知のメッセージをエラーとして扱うのではなく無視するべきなのか?」と疑問に思うかもしれません。それは、サーバー側でレースコンディションが発生する可能性があるためです。

可能性は低いものの、RPCサーバーが私たちに回答を送信した直後、リクエストに対する確認メッセージ(acknowledgment message)を送信する前にクラッシュすることがあり得ます。その場合、再起動したRPCサーバーは再びリクエストを処理することになります。こうした理由から、クライアント側では重複したレスポンスを適切に処理しなければなりません。また、RPCは理想的には冪等性(idempotency)を持つべきです。

サマリ

全体の処理の流れは以下となる。

  1. クライアントの初期化
    • クライアントが起動して、匿名で排他的なコールバックキューを作成
  2. クライアントがリクエストメッセージを送信
    • メッセージには次の2つのプロパティが含まれる
      • reply_to: コールバックキューを指定
      • correlation_id: 各リクエストごとに一意の値を設定
  3. リクエストの受信
    • リクエストは rpc_queue キューに送信される
  4. サーバーの処理
    • RPCワーカー(サーバー)は、rpc_queue 上でリクエストを待機
    • リクエストを受信すると、処理を実行し、結果をメッセージとして返す
    • この時、メッセージは reply_to プロパティで指定されたキューに送信される
  5. レスポンスの受信
    • クライアントはコールバックキューでデータを待機。
    • メッセージを受信すると、correlation_id プロパティを確認
    • もしリクエストで設定した値と一致すれば、そのレスポンスをアプリケーションに返す

ということで、コード。

まずサーバ側

rpc_server.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# rpc_queueを宣言
result = channel.queue_declare(queue='rpc_queue')

# フィボナッチ数を返す関数
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

# リクエスト時のコールバック関数
def on_request(ch, method, props, body):
    n = int(body)
    print(f" [.] fib({n})")
    
    # フィボナッチ関数を求める
    response = fib(n)

    # コールバックキューにメッセージを発行 
    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,               # コールバックキューを指定
        properties=pika.BasicProperties(
            correlation_id = props.correlation_id # correlation_idを指定
        ),
        body=str(response)
    )
    # rpc_queueにackを返す
    ch.basic_ack(delevery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
# rpc_queueのキューを受信して処理
channel.basic_consume(
    queue="rpc_queue",
    on_message_callback=on_request,
)

print(' [*] RPCリクエストを待っています。')
channel.start_consuming()

クライアント側

rpc_client.py
import pika
import uuid

# フィボナッチ数を求めるRPCクライアントクラス
class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # コールバックキューを宣言
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # コールバックキューをサブスクライブ
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    # レスポンス時のコールバック関数
    def on_response(self, ch, method, props, body):
        # correlation_idが一致したら結果を取得
        if self.corr_id == props.correlation_id:
            self.response = body

    # RPCサービスを呼び出す
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())  # co

        # rpc_queueにリクエストを送信
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue, # コールバックキューを指定
                correlation_id=self.corr_id,  # correlation_idを指定
            ),
            body=str(n))
        # コールバックキューからのレスポンスが返ってくるまで待つ
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)

# クライアント初期化
fibonacci_rpc = FibonacciRpcClient()

# フィボナッチ数を求めるリクエストを送信
print(" [x] リクエストしています: fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] 結果: {response}")

では実際に動かしてみる。サーバ側を起動

python rpc_server.py
出力
[*] RPCリクエストを待っています。

クライアントを実行

python rpc_client.py

結果が取得された。

出力
 [x] リクエストしています: fib(30)
 [.] 結果: 832040

サーバ側でもリクエストがきていることを確認できる。

出力
 [*] RPCリクエストを待っています。
 [.] fib(30)

上記の例はRPCサービスの実装の一例であり、よりよい実装は他にあるかもしれないが、少なくともでは以下のメリットを実現できている。

  1. スケールアップが簡単
    • RPCサーバーが遅い場合などは、単にもう一台RPCサーバーを起動するだけでスケールアップ可能
  2. クライアント側での効率的な通信
    • RPCクライアントが送信・受信するメッセージはそれぞれ1回のみ
    • queue_declareのような同期的な呼び出しは必要不要
    • 最小のネットワークラウンドトリップ(通信往復)で済む

ただし以下のような課題もある点に注意。

  • サーバーが1台も動作していない場合、クライアントじゃどのように対応すべきか?
  • クライアントがRPCに対してタイムアウトを設定すべきか?
  • サーバーが異常動作して例外を発生させた場合、それをクライアントに転送すべきか?
  • 処理を開始する前に、不正な受信メッセージ(例: 範囲外の値)をチェックして保護すべきか?
kun432kun432

RabbitMQキュー チュートリアル: 7. Publisher Confirms

このチュートリアルは、Python向けがなく、Java/C#/PHPのみとなっているが、チュートリアル2の作業キューに出てきた以下の箇所のことだと思う。

メッセージ永続化に関する注意点

メッセージを永続化としてマークすることは、メッセージが絶対に失われないことを完全に保証するものではありません。永続化設定により、RabbitMQはメッセージをディスクに保存するよう指示されますが、RabbitMQがメッセージを受け入れた後にディスクに保存されるまでの短いタイムウィンドウが存在します。この間にRabbitMQが停止した場合、メッセージが失われる可能性があります。

また、RabbitMQはすべてのメッセージに対してfsync(2)(ディスクへの即時書き込み)を実行するわけではありません。一部のメッセージはキャッシュに保存されるだけで、実際にはディスクに書き込まれていない場合があります。そのため、永続化による保証は完全ではありません。ただし、これでも単純なタスクキューには十分なレベルの信頼性を提供します。

もし、より強力な保証が必要であれば、Publisher Confirms(パブリッシャー確認)を使用することを検討してください。

以下に説明がある

https://www.rabbitmq.com/docs/confirms#publisher-confirms

パブリッシャー確認

ネットワークは、明白でない方法で故障することがあり、一部の故障を検出するには時間がかかります。そのため、ソケットにプロトコルフレームまたはフレームセット(例えば、公開メッセージ)を書き込んだクライアントは、メッセージがサーバーに到達し、正常に処理されたと仮定することはできません。途中で失われた可能性もありますし、配信が大幅に遅延する可能性もあります。

標準の AMQP 0-9-1 を使用する場合、メッセージが失われないことを保証する唯一の方法は、トランザクションを使用することです。つまり、チャネルをトランザクション対応にして、メッセージまたはメッセージセットごとにパブリッシュ、コミットを行うのです。この場合、トランザクションは不必要に重くなり、スループットが 250 分の 1 に低下します。これを改善するために、確認メカニズムが導入されました。これは、プロトコルにすでに存在する消費者確認メカニズムを模倣したものです。

確認を有効にするには、クライアントがconfirm.selectメソッドを送信します。 no-waitが設定されているか否かによって、ブローカーはconfirm.select-okで応答する場合があります。 チャネルでconfirm.selectメソッドが使用されると、確認モードに入ったことになります。 トランザクションチャネルは確認モードに入ることができず、チャネルが確認モードに入ると、トランザクションチャネルにすることはできません。

チャネルが確認モードになると、ブローカーとクライアントの両方がメッセージをカウントします(カウントは最初のconfirm.selectで1から開始します)。ブローカーは、同じチャネル上でbasic.ackを送信することで、メッセージを処理する際にメッセージを確認します。delivery-tagフィールドには、確認済みのメッセージのシーケンス番号が格納されます。ブローカーは、basic.ackmultipleフィールドを設定して、シーケンス番号を持つメッセージまでが処理済みであることを示すこともできます。

kun432kun432

RabbitMQストリーム

ストリームのチュートリアルは2つの例で構成されているので、これも順にやっていく。

  1. "Hello World!"
  2. Offset Tracking

ストリームの場合はRabbitMQでストリームプラグインを有効にする必要がある。ストリーム用のポート(5552)を割り当てて、環境変数でストリームプロトコルのホスト設定を行う。ドキュメントはRabbitMQのバージョンが3.13だったが、上のチュートリアルで使用したバージョンに合わせてみた。

docker run -it -d \
    --name rabbitmq \
    -h "rabbitmq" \
    -p 5672:5672 \
    -p 15672:15672 \
    -p 5552:5552 \
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
    -v ./rabbitmq-data:/var/lib/rabbitmq/mnesia \
    rabbitmq:4.0-management

立ち上がったら、プラグインを有効化する。

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 
出力
Enabling plugins on node rabbit@rabbitmq:
rabbitmq_stream
rabbitmq_stream_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_stream
  rabbitmq_stream_management
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@rabbitmq...
The following plugins have been enabled:
  rabbitmq_stream
  rabbitmq_stream_management

started 2 plugins.
kun432kun432

RabbitMQストリーム チュートリアル: 1. "Hello World!"

https://www.rabbitmq.com/tutorials/tutorial-one-python-stream

ストリームでの"Hello, World"チュートリアル。シンプルにプロデューサーがメッセージを送信して、コンシュマーがそのメッセージを受信するというもので、まずは詳細なAPIの説明は後回しで、Pythonのrstreamクライアントライブラリを使用して、基本的な動作を見るという流れ。

このrstreamというのは元々個人の開発者が作成したもので、今はRabbitMQがメンテしている、RabbitMQストリームプロトコル専用のクライアントライブラリらしい。

https://github.com/rabbitmq-community/rstream

パッケージインストール

pip install rstream
pip freeze | grep -i rstream
出力
rstream==0.20.8

チュートリアルで使うコードは以下にある。

プロデューサー側のsend.py
https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/python-stream/send.py

コンシュマー側のreceive.py
https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/python-stream/receive.py

まず送信から。日本語でコメントを入れた。

send.py
import asyncio

from rstream import Producer

STREAM_NAME = "hello-python-stream"
# 5GB
STREAM_RETENTION = 5000000000


async def send():
    # RabbitMQサーバに接続
    async with Producer(
        host="localhost",
        username="guest",
        password="guest",
    ) as producer:
        # 新しいストリームを作成
        await producer.create_stream(
            # ストリーム名
            STREAM_NAME,
            # ストリームが既に存在する場合は無視
            exists_ok=True,
            # ストリームの最大サイズを5GBに設定
            arguments={"max-length-bytes": STREAM_RETENTION}
        )

        # メッセージを送信
        await producer.send(
            # ストリーム名
            stream=STREAM_NAME,
            # メッセージ
            # 訳注: メッセージはバイト列で送信する必要がある。
            # 元のコードは
            # `message=b'Hello, World!'`
            # となっていたが、日本語の場合は
            # `encode("utf-8")`
            # でバイト列に変換する。
            message="こんにちは!".encode("utf-8")
        )

        print(" [x] メッセージを送信しました。")

        input(" [x] Enterを押してプロデューサを終了します。")

with asyncio.Runner() as runner:
    runner.run(send())

各処理について見ていく。

まず、Producerクラスを使って、RabbitMQに接続。

async def send():
    async with Producer(
        host="localhost",
        username="guest",
        password="guest",
    ) as producer:

Producer クラスは、その名前の通り、RabbitMQストリームの送信側を設定するためのクラスで、ソケット接続・プロトコルのバージョン交渉・認証などが抽象化されている。

次にストリームを作成。

STREAM_NAME = "hello-python-stream"
# 5GB
STREAM_RETENTION = 5000000000

(snip)
        await producer.create_stream(
            # ストリーム名
            STREAM_NAME,
            exists_ok=True,
            arguments={"max-length-bytes": STREAM_RETENTION}
        )

create_stream()メソッドで新しいストリームを作成するが、ストリームがすでに存在する場合はエラーになる。exists_ok=Trueをつけることで、すでにストリームが存在してもエラーにはならない。

ストリームは追加専用のログのようなものであり、破棄されるまでの一定期間保持され、繰り返し読み取ることができる。 arguments={"max-length-bytes": ...}で保持ポリシーを設定すれば、そのサイズが保持するストリームの最大上限となる。

そしてメッセージを送信する。

        await producer.send(
            stream=STREAM_NAME,
            message="こんにちは!".encode("utf-8")
        )

メッセージはバイト列で指定する。JSONやMessagePackなどの形式でエンコードしたデータを送信することも可能。元のコードでは、メッセージは message=b'Hello, World!'となっていたが、日本語の場合はUTF-8エンコードしてバイト列に変換する必要がある。

次に受信側。

receive.py
import asyncio

from rstream import (
    AMQPMessage,
    Consumer,
    ConsumerOffsetSpecification,
    MessageContext,
    OffsetType,
)

STREAM_NAME = "hello-python-stream"
# 5GB
STREAM_RETENTION = 5000000000


async def receive():
    # RabbitMQサーバに接続
    async with Consumer(
        host="localhost",
        username="guest",
        password="guest"
    ) as consumer:
        # ストリームを作成
        await consumer.create_stream(
            # ストリーム名
            STREAM_NAME,
            # ストリームが既に存在する場合は無視
            exists_ok=True,
            # ストリームの最大サイズを5GBに設定
            arguments={"max-length-bytes": STREAM_RETENTION}
        )

        # メッセージを受信したときのコールバック関数
        #   msg:
        #     受信したメッセージを表すAMQPMessageオブジェクト
        #   message_context:
        #     メッセージのコンテキストを表すMessageContextオブジェクト
        #     どのストリームからのメッセージかを識別するために使用
        async def on_message(msg: AMQPMessage, message_context: MessageContext):
            # ストリームを取得
            stream = message_context.consumer.get_stream(message_context.subscriber_name)
            # 日本語の場合はデコードする必要がある
            print("ストリーム {} から メッセージを受信: {}".format(stream, msg.decode("utf-8")))

        print("Enterを押してコンシューマを終了します。")
        # コンシュマーを開始
        await consumer.start()
        # ストリームをサブスクライプ
        await consumer.subscribe(
            # ストリーム名
            stream=STREAM_NAME,
            # メッセージを受信したときのコールバック関数
            callback=on_message,
            # ストリーム内のどこからメッセージを受信するかを指定
            # 以下の場合は、最初のメッセージから受信を開始する
            offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None),
        )

        try:
            # コンシュマーを無限ループで実行
            await consumer.run()
        except (KeyboardInterrupt, asyncio.CancelledError):
            print("コンシューマを終了します。。。")
            return


with asyncio.Runner() as runner:
    runner.run(receive())

こちらも処理ごとに。

接続およびストリームの作成については、基本的にプロデューサ側と同じ。Consumer クラスを使うというところが違うだけ。

(snip)

STREAM_NAME = "hello-python-stream"
# 5GB
STREAM_RETENTION = 5000000000

async def receive():
    async with Consumer(
        host="localhost",
        username="guest",
        password="guest"
    ) as consumer:
        await consumer.create_stream(
            STREAM_NAME,
            exists_ok=True,
            arguments={"max-length-bytes": STREAM_RETENTION}
        )
(snip)

で、次に、メッセージ受信時のコールバック関数を作成して、consumer.subscribeでそれを指定する。offset_specificationはコンシュマーがストリーム内のどこから受信するかのオフセットを指定する。以下の例だとストリームの最初から受信する。

        async def on_message(msg: AMQPMessage, message_context: MessageContext):
            stream = message_context.consumer.get_stream(message_context.subscriber_name)
            print("ストリーム {} から メッセージを受信: {}".format(stream, msg.decode("utf-8")))

        print("Enterを押してコンシューマを終了します。")
        await consumer.start()
        await consumer.subscribe(
            stream=STREAM_NAME,
            callback=on_message,
            offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None),
        )

        try:
            await consumer.run()
        except (KeyboardInterrupt, asyncio.CancelledError):
            print("コンシューマを終了します。。。")
            return

では早速実行してみる。まずコンシュマーを起動。

python receive.py
出力
Enterを押してコンシューマを終了します。

こちらはこのままにして、別ターミナルでプロバイダーを実行。

python send.py
出力
 [x] メッセージを送信しました。
 [x] Enterを押してプロデューサを終了します。

ENTERを押してプロデューサを終了すると、コンシュマー側に以下のように出力される。

出力
Enterを押してコンシューマを終了します。
ストリーム hello-python-stream から メッセージを受信: こんにちは!

プロデューサは一度メッセージを送ったら終了する。何度か実行すると以下のように新しいメッセージが受信される。

出力
Enterを押してコンシューマを終了します。
ストリーム hello-python-stream から メッセージを受信: こんにちは!
ストリーム hello-python-stream から メッセージを受信: こんにちは!
ストリーム hello-python-stream から メッセージを受信: こんにちは

コンシュマーを増やしてみる。別ターミナルでもう1つreceive.pyを起動する。

python receive.py
出力
Enterを押してコンシューマを終了します。
ストリーム hello-python-stream から メッセージを受信: こんにちは!
ストリーム hello-python-stream から メッセージを受信: こんにちは!
ストリーム hello-python-stream から メッセージを受信: こんにちは!

ここまで送信されたメッセージが最初から取得される。

kun432kun432

通常のキューとストリームはどう違うのか?というとここ。

ストリームは追加専用のログのようなものであり、破棄されるまでの一定期間保持され、繰り返し読み取ることができる。 arguments={"max-length-bytes": ...}で保持ポリシーを設定すれば、そのサイズが保持するストリームの最大上限となる。

ストリームのドキュメントにもこうある。

https://www.rabbitmq.com/docs/streams

ただし、メッセージの保存と消費の方法という2つの重要な点において、ストリームはキューとは異なります。

ストリームは、メッセージの追記のみ可能なログをモデル化しており、有効期限が切れるまで繰り返し読み取ることができます。ストリームは常に永続的かつ複製されます。このストリームの動作に関するより技術的な説明は、「非破壊的コンシューマーセマンティクス」です。

RabbitMQのストリームからメッセージを読み取るには、1つ以上のコンシューマーが購読し、同じメッセージを何度でも読み取ることができます。

ChatGPT o3-miniによると以下らしい

RabbitMQでは、従来からある「キュー」と、より新しい「ストリーム」は、メッセージの保持方法や消費モデル、用途において以下のような違いがあります。


1. メッセージの保持と消費モデル

  • キュー

    • 消費後の削除:
      キューに送られたメッセージは、消費者に配信され「受信(ACK)されると削除」されるのが基本動作です。
    • プッシュモデル:
      メッセージはブローカーから自動的に(プッシュ方式で)消費者に配布され、ロードバランシング的に使われることが多いです。
    • 一度きりの消費:
      通常、各メッセージは1度だけ配信され、消費済みのメッセージは再利用や再読込みはできません。
  • ストリーム

    • メッセージの永続的なログ:
      ストリームは、メッセージが「追加(append-only)」されるログのようなもので、一定の保持ポリシー(サイズや時間)に基づいてメッセージが保存されます。
    • 消費者によるオフセット管理:
      各消費者は独自の読み取り位置(オフセット)を管理できるため、同じストリーム内のメッセージを何度も読み返したり、異なるタイミングで消費を開始したりできます。
    • プルモデルに近い:
      消費者が自分のペースでメッセージを取得(pull)する使い方がしやすく、リアルタイムなデータストリームの再処理や、履歴データの分析などに向いています。

2. 用途と適用シナリオ

  • キューの利用例:
    タスクの分散処理や、1回限りのメッセージ配信(ワークキュー)、要求応答パターンなど、従来型のメッセージング用途に適しています。

  • ストリームの利用例:
    ログの集約、大量のデータを高速にストリーム処理する必要があるシナリオ、後からデータをリプレイして分析する必要がある場合(例えば、イベントソーシングやリアルタイム解析)に適しています。


3. パフォーマンスとスケーラビリティ

  • キュー:
    消費後にメッセージが削除されるため、メモリやディスク上のデータ量は一定に保たれやすいですが、複数回の読み直しや長期間のメッセージ保持には向いていません。

  • ストリーム:
    大量のメッセージを連続的に追加し、消費者が必要に応じて過去データを再読込できる設計となっており、高スループットやリプレイ性が要求されるシナリオに対応しやすいです。


まとめ

  • キュー:
    「一度きりの配信」が基本。メッセージは消費されると削除され、プッシュ型で消費者に割り当てられるため、シンプルなタスク分散や要求応答に最適。

  • ストリーム:
    メッセージをログのように保持し、消費者が独自のオフセットを管理できるため、複数回のリプレイや後からの再処理が可能。大量データのストリーム処理や、イベントソーシングなど、柔軟性と高スループットが求められるシナリオに向いています。

このように、RabbitMQでは利用シーンに応じて「キュー」と「ストリーム」を選択することで、最適なメッセージングアーキテクチャを実現できます。

kun432kun432

まとめ

きっかけはこれで使用されていたことなのだけど。

https://zenn.dev/kun432/scraps/f43428e1dc101e

自分はあまりpub/subみたいなものの経験がなく、並行・並列処理や非同期なんかもあまり知見がなかったが、今回いろいろ触ってみて、結構な可能性を感じている。

このスクラップは2025/02/09にクローズされました