🔔

Pub/Sub パターンの勉強がてら通知機能を作ってみた

2025/01/02に公開

Observer パターンや Pub/Sub パターンについて、知識としては理解しているつもりでも実際にゼロイチで作ったことがなかったので、勉強がてら通知機能を作ってみました。

作成した通知機能

通知機能・・・とはいいつつ、実際の通知は行わず、コンソールに出力するだけのシンプルなものです。ただし、実際のプロダクトを想定してインターフェースを設計しています。

  • 購読者(Subscriber)は、通知を受け取るための手段(メール、Slack、LINEなど)と通知を受け取る種類(イベントタイプ)を登録できる
  • 通知者(Publisher)は、通知手段を意識せず、イベントタイプとメッセージだけを関心として持ち、Subscriberたちに通知を送る
  • 通知を受け取るための手段はとしてのインフラ実装は Subscriber に直接持たせず、NotificationInfrastructure というインターフェースに委譲する

今回実装したコードはこちらにあります
pub-sub-learn

Subscriber

subscribers.py
import abc
from pydantic import EmailStr
from loguru import logger

from app.infrastructure import NotificationInfrastructure


class Subscriber(abc.ABC):

    def __init__(self, destination: str | list[str] = None):
        self._infra: NotificationInfrastructure | None = None
        self.destination = destination

    @property
    def infra(self) -> NotificationInfrastructure:
        if self._infra is None:
            raise ValueError("NotificationInfrastructure is not set")

        return self._infra

    @infra.setter
    def infra(self, infra: NotificationInfrastructure):
        self._infra = infra
        logger.info(f"Setting infrastructure for {self.__class__.__name__}")

    @abc.abstractmethod
    def get_event_type(self) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    def get_subscriber_info(self) -> dict:
        raise NotImplementedError

    def notify(self, destination: str | list[str], title: str, message: str):
        self.infra.send(destination=destination, title=title, content=message)

    def validate(self):
        self.infra.validate_destination(self.destination)


class MailSubscriber(Subscriber):
    def __init__(self, event_type: str, mail_to: list[EmailStr]):
        super().__init__(destination=mail_to)

        self.event_type = event_type
        self.mail_to = mail_to

    def get_event_type(self) -> str:
        return self.event_type

    def get_subscriber_info(self) -> dict:
        return {
            "type": "mail",
            "email_to": self.mail_to,
        }

抽象クラス Subscriber

インターフェースとして notifyvalidate を持ち、実際は infra に委譲しています。

MailSubscriber

メール通知を受け取る Subscriber です。コンストラクタでメールアドレスを受け取ります。

Infrastructure

infrastructure.py
import abc

from loguru import logger


class NotificationInfrastructure(abc.ABC):
    @abc.abstractmethod
    def send(self, destination: str | list[str], title: str, content: str):
        raise NotImplementedError

    @abc.abstractmethod
    def validate_destination(self, destination: str | list[str]):
        raise NotImplementedError


class NullNotificationInfra(NotificationInfrastructure):
    """
    何もしない通知インフラストラクチャ
    """

    def send(self, destination: str | list[str], title: str, content: str):
        pass

    def validate_destination(self, destination: str | list[str]):
        pass


class MailNotificationInfra(NotificationInfrastructure):
    def send(self, destination: str | list[str], title: str, content: str):
        # 具体的なメール送信処理はここでは省略。
        logger.info(f"Sending email to {destination}")

    def validate_destination(self, destination: str | list[str]):
        if not isinstance(destination, list):
            destination = [destination]
        for email in destination:
            if not "@" in email:  # This is a simple check, not a full validation
                raise ValueError(f"Invalid email address: {email}")

抽象クラス NotificationInfrastructure

通知インフラストラクチャのインターフェースです。sendvalidate_destination を持ちます。
send は通知を送信するメソッドで、validate_destination は送信先のバリデーションを行うメソッドです。

今回はサンプルとして MailNotificationInfraNullNotificationInfra を実装しています。
ここで Slack など他の通知手段を実装することもできます。

MailNotificationInfra

メール通知のインフラストラクチャです。send はメールの送信処理を行いますが、ここではloggerで出力するだけに留めています。

実際には、SendGrid や AWS SES などのメール送信サービスを利用することになるでしょう。

また、validate_destination はメールアドレスのバリデーションを行いますが、ここでは簡易的なチェックに留めています。実際は Pydantic の EmailStr などを使って厳密なバリデーションを行うべきです。

NullNotificationInfra

TDDをする際に作った何もしない通知インフラストラクチャです。

NotificationService

notification_service.py
from collections import defaultdict

from app.infrastructure import NotificationInfrastructure
from app.subscribers import Subscriber


class NotificationService:
    def __init__(self):
        self.subscribers: dict[str, list[Subscriber]] = defaultdict(list)
        self.infrastructures: dict[str, NotificationInfrastructure] = {}

    def register_infrastructure(
        self, subscriber_type: str, infrastructure: NotificationInfrastructure
    ):
        """
        NotificationInfrastructure を実装したクラスを登録する。

        :param subscriber_type: str Subscriber タイプ。主に名前
        :param infrastructure: NotificationInfrastructure NotificationInfrastructure を実装したクラス
        :return:
        """

        self.infrastructures[subscriber_type] = infrastructure

    def get_infrastructure(self, subscriber_type: str) -> NotificationInfrastructure:
        return self.infrastructures[subscriber_type]

    def add_subscriber(self, event_type: str, subscriber: Subscriber):
        """
        Subscriber を登録する。
        subscriber_type に対応する NotificationInfrastructure を register_infrastructure で登録する必要がある

        :param event_type: str イベントタイプ
        :param subscriber: Subscriber 登録する Subscriber インスタンス
        :return:
        """
        subscriber_type = subscriber.__class__.__name__
        infrastructure = self.get_infrastructure(subscriber_type)
        subscriber.infra = infrastructure
        subscriber.validate()

        self.subscribers[event_type].append(subscriber)
        
    def notify(self, event_type: str, title: str, message: str):
        """
        登録されている Subscriber に通知を送信する
        :param event_type: str イベントタイプ
        :param title: str 通知タイトル
        :param message: str メッセージ本文
        :return:
        """

        for subscriber in self.subscribers[event_type]:
            subscriber.notify(
                destination=subscriber.destination, title=title, message=message
            )

NotificationService

Infrastructures と Subscribers を持ち、Subscriber に通知を送信する窓口となります。
add_subscriber で Subscriber を登録しますが、対応した NotificationInfrastructure をあらかじめ登録しておこないとvalidateで例外が発生します。

Controller

main.py
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr
from loguru import logger

from app.infrastructure import MailNotificationInfra
from app.notification_service import NotificationService
from app.subscribers import MailSubscriber

app = FastAPI()


class EmailSubscribeRequest(BaseModel):
    email: EmailStr
    event_type: str


class Event(BaseModel):
    event_type: str
    data: dict


notification_service = NotificationService()
mail_infra = MailNotificationInfra()
notification_service.register_infrastructure(
    subscriber_type="MailSubscriber", infrastructure=mail_infra
)


@app.post("/subscribe")
async def subscribe(subscribe_req: EmailSubscribeRequest):
    logger.info(f"Subscribing {subscribe_req.email} to {subscribe_req.event_type}")

    mail_subscriber = MailSubscriber(
        event_type=subscribe_req.event_type, mail_to=[subscribe_req.email]
    )
    notification_service.add_subscriber(
        event_type=subscribe_req.event_type, subscriber=mail_subscriber
    )

    return {"email": subscribe_req.email, "event_type": subscribe_req.event_type}


@app.post("/publish")
async def publish(event: Event):
    logger.info(f"Publishing {event.event_type} event")
    logger.debug(event.data)

    notification_service.notify(
        event_type=event.event_type,
        title="New article published",
        message=f"New article published: {event.data}",
    )

    return {"message": "Event published"}

FastAPI で動作するAPIエンドポイントです。 /subscribe でメールアドレスを登録し、 /publish で通知を送信します。

/subscribe ではメールに関する情報しか受け取っていませんが、他の通知手段を提供する場合は改良する必要があるでしょう。

動作確認

購読者登録

Postman を使って /subscribe にリクエストを送信します。

Postman_subscribe
new_article イベントがあった場合に test@example.com に対する通知を登録しています

2025-01-01 17:28:11.659 | INFO     | app.main:subscribe:31 - Subscribing test@example.com to new_article
2025-01-01 17:28:11.659 | INFO     | app.subscribers:infra:26 - Setting infrastructure for MailSubscriber
INFO:     127.0.0.1:52293 - "POST /subscribe HTTP/1.1" 200 OK

リクエストを送信すると、 MailSubscriber を作成し、 NotificationService に登録しています。

通知

次は /publish にリクエストを送信して通知を行います。

Postman_publish

購読した際と同じように、new_article イベントを送信しています。

2025-01-01 17:28:16.524 | INFO     | app.main:publish:45 - Publishing new_article event
2025-01-01 17:28:16.524 | DEBUG    | app.main:publish:46 - {'title': 'Hello world', 'body': "Hey, what's up?"}
2025-01-01 17:28:16.524 | INFO     | app.infrastructure:send:32 - Sending email to ['test@example.com']
INFO:     127.0.0.1:52293 - "POST /publish HTTP/1.1" 200 OK

先ほどの Subscriber は new_article に対する購読をしている為、イベントタイプが異なるリクエストを送信した場合 test@example.com に通知は行われません。

# 🙅 `new_user` への通知なので、上で購読した Subscriber には通知が行われない
2025-01-02 08:56:34.322 | INFO     | app.main:publish:45 - Publishing new_user event
2025-01-02 08:56:34.322 | DEBUG    | app.main:publish:46 - {'title': 'A new user joined', 'body': ''}
INFO:     127.0.0.1:56673 - "POST /publish HTTP/1.1" 200 OK

# 🙆 `new_article` への通知なので、上で購読した Subscriber に通知が行われる
2025-01-02 08:56:45.708 | INFO     | app.main:publish:45 - Publishing new_article event
2025-01-02 08:56:45.708 | DEBUG    | app.main:publish:46 - {'title': 'Hello world', 'body': "Hey, what's up?"}
2025-01-02 08:56:45.708 | INFO     | app.infrastructure:send:32 - Sending email to ['test@example.com']
INFO:     127.0.0.1:56683 - "POST /publish HTTP/1.1" 200 OK

まとめ

Pub/Sub パターンを用いた通知機能を作成しました。

今回は自分なりに設計してみましたが、やっていると抽象クラス(他の言語の場合はインターフェースになるかも)の理解や知識を深める良い題材だなと感じました。
この辺りの理解が深まると中級クラスのエンジニアとしても活躍できると思いますし、Pub/Subパターンを実際に組めると設計の幅も広がるので、ぜひ挑戦してみてください

/以上

GitHubで編集を提案

Discussion