Python で publish-subscribe パターン

2 min読了の目安(約2000字TECH技術記事

方針としては observer パターンと似たようなものですが。

ここではかなり単純にしていますが、publish-subscribe パターンを使えば Windows のメッセージのようなものを実装できます( ref. ウィンドウ メッセージ ‐ 通信用語の基礎知識 )。

msgid に結び付けられた複数の func を実行できるようになっています。こんなイメージ。

msgid func
0 func0
1 func1 func2
3 func3
4 func4

queue には固定値を put していますが、たとえば複数の publisher を実装し、各々異なる何かを put すれば多対多のやりとりが実装できます。

dict を queue に put するときコピーを生成しないと、 get する側では dict のインスタンスだけが取得されます。値が無いものが取得されます( Python multiprocessing Queue put() behavior - Stack Overflow )

import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable
from queue import Queue


_handler = {}


def register(msgid: int, func: Callable[[int, str], None]):
    if _handler.get(msgid) is None:
        _handler[msgid] = []
    _handler[msgid].append(func)


def publisher(q):
    while True:
        d = {1: 'message0'}
        q.put(d.copy())         # copy により値を渡さないといかんです
        d.clear()
        time.sleep(1)


def subscriber(q):
    while True:
        data = q.get()
        for msgid, msg in data.items():
            if not _handler.get(msgid) is None:
                for func in _handler[msgid]:
                    func(msgid, msg)
        time.sleep(1)


def func0(msgid: int, msg: str):
    print(f"func0 {msgid} {msg}")


def func1(msgid: int, msg: str):
    print(f"func1 {msgid} {msg}")


def func2(msgid: int, msg: str):
    print(f"func2 {msgid} {msg}")


def func3(msgid: int, msg: str):
    print(f"func3 {msgid} {msg}")


def func4(msgid: int, msg: str):
    print(f"func4 {msgid} {msg}")


def main():
    register(0, func0)
    register(1, func1)
    register(1, func2)
    register(3, func3)
    register(4, func4)
    q = Queue()
    with ThreadPoolExecutor() as executor:
        executor.submit(publisher, q)
        executor.submit(subscriber, q)


main()