👌
Python で publish-subscribe パターン
方針としては observer パターンと似たようなものですが。
ここではかなり単純にしていますが、publish-subscribe パターンを使えば Windows のメッセージのようなものを実装できます( ref. ウィンドウ メッセージ ‐ 通信用語の基礎知識 )。
msgid に結び付けられた複数の func を実行できるようになっています。こんなイメージ。
msgid | func | |
---|---|---|
0 | func0 | |
1 | func1 | func2 |
3 | func3 | |
4 | func4 |
queue には固定値を put していますが、たとえば複数の publisher を実装し、各々異なる何かを put すれば多対多のやりとりが実装できます。
dict を queue に put するときコピーを生成しないと、 get する側では dict のインスタンスだけが取得されます。値が無いものが取得されます( Python multiprocessing Queue put() behavior - Stack Overflow )
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable
from queue import Queue
_handler = {}
def register(msgid: int, func: Callable[[int, str], None]):
if _handler.get(msgid) is None:
_handler[msgid] = []
_handler[msgid].append(func)
def publisher(q):
while True:
d = {1: 'message0'}
q.put(d.copy()) # copy により値を渡さないといかんです
d.clear()
time.sleep(1)
def subscriber(q):
while True:
data = q.get()
for msgid, msg in data.items():
if not _handler.get(msgid) is None:
for func in _handler[msgid]:
func(msgid, msg)
time.sleep(1)
def func0(msgid: int, msg: str):
print(f"func0 {msgid} {msg}")
def func1(msgid: int, msg: str):
print(f"func1 {msgid} {msg}")
def func2(msgid: int, msg: str):
print(f"func2 {msgid} {msg}")
def func3(msgid: int, msg: str):
print(f"func3 {msgid} {msg}")
def func4(msgid: int, msg: str):
print(f"func4 {msgid} {msg}")
def main():
register(0, func0)
register(1, func1)
register(1, func2)
register(3, func3)
register(4, func4)
q = Queue()
with ThreadPoolExecutor() as executor:
executor.submit(publisher, q)
executor.submit(subscriber, q)
main()
Discussion