🐣

Dapr Tutorials - Pub/Sub をやってみた

2023/11/30に公開

はじめに

Dapr入門シリーズです。
今回はこちら
https://github.com/dapr/quickstarts/tree/master/tutorials/pub-sub

kubernetes 環境で動かしたいので、チュートリアルのKubernetesで実行からやっていきます。

環境

WSL2 Ubuntu 22.04
minikube v1.32.0
helm v3.13.1

構成

今回も公式様よりお借りしました。
アーキテクチャ図

C# のサブスクライバーが図にいない気がするんだけど?

事前準備

準備ができていたらこの章はスキップで。

  • クラスターを作成します。
$ minikube start --cpus=4 --memory=4096 --vm-driver=docker
  • dapr を初期化します。
$ dapr init --kubernetes --wait
  • 適当なディレクトリに git のコードをcloneします。
git clone https://github.com/dapr/quickstarts.git

Run in Kubernetes

Set up a Redis store

まずは Redis をセットアップします。

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update
$ helm install redis bitnami/redis --set image.tag=6.2

このチュートリアルでは自動で生成されたシークレットを使えるようになっているので、特に作業をせずに次に進みます。

Deploy assets

  1. deploy ディレクトリに移動します。
$ cd quickstarts/tutorials/pub-sub/deploy
  1. パブリッシャーと 3 つのサブスクライバー マイクロサービス、および Redis 構成をデプロイします。
$ kubectl apply -f .

デプロイされたか確認するには以下のコマンドを叩きます。
4つもデプロイするので、ちょっと時間がかかりました。

$ kubectl rollout status deploy/node-subscriber
deployment "node-subscriber" successfully rolled out
$ kubectl rollout status deploy/python-subscriber
deployment "python-subscriber" successfully rolled out
$ kubectl rollout status deploy/csharp-subscriber
deployment "csharp-subscriber" successfully rolled out
$ kubectl rollout status deploy/react-form
deployment "react-form" successfully rolled out
  1. 作成されたポッドを確認します。
$ kubectl get pods
NAME                                 READY   STATUS    RESTARTS        AGE
csharp-subscriber-6dddd7767b-6xks4   2/2     Running   0               8m28s
node-subscriber-54795fc658-fbkqn     2/2     Running   0               8m28s
python-subscriber-6cfcfd589d-8bvmg   2/2     Running   1 (110s ago)    8m28s
react-form-557ff489d8-2zk7j          2/2     Running   1 (117s ago)    8m28s
redis-master-0                       1/1     Running   0               13m
redis-replicas-0                     1/1     Running   1 (2m42s ago)   13m
redis-replicas-1                     1/1     Running   1 (2m39s ago)   12m
redis-replicas-2                     1/1     Running   1 (2m28s ago)   12m

Use the app

  1. React の web フォームにポートフォワードしてアクセスします。
$ kubectl port-forward service/react-form 8000:80

http://localhost:8000 にアクセスします。

  1. メッセージタイプを選んで、メッセージを入力して送信します。
  2. ログが生成されるので、それぞれ確認します。
$ kubectl logs --selector app=node-subscriber -c node-subscriber
Node App listening on port 3000!
A:  Message on A
B:  Message on B
$ kubectl logs --selector app=python-subscriber -c python-subscriber
127.0.0.1 - - [30/Nov/2023 08:35:40] "GET /dapr/subscribe HTTP/1.1" 200 -
A: {'data': {'message': 'Message on A', 'messageType': 'A'}, 'datacontenttype': 'application/json', 'id': '7e315539-9733-4e59-81e9-83a5c534877f', 'pubsubname': 'pubsub', 'source': 'react-form', 'specversion': '1.0', 'time': '2023-11-30T08:47:22Z', 'topic': 'A', 'traceid': '00-00000000000000000000000000000000-0000000000000000-00', 'traceparent': '00-00000000000000000000000000000000-0000000000000000-00', 'tracestate': '', 'type': 'com.dapr.event.sent'}
Received message "Message on A" on topic "A"
127.0.0.1 - - [30/Nov/2023 08:47:22] "POST /A HTTP/1.1" 200 -
C: {'data': {'message': 'Message on C', 'messageType': 'C'}, 'datacontenttype': 'application/json', 'id': '5016c6bc-8376-4699-9353-3c59fc25905f', 'pubsubname': 'pubsub', 'source': 'react-form', 'specversion': '1.0', 'time': '2023-11-30T08:47:38Z', 'topic': 'C', 'traceid': '00-00000000000000000000000000000000-0000000000000000-00', 'traceparent': '00-00000000000000000000000000000000-0000000000000000-00', 'tracestate': '', 'type': 'com.dapr.event.sent'}
Received message "Message on C" on topic "C"
$ kubectl logs --selector app=csharp-subscriber -c csharp-subscriber
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
      Content root path: /app
warn: Microsoft.AspNetCore.Server.Kestrel[22]
      As of "11/30/2023 08:29:30 +00:00", the heartbeat has been running for "00:00:01.6835938" which is longer than "00:00:01". This could be caused by thread pool starvation.
A: Message on A
B: Message on B
C: Message on C

node のサブスクライバーは AとB、pythonのサブスクライバーは AとC、C#のサブスクライバーは全てを、サブスクライブするようになっています。(なんか間違ったかと思ってちょっとびっくり😲した笑)

Cleanup

クリーンアップしましょう。

$ kubectl delete -f .

How it works

node の場合だと、この /dapr/subscribe のエンドポイントで、dapr に、AとBを購読するよ!と教えてるんですね。

node-subscriber/app.js
app.get('/dapr/subscribe', (_req, res) => {
    res.json([
        {
            pubsubname: "pubsub",
            topic: "A",
            route: "A"
        },
        {
            pubsubname: "pubsub",
            topic: "B",
            route: "B"
        }
    ]);
});

で、購読の通知が来たAとBは以下のエンドポイントで処理されると。

node-subscriber/app.js
app.post('/A', (req, res) => {
    console.log("A: ", req.body.data.message);
    res.sendStatus(200);
});

app.post('/B', (req, res) => {
    console.log("B: ", req.body.data.message);
    res.sendStatus(200);
});

Python と C# も同じ感じ。

React は クライアントとサーバに分かれていて、
まずクライアント側で、publish エンドポイント に送信します。ここのエンドポイント名は実はなんでも良さそう。

react-form/client/src/MessageForm.js
    handleSubmit = (event) => {
        fetch('/publish', {
            headers: {
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            },
            method:"POST",
            body: JSON.stringify(this.state),
        });
        event.preventDefault();
        this.setState(this.getInitialState());
    }

受け取ったサーバ側は、Dapr に配信してーとお願いします。

react-form/server.js
app.post('/publish', async (req, res) => {
  console.log("Publishing: ", req.body);
  await axios.post(`${daprUrl}/publish/${pubsubName}/${req.body?.messageType}`, req.body);
  return res.sendStatus(200);
});

おわりに

仕組みのところを読んで、Copilotにも解説してもらってようやくPub/Subを理解できた気がします💦
サブスクライブするエンドポイントは各アプリで用意しないといけないんだなーと思いつつ、実装にどうやっていくか考えたいと思います。

Discussion