🐙

イベントソーシングフレームワーク、Sekibanの開発に至る経緯と開発中の試行錯誤

2024/06/13に公開

株式会社ジェイテックジャパン CTOの高丘 @tomohisaです。この記事は私たちジェイテックジャパンの作成しているイベントソーシング・CQRSフレームワークがリリースされてしばらく経ったので開発に至るきっかけやこれまでの歴史をまとめてみました。

Sekiban開発に至る経緯

複雑さに対応するアーキテクチャの必要性

当初、私たちはシンプルなMVCで開発を行っておりました。しかしながら、Fatコントローラーが生じる問題や、モデルにビジネスロジックをまとめる難しさから、メンテナンスが困難なシステムへと変化してしまいました。

そこで、弊社ではDDDというドメイン駆動設計を導入しました。初期段階では「軽量DDD」もしくは「とりあえずレイヤー化したオニオンアーキテクチャ」でした。しかし、リポジトリの抽象化によって、コードのテストが劇的に書きやすくなりました。また、多くのビジネスロジックがドメインにまとめられました。しかし、ドメイン層とアプリケーション層(ユースケース)の切り分けは、難易度が高く、チーム内でも統一が難しいと感じています。

層を分けることにより、ドメイン層のデータクラス(テーブルのレコード)と同じ型をアプリケーション層で用いず、またプレゼンテーション層でも使用しない方が理想となります。その結果、"詰め替え"のコードが増加する傾向が見受けられます。

中規模のシステムにおいて、シングルデータベースシステムを用いる場合、多数のユーザーがデータにアクセスする状況では、トランザクションエラーが生じてしまうことが避けられません。これは、SQLの運用費用を高めることで一部改善することが可能ですが、費用面での負担が大きくなり、また全ての問題が解決するわけではありません。

大きめのシステムを作成した後に反省会を開き、アーキテクチャに関するディスカッションを行うことになりました。そのために、社内で "Zadankai_Backend"というチャンネルを作成しました。

いくつかのキーワードを皆で調査し、議論していきました。

  • gRPC
  • GraphQL
  • イベント駆動
  • 集約
  • ドキュメントデータベース NoSQL
  • SignalR
  • イベントソーシング
  • CQRS
  • マイクロサービス
  • 宣言的プログラミング
  • タスクベースUI

複雑、大規模なシステムになるにあたり、必要となる、結果整合性やイベントソーシング

いろいろ調べている中でドキュメントDBであるCosmos DBや、それを用いて、結果整合性でデータの一貫性を保つ方法について調べることとなり、イベントソーシングについての情報を調べることとなりました。

このようなメッセージがありました。

昨日見つけた記事をメモ代わりに貼っておきます。分散システム、つまり非同期・疎結合なアーキテクチャにおいては、トランザクションによってではなく、結果整合性によってデータの一貫性を保つ、という話がとても簡潔にまとまっています。記事内の、実際の世界では、トランザクション一貫性のワークフローのほうが珍しいという文には考えさせられます。個人的にはこの意見に同意です。
ちなみに結果整合性という考え方自体は新しいものではないはずです。...

https://www.infoq.com/jp/news/2015/10/domain-events-consistency/

イベントソーシングについて調べていくと理論的にとても納得できるもので、実際に作ってみて試そうということになりました。イベントソーシングについて調べた後でのミーティングでは以下のようなメッセージを書いていました。

面白いのが、イベントが発生した日時=コマンド日時ではないということです。たとえば今年度の計算式のパラメータを指定してるとしたら、変更したのが年度末でも、それは年度はじめのデータを変更しているということになり、それによって、4/1-3/31のデータをProjectionすれば良いということです。...
考えてみたらなんかすごく良いことに感じてきました!

なんかDDDの意味がやっとわかってきた気がします。マスタの修正にしても、今のデータを変えることにより過去のデータが壊れたりしていましたが、ESを正しく構築すれば、色々正しく扱えますね

面白いですね。だんだんと全部繋がっていきますね。そしてスナップショット再構築は、非同期で、且つ自動リトライができる環境でないと安心して実行できない処理と思います(複雑さによって違う部分はありそうですが)。サーバーレスな世界を志向することで初めて利点が分かってきた感じがあります。

そうですね。よくよく色々考えてやっとつながってきました

2022年の3月に、既存システムの1つの機能に対してイベントソーシングを導入してみました。するとわずか1週間程度で、イベントソーシングを用いた新機能の実装が可能となりました。この実装は、Zadankaiを立ち上げてから1ヶ月後に、他のオープンソースを参考に行いました。

その機能を別リポジトリに切り出し、共通で利用していけるライブラリとして実装していき、社内プロジェクトとして使えるライブラリとして約1年半運用しました。Sekibanとしてオープンソース化すると考え始めたのが2022年の末くらいで、その後、オープンソース化の準備をして、2023年12月にオープンソースリリースをしました。

こちらが初期に作成したイベントソーシングの1つの集約の実装です。基本的には、AggregateRootのファイルの内部にほとんど全ての機能が包含されていました。そのため、大きな機能が追加されると1つのファイルが巨大化する可能性があるという懸念が存在していました。また、例えば無名コンストラクタがないとJSON保存時にエラーになったりするため、深く理解していないと使えないなども問題が存在しました。

using DomainCommon.Domain.Model.Blobs;
using DomainCommon.Domain.Model.EventSourcings;
using RunningLogContext.Domain.Model.Facilities;
using RunningLogContext.Domain.Model.ImportCSVProcesses;
using RunningLogContext.Domain.Model.Imports;
using RunningLogContext.Domain.Model.InputCSVProcesses.Events;
using RunningLogContext.Domain.Model.Modules;
using System;
using System.Collections.Generic;
using System.Linq;
namespace RunningLogContext.Domain.Model.InputCSVProcesses;

public class InputCSVProcess : AggregateRoot, IDTOAccess<InputCSVProcess, InputCSVProcessDTO>
{
    private FacilityId FacilityId { get; set; }
    private ModuleInputCSVId InputCSVId { get; set; }
    private List<InputCSVProcessFile> Files { get; set; } = new List<InputCSVProcessFile>();

    public InputCSVProcess(Guid aggregateId) : base(aggregateId) { }

    public InputCSVProcess(Guid commandId, FacilityId facilityId, ModuleInputCSVId inputCsvId)
    {
        AddAndApplyEvent(new InputCSVProcessCreated(commandId, facilityId, inputCsvId));
    }
    // dynamic を通してアクセスされるため、参照がつかない
    public void Apply(InputCSVProcessCreated @event)
    {
        FacilityId = new FacilityId(@event.FacilityId);
        InputCSVId = new ModuleInputCSVId(@event.InputCSVId);
    }

    public void StartProcess(Guid commandId, FacilityId facilityId, BlobFileId blobFileId, DateTime ClientDetectedTargetDataDate)
    {
        var @event = StartProcessWithoutProjection(commandId, facilityId, blobFileId, ClientDetectedTargetDataDate);
        Apply(@event);
    }
    // snapshotとの共用ができないため、privateとしている。AutoSnapshotを使わない場合は、public にすることも可能
    private InputCSVProcessFileAdded StartProcessWithoutProjection(
        Guid commandId,
        FacilityId facilityId,
        BlobFileId blobFileId,
        DateTime ClientDetectedTargetDataDate)
    {
        var @event = new InputCSVProcessFileAdded(commandId, facilityId, blobFileId, ClientDetectedTargetDataDate);
        AddEvent(@event);
        return @event;
    }
    // dynamic を通してアクセスされるため、参照がつかない
    public void Apply(InputCSVProcessFileAdded @event)
    {
        if (!Files.Any(
            m =>
                m.BlobFileId.Value == @event.BlobFileId && m.ClientDetectedTargetDataDate == @event.ClientDetectedTargetDataDate
        ))
        {
            Files.Add(
                new InputCSVProcessFile()
                {
                    BlobFileId = new BlobFileId(@event.BlobFileId),
                    ClientDetectedTargetDataDate = @event.ClientDetectedTargetDataDate
                });
        }
    }
    public void EndProcess(Guid commandId, FacilityId facilityId, BlobFileId blobFileId, DateTime ClientDetectedTargetDataDate)
    {
        var @event = EndProcessWithoutProjection(commandId, facilityId, blobFileId, ClientDetectedTargetDataDate);
        Apply(@event);
    }
    public InputCSVProcessFileRemoved EndProcessWithoutProjection(
        Guid commandId,
        FacilityId facilityId,
        BlobFileId blobFileId,
        DateTime ClientDetectedTargetDataDate)
    {
        var @event = new InputCSVProcessFileRemoved(commandId, facilityId, blobFileId, ClientDetectedTargetDataDate);
        AddEvent(@event);
        return @event;
    }
    // dynamic を通してアクセスされるため、参照がつかない
    public void Apply(InputCSVProcessFileRemoved @event)
    {
        var file = Files.FirstOrDefault(
            m =>
                m.BlobFileId.Value == @event.BlobFileId && m.ClientDetectedTargetDataDate == @event.ClientDetectedTargetDataDate
        );
        if (file != null)
        {
            Files.Remove(file);
        }
    }
    protected override dynamic DTOInternal()
    {
        return DTOFromAggregateRoot();
    }
    protected override void ApplySnapshotInternal(dynamic snapshot)
    {
        ApplySnapshotDTO(snapshot);
    }

    public InputCSVProcessDTO DTOFromAggregateRoot()
    {
        return new InputCSVProcessDTO(this)
        {
            InputCSVId = InputCSVId.Value,
            FacilityId = FacilityId.Value,
            Files = Files.Select(
                    m => new InputCSVProcessFileDTO()
                    {
                        BlobFileId = m.BlobFileId.Value,
                        ClientDetectedTargetDataDate = m.ClientDetectedTargetDataDate
                    })
                .ToList()
        };
    }
    public void ApplySnapshotDTO(InputCSVProcessDTO snapshot)
    {
        FromSnapshot(snapshot);
        InputCSVId = new ModuleInputCSVId(snapshot.InputCSVId);
        FacilityId = new FacilityId(snapshot.FacilityId);
        Files = snapshot.Files.Select(
                m =>
                    new InputCSVProcessFile()
                    {
                        BlobFileId = new BlobFileId(m.BlobFileId),
                        ClientDetectedTargetDataDate = m.ClientDetectedTargetDataDate
                    })
            .ToList();
    }

    public override Type GetDTOType()
    {
        return typeof(InputCSVProcessDTO);
    }
    /// <summary>
    /// イベント種別 (Staticメソッドが継承できないので、インスタンスメソッドとしています)
    /// </summary>
    /// <returns></returns>
    public override IList<Type> GetEventTypes()
    {
        return ListOfEvents();
    }
    public override int? AutoSnapshotCount()
    {
        return 20;
    }
    /// <summary>
    /// イベント種別
    /// </summary>
    /// <returns></returns>
    public static IList<Type> ListOfEvents()
    {
        var all = new List<Type>(GetCommonEventTypes());
        all.AddRange(
            new List<Type>()
            {
                typeof(InputCSVProcessCreated),
                typeof(InputCSVProcessFileAdded),
                typeof(InputCSVProcessFileRemoved)
            });
        return all;
    }

    /// <summary>
    /// 集約の子エンティティは集約なので、エンティティ内に書く
    /// </summary>
    private class InputCSVProcessFile
    {
        public BlobFileId BlobFileId { get; set; }
        /// <summary>
        /// アプリ側で対象日付と考えた日時
        /// </summary>
        public DateTime ClientDetectedTargetDataDate { get; set; }
    }
}

初期バージョンを使ってみての感想

開発を始めてしばらくしてこのようなメッセージを書いていました。

この座談会スレッドを読み返してみたのですが、最初はDDDを効率的に書いて、gRPCやSignalRで値を戻す方法について色々考えていたのですが、Cosmos DBが面白いと話題を出されてから、しばらく、試行錯誤していましたがイベントソーシングのキーワードを見つけました。Greg Youngの話を聞いてから、これはいけると思い、実装してみたという流れでした。個人的には過去二十数年やってきた、「データベースを使用したシステム開発」を壊して組み立て直したような、コンセプトの違いと感じています。

作って数ヶ月の段階でしたが、システムとしての可能性を感じたというのと、これまでのやり方とは全然違うもののこれからのスタンダードになりそうな仕組みになると感じて、フレームワークとして完成度を高めていくことにしました。

Sekiban開発中の試行錯誤

社内でも名前を考え、「Sekiban」という、一度書いたらそれが長期間残るイメージを持ったフレームワークとして名付けて開発を始めました。以下にSekibanを作るにあたり試行錯誤した点を書いていきます。

Value Objectの扱いをどうするか?

Value Object の役割、バリデーションとクラスによるビジネスロジックの定義をどのように両立するかについて議論になりました。
イベント構築の際に例外を投げてはいけないというルールがあるため、Value Objectをどこに配置するかの調整が必要になります。Sekibanでは、.NETのValidation機能を活用し、属性(Attribute)ベースで行うバリデーションを推奨しています。それにより、イベントの構築時には検証が実行されないため、エラーがthrowされず、eventの中にも、集約の中にもValueObjectを含めることができるようになっています。

宣言的プログラミングをどのように適用していくか

検証を属性(Attribute)を使用して行う
属性で指定する値の許容値を属性で定義することにより、手続型ではなく、宣言でどのような値であるかを示すことができるようになりました。

パターンマッチング if {} else if ()と記述する代わりに、パターンマッチングを用いて値まで含まれる動作パターンを定義することで、宣言的でシンプルなプログラムを構築していくことが可能となります。

サブクラスで業務オブジェクトを表現する RDBで状態を表現していると、複雑なデータの表現に際して、nullableで並列に表現する傾向があります。しかし、SekibanではデータをJSONで保存しており、JSONで表現可能な形式であれば、ポリモーフィズム的な解決策が使用できます。
JsonDerivedType属性を使用すると、インターフェースや基本型をイベントや集約に持たせ、その実装型を具体的な各イベント内に設定することが可能となります。この記述方法はパターンマッチングとの相性が良いです。

宣言的に依存関係を定義する
DomainDependencyというファイルで該当のプロジェクト内のイベントソーシングの集約、コマンド、クエリを簡単に定義し、実際の使用コードやテストから容易に呼び出すことが可能になります。

オブジェクト指向プログラミングから関数型プログラミングへ

Greg Young およびその他多くのサンプルコードは、集約をオブジェクトとして表しています。そのため、集約オブジェクト内にメソッドがあり、それらのメソッドを使用してイベントを生成したり、イベントリストから集約を構成したり、集約の保存用のDTOに変換したりしていました。しかし、イベントソーシングの概念自体は関数型の概念によって表されているため、Sekibanでは"Opinionated"な形で各APIを関数型の形にバージョンアップしていきました。

多くの機能で使用されていた、継承をインターフェースと実装に変更することにより、各クラスの機能が明確になり、継承元オブジェクトにより内容が変わるなどの心配をしなくて良くなりました。

社内用や、プロジェクト内コードであれば多くの変更を伴うバージョンアップは考えませんでしたが、オープンソース化のために、新規プロジェクトで理想的な記述ができるように破壊的な変更を入れていきました。

  • 集約 Aggregate : 多くの機能を持つオブジェクトから、集約の現在のデータを表すイミュータブルなデータクラスへ変更いたしました。集約はその集約にアサインされたイベントからしか変更できないため、privateプロパティを作って、集約オブジェクト内のメソッドからしか変更できないという制限を作り、表示用にDTOを作る必要がなくなりました。

  • イベント : Aggregate' = f(Aggregate', event) で表されます。現在のAggregateにイベントを適用するとAggregateの状態が新しいものとなります。このメソッドをイベントに定義できるようにすることにより、Aggregateを簡略化いたしました。

  • コマンド : Sekibanのコマンドは、1つの集約(集約種別+集約ID:Guid)にしか変更を出来ないように設計されています。初期のオブジェクト指向で書かれていた時は、コマンドから集約内のメソッドを呼んでいて、どこまでの処理をコマンドに書いて、どこまでの処理を集約オブジェクト内に書くかのルールが不明瞭でした。集約をデータ化して、集約ないメソッドをなくしたため、コマンドがイベントを返す役割を担うことになりました。Sekibanはコマンドから返されたイベントを指定した集約IDで保存します。

追記 2024/6/13

https://x.com/tomohisa/status/1801117285442363463

Commandに関しても、ResultBoxを使用してリファクタリングを行いました。新しい書き方なのでどれくらいなれるかは未知数ですが、試してみたいと思います。

CQRSの実現と実装

イベントソーシングではイベントを元に読み取り用データベースを構築していきますが、大きな手法として以下の2つがあります。

  • リードモデルデータベース: 読み込み用のRDBデータベースを作成し、それを読み込みます(複雑、大規模システム向き)
  • ライブプロジェクション: 読み込み用のプロジェクションをインメモリで作成し、それにアクセスします(シンプル、中小規模システム向き)
    Sekibanでは現在ではライブプロジェクションをサポートしていて、各開発者がリードモデルデータベースの生成は可能です。

プロジェクションのデータから問い合わせを行うために、クエリを定義します。簡単にプロジェクションからデータを取得できるようになっているため、使用状況に合わせたデータを簡単に取得可能です。

メソッドと名前の一貫性

コマンド、クエリ、テストで1つの機能が多くの場所で呼ばれていますが、作っている過程で名前の一貫性が崩れることがあります。しかし、Sekibanでは何度か全体チェックを行って、名前の一貫性を修正してきました。

Opinionatedな機能

最近、多くのオープンソースライブラリで"Opinionated"であることを明記していますが、意味としては、「意見は分かれるところだけど、自分たちはこの方法が好きでやっている」という意思表示と捉えています。Sekibanも多くの点で私を含めたコアメンバーの好みを採用しているので、いわゆる一般の教科書的な回答とは違うかもしれません。

シナリオベースのテストフレームワーク

作成、変更、削除などのプロセスの表現をしてテストをする場合に、前のテストが成功したという前提で、そのテストの次にこの作業した時というケースが非常に多いです。Sekibanでは、そのようなシナリオテストを含め、多くの状況に対応したテストを簡単に記述して実行することができるテストフレームワークを準備しています。

データベースにデータを保存せずにインメモリでイベントソーシングを実行して結果を出すため、コードのロジックのテストを早く確実に行うことができます。

コマンドからWEB APIの自動生成

CQRSをイベントソーシングで実装する場合、多くの場合、コマンドにはその時に必要なパラメータだけが含まれています。名前変更機能であれば、集約Idと変更後名称だけあれば十分です。そのため、この情報をさらにプレゼンテーション層で別のDTOに詰め替えるという必要がなくなります。多くの場合、APIはコマンドをそのままマッピングしたものとすることができます。それらを簡単に追加する機能を作ったものが、Sekiban.Webとなります。

複数の集約に変更を加える機能を作る場合は、APIを定義してその中からいくつかの主役を呼び出す方法をとったり、イベント駆動でピタゴラスイッチ的にイベントの副作用を定義していくことによっても実現可能です。

Domain Modeling Made Functional からの影響

関数型でF#を使用してドメインモデリングをおこなうDMMF本につきまして、ポリモーフィズムを使って型合成を行う方法やエラーをResult型で返すなど、Sekiban内部やSekibanの使用例で活用しております。それもあり、Railway Oriented ProgrammingライブラリのResultBoxの開発も行いました。

https://github.com/J-Tech-Japan/ResultBoxes

Domain Modeling Made Functional 本は2024年6月に和訳本「関数型ドメインモデリング」が発売されるので楽しみですね。

https://www.amazon.co.jp/dp/4048931164

マルチデータベース対応

Dynamo DBおよびPostgresの対応につきましては、ユースケースにより、Azure以外で使用する場合のことを考え、実施いたしました。テストケースで多くのパターンでCosmos, Dynamo, Postgresでそれぞれ同じコードを実行して同じ結果になるテストを準備して、特定の環境でのエラーが発生しないように努力しています。

最新のコードはこのようなものになっています。


public record UserPoint(string Name, string Email, int Point) : IAggregatePayload<UserPoint>
{
    public static UserPoint CreateInitialPayload(UserPoint? _) => new UserPoint(string.Empty,string.Empty,0);
}

public record ChangeUserPointName(
    [property: Required] Guid UserPointId,
    [property: Required] string NameToChange) : ICommand<UserPoint>
{
    // Aggregate Id should be current aggregate.
    public Guid GetAggregateId() => UserPointId;

    public class Handler : ICommandHandler<UserPoint, ChangeUserPointName>
    {
        public IEnumerable<IEventPayloadApplicableTo<UserPoint>> HandleCommand(ChangeUserPointName command, ICommandContext<UserPoint> context)
        {
            if (command.NameToChange == context.GetState().Payload.Name)
                throw new InvalidOperationException("Already have same name as requested.");
            yield return new UserPointNameChanged(command.NameToChange);
        }
    }
}

public record CreateUserPoint(
    [property: Required]string Name, 
    [property:Required, EmailAddress]string Email,
    [property:Range(0,10000)] int Point) : ICommand<UserPoint>
{
    // Assign new Aggregate Id by NewGuid()
    public Guid GetAggregateId() => Guid.NewGuid();

    public class Handler : ICommandHandler<UserPoint, CreateUserPoint>
    {
        public IEnumerable<IEventPayloadApplicableTo<UserPoint>> HandleCommand(CreateUserPoint command, ICommandContext<UserPoint> context)
        {
            yield return new UserPointCreated(command.Name, command.Email, command.Point);
        }
    }
}

public record UserPointCreated(string Name, string Email, int Point) : IEventPayload<UserPoint, UserPointCreated>
{
    public static UserPoint OnEvent(UserPoint aggregatePayload, Event<UserPointCreated> ev) =>
        new(ev.Payload.Name, ev.Payload.Email, ev.Payload.Point);
}

public record UserPointNameChanged(string ChangedName) : IEventPayload<UserPoint, UserPointNameChanged>
{
    public static UserPoint OnEvent(UserPoint aggregatePayload, Event<UserPointNameChanged> ev) =>
        aggregatePayload with { Name = ev.Payload.ChangedName };
}

public record UserPointReceived(int Point, string Note) : IEventPayload<UserPoint, UserPointReceived>
{
    public static UserPoint OnEvent(UserPoint aggregatePayload, Event<UserPointReceived> ev) =>
        aggregatePayload with { Point = aggregatePayload.Point + ev.Payload.Point };
}

public record UserPointUsed(int Point, string Note) : IEventPayload<UserPoint, UserPointUsed>
{
    public static UserPoint OnEvent(UserPoint aggregatePayload, Event<UserPointUsed> ev) =>
        aggregatePayload with { Point = aggregatePayload.Point - ev.Payload.Point };
}

public class DomainDependency : DomainDependencyDefinitionBase
{
    public override Assembly GetExecutingAssembly() => Assembly.GetExecutingAssembly();

    public override void Define()
    {
        AddAggregate<UserPoint>()
            .AddCommandHandler<CreateUserPoint, CreateUserPoint.Handler>()
            .AddCommandHandler<ChangeUserPointName, ChangeUserPointName.Handler>();
    }
}

まとめ

この記事では、これまでイベントソーシング・CQRS フレームワークを開発してきたきっかけと開発中に方針を決めるために行なったいろいろな選択について書いてきました。今までも自社で作るシステムの下回りを作るのが個人的に好きでいろいろ作ってきたのですが、どうしても基本的には1つのシステムのために共通機能を作ってきたので、次のシステムに移行した時に少しずつ改良するという形でした。

しかし今回のSekibanに関しては複数の社内、社外のシステムの下回りとなるために何が必要だろうか、それぞれの機能は正しく動くだろうかと考えてここまで完成に持ってくることができました。イベントソーシングはこれからの開発ではほとんど全てのシステム開発に役に立つものだと考えています。これから業務システムを作るにあたり、選択することができるのであれば、常にSekibanをベースに作っていくことができればと現時点では考えています。

ただ、より多くのシステムで対応できるように大規模システムでの使用を想定した便利な機能を追加していきたいと思っています。これからも引き続き開発を進めていきますのでよろしければSekibanを使ってみてください。

https://www.sekiban.dev

ジェイテックジャパンブログ

Discussion