⛄️

Daprのアクターを試してみた

18 min read

これはDapr Advent Calendar 2021の24日目です。

概要

Daprにはアクターモデルを実現するための機能があります。

Daprの公式ドキュメントはこちら。

https://docs.dapr.io/developing-applications/building-blocks/actors/actors-overview/

Microsoftが提供している.NET開発者向けDaprアクター機能の解説はこちら。
.NET開発者でなくとも参考になると思います。

https://docs.microsoft.com/ja-jp/dotnet/architecture/dapr-for-net-developers/actors

Daprのアクターには次のような特徴があります。

  • 自動的にアクティベートされる
  • アクターインスタンスはクラスター全体へ分散配置される
  • アクターインスタンスはIDによって識別され、同じIDに対するメソッド呼び出しは同じアクターインスタンスで行われる
  • ターンベースのアクセスモデルにより1つのアクターインスタンスへの並行同時アクセスは発生しない
  • アクターインスタンスを持つノードに障害が発生した場合、正常なノードへフェールオーバーされる

これらの特徴からStatefulでScalableなシステムを構築できるのが大きなメリットだと理解しています。

それからDaprのアクターには次のような機能があります。

  • メソッド呼び出し
  • 状態管理
  • タイマー
  • リマインダー

この記事ではこれらの特徴・機能について見ていくことにします。

アクターモデルについては私も理解が深いとは言えないためアクターモデルそのものの詳細な解説は省略します。

コード例にはJava・Spring Bootを使用します。
アプリケーションは最低限HTTPさえ分かっていればDaprと会話ができますが、アクター機能はHTTP通信以外にもやることがあるためSDKを使います。

Java用のSDKはこちら。
ちなみにSDKはデフォルトでDaprとgRPC通信します。

https://github.com/dapr/java-sdk

この記事に掲載するコード例の完全版はこちら。

https://github.com/backpaper0/dapr-actor-example/tree/v1

後からコードを追加したり修正する可能性があるため、この記事用にv1というタグを作成しています。

はじめてのアクター

アクターを機能させるには最低限、次の2点が必要になります。

  • io.dapr.actors.runtime.AbstractActorextendsしたクラス(以後、このクラスをアクタークラスと呼びます)
  • アクタークラスをio.dapr.actors.runtime.ActorRuntimeへ登録するコード

それに加えてアクターのメソッド呼び出しを簡便にするため次のものを用意します。

  • アクタークラスのメソッドを抽出したインターフェース(以後、このインターフェースをアクターインターフェースと呼びます)

はじめてのアクター実装

それではまずアクタークラスを作りましょう。
整数を持ちメソッドが呼び出されるたびにカウントアップするアクターを作ります。

package, import
package com.example.counter;

import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.AbstractActor;
import io.dapr.actors.runtime.ActorRuntimeContext;
public class CounterActorImpl extends AbstractActor implements CounterActor {

    private int count;

    public CounterActorImpl(ActorRuntimeContext<CounterActorImpl> runtimeContext, ActorId id) {
        super(runtimeContext, id);
    }

    @Override
    public int count() {
        count++;
        return count;
    }
}

AbstractActorextendsする以外は取り立てて特徴はありません。

次にメソッドを抽出してアクターインターフェースを作ります(なお、先に紹介したCounterActorImplはもうすでにCounterActorimplementsしています)。

package, import
package com.example.counter;
public interface CounterActor {

    int count();
}

最後にアクタークラスをActorRuntimeへ登録するコードを書きます。

    @Bean(destroyMethod = "close")
    public ActorRuntime actorRuntime() {
        ActorRuntime actorRuntime = ActorRuntime.getInstance();
        actorRuntime.registerActor(CounterActorImpl.class);
        return actorRuntime;
    }

ActorRuntimejava.io.CloseableimplementsしているためSpringで管理してコンポーネントを破棄する際にcloseメソッドが呼ばれるようにしました。

これでアクターの準備はできたのですが、動作確認のためアクターのメソッドを呼び出すコントローラーを用意します。

package, import
package com.example.counter;

import java.util.Map;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorClient;
import io.dapr.actors.client.ActorProxyBuilder;
@RestController
public class CounterController {

    private final ActorProxyBuilder<CounterActor> actorProxyBuilder;

    public CounterController(ActorClient actorClient) {
        this.actorProxyBuilder = new ActorProxyBuilder<>(
                "CounterActorImpl",
                CounterActor.class,
                actorClient);
    }

    @GetMapping("/{id}")
    public Object count(@PathVariable ActorId id) {
        CounterActor counterActor = actorProxyBuilder.build(id);
        int count = counterActor.count();
        return Map.of("count", count);
    }
}

ActorProxyBuilderはアクターをHTTP(またはgRPC)で呼び出すためのDynamic Proxyを生成します。
コンストラクターの第1引数はアクターの種類、第2引数はProxyインターフェースです。

buildメソッドでProxyインスタンスを生成しますが、その際に渡しているのがアクターのIDで、このID単位でアクターインスタンスが作られます。
ここではパスパラメーターで受け取った値をアクターのIDとして使用しています。

コントローラーにインジェクションするActorClientのコンポーネントも定義しておきます。
ActorRuntimeと同じようにコンポーネントを破棄する際、closeするようにしています。

    @Bean(destroyMethod = "close")
    public ActorClient actorClient() {
        return new ActorClient();
    }

CounterActorの動作確認

動作確認をしてみましょう。

Daprとアプリケーションを起動します。[1]

dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run

起動したらcurlでリクエストを投げます。
アクターIDはfooとします。

curl localhost:8080/foo

次のJSONが返ってきました。

{"count":1}

もう一度実行しましょう。

!!

カウントがインクリメントされました。

{"count":2}

異なるアクターIDを指定してみましょう。

curl localhost:8080/bar

先程までとは異なるアクターインスタンスのメソッドが呼び出されたためカウントは1になっています。

{"count":1}

処理の流れを図にするとこのようになります。

コントローラーから(Dynamic Proxy経由で)アクターのメソッドが呼び出されていますが、DaprがアクターIDを見て対応するアクターインスタンスへディスパッチしてくれています。

説明を簡単にするためCounterActorImplCounterActorCounterControllerを含む1つのアプリケーションとしてコード例は用意しましたがコントローラーからアクターのメソッドを呼び出す際はDaprを経由したHTTP/gRPC通信が行われています。

実際はCounterActorImplを含むアプリケーションとCounterControllerを含むアプリケーションに分けて、各アプリケーションがCounterActorを含む共有ライブラリを参照する形にするのが良さそうです。

なお、コンソールを見るとDaprが次のようなログを出力しています。
※ログが横に長いためタイムスタンプやログレベルなどの情報は除いて記載しています。

io.dapr.actors.ActorTrace                : Actor:foo Activating ...
io.dapr.actors.ActorTrace                : Actor:foo Activated
io.dapr.actors.ActorTrace                : Actor:bar Activating ...
io.dapr.actors.ActorTrace                : Actor:bar Activated

このログからアクターはDaprによってアクターID毎に自動的にアクティベートされていることがわかります。

カウントを状態ストアに保持する

今の実装だとカウントは単にアクターのフィールドで保持しているだけなのでアプリケーションを再起動するとカウントはまた1から開始します。
DaprのアクターはDaprのState management APIを使って状態管理ができるので、それでカウントを保持するようにします。

状態の保存と復元のタイミングや実装の方法はいくつか考えられますが、AbstractActoronActivateメソッドとonPostActorMethodメソッドをオーバーライドしてアクティベート時に状態を復元、メソッド呼び出し後に状態を保存するのが簡単かと思います。

まずonActivateメソッドをオーバーライドして状態を復元するコードを書きます。
CounterActorImplに次のコードを追加します。

    @Override
    protected Mono<Void> onActivate() {
        return getActorStateManager()
                .get("count", int.class)
                .flatMap(count -> {
                  this.count = count;
                  return Mono.empty();
                });
    }

ActorStateManagergetメソッドはReactorMonoを返します。
flatMapメソッド内で状態ストアから読み取った値をフィールドへ代入しています。

私の感覚からするとflatMapメソッド内でインスタンスの状態を変更するのはとても微妙で、本来はcountメソッド内で状態ストアから値を読み取ってメソッドチェーンで計算を行い、Mono<Integer>を返すのが良いと思います。
ただ、この記事ではReactorの解説をしたいわけではないため、このような形のコードとしています。

次にonPostActorMethodメソッドをオーバーライドして状態を保存するコードを書きます。
CounterActorImplに次のコードを追加します。

    @Override
    protected Mono<Void> onPostActorMethod(ActorMethodContext actorMethodContext) {
        return getActorStateManager()
                .set("count", this.count);
    }

これで状態の保存・復元ができるようになったので動作確認します。

アプリケーションを起動します。

dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run

curlでリクエストを投げます。

curl localhost:8080/foo

次のJSONが返ってきました。

{"count":1}

もう一度実行しましょう。

!!

カウントがインクリメントされました。

{"count":2}

ここまでは以前のセクションと同じですね。

ではここでアプリケーションを再起動してみます。
ctrl + cでアプリケーションを停止してから次のコマンドで再び起動します。

dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run

curlでリクエストを投げます。

curl localhost:8080/foo

次のJSONが返ってきました。

{"count":3}

アプリケーションを再起動しても状態は維持していますね。

並行同時アクセスが行われないことを確認する

現在のカウントを取得してからインクリメントして返すまでの間にスリープを挟むメソッドを追加します。
そして5秒スリープするリクエストと1秒スリープするリクエストを順に投げて、後のリクエストが最初のリクエストを追い越さないことを確認します。

まずCounterActorImplへ次のメソッドを追加します。

    @Override
    public int countWithSleep(long sleep) {
        int count = this.count;
        try {
            TimeUnit.SECONDS.sleep(sleep);
        } catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
        return this.count = count + 1;
    }

CounterActorにもメソッド宣言を追加します。

    int countWithSleep(long sleep);

それからCounterControllerへ次のメソッドを追加します。

    @GetMapping("/{id}/sleep/{sleep}")
    public Object countWithSleep(@PathVariable long sleep, @PathVariable ActorId id) {
        CounterActor counterActor = actorProxyBuilder.build(id);
        int count = counterActor.countWithSleep(sleep);
        return Map.of("count", count);
    }

これで準備が整ったので動作確認をします。

アプリケーションを起動します。

dapr run --app-id counter --app-port 8080 ./mvnw spring-boot:run

それからターミナルを2つ起動して次の2つのcurlコマンドを順に(ほぼ同時に)実行します。

curl localhost:8080/foo/sleep/5
curl localhost:8080/foo/sleep/1

すると、5秒待ってから1つめのターミナルで次のJSONが返ってきました。

{"count":4}

そしてそれから更に1秒待ってから2つめのターミナルで次のJSONが返ってきました。

{"count":5}

このことからアクターインスタンスには並行同時アクセスは発生せずシリアルに処理されることがわかります。

なお、アクターIDが異なる場合、つまりアクターインスタンスが異なる場合は並行同時アクセスできます。
2つのターミナルで次の2つのcurlコマンドを実行することでそれがわかります。

curl localhost:8080/foo/sleep/5
curl localhost:8080/bar/sleep/1

まず1秒後に2つめのターミナルに結果が返ってきて、5秒後に1つめのターミナルに結果が返ってきました。
アクターインスタンスが異なれば並行同時アクセスできていますね。

ついでにもう1つ述べておくと、アクターインスタンスが同一なら異なるメソッドでも並行同時アクセスはできません。
2つのターミナルで次の2つのcurlコマンドを実行することでそれがわかります。

curl localhost:8080/foo/sleep/5
curl localhost:8080/foo

5秒後に1つめのターミナルに結果が返ってきて、それとほぼ同時に2つめのターミナルに結果が返ってきました。
スリープをしないメソッド(CounterActorImpl.count)がスリープするメソッド(CounterActorImpl.countWithSleep)の終了を待たされたことがわかります。

Kubernetesクラスターへの分散配置

次はKubernetesへデプロイしてクラスター全体への分散配置とフェールオーバーを確認してみます。

動作確認で使用するため、まずCounterActorImplへメソッドを1つ追加します。

    @Override
    public List<Object> countAndHostname() {
        count++;
        return List.of(count, System.getenv("HOSTNAME"));
    }

カウント値とホスト名の組を返しています。[2]

CounterActorにもメソッド宣言を追加しておきます。

    List<Object> countAndHostname();

CounterControllerにもメソッドを追加します。

    @GetMapping("/and-hostname/{id}")
    public Object countAndHostname(@PathVariable ActorId id) {
        CounterActor counterActor = actorProxyBuilder.build(id);
        return counterActor.countAndHostname();
    }

これでアクターが返すホスト名を見て配置の分散具合を確認できるようになったはずです。

Kubernetesの準備とデプロイ

アプリケーションのコンテナイメージを作ります。

./mvnw spring-boot:build-image

Kubernetes環境を準備します。
この記事ではKindを使用してローカルで試します。

KindでKubernetesクラスター構築

Kubernetesクラスターを作成します。[3]

kind create cluster --name dapr-actor-example

kubectlのコンテキストを設定します。

kind export kubeconfig --name dapr-actor-example

先程作ったコンテナイメージをロードします。

kind load docker-image dapr-actor-example:v1 --name dapr-actor-example

DaprをKubernetesクラスターへインストールします。

dapr init -k

次のドキュメントに従ってRedisストアを作成します。

https://docs.dapr.io/getting-started/configure-state-pubsub/

アクターがRedisストアを利用することを示すためspec.metadataに次の設定が必要なことに注意してください。

  - name: actorStateStore
    value: "true"
Redisストアの作成

HelmでRedisをインストールします。

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install redis bitnami/redis

次の内容をdeploy/kubernetes/redis-store.yamlへ保存します。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: default
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379
  - name: redisPassword
    secretKeyRef:
      name: redis
      key: redis-password
  - name: actorStateStore
    value: "true"

このYAMLファイルを適用します。

kubectl apply -f deploy/kubernetes/redis-store.yaml

これでRedisストアが作成されました。

アプリケーションをデプロイします。
次の内容をdeploy/kubernetes/counter.yamlというファイルに保存します。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: counter
spec:
  replicas: 3
  selector:
    matchLabels:
      app: counter
  template:
    metadata:
      labels:
        app: counter
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "counter"
        dapr.io/app-port: "8080"
    spec:
      containers:
        - name: counter
          image: dapr-actor-example:v1
          imagePullPolicy: Never
---
apiVersion: v1
kind: Service
metadata:
  name: counter
spec:
  selector:
    app: counter
  ports:
    - port: 8080

このYAMLファイルを適用します。

kubectl apply -f deploy/kubernetes/counter.yaml

アプリケーションがデプロイされたかkubectl get allで確認してみます。
次のような出力になればデプロイされています。

NAME                           READY   STATUS    RESTARTS   AGE
pod/counter-74fbbbbc4c-2fk8b   2/2     Running   0          4m11s
pod/counter-74fbbbbc4c-sp462   2/2     Running   0          4m11s
pod/counter-74fbbbbc4c-zn8r4   2/2     Running   0          4m11s
pod/redis-master-0             1/1     Running   0          114m
pod/redis-replicas-0           1/1     Running   0          114m
pod/redis-replicas-1           1/1     Running   0          113m
pod/redis-replicas-2           1/1     Running   0          113m

NAME                     TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                               AGE
service/counter          ClusterIP   10.96.30.9      <none>        8080/TCP                              4m11s
service/counter-dapr     ClusterIP   None            <none>        80/TCP,50001/TCP,50002/TCP,9090/TCP   4m11s
service/kubernetes       ClusterIP   10.96.0.1       <none>        443/TCP                               117m
service/redis-headless   ClusterIP   None            <none>        6379/TCP                              114m
service/redis-master     ClusterIP   10.96.107.175   <none>        6379/TCP                              114m
service/redis-replicas   ClusterIP   10.96.208.221   <none>        6379/TCP                              114m

NAME                      READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/counter   3/3     3            3           4m11s

NAME                                 DESIRED   CURRENT   READY   AGE
replicaset.apps/counter-74fbbbbc4c   3         3         3       4m11s

NAME                              READY   AGE
statefulset.apps/redis-master     1/1     114m
statefulset.apps/redis-replicas   3/3     114m

NAME                           AGE
component.dapr.io/statestore   19m

分散配置とフェールオーバーの確認

アプリケーションがデプロイできたため動作確認をします。

アプリケーションは3つのPodで動いていますが、/and-hostname/{id}に対していくつかのアクターIDでcurlコマンドを試し、返ってきた値に含まれるホスト名を見ることで各Podへの分散具合を確認します。

私はMacを使っていますが、KindはServiceへポートフォワードする機能を持たないようなので、適当なコンテナを立ててその中からcurlを実行します。

動作確認用のコンテナを立てます。

kubectl run -it --rm --restart=Never --image=curlimages/curl --command test -- sh

この動作確認用のコンテナ内で次のようにcurlを実行します。

curl counter:8080/and-hostname/1

すると次のようにカウント値とホスト名が返ってきます。

[1,"counter-74fbbbbc4c-zn8r4"]

これを100個のアクターID(1100)に対して実行し[4]、集計したものが次の表です。

ホスト名 アクター数
counter-74fbbbbc4c-2fk8b 34
counter-74fbbbbc4c-sp462 32
counter-74fbbbbc4c-zn8r4 34

100個のアクターインスタンスが3つのPodへ分散配置されていることがわかりました。

次にフェールオーバーの確認をします。

アクターIDfooを持つアクターインスタンスを配置します。

curl counter:8080/and-hostname/foo
[1,"counter-74fbbbbc4c-zn8r4"]

counter-74fbbbbc4c-zn8r4というホスト名を持つPodへ配置されましたね。
では、そのcounter-74fbbbbc4c-zn8r4を削除します。

kubectl delete pod counter-74fbbbbc4c-zn8r4

Podが削除できたらクラスター内の各Podの状態を確認します。
ここで見たいのはアプリケーションのPodなので--selectorで絞り込みます。

kubectl get pod --selector='app=counter'
NAME                       READY   STATUS    RESTARTS   AGE
counter-74fbbbbc4c-2fk8b   2/2     Running   0          16m
counter-74fbbbbc4c-gvhvw   2/2     Running   0          2m20s
counter-74fbbbbc4c-sp462   2/2     Running   0          16m

counter-74fbbbbc4c-gvhvwというPodが登場しましたが、これは先ほど削除したcounter-74fbbbbc4c-zn8r4に代わり新しく生成されたものですね。

それでは再びアクターIDfooを指定してリクエストを投げます。

curl counter:8080/and-hostname/foo

すると次のJSONが返ってきました。

[2,"counter-74fbbbbc4c-gvhvw"]

新しく生成されたcounter-74fbbbbc4c-gvhvwに再配置されました。
状態も引き継げていますね。

このことからPodが落ちてしまっても、そこに配置されていたアクターは別のPodへフェールオーバーされることが確認できました。

今回紹介しなかった機能

以上でこの記事におけるアクター機能の紹介を終わります。

今回は次の2つの機能を紹介しませんでしたが、また別の記事で紹介したいと思います。[5]

  • タイマー
  • リマインダー

それからアクティベーションは確認しましたが、非アクティベーション(つまりアクターインスタンスの破棄)の動作も見ておきたいですね。

所感

アクターモデルを実現するための機能を言語やフレームワークではなくてSidecarで提供しているところが面白いです。
最低限HTTPさえ扱えるなら、どんな言語でもアクターモデルを手に入れられることが良いですね。

ただ、最初の方でも述べたようにDaprのアクター機能はHTTP通信(またはgRPC通信)以外にもやることが多いため、SDKが用意されている言語を使うのが無難だと思いました。[6]

Daprは登場した時から気になってはいたんですが、実際に使う機会もなくあまり触ってきませんでした。
今回、アドベントカレンダーがきっかけで色々と触ってみてこれは良いものだと感じたので、これからも遊びたいと思います。

以上です。

脚注
  1. Daprを初期化していない場合はdapr initしておいてください ↩︎

  2. Javaにもタプルが欲しくなる瞬間、、、 ↩︎

  3. 削除するときはkind delete cluster --name dapr-actor-exampleです。 ↩︎

  4. seq 100 | xargs -t -I{} curl counter:8080/and-hostname/{} ↩︎

  5. 当初はこの記事で紹介しようと考えていたのですが、記事がみるみる膨れ上がったので別の記事で紹介することにしました。 ↩︎

  6. あと本当に単なる印象というかどうでも良い話ではあるんですが、アクターインスタンスをランタイム(Dapr)が管理していたりDynamic ProxyでRPCするところがなんとなくEJBっぽいなぁ、と思いました。EJBを本格的に使ったことはないんですけども。 ↩︎

Discussion

ログインするとコメントできます