Open9
akkaまとめ
akkaチュートリアル
compileエラー
- どのマイクロサービスを
sbt compile
しても同じエラーが出る
java.lang.ClassCastException: class java.lang.UnsupportedOperationException cannot be cast to class xsbti.FullReload (java.lang.UnsupportedOperationException is in module java.base of loader 'bootstrap'; xsbti.FullReload is in unnamed module of loader 'app')
at sbt.internal.XMainConfiguration.run(XMainConfiguration.java:59)
at sbt.xMain.run(Main.scala:46)
at xsbt.boot.Launch$.$anonfun$run$1(Launch.scala:149)
at xsbt.boot.Launch$.withContextLoader(Launch.scala:176)
at xsbt.boot.Launch$.run(Launch.scala:149)
at xsbt.boot.Launch$.$anonfun$apply$1(Launch.scala:44)
at xsbt.boot.Launch$.launch(Launch.scala:159)
at xsbt.boot.Launch$.apply(Launch.scala:44)
at xsbt.boot.Launch$.apply(Launch.scala:21)
at xsbt.boot.Boot$.runImpl(Boot.scala:78)
at xsbt.boot.Boot$.run(Boot.scala:73)
at xsbt.boot.Boot$.main(Boot.scala:21)
at xsbt.boot.Boot.main(Boot.scala)
-
sbt コマンド自体が失敗してるっぽい
-
project配下のbuild.propertiesを変えてみたらsbtは起動した
- ?バージョン違いの度にこんな分かりづらいエラーが出るのかな...?
sbt.version=1.6.2 # 元は1.5.6
- 今度は依存性の解決に失敗している
[error] sbt.librarymanagement.ResolveException: Error downloading com.typesafe.sbt:sbt-native-packager;sbtVersion=1.0;scalaVersion=2.12:1.8.1
- ivyに他のresolver追加する必要あるのかな?チュートリアルには書いてないけど...
- ひとまずtypesafeのgit確認したらプラグインのDL先が変わっていたのでplugins.sbtに1行追加した
addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.4")
- ようやくsbt compile通った!
- ivyを直接いじる必要はなかった
run
-
sbt -Dconfig.resource=local1.conf run
- これでローカル環境ように異なるモジュールが異なるポートで起動する
-
akkaチュートリアル
- こっちの方がチュートリアルとしての質が高い(少なくとも起動できる)と聞いたので浮気してみる
akka actors quickstart
-
https://developer.lightbend.com/guides/akka-quickstart-scala/
-
終わったらドキュメントを読んでみよう
-
こっちのチュートリアルはsbtのbinファイルも一緒に入れてくれてて親切だな
-
reStartで再起動するのはどうしてだろう
- build.sbtに書いてあるのかな〜と思いきやそうでもない
- reloadは知ってたけどrestartは初めてだ
- sbt-resolverのプラグインの影響っぽい
- nodemon的なものか
-
case class / case objectを知っておく必要がありそう
- case classは通常のclassに20個近くメソッドが用意されているもの
- 比較メソッド、unapply、applyなどさまざまなメソッドが用意されていてcase matchで扱いやすい
- ?FPとOOでかけるけどFPを採用するのであれば必須と考えて良いのだろうか
actor
- GreeterBotはどこかのグローバルスコープに変数を用意しておくのではなくbehaviour Factoryが返却するbehaviourを少し変えることでステートを維持している
- behaviour.setupとbehaviour.receiveは何が違うのだろう
-
公式に書いてあった
- setupはactorが開始されるまで実行されない、receiveは即座に実行される?
- でもStackOverflowの説明を読む限り、逆のことが書いてある。setupは即座に実行されるから一番最初のactorを起動するのに使う、receiveはmessageを受け取るまで何もしない
- SOの回答の方がしっくりくるな(他のチュートリアルでもqueryActorはsetupで作成している。もしreceiveだったら明示的にmessageを送らないとqueryが開始しない)
-
公式に書いてあった
- behaviors.stoppedするとactorが作成したactorも消えるらしいが、actorからactorを作成したいのってどんな時なんだろう
- これはDevice Tutorialでカバーした
- 全てルートactorからspawnしない理由としては、親actorが子actorの終了などを知りたいときに親がsignalを受け取りたいからだろうか
- contextには何が入るんだろう(behaviors.receiveとかbehaviors.setupに頻出)
- context.log.infoに含まれるactorIdとか、今のactorの実行環境の情報などを持っている
- watchWithとか、actor同士の関連づけに使われることも
- context.spawnとは
- 新たなactorを今のactorの子として作成する
- behaviors.receiveとbehaviors.receiveMessageは何が違うんだろう
- contextが含まれてるか否か。contextが不要ならreceiveMessageで事足りる
- behaviors.sameを返し忘れたら何が起きるんだろう
- 返し忘れたらコンパイルエラーが生じるはず。Behaviorを返さなきゃいけないから
- OOならthisを返せるけどFPだとBehaviors.sameとか、せめてBehaviors.emptyかunhandledを返せばコンパイルは通る
- behaviorBotは戻り値がBehaviorだけど再帰するからbehaviorsが返らないタイミングがある。この時actorはどういう状態にあるんだろう?
- そんなことはない。常にbehaviorsが返るようになっている
- actorはnewではなくspawnで作成する
- actorのインスタンスではなくActorRefが返ることが柔軟性を生み出すらしい
- actorインスタンスを渡すのも参照が渡されるだろうから何が違うんだろう
- akkaの場合はアクターが同じメモリ空間上にいなくても構わないので稼働しながら一部アクターを別インスタンスに動かすことさえ可能、多分これを柔軟性と呼んでいる
- actorインスタンスを渡すのも参照が渡されるだろうから何が違うんだろう
- 今回の例だとgreetermainはspawnによりgreeterを作成して、もし[sayhello]messageをgreeterMainが受け取ったらmax=3のgreeterBotのactorをspawnしている
- ?scalaのクラス定義によく出てくる class Hoge() { this => ...はどういう意味だろう
- どうしてactorはclassではなくobjectとして定義しているんだろう
- FP的な書き方をしているため。objectはアプリケーション全体に一つしか存在しないことが保証されているので、TSで言うとfunctionを定義しているような感じ
- ?actorが存在していなかった場合(stoppedされていた場合)メッセージはどう扱われるんだろう
- 基本的には無視されるだけ。unhandledとの違いは、unhandledであることを伝えるか否か、ではなかろうか
- actorのインスタンスではなくActorRefが返ることが柔軟性を生み出すらしい
- アプリケーションにはActorSystemが1つだけ存在していて、こいつを extends Appしたobjectが初期化する
- def applyはspawnのタイミングで一度だけ呼ばれるってことで良いのかな
- 合っている
- printlnじゃなくてcontext.log.infoしてる、どういう違いがあるんだろう
- 実行されているactorの識別子などが情報として追加される
message
- ActorRefが ! というメソッドを定義しているので、それを呼び出している
- 実際にはref.tellしてるだけに見えるけど、直接tell呼んじゃダメなのかな(コンパイルは通りそう)
- テストも通るし、実際はrefしかなければtellを使うこともあるんだろうな
- 実際にはref.tellしてるだけに見えるけど、直接tell呼んじゃダメなのかな(コンパイルは通りそう)
test
- testProbeを作成して、こいつをreplyToに指定してメッセージが受信できることを確認するだけ
- ?めちゃくちゃ簡単そうだけどまだ複雑なテストを書くイメージが湧かない...
- scalaだとtraitはxxxLikeって付けるのが作法なのかな(AnyWordSpecのtraitがAnyWordSpecLike)
- scalaTest、いろんな書き方をサポートしてるのすごいけどプロジェクトで統一するの大変そう
- テストケース1つ書くにも今までと違う頭の使い方をするからちょっと疲れるな(replyToあたりが少し混乱する)
- ?複数のメッセージを受け取ったことを保証するにはどうすればいいんだろう?(今回だと2回は追加で呼ばれる必要がある)
- 無理か
- testingProbeだとGreetメッセージを受け取った時にGreetedメッセージを発しない
- GreeterBotのBehaviors.receiveが1度しか呼ばれない
- 複数回呼ばれること〜みたいなテストは統合テストになるからprobeでは担保できない
- ?botのcounterがincrementされていることを知りたいんだけどな、どうすればいいんだろう
- 無理か
サンプル)
Akka(IoTactor
- akkaのactorは全てroot guardinの配下、開発者が明示的にspawnしたactorはuser配下にある
- そういえば上のサンプルでもuser/配下だったな
- ?import akka.actor.typed.ActorSystemとimport akka.actor.ActorSystemは何が違うんだろう
- applyは何かしらの予約語なんだろうか
- ?behaviors.receiveとreceiveMessageは何が違うんだろうか
- ?scalaって同じファイル内でもimportするんだ
- 前のサンプルで
import com.example.GreeterMain.SayHello
を実行しないとAkkaQuickStartの中でSayHelloメッセージを作成できていなかった - sayHelloは別にexportしてる訳でもないけど、いつimportできる/できないが決まるんだろう
- 前のサンプルで
- ?いまいちcompanion objectの利点がまだ理解できていない
- 今回のサンプルでもapply(コンストラクタ?)だけobjectに書いてるのはどうしてだろう
- そこまで全力でnewを避けたい理由もないんだけど、コンストラクタをたくさん作るのとobjectにapply作る違いはどこに?
- ?AbstractBehaviorって何だろう
- どうしてextendsしてるのにコンストラクタが存在するんだろう(traitにはコンストラクタないはず)
- これはabstractClassだった
- extendsしたクラスに直接contextを渡してる。これを定義しないとどうなるんだろう
- ビルドが通らない。abstractには明示的にコンストラクタ引数を受け渡さないといけないっぽい
- ?behavior返すときにthis返してる。Behaviors.sameとはどう使い分けるんだろう?
- Behaviors.stoppedとかBehaviors.unhandledとかも返せる。ステートマシンっぽいな
- unhandledだと、一応メッセージは受け取るけど何もしない、って意味
- ?何のために定義するんだろう?onMessageが全部空じゃダメなのかな
- onMessageがあれば何かしらのBehaviorは返さなきゃダメ、ActorBehaviorをextendする以上はonMessageが必要、ってことなんだろうか
- ?何のために定義するんだろう?onMessageが全部空じゃダメなのかな
- unhandledだと、一応メッセージは受け取るけど何もしない、って意味
- Behaviors.stoppedとかBehaviors.unhandledとかも返せる。ステートマシンっぽいな
- ?前のサンプルとはactorの作り方が異なる
- 前はBehaviors.receive、今回はabstractClassのextend。それぞれの違いを理解したい
- ?printMyActorRefActorのclassを定義しただけの段階ではoverrideしなさいエラーが出ない
- でもcompanion objectを定義した瞬間にエラーが出た。どうしてだろう?
- newするだけでエラーが生じる訳ではない。applyができることによってこれがabstract classではないことが確定するからだろうか?
- ?behaviors.emptyをspawnする意味とは
[error] SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
[info] [2022-04-15 12:42:52,767] [INFO] [akka.event.slf4j.Slf4jLogger] [testSystem-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[error] SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
[error] SLF4J: See also http://www.slf4j.org/codes.html#replay
- ?動かしてみたらこんなエラーが出たけどこれはどういうことだろか
- ?sbt "runMain packagename" これで起動したけどrunMainって何だろう
actor lifecycle
- Behaviors.stopped()で止められる
- context.stop(childRef)でも止められる
- childではないactorを止めることはできない
- ?import akka.actor.typed.scaladslとimport akka.actor.typedの違いが気になる
- partialFunctionって何だろう?scalaっぽい
- 関数のドメイン(定義域)に応じたcase文を書ける。引数に対する例外処理に近いイメージ
- 今回のケースだと[Signal,Behavior[String]]だから、引数(Signal)の値次第では処理されないかもしれないけど、基本はString型のメッセージに反応するBehaviorを返す
- 引数は必ず1つしか取れない(unary function)なので注意
- collectに渡すと、isDefinedAtがtrueを返す値だけ返せる(filter的な)
- ?onMessageはPartialFunctionじゃないけど、もしmsgのcaseが一致しなければ何も返らない。同じくpartialfunctionじゃなきゃいけないのでは?
- きっと内部的にはPartial[Behavior[String], Unit]みたいな感じになっているけど、Unit全部書くのめんどいから省略されているのかな
- PostStopはどこにも定義されていないけどちゃんと解釈されるのかな
- ?やっぱりimportしなきゃダメだった。なんかimportの補完が効くvs拡張ないのかな
- ?onSignalは引数がないのにどうしてcaseが機能するんだろう?何に対してcaseをmatchしてるんだろう
- onSignalはデフォルトでPartialFunction.emptyが定義されている。.emptyは関数型だとよく見るなぁ
- stopするときは必ずchildの方を先にstopする
failure handling
- デフォルトではchildからunhandledExceptionが生じるとchildは停止する
- ?restartの方が都合良さそう+akkaの思想(resilient)に沿ってそうだけどなんで停止なんだろうな
- spawnするときに例外時の方針を決められる
- ?特定の期間中に何回restartしたら止める(流石にやばいから)みたいな制御できるんだろうか
IoTApp
Supervisor
- ?Behaviors.emptyとBehaviors.unhandledの違いが知りたい
- unhandledは直前のbehaviorを使いつつもunhandledであることを示す。emptyは全てのメッセージに対してunhandledを返すようにする
- printlnとcontext.log.info(akkaが提供する機能)を使い分けよう
- Nothing型はどう使われるんだろう?Nilとの違い
- まずNothingとNullはtrait、NilとnullとNoneは値
- Nothing, Null -> 全ての型のサブタイプなので、どこにでも使える
- Nil -> 空のリスト
- null -> 何もない値
- None -> Optionのサブタイプ、Option[Nothing]をextendしている
- ScalaのNilはList[Nothing]
- 1 :: 2 :: Nilみたいな感じでListの初期化に使える。:: (const)は最後に必ずListを使う必要があるので、そこにNilが役立つ
- nullは何にも入れられるので非常に注意が必要。ヌルポが発生する
- ?使うケースほぼなくね?いつ使いたいんだろう
- NoneはOption型のサブタイプなので、通常はこちらを使う
- まずNothingとNullはtrait、NilとnullとNoneは値
- IoTSupervisorはonMessageで何一つメッセージを処理しないからBehavior[String]と当初書いてしまったけどBehavior[Nothing]と書き換える
Main
- extends Appではなくdef main()にしている。どうしてだろう
- extends AppはOO的な書き方、Objectに対するdef mainはFP的な書き方
DeviceActor
- まずmessageを定義するところから始める
Deliveryについて
- akkaのmessageは届かないかもしれない前提で考える(at-most-once)
- 特定のactorからのメッセージは順番が保証されるけど、他のactorからのメッセージが途中で挿入される可能性はある
- messageには識別子を含めることが良い
DeviceActorのコード
- Option[Long]型にNoneを代入している。nullとはまた別...?
- nullにしてもコンパイルは通ってる
- nullは何にでも入れられるから、TSでいうところのany型に近い
- nullにしてもコンパイルは通ってる
- context.log.info2って何だろう?infoしか定義されていないけど
- placeholder({})が複数ある時のためのメソッド。{}が複数あると条件が曖昧なため代入できないらしい。それより多くの{}がある場合はinfoNが必要。なんか面倒だな
- testの中でtestProbeを作成する際、actorではなくrespondTemperatureメッセージをprobeの型として指定している...?
- 勘違いしてた。指定しているのはテスト用のactorが受け取るmessageの型
- ?akkaでは自前でxxxCompleted的な確認メッセージを返す必要がある。普通の手続的な書き方ならcompletedを書かなくても次の処理を書けば良い(例外が生じたら中断されてcatchに移動するため)
- この面倒くささは後々コード書いてる時に効いて来そうだけど、どうなんだろう
- この確認メッセージすら届かなかった時どうすれば良いんだろう?普通の手続的なプログラムの場合は関数全体のロールバックができる(例外処理のcatchの中で)。でもakkaの場合は1)いつロールバックを始めるべきなのかわからない(リソースの作成は完了しているけど確認メッセージが届いていないのか、リソースの作成が完了していないのか判別できない)、2)ロールバック処理をどうやってまとめるのか(これも複数のaktorに分散されるのだとしたら正常系とロールバックの整合性を管理しづらくならないだろうか)
- どうしてextends Commandしてるんだろう? と気になっていたけど、caseがexhaustiveになっていないことをvscodeが示してくれるからか(command定義してないと毎回Read|Writeみたいに書き換えていかないといけない)
- testKitの中で自由にspawnとか呼び出せるのありがたいな
- receiveMessageとexpectMessageの使い分け
- messageの中身が必要な時はreceiveMessageが必要
- expectMessageの方が宣言っぽい感じがして好き。messageの中身を使う必要がなければ基本はexpectMessageで良いのかな
- messageの中身が必要な時はreceiveMessageが必要
actorの設計
- DeviceGroupのCommandはsealed traitじゃなくてただのtraitになってるけど、何が違うんだろう
- sealedはそのファイル内で全てのextendが行われることを保証できるので、caseがexhaustiveになっていないときにlinterに指摘してもらえる
- companion objectに対してもclass内でimportしてるのが気になった
- 昔Scalaの言語仕様を決める際にこれは議論されたらしい。必要なものをimportする方が、不要なものを削除するより簡単だから、という結論らしい
- import com.example.DeviceManagerにしたらtraitもimportできるようになった
- ?絶対パスを指定するのと、ファイル内で識別子を指定するのと何が違うんだろう
- 昔Scalaの言語仕様を決める際にこれは議論されたらしい。必要なものをimportする方が、不要なものを削除するより簡単だから、という結論らしい
- いまいちまだonSignalで使われるcase文を返す関数に慣れていない...
- これはpartialFunctionの書き方だから慣れるしかないかな...
- ?PartialFunctionの以下の「self」って何だろう?
trait PartialFunction[-A, +B] extends (A => B) { self =>
- case trackMsg @ RequestTrackDeviceの @と
groupId
って何だろう...- @はcase matchingした後に値を変数に格納するときに使う
- 今回のケースだと型がRequestTrackDeviceだった時に、その値をtrackMsg変数に格納してる
-
groupId
はいまだにわからない- backtickはcase文の中で他の変数を参照して、特定の値を持っていることをmatchの条件にできるらしい
- backtickがないとケースマッチが「3つの引数が存在するとき」になってしまうけど、backtickが存在することで「特定のgroupIdに合致するとき」になるんだろうな
- なのでbacktickを外すと、その下のRequestTrackDeviceのcaseと差がなくなるので「unreachable code」
- めちゃくちゃ細かくcase書けて素晴らしい
- if文を繋げてcaseを作ることもできるんだろうけど、こちらの方が可読性が高いってことかな
- mapからのgetもsome,noneでcase書く必要があるの良いな
- @はcase matchingした後に値を変数に格納するときに使う
- messageってcase objectにした方が良くない?
- objectはそれ単体がアプリケーション上で1つしか存在し得ないから、同じタイプのインスタンスが複数とびかうmessageに使うのは不適切
- listDeviceメッセージを自分で作った時、以下の点がチュートリアルと異なった
- deviceIdのsetを返すのではなく、deviceのListを返した
- これだとメッセージの容量が大きくなりがちだし、識別子を返すだけでも良いのかも
- actorRef自体がメッセージに格納されているとだいぶテストしづらい(actorRefだけではidが含まれないため)
- groupIdの一致を確認せず、deviceListを返した
- これだと全てのdeviceGroupからdeviceListが返ってきてしまう。一致しない時はBehaviors.unhandledを返さなきゃ。まだまだあんまりアクターの粒度が頭で描けていない
- thisを返すのとBehaviors.unhandledを返すのって何か違いがあるんだろうか
- thisはOO、unhandledはFP的な書き方なのでプロジェクト次第
- deviceIdのsetを返すのではなく、deviceのListを返した
- テストで使う testProbe.remainingOrDefaultって何だろう
- ?terminateされたactorが全ての処理を実行するまでの残り時間どうやって計算するんだろう
- awaitAssertが必要な理由がいまいち分かってない
- expectTerminatedが実行されているのだから、もうterminatedは処理されている(deviceは減っている)のでは...?
- expectTerminatedはexpectしているだけなのでreceiveしているとは限らない。こちらはdevice1がどこかのタイミングでterminateされることをテストしている。次のawaitAssertは、最終的にDevice2だけを含むListが返ることをテストしているので、それぞれ違うことをテストしている
- extends abstractClass...したときにabstractのconstructor呼び出すのをいつも忘れちゃう。自動的にやってくれる言語に慣れすぎた
- scalaで list filterとlist.filterって何か違いあるのかな
- 簡単なサンプルだと特に違いなさそう。公式でArity-1 (Infix Notation)として説明されている
- akkaではterminateしたactorに対してwatchWith(actor, msg)することで、terminateした時に新たなmsgを自身に対して発生させられる
- terminatedだけだとidが足りない時などに役立つ
- DeviceManagerのcommandの中でextends DeviceGroup.Commandしている影響で、DeviceGroupのmatchがexhaustiveになっていない(deviceManager宛のcommandをdeviceGroupも受け取る可能性が生じている?)
- RequestTrackDeviceなどを誤ってGroupの方に定義していたため定義が重複していた
- ?親クラス(今回はmanager)のcommandが色んな子のcommandをextendしてる。どんどん伸びていくのかな...?
- ?deviceManagerのテストを書いてて思ったけど、device登録の成功をテストしてるのはdeviceGroupのテストと責務が重複してないだろうか?
- 登録されているdeviceGroupにmsgが横流しされていることだけテストしたい
- だけど登録完了メッセージにはdeviceが登録されたことしか保証されていないので、正しいグループに登録されたかわからない
- ?actorの内部状態がわからないので、registeredDeviceの一覧を取るために毎回getDeviceListのメッセージを送らなきゃいけなかったり、テストが密結合になりやすそう
- ?deviceGroupにメッセージが横流しされているテストが書けない
- deviceGroupを取得するメッセージが定義されていないため
- テストのために定義するのもおかしいし...どうすれば良いんだろう
- この辺を見る限り自身がspawnしたactorであればactorの名前を指定してメッセージボックスの中身を取得できそう
- 今回のテストだとgroupId-groupみたいな名前のgroupActorをspawnしてるから、そいつを取得してテストする感じだろうか
- どうしてcase classを常にfinalしてるんだっけ
- 基本継承不可にしといて損はないぐらいの感じかな
- 今まではdomain entityに対応したactorしか作成してこなかったけど、CQRSでいうクエリ処理専用のアクターを作ることもできる
- groupは書き込みに専念させて読み取りは専用actorを作った方が良いのかも
- タイムアウトの時とかどうしてFiniteDuration型を使うんだろう。IntとかLongではだめなのかな
- msとか非常に短い時間を指定する必要があるからFloatだとメモリが無駄になる?
- 値から値への変換もあるし、その辺を開発者が毎回手書きするのは面倒だから助かる
- ?アクター内のcommandどうやって整理するのが良いんだろうな...
- DeviceManagerとか結構いろんなコマンドを知ってるから、コメントで整理するぐらいの事しか今はできていない。objectとか使って整理するのかな
- ?respondTemperatureAdapterの使い方が今のところ全くわかっていない
- どうしてDevice.RespondTemperatureをそのまま使っちゃいけないんだろう?今まで散々他のactorのcommandをextendして受け取れるようにしてきたのに...actorのコマンドなら良いけど(他のactorが同じコマンドに反応することは許容されるべき)内部的なレスポンス型まで許しちゃうと密結合になる的な...?
- でもrequestDeviceListを受け取ったdeviceGroupはdeviceManagerの中に定義されているDeviceListを普通に参照している。ここではmessageAdapter要らないの?
- もしかしてレイヤーを意識してるのかな(ドメイン内部はお互い参照しても良いけど、queryActorからの三勝は許さない、的な?)
- 公式を見る感じ、requestを送信したactorは、それに対するresponseを知るべきではない、と書いてある。ここでもadapterを使って変換している。サンプルではcase objectが自動的に生成するapplyを使っているからイメージしづらいけど、実際は引数を1つ受け取って新たなクラスに詰め替える関数を渡す必要がある。setupで定義することが推奨されている
- ?特定のmsgに対してwrapperは1つしか定義できない。複数定義すると後に定義された方が上書きされる。上書きを検知できないから保守性に問題ありそう(元のwrappedResponseが呼び出されなくなってしまうから)、かつ複数のactorが対応しなければいけないresponseになった瞬間に終わる(どれか1つのwrappedResponseしか有効にならないから)
- サンプルを見てる感じ case x @ Hoge...みたいな書き方の他にも case x: Hoge...みたいな感じでもmatchした値を変数に格納できるっぽい
- 今までmap ++ kvsかmap + k -> vって感じで書いてきたけどmap.updated(k,v)的な書き方もある。Scalaって本当に書き方が豊富だな
- DeviceGroupQueryの中で、DeviceGroupとDeviceManagerはimportしてるけどDeviceのReadTemperatureaコマンドだけはDevice.ReadTemperature。どうしてだろう
- 一箇所でしかDeviceのコマンドは使わなからimportするほどでもない〜的な意味合いなんだろうか、それとも何か設計指針に基づいているんだろうか
- ?どうして
RequestAllTemperatures
はDeviceManager.Commandをextendしてるんだろう?そのせいでDeviceManagerのonMessageのmatchがexhaustiveにならない。extendを消したら問題なくコンパイルできた - ?コードがどこで呼び出されているのか探すのがめちゃくちゃ難しいイメージ...?
- 「device終了時に何もしない」ってロジックに変えたい時にgrepキツそう。watchWithされると好きなメッセージに変えられるから一括で検索することもできないし、補完も効かないし
- ?アクターモデル全般IDEの恩恵を受けづらい気もするけど、elixirとかerlangはうまく対処できるんだろか
- watch多用していると不要になった(絶対に呼び出されないコード)が見つけづらくならない?
- DeviceGroupQueryのtimeoutが3sだとテストがtimeoutエラーで落ちる、200msだと通過する。testProbe.expectMessageのデフォルトタイムアウトの方が先に来てしまうからだろうけど、こいつのデフォルト値は幾つなんだろう
- ここにdefault configurationが書いてあった
- Optionに対してNoneを渡すべきところでnullを渡していたがためにcase matchしていない状況が発生した。どうやってOptionに対してnullを非許容にする、あるいはcaseでnullもnoneもmatchさせるんだろう
- nullは全ての型に代入可能なので防げないのではないか?本当にnullのユースケースが思いつかない
ひとまず全部通して実装できた!よかったよかった
輪読会の疑問
- context.watch(userRegistryActor)
- ?これって何の意味があるんだろう?特にリカバリー戦略を明示しているわけでもなく、子アクターならwatchしなくてもシグナルは届きそうだけど
Akka素朴な疑問
- ?認証が完了した時だけ送信できるmsgはどう定義するのが良いんだろう
- 認証自体は専用のactorに任せるんだろうな。それが完了したらhogeを送るんだろうな。認証が完了していない状態でhogeを送れないようにどうすれば良いんだろう
- ?複数のactor間でどうやってDBに対するトランザクションを貼るんだろう
- ?テストを書いてる時、ターゲットとしているactor以外は一切メッセージに反応しなかったよ、ということを全てのテストで保証する必要ないのだろうか?(ネガティブチェック的な)
Concepts
Akka General- actorRefとそれが対処できるメッセージの型は生成時にパラメタ化されて定義されるので、新しく生成されるBehaviorも常に同じメッセージに対応できるようにしなければいけない(だから処理をしない場合もbehaviors.unhandledとかを返さなきゃいけない)
- actorRefを通してactorにメッセージを送っている
- メリット:送信したメッセージが必ずactorの何らかのbehaviorにキャッチされることを保証できる(無意味なメッセージ送信にはならない)
message
- 特定のactorから特定のactorに対するメッセージは順番が保証される。それ以外は保証されない
- 基本はFIFOだけど優先順位をつけることも可能
- actorが停止した時はメールボックスのメッセージがdead mail boxに移動される。ただしbest-effort
- mailboxがactor自体とは分離しているため例えばactorが再起動している間もメッセージが失われることはない
- remoteで実行されているactorに対してメッセージを送れる必要があるため全てのメッセージはserializable、かつサイズ制限もある(なのでインスタンスを丸ごと送るより識別子を送るパターンがより好まれるのかな)
- 複数ホストで稼働することを諦めるなら、より厳密なメッセージの伝達戦略を選択できる
- 厳密性を後から追加することは容易いけど、パフォーマンスのために厳密性を排除することは難しい
- アクターシステムのfwの中でもアクター間のメッセージの順番が保証されているのはakkaの特徴らしい
- イベントシステムの利点は障害発生時に全てをリプレイできること
ask
- ?公式サンプルでimplicit timeout使ってる。こういう使い方があるんだ〜と思う一方で普通の変数として宣言して渡した方が読みやすい気がしちゃうけど、どうしてimplicitなんだろう
- ?askの方が先述のadapterを使うより良さそう(他のadapterが上書きしてきても、このメッセージは従来通り対処される)
- ?askWithStatusはどうしてflatmapを接続する必要があるんだろう。レスポンスは1つしか返ってこないからSeqにはならないのかな...と思ったのだけど
- ignoreRefを使えばreplyToが要らない時にコンパイラを黙らせられる
- DBなどにアクセスしてFutureが返ってくる時、普段ならonComplete使っちゃうけどpipeToSelfしてメッセージがハンドリングするようにしないとスレッドセーフにならない可能性がある。これ慣れるまで間違えまくりそう
monitoring/supervision
- monitoringは死活監視、supervisionはrestartやmsgをハンドリングできないエラー時などactorRefを外から見ているだけでは気づけない状況も含む
interaction patterns(actor)
-
ここのnextBehavior()の
(keys, wallet) match {
って何をしてるんだろう- メソッドの中に格納されている変数に対してもパターンマッチが使えるから、それっぽい
-
?まだちょっとwrapすべきタイミング、委譲の仕方が理解できていない。このコードが読み解けていない
- Replyは何だろうか
- ただのジェネリクスだった
- ClassTagの例題に出てくるT*のアスタリスクは何だろう
- variable argっぽい。可変長引数
- ClassTagの例題に出てくるT*のアスタリスクは何だろう
- ただのジェネリクスだった
- aggregatorの責務は何だろうか
- sendRequest(その細部は知らない)を実行すること、sendRequestにmessageAdapterを渡すこと。sendRequestはmessageAdapterをreplyToに指定している。Hotel達にrequestを送信するので、Hotelからの返信はadapterに定義されたWrappedReplyとして帰ってくる
- wrappedReplyをAggregatorが受け取って、回答が足りていたらreplyTo(HotelCustomer)に返す
- aggreGateReplies(その細部は知らない)を実行すること
- aggregateRepliesはHotelCustomerが処理できるmessage形式に変換するのが責務。こうしておかないとAggregatorはHotelCustomerの型を正確に知る必要があるためAggregatorを他の機能に使えなくなる
- sendRequest(その細部は知らない)を実行すること、sendRequestにmessageAdapterを渡すこと。sendRequestはmessageAdapterをreplyToに指定している。Hotel達にrequestを送信するので、Hotelからの返信はadapterに定義されたWrappedReplyとして帰ってくる
- messageAdapterはReplyをWrappedReplyに変換しているけど、ReplyはAnyだから、全てのメッセージをWrappedReplyに変換することにならない?
- それとも実行時に Hotel1.RequestQuote(replyTo)の段階でreplyToはActorRef[Price]になるから、Priceに対してのみ反応するbehaviorになるのかな
- 試してみたらあらゆるメッセージをwrappedReplyに変換してたので、aggregateRepliesの中でcase unknownに入っていた。akka上で送信される全てのメッセージを受け取る、というわけではない!(ブロードキャスト的なものと勘違いしていた)
- akkaのテストでは「特定のメッセージに反応してはいけないこと」を保証するのが難しそう
- 誤ってAnyのmessageを受け取るようにしたら、本来反応してはいけないメッセージに反応してしまう
- でもこれは設計上は仕方ないのかな
- ?どうしてReplyだけは: ClassTagが必要なんだろう?Aggregateはいらないのに。Anyを選んだからかな
- messageAdapterに型を指定する必要があるため
- IOTのサンプルではメッセージ型が明確だったけど、今回はHotel1とHotel2のどちらが返ってくるか分からないため
- Hotel1とHotel2を横断する型があればAnyの代わりに使うことで: ClassTagは消せるんだろうか。union作るのTSよりだいぶ難しいな...
- Replyは何だろうか
Persistence
- ブロックチェーンぽいな
- データを削除できないから秘匿情報を削除しづらい、そのためにはGDPR for Akka Persistenceを使う
- ?deleteのためだけに周辺ツールを待たなきゃいけないのはちょっと柔軟性が足りない気もしてしまうけど、どうなんだろう
- ?eventSourcingは今のところnoSQL感がするけど、実際どういうDBと統合できるんだろう
- ?イベントを取得して現状を復旧している間に他のイベントが挿入されないことをどう防止するんだろう?RDBならテーブルとか行ロックできるけど、id=1に関するイベントの追加はできない、みたいなロックをかけるんだろうか
- イベントのインターフェースが変わった場合
- ?どうやって既存イベントのインターフェースを把握するんだろう?(現状どんなプロパティを持ったイベントが保存されているか〜とか。テーブルスキーマ的な静的な定義があるんだろうか)
- ?古いifのイベントが存在しないことを保証しない限りエンティティのリファクタリング(古いifに対応したメソッドの削除)に踏み切れなさそうだけど、イベントが存在しない+今後追加されないことをどうやって保証するんだろう?(RDBならスキーマに合致しないデータは絶対保存できないからスキーマが絶対だけど)
- ?Effect.persist()に含めたイベントは保存されるらしいけど、チュートリアルの状態では何もDBをセットアップしていない。どこに保存されてるんだろう?どうやってDB保存の設定を渡すんだろう
- default journal plugin is not configuredエラーが出た。やっぱダメか
- ?commandHandlerの中でEffect.persistを複数回呼び出したらどうなるんだろう?(最終的にreturnしたEffectだけが保存されるんだろうか)
- ?commonChainedEffects: MoodのMoodって何だろう
- behaviorをpersistedActorが変更することは許容されない。変更できてしまうとイベントをリプレイしている際にbehaviorもリプレイしなければいけないため(保存の難易度が上がったりリプレイの速度の問題だろうか)
- そのためstateに応じてcommandHandlerの挙動を切り替えるためにはcommandHandler自体を動的にすることが多い
- ?classの中でpostIdをvalではなくdefで定義しているのはどうしてだろう
- def -> 都度定義し直す
- val -> 一度定義されたら一定
- 今回の例だとeventHandlerの中でdraftStateを作成し直してるからvalでも良さそうな気がするけどな
- ?.thenRunの中に入ってるstateを使う場合に何か影響が出るんだろうか
- statusReplyはどこで定義されているんだろう
- ここ。akka.patternパッケージ
- withEnforcedRepliesを使えば、replyを忘れたときにコンパイラーが教えてくれる
- ?CborSerializableって何だろう
- ?journalって何だろう
- eventSourcedBehaviorがpersistしてside-effectを実行し終えるまで他のコマンドは受け付けないことが保証されている
- その間に受け取ったメッセージは全てstashされる
- ?一般的なアクターのメールボックスとは何が違うんだろう
- stashはメモリ上にのみ存在するためクラッシュしても再実行されない(メールボックスも同じだっけ?)
- Effect.stash()とEffect.persist().unstashAllで保存・取り出しができる
かとじゅんさんのwalletコードリーディング
- ?package.scalaにまとめる情報の範囲が気になる
- やっているのはwalletパッケージに対するpackage objectの定義
- import wallet._ で使用している
- wallet.newULID()、ファクトリ的な使い方をしている
- companion objectの使い方がなんとなくBalance.zeroでイメージついた
- ?balance.addしてzeroを下回るのってどういう状況なんだろう?
- ?actorRefはこれまで見てきたけどentityRefは何が違うんだろう
- ?message -> commandMessage -> CommandRequestってな具合にextendしている背景
- ?永続化に関わる大体のactorがCommandRequestという範囲の大きめなmsgを受け取るようにしている背景(割とどんなmsgも受け取れそう?)
- ?walletAggregateってどこで使われているんだろう?実プロダクトでは使われてなさそうな気配
- persistentじゃない場合、subscriberに対する通知を自前で実装しなければいけないからだろうか?でも基本的にchild.spawnしてきたから、全部通知届くはずだけどな...
- chargeとwithdrawの違い
- chargeは他の口座への入金も伴う、withdrawは自分の口座しか影響を受けない?
- 違う。chargeは送金予約的なもので、withdrawを実行したときに初めて残高に変更が生じる
- tddの本で見たwalletもこんな作りだった気がしてきた
- ?walletJournalReaderって何だろう
- ?どうしてwalletCreatedはserializationの対象なのにCreateWalletSucceededは対象外なんだろう?前者はpersistence機構から返ってくるからかな
処理の流れ
shardedWalletAggregate
- shardedWalletAggregateを初期化
- swaはCommandRequestを受け取る
- CommandRequestはコマンドのidとwalletのidが必須
- CommandRequestなど、wallet全体で交わされるプロトコルはadaptor配下にまとめられている
- domain/walletをakkaに一歳依存しない形にするためかな
- CommandRequestなど、wallet全体で交わされるプロトコルはadaptor配下にまとめられている
walletAggregates
- 初期化のタイミングでchildActor(WalletAggregates)をspawnする
- childActorは初期化のタイミングでは何もしない
- 以降CommandRequestを受け取ったら、childActor(PersistentWalletAggregate)をspawnする
- もし既にwalletIDに一致するWalletAggregateが存在したら何もしない
- spawn(もしくは参照のみ取得したaggregate)にmsgを渡す
- walletAggregates自身は自分がどんなbehaviorを持ったactorをspawnするのか知らない(behaviorF、多分factoryを引数に取っているので、ただこれを呼び出すにとどめている)。walletAggregateのactorを新たにspawnすべきか否か判断するだけの責務を持たせたいからかな。behaviorはそれ以降も特に変わらず
persistentWalletAggregate
- 当然commandHandler(msgを受け取った時)とeventHandler(それによりstateが更新された時)が定義されている
- commandHandler
- stateとcommandを引数に取ってEffect(Event,State)を返す
- ケース1
- walletがEmpty(初期状態)かつwallet作成のリクエストだった場合、かつwalletIdが自分(actor)に割り振られたwalletIdに一致する場合、createWalletRequestのtoEventを呼び出して対応するイベントを作成して通知する。今の所createWalletSucceededに対応しているactorは存在しない
- どうしてこれが必要なんだろう?context.spawnの時点でwalletは作成されているのでは?
- context.spawnの時点で作成されているのはactorで、永続化するのはこれが初めて
- 実際main.scalaの中で、walletRefを作成した後にcreateWalletRequestを飛ばしている
- msgの中で、そのmsgが成功したときに発生すべきイベントのファクトリを定義する方法は自分も真似してみよう
- createWalletRequestがpersistされたのはわかるけど、その後のstate更新はどこで行っているんだろう
- それはeventHandlerの1だけど、WalletCreatedとCreateWalletSucceededの違いが気になる。どちらもprotocolで定義されているけどeventHandlerで使うwalletCreatedの方はapplication.confでserializationのマッピングにも定義されているから、何らか永続化手段から発するものだろうか
- 勘違いしていた。toEventでwalletCreatedを返していた。
- ケース2
- walletがdefined(作成ずみ)かつdepositRequestだった場合、かつwalletIdが一致する場合
- depositRequestを速攻でpersistしてdomain/walletのdepositを呼び出す。ここでドメインロジックが絡んでくる
- wallet.depositは自分自身ではなくcopyしたオブジェクトを返す。この辺り慣れていかないと
- domainの処理(残高チェック)の結果に応じてreplyToに返信する。actorが常に1つしか存在しないことが保証されているので、他のトランザクションに割り込まれて残高を打ち消し合う恐れがない
- depositSucceededイベントをreplyToに返す(eventHandlerに行くわけではない)
- eventHandlerのWalletDepositedとdepositSucceededが若干混乱した。前者はDBから送られてくる?
- 勘違いしていた。前者はpersistに渡されているからそのままeventHandlerで処理される。後者はreplyToに渡している
- ?walletIdが自分と一致する場合〜みたいなマルチテナント的な概念を見落とすとバグが発生しそうだけどこの辺は外部ライブラリで担保できたりするんだろうか
- ケース4
- withdrawだった場合はwallet.withdrawする
- ?せっかくchargeにもmoneyが保持されているのに、どうしてwithdrawで改めてmoneyを指定しているんだろう?chargeを取得したらmoneyも導出できそうだけど
- eventHandler
- ケース1
- 初期化状態だったwalletに対してcreatedイベントが生じた時
- domainのWalletを初期化する。特にコンストラクタ内にロジックなし
- DefinedStateのプロパティとして指定して保存する
- 初期化状態だったwalletに対してcreatedイベントが生じた時
- ケース2
- 既に定義されたwalletに対してdepositedイベントが生じた時
- wallet.depositを実施する
- ?commandHandlerとeventHandlerの合計2回ドメインメソッドを呼び出してるけど、これどうにかならないのかな?persistするときに更新後のwalletも一緒に入れておいて、それをeventHandlerで受け取るとか。あんまりakkaらしくないのかな
- 無理か。wallet.depositの前にpersist先にしないといけないから(後でリプレイできるように)、persistの時点では更新後のwalletの値は取得できない
- ケース1