😽

Laravel Queue × Pub/Sub:ドメインを汚さない非同期処理の設計と実装

に公開

はじめに

本記事では、Laravel Queueを活用した非同期処理を、Pub/Subパターンで実装する方法について解説します。主に下記の内容についてまとめていきます。

  • Laravel Queueを活用した非同期処理をどのようにPub/Subパターンで実装したのか
  • なぜPub/Subパターンを使ったのか
  • Pub/Subパターンの欠点
  • Pub/Subパターンを使った場合のテスト容易性を高めるテクニック

Pub/Subパターンについて

Pub/Subパターンとはどのようなパターンなのかについて、生成AIに出力してもらいました。

情報の発信者(Publisher)と受信者(Subscriber)が互いを直接知ることなく、仲介役(Message Broker)を介してメッセージをやり取りする疎結合な仕組みです。

このパターンを使うことにより、発信者と受信者が疎結合になるので、後から簡単に受信者側の処理を追加することが可能です。基本的な処理の流れのシーケンス図を下記に示します。

図1: Pub/Subパターンの処理の流れ

実装例

ここでは、仮想のユースケースである「注文を行う」処理を例に挙げてこのパターンの実装例を示します。今回の実装例の場合、図1中のPublisher・Message Broker・Subscriberはそれぞれ下記になります。

Publisher: Order
Message Broker: DomainEventPublisher
Subscriber: LaravelOrderCompletedMailSubscriber

まず「注文が行われた」ことを表すドメインイベントと、そのイベントを購読するサブスクライバーを下記に示します。

namespace App\Domains\Models\Order;

use App\Domains\Models\Shared\DomainEvent;
use DateTimeImmutable;

// 「注文が行われた」ことを表すドメインイベント
class OrderCreatedEvent implements DomainEvent
{
    private int $eventVersion = 1;

    private string $orderId;

    public function __construct(
        string $orderId,
        private DateTimeImmutable $currentDate = new DateTimeImmutable()
    )
    {
        $this->orderId = $orderId;
    }

    /**
     * イベントのバージョンを返す
     * イベントを変更する際はこの値をインクリメントする
     */
    public function eventVersion(): int
    {
        return $this->eventVersion;
    }

    public function orderId(): string
    {
        return $this->orderId;
    }

    /**
     * イベント発生日時を返す
     */
    public function occurredOn(): DateTimeImmutable
    {
        return $this->currentDate;
    }
}
namespace App\Domains\Models\Order;

use App\Domains\Models\Shared\DomainEvent;
use App\Domains\Models\Shared\DomainEventSubscriber;

// 注文完了メール送信のサブスクライバー
abstract class OrderCompletedMailSubscriber implements DomainEventSubscriber
{
    /**
     * 購読するイベントの型を返す
     */
    public function subscribedToEventType(): string
    {
        // OrderCreatedEventを購読する
        return OrderCreatedEvent::class;
    }

    /**
     * イベントを処理する
     */
    abstract public function handleEvent(DomainEvent $domainEvent): void;
}

OrderCompletedMailSubscriberはあえて抽象クラスとして定義しています。理由は、単体テストで OrderCompletedMailSubscriberをスパイとして扱いたかったためです。スパイの作り方や使用方法は後述しています。

subscribedToEventTypeメソッドには、購読したいイベントのクラス名を指定しています。今回は、OrderCreatedEventを購読したいため、そのクラス名を指定しています。

また、今回はあえてLaravel標準のイベント機能は使わずに実装しています。理由を下記に記載します。

  • ドメイン層やユースケース層がLaravelに依存してしまうのは避けたかったため[1]
  • Laravel標準のイベント機能は少しテストがやりづらく、柔軟な検証が難しいため
  • 高速で純粋な単体テストが行えなくなってしまうため

次に注文を行うユースケースの処理を下記に示します。

namespace App\UseCase\Order;

use App\Domains\Models\Shared\DomainEventPublisher;
use App\Domains\Models\Order\OrderCompletedMailSubscriber;
use App\UseCase\Shared\TransactionExecutor;

class PlaceOrderUsecase
{
    public function __construct(
        private OrderCompletedMailSubscriber $orderCompletedMailSubscriber,
        private DomainEventPublisher $domainEventPublisher,
        private TransactionExecutor $transactionExecutor,
        // 省略
    ) 
    {
        // サブスクライバーの登録
        $this->domainEventPublisher->subscribe($this->orderCompletedMailSubscriber);
    }

    /**
     * 注文を行う
     */
    public function execute(
        string $orderId
    ): void
    {
        // トランザクションの開始
        $this->transactionExecutor->perform(
            function () use (
                $orderId
            ) 
            {
                $order = Order::create(
                    $orderId, 
                    $this->domainEventPublisher,
                    // 省略
                );

                // ここで注文集約を永続化する想定です。
            }
        );
    }
}

ユースケース上で、OrderCompletedMailSubscriberの登録を行っています。
注文集約の実装を下記に示します。

namespace App\Domains\Models\Order;

use App\Domains\Models\Shared\DomainEventPublisher;

class Order
{
    private function __construct(
       private string $orderId
       // 他のプロパティもある想定です。
    ) {}

    /**
     * 注文を作成
     */
    public static function create(
        string $orderId,
        DomainEventPublisher $domainEventPublisher,
        // 他のパラメータもある想定です。
    ): self
    {
        // 「注文が行われた」イベントを作成してpublishします
        $orderCreatedEvent = new OrderCreatedEvent(
            $orderId
        );
        $domainEventPublisher->publish($orderCreatedEvent);
        
        return new self(
            $orderId
            // 省略
        );
    }

    // 他のメソッドもある想定です。
}

集約のcreateファクトリメソッド内で、OrderCreatedEventをpublishをしています。これによって、このイベントを購読しているOrderCompletedMailSubscriberのhandleEventメソッドが呼ばれます。
OrderCompletedMailSubscriberを継承したクラスを下記に示します。

namespace App\Infrastructure\Job\Order;

use App\Domains\Models\Shared\DomainEvent;
use App\Domains\Models\Order\OrderCreatedEvent;
use App\Domains\Models\Order\OrderCompletedMailSubscriber;
use BadMethodCallException;

class LaravelOrderCompletedMailSubscriber extends OrderCompletedMailSubscriber
{
    public function handleEvent(DomainEvent $event): void
    {
        if (!$event instanceof OrderCreatedEvent) {
            throw new BadMethodCallException('無効なイベントでメソッドが呼び出されました。 event: ' . get_class($event));
        }

        OrderCompletedMailJob::dispatch($event)->afterCommit()->onQueue('high');
    }
}

OrderCompletedMailJob::dispatch($event)->afterCommit()->onQueue('high'); の部分で、OrderCreatedEventをQueueに投入し、非同期で注文完了メールを送信する処理を予約しています。ただし、ここで重要なポイントがあります。afterCommitを付けることで、即座にQueueに投入される訳ではなく、注文データのトランザクションがコミットされた後に、Queueに投入されるようにしています。こうすることで、注文データがロールバックされたのに、なぜかメールだけ飛んでしまったといった事態を防ぐことができます。

一応、メール送信を行うジョブクラスも下記に載せておきます。ジョブの部分はLaravelをフル活用して実装しています。

namespace App\Infrastructure\Job\Order;

use App\Domains\Models\Order\OrderCreatedEvent;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class OrderCompletedMailJob implements ShouldQueue, ShouldBeUnique
{
    use Queueable;

    public $uniqueFor;
    
    // リトライ処理やバックオフの設定はhorizon.phpに設定しています。

    public function __construct(
        private OrderCreatedEvent $domainEvent
    ) 
    {
        $this->uniqueFor = (int) config('queue.job_unique_for', 3600);
    }

    /**
     * ジョブの一意識別子を返す
     * 同じ注文IDのジョブが重複実行されないようにするために使用
     */
    public function uniqueId()
    {
        return $this->domainEvent->orderId();
    }

    /**
     * ジョブのタグを返す
     * Horizonでのジョブ監視・管理のために使用
     */
    public function tags(): array
    {
        return [
            'orderCompletedMail:' . $this->domainEvent->orderId(),
            'type:orderCompletedMail',
        ];
    }

    /**
     * ジョブを実行する
     */
    public function handle(): void
    {
        // 注文完了メール送信処理がここに実装される想定
    }
}

最後に、イベントのPub/Subで共通で使われているモジュールを記載します。下記のモジュールは、こちらのリポジトリ[2]の実装を参考に作成しています。

namespace App\Domains\Models\Shared;

class DomainEventPublisher
{
    /**
     * @var DomainEventSubscriber[]
     */
    private array $subscriberList = [];

    /**
     * サブスクライバーを購読登録する
     */
    public function subscribe(DomainEventSubscriber $subscriber): void
    {
        $this->addSubscriber($subscriber);
    }

    /**
     * ドメインイベントを発行する
     * 登録されているサブスクライバーのうち、該当するイベントを購読しているものに対してイベントを配信する
     */
    public function publish(DomainEvent $domainEvent): void
    {
        foreach ($this->subscriberList as $subscriber) {
            if (
                $this->canHandleEvent($subscriber, $domainEvent)
            ) {
                $subscriber->handleEvent($domainEvent);
            }
        }
    }

    /**
     * サブスクライバーをリストに追加する
     */
    private function addSubscriber(DomainEventSubscriber $subscriber): void
    {
        $this->subscriberList[] = $subscriber;
    }

    /**
     * サブスクライバーが指定されたイベントを処理できるかを判定する
     */
    private function canHandleEvent(
        DomainEventSubscriber $subscriber,
        DomainEvent $domainEvent
    ): bool
    {
        return $subscriber->subscribedToEventType() === get_class($domainEvent) ||
                $subscriber->subscribedToEventType() === DomainEvent::class;
    }
}
namespace App\Domains\Models\Shared;

use DateTimeImmutable;

interface DomainEvent
{
    public function eventVersion(): int;
    
    public function occurredOn(): DateTimeImmutable;
}


interface DomainEventSubscriber
{
    public function subscribedToEventType(): string;

    public function handleEvent(DomainEvent $domainEvent): void;
}

上記の一連の処理の流れのシーケンス図を下記に載せておきます。

図2: laravel Queueを活用したPub/Subパターンの処理の流れ

なぜPub/Subパターンを使ったのか

1. 後から柔軟にサブスクライバーの処理を追加したかったため

今後、大規模な改修が予定されているシステムであり、あるイベントが起こった際に、例えばメルマガ更新処理といった、非同期で更新可能な様々な処理を追加していく必要があります。このパターンを使うことで、新しいサブスクライバーのクラスを作成して、そのクラスを登録(subscribe)するだけで、そのような処理を柔軟に追加できます。既存の処理はほとんど変更する必要がありません。この、最小限の労力で後から安全に処理を追加できる点が、後に控えている大規模な改修をより簡単にするのではないかと考え、このパターンを採用しました。

2. 責務を小さく保ちたかったため

このパターンを使うことにより、例えば、「注文を行う」ユースケースであればその責務だけに集中できます。「注文完了メール送信」などの他の責務が混在しません。
現在改修を行っているシステムが、1つのユースケース上で様々な処理を行う必要があります。そのまま実装してしまうと1つのユースケース上の処理が巨大化し、可読性や変更容易性が低下してしまう懸念がありました。このパターンの利点である、関心事を小さく保てることが、まさにこの懸念事項を解決できる利点だと考え、このパターンを採用しました。

Pub/Subパターンの欠点

Pub/Subパターンの欠点について、全体像の把握が難しいという点が挙げられます。
Pub/Subパターンの場合、単純に上から下にコードを読んでも処理が追いづらいです。特に、publishメソッドの呼び出し元と、それを受け取るSubscriberがコード上で直接繋がっていません。処理の全体像を理解するには、どうしてもある程度のPub/Subパターンの前提知識が必要になってしまいます。

Pub/Subパターンのテスト戦略

上記例のOrderCompletedMailSubscriberを抽象クラスとして定義したことによって、単体テストからサブスクライバーをスパイとして使うことができます。まずスパイの実装を下記に示します。

class OrderCompletedMailSubscriberSpy extends OrderCompletedMailSubscriber
{
    private bool $wasCalled = false;

    private OrderCreatedEvent $receivedEvent;

    public function handleEvent(DomainEvent $event): void
    {
        if (!$event instanceof OrderCreatedEvent) {
            throw new BadMethodCallException('無効なイベントでメソッドが呼び出されました。 event: ' . get_class($event));
        }

        $this->wasCalled = true;
        $this->receivedEvent = $event;
    }

    /**
     * テストでサブスクライバーが正しく実行されたことを検証するために使用
     */
    public function wasCalled(): bool
    {
        return $this->wasCalled;
    }

    /**
     * テストで正しいイベントが受信されたことを検証するために使用
     */
    public function actualOrderId(): string
    {
        return $this->receivedEvent->orderId();
    }
}

上記のスパイは下記のように単体テストから使うことができます。

class PlaceOrderUsecaseTest extends TestCase
{
    private OrderCompletedMailSubscriberSpy $subscriberSpy;
    
    private PlaceOrderUsecase $usecase;

    protected function setUp(): void
    {
        $this->subscriberSpy = new OrderCompletedMailSubscriberSpy();
        $this->usecase = new PlaceOrderUsecase(
            $this->subscriberSpy,
            new DomainEventPublisher(),
            new TestTransactionExecutor()
        );
    }

    public function test_注文を行った際に、注文完了メールの送信がサブスクライブされる()
    {
        // given
        $orderId = 'order-1234';

        // when
        $this->usecase->execute(
            $orderId
        );

        // then
        $this->assertTrue($this->subscriberSpy->wasCalled());
        $this->assertSame($orderId, $this->subscriberSpy->actualOrderId());
    }
}

スパイを使ったことによって、下記の利点があります。

  • Mockの設定を行う必要がなく、Mockを使った場合と比べて可読性が高い
  • より本番に近い実装で単体テストが行える
    例えば、サブスクライバーにMockを使ってしまうと、下記のような判定条件を含めてテストを行うことができない。
if (!$event instanceof OrderCreatedEvent) {
    throw new BadMethodCallException('無効なイベントでメソッドが呼び出されました。 event: ' . get_class($event));
}
  • 一般的にモックよりスパイの方が実装の詳細に依存しないので、テストコードのリファクタリングの耐性が高まる

まとめ

本記事では、Laravel Queueを活用した非同期処理を、Pub/Subパターンで実装する方法について解説しました。本記事で紹介した実装の重要なポイントを下記にまとめます。

  • ドメイン層とユースケース層をLaravelに依存させずに、Pub/Subパターンを実装
  • Pub/Subパターンを用いることで、Subscriber側の処理を後から柔軟に追加することが可能
  • Subscriberを抽象クラスとして定義することで、テスト時にモックではなく「スパイ」として使用可能

もちろん、Pub/Subパターンには「処理の全体像がコード上で追いづらくなる」という欠点も存在します。しかし、ドメインが複雑で今後もずっと長く使い続けていくようなシステムにおいては、この疎結合なパターンがもたらす保守性の高さは、この欠点を補って余りあるメリットを提供してくれます。

脚注
  1. 過去に経験したプロジェクトで、システムのビジネスロジック部分がCodeIgniter3などのレガシーなフレームワークに依存してしまっており、それらのフレームワークから脱却したいのにコストが高すぎて脱却できていない事例を経験したことがあります。ドメインが複雑でずっと長く使い続けていくようなシステムに関しては、より低コストで安全に他のフレームワークに移行できるように、システムのコア部分はフレームワークに依存しないように開発することが非常に重要だと考えています。こちらに関しては、こちらの書籍[3]に詳しく書かれています。 ↩︎

  2. https://github.com/VaughnVernon/IDDD_Samples/blob/master/iddd_common/src/main/java/com/saasovation/common/domain/model/DomainEventPublisher.java ↩︎

  3. Robert C. Martin他, Clean Architecture, 2018年7月27日, 株式会社ドワンゴ, P269~272 ↩︎

TREASURE FACTORY TECH BLOG

Discussion