🔫

GCP x TypeScript で作る非同期システム

に公開

はじめに

こんにちは、takahashi(@stak_22)です。
Otaac(オターク)という会社で新規プロダクトを制作しており、個人的に初めての技術に触れる機会が多いので、紹介していければと思います。

今回は、GCP(+Firebase) x TypeScript で非同期の仕組みを作ったという話をさせていただきます。背景としては、PUSH通知を送信するため、Firestoreに作成した未来の通知コレクション(notifications)を拾い、送信時間になったものを送信するという仕組みを実現することが目的で、これを作りました。

ただし、今回は具体的なコードは控えめにして、あくまで方針や設計など全体像についてご紹介できればと思います。GCPの基礎的なお話にはなりますが、ぜひお付き合い下さい。

想定する読者

  • TypescriptとGCP(+Firebase)で非同期システムを実装したい方
  • TypescriptとGCP(+Firebase)でタスクスケジューラーを実装したい方

話さないこと

  • PUSH通知の送信の仕方(送信自体はFCMを使っています)
  • 完成した完璧なコード(実際よりも抽象化しているので、コピペして使えるとは限りません)

実現したこと

  • 定期的に関数を実行する仕組み
  • (そのタイミングで)キューイングする仕組み

キューに積むトリガーが引かれている様子

キューに積まれたタスクが実行されている様子

システム設計

全体像

今回は Cloud Functions で処理している内容が通知送信ですが、これがキューイングされる関数なので通知送信に限らず置き換えることが可能です。

主要技術

1. Cloud Scheduler

Cloud Scheduler は、指定したスケジュールに基づいてジョブを定期実行できるマネージドサービスです。
いわゆる「cron ジョブ」のような役割を担い、HTTP リクエストや Pub/Sub トピックの公開を自動でトリガーできます。
これにより、通知処理やバッチ処理などを決まった時間に確実に実行することができます。

2. Cloud Pub/Sub

Cloud Pub/Sub は、非同期メッセージングを実現するためのサービスです。
Publisher がトピックにメッセージを送信し、Subscriber がそれを受信することで、システム間を疎結合に保ちながらデータをやり取りできます。
イベント駆動型アーキテクチャを構築する際に非常に有効です。

3. Cloud Tasks

Cloud Tasks は、非同期ジョブを安全かつ確実にキューイング・実行するためのサービスです。
大量のリクエストを一度に処理するのではなく、キューを介して順次実行することで、バックエンドへの負荷を制御できます。
また、リトライポリシーやレート制限を細かく設定できる点も特徴です。

4. Cloud Functions for Firebase

Cloud Functions は、サーバーレスでコードを実行できるコンピュートサービスです。
イベント駆動型で動作し、HTTP リクエストや Pub/Sub メッセージの受信などをトリガーに処理を実行します。
インフラ管理が不要で、必要な処理を関数単位で軽量にデプロイできる点が魅力です。

実装

手順

インフラはTerraformを使って構築します。(コードは一部AIを通して差し替えているため、あくまで参考程度にご覧ください)

1. GCPプロジェクトの準備

まず、必要なAPIを有効化します。:

  resource "google_project_service" "required_apis" {
    for_each = toset([
      "cloudscheduler.googleapis.com",
      "pubsub.googleapis.com",
      "cloudfunctions.googleapis.com",
      "cloudtasks.googleapis.com",
      "appengine.googleapis.com",
      "firestore.googleapis.com",
    ])

    service = each.value
  }

2. Pub/Subトピックの作成

Cloud Schedulerからのメッセージを受け取るトピックを作成:

  resource "google_pubsub_topic" "notification_scheduler" {
    name = "notification-scheduler-topic"
  }

3. Cloud Schedulerの設定

毎分実行されるスケジューラージョブを作成。Asia/Tokyoタイムゾーンで動作:

  resource "google_cloud_scheduler_job" "notification_scheduler" {
    name      = "notification-scheduler-job"
    schedule  = "* * * * *"  # 毎分実行
    time_zone = "Asia/Tokyo"
    
    pubsub_target {
      topic_name = google_pubsub_topic.notification_scheduler.id
    }
  }

4. Cloud Tasksキューの作成

非同期処理用のタスクキューを設定:

  resource "google_cloud_tasks_queue" "notification_queue" {
    name     = "notifications-queue"
    location = "asia-northeast1"

    rate_limits {
      max_dispatches_per_second = 100
      max_concurrent_dispatches = 100
    }
  }

5. 処理関数の実装

Cloud Tasksから Cloud Functions 経由(Cloud FunctionsのHTTPエンドポイント)で呼ばれる非同期で行いたい通知処理のメインの関数:

  import { onRequest } from 'firebase-functions/v2/https';

  export const processScheduledNotifications = onRequest(
    {
      region: 'asia-northeast1',
      memory: '1GiB',
      timeoutSeconds: 540,
    },
    async (req, res) => {
      // Cloud Tasksからの認証検証
      const authHeader = req.headers.authorization;
      if (!authHeader?.startsWith('Bearer ')) {
        return res.status(401).json({ error: 'Unauthorized' });
      }

      const { targetTime } = req.body;

      // Firestoreから該当時刻の通知を取得
      const notifications = await db
        .collection('notifications')
        .where('status', '==', 'pending')
        .where('scheduledTime', '>=', startTime)
        .where('scheduledTime', '<=', endTime)
        .get();

      // FCMで通知送信
      for (const notification of notifications.docs) {
        await fcmService.sendNotification(notification.data());
        await notification.ref.delete(); // 送信成功したら削除
      }

      res.status(200).json({ success: true });
    }
  );

6. トリガー関数の実装

毎分の Scheduler をトリガーに Pub/Sub メッセージを受信して Cloud Tasks にエンキューする関数:

  import { onMessagePublished } from 'firebase-functions/v2/pubsub';

  // 先ほど実装した processScheduledNotifications をキューイングします
  const functionUrl = `https://${location}-${project}.cloudfunctions.net/processScheduledNotifications`;

  export const notificationTrigger = onMessagePublished(
    {
      topic: 'notification-scheduler-topic',
      region: 'asia-northeast1',
      memory: '512MiB',
    },
    async (event) => {
      const now = new Date();
      const targetTime = new Date(now);
      targetTime.setSeconds(0, 0); // 秒とミリ秒を0に正規化

      const client = new CloudTasksClient();
      const parent = client.queuePath(project, location, queue);

      await client.createTask({
        parent,
        task: {
          httpRequest: {
            httpMethod: 'POST',
            url: functionUrl,
            body: Buffer.from(JSON.stringify({
              targetTime: targetTime.toISOString()
            })),
            oidcToken: {
              serviceAccountEmail: `${project}@appspot.gserviceaccount.com`,
            },
          },
        },
      });
    }
  );

7. エントリーポイントの設定

Firebase Functionsのエントリーポイント(index.ts)で、作成した関数をエクスポート:

  // index.ts
  import { initializeApp } from 'firebase-admin/app';
  
  // Firebase Adminを初期化
  initializeApp();
  
  // 作成した関数をエクスポート
  export { notificationTrigger } from './path/to/trigger';
  export { processScheduledNotifications } from './path/to/processor';

これにより、Firebase Functionsが関数を認識してデプロイできるようになります。

8. IAM権限の設定

必要な権限を付与:

  # Cloud Tasksがfunctionを呼び出す権限
  resource "google_cloudfunctions2_function_iam_member" "tasks_invoker" {
    cloud_function = "processScheduledNotifications"
    role           = "roles/cloudfunctions.invoker"
    member         = "serviceAccount:cloud-tasks-invoker@project.iam.gserviceaccount.com"
  }

  # FunctionsがCloud Tasksにエンキューする権限
  resource "google_project_iam_member" "functions_tasks_enqueuer" {
    role   = "roles/cloudtasks.enqueuer"
    member = "serviceAccount:${project_id}@appspot.gserviceaccount.com"
  }

9. デプロイ

  # Terraformでインフラをデプロイ
  terraform apply

  # Functionsをデプロイ
  firebase deploy --only functions

ポイント

  • 軽量なトリガー関数: Pub/Subを受けるトリガー関数は、タスクのエンキューのみを行い軽量にする(これは Scheduler により同期処理される)
  • 重い処理は非同期で: 実際の通知送信処理はCloud Tasksを介して非同期実行
  • リトライ設定: Cloud Tasksのリトライ設定で、失敗時の自動リトライを実現
  • スケーラビリティ: Cloud Tasksのrate limitsで同時実行数を制御
  • 時刻の正規化: 毎分0秒に正規化して、その分の通知を処理

この設計により、大量の通知があってもシステムに負荷をかけることなく、確実に処理できる仕組みを実現しています。

参考

https://zenn.dev/renn/articles/ee6fabe2cf5b25

まとめ

Cloud Scheduler → Pub/Sub → Cloud Functions → Cloud Tasks → Cloud Functions という流れで、定期実行と非同期処理を実現しました。

トリガー関数でキューイングのみを行い、重い処理は非同期で実行するという仕組みです。

この設計パターンは通知送信以外にも、バッチ処理やデータ同期など様々な用途に応用可能ですので、非同期システムを実装したい方はぜひ参考にしていただけますと幸いです。

Discussion