🛠️

TemporalでSubscriptionのWorkflowを組む(クエリ編)

2023/02/15に公開

前回からの続きとなります。
https://zenn.dev/kenfdev/articles/272a627d30b503

本記事では4番の「サブスクリプション購読中の任意のタイミングで顧客に関する情報を取得可能とする」を実装します。

  1. ✅ユーザーがサインアップしたらウェルカムメッセージ送信し、無料トライアル期間を TrialPeriod の間有効化する
  2. TrialPeriod が終わったら、課金処理を開始する
    • ✅無料トライアル期間内にユーザーがキャンセルしたらキャンセルメールを送信する
  3. ✅課金処理
    1. MaxBillingPeriods を超えていない限り
    2. BillingPeriodChargeAmound顧客に請求する
    3. ✅2が終わったら BillingPeriod 待機する
    4. ✅待機中に顧客がキャンセルをしたら、サブスクリプションキャンセルメールを送信する
    5. ✅サブスクリプションが完了( MaxBillingPeriods を超えた)したら、サブスクリプション終了メールを送信する
  4. サブスクリプション購読中の任意のタイミングで顧客に関する以下の情報が取得可能とする
    • 請求金額
    • 現在のサブスクリプション期間(手動での調整用 e.g. 返金など)

Queryの実装

https://learn.temporal.io/tutorials/typescript/subscriptions/#queries-and-reusable-functions

公式チュートリアルの方もかなりサラッと流しちゃってるトピックですが、本記事でも実装内容を先に載せちゃいます。

 export const cancelSubscription = wf.defineSignal('cancelSignal');
 
-export async function subscriptionWorkflow(customer: Customer): Promise<void> {
+export async function subscriptionWorkflow(customer: Customer): Promise<string> {
+  const customerIdName = querySignalState('CustomerIdName', 'customerid');
+  const billingPeriodNumber = querySignalState('BillingPeriodNumber', 0);
+  const billingPeriodChargeAmount = querySignalState(
+    'BillingPeriodChargeAmount',
+    customer.subscription.initialBillingPeriodCharge
+  );
+  wf.setHandler(customerIdName.query, () => customer.id);
+
   let isCanceled = false;
   wf.setHandler(cancelSubscription, () => void (isCanceled = true));
 
   await acts.sendWelcomeEmail(customer);
   if (await wf.condition(() => isCanceled, customer.subscription.trialPeriod)) {
     await acts.sendCancellationEmailDuringTrialPeriod(customer);
+    return `Cancelled subscription for ${customer.id} during trial period`;
   } else {
-    await billingCycle(customer);
+    const totalCharged = await billingCycle(customer, billingPeriodNumber, billingPeriodChargeAmount);
+    return `Completed subscription for ${customer.id}, total charged: $${totalCharged}`;
   }
 }
 
-async function billingCycle(customer: Customer) {
+async function billingCycle(
+  customer: Customer,
+  billingPeriodNumber: ReturnType<typeof querySignalState<number>>,
+  billingPeriodChargeAmount: ReturnType<typeof querySignalState<number>>
+): Promise<number> {
+  let totalCharged = 0;
   let isCanceled = false;
   wf.setHandler(cancelSubscription, () => void (isCanceled = true)); // reuse signals
-  await acts.chargeCustomerForBillingPeriod(customer);
-  for (let num = 0; num < customer.subscription.maxBillingPeriods; num++) {
+
+  // eslint-disable-next-line no-constant-condition
+  while (true) {
+    if (billingPeriodNumber.value >= customer.subscription.maxBillingPeriods) break;
+    console.log('charging', customer.id, billingPeriodChargeAmount.value);
+
     // Wait 1 billing period to charge customer or if they cancel subscription
     // whichever comes first
     if (await wf.condition(() => isCanceled, customer.subscription.billingPeriod)) {
@@ -33,11 +52,38 @@ async function billingCycle(customer: Customer) {
       break;
     }
 
-    await acts.chargeCustomerForBillingPeriod(customer);
+    await acts.chargeCustomerForBillingPeriod(customer, billingPeriodChargeAmount.value);
+    totalCharged += billingPeriodChargeAmount.value;
+
+    billingPeriodNumber.value++;
   }
 
   // if we get here the subscription period is over
   if (!isCanceled) {
     await acts.sendSubscriptionOverEmail(customer);
   }
+
+  return totalCharged;
+}
+
+function querySignalState<T = any>(name: string, initialValue: T) {
+  const signal = wf.defineSignal<[T]>(name);
+  const query = wf.defineQuery<T>(name);
+  let state: T = initialValue;
+  wf.setHandler(signal, (newValue: T) => {
+    console.log('updating ', name, newValue);
+    state = newValue;
+  });
+  wf.setHandler(query, () => state);
+  return {
+    signal,
+    query,
+    get value() {
+      // need to use closure because function doesn't rerun unlike React Hooks
+      return state;
+    },
+    set value(newVal: T) {
+      state = newVal;
+    },
+  };
 }

ポイントとしては querySignalState の中で使っている defineQuery です。

https://docs.temporal.io/application-development/features?lang=typescript#queries

Workflowの状態を取得するために使うのがこの defineQuery になります。Queryに対して setHandler をすることで、何を返すのか、というロジックを組み立てることができます。

上記の querySignalState は、SignalとQueryを作ってくれる便利関数となっています。

SignalとQueryの使い分けとしては、

  • Workflowの状態を更新したり、副作用を及ぼすのがSignal
    • Signalからは情報を返せない
  • Workflowの状態を取得するためにはQuery(副作用が無い)

Clientを実装

では、Queryを実行するClientを実装します。これは比較的シンプルに取得できます。

import { Connection, WorkflowClient } from '@temporalio/client';

async function run() {
  const connection = await Connection.connect({ address: 'temporal:7233' });
  const client = new WorkflowClient({
    connection,
  });
  for (let i = 0; i < 5; i++) {
    // workflowのhandleを取得
    const handle = await client.getHandle('workflow-' + i);
    // `query` を使って、WorkflowのQueryハンドラを呼び出す
    const result = await handle.query<number>('BillingPeriodNumber');
    const result2 = await handle.query<number>('BillingPeriodChargeAmount');

    console.log('Workflow:', 'Id', handle.workflowId);
    console.log('Billing Results', 'Billing Period', result);
    console.log('Billing Results', 'Billing Period Charge', result2);
  }
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

テスト

ということでWorkflowを実行してみます。

npm run workflow

そして先程のクエリを実行するスクリプトも使ってみます。

npm run workflow:query

以下のようにしっかりと状態を取得できます。

Workflow: Id workflow-0
Billing Results Billing Period 0
Billing Results Billing Period Charge 130
Workflow: Id workflow-1
Billing Results Billing Period 0
Billing Results Billing Period Charge 140
Workflow: Id workflow-2
Billing Results Billing Period 0
Billing Results Billing Period Charge 150
Workflow: Id workflow-3
Billing Results Billing Period 0
Billing Results Billing Period Charge 160
Workflow: Id workflow-4
Billing Results Billing Period 0
Billing Results Billing Period Charge 170

途中で実行すると以下のようにBilling Periodが更新されているのが確認できます。

Workflow: Id workflow-0
Billing Results Billing Period 3
Billing Results Billing Period Charge 130
Workflow: Id workflow-1
Billing Results Billing Period 2
Billing Results Billing Period Charge 140
Workflow: Id workflow-2
Billing Results Billing Period 0
Billing Results Billing Period Charge 150
Workflow: Id workflow-3
Billing Results Billing Period 0
Billing Results Billing Period Charge 160
Workflow: Id workflow-4
Billing Results Billing Period 0
Billing Results Billing Period Charge 170

そして最終的に全部3回請求が走って完了します。

Workflow: Id workflow-0
Billing Results Billing Period 3
Billing Results Billing Period Charge 130
Workflow: Id workflow-1
Billing Results Billing Period 3
Billing Results Billing Period Charge 140
Workflow: Id workflow-2
Billing Results Billing Period 3
Billing Results Billing Period Charge 150
Workflow: Id workflow-3
Billing Results Billing Period 3
Billing Results Billing Period Charge 160
Workflow: Id workflow-4
Billing Results Billing Period 3
Billing Results Billing Period Charge 170

Workflow完了時にGUIからも結果が参照できています。

おわりに

ということで、急ぎ足ですがQueryの実装も完了しました。Workflowの実行から状態の取得、状態の更新まで一通り触れたことになります。あくまでNPM ScriptsからClientを実行しているだけになるので、いまいち実際の環境でどのように使うのかがわかりにくいかもしれないので次はNext.jsでのフルスタックな利用例について見ていこうと思います。
https://learn.temporal.io/tutorials/typescript/nextjs/

このデモを試したい人は以下のリポジトリでVSCodeのdevcontainerを用意しているので、うまくいけばそのまま動作確認できます。
https://github.com/kenfdev/temporal-tutorial/tree/main/subscription-tutorial

Discussion