📑

AkkaのPersistence (Event Sourcing)の用途について

2021/12/17に公開

この記事はScala Advent Calendar 2021の16日目の記事です。

https://qiita.com/advent-calendar/2021/scala

はじめに

Akka Clusterを使った分散システムを実現するために、いくつかのAkkaのモジュールを理解する必要がある。
そのうちの一つにPersistence (Event Sourcing) がある。よく Actor を永続化するために使うということを聞くが、あまり使い所がピンと来なかったので、Akkaのドキュメントと参考文献を読みながらメモを書いていく。

Persistence (Event Sourcing) とは

状態を持ったアクターを永続化できるモジュール。

以下の二種の方法で永続化ができる

  • Journal
      - 更新イベントを1つずつ発生した順番に保存
  • Snapshot
      - その時点でのActorの状態を直接保存

状態を持ったアクターのスナップショットも使うことはできるが(リカバリーする際にスナップショットを活用することで復旧時間を短縮できる。)、重要な概念は実際のアクターのスナップショットを保存するのではなく、イベントのみが保存されること。

永続化するのはコマンドではなく、適用できるか検証した後のイベントを保存するためイベントソーシングである。そして復旧する際は、適用できるイベントが実行されるのでイベントは失敗しない。

参考資料

イベントソーシングとは何か?

イベントソーシングは、アプリケーションの現在の状態を決定する履歴を保存することにより、アプリケーションの状態を永続化する方法です。

MSDNのイベントソーシングの概要より引用

参考資料

なぜ必要なのか?

Cluster Shardingを使う場合に、状態を復元できるようにするために利用する。

Cluster Shardingとは、複数のノードでActorを動かし、時間が経過してActorが動いているノードが変わっても、論理的な識別子を使い相互作用するできるようにするモジュールである。(エンティティアクターと呼ばれるActorを扱う)
エンティティアクターは状態を持ったActorであり、一つのノードで管理されるため、障害時に復元できる必要がある。そこで、Persistence (Event Sourcing)を使い、状態を永続化している。

参考資料

どう実装するのか?

EventSourcedBehaviorを使いイベントソーシング用のアクターを作る

EventSourcedBehavior.applyのシグニチャ

object EventSourcedBehavior {

// 永続的なアクター用のBehaviorを作成するapplyメソッド
  def apply[Command, Event, State](
      persistenceId: PersistenceId,
      emptyState: State,
      commandHandler: (State, Command) => Effect[Event, State],
      eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = ???

EventSourcedBehavior.applyを使う際に以下を渡す。

  • persistenceId
    • イベントをソースとする振る舞いに対する一意な識別子
  • emptyState
    • イベントが処理される前の、エンティティの初期状態
  • commandHandler
    • コマンドをエフェクトにマッピングする
  • eventHandler
    • イベントが永続化されたとき、現在の状態から新しい状態を計算する

サンプルコード


import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

object MyPersistentBehavior {

  sealed trait Command
  final case class Add(data: String) extends Command
  case object Clear extends Command

  // Eventは過去形で定義している
  sealed trait Event
  final case class Added(data: String) extends Event
  case object Cleared extends Event

  // 最新の5つの状態をもつリスト
  final case class State(history: List[String] = Nil)

  //コマンドハンドラは、Effect永続化する1つまたは複数のイベント(存在する場合)を定義する指示を返します。
  val commandHandler: (State, Command) => Effect[Event, State] = {
    (state, command) =>
      command match {
        // エフェクトはファクトリを使用して作成
        case Add(data) => Effect.persist(Added(data))
        case Clear     => Effect.persist(Cleared)
      }
  }

  //イベントが正常に永続化されると、eventHandlerを使用してイベントを現在の状態に適用することにより、新しい状態が作成されます
  val eventHandler: (State, Event) => State = { (state, event) =>
    event match {
      case Added(data) => state.copy((data :: state.history).take(5))
      case Cleared     => State(Nil)
    }
  }
  

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      // persistenceId 永続アクターの一意の識別子。ofUniqueIdで独自の識別子を作っている
      persistenceId = PersistenceId.ofUniqueId(id),
      // emptyState エンティティが最初に作成されるタイミングを定義
      emptyState = State(Nil),
      // 効果を生成することによってコマンドを処理する方法
      commandHandler = commandHandler,
      // イベントが永続化されたときの現在の状態を指定して、新しい状態を返す
      eventHandler = eventHandler
    )
 }

コードはExample and core APIより引用

終わりに

今回簡単なサンプルのみしかみていないが、Persistence (Event Sourcing)を使うこと自体はとてもシンプルにできそうである。

今回ドキュメントと参考文献を読んでなんとなく理解できたくらいなので、今後Cluster Shardingを使ってみたい。

参考文献

Discussion