Proto.Actor(Go) でアクターモデル実践
Proto.Actor
https://github.com/asynkron/protoactor-go)
.NET(https://github.com/asynkron/protoactor-dotnet)
Kotlin(https://github.com/asynkron/protoactor-kotlin)
今回は Go で実装していきます。
README.md にある通り、まだβ版とのことなので注意。
環境構築やアクターモデルの説明などはスキップして早速コードから。
Hello world
package main
import (
"fmt"
console "github.com/asynkron/goconsole"
"github.com/asynkron/protoactor-go/actor"
)
// メッセージ
type Hello struct{ Who string }
// アクター
type HelloActor struct{}
func (h *HelloActor) Receive(context actor.Context) {
// context.Message() で送信されたメッセージを取得
switch msg := context.Message().(type) {
case *Hello:
fmt.Printf("Hello %v\n", msg.Who)
}
}
func main() {
// アクターシステムを生成
system := actor.NewActorSystem()
// アクターを生成するための props を作成
props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
// アクターを生成し、アクターを識別するためのPIDを受け取る
pid := system.Root.Spawn(props)
// 生成したアクターにメッセージを送信
system.Root.Send(pid, &Hello{Who: "Roger"})
_, _ = console.ReadLine()
}
実行すると以下のように表示されます。
Hello Roger
Hello
はアクターに送信されるメッセージです。
HelloActor
はアクターで、 actor.Actor
を実装しています。
type Actor interface {
Receive(c Context)
}
加算器
少しアクターから離れて以下のコードを実行します。
type Adder struct {
count int
}
func (a *Adder) Increment() {
a.count += 1
fmt.Printf("count: %d\n", a.count)
}
func main() {
adder := Adder{
count: 0,
}
go adder.Increment() // 1
go adder.Increment() // 2
go adder.Increment() // 3
go adder.Increment() // 4
go adder.Increment() // 5
go adder.Increment() // 6 ?
_, _ = console.ReadLine()
}
結果はその時によって異なりますが、私の場合は以下のように表示されました。
count: 2
count: 6
count: 1
count: 5
count: 3
count: 4
adder.Increment()
が複数の goroutine で実行されていることが原因です。
sync.Mutex
を利用した排他制御を行うことで正常に動作します。
type Adder struct {
mu sync.Mutex
count int
}
func (a *Adder) Increment() {
a.mu.Lock()
a.count += 1
fmt.Printf("count: %d\n", a.count)
a.mu.Unlock()
}
count: 1
count: 2
count: 3
count: 4
count: 5
count: 6
アクターを利用する場合は、メールボックスに送信されたメッセージを1つずつ処理するので、このような排他制御を必要としません。
type Increment struct{}
type Adder struct {
count int
}
func (h *Adder) Receive(context actor.Context) {
switch context.Message().(type) {
case *Increment:
h.count += 1
fmt.Printf("count: %d\n", h.count)
}
}
func main() {
system := actor.NewActorSystem()
props := actor.PropsFromProducer(func() actor.Actor { return &Adder{count: 0} })
pid := system.Root.Spawn(props)
go system.Root.Send(pid, &Increment{})
go system.Root.Send(pid, &Increment{})
go system.Root.Send(pid, &Increment{})
go system.Root.Send(pid, &Increment{})
go system.Root.Send(pid, &Increment{})
go system.Root.Send(pid, &Increment{})
_, _ = console.ReadLine()
}
count: 1
count: 2
count: 3
count: 4
count: 5
count: 6
Behavior
Waiting と Processing の2つの状態を取る有限オートマトン(FSM)を、 Behavior を用いて作成します。
type Process struct{}
type Finish struct{}
type BehaviorActor struct {
actor.Behavior
}
func (a *BehaviorActor) Waiting(context actor.Context) {
switch context.Message().(type) {
case *Process:
fmt.Println("Received: Process")
a.Become(a.Processing) // 状態を Processing に移行
fmt.Println("Becomed: Processing")
case *Finish:
fmt.Println("Received: Finish")
}
}
func (a *BehaviorActor) Processing(context actor.Context) {
switch context.Message().(type) {
case *Process:
fmt.Println("Received: Process")
case *Finish:
fmt.Println("Received: Finish")
a.Become(a.Waiting) // 状態を Waiting に移行
fmt.Println("Becomed: Waiting")
}
}
func NewBehaviorActor() actor.Actor {
act := &BehaviorActor{
actor.NewBehavior(),
}
act.Become(act.Waiting) // 初期状態は Waiting
return act
}
func main() {
root := actor.NewActorSystem().Root
props := actor.PropsFromProducer(NewBehaviorActor)
pid := root.Spawn(props)
root.Send(pid, &Process{}) // Waiting -> Processing
root.Send(pid, &Process{}) // 状態は移行しない
root.Send(pid, &Finish{}) // Processing -> Waiting
root.Send(pid, &Finish{}) // 状態は移行しない
_, _ = console.ReadLine()
}
Received: Process
Becomed: Processing
Received: Process
Received: Finish
Becomed: Waiting
Received: Finish
actor.NewBehavior()
で Behavior を作成できる。
Behavior は以下のメソッドを提供しています。
func (b *Behavior) Become(receive ReceiveFunc) {...}
func (b *Behavior) BecomeStacked(receive ReceiveFunc) {...}
func (b *Behavior) UnbecomeStacked() {...}
func (b *Behavior) Receive(context Context) {...}
今回は Become
を用いて直接状態を変移させていますが、 Push/Pop のような動作も可能。
また、 Receive
を実装しているので、 actor.Actor
としても振る舞う。
ライフサイクル
type Actor struct{}
func (a *Actor) Receive(context actor.Context) {
switch context.Message().(type) {
case *actor.Started:
fmt.Println("Started")
case *actor.Stopping:
fmt.Println("Stopping")
case *actor.Stopped:
fmt.Println("Stopped")
}
}
func main() {
root := actor.NewActorSystem().Root
props := actor.PropsFromProducer(func() actor.Actor { return &Actor{} })
pid := root.Spawn(props)
root.Stop(pid) // アクターを停止
_, _ = console.ReadLine()
}
Started
Stopping
Stopped
アクターが開始されると Started
が送信される。
また、アクターの停止時には Stopping
及び Stopped
が送信される。
Supervision
アクターは子アクターを生成でき、親子関係を持てる。
Supervisor は子アクターに予期せぬエラーが発生した場合、それを監督できます。
type Panic struct{}
type ParentActor struct{}
func (a *ParentActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *Panic:
props := actor.PropsFromProducer(NewChildActor)
child := context.Spawn(props) // 子アクターを生成
context.Send(child, msg) // 子アクターに Panic メッセージを送信
}
}
func NewParentActor() actor.Actor {
return &ParentActor{}
}
type ChildActor struct{}
func (a *ChildActor) Receive(context actor.Context) {
switch context.Message().(type) {
case *actor.Started:
fmt.Println("Started")
case *actor.Stopping:
fmt.Println("Stopping")
case *actor.Stopped:
fmt.Println("Stopped")
case *actor.Restarting:
fmt.Println("Restarting")
case *Panic:
panic("Panic!") // panic 発生
}
}
func NewChildActor() actor.Actor {
return &ChildActor{}
}
func main() {
root := actor.NewActorSystem().Root
decider := func(reason interface{}) actor.Directive {
fmt.Printf("Handling failure: \"%v\"\n", reason)
return actor.StopDirective
// return actor.RestartDirective
// return actor.ResumeDirective
}
// リトライ上限と時間を渡す
supervisor := actor.NewOneForOneStrategy(10, time.Nanosecond, decider)
// Supervisor としてParentActor を作成する
props := actor.PropsFromProducer(
NewParentActor,
actor.WithSupervisor(supervisor),
)
pid := root.Spawn(props)
// 子アクターが生成され、子アクター内でpanicが発生
root.Send(pid, &Panic{})
_, _ = console.ReadLine()
}
エラーログと共に以下のように出力される。
Started
Handling failure: "Panic!"
Stopping
Stopped
decider
はパニック内容を受け取り actor.Directive
を返します。
Directive については次の通り
- StopDirective: アクターを停止(Stopping->Stopped)
- RestartDirective: アクターを再起動(Restarting->Started)
- ResumeDirective: アクターを再開
- EscalateDirective: 失敗処理を親まで上げる
actor.WithSupervisor
には SupervisorStrategy
の実装を渡します。
type SupervisorStrategy interface {
HandleFailure(actorSystem *ActorSystem, supervisor Supervisor, child *PID, rs *RestartStatistics, reason interface{}, message interface{})
}
標準で実装されているものは以下の関数から利用できる
- NewOneForOneStrategy
- NewAllForOneStrategy
- NewExponentialBackoffStrategy
- NewRestartingStrategy
MailboxMiddleware
メールボックスにミドルウェアを使うことができる。
ミドルウェアは以下のインターフェイスを実装して下さい。
type MailboxMiddleware interface {
MailboxStarted()
MessagePosted(message interface{})
MessageReceived(message interface{})
MailboxEmpty()
}
以下のコードでミドルウェアの動作を確認します。
type Logger struct{}
func (l *Logger) MailboxStarted() {
fmt.Println("Mailbox started")
}
func (l *Logger) MessagePosted(msg interface{}) {
fmt.Printf("Message posted %T\n", msg)
}
func (l *Logger) MessageReceived(msg interface{}) {
fmt.Printf("Message received %T\n", msg)
}
func (l *Logger) MailboxEmpty() {
fmt.Println("Mailbox empty")
}
type Hello struct{}
type Actor struct{}
func (a *Actor) Receive(context actor.Context) {
switch context.Message().(type) {
case *actor.Started:
fmt.Println("Started")
case *Hello:
fmt.Println("Hello")
}
}
func NewActor() actor.Actor {
return &Actor{}
}
func main() {
root := actor.NewActorSystem().Root
props := actor.PropsFromProducer(
NewActor,
actor.WithMailbox(actor.Unbounded(&Logger{})),
)
pid := root.Spawn(props)
root.Send(pid, &Hello{})
_, _ = console.ReadLine()
}
Message posted *actor.Started
Mailbox started
Message posted *main.Hello
Started
Message received *actor.Started
Hello
Message received *main.Hello
Mailbox empty
実行結果から分かる通り、 アクターにメッセージが送信されると MessagePosted が呼び出され、アクターがメッセージを処理すると MessageReceived が呼び出されます。
Request/Response
type Ping struct{}
type Actor struct{}
func (a *Actor) Receive(context actor.Context) {
switch context.Message().(type) {
case *Ping:
context.Respond("Pong")
}
}
func NewActor() actor.Actor {
return &Actor{}
}
func main() {
root := actor.NewActorSystem().Root
props := actor.PropsFromProducer(NewActor)
pid := root.Spawn(props)
future := root.RequestFuture(pid, &Ping{}, 30*time.Second) // 30秒でタイムアウト
res, _ := future.Result() // 結果を待つ
fmt.Println(res)
_, _ = console.ReadLine()
}
Pong
タイムアウトした場合、 actor.ErrTimeout
を返す。
func (a *Actor) Receive(context actor.Context) {
switch context.Message().(type) {
case *Ping:
time.Sleep(time.Second * 2) // 2秒間待機
context.Respond("Pong")
}
}
func NewActor() actor.Actor {
return &Actor{}
}
func main() {
root := actor.NewActorSystem().Root
props := actor.PropsFromProducer(NewActor)
pid := root.Spawn(props)
// 1秒しか待たないのでタイムアウト
future := root.RequestFuture(pid, &Ping{}, 1*time.Second)
_, err := future.Result()
if err == actor.ErrTimeout {
fmt.Println(err.Error())
}
_, _ = console.ReadLine()
}
future: timeout
最後に
もう少し書くつもりでしたが少し長くなったので区切ります。
恐らく続きを書くと思います( remote とか イベントソーシングについて)
ここまで読んで頂きありがとうございます。参考になれば幸いです。
Discussion