Open1

ZeroMQとは?

nnn112358nnn112358

ZeroMQとは

ZeroMQ(ØMQ、ZMQ)は、高性能な非同期メッセージングライブラリです。ソケットの抽象化と同時に、ソケット以上の機能を提供します。ZeroMQは以下の特徴を持っています:

  • 軽量: 最小限のリソースで動作します
  • 高速: 非常に低いレイテンシで通信できます
  • パターン指向: 複数の通信パターン(PUB-SUB、REQ-REP、PUSH-PULLなど)をサポートしています
  • 多言語対応: C、C++、Python、Java、.NET、Goなど多くの言語でバインディングが提供されています
  • トランスポート非依存: inproc(プロセス内)、IPC(プロセス間)、TCP、PGM(マルチキャスト)といった複数のトランスポートをサポートしています

ZeroMQはソケット通信の複雑さを隠蔽し、シンプルなAPIでネットワークプログラミングを可能にします。また、キューイング、メッセージフィルタリング、パターンマッチングなどの機能も提供します。

主要な通信パターン

ZeroMQは以下のような通信パターンをサポートしています:

  1. Request-Reply (REQ-REP): クライアント-サーバーモデル
  2. Publish-Subscribe (PUB-SUB): パブリッシャーからサブスクライバーへの一方向通信
  3. Push-Pull (PUSH-PULL): タスク分散パターン
  4. Dealer-Router (DEALER-ROUTER): 非同期Request-Reply
  5. Pair (PAIR): 排他的ペア接続

ZeroMQで文字列を送信するPython

以下は、ZeroMQを使用してPythonで文字列を送信する簡単な例です。この例ではREQ-REPパターンを使用します。

まず、ZeroMQをインストールします:

pip install pyzmq

送信側(クライアント)の実装

import zmq
import time

# ZeroMQコンテキストを作成
context = zmq.Context()

# REQソケットを作成(RequestパターンのクライアントソケットタイプはREQ)
print("クライアントを起動中...")
socket = context.socket(zmq.REQ)

# サーバーに接続
socket.connect("tcp://localhost:5555")

# 複数のリクエストを送信
for request_num in range(5):
    message = f"Hello {request_num}"
    print(f"送信: {message}")
    
    # メッセージを送信
    socket.send_string(message)
    
    # サーバーからの応答を待機
    response = socket.recv_string()
    print(f"受信: {response}")
    
    # 少し待機
    time.sleep(1)

# ソケットとコンテキストを閉じる
socket.close()
context.term()

PUB-SUBパターンを使用した送信例

import zmq
import time

context = zmq.Context()

# PUBソケットを作成(パブリッシャー)
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

print("パブリッシャーを起動中...")
# 少し待機して接続を確立
time.sleep(1)

# メッセージをパブリッシュ
count = 0
while count < 10:
    topic = "update"
    message = f"ニュース #{count}"
    print(f"パブリッシュ: {topic} {message}")
    
    # トピックとメッセージを送信
    publisher.send_string(f"{topic} {message}")
    
    count += 1
    time.sleep(0.5)

# ソケットとコンテキストを閉じる
publisher.close()
context.term()

ZeroMQで受信するPython

受信側(サーバー)の実装 - REQ-REPパターン

import zmq
import time

# ZeroMQコンテキストを作成
context = zmq.Context()

# REPソケットを作成(ReplyパターンのサーバーソケットタイプはREP)
socket = context.socket(zmq.REP)

# ソケットをバインド(特定のアドレスとポートで待機)
socket.bind("tcp://*:5555")

print("サーバーを起動中...")

# リクエストを待機してレスポンスを返す無限ループ
while True:
    try:
        # クライアントからのメッセージを受信
        message = socket.recv_string()
        print(f"受信: {message}")
        
        # 少し処理に時間がかかると仮定
        time.sleep(1)
        
        # クライアントに応答
        response = f"応答: {message}"
        socket.send_string(response)
        
    except KeyboardInterrupt:
        break

# ソケットとコンテキストを閉じる
socket.close()
context.term()

PUB-SUBパターンを使用した受信例

import zmq

context = zmq.Context()

# SUBソケットを作成(サブスクライバー)
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")

# 受信するトピックをフィルタリング(空文字列ですべてのメッセージを受信)
# 特定のトピックだけを購読する場合は: subscriber.setsockopt_string(zmq.SUBSCRIBE, "update")
topic_filter = "update"
subscriber.setsockopt_string(zmq.SUBSCRIBE, topic_filter)

print(f"サブスクライバーを起動中... トピックフィルター: '{topic_filter}'")

# メッセージ受信の無限ループ
try:
    while True:
        # メッセージを受信
        message = subscriber.recv_string()
        topic, content = message.split(' ', 1)
        print(f"受信 [{topic}]: {content}")
        
except KeyboardInterrupt:
    pass

# ソケットとコンテキストを閉じる
subscriber.close()
context.term()

高度な使用例

PUSH-PULLパターン (ワーカーパターン)

送信側(タスク配信)

import zmq
import random
import time

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

print("タスク配信を開始します。Ctrl+Cで終了。")

# タスク配信
try:
    task_id = 0
    while True:
        # タスクデータ
        workload = random.randint(1, 100)
        task_id += 1
        
        # タスクの送信
        sender.send_json({
            "task_id": task_id,
            "workload": workload
        })
        
        print(f"タスク #{task_id} を送信 (作業量: {workload})")
        time.sleep(0.1)
        
except KeyboardInterrupt:
    pass

sender.close()
context.term()

受信側(ワーカー)

import zmq
import time

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

print("ワーカーを起動しました。タスクを待機中... Ctrl+Cで終了。")

# タスク処理
try:
    while True:
        # タスクの受信
        task = receiver.recv_json()
        task_id = task["task_id"]
        workload = task["workload"]
        
        print(f"タスク #{task_id} を受信 (作業量: {workload})")
        
        # タスクの処理をシミュレート
        time.sleep(workload / 100)  # 作業量に応じた処理時間
        
        print(f"タスク #{task_id} の処理完了")
        
except KeyboardInterrupt:
    pass

receiver.close()
context.term()

使用上の注意点

  1. コンテキストの共有: アプリケーション内で単一のコンテキストを使用することをお勧めします。
  2. ソケットの終了: アプリケーション終了時にはソケットとコンテキストを適切に閉じてください。
  3. 非同期通信: ZeroMQは非同期通信が基本です。必要に応じてタイムアウトを設定してください。
  4. メッセージフォーマット: JSON、MessagePack、Protocol Buffersなど構造化されたデータを送信する場合は、適切なシリアライズ方法を選択してください。
  5. エラーハンドリング: ネットワークエラーやタイムアウトに適切に対応するコードを実装してください。

ZeroMQは非常に柔軟で強力なメッセージングライブラリですが、その使い方を理解するにはパターンや概念に慣れる必要があります。公式ドキュメントやチュートリアルを参照しながら、徐々に複雑なパターンを試していくことをお勧めします。

https://zeromq.org/languages/python/
https://pyzmq.readthedocs.io/en/latest/
https://zguide.zeromq.org/docs/chapter1/