🎒

Next.js + Supabase pgmq で非同期 Worker パイプラインを実装する

に公開

本記事はCommune Developers Advent Calendar 2025の11日目の記事です

はじめに

アプリケーション上でで重い処理(外部API呼び出し、大量データ処理など)をリクエスト中に実行すると、レスポンスが遅くなりUXが悪化します。

本記事では、Supabase に内蔵されている pgmq(PostgreSQL Message Queue) を使って、非同期WorkerパイプラインをNext.js上で実装する方法を紹介します。

  • 追加インフラ不要(Supabase のみ)
  • DBトランザクションと統合可能
  • シンプルな実装でリトライ・エラーハンドリングも対応

*Cron jobでの呼び出しには別途インフラが必要。今回はVercel Cronを使用。pg_cronを使えばSupabase内で完結も可能。

アーキテクチャ概要

┌──────────────┐     enqueue     ┌─────────────┐
│  API Route   │ ──────────────→ │    pgmq     │
│ (ユーザー操作) │                 │   (Queue)   │
└──────────────┘                 └──────┬──────┘
                                        │ read
                                        ↓
                               ┌─────────────────┐
                               │  Worker Route   │
                               │ (バックグラウンド) │
                               └─────────────────┘

ポイント:

  • ユーザー操作は即座にレスポンスを返す
  • 重い処理は Queue に入れて Worker が非同期で処理
  • Worker は Vercel Cron(pg_cronでも可)または手動トリガーで起動

pgmq セットアップ

Extension 有効化とキュー作成

-- supabase/migrations/001_setup_pgmq.sql

-- pgmq Extension を有効化
CREATE EXTENSION IF NOT EXISTS pgmq;

-- キューを作成
SELECT pgmq.create('email_notifications');
SELECT pgmq.create('image_processing');
SELECT pgmq.create('data_sync');

RPC ラッパー関数

Supabase Client から pgmq を呼び出すための関数を作成します。

-- Job を送信
CREATE OR REPLACE FUNCTION public.pgmq_send(
    queue_name TEXT,
    msg JSONB
) RETURNS BIGINT
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN pgmq.send(queue_name, msg);
END;
$$;

-- Job を読み取り(visibility timeout 付き)
CREATE OR REPLACE FUNCTION public.pgmq_read(
    queue_name TEXT,
    visibility_timeout INT,
    qty INT
) RETURNS SETOF pgmq.message_record
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN QUERY SELECT * FROM pgmq.read(queue_name, visibility_timeout, qty);
END;
$$;

-- Job を削除
CREATE OR REPLACE FUNCTION public.pgmq_delete(
    queue_name TEXT,
    msg_id BIGINT
) RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN pgmq.delete(queue_name, msg_id);
END;
$$;

TypeScript 実装

型定義

// lib/queue/types.ts

import { z } from "zod";

// pgmq から返されるメッセージの型
export type QueueMessage<T> = {
  msgId: string;
  readCount: number;   // 読み取り回数(リトライ判定に使用)
  enqueuedAt: Date;
  vt: Date;            // Visibility Timeout 期限
  message: T;
};

// Job の型定義(例: メール通知)
export const EmailJobSchema = z.object({
  to: z.string().email(),
  subject: z.string(),
  body: z.string(),
  createdAt: z.coerce.date(),
});
export type EmailJob = z.infer<typeof EmailJobSchema>;

// Job の型定義(例: 画像処理)
export const ImageJobSchema = z.object({
  imageUrl: z.string().url(),
  operations: z.array(z.enum(["resize", "compress", "watermark"])),
  userId: z.string(),
  createdAt: z.coerce.date(),
});
export type ImageJob = z.infer<typeof ImageJobSchema>;

Queue Client

// lib/queue/client.ts

import { createClient } from "@supabase/supabase-js";
import type { QueueMessage } from "./types";

const supabase = createClient(
  process.env.NEXT_PUBLIC_SUPABASE_URL!,
  process.env.SUPABASE_SERVICE_ROLE_KEY! // Server側では Service Role Key を使用
);

export const queueClient = {
  /**
   * Job をキューに追加
   */
  async send<T extends Record<string, unknown>>(
    queueName: string,
    job: T
  ): Promise<{ ok: true; msgId: number } | { ok: false; error: unknown }> {
    const { data, error } = await supabase.rpc("pgmq_send", {
      queue_name: queueName,
      msg: job,
    });

    if (error) {
      console.error(`[Queue] Failed to send to ${queueName}:`, error);
      return { ok: false, error };
    }

    return { ok: true, msgId: data };
  },

  /**
   * キューから Job を読み取り
   * visibility_timeout 秒間は他の Worker から見えなくなる
   */
  async read<T>(
    queueName: string,
    visibilityTimeout: number,
    quantity: number = 1
  ): Promise<{ ok: true; messages: QueueMessage<T>[] } | { ok: false; error: unknown }> {
    const { data, error } = await supabase.rpc("pgmq_read", {
      queue_name: queueName,
      visibility_timeout: visibilityTimeout,
      qty: quantity,
    });

    if (error) {
      console.error(`[Queue] Failed to read from ${queueName}:`, error);
      return { ok: false, error };
    }

    const messages: QueueMessage<T>[] = (data || []).map((row: any) => ({
      msgId: String(row.msg_id),
      readCount: row.read_ct,
      enqueuedAt: new Date(row.enqueued_at),
      vt: new Date(row.vt),
      message: row.message as T,
    }));

    return { ok: true, messages };
  },

  /**
   * 処理完了した Job を削除
   */
  async delete(
    queueName: string,
    msgId: string
  ): Promise<{ ok: true } | { ok: false; error: unknown }> {
    const { error } = await supabase.rpc("pgmq_delete", {
      queue_name: queueName,
      msg_id: BigInt(msgId),
    });

    if (error) {
      console.error(`[Queue] Failed to delete ${msgId} from ${queueName}:`, error);
      return { ok: false, error };
    }

    return { ok: true };
  },
};

Worker API Route の実装

基本パターン

// app/api/worker/email/route.ts

import { NextRequest, NextResponse } from "next/server";
import { queueClient } from "@/lib/queue/client";
import { EmailJobSchema, type EmailJob } from "@/lib/queue/types";

const QUEUE_NAME = "email_notifications";
const VISIBILITY_TIMEOUT = 30; // 30秒
const MAX_ATTEMPTS = 5;

export async function POST(request: NextRequest) {
  // 1. 認証チェック(後述)
  const authError = verifyWorkerRequest(request);
  if (authError) return authError;

  // 2. キューから Job を取得
  const readResult = await queueClient.read<EmailJob>(
    QUEUE_NAME,
    VISIBILITY_TIMEOUT,
    1
  );

  if (!readResult.ok) {
    return NextResponse.json({ error: "Failed to read queue" }, { status: 500 });
  }

  if (readResult.messages.length === 0) {
    return NextResponse.json({ message: "No jobs in queue" });
  }

  const job = readResult.messages[0];

  // 3. リトライ上限チェック
  if (job.readCount >= MAX_ATTEMPTS) {
    console.error(`[Worker] Job ${job.msgId} exceeded max attempts, discarding`);
    await queueClient.delete(QUEUE_NAME, job.msgId);
    return NextResponse.json({ message: "Job discarded after max attempts" });
  }

  // 4. Job のバリデーション
  const parsed = EmailJobSchema.safeParse(job.message);
  if (!parsed.success) {
    console.error(`[Worker] Invalid job payload:`, parsed.error);
    await queueClient.delete(QUEUE_NAME, job.msgId);
    return NextResponse.json({ error: "Invalid job payload" }, { status: 400 });
  }

  // 5. 実際の処理
  try {
    await sendEmail(parsed.data);

    // 6. 成功時のみ削除
    await queueClient.delete(QUEUE_NAME, job.msgId);

    return NextResponse.json({
      message: "Job completed",
      jobId: job.msgId,
    });
  } catch (error) {
    console.error(`[Worker] Processing failed:`, error);
    // 削除しない → Visibility Timeout 後に再試行される
    return NextResponse.json(
      { error: "Processing failed" },
      { status: 500 }
    );
  }
}

async function sendEmail(job: EmailJob) {
  // メール送信処理(例: Resend, SendGrid など)
  console.log(`[Worker] Sending email to ${job.to}`);
  // await resend.emails.send({ ... })
}

バッチ処理パターン

複数の Job を一度に処理する場合:

// app/api/worker/batch-process/route.ts

export async function POST(request: NextRequest) {
  const authError = verifyWorkerRequest(request);
  if (authError) return authError;

  const MAX_JOBS_PER_INVOCATION = 10;
  let processedCount = 0;
  const errors: string[] = [];

  while (processedCount < MAX_JOBS_PER_INVOCATION) {
    const readResult = await queueClient.read<ImageJob>(
      "image_processing",
      300, // 5分
      1
    );

    if (!readResult.ok || readResult.messages.length === 0) {
      break; // キューが空
    }

    const job = readResult.messages[0];

    // リトライ上限チェック
    if (job.readCount >= MAX_ATTEMPTS) {
      await queueClient.delete("image_processing", job.msgId);
      processedCount++;
      continue;
    }

    try {
      await processImage(job.message);
      await queueClient.delete("image_processing", job.msgId);
    } catch (error) {
      errors.push(`Job ${job.msgId} failed: ${error}`);
      // 削除しない → リトライ
    }

    processedCount++;
  }

  return NextResponse.json({
    processed: processedCount,
    errors: errors.length > 0 ? errors : undefined,
  });
}

セキュリティ: Worker 認証

Worker Route は外部から不正に呼び出されないよう保護します。

// lib/queue/auth.ts

import { NextRequest, NextResponse } from "next/server";

const WORKER_SECRET = process.env.WORKER_SECRET;

export function verifyWorkerRequest(request: NextRequest): NextResponse | null {
  // 開発環境ではスキップ
  if (process.env.NODE_ENV === "development") {
    return null;
  }

  const authHeader = request.headers.get("authorization");
  const token = authHeader?.replace("Bearer ", "");

  // Query Parameter でも認証可能(Vercel Cron 用)
  const queryToken = request.nextUrl.searchParams.get("token");

  if (token === WORKER_SECRET || queryToken === WORKER_SECRET) {
    return null; // OK
  }

  return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}

// Worker を呼び出す際のヘルパー
export function withWorkerAuth(options: RequestInit = {}): RequestInit {
  return {
    ...options,
    headers: {
      ...options.headers,
      Authorization: `Bearer ${WORKER_SECRET}`,
    },
  };
}

リトライ戦略とエラーハンドリング

Visibility Timeout の仕組み

1. read() で Job を取得
   → 指定秒数(visibility_timeout)の間、他の Worker から見えなくなる

2. 処理成功 → delete() で削除

3. 処理失敗 → 削除しない
   → visibility_timeout 経過後、再び読み取り可能に
   → readCount がインクリメントされる

4. readCount >= MAX_ATTEMPTS
   → 強制削除(Dead Letter 扱い)

Visibility Timeout の設定目安

処理タイプ Timeout 理由
メール送信 30秒 外部API呼び出し1回
画像処理 5分 重い処理
データ同期 2分 複数DB操作

ルール: 予想処理時間 × 1.5〜2倍

リトライユーティリティ

// lib/queue/retry.ts

import { queueClient } from "./client";
import type { QueueMessage } from "./types";

const MAX_ATTEMPTS = 5;

type DiscardResult =
  | { discarded: false }
  | { discarded: true; jobId: string; attempts: number };

export async function discardIfExceeded<T>(
  queueName: string,
  job: QueueMessage<T>
): Promise<DiscardResult> {
  if (job.readCount < MAX_ATTEMPTS) {
    return { discarded: false };
  }

  console.error(
    `[Queue] Discarding job ${job.msgId} from ${queueName} after ${job.readCount} attempts`,
    { payload: job.message }
  );

  await queueClient.delete(queueName, job.msgId);

  return {
    discarded: true,
    jobId: job.msgId,
    attempts: job.readCount,
  };
}

Job の enqueue(ユーザー操作時)

// app/api/upload/route.ts

import { NextRequest, NextResponse } from "next/server";
import { queueClient } from "@/lib/queue/client";
import { withWorkerAuth } from "@/lib/queue/auth";

export async function POST(request: NextRequest) {
  const { imageUrl, userId } = await request.json();

  // 1. DB に記録(すぐにレスポンスを返すため)
  const record = await db.images.create({
    data: { imageUrl, userId, status: "pending" },
  });

  // 2. 処理を Queue に追加
  const enqueueResult = await queueClient.send("image_processing", {
    imageUrl,
    userId,
    operations: ["resize", "compress"],
    createdAt: new Date(),
  });

  if (!enqueueResult.ok) {
    // enqueue 失敗時はステータスを更新
    await db.images.update({
      where: { id: record.id },
      data: { status: "failed" },
    });
    return NextResponse.json({ error: "Failed to queue job" }, { status: 500 });
  }

  // 3. Worker を即座にトリガー(オプション)
  const appUrl = process.env.NEXT_PUBLIC_APP_URL || "http://localhost:3000";
  fetch(`${appUrl}/api/worker/image`, withWorkerAuth({ method: "POST" }))
    .catch((err) => console.error("Worker trigger failed:", err));

  // 4. すぐにレスポンスを返す
  return NextResponse.json({
    message: "Upload accepted",
    recordId: record.id,
    status: "processing",
  });
}

Vercel Cron で定期実行

// vercel.json
{
  "crons": [
    {
      "path": "/api/worker/email?token=YOUR_WORKER_SECRET",
      "schedule": "* * * * *"
    },
    {
      "path": "/api/worker/image?token=YOUR_WORKER_SECRET",
      "schedule": "*/5 * * * *"
    }
  ]
}

パイプライン化(複数 Stage)

Stage 1 の完了後に Stage 2 を enqueue する例:

// Stage 1: 画像をダウンロード・保存
async function processStage1(job: ImageJob) {
  const localPath = await downloadImage(job.imageUrl);

  // Stage 2 を enqueue
  await queueClient.send("image_transform", {
    localPath,
    operations: job.operations,
    userId: job.userId,
    createdAt: new Date(),
  });

  // Stage 2 Worker をトリガー
  fetch(`${appUrl}/api/worker/image-transform`, withWorkerAuth({ method: "POST" }))
    .catch(console.error);
}
┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Stage 1   │ ──→ │   Stage 2        │ ──→ │    Stage 3      │
│  Download   │     │   Transform      │     │   Upload CDN    │
└─────────────┘     └──────────────────┘     └─────────────────┘

まとめ

pgmq を選ぶメリット

特徴 説明
追加インフラ不要 Supabase に内蔵されている
トランザクション統合 DB操作と enqueue を同一トランザクションで実行可能
シンプル send / read / delete の3操作のみ
リトライ内蔵 visibility timeout + readCount で自動リトライ

ベストプラクティス

  1. 冪等性を意識: 同じ Job を複数回処理しても問題ないよう設計
  2. Visibility Timeout は余裕を持って: 予想時間 × 1.5〜2倍
  3. 失敗時は削除しない: timeout 経過で自動リトライ
  4. MAX_ATTEMPTS で上限設定: 無限リトライを防ぐ
  5. Cron + 手動トリガー併用: 定期実行と即時実行のハイブリッド

ディレクトリ構成例

app/
├── api/
│   ├── upload/route.ts          # ユーザー操作 → enqueue
│   └── worker/
│       ├── email/route.ts       # Worker: メール送信
│       └── image/route.ts       # Worker: 画像処理
lib/
└── queue/
    ├── types.ts                 # Job 型定義
    ├── client.ts                # Queue クライアント
    ├── auth.ts                  # Worker 認証
    └── retry.ts                 # リトライユーティリティ
supabase/
└── migrations/
    └── 001_setup_pgmq.sql       # pgmq セットアップ

従来のアプローチとの比較

従来、非同期処理やバックグラウンドジョブを実装しようとすると、Redis + BullMQ、AWS SQS + Lambda、RabbitMQ など、アプリケーションとは別にキューイングサービスやスケジューラのインフラを用意する必要がありました。

pgmq を使ったアプローチでは:

  • インフラ管理が最小限: Supabase を使っていれば追加のサービス不要
  • コードベースで一元管理: SQLマイグレーションと TypeScript コードだけで完結
  • ローカル開発が容易: supabase start で pgmq 含めた環境がすぐ立ち上がる

特にローカル環境のセットアップが楽な点は、チーム開発でも新メンバーのオンボーディングがスムーズになるメリットがあります。


最後に

本記事で紹介した実装パターンは、私が個人開発している翻訳アプリで実際に運用しているものを抽象化・汎用化したものです。このアプリでは翻訳した内容からユーザーにとっての重要な単語を解析し、単語帳を作るというアプリです。翻訳以外の部分はワーカーで処理をして、「翻訳したいだけなのに、なんかめっちゃ遅い。」みたいなことは一定程度提言できていると感じます。具体的に、「翻訳テキストの解析 → 単語抽出 → ユーザーにとっての重要度のスコアリング」という4段階のパイプラインを pgmq で実装しており、1日数千件の Job程度であればを安定して処理できています。
今回はPublicationの記事なので、具体的なアプリや実装は紹介せず、この程度にとどめ別の機会にまとめたいと思います。

Supabase を使っているプロジェクトで非同期処理が必要になった際は、ぜひ pgmq を検討してみてください。

コミューン株式会社

Discussion