Open8

Akka雑記その1

unouunou

アクターシステムを中心に動くシステム(JavaとScalaで動く)
HTTP、Stream、Clusterなどいくつかに別れている
アクターは型安全ではない
→2.6から型安全であるAkka Typedが追加された
アクターはメッセージを送受信する。
アクター自身が状態を持つことはなく、ステートレスである。

unouunou

最小のAkkaアプリ

まずはbuild.sbtに使用するライブラリの依存関係を追加する。

依存ライブラリごとに
organization % module % version というフォーマットでMavenのアーティファクトを指定。
(%%はScalaのバージョンを補完し、正しいバージョンのライブラリを自動で選択する。)

チケット販売サービスを例にRESTサーバーを構築する。
アクターはcreate, send/receive, become, superviseの4つの操作を行うことができるがそのうちの2つを使って実装して行く。

Akkaは多くの補助ツールで構築して行くので、拡張という仕組みを使って行く。
ActorSystemを扱うためにはExcutionContextが必要なので暗黙に宣言する
MaterializerはStreamを扱うために必要になるためこちらも暗黙に宣言
メッセージはそれぞれのアクターのコンパニオンオブジェクトにケースクラスとしてまとめる。
アクターはクラスで宣言。例えばAとBというアクターを作る際は

class A extends Acotr {...}
class B extends Actor {...}

として、処理を記述する。
メッセージの受け取りは

def recieve = {
  case MessageA => ...
  case MessageB => ...
}

のようにメッセージごとに分けて記述する。
本に明記されてないけど
sender() ! 返り値 とすることで送り主に返信できる。! は tell と同義。
また、転送のforwardもある。

// inside `B`, when received `msg` from `A`
C tell (msg, A) 
      
C forward msg

この場合、msgをCに通知することには変わりないが、
上の場合(tell)は送信者(sender())がBになり
下の場合(forward)は送信者(sender())がAになる。
どこにも書いてないからネットで調べたよ!!!本に書いてよ!!!!!!!!!!!

// ActorA
def createActorB(name: String) = {
  context.actorOf(ActorB.props(name), name)

上記のように、ActorSystemからでなくcontextからアクターを作成するとアクターの親子関係を作ることができ、親アクターの監視下に置くことができる。
今回の場合はActorAが親、ActorBが子

pipe(処理) to アクター で、 処理が完了したらその値をアクターに渡すことができる。
この値はFutureに包まれて送信される(?)とのこと。詳細は後の章でとのことだけど、雰囲気はJSのPromiseっぽい。

unouunou

アクターによるテスト駆動開発 part1

事前知識編

・アクターは振る舞いをもつため、より直接的にテストの対象になる。ほとんどのTDDはBDDの側面も持つ。
・アクターはメッセージングを用いて構築されているので、シュミレートが容易。
この2点から、アクターシステムはテスト駆動開発に向いている。

まず最初にプログラムで実現しようとしているゴールに着目し、次にアクターのためのテスト仕様を記述することでコードの実装を開始する。その後、テストに合格されるためにコードを書く
以上がテスト駆動開発の実装手順だそう

通常のScalaTestフレームワークを用いるとアクターをテストするのは難しい。理由は以下4つ
・タイミング ー メッセージの送信は非同期なため
・非同期 ー アクターは複数のスレッドで並列に実装されているため
・ステートレス ー アクターは内部状態を隠蔽し、状態へのアクセスを許可していないため
・協調、統合 ー 複数のアクター間のメッセージのやりとりを盗み見てアサートする必要があるため

そのため、Akkaはakka-testkitモジュールを提供している。

  • シングルスレッドの単体テスト ー 通常はアクターのインスタンスには直接アクセスはできないが、テストキットのTestActorRefを使えば可能となる。これを使えば、直接アクターインスタンスをテストしたり、シングルスレッド環境でrecieve関数を呼び出すことが可能になる。
  • マルチスレッドの単体テスト ー テストキットモジュールは、TestKitとTestProbeクラスを提供し、これらを使うことで、メッセージの期待値をアサートすることができる。

また、TestKitにはLocalActorRefクラスを継承しているTestActorRefがあり、テスト専用のCallingThreadDispatcherにディスパッチャーを設定する。これは別スレッドではなく呼び出し元と同じスレッドでアクターを動作させるのでアクターがもつテストの難しさを緩和することができる。

実装準備編

テストを始まる前に全てのテストで使える、アクターを自動で停止させる小さなトレイトを作成する。
scalatest のBeforeAndAfterAllを継承してStopSystemAfterAllトレイトを作成する。
(本ではsystem.shutdownとなっているが、使えなくなっているので代わりにsystem.terminateを使う)

// before
system.shutdown()
// after
system.terminate()

※IntelliJでscalatestの依存をbuild.sbtに書くと、Unknown artifact. Not resolved or indexed.という警告が出る。
1、 電球マークを押して、update project resolvers' indexesをクリック→update
2、File → Invalidate Cache / Restart をクリック
で警告が出なくなるらしい。ちなみに治らなかった。

sbt1.3以降は以下を書く必要があるっぽい(?)(僕の環境ではこれでいけました)

ThisBuild / useCoursier := false

アクターシステムはfire and forget スタイルで、メッセージは一方通行なのでテストを行うには、アクターがやるはずだった仕事を確認すれば良い。
アクターの振る舞いは以下3つに分類されるのでそれぞれのテストを実装する。
・SilentActor ー アクターの振る舞いを外部から直接確認できないので、例外を出さなかったこと、アクターが終了したこと、内部状態の変化を確認する。
・SendingActor ー アクターは受信したメッセージの処理が完了した後別のアクターにメッセージを送信するが、受信したメッセージに対する反応として送信されたメッセージを検査。
・SideEffectingActor ー アクターが他のオブジェクトに影響を与えるかどうか(副作用があるかどうか)を確認する。

*ScalaTest2.0について
ScalaTestのバージョンが上がってライブラリの導入方法が大きく変わった。
今までとは違い、機能ごとにライブラリを入れることができるようになった。(今まで通り、全てを一括で導入することもできる)

// 今までと同じパターン
 libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % "test"

// 2.0から新たに可能となったパターン
libraryDependencies += "org.scalatest" %% "scalatest-funsuite" % "3.2.0" % "test"

FunSuiteかFunSpecどちらかしか使わないと思うので、良い変更かと思ったけど、新しい方法で導入すると他のMustMatchersとかも各々入れないといけないので面倒で結局今まで通りの方使いそう。

また、パッケージの階層構造も大きく変わったので、2.0以前のscalatestから移行する際は注意が必要。

// FunSuiteの例

// before
import org.scalatest.FunSuite
// after
import org.scalatest.funsuite.AnyFunSuite

詳細は以下公式のリリースノートを参照
ScalaTest2.0 ReleaseNote

unouunou

アクターによるテスト駆動開発 SilentActor編

SilentActorのテストの実装のポイント
・テスト用のクラスを作成し、テストで用いるアクターシステム(testsystem)を引数に渡したTestKitを継承する。
・BDDスタイルのテストができるようにAnyWordSpecLikeトレイトをミックスイン
・可読性の高いMatchers(MustMatchers)をミックスイン
・must ~ in ~の形式でテストを書いて行く
・red-green-refactorスタイルに従って、最初にテストに失敗する(red)ことを確かめた後、テストがパスする(green)ように実装し、その後コードをより良く(refactor)する。

class SilentActorTest
    extends TestKit(ActorSystem("testsystem"))
    with AnyWordSpecLike
    with Matchers
    with StopSystemAfterAll {
  "A Silent Actor" must {
    "change state when it receive a message, single thread" in {
       // ここにテストを記述
    }
    "change state when it receive a message, multi-thread" in {
      // ここにテストを記述
    }
  }
}

次にSilentActorを作成する。
コンパニオンオブジェクトにケースクラスとしてメッセージをまとめ、SilentActorの中でimportする。

object SilentActor {
  case class SilentMessage(data: String)
}
class SilentActor extends Actor {
  import SilentActor._
  var internalState: Vector[String] = Vector[String]()

  def receive = {
    case SilentMessage(data) =>
      internalState = internalState :+ data
  }
  def state: Vector[String] = internalState
}

SilentActorができたら、まずはシングルスレッドのテストを実装する。

"change state when it receive a message, single thread" in {
-      // ここにテストを書く
+      import SilentActor._

+      val silentActor = TestActorRef[SilentActor]
+      silentActor ! SilentMessage("whisper")
+      silentActor.underlyingActor.state must contain("whisper")
    }

マルチスレッド版も同様に実装して行く。
マルチスレッドのアクターシステムを作るとアクターのインスタンスにアクセスできなくなるので、ActorRefを引数にとるGetStateメッセージを追加して、内部状態をtestActorに渡すように実装する。

また、マルチ版ではテストキットのActorSystemからSilentActorを作成する。
*アクターは常にPropsオブジェクトから作られ、型引数を指定することで簡単に作成することができる。
expectMsgを呼ぶことでtestActorが受信したメッセージをアサートすることができる。

"change state when it receive a message, multi-thread" in {
- // ここにテストを書く
+ import SilentActor._

+ val silentActor = system.actorOf(Props[SilentActor], "s3")
+ silentActor ! SilentMessage("whisper1")
+ silentActor ! SilentMessage("whisper2")
+ silentActor ! GetState(testActor)
+ expectMsg(Vector("whisper1", "whisper2"))
}

以上がSilentActorの作成方法となっている。

unouunou

アクターによるテスト駆動開発 SendingActor編

続いてSendingActorを作成する。
後からアクターがメッセージを送信できるようにpropsメソッドを経由してアクターがActorRefを受け取る方法がある。
SilentActorと同様にまずはテストを作成する。ここでは例として、イベントのリストを並べ替えて受信者となるアクターにリストを送信するアクターを作成する。

SilentActorのテストと異なるポイント
・sortedEventsがランダムな要素のベクターを持っているためにexpectMsgを利用することができないので、ここではexpectMsgPFを使う。これなら、値を束縛することができ、testActorに送信されたメッセージを確認できるようになる。

expectMsgPF() {
  case SortedEvents(events) =>
    events.size must be(size)
    unsorted.sortBy(_.id) must be(events)
}

・コンパニオンオブジェクトでpropsメソッドを定義している。Props(arg)と名前渡しで呼び出すと値が参照されるまで評価されないので、Akkaがインスタンスを生成するときにnew SendingActor(receiver)が実行される。
また、コンパニオンオブジェクトでPropsの生成を行うことで、他のアクターからこのアクターを作成する際に内部状態を隠蔽することができる。
これにより、競合状態を引き起こしたり、シリアライズの問題が起こることを防ぐことができる。
Propsを作る時は基本的にこの方法で良いらしい。

object SendingActor {
  def props(receiver: ActorRef): Props = Props(new SendingActor(receiver))
  case class Event(id: Long)
  case class SortEvents(unsorted: Vector[Event])
  case class SortedEvents(sorted: Vector[Event])
}

MutatingCopyActorとForwardingActor、TransformingActorはSendingActorと同じようにテストができる。
FilteringActorとSequencingActorは別のアプローチでテストを行う。
FilteringActorはフィルターを通過しなかったメッセージをどうアサートするかが問題となる。
これはreceiveWhileメソッドを使い、case文にマッチしなくなるまでメッセージを収集する

val eventIds = receiveWhile() {
  case Event(id) => if id <= 5 => id
}
unouunou

アクターによるテスト駆動開発SideEffectActor編

受信したメッセージに従って挨拶文をコンソールに出力するアクターを例にテストを実装する。
Akkaにはログを出力するモジュールがある。

import akka.actor.{ActorLogging, Actor}

class Greeter extends Actor with ActorLogging {
  def receive = {
    case Greeting(message) => log.info("Hello {}!", message)
  }
}

ActorLoggingをミックスインして、log.infoで出力
テストでログメッセージを検査するので、TestEventListenerを設定する。
val config = ConfigFactory.parseStringでコンフィグファクトリーから設定ファイルをパースするメソッドを呼び出すことができる。

val config = ConfigFactory.parseString(
  """
    akka.loggers = [akka.testkit.TestEventListener]
  """

上記のようにすることでテスト用のアクターを作成できる。
また、EventListenerオブジェクトを利用することで、ログイベントの検証を行える。
interceptコードブロックが実行される時にログメッセージが検知される。

このテストはOptionを使いActorRefを渡すことで簡略化することができる。

双方向メッセージの場合ImplicitSenderをミックスインすることでtellメソッドの暗黙的なsenderをTestKitのtestActorに固定することができる。

unouunou

耐障害性

最上位のActorはActorSystemのactorOfメソッドで生成され自動的に開始される。
アクターにはフックがあり、イベントによって起動する。

開始イベント

preStartフックはアクターが開始する直前に呼び出される。

override def preStart(): Unit = {
  println("preStart")
}

このフックで初期状態を設定したり、コンストラクターで初期化を行ったりする。

停止イベント

postStopフックは、アクターが停止する直前に呼び出される。

override def postStop(): Unit = {
  println("postStop")
}

preStartの真逆。アクターの最後の状態をアクターの外に保存したりする。
停止したアクターはActorRefから切り離され、停止した後はdeadLetterActorRefへと宛先が書き変わる。
deadLetterActorRefは停止したアクターへ送られる全てのメッセージを受け取る特別なActorRefである。

再起動イベント

再起動イベントによりアクターのインスタンスが置き換わるため、開始や終了イベントに比べると複雑。

override def preRestart(reason: Throuwable, message: Option[Any]): Unit = {
  println("preRestart")
  super.preRestart(reason, message)
}

再起動イベントはデフォルトで全ての子アクターを停止し、postStopを呼び出すが、super.preRestartを呼び忘れると、前述のように動作せずに子アクターがどんどん増えていってしまう。
再起動後に状態を復元するために、selfメソッドを用いて自身にメッセージを送信する必要がある。

アクターが再起動し、コンストラクターが呼び出された後、postRestartフックが呼び出される。

override def postRestart(reason: Throwable): Unit = {
  println("postRestart")
  super.postRestart(reason)
}

preRestartと同様にsuper.postRestartを忘れずに呼び出す。

全体の流れとしては

  1. actorof
    開始
  2. constractor
  3. preStart
  4. preRestart
    再起動
  5. constractor
  6. postRestart
  7. postStop
    停止

アクターのライフサイクル

アクターはstopメソッドが呼ばれたとき、PoisonPillメッセージが送られた時に終了するが、再起動時にアクターがクラッシュした場合、stopメソッドが呼ばれないので終了しない。
同様に、アクターが終了した際に監督にTerminatedメッセージも再起動時にアクターがクラッシュした場合は送られない。
以下にライフサイクルを監視する簡単なコードを示す。

class DbWatcher(dbWriter: ActorRef) extends Actor with ActorLogging {
  context.watch(dbWriter)
  def receive = {
    case Terminated(actorRef) =>
      log.warning("Actor {} terminated", actorRef)
  }
}

context.watch(dbWriter)を呼び出せばdbWriterを監視対象に置き、終了した際にTerminatedメッセージを受け取れるようになる。