AkkaのPersistence (Event Sourcing)の用途について
この記事はScala Advent Calendar 2021の16日目の記事です。
はじめに
Akka Clusterを使った分散システムを実現するために、いくつかのAkkaのモジュールを理解する必要がある。
そのうちの一つにPersistence (Event Sourcing) がある。よく Actor を永続化するために使うということを聞くが、あまり使い所がピンと来なかったので、Akkaのドキュメントと参考文献を読みながらメモを書いていく。
Persistence (Event Sourcing) とは
状態を持ったアクターを永続化できるモジュール。
以下の二種の方法で永続化ができる
- Journal
- 更新イベントを1つずつ発生した順番に保存 - Snapshot
- その時点でのActorの状態を直接保存
状態を持ったアクターのスナップショットも使うことはできるが(リカバリーする際にスナップショットを活用することで復旧時間を短縮できる。)、重要な概念は実際のアクターのスナップショットを保存するのではなく、イベントのみが保存されること。
永続化するのはコマンドではなく、適用できるか検証した後のイベントを保存するためイベントソーシングである。そして復旧する際は、適用できるイベントが実行されるのでイベントは失敗しない。
参考資料
イベントソーシングとは何か?
イベントソーシングは、アプリケーションの現在の状態を決定する履歴を保存することにより、アプリケーションの状態を永続化する方法です。
参考資料
なぜ必要なのか?
Cluster Shardingを使う場合に、状態を復元できるようにするために利用する。
Cluster Shardingとは、複数のノードでActorを動かし、時間が経過してActorが動いているノードが変わっても、論理的な識別子を使い相互作用するできるようにするモジュールである。(エンティティアクターと呼ばれるActorを扱う)
エンティティアクターは状態を持ったActorであり、一つのノードで管理されるため、障害時に復元できる必要がある。そこで、Persistence (Event Sourcing)を使い、状態を永続化している。
参考資料
どう実装するのか?
EventSourcedBehaviorを使いイベントソーシング用のアクターを作る
EventSourcedBehavior.applyのシグニチャ
object EventSourcedBehavior {
// 永続的なアクター用のBehaviorを作成するapplyメソッド
def apply[Command, Event, State](
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) => Effect[Event, State],
eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = ???
EventSourcedBehavior.apply
を使う際に以下を渡す。
- persistenceId
- イベントをソースとする振る舞いに対する一意な識別子
- emptyState
- イベントが処理される前の、エンティティの初期状態
- commandHandler
- コマンドをエフェクトにマッピングする
- eventHandler
- イベントが永続化されたとき、現在の状態から新しい状態を計算する
サンプルコード
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
object MyPersistentBehavior {
sealed trait Command
final case class Add(data: String) extends Command
case object Clear extends Command
// Eventは過去形で定義している
sealed trait Event
final case class Added(data: String) extends Event
case object Cleared extends Event
// 最新の5つの状態をもつリスト
final case class State(history: List[String] = Nil)
//コマンドハンドラは、Effect永続化する1つまたは複数のイベント(存在する場合)を定義する指示を返します。
val commandHandler: (State, Command) => Effect[Event, State] = {
(state, command) =>
command match {
// エフェクトはファクトリを使用して作成
case Add(data) => Effect.persist(Added(data))
case Clear => Effect.persist(Cleared)
}
}
//イベントが正常に永続化されると、eventHandlerを使用してイベントを現在の状態に適用することにより、新しい状態が作成されます
val eventHandler: (State, Event) => State = { (state, event) =>
event match {
case Added(data) => state.copy((data :: state.history).take(5))
case Cleared => State(Nil)
}
}
def apply(id: String): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
// persistenceId 永続アクターの一意の識別子。ofUniqueIdで独自の識別子を作っている
persistenceId = PersistenceId.ofUniqueId(id),
// emptyState エンティティが最初に作成されるタイミングを定義
emptyState = State(Nil),
// 効果を生成することによってコマンドを処理する方法
commandHandler = commandHandler,
// イベントが永続化されたときの現在の状態を指定して、新しい状態を返す
eventHandler = eventHandler
)
}
コードはExample and core APIより引用
終わりに
今回簡単なサンプルのみしかみていないが、Persistence (Event Sourcing)を使うこと自体はとてもシンプルにできそうである。
今回ドキュメントと参考文献を読んでなんとなく理解できたくらいなので、今後Cluster Shardingを使ってみたい。
参考文献
- インメモリデータストアを使って Akka Cluster + Akka Persistence のアプリを自動テストする
- Akka Persistence Testkitを動かすまでの手順をメモっておく
- AWSで CQRS/Event Sourcing するにはどうすればいいのか
- 具体的な実装コードからEvent Sourcingを理解する
- Akkaで分散システム入門
- Akka Clusterで超レジリエンスを手に入れる
- Akka Clusterで超レジリエンスを手に入れる(その2)
- Akkaクラスターでアクターを用いた大規模分散システムを構築する
- akka-persistenceのプラグインをつくろう
- Events As First-Class Citizens
Discussion