🐙

Proto.Actor(Go) でアクターモデル実践

2022/12/18に公開

Proto.Actor

https://proto.actor
Go(https://github.com/asynkron/protoactor-go)
.NET(https://github.com/asynkron/protoactor-dotnet)
Kotlin(https://github.com/asynkron/protoactor-kotlin)

今回は Go で実装していきます。
README.md にある通り、まだβ版とのことなので注意。

環境構築やアクターモデルの説明などはスキップして早速コードから。

Hello world

package main

import (
	"fmt"

	console "github.com/asynkron/goconsole"
	"github.com/asynkron/protoactor-go/actor"
)

// メッセージ
type Hello struct{ Who string }

// アクター
type HelloActor struct{}

func (h *HelloActor) Receive(context actor.Context) {
	// context.Message() で送信されたメッセージを取得
	switch msg := context.Message().(type) {
	case *Hello:
		fmt.Printf("Hello %v\n", msg.Who)
	}
}

func main() {
	// アクターシステムを生成
	system := actor.NewActorSystem()
	// アクターを生成するための props を作成
	props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
	// アクターを生成し、アクターを識別するためのPIDを受け取る
	pid := system.Root.Spawn(props)

	// 生成したアクターにメッセージを送信
	system.Root.Send(pid, &Hello{Who: "Roger"})

	_, _ = console.ReadLine()
}

実行すると以下のように表示されます。

Hello Roger

Hello はアクターに送信されるメッセージです。
HelloActorアクターで、 actor.Actor を実装しています。

type Actor interface {
	Receive(c Context)
}

加算器

少しアクターから離れて以下のコードを実行します。

type Adder struct {
	count int
}

func (a *Adder) Increment() {
	a.count += 1
	fmt.Printf("count: %d\n", a.count)
}

func main() {
	adder := Adder{
		count: 0,
	}

	go adder.Increment() // 1
	go adder.Increment() // 2
	go adder.Increment() // 3
	go adder.Increment() // 4
	go adder.Increment() // 5
	go adder.Increment() // 6 ?

	_, _ = console.ReadLine()
}

結果はその時によって異なりますが、私の場合は以下のように表示されました。

count: 2
count: 6
count: 1
count: 5
count: 3
count: 4

adder.Increment() が複数の goroutine で実行されていることが原因です。
sync.Mutex を利用した排他制御を行うことで正常に動作します。

type Adder struct {
	mu    sync.Mutex
	count int
}

func (a *Adder) Increment() {
	a.mu.Lock()

	a.count += 1
	fmt.Printf("count: %d\n", a.count)

	a.mu.Unlock()
}
count: 1
count: 2
count: 3
count: 4
count: 5
count: 6

アクターを利用する場合は、メールボックスに送信されたメッセージを1つずつ処理するので、このような排他制御を必要としません

type Increment struct{}
type Adder struct {
	count int
}

func (h *Adder) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *Increment:
		h.count += 1
		fmt.Printf("count: %d\n", h.count)
	}
}

func main() {
	system := actor.NewActorSystem()
	props := actor.PropsFromProducer(func() actor.Actor { return &Adder{count: 0} })
	pid := system.Root.Spawn(props)

	go system.Root.Send(pid, &Increment{})
	go system.Root.Send(pid, &Increment{})
	go system.Root.Send(pid, &Increment{})
	go system.Root.Send(pid, &Increment{})
	go system.Root.Send(pid, &Increment{})
	go system.Root.Send(pid, &Increment{})

	_, _ = console.ReadLine()
}
count: 1
count: 2
count: 3
count: 4
count: 5
count: 6

Behavior

Waiting と Processing の2つの状態を取る有限オートマトン(FSM)を、 Behavior を用いて作成します。

type Process struct{}
type Finish struct{}

type BehaviorActor struct {
	actor.Behavior
}

func (a *BehaviorActor) Waiting(context actor.Context) {
	switch context.Message().(type) {
	case *Process:
		fmt.Println("Received: Process")
		a.Become(a.Processing) // 状態を Processing に移行
		fmt.Println("Becomed: Processing")
	case *Finish:
		fmt.Println("Received: Finish")
	}
}

func (a *BehaviorActor) Processing(context actor.Context) {
	switch context.Message().(type) {
	case *Process:
		fmt.Println("Received: Process")
	case *Finish:
		fmt.Println("Received: Finish")
		a.Become(a.Waiting) // 状態を Waiting に移行
		fmt.Println("Becomed: Waiting")
	}
}

func NewBehaviorActor() actor.Actor {
	act := &BehaviorActor{
		actor.NewBehavior(),
	}

	act.Become(act.Waiting) // 初期状態は Waiting

	return act
}

func main() {
	root := actor.NewActorSystem().Root
	props := actor.PropsFromProducer(NewBehaviorActor)
	pid := root.Spawn(props)

	root.Send(pid, &Process{}) // Waiting -> Processing
	root.Send(pid, &Process{}) // 状態は移行しない

	root.Send(pid, &Finish{}) // Processing -> Waiting
	root.Send(pid, &Finish{}) // 状態は移行しない

	_, _ = console.ReadLine()
}
Received: Process
Becomed: Processing
Received: Process
Received: Finish
Becomed: Waiting
Received: Finish

actor.NewBehavior() で Behavior を作成できる。
Behavior は以下のメソッドを提供しています。

func (b *Behavior) Become(receive ReceiveFunc) {...}

func (b *Behavior) BecomeStacked(receive ReceiveFunc) {...}

func (b *Behavior) UnbecomeStacked() {...}

func (b *Behavior) Receive(context Context) {...}

今回は Become を用いて直接状態を変移させていますが、 Push/Pop のような動作も可能。
また、 Receive を実装しているので、 actor.Actor としても振る舞う。

ライフサイクル

type Actor struct{}

func (a *Actor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *actor.Started:
		fmt.Println("Started")
	case *actor.Stopping:
		fmt.Println("Stopping")
	case *actor.Stopped:
		fmt.Println("Stopped")
	}
}

func main() {
	root := actor.NewActorSystem().Root
	props := actor.PropsFromProducer(func() actor.Actor { return &Actor{} })
	pid := root.Spawn(props)

	root.Stop(pid) // アクターを停止

	_, _ = console.ReadLine()
}
Started
Stopping
Stopped

アクターが開始されると Started が送信される。
また、アクターの停止時には Stopping 及び Stopped が送信される。

Supervision

アクターは子アクターを生成でき、親子関係を持てる。
Supervisor は子アクターに予期せぬエラーが発生した場合、それを監督できます。

type Panic struct{}
type ParentActor struct{}

func (a *ParentActor) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *Panic:
		props := actor.PropsFromProducer(NewChildActor)
		child := context.Spawn(props) // 子アクターを生成
		context.Send(child, msg)      // 子アクターに Panic メッセージを送信
	}
}

func NewParentActor() actor.Actor {
	return &ParentActor{}
}

type ChildActor struct{}

func (a *ChildActor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *actor.Started:
		fmt.Println("Started")
	case *actor.Stopping:
		fmt.Println("Stopping")
	case *actor.Stopped:
		fmt.Println("Stopped")
	case *actor.Restarting:
		fmt.Println("Restarting")
	case *Panic:
		panic("Panic!") // panic 発生
	}
}

func NewChildActor() actor.Actor {
	return &ChildActor{}
}

func main() {
	root := actor.NewActorSystem().Root

	decider := func(reason interface{}) actor.Directive {
		fmt.Printf("Handling failure: \"%v\"\n", reason)

		return actor.StopDirective
		// return actor.RestartDirective
		// return actor.ResumeDirective
	}

	// リトライ上限と時間を渡す
	supervisor := actor.NewOneForOneStrategy(10, time.Nanosecond, decider)
	// Supervisor としてParentActor を作成する
	props := actor.PropsFromProducer(
		NewParentActor,
		actor.WithSupervisor(supervisor),
	)
	pid := root.Spawn(props)

	// 子アクターが生成され、子アクター内でpanicが発生
	root.Send(pid, &Panic{})

	_, _ = console.ReadLine()
}

エラーログと共に以下のように出力される。

Started
Handling failure: "Panic!"
Stopping
Stopped

decider はパニック内容を受け取り actor.Directive を返します。
Directive については次の通り

  • StopDirective: アクターを停止(Stopping->Stopped)
  • RestartDirective: アクターを再起動(Restarting->Started)
  • ResumeDirective: アクターを再開
  • EscalateDirective: 失敗処理を親まで上げる

actor.WithSupervisor には SupervisorStrategy の実装を渡します。

type SupervisorStrategy interface {
	HandleFailure(actorSystem *ActorSystem, supervisor Supervisor, child *PID, rs *RestartStatistics, reason interface{}, message interface{})
}

標準で実装されているものは以下の関数から利用できる

  • NewOneForOneStrategy
  • NewAllForOneStrategy
  • NewExponentialBackoffStrategy
  • NewRestartingStrategy

MailboxMiddleware

メールボックスにミドルウェアを使うことができる。
ミドルウェアは以下のインターフェイスを実装して下さい。

type MailboxMiddleware interface {
	MailboxStarted()
	MessagePosted(message interface{})
	MessageReceived(message interface{})
	MailboxEmpty()
}

以下のコードでミドルウェアの動作を確認します。

type Logger struct{}

func (l *Logger) MailboxStarted() {
	fmt.Println("Mailbox started")
}

func (l *Logger) MessagePosted(msg interface{}) {
	fmt.Printf("Message posted %T\n", msg)
}

func (l *Logger) MessageReceived(msg interface{}) {
	fmt.Printf("Message received %T\n", msg)
}

func (l *Logger) MailboxEmpty() {
	fmt.Println("Mailbox empty")
}

type Hello struct{}

type Actor struct{}

func (a *Actor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *actor.Started:
		fmt.Println("Started")
	case *Hello:
		fmt.Println("Hello")
	}
}

func NewActor() actor.Actor {
	return &Actor{}
}

func main() {
	root := actor.NewActorSystem().Root
	props := actor.PropsFromProducer(
		NewActor,
		actor.WithMailbox(actor.Unbounded(&Logger{})),
	)
	pid := root.Spawn(props)

	root.Send(pid, &Hello{})

	_, _ = console.ReadLine()
}
Message posted *actor.Started
Mailbox started
Message posted *main.Hello
Started
Message received *actor.Started
Hello
Message received *main.Hello
Mailbox empty

実行結果から分かる通り、 アクターにメッセージが送信されると MessagePosted が呼び出され、アクターがメッセージを処理すると MessageReceived が呼び出されます。

Request/Response

type Ping struct{}

type Actor struct{}

func (a *Actor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *Ping:
		context.Respond("Pong")
	}
}

func NewActor() actor.Actor {
	return &Actor{}
}

func main() {
	root := actor.NewActorSystem().Root
	props := actor.PropsFromProducer(NewActor)
	pid := root.Spawn(props)

	future := root.RequestFuture(pid, &Ping{}, 30*time.Second) // 30秒でタイムアウト
	res, _ := future.Result() // 結果を待つ

	fmt.Println(res)

	_, _ = console.ReadLine()
}
Pong

タイムアウトした場合、 actor.ErrTimeout を返す。

func (a *Actor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *Ping:
		time.Sleep(time.Second * 2) // 2秒間待機
		context.Respond("Pong")
	}
}

func NewActor() actor.Actor {
	return &Actor{}
}

func main() {
	root := actor.NewActorSystem().Root
	props := actor.PropsFromProducer(NewActor)
	pid := root.Spawn(props)

	// 1秒しか待たないのでタイムアウト
	future := root.RequestFuture(pid, &Ping{}, 1*time.Second)
	_, err := future.Result()

	if err == actor.ErrTimeout {
		fmt.Println(err.Error())
	}

	_, _ = console.ReadLine()
}
future: timeout

最後に

もう少し書くつもりでしたが少し長くなったので区切ります。
恐らく続きを書くと思います( remote とか イベントソーシングについて)
ここまで読んで頂きありがとうございます。参考になれば幸いです。

Discussion