イベントソーシングはシンプル!:400行で出来たインメモリ簡易フレームワーク
株式会社ジェイテックジャパン CTOの高丘 @tomohisaです。
現在、Sekibanでのドメインの記述方法をシンプルかつ柔軟に書き換えられないだろうかと考えて、色々検証コードを書いています。2年半前から書いてきたコードの追記、編集で書いてきて、書き方としてもちょっと古くなってきなととともに、リフレクションが多く使われていて、バグも起きやすいコードとなっていたので、面倒に感じたのですが、腰を上げて、考えていたコンセプトを実装するフレームワークを1から書いてみました。
考えていたコンセプトはこちら。
Sekibanを書き始めた時にはあまり慣れていなかった、インターフェースをまとめて作る書き方を最初から取り入れることができたため、なかなかシンプルに書くことができました。
そして書いてみて、記述方法が色々工夫できることを社内で話し合い、形になってきたので、一旦、コンセプト版として、リポジトリにまとめてみました。
それがこちら。
Super Sinple Event Sourcing
基本的に永続化はインメモリのリストにイベントオブジェクトを置いているだけです。ただこの保存機能部分を抽象化すれば、データベースへの対応も簡単なので、今は簡単な形にしています。
集約、プロジェクター、コマンド、イベント、コマンドハンドラーを定義することで、イベントソーシングを簡単に実行して試すことができます。
スレッドの制御を入れていないため、並行実行時の整合性に関してはなにも機能はないですが、スレッドをコントロールすれば、1つの集約では1つの処理しか実行しない形の制御も追加可能です。
また、Web API、コンソール、ユニットテストのサンプルも含まれていて、gifで表示しているように、これだけのコードで、WebAPIも実装されています。
コードはシンプルなもので、
- フレームワーク 400行程度
- ドメインコード 130行程度
- Web API 30行程度
- コンソールアプリ 40行程度
- テスト 100行程度
合計700行程度の短いプログラムです。
コードはC#で書いていますが、難しいコードはあまり書いていないので、他の言語に移植することも簡単かもしれません。ここから作り始めて、永続化やスレッドの制御を入れていけば、イベントソーシングの導入の助けになるかもしれません。
いかに、GithubのReadme.mdの和訳をおきます。サンプルコードに関しても説明していますので宜しかったらご覧下さい。
わからないことなどありましたらXなどで声をかけてください。
Readme.mdの和訳
Sekiban.Pure を使用した簡易的なイベントソーシングのサンプル
これは非常にシンプルなイベントソーシングのデモサンプルです。
イベントソーシングはシンプルなコンセプトです。イベントを保存し、状態はプロジェクタを使って投影できます。
このサンプルにはイベントソーシングライブラリとサンプルドメインが含まれており、イベントソーシングライブラリがどのように作られているか学ぶのに役立ちます。
さらに、簡易的な Web API とスカラー UI、シンプルなコンソールアプリ、シンプルなユニットテストも含まれています。
制限事項
イベントストレージは単純な List<IEvent>
を用いており、これは実行時のみ有効なストレージです。また、パーティションレベルのプロジェクションしか行えず、すべてのアグリゲートを一覧表示するためのライブプロジェクトを行うことはできません。
プロジェクト構成
- Domain :シンプルなイベントソーシングフレームワークとサンプルドメイン
- Web :シンプルな Web API
- Console :シンプルなコンソールアプリ
- Test :シンプルな xUnit テスト
TODO:既存のアグリゲートを照会・一覧表示する機能はまだ含まれていません。
パーツについて
- Aggregate(アグリゲート):状態を保持するための「箱」。状態のライフサイクル中に変化し得ます。
- Command(コマンド):アグリゲートに変更を加えるための命令。コマンドはコマンドハンドラに渡され、単一のパーティションにのみ影響を及ぼします。
- Command Handler(コマンドハンドラ):コマンドからイベントを生成する関数。
- Event(イベント):真実の源泉(ソース・オブ・トゥルース)として保存する事実。
- Partition(パーティション):イベントを保存するためのストリーム。アグリゲートはこのストリームから投影できます。
- Projector(プロジェクタ):イベントを用いてアグリゲートを進化させる関数。新たなアグリゲート状態を返します。
以下はシンプルなブランチの定義例です。
このブランチには登録用のコマンドと名前を変更するためのコマンドがあります。
public record Branch(string Name) : IAggregatePayload;
public record BranchCreated(string Name) : IEventPayload;
public record BranchNameChanged(string Name) : IEventPayload;
public class BranchProjector : IAggregateProjector
{
public IAggregatePayload Project(IAggregatePayload payload, IEvent ev) =>
(payload, ev.GetPayload()) switch
{
(EmptyAggregatePayload, BranchCreated created) => new Branch(created.Name),
(Branch branch, BranchNameChanged changed) => new Branch(changed.Name),
_ => payload
};
}
public record RegisterBranch(string Name) : ICommandWithHandler<RegisterBranch, BranchProjector>
{
public PartitionKeys SpecifyPartitionKeys(RegisterBranch command) => PartitionKeys<BranchProjector>.Generate();
public ResultBox<EventOrNone> Handle(RegisterBranch command, ICommandContext context) =>
EventOrNone.Event(new BranchCreated(command.Name));
}
public record ChangeBranchName(Guid BranchId, string NameToChange)
: ICommandWithHandler<ChangeBranchName, BranchProjector>
{
public ResultBox<EventOrNone> Handle(ChangeBranchName command, ICommandContext context) =>
context.AppendEvent(new BranchNameChanged(command.NameToChange));
public PartitionKeys SpecifyPartitionKeys(ChangeBranchName command) =>
PartitionKeys<BranchProjector>.Existing(BranchId);
}
使用方法
- コンソールでのブランチ登録例
Console.WriteLine("新しいブランチ名を入力してください:");
var inputN = Console.ReadLine();
var responseN = await executor.Execute(new RegisterBranch(inputN)).UnwrapBox();
var aggregateN = Repository.Load<BranchProjector>(responseN.PartitionKeys).UnwrapBox();
Console.WriteLine(JsonSerializer.Serialize(aggregateN.ToTypedPayload<Branch>().UnwrapBox()));
- コンソールでのブランチ名変更例
Console.WriteLine("ChangeName: 変更後の名前を入力してください:");
var input = Console.ReadLine();
if (!string.IsNullOrEmpty(input))
{
var response = await executor.Execute(new ChangeBranchName(responseN.PartitionKeys.AggregateId, input??"")).UnwrapBox();
var aggregate = Repository.Load<BranchProjector>(response.PartitionKeys).UnwrapBox();
Console.WriteLine(JsonSerializer.Serialize(aggregate.ToTypedPayload<Branch>().UnwrapBox()));
}
- Minimal API 定義例
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddOpenApi();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
app.MapScalarApiReference();
}
app.MapGet("/", () => "Hello World!");
app.MapPost("/api/branch/register", async (RegisterBranch command) =>
{
var executor = new CommandExecutor { EventTypes = new DomainEventTypes() };
return await executor.Execute(command).UnwrapBox();
}).WithOpenApi();
app.MapPost("/api/branch/changename", async (ChangeBranchName command) =>
{
var executor = new CommandExecutor { EventTypes = new DomainEventTypes() };
return await executor.Execute(command).UnwrapBox();
}).WithOpenApi();
app.MapGet("/api/branch/{id}", (Guid id) =>
Repository.Load<BranchProjector>(PartitionKeys<BranchProjector>.Existing(id)).Conveyor(aggregate => aggregate.ToTypedPayload<Branch>()).UnwrapBox()).WithOpenApi();
app.Run();
次は?
私たちは、このコンセプトを用いて、"Sekiban" を使った本格的なイベントソーシングフレームワークを構築中です。
https://github.com/J-Tech-Japan/Sekiban
私たちはドメイン記述方法を常に改善中です。上記のコンセプトはまだ開発中ですが、Sekiban は Azure Cosmos DB、Dynamo DB、Postgres を使用することが可能です。小・中規模アプリケーション向けにフル機能のイベントソーシングアプリを構築でき、分散環境拡張に向けても取り組んでいます。
まとめ
2024年12月21日に、日本では初めての、イベントソーシングとCQRSに特化したカンファレンスが開かれます。もう少し空き席もありそうですので、よろしかったらどうぞ。僕も来日してイベントに参加して、登壇も行う予定です。
Discussion