🦔

シンプルなイベントソーシングをRust初心者がC#をまねてRustで作ってみた

2024/12/13に公開

株式会社ジェイテックジャパン CTOの高丘 @tomohisaです。

Sekibanという、C#のイベントソーシングフレームワークを作っています。

https://github.com/J-Tech-Japan/Sekiban

その新しいコンセプト(関数型で効率的な書き方)のために、まず、インメモリで動作する、イベントソーシングのコンセプトをC#で作りました。そちらの記事はこちら。

https://zenn.dev/jtechjapan_pub/articles/f7968a3f2fb6d5

C#で2日くらいでこれができるなら、このコピーでよければ知らない言語でもできるのではないかと思って、作ってみることにしました。色々やってみたい言語があったのですが、まずは Rust で作ってみました。紆余曲折あり、諦めそうになったのですが、なんとか超シンプルな機能で実現できたので、リリースしました。

https://github.com/J-Tech-Japan/SuperSimpleEventSourcing/tree/main/rust

https://zenn.dev/jtechjapan_pub/articles/1cd1ca43701960

https://zenn.dev/jtechjapan_pub/articles/d83e90c20917cd

実行コード

main.rs

    // フレームワークコード
    let mut repo = Repository::new();
    let mut command_executor = CommandExecutor {
        repository: repo,
    };

    // プロジェクター(集約をイベントの追加によって更新していく機能)
    let projector = BranchProjector {};

    // コマンド1 Branchの作成
    let create_branch_command = CreateBranchCommand {
        name: "main".to_string(),
        country: "Japan".to_string(),
    };
    // コマンドだけ定義されていて、機能は関数型で書いている
    let response = command_executor.execute(create_branch_command, &projector, |command| PartitionKeys {
        aggregate_id: Uuid::new_v4(),
        group_: "default".to_string(),
        root_partition_key: "default".to_string(),
    }, |command, context| Some(Box::new(BranchCreated {
        name: command.name.clone(),
        country: command.country.clone(),
    })));

    // コマンド2 Branchの名前変更
    let change_branch_name_command = ChangeBranchNameCommand {
        name: "main2".to_string(),
        partition_keys: response.partition_keys.clone()
    };
    // コマンドだけ定義されていて、機能は関数型で書いている
    let response = command_executor.execute(change_branch_name_command,
                                            &projector,
                                            |command| command.partition_keys.clone(),
                                            |command, context| Some(Box::new(BranchNameChanged {
        name: command.name.clone(),
    })));

    // コマンド3 Branchの国変更
    let change_branch_country_name_command = ChangeBranchCountryNameCommand {
        country: "USA".to_string(),
        partition_keys: response.partition_keys.clone()
    };
    // コマンドのなかに、機能の関数も書いている
    let response = command_executor.execute(change_branch_country_name_command.clone(),
                                            change_branch_country_name_command.get_projector().as_ref(),
                                            ChangeBranchCountryNameCommand::get_partition_keys,
                                            ChangeBranchCountryNameCommand::command_handler);

    println!("Change Name Change Country: {:?}", response);

    let loaded_aggregate = command_executor.repository.load(
        &response.partition_keys,
        &projector
    );
    println!("Loaded Aggregate After Change Country: {:?}", loaded_aggregate);


出力結果:

Loaded Aggregate After Change Country: Ok(
    Aggregate { payload: Branch { 
        name: "main2", country: "USA" }, 
        partition_keys: PartitionKeys { 
            aggregate_id: 8903dac7-c5d6-4c8c-96d2-276dd149414f, 
            group_: "default", 
            root_partition_key: "default" }, 
        version: 3, 
        last_sortable_unique_id: "063869638813381059044864246217" 
    })

基本クラスは以下のものです。

  • Repository : イベントをインメモリに保存したり、保存されたイベントから集約を呼び出す機能(複数のプロジェクタに対して汎用)
  • CommandExecutor : コマンドを実行したら、Repositoryにイベントを保存する、すでに保存された集約に対しては、現在の集約状態を呼び出して、追加のコマンドを定義する

Branchの集約のパーツも、プロジェクト内に定義しています。以下で、集約パーツのコードを紹介します。

ドメインのコード① - イベント

events.rs
#[derive(Debug,Clone)]
pub struct BranchCreated {
    pub name: String,
    pub country: String,
}
impl EventPayload for BranchCreated {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn clone_box(&self) -> Box<dyn EventPayload> {
        Box::new(self.clone())
    }
}
#[derive(Debug, Clone)]
pub struct BranchNameChanged {
    pub name: String,
}

impl EventPayload for BranchNameChanged {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn clone_box(&self) -> Box<dyn EventPayload> {
        Box::new(self.clone())
    }
}

#[derive(Debug, Clone)]
pub struct BranchCountryNameChanged {
    pub country: String,
}
impl EventPayload for BranchCountryNameChanged {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn clone_box(&self) -> Box<dyn EventPayload> {
        Box::new(self.clone())
    }
}

イベントは基本的にはデータクラスなのですが、as_any, clone_boxを定義しないといけないのはちょっと面倒ですが、決まった内容なので、LLMでガンガン作っていけるので、行数ほど大変ではありませんでした。このデータ+時間やパーティションのデータがEventCommonとして、リポジトリに保存されます。

ドメインのコード② - 集約

aggregate.rs
#[derive(Debug, Clone)]
pub struct Branch{
    pub name: String,
    pub country: String,
}

impl AggregatePayload for Branch {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn clone_box(&self) -> Box<dyn AggregatePayload> {
        Box::new(self.clone())
    }
}

イベントと同じく、データと簡単なメソッドです。このデータがイベントで構成されていきます。イベントの集合に対して、複数の集約の種別を定義することも可能ですが、集約を作るためには、次に書く、集約プロジェクターを定義する必要があります。

ドメインのコード③ - 集約プロジェクター

BranchProjector.rs
impl AggregateProjector for BranchProjector {
    fn project(&self, payload: &dyn AggregatePayload, ev: &EventCommon) -> Box<dyn AggregatePayload> {
        // EmptyAggregatePayloadの場合の処理
        if let Some(_) = payload.as_any().downcast_ref::<EmptyAggregatePayload>() {
            let event = ev.payload.clone_box();
            if let Some(branch_created) = event.as_any().downcast_ref::<BranchCreated>() {
                return Box::new(Branch {
                    name: branch_created.name.clone(),
                    country: branch_created.country.clone(),
                });
            } else {
                return (*payload).clone_box();
            }
        }

        // 既存のBranchがある場合の処理
        if let Some(branch) = payload.as_any().downcast_ref::<Branch>() {
            let event = ev.payload.clone_box();
            if let Some(branch_created) = event.as_any().downcast_ref::<BranchCreated>() {
                Box::new(Branch {
                    name: branch_created.name.clone(),
                    country: branch_created.country.clone(),
                })
            } else if let Some(branch_name_changed) = event.as_any().downcast_ref::<BranchNameChanged>() {
                Box::new(Branch {
                    name: branch_name_changed.name.clone(),
                    country: branch.country.clone(),
                })
            } else if let Some(branch_country_name_changed) = event.as_any().downcast_ref::<BranchCountryNameChanged>() {
                Box::new(Branch {
                    name: branch.name.clone(),
                    country: branch_country_name_changed.country.clone(),
                })
            } else {
                Box::new(Branch {
                    name: branch.name.clone(),
                    country: branch.country.clone(),
                })
            }
        } else {
            // Branch でも EmptyAggregatePayload でもない場合はそのまま
            (*payload).clone_box()
        }
    }

    fn clone_box(&self) -> Box<dyn AggregateProjector> {
        Box::new(self.clone())
    }
}

Sekibanにおいて、すべての集約は、EmptyAggregatePayloadで始まります。新規集約を開始する処理に関しては、集約がEmptyAggregatePayloadで特定のイベントが来たときに、新たな集約の型を返します。この場合、BranchCreatedイベントが来た時に、Branch集約に返します。

その他のイベントの時は、Branch集約をキープしたまま、Branch内のデータを変えていきます。この場合は、Branch型をキープしているのですが、型を複数定義して、状態によって型を変えることもできます。

  • ActiveBranch : ユーザーを追加できる
  • InactiveBranch : ユーザーを追加できないが、閲覧はできる
  • DeletedBranch : 削除されたBranch、閲覧もできない
    この型を変えることによって、以下で説明するCommandが、イベントによって特定の型の時だけコマンドを実行できるように構成することができます。(今回のSimpleRustではここまでは実行していません。)
  • ChangeBranchName は、ActiveBranchにしか実行できない
  • MakeBranchInactive はActiveBranchにしか実行できない
  • RestartInactiveBranch はInactiveBranchにしか実行できない
    など。

ドメインのコード④ - コマンドおよびそのハンドラー

commands.rs
pub struct CreateBranchCommand {
    pub name: String,
    pub country: String,
}
impl Command for CreateBranchCommand {}

pub struct ChangeBranchNameCommand {
    pub name: String,
    pub partition_keys: PartitionKeys
}
impl Command for ChangeBranchNameCommand {}
#[derive(Clone)]
pub struct ChangeBranchCountryNameCommand {
    pub country: String,
    pub partition_keys: PartitionKeys
}

impl Command for ChangeBranchCountryNameCommand {}
impl CommandWithHandler for ChangeBranchCountryNameCommand {
    fn get_projector(&self) -> Box<dyn AggregateProjector> {
        Box::new(BranchProjector {})
    }

    fn get_partition_keys(&self) -> PartitionKeys {
        self.partition_keys.clone()
    }

    fn command_handler(&self, context: &CommandContext) -> Option<Box<dyn EventPayload>> {
        let binding = context.get_current_aggregate();
        let branch = binding.payload.as_any().downcast_ref::<Branch>().unwrap();
        if branch.country == self.country {
            return None;
        }
        Some(Box::new(BranchCountryNameChanged {
            country: self.country.clone(),
        }))
    }
}

コマンドの目的および機能は以下のとおりです

  • 入力内容の決定 : Command
  • 指定するパーティションの決定(イベントストリームの指定)
  • コマンドを受け取り、どんなイベントを返すのか、それとも何も返さないのかを決定する: コマンドハンドラー

Commandにはいくつかの定義方法がありますが、主に2つあります。

  1. コマンドの型だけ定義して、その機能はCommandExecutorに直接渡す
    CreateBranchCommand, ChangeBranchNameCommandはこのスタイルで書いています。
  2. コマンドとハンドラーを一緒に定義して利用できるように準備しておく
    ChangeBranchCountryNameCommandはこのスタイルで書いています。
    個人的には、Handlerの責務はドメインが持っているのが良いと思いますので、こちらの書き方を個人的には主に用いています。

コマンドの責務はどのイベントを保存するかを決めるところまでで、その保存されたイベントが集約にどのような影響を与えるかは、Projectorが決定します。

上記の4つのドメインの構成要素を定義したのちに、最初に書いた実行コードを記述することができます。

Rustで書いてみて

Rustはメモリの使用に関して、コードでしっかり指定することによって、ガベージコレクションが不要で、安定したコードの実行ができることを目的として作られています。そのため、上記のドメインコードを書くときも、所有権は借用しているのか、自分のところに残しているのかを考えながら書く必要があります。しかし、逆に考えると、メモリの使用に関しては、インターフェースの方で定義しているので、ドメイン側ではその方針に従って、クローンを作成する形のコードで、インターフェースにあった型の使用をすれば良いので、慣れてくるとコードを書くのは難しくないかもしれません。

個人的にはRustが初めてだったので、かなり悩みながら書きました。ChatGPT頼りで書いていたので、昨日ChagGPTが数時間止まった時はとても困りました。そのタイミングでゆっくりRustのオフィシャルドキュメントを読むことができたのが、助けになったかもしれません。

https://x.com/tomohisa/status/1867130603038179797

ここでも書いたのですが、所有権関係の問題が出た時に、エラーメッセージだけをみて、短絡的な解決策がLLMから出されるのですが、複数のクラスに関連する場合、全てを総合した解決策にはならず、堂々めぐりになることが多かったです。なので所有権に関してはよく理解して、借用なのか所有権を渡すのか、クローンして渡すのかをよく考えて扱う必要があると感じました。

https://x.com/tomohisa/status/1867352506172682415

冗長なコードが多くなるものの、わかっていれば難しくはないと思うので、慣れてくれば色々採用できそうに感じました。

SuperSimpleEventSourcingに関しては、余裕があれば別の言語も書いてみたい。言語や機能のリクエストがあればこちらのコメントやXでのコメントでお願いします!!コメントがあるとやる気が出ます。

https://github.com/J-Tech-Japan/Sekiban

あと、SekibanのGitHubにスターをしてくださるととても喜びます!!

ジェイテックジャパンブログ

Discussion