🐅

Classic ActorからTyped Actor への移行について

2021/03/07に公開

以下のLTの発表と同じ内容になります。
https://speakerdeck.com/taketora/classic-actorkaratyped-actor-hefalseyi-xing-nituite

はじめに

Actor(Typed)を構築するDSLがClassic Actorと差異が多く、私自身理解をすることに時間がかかった。そこでこの記事では大きな違いを説明する。

移行する上で参考にした資料

なぜClassic ActorではなくTyped Actorを採用するのか?

  • 2019年11月にリリースされたAkka 2.6.0からtyped Actorが安定版へ
  • Typed APIがAkkaのメインのAPIになった。
  • メッセージの受け渡しが型安全になった
  • 状態をもつようなActorをImmutableなコードで実装することできる

Actorを構築するDSLの比較

HelloWorldアクターがGreetを受け取ってGreetedを返すActorを作る。

Classic Actor

import akka.actor.{Actor, ActorLogging, Props}

object HelloWorld {
  final case class Greet(whom: String)
  final case class Greeted(whom: String)

  def props(): Props = Props(new HelloWorld)
}

class HelloWorld extends Actor with ActorLogging {
  import HelloWorld._

  override def receive: Receive = { //※1
    case Greet(whom) =>
      log.info("Hello {}!", whom)
      sender() ! Greeted(whom)    //※2
  }
}

※1 receiveの返り値がReceive型で、Actorはどんなメッセージも受信できる

※2 sender()を使いActorRefを呼び出してメッセージを送っており、型の制約がないためActor間のメッセージ送信はどんなメッセージも送信できる

Typed Actor(関数型スタイル)

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}

object HelloWorld {

  final case class Greet(whom: String, replyTo: ActorRef[Greeted]) //※1
  final case class Greeted(whom: String, from: ActorRef[Greet])

  def apply(): Behavior[Greet] = //※2
    Behaviors.receive { (context, message) =>
      context.log.info("Hello {}", message.whom)
      message.replyTo ! Greeted(message.whom, context.self) //※3
      Behaviors.same
    }
}

※1明示的に送信者を表すActorRef[T]をメッセージに含める。メッセージからどのアクターが何を期待しているのかということがより明示的になる。

※2 Actorを継承する代わりに、Behaviorを定義してActorを構築する。

※3 sender()がなく、Greetメッセージを受け取ったアクターは、送信元のActorRef[Greeted]へ、Greetedと、自身のActorRef返している。

プロトコルについて

Typed Actorから公式ドキュメントでは単にメッセージではなくプロトコルと呼ばれている。
特定の順序と組み合わせで2つ以上のアクター間で交換されるメッセージのセットを指している。

object ChatRoom {

  // プロトコル
  sealed trait RoomCommand
  final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommand

  sealed trait SessionEvent
  final case class SessionGranted(handle: ActorRef[PostMessage])      extends SessionEvent
  final case class SessionDenied(reason: String)                      extends SessionEvent
  final case class MessagePosted(screenName: String, message: String) extends SessionEvent

  trait SessionCommand
  final case class PostMessage(message: String)                 extends SessionCommand
  private final case class NotifyClient(message: MessagePosted) extends SessionCommand

  private final case class PublishSessionMessage(screenName: String, message: String) extends RoomCommand

// 省略
}

RoomCommand,SessionEventを使ってプロトコルを作っている。

プロトコルを呼び出す方法もシングルトンオブジェクトからフルパスで明記することが推奨されている。公式ドキュメントでも、2.6.Xからフルパスで定義されている

参考:Akka Typed: Protocols

Classic Actor でも同様にプロトコルを定義できるが強制されていなかった。そのためプロトコルを使っていない大規模なアクターシステムの拡張と維持が困難になる。

一方Akka Typed の場合必ずプロトコルを宣言する必要があり、Tour of Akka typedの記事では、これをプロトコルファーストと呼んでいる

This is where the Akka Typed API comes in. This API is designed to be “protocol-first”: you no longer have a choice but to spend at least a little bit of time thinking about the messages each actor can deal with. Unlike the classic API where following this best practice is optional, you need to formalize the set of handled messages during implementation.

そこで Akka Typed API の出番です。この API は "プロトコルファースト" で設計されています。このベストプラクティスに従うことがオプションである古典的な API とは異なり、実装時に処理されるメッセージのセットを形式化する必要があります。

Tour of Akka Typed: Protocols and Behaviorsより引用

プロトコルを使ったActorの定義(オブジェクト指向スタイルと関数型スタイル)

オブジェクト指向スタイル

Behaviorを定義するオブジェクト指向のスタイル。

import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext}
import akka.actor.typed.{ActorRef, Behavior}

object Counter {

 // sealed traitでプロトコルを定義
  sealed trait Command
  case object Increment                               extends Command
  final case class GetValue(replyTo: ActorRef[Value]) extends Command
  final case class Value(n: Int)
}

class Counter(context: ActorContext[Counter.Command]) extends AbstractBehavior[Counter.Command](context) { //※1


  import Counter._

  private var n = 0

  override def onMessage(msg: Command): Behavior[Counter.Command] =
    msg match { //※2
      case Counter.Increment =>
        n += 1
        context.log.debug("Incremented counter to [{}]", n)
        this

      case GetValue(replyTo) =>
        replyTo ! Value(n)
        this
    }
}

※1アクターの構築に Actor の代わりにAbstractBehavior[T] が継承して構築する
※2プロトコルが網羅的にチェックされる。

アクターのBehaviorはこのAbstractBehaviorを拡張して抽象メソッド#onMessageを実装し、オプションで#onSignalをオーバーライドすることで実装することができる。Mutableな状態は、クラスのインスタンス変数として定義することができる。

関数型スタイル

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}

object Counter {
  sealed trait Command
  case object Increment                               extends Command
  final case class GetValue(replyTo: ActorRef[Value]) extends Command
  final case class Value(n: Int)

  def apply(): Behavior[Command] = counter(0)

  private def counter(n: Int): Behavior[Command] =
    Behaviors.receive { (context, message) => //※1
      message match {
        case Increment         =>
          val newValue = n + 1
          context.log.debug("Incremented counter to [{}]", newValue)
          counter(newValue) //※2

        case GetValue(replyTo) =>
          replyTo ! Value(n)
          Behaviors.same
      }
    }
}

※1 AbstractBehaviorの代わりに、より関数型のスタイルとしてBehaviorsのファクトリーメソッドある。(receive,receiveMessageなど)

※2 状態をアクターに持たせたい場合にImmutableに定義することができる。ここではBehaviorを返すメソッドの引数にcounter状態を持たせている。Actorは振る舞い(Behavior)を切り替えている。

Classic Actorの違い

  • アクター間のメッセージの整合性がコンパイル時にチェックできる
    • TypedActorのメッセージの型を宣言する必要がある(強制される)
    • コンパイラーは、メッセージの受信者が送信中のメッセージを実際に処理できるかどうかを静的にチェックできる。
  • Actor内のmutableなコードがなくなり、ImmutableなコードでActorを実装できる。

概念の比較表(※厳密なマッピングの表ではない)

項目 Classic Actor API Typed Actor API
アクター参照 ActorRef ActorRef[T]
アクターの構築 extends Actor extends AbstractBehavior[T] (オブジェクト指向スタイル)
子アクターの生成 context.actorOf context.spawn
ユーザーガーディアン(/user) ActorSystemによって提供される(ユーザーガーディアン) ユーザーによって提供され、ActorSystemにBehaviorを渡す
デフォルトのスーパーバイザーの戦略(監督戦略) 例外が投げられた際に子アクターを再起動する 例外が投げられた際に子アクターを停止する(let it crash)

Tour of Akka Typed: Protocols and Behaviors より翻訳して引用

子アクターの生成

import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}

object Main {

  def apply(): Behavior[NotUsed]      =
    Behaviors.setup { context => //※1
      val chatRoom   = context.spawn(ChatRoom(), "chatroom") //※2
      val gabblerRef = context.spawn(Gabbler(), "gabbler")
      context.watch(gabblerRef)
      chatRoom ! ChatRoom.GetSession("ol Gabbler", gabblerRef)

      Behaviors.receiveSignal {
        case (_, Terminated(_)) =>
          Behaviors.stopped
      }
    }

  def main(args: Array[String]): Unit =
    ActorSystem(Main(), "ChatRoomDemo")
}

※1 setupはBehaviorのファクトリーの一つ(またtyped actorにはPreStartとPostRestartシグナルがないので、同様の処置はBehaviors.setupやAbstractBehaviorクラスのコンストラクタから実行する。)
※2 spawnで子アクターを生成している

ユーザーガーディアン

typed Akkaでは、アプリケーションのための追加アクタをユーザーガーディアンから追加する。初期化を行うと同時に、ユーザーガーディアンから作られる。
Classic ではActorSystemからactorOfで作っている。

デフォルトのスーパーバイザーの戦略が異なる

例外が発生した際にClassicActorは子を再起動する。typed Actorは停止させる。

      Behaviors
        .supervise[Message] {
          Behaviors
            .receiveMessage {
              case Restart(accountIds) => ???
              case Start() => ???
            }
        }
        // リトライの最大期間は1分で、3回リトライするように設定
        .onFailure[Exception](SupervisorStrategy.restart.withLimit(maxNrOfRetries = 3, withinTimeRange = 1.minutes))
    }

typed Actorでも上記のように再起動の戦略をカスタマイズもできる。

さらに詳細な違いについてはドキュメントへ

Discussion