Daprのアクターを試してみた
これはDapr Advent Calendar 2021の24日目です。
概要
Daprにはアクターモデルを実現するための機能があります。
Daprの公式ドキュメントはこちら。
Microsoftが提供している.NET開発者向けDaprアクター機能の解説はこちら。
.NET開発者でなくとも参考になると思います。
Daprのアクターには次のような特徴があります。
- 自動的にアクティベートされる
- アクターインスタンスはクラスター全体へ分散配置される
- アクターインスタンスはIDによって識別され、同じIDに対するメソッド呼び出しは同じアクターインスタンスで行われる
- ターンベースのアクセスモデルにより1つのアクターインスタンスへの並行同時アクセスは発生しない
- アクターインスタンスを持つノードに障害が発生した場合、正常なノードへフェールオーバーされる
これらの特徴からStatefulでScalableなシステムを構築できるのが大きなメリットだと理解しています。
それからDaprのアクターには次のような機能があります。
- メソッド呼び出し
- 状態管理
- タイマー
- リマインダー
この記事ではこれらの特徴・機能について見ていくことにします。
コード例にはJava・Spring Bootを使用します。
アプリケーションは最低限HTTPさえ分かっていればDaprと会話ができますが、アクター機能はHTTP通信以外にもやることがあるためSDKを使います。
Java用のSDKはこちら。
ちなみにSDKはデフォルトでDaprとgRPC通信します。
この記事に掲載するコード例の完全版はこちら。
後からコードを追加したり修正する可能性があるため、この記事用にv1
というタグを作成しています。
はじめてのアクター
アクターを機能させるには最低限、次の2点が必要になります。
-
io.dapr.actors.runtime.AbstractActor
をextends
したクラス(以後、このクラスをアクタークラスと呼びます) - アクタークラスを
io.dapr.actors.runtime.ActorRuntime
へ登録するコード
それに加えてアクターのメソッド呼び出しを簡便にするため次のものを用意します。
- アクタークラスのメソッドを抽出したインターフェース(以後、このインターフェースをアクターインターフェースと呼びます)
はじめてのアクター実装
それではまずアクタークラスを作りましょう。
整数を持ちメソッドが呼び出されるたびにカウントアップするアクターを作ります。
package, import
package com.example.counter;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.AbstractActor;
import io.dapr.actors.runtime.ActorRuntimeContext;
public class CounterActorImpl extends AbstractActor implements CounterActor {
private int count;
public CounterActorImpl(ActorRuntimeContext<CounterActorImpl> runtimeContext, ActorId id) {
super(runtimeContext, id);
}
@Override
public int count() {
count++;
return count;
}
}
AbstractActor
をextends
する以外は取り立てて特徴はありません。
次にメソッドを抽出してアクターインターフェースを作ります(なお、先に紹介したCounterActorImpl
はもうすでにCounterActor
をimplements
しています)。
package, import
package com.example.counter;
public interface CounterActor {
int count();
}
最後にアクタークラスをActorRuntime
へ登録するコードを書きます。
@Bean(destroyMethod = "close")
public ActorRuntime actorRuntime() {
ActorRuntime actorRuntime = ActorRuntime.getInstance();
actorRuntime.registerActor(CounterActorImpl.class);
return actorRuntime;
}
ActorRuntime
はjava.io.Closeable
をimplements
しているためSpringで管理してコンポーネントを破棄する際にclose
メソッドが呼ばれるようにしました。
これでアクターの準備はできたのですが、動作確認のためアクターのメソッドを呼び出すコントローラーを用意します。
package, import
package com.example.counter;
import java.util.Map;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorClient;
import io.dapr.actors.client.ActorProxyBuilder;
@RestController
public class CounterController {
private final ActorProxyBuilder<CounterActor> actorProxyBuilder;
public CounterController(ActorClient actorClient) {
this.actorProxyBuilder = new ActorProxyBuilder<>(
"CounterActorImpl",
CounterActor.class,
actorClient);
}
@GetMapping("/{id}")
public Object count(@PathVariable ActorId id) {
CounterActor counterActor = actorProxyBuilder.build(id);
int count = counterActor.count();
return Map.of("count", count);
}
}
ActorProxyBuilder
はアクターをHTTP(またはgRPC)で呼び出すためのDynamic Proxyを生成します。
コンストラクターの第1引数はアクターの種類、第2引数はProxyインターフェースです。
build
メソッドでProxyインスタンスを生成しますが、その際に渡しているのがアクターのIDで、このID単位でアクターインスタンスが作られます。
ここではパスパラメーターで受け取った値をアクターのIDとして使用しています。
コントローラーにインジェクションするActorClient
のコンポーネントも定義しておきます。
ActorRuntime
と同じようにコンポーネントを破棄する際、close
するようにしています。
@Bean(destroyMethod = "close")
public ActorClient actorClient() {
return new ActorClient();
}
CounterActorの動作確認
動作確認をしてみましょう。
Daprとアプリケーションを起動します。[1]
dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run
起動したらcurl
でリクエストを投げます。
アクターIDはfoo
とします。
curl localhost:8080/foo
次のJSONが返ってきました。
{"count":1}
もう一度実行しましょう。
!!
カウントがインクリメントされました。
{"count":2}
異なるアクターIDを指定してみましょう。
curl localhost:8080/bar
先程までとは異なるアクターインスタンスのメソッドが呼び出されたためカウントは1
になっています。
{"count":1}
処理の流れを図にするとこのようになります。
コントローラーから(Dynamic Proxy経由で)アクターのメソッドが呼び出されていますが、DaprがアクターIDを見て対応するアクターインスタンスへディスパッチしてくれています。
なお、コンソールを見るとDaprが次のようなログを出力しています。
※ログが横に長いためタイムスタンプやログレベルなどの情報は除いて記載しています。
io.dapr.actors.ActorTrace : Actor:foo Activating ...
io.dapr.actors.ActorTrace : Actor:foo Activated
io.dapr.actors.ActorTrace : Actor:bar Activating ...
io.dapr.actors.ActorTrace : Actor:bar Activated
このログからアクターはDaprによってアクターID毎に自動的にアクティベートされていることがわかります。
カウントを状態ストアに保持する
今の実装だとカウントは単にアクターのフィールドで保持しているだけなのでアプリケーションを再起動するとカウントはまた1
から開始します。
DaprのアクターはDaprのState management APIを使って状態管理ができるので、それでカウントを保持するようにします。
状態の保存と復元のタイミングや実装の方法はいくつか考えられますが、AbstractActor
のonActivate
メソッドとonPostActorMethod
メソッドをオーバーライドしてアクティベート時に状態を復元、メソッド呼び出し後に状態を保存するのが簡単かと思います。
まずonActivate
メソッドをオーバーライドして状態を復元するコードを書きます。
CounterActorImpl
に次のコードを追加します。
@Override
protected Mono<Void> onActivate() {
return getActorStateManager()
.get("count", int.class)
.flatMap(count -> {
this.count = count;
return Mono.empty();
});
}
ActorStateManager
のget
メソッドはReactorのMono
を返します。
flatMap
メソッド内で状態ストアから読み取った値をフィールドへ代入しています。
次にonPostActorMethod
メソッドをオーバーライドして状態を保存するコードを書きます。
CounterActorImpl
に次のコードを追加します。
@Override
protected Mono<Void> onPostActorMethod(ActorMethodContext actorMethodContext) {
return getActorStateManager()
.set("count", this.count);
}
これで状態の保存・復元ができるようになったので動作確認します。
アプリケーションを起動します。
dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run
curl
でリクエストを投げます。
curl localhost:8080/foo
次のJSONが返ってきました。
{"count":1}
もう一度実行しましょう。
!!
カウントがインクリメントされました。
{"count":2}
ここまでは以前のセクションと同じですね。
ではここでアプリケーションを再起動してみます。
ctrl + c
でアプリケーションを停止してから次のコマンドで再び起動します。
dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run
curl
でリクエストを投げます。
curl localhost:8080/foo
次のJSONが返ってきました。
{"count":3}
アプリケーションを再起動しても状態は維持していますね。
並行同時アクセスが行われないことを確認する
現在のカウントを取得してからインクリメントして返すまでの間にスリープを挟むメソッドを追加します。
そして5秒スリープするリクエストと1秒スリープするリクエストを順に投げて、後のリクエストが最初のリクエストを追い越さないことを確認します。
まずCounterActorImpl
へ次のメソッドを追加します。
@Override
public int countWithSleep(long sleep) {
int count = this.count;
try {
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
return this.count = count + 1;
}
CounterActor
にもメソッド宣言を追加します。
int countWithSleep(long sleep);
それからCounterController
へ次のメソッドを追加します。
@GetMapping("/{id}/sleep/{sleep}")
public Object countWithSleep(@PathVariable long sleep, @PathVariable ActorId id) {
CounterActor counterActor = actorProxyBuilder.build(id);
int count = counterActor.countWithSleep(sleep);
return Map.of("count", count);
}
これで準備が整ったので動作確認をします。
アプリケーションを起動します。
dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run
それからターミナルを2つ起動して次の2つのcurl
コマンドを順に(ほぼ同時に)実行します。
curl localhost:8080/foo/sleep/5
curl localhost:8080/foo/sleep/1
すると、5秒待ってから1つめのターミナルで次のJSONが返ってきました。
{"count":4}
そしてそれから更に1秒待ってから2つめのターミナルで次のJSONが返ってきました。
{"count":5}
このことからアクターインスタンスには並行同時アクセスは発生せずシリアルに処理されることがわかります。
なお、アクターIDが異なる場合、つまりアクターインスタンスが異なる場合は並行同時アクセスできます。
2つのターミナルで次の2つのcurl
コマンドを実行することでそれがわかります。
curl localhost:8080/foo/sleep/5
curl localhost:8080/bar/sleep/1
まず1秒後に2つめのターミナルに結果が返ってきて、5秒後に1つめのターミナルに結果が返ってきました。
アクターインスタンスが異なれば並行同時アクセスできていますね。
ついでにもう1つ述べておくと、アクターインスタンスが同一なら異なるメソッドでも並行同時アクセスはできません。
2つのターミナルで次の2つのcurl
コマンドを実行することでそれがわかります。
curl localhost:8080/foo/sleep/5
curl localhost:8080/foo
5秒後に1つめのターミナルに結果が返ってきて、それとほぼ同時に2つめのターミナルに結果が返ってきました。
スリープをしないメソッド(CounterActorImpl.count
)がスリープするメソッド(CounterActorImpl.countWithSleep
)の終了を待たされたことがわかります。
Kubernetesクラスターへの分散配置
次はKubernetesへデプロイしてクラスター全体への分散配置とフェールオーバーを確認してみます。
動作確認で使用するため、まずCounterActorImpl
へメソッドを1つ追加します。
@Override
public List<Object> countAndHostname() {
count++;
return List.of(count, System.getenv("HOSTNAME"));
}
カウント値とホスト名の組を返しています。[2]
CounterActor
にもメソッド宣言を追加しておきます。
List<Object> countAndHostname();
CounterController
にもメソッドを追加します。
@GetMapping("/and-hostname/{id}")
public Object countAndHostname(@PathVariable ActorId id) {
CounterActor counterActor = actorProxyBuilder.build(id);
return counterActor.countAndHostname();
}
これでアクターが返すホスト名を見て配置の分散具合を確認できるようになったはずです。
Kubernetesの準備とデプロイ
アプリケーションのコンテナイメージを作ります。
./mvnw spring-boot:build-image
Kubernetes環境を準備します。
この記事ではKindを使用してローカルで試します。
KindでKubernetesクラスター構築
Kubernetesクラスターを作成します。[3]
kind create cluster --name dapr-actor-example
kubectl
のコンテキストを設定します。
kind export kubeconfig --name dapr-actor-example
先程作ったコンテナイメージをロードします。
kind load docker-image dapr-actor-example:v1 --name dapr-actor-example
DaprをKubernetesクラスターへインストールします。
dapr init -k
次のドキュメントに従ってRedisストアを作成します。
Redisストアの作成
HelmでRedisをインストールします。
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install redis bitnami/redis
次の内容をdeploy/kubernetes/redis-store.yaml
へ保存します。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: default
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
secretKeyRef:
name: redis
key: redis-password
- name: actorStateStore
value: "true"
このYAMLファイルを適用します。
kubectl apply -f deploy/kubernetes/redis-store.yaml
これでRedisストアが作成されました。
アプリケーションをデプロイします。
次の内容をdeploy/kubernetes/counter.yaml
というファイルに保存します。
apiVersion: apps/v1
kind: Deployment
metadata:
name: counter
spec:
replicas: 3
selector:
matchLabels:
app: counter
template:
metadata:
labels:
app: counter
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "counter"
dapr.io/app-port: "8080"
spec:
containers:
- name: counter
image: dapr-actor-example:v1
imagePullPolicy: Never
---
apiVersion: v1
kind: Service
metadata:
name: counter
spec:
selector:
app: counter
ports:
- port: 8080
このYAMLファイルを適用します。
kubectl apply -f deploy/kubernetes/counter.yaml
アプリケーションがデプロイされたかkubectl get all
で確認してみます。
次のような出力になればデプロイされています。
NAME READY STATUS RESTARTS AGE
pod/counter-74fbbbbc4c-2fk8b 2/2 Running 0 4m11s
pod/counter-74fbbbbc4c-sp462 2/2 Running 0 4m11s
pod/counter-74fbbbbc4c-zn8r4 2/2 Running 0 4m11s
pod/redis-master-0 1/1 Running 0 114m
pod/redis-replicas-0 1/1 Running 0 114m
pod/redis-replicas-1 1/1 Running 0 113m
pod/redis-replicas-2 1/1 Running 0 113m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/counter ClusterIP 10.96.30.9 <none> 8080/TCP 4m11s
service/counter-dapr ClusterIP None <none> 80/TCP,50001/TCP,50002/TCP,9090/TCP 4m11s
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 117m
service/redis-headless ClusterIP None <none> 6379/TCP 114m
service/redis-master ClusterIP 10.96.107.175 <none> 6379/TCP 114m
service/redis-replicas ClusterIP 10.96.208.221 <none> 6379/TCP 114m
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/counter 3/3 3 3 4m11s
NAME DESIRED CURRENT READY AGE
replicaset.apps/counter-74fbbbbc4c 3 3 3 4m11s
NAME READY AGE
statefulset.apps/redis-master 1/1 114m
statefulset.apps/redis-replicas 3/3 114m
NAME AGE
component.dapr.io/statestore 19m
分散配置とフェールオーバーの確認
アプリケーションがデプロイできたため動作確認をします。
アプリケーションは3つのPodで動いていますが、/and-hostname/{id}
に対していくつかのアクターIDでcurl
コマンドを試し、返ってきた値に含まれるホスト名を見ることで各Podへの分散具合を確認します。
私はMacを使っていますが、KindはServiceへポートフォワードする機能を持たないようなので、適当なコンテナを立ててその中からcurl
を実行します。
動作確認用のコンテナを立てます。
kubectl run -it --rm --restart=Never --image=curlimages/curl --command test -- sh
この動作確認用のコンテナ内で次のようにcurl
を実行します。
curl counter:8080/and-hostname/1
すると次のようにカウント値とホスト名が返ってきます。
[1,"counter-74fbbbbc4c-zn8r4"]
これを100個のアクターID(1
〜100
)に対して実行し[4]、集計したものが次の表です。
ホスト名 | アクター数 |
---|---|
counter-74fbbbbc4c-2fk8b |
34 |
counter-74fbbbbc4c-sp462 |
32 |
counter-74fbbbbc4c-zn8r4 |
34 |
100個のアクターインスタンスが3つのPodへ分散配置されていることがわかりました。
次にフェールオーバーの確認をします。
アクターIDfoo
を持つアクターインスタンスを配置します。
curl counter:8080/and-hostname/foo
[1,"counter-74fbbbbc4c-zn8r4"]
counter-74fbbbbc4c-zn8r4
というホスト名を持つPodへ配置されましたね。
では、そのcounter-74fbbbbc4c-zn8r4
を削除します。
kubectl delete pod counter-74fbbbbc4c-zn8r4
Podが削除できたらクラスター内の各Podの状態を確認します。
ここで見たいのはアプリケーションのPodなので--selector
で絞り込みます。
kubectl get pod --selector='app=counter'
NAME READY STATUS RESTARTS AGE
counter-74fbbbbc4c-2fk8b 2/2 Running 0 16m
counter-74fbbbbc4c-gvhvw 2/2 Running 0 2m20s
counter-74fbbbbc4c-sp462 2/2 Running 0 16m
counter-74fbbbbc4c-gvhvw
というPodが登場しましたが、これは先ほど削除したcounter-74fbbbbc4c-zn8r4
に代わり新しく生成されたものですね。
それでは再びアクターIDfoo
を指定してリクエストを投げます。
curl counter:8080/and-hostname/foo
すると次のJSONが返ってきました。
[2,"counter-74fbbbbc4c-gvhvw"]
新しく生成されたcounter-74fbbbbc4c-gvhvw
に再配置されました。
状態も引き継げていますね。
このことからPodが落ちてしまっても、そこに配置されていたアクターは別のPodへフェールオーバーされることが確認できました。
今回紹介しなかった機能
以上でこの記事におけるアクター機能の紹介を終わります。
今回は次の2つの機能を紹介しませんでしたが、また別の記事で紹介したいと思います。[5]
- タイマー
- リマインダー
それからアクティベーションは確認しましたが、非アクティベーション(つまりアクターインスタンスの破棄)の動作も見ておきたいですね。
所感
アクターモデルを実現するための機能を言語やフレームワークではなくてSidecarで提供しているところが面白いです。
最低限HTTPさえ扱えるなら、どんな言語でもアクターモデルを手に入れられることが良いですね。
ただ、最初の方でも述べたようにDaprのアクター機能はHTTP通信(またはgRPC通信)以外にもやることが多いため、SDKが用意されている言語を使うのが無難だと思いました。[6]
Daprは登場した時から気になってはいたんですが、実際に使う機会もなくあまり触ってきませんでした。
今回、アドベントカレンダーがきっかけで色々と触ってみてこれは良いものだと感じたので、これからも遊びたいと思います。
以上です。
-
Daprを初期化していない場合は
dapr init
しておいてください ↩︎ -
Javaにもタプルが欲しくなる瞬間、、、 ↩︎
-
削除するときは
kind delete cluster --name dapr-actor-example
です。 ↩︎ -
seq 100 | xargs -t -I{} curl counter:8080/and-hostname/{}
↩︎ -
当初はこの記事で紹介しようと考えていたのですが、記事がみるみる膨れ上がったので別の記事で紹介することにしました。 ↩︎
-
あと本当に単なる印象というかどうでも良い話ではあるんですが、アクターインスタンスをランタイム(Dapr)が管理していたりDynamic ProxyでRPCするところがなんとなくEJBっぽいなぁ、と思いました。EJBを本格的に使ったことはないんですけども。 ↩︎
Discussion