Pub/Sub パターンの勉強がてら通知機能を作ってみた
Observer パターンや Pub/Sub パターンについて、知識としては理解しているつもりでも実際にゼロイチで作ったことがなかったので、勉強がてら通知機能を作ってみました。
作成した通知機能
通知機能・・・とはいいつつ、実際の通知は行わず、コンソールに出力するだけのシンプルなものです。ただし、実際のプロダクトを想定してインターフェースを設計しています。
- 購読者(Subscriber)は、通知を受け取るための手段(メール、Slack、LINEなど)と通知を受け取る種類(イベントタイプ)を登録できる
- 通知者(Publisher)は、通知手段を意識せず、イベントタイプとメッセージだけを関心として持ち、Subscriberたちに通知を送る
- 通知を受け取るための手段はとしてのインフラ実装は Subscriber に直接持たせず、NotificationInfrastructure というインターフェースに委譲する
今回実装したコードはこちらにあります
pub-sub-learn
Subscriber
import abc
from pydantic import EmailStr
from loguru import logger
from app.infrastructure import NotificationInfrastructure
class Subscriber(abc.ABC):
def __init__(self, destination: str | list[str] = None):
self._infra: NotificationInfrastructure | None = None
self.destination = destination
@property
def infra(self) -> NotificationInfrastructure:
if self._infra is None:
raise ValueError("NotificationInfrastructure is not set")
return self._infra
@infra.setter
def infra(self, infra: NotificationInfrastructure):
self._infra = infra
logger.info(f"Setting infrastructure for {self.__class__.__name__}")
@abc.abstractmethod
def get_event_type(self) -> str:
raise NotImplementedError
@abc.abstractmethod
def get_subscriber_info(self) -> dict:
raise NotImplementedError
def notify(self, destination: str | list[str], title: str, message: str):
self.infra.send(destination=destination, title=title, content=message)
def validate(self):
self.infra.validate_destination(self.destination)
class MailSubscriber(Subscriber):
def __init__(self, event_type: str, mail_to: list[EmailStr]):
super().__init__(destination=mail_to)
self.event_type = event_type
self.mail_to = mail_to
def get_event_type(self) -> str:
return self.event_type
def get_subscriber_info(self) -> dict:
return {
"type": "mail",
"email_to": self.mail_to,
}
抽象クラス Subscriber
インターフェースとして notify
と validate
を持ち、実際は infra
に委譲しています。
MailSubscriber
メール通知を受け取る Subscriber です。コンストラクタでメールアドレスを受け取ります。
Infrastructure
import abc
from loguru import logger
class NotificationInfrastructure(abc.ABC):
@abc.abstractmethod
def send(self, destination: str | list[str], title: str, content: str):
raise NotImplementedError
@abc.abstractmethod
def validate_destination(self, destination: str | list[str]):
raise NotImplementedError
class NullNotificationInfra(NotificationInfrastructure):
"""
何もしない通知インフラストラクチャ
"""
def send(self, destination: str | list[str], title: str, content: str):
pass
def validate_destination(self, destination: str | list[str]):
pass
class MailNotificationInfra(NotificationInfrastructure):
def send(self, destination: str | list[str], title: str, content: str):
# 具体的なメール送信処理はここでは省略。
logger.info(f"Sending email to {destination}")
def validate_destination(self, destination: str | list[str]):
if not isinstance(destination, list):
destination = [destination]
for email in destination:
if not "@" in email: # This is a simple check, not a full validation
raise ValueError(f"Invalid email address: {email}")
抽象クラス NotificationInfrastructure
通知インフラストラクチャのインターフェースです。send
と validate_destination
を持ちます。
send
は通知を送信するメソッドで、validate_destination
は送信先のバリデーションを行うメソッドです。
今回はサンプルとして MailNotificationInfra
と NullNotificationInfra
を実装しています。
ここで Slack など他の通知手段を実装することもできます。
MailNotificationInfra
メール通知のインフラストラクチャです。send
はメールの送信処理を行いますが、ここではlogger
で出力するだけに留めています。
実際には、SendGrid や AWS SES などのメール送信サービスを利用することになるでしょう。
また、validate_destination
はメールアドレスのバリデーションを行いますが、ここでは簡易的なチェックに留めています。実際は Pydantic の EmailStr などを使って厳密なバリデーションを行うべきです。
NullNotificationInfra
TDDをする際に作った何もしない通知インフラストラクチャです。
NotificationService
from collections import defaultdict
from app.infrastructure import NotificationInfrastructure
from app.subscribers import Subscriber
class NotificationService:
def __init__(self):
self.subscribers: dict[str, list[Subscriber]] = defaultdict(list)
self.infrastructures: dict[str, NotificationInfrastructure] = {}
def register_infrastructure(
self, subscriber_type: str, infrastructure: NotificationInfrastructure
):
"""
NotificationInfrastructure を実装したクラスを登録する。
:param subscriber_type: str Subscriber タイプ。主に名前
:param infrastructure: NotificationInfrastructure NotificationInfrastructure を実装したクラス
:return:
"""
self.infrastructures[subscriber_type] = infrastructure
def get_infrastructure(self, subscriber_type: str) -> NotificationInfrastructure:
return self.infrastructures[subscriber_type]
def add_subscriber(self, event_type: str, subscriber: Subscriber):
"""
Subscriber を登録する。
subscriber_type に対応する NotificationInfrastructure を register_infrastructure で登録する必要がある
:param event_type: str イベントタイプ
:param subscriber: Subscriber 登録する Subscriber インスタンス
:return:
"""
subscriber_type = subscriber.__class__.__name__
infrastructure = self.get_infrastructure(subscriber_type)
subscriber.infra = infrastructure
subscriber.validate()
self.subscribers[event_type].append(subscriber)
def notify(self, event_type: str, title: str, message: str):
"""
登録されている Subscriber に通知を送信する
:param event_type: str イベントタイプ
:param title: str 通知タイトル
:param message: str メッセージ本文
:return:
"""
for subscriber in self.subscribers[event_type]:
subscriber.notify(
destination=subscriber.destination, title=title, message=message
)
NotificationService
Infrastructures と Subscribers を持ち、Subscriber に通知を送信する窓口となります。
add_subscriber
で Subscriber を登録しますが、対応した NotificationInfrastructure
をあらかじめ登録しておこないとvalidate
で例外が発生します。
Controller
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr
from loguru import logger
from app.infrastructure import MailNotificationInfra
from app.notification_service import NotificationService
from app.subscribers import MailSubscriber
app = FastAPI()
class EmailSubscribeRequest(BaseModel):
email: EmailStr
event_type: str
class Event(BaseModel):
event_type: str
data: dict
notification_service = NotificationService()
mail_infra = MailNotificationInfra()
notification_service.register_infrastructure(
subscriber_type="MailSubscriber", infrastructure=mail_infra
)
@app.post("/subscribe")
async def subscribe(subscribe_req: EmailSubscribeRequest):
logger.info(f"Subscribing {subscribe_req.email} to {subscribe_req.event_type}")
mail_subscriber = MailSubscriber(
event_type=subscribe_req.event_type, mail_to=[subscribe_req.email]
)
notification_service.add_subscriber(
event_type=subscribe_req.event_type, subscriber=mail_subscriber
)
return {"email": subscribe_req.email, "event_type": subscribe_req.event_type}
@app.post("/publish")
async def publish(event: Event):
logger.info(f"Publishing {event.event_type} event")
logger.debug(event.data)
notification_service.notify(
event_type=event.event_type,
title="New article published",
message=f"New article published: {event.data}",
)
return {"message": "Event published"}
FastAPI で動作するAPIエンドポイントです。 /subscribe
でメールアドレスを登録し、 /publish
で通知を送信します。
/subscribe
ではメールに関する情報しか受け取っていませんが、他の通知手段を提供する場合は改良する必要があるでしょう。
動作確認
購読者登録
Postman を使って /subscribe
にリクエストを送信します。
new_article
イベントがあった場合に test@example.com
に対する通知を登録しています
2025-01-01 17:28:11.659 | INFO | app.main:subscribe:31 - Subscribing test@example.com to new_article
2025-01-01 17:28:11.659 | INFO | app.subscribers:infra:26 - Setting infrastructure for MailSubscriber
INFO: 127.0.0.1:52293 - "POST /subscribe HTTP/1.1" 200 OK
リクエストを送信すると、 MailSubscriber
を作成し、 NotificationService
に登録しています。
通知
次は /publish
にリクエストを送信して通知を行います。
購読した際と同じように、new_article
イベントを送信しています。
2025-01-01 17:28:16.524 | INFO | app.main:publish:45 - Publishing new_article event
2025-01-01 17:28:16.524 | DEBUG | app.main:publish:46 - {'title': 'Hello world', 'body': "Hey, what's up?"}
2025-01-01 17:28:16.524 | INFO | app.infrastructure:send:32 - Sending email to ['test@example.com']
INFO: 127.0.0.1:52293 - "POST /publish HTTP/1.1" 200 OK
先ほどの Subscriber は new_article
に対する購読をしている為、イベントタイプが異なるリクエストを送信した場合 test@example.com
に通知は行われません。
# 🙅 `new_user` への通知なので、上で購読した Subscriber には通知が行われない
2025-01-02 08:56:34.322 | INFO | app.main:publish:45 - Publishing new_user event
2025-01-02 08:56:34.322 | DEBUG | app.main:publish:46 - {'title': 'A new user joined', 'body': ''}
INFO: 127.0.0.1:56673 - "POST /publish HTTP/1.1" 200 OK
# 🙆 `new_article` への通知なので、上で購読した Subscriber に通知が行われる
2025-01-02 08:56:45.708 | INFO | app.main:publish:45 - Publishing new_article event
2025-01-02 08:56:45.708 | DEBUG | app.main:publish:46 - {'title': 'Hello world', 'body': "Hey, what's up?"}
2025-01-02 08:56:45.708 | INFO | app.infrastructure:send:32 - Sending email to ['test@example.com']
INFO: 127.0.0.1:56683 - "POST /publish HTTP/1.1" 200 OK
まとめ
Pub/Sub パターンを用いた通知機能を作成しました。
今回は自分なりに設計してみましたが、やっていると抽象クラス(他の言語の場合はインターフェースになるかも)の理解や知識を深める良い題材だなと感じました。
この辺りの理解が深まると中級クラスのエンジニアとしても活躍できると思いますし、Pub/Subパターンを実際に組めると設計の幅も広がるので、ぜひ挑戦してみてください
/以上
Discussion