🚌

MassTransit で実現するイベント駆動アーキテクチャ

に公開

はじめに

PKSHA Technology のソフトウェアエンジニアの小松原です。
私が担当する PKSHA AI ヘルプデスクでは、Sharepoint Online のフォルダ内のドキュメントを自社の RAG サービスに同期する機能を提供しています。
フォルダの同期は以下のステップで処理されます。

フォルダ内ファイルのアップロード → インデクシングリクエスト → ステータス確認 → 完了

各ステップは外部サービスとの通信を伴い、処理に時間がかかります。フォルダ内のファイル数が多い場合、すべてを同期的に処理するとタイムアウトや途中失敗時の再開が困難になります。そこで、イベント駆動アーキテクチャを採用しました。イベント駆動アーキテクチャに移行することで、主に以下の4点を目指しました。

  • 非同期処理
    長時間かかる処理をブロックせずに実行でき、タイムアウトを回避できる
  • 疎結合
    各ステップがメッセージを介して連携するため、ステップ間の依存が小さくなる
  • 再試行の自動化
    外部 API の不調などの一時的なエラーに対し、指数バックオフを伴うリトライを標準機能で実現する
  • 進捗の永続化
    各ステップの完了ごとに状態を保存し、万が一のシステム停止時も「失敗した箇所から」確実に再開できるようにする

この機能は .NET で開発しており、イベント駆動を実現するためのメッセージングフレームワークとして MassTransit を導入しました。本記事では、MassTransit を使ってイベント駆動をどう実現しているのか紹介します。

MassTransit とは

MassTransit は、.NET 向けのメッセージングフレームワークです。Azure の Service Bus や AWS の SQS などのメッセージブローカーを抽象化し、メッセージの送受信、リトライ、状態管理といった機能を統一的な API で提供します。

https://masstransit.massient.com/

主な機能

機能 説明
マルチトランスポート対応 Azure Service Bus、RabbitMQ、Amazon SQS、Kafka など主要なメッセージブローカーを統一的な API で扱える
Saga ステートマシン 複数ステップにまたがる長期トランザクションを宣言的に管理
リトライ・例外処理 一時的なエラーに対する自動リトライ、エラーキューへの退避
スケジューリング メッセージの遅延発行、定期実行
Transactional Outbox メッセージ発行とデータベース操作の一貫性を保証
テストサポート InMemory トランスポートによる高速なユニットテスト

メッセージの定義

イベント駆動アーキテクチャでは、コンポーネント間の連携を「メッセージ」で行います。
MassTransit ではメッセージを C# のクラスやレコードで定義します。たとえば「フォルダの同期を開始してほしい」というリクエストは、次のように定義できます。

public sealed record FolderSyncRequested
{
    public required Guid FolderId { get; init; }
    public required string FolderPath { get; init; }
}

https://masstransit.massient.com/guides/message-contract

メッセージの発行

メッセージを発行するには、いくつかの方法があります。

インターフェース 用途
IPublishEndpoint メッセージの発行(Pub/Sub)
ISendEndpointProvider 特定のエンドポイントへの送信(Point-to-Point)
IBus 上記両方を含む。アプリケーションのエントリポイントで使用
ConsumeContext Consumer 内から発行・送信する場合

Publish すると、メッセージはシリアライズされてブローカーへ送られ、同じ型を購読する Consumer に届けられます。

public class FolderSyncService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public FolderSyncService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task RequestSync(Guid folderId, string folderPath)
    {
        await _publishEndpoint.Publish(new FolderSyncRequested
        {
            FolderId = folderId,
            FolderPath = folderPath
        });
    }
}

Consumer の実装

MassTransit では、メッセージ型ごとに Consumer を登録すると、対応するエンドポイントが自動的に構成され、受信したメッセージが Consume メソッドに配送されます。

https://masstransit.massient.com/guides/message-consumer

Consumer は IConsumer<T> インターフェースを実装して定義します。たとえば IConsumer<FolderSyncRequested> を実装すると、FolderSyncRequested メッセージを受信すると自動的に Consume メソッドが呼び出されます。

public class FolderSyncRequestedConsumer : IConsumer<FolderSyncRequested>
{
    private readonly IFolderSyncService _folderSyncService;

    public FolderSyncRequestedConsumer(IFolderSyncService folderSyncService)
    {
        _folderSyncService = folderSyncService;
    }

    public async Task Consume(ConsumeContext<FolderSyncRequested> context)
    {
        await _folderSyncService.SyncAsync(
            context.Message.FolderId,
            context.Message.FolderPath);
    }
}

ConsumerDefinition による設定

Consumer ごとのリトライポリシーや並行数制限は、ConsumerDefinition クラスで設定できます。

public class FolderSyncRequestedConsumerDefinition : ConsumerDefinition<FolderSyncRequestedConsumer>
{
    public FolderSyncRequestedConsumerDefinition()
    {
        // このエンドポイントで同時に処理するメッセージ数の上限
        // サーバーが複数台ある場合、各インスタンスでこの数だけ同時に処理する
        ConcurrentMessageLimit = 1;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<FolderSyncRequestedConsumer> consumerConfigurator,
        IRegistrationContext context)
    {
        // 一時的なエラー時の自動リトライ
        endpointConfigurator.UseMessageRetry(r => r.Intervals(
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5),
            TimeSpan.FromSeconds(30)));

        endpointConfigurator.UseInMemoryOutbox(context);
    }
}

Consumer の登録

Consumer と ConsumerDefinition を MassTransit に登録するには、Program.csAddMassTransit を使います。

builder.Services.AddMassTransit(x =>
{
    // Consumer と ConsumerDefinition を登録
    x.AddConsumer<FolderSyncRequestedConsumer, FolderSyncRequestedConsumerDefinition>();

    // トランスポートの設定(Azure Service Bus)
    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host(connectionString);
        cfg.ConfigureEndpoints(context);
    });
});

Saga ステートマシン

Consumer は単発のメッセージ処理には適しています。しかし「フォルダ内ファイルのアップロード → インデクシングリクエスト → ステータス確認 → 完了」のような複数ステップの処理では、進捗状態の管理が課題になります。

MassTransit の Saga ステートマシンは、このような長期にわたるワークフローを状態遷移として管理します。「今はどの状態で、どのイベントが来たら次に進むか」をコードで宣言的に記述できるため、複雑な分岐や例外処理が整理されます。

https://masstransit.massient.com/guides/saga-state-machines

Saga ステートマシンの実装に必要なもの

Saga ステートマシンを実装するには、以下の要素が必要です。

要素 役割 なぜ必要か
イベント Saga ステートマシンが受け取るメッセージ CorrelationId で「どのインスタンスへのメッセージか」を特定するため
状態クラス Saga ステートマシンの現在の状態とデータを保持 現在どこまで進んだかを永続化し、システム再起動後も処理を継続するため
ステートマシンクラス 状態遷移のルールを定義 どのイベントでどの状態に遷移するかを宣言的に記述するため

以下でそれぞれの実装方法を説明します。

イベントの定義

Saga ステートマシンが受け取るイベントには CorrelationId が必要です。Saga ステートマシンは複数のインスタンスが同時に動作します(例:リクエスト A とリクエスト B の処理)。イベントが届いたときに「どのインスタンス宛か」を特定するために CorrelationId を使います。CorrelatedBy<Guid> インターフェースを実装すると、MassTransit が自動で認識します。

// フォルダ同期の開始(ワークフローのトリガー)
public sealed record FolderSyncStarted : CorrelatedBy<Guid>
{
    public required Guid CorrelationId { get; init; }
    public required Guid FolderId { get; init; }
    public required string FolderPath { get; init; }
}

// アップロード完了
public sealed record UploadCompleted : CorrelatedBy<Guid>
{
    public required Guid CorrelationId { get; init; }
}

// インデクシングリクエスト
public sealed record IndexingRequested : CorrelatedBy<Guid>
{
    public required Guid CorrelationId { get; init; }
}

// インデクシング完了
public sealed record IndexingCompleted : CorrelatedBy<Guid>
{
    public required Guid CorrelationId { get; init; }
}

ステートマシンの実装

ステートマシンは以下の 2 つのクラスで構成されます。

  • 状態クラス(SagaStateMachineInstance): Saga ステートマシンの状態を保持するクラス。CorrelationIdCurrentState は必須で、ワークフローに必要なデータをプロパティとして追加する
  • ステートマシンクラス(MassTransitStateMachine<T>): 状態遷移のルールを定義するクラス。どのイベントでどの状態に遷移するかを宣言的に記述する

ステートマシンクラスでは、以下のメソッドで状態遷移を定義します。

メソッド 説明
Initially(...) 新規 Saga インスタンスが最初に受け取るイベントを定義
During(State, ...) 特定の状態で受け取るイベントと遷移を定義
DuringAny(...) どの状態でも受け付けるイベントを定義(エラー処理など)
TransitionTo(State) 指定した状態へ遷移

また、ステートマシンクラスでは以下のプロパティを定義します。

  • State プロパティ: ワークフローが取りうる状態を定義する
  • Event<T> プロパティ: ステートマシンが受け取るメッセージを定義する
public class FolderSyncState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid FolderId { get; set; }
    public string? FolderPath { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime UpdatedAt { get; set; }
}

public class FolderSyncStateMachine : MassTransitStateMachine<FolderSyncState>
{
    // 状態
    public State Uploading { get; private set; }
    public State Indexing { get; private set; }
    public State CheckingStatus { get; private set; }
    public State Completed { get; private set; }

    // イベント
    public Event<FolderSyncStarted> FolderSyncStarted { get; private set; }
    public Event<UploadCompleted> UploadCompleted { get; private set; }
    public Event<IndexingRequested> IndexingRequested { get; private set; }
    public Event<IndexingCompleted> IndexingCompleted { get; private set; }

    public FolderSyncStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Initially(
            When(FolderSyncStarted)
                .Then(context =>
                {
                    context.Saga.FolderId = context.Message.FolderId;
                    context.Saga.FolderPath = context.Message.FolderPath;
                    context.Saga.CreatedAt = DateTime.UtcNow;
                    context.Saga.UpdatedAt = DateTime.UtcNow;
                })
                .TransitionTo(Uploading)
                .Publish(context => new UploadStarted
                {
                    CorrelationId = context.Saga.CorrelationId,
                    FolderId = context.Saga.FolderId,
                    FolderPath = context.Saga.FolderPath
                })
        );

        During(Uploading,
            When(UploadCompleted)
                .Then(context => context.Saga.UpdatedAt = DateTime.UtcNow)
                .TransitionTo(Indexing)
                .Publish(context => new IndexingStarted
                {
                    CorrelationId = context.Saga.CorrelationId
                })
        );

        During(Indexing,
            When(IndexingRequested)
                .Then(context => context.Saga.UpdatedAt = DateTime.UtcNow)
                .TransitionTo(CheckingStatus)
                .Publish(context => new StatusCheckStarted
                {
                    CorrelationId = context.Saga.CorrelationId
                })
        );

        During(CheckingStatus,
            When(IndexingCompleted)
                .Then(context =>
                {
                    context.Saga.UpdatedAt = DateTime.UtcNow;
                })
                .TransitionTo(Completed)
        );
    }
}

SagaDefinition による設定

Consumer に ConsumerDefinition があるように、Saga ステートマシンにも SagaDefinition があります。リトライポリシーや並行数制限、エンドポイント名などを設定できます。

public class FolderSyncStateMachineDefinition : SagaDefinition<FolderSyncState>
{
    public FolderSyncStateMachineDefinition()
    {
        // このエンドポイントで同時に処理するメッセージ数の上限(アプリケーションインスタンスごと)
        // サーバーが3台で ConcurrentMessageLimit = 1 の場合、全体では最大3メッセージが同時処理される
        ConcurrentMessageLimit = 1;
    }

    protected override void ConfigureSaga(
        IReceiveEndpointConfigurator endpointConfigurator,
        ISagaConfigurator<FolderSyncState> sagaConfigurator,
        IRegistrationContext context)
    {
        // 一時的なエラー時の自動リトライ
        endpointConfigurator.UseMessageRetry(r => r.Intervals(
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5),
            TimeSpan.FromSeconds(30)));

        // Saga 内で発行するメッセージをバッファリングし、処理成功時にまとめて送信
        endpointConfigurator.UseInMemoryOutbox(context);
    }
}

ステートマシンの登録

Saga ステートマシンを動作させるには、AddSagaStateMachine で登録し、状態の永続化先を指定します。Saga ステートマシンの状態はデータベースに永続化されるため、アプリケーションが再起動しても処理中のワークフローは継続されます。

MassTransit は以下の永続化先に対応しています。

  • RDB: Entity Framework、Dapper、NHibernate、Marten
  • NoSQL: MongoDB、Azure Cosmos DB、DynamoDB
  • KVS: Redis、Azure Table Storage
builder.Services.AddMassTransit(x =>
{
    // Saga ステートマシンと SagaDefinition の登録(MongoDB に永続化)
    x.AddSagaStateMachine<FolderSyncStateMachine, FolderSyncState, FolderSyncStateMachineDefinition>()
        .MongoDbRepository(r =>
        {
            r.Connection = "mongodb://127.0.0.1";
            r.DatabaseName = "document-sync-db";
            r.CollectionName = "folder-sync-states";
        });

    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host(connectionString);
        cfg.ConfigureEndpoints(context);
    });
});

全体アーキテクチャ

ここまで Consumer と Saga ステートマシンを個別に説明してきました。ここでは、これらを組み合わせた全体の流れを説明します。

処理の流れは次のとおりです。

  1. Saga ステートマシンが状態を遷移させ、次のステップを開始するメッセージを発行
  2. Consumer がメッセージを受け取って処理を実行
  3. Consumer が完了イベントを発行
  4. Saga ステートマシンが完了イベントを受信して状態を更新
  5. 1〜4 を繰り返してワークフロー全体を進行

処理の流れ

まとめ

本記事では、MassTransit を使って複数ステップにまたがるワークフローをイベント駆動で実現する方法を紹介しました。
非同期化によるタイムアウト回避、メッセージ連携による疎結合、標準機能のリトライ、そして Saga による進捗の永続化により、外部サービス連携を伴う長時間処理でも安定して再開可能な同期基盤を構築できます。これにより、インフラや状態管理の複雑さを抑えつつ、ビジネス要件に集中して開発できるようになりました。

PKSHAテックブログ

Discussion