Socket.IOでバッチ処理の完了をユーザーに即時通知してみる
こんにちは 👋
株式会社 Rehab for JAPAN フロントエンドエンジニアの okazawa です!
突然ですが Web アプリ開発で、
- API リクエストをする
- API リクエストをきっかけにバッチ処理等の時間のかかる処理を開始する
という場合、バッチ処理の完了をユーザーに知らせるにはどんな方法があるでしょうか?
- バッチ処理が完了するまで API レスポンスを返さない
- 一定時間後に処理が完了しているかユーザーに画面更新等で確認してもらう
- ポーリングする
- WebSocket 等リアルタイムにクライアントとサーバー間で双方向通信できるような技術を使って即時通知をする
- メールで完了通知を送る
簡単に思いつくのはこんなところかと思います。サーバー負荷や UX 的な観点を踏まえると 4 や 5 の方針が良さそうに見えます。
そこで、今回は 4 の方針でバッチ処理の完了をユーザーに即時通知させてみたいと思います!
作りたいもの
今回は、WebSocket を使ったリアルタイム通知の基本的な実装例として、シンプルなバッチ処理の完了通知機能を作っていきます!🚀
主な技術スタック
- Next.js (App Router) v15.1.6
- Socket.IO v4.8.1
- Jotai v2.11.3
画面要件
- バッチ処理を開始できるボタンがある
- 処理中はボタンがローディング状態になる
- 処理完了時、どのページにいてもトースト通知を受け取れる
作ったもの
画面要件だけでは伝わりにくいと思うので、まずは今回作ったデモアプリの操作キャプチャを載せたいと思います!
どの画面にいても通知を受け取れる
ユーザー毎に通知を受け取れる
↓ 今回作ったサンプルのリポジトリです。
前提知識
今回のメインとなる WebSocket とそれに関連して Socket.IO について簡単に説明したいと思います。
WebSocket とは?
MDN の説明を引用すると
WebSocket API は、ユーザーのブラウザーとサーバー間で対話的な通信セッションを開くことができるものです。この API を使用すると、サーバーにメッセージを送信し、サーバーから返信をポーリングすることなく応答を受信することができます。
とあります。
従来の HTTP 通信ではクライアントからサーバーに対してリクエストを投げることでやり取りができるわけですが、サーバー主導でクライアントにデータを転送することは難しいです。したがって今回のようなケースだと画面更新やポーリングによってクライアントからサーバーに「バッチ処理終わりましたか?」とクライアント側から問い合わせるしか確認手段がないということです。
そして、上記のようなクライアント主導の通信の課題を解決したものが WebSocket になります。WebSocket という通信プロトコルによってクライアントとサーバーの双方向通信が可能になり、サーバーからクライアントに対してもデータの転送ができるようになったというわけです。
Socket.IO とは?
簡単に言えば、簡単にクライアントとサーバーの双方向通信を実現するためのライブラリです。基本的に WebSocket を使って接続を行い、WebSocket による接続が不可能な場合は HTTP のロングポーリングを行ってくれるなど、通常の WebSocket 実装だと面倒な部分が簡単に実装できるようになるので、今回はこれを使っていきたいと思います。
実装
それではサンプルコードを見ながら実装を行っていきたいと思います!
留意事項
今回は WebSocket による双方向通信をテーマにしたいので、コンポーネントに使用している shadcn/ui や sonner、状態管理ライブラリの jotai の使用方法については触れないので、必要あれば公式サイトを参照ください 🙇♂️
1. カスタムサーバーの設定
今回の API 実装は Next.js の API Routes を使って実装するので、まずは Socket.IO を Next.js に組み込むためのカスタムサーバーを作成したいと思います!
import { createServer } from 'node:http'
import { parse } from 'node:url'
import next from 'next'
import { Server } from 'socket.io'
declare global {
// グローバルにSocket.IOサーバーインスタンスを保持
// 今回はNext.jsのAPIルートからアクセスするので必要
var io: Server | undefined
}
// Next.jsの開発モードの設定
const dev = process.env.NODE_ENV !== 'production'
const app = next({ dev, turbo: true })
const handle = app.getRequestHandler()
// Next.jsの準備が整ってからサーバー起動
app.prepare().then(() => {
// HTTPサーバーを作成
// Next.jsのリクエストハンドラーを組み込む
const server = createServer((req, res) => {
if (!req.url) return
const parsedUrl = parse(req.url, true)
handle(req, res, parsedUrl)
})
// Socket.IOサーバーの初期化
const io = new Server(server, {
cors: {
origin: '*', // 本番環境では適切なオリジンの設定が必要
methods: ['GET', 'POST'],
},
})
// グローバルにSocket.IOサーバーインスタンスを保存
// これによりNext.jsのAPIルートからWebSocketイベントを発火できる
global.io = io
// クライアントとのWebSocket接続を管理
io.on('connection', (socket) => {
console.log('connected')
// ユーザーごとの個別ルームを作成
// これにより特定のユーザーにのみメッセージを送信できる
socket.on('joinRoom', (userId: string) => {
socket.join(userId)
console.log(`User ${userId} joined room`)
console.log('Current rooms:', socket.rooms)
})
// クライアントが切断したときのクリーンアップ
socket.on('disconnect', () => {
console.log('disconnected')
})
})
// サーバーを3000番ポートで起動
server.listen(3000, () => {
console.log('> Ready on http://localhost:3000')
})
})
このコードでは主に以下のことを行っています
-
HTTP サーバーの作成
- Next.js のリクエストハンドラーを組み込んだ HTTP サーバーを作成
- これによって、通常の Next.js アプリケーションとしての機能を維持
-
Socket.IO サーバーの初期化
- HTTP サーバーに Socket.IO を組み込む
- CORS の設定(開発環境用)
-
WebSocket 接続の管理
- クライアントとの接続確立時の処理
- ユーザーごとの個別ルームの作成
- 切断時の処理
次にこのカスタムサーバーを使用するために、package.json
の scripts
を以下のように変更します。
{
"scripts": {
"dev": "ts-node --project tsconfig.server.json src/server.ts", // ここを変更
// ... 他のスクリプト
}
これで Socket.IO を組み込んだカスタムサーバーの準備は完了です!🎉
2. バッチ処理 API の実装
次はユーザーからのリクエストを受け取ってバッチ処理を開始する API の実装を行いたいと思います!
バッチ処理の API エンドポイント
import { NextResponse } from 'next/server'
export async function POST(request: Request) {
if (!global.io) {
return NextResponse.json({ error: 'WebSocket server not initialized' }, { status: 500 })
}
const { userId } = await request.json()
// バッチ処理をシミュレート
setTimeout(() => {
global.io?.to(userId).emit('batchComplete')
}, 3000)
return NextResponse.json({ status: 'accepted' }, { status: 202 })
}
今回はデモアプリなのでバッチ処理は setTimeout
を使用してシミュレートしています。ここでのポイントは以下の 2 つです。
-
非同期処理の開始
- バッチ処理を開始したら即座に 202 レスポンスを返す
- 処理自体は非同期で継続される
-
WebSocket での通知
- 処理完了時に
global.io
を使って特定のユーザーに通知 -
to(userId)
で特定のユーザールームにのみ通知を送信
- 処理完了時に
「ユーザールーム」とありますが、これをしないと WebSocket サーバーに接続している全員に通知が飛んでしまうことになるので、今回は初回アクセス時に uuid
を使用してユーザー ID を生成してそれを固有の ID としてユーザールームを作成し、そのルームに参加しているユーザーにのみ通知が飛ぶようにしています。
ユーザー ID の初期化処理
import { atom } from 'jotai'
import { v4 as uuidv4 } from 'uuid'
// アプリ起動時に一意のユーザーIDを生成(デモ用)
export const userIdAtom = atom<string>(uuidv4())
バッチ処理の状態管理
バッチ処理の状態を管理するための atom を作成します。
import { atom } from 'jotai'
/**
* バッチ処理の状態を表す型
*
* - idle: 初期状態(未実行)
* - processing: 処理実行中
* - completed: 処理完了
* - error: エラー発生
*/
export type BatchStatus = 'idle' | 'processing' | 'completed' | 'error'
/**
* バッチ処理の状態を管理するための型
*
* @property isProcessing - 処理中かどうかのフラグ
* @property status - 処理の現在の状態
* @property error - エラーが発生した場合のメッセージ
*/
type BatchState = {
isProcessing: boolean
status: BatchStatus
error?: string
}
/**
* バッチ処理の状態を管理するグローバルステート
*
* このatomは以下の目的で使用されます:
* 1. バッチ処理の進行状況の追跡
* 2. UI表示の制御(ボタンの無効化など)
* 3. エラーハンドリング
*
* 初期状態は未実行(idle)で、処理は開始されていない状態
*/
export const batchStatusAtom = atom<BatchState>({
isProcessing: false,
status: 'idle',
})
バッチの進行状態を global state で管理することで、例えば
- ユーザーがバッチ処理開始ボタンを押下
- 別画面に遷移
- 再度ボタンのある画面に戻って来る
という操作が行われた時でもボタンの表示を制御することができます。
カスタムフックの実装
バッチ処理の開始と状態管理を行うカスタムフックを実装します。
import { batchStatusAtom } from '@/jotai/batchAtom'
import { userIdAtom } from '@/jotai/userAtom'
import { useAtom, useAtomValue } from 'jotai'
import { toast } from 'sonner'
/**
* バッチ処理の開始と状態管理を行うカスタムフック
*
* このフックは以下の機能を提供します:
* 1. バッチ処理の開始(APIリクエスト)
* 2. 処理状態の管理と更新
* 3. エラーハンドリングとユーザーへの通知
*/
export const useBatchProcess = () => {
const [batchStatus, setBatchStatus] = useAtom(batchStatusAtom)
const userId = useAtomValue(userIdAtom)
/**
* バッチ処理を開始する関数
*
* 処理の流れ:
* 1. 状態を「処理中」に更新
* 2. APIにリクエストを送信
* 3. エラー発生時は状態を「エラー」に更新
*
* 注意:実際の完了通知はWebSocketで受け取るため、
* この関数では完了状態への更新は行いません
*/
const startBatchProcess = async () => {
try {
// バッチ処理の開始を状態に反映
setBatchStatus({
isProcessing: true,
status: 'processing',
})
// バッチ処理開始のAPIリクエスト
const response = await fetch('/api/batch', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
// ユーザーIDを送信して、WebSocketで個別に通知を受け取れるようにする
body: JSON.stringify({
userId,
}),
})
if (!response.ok) {
throw new Error('バッチ処理の開始に失敗しました')
}
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : 'バッチ処理でエラーが発生しました'
setBatchStatus({
isProcessing: false,
status: 'error',
error: errorMessage,
})
// ユーザーにエラーを通知
toast(errorMessage, {
style: {
backgroundColor: '#fee2e2',
color: '#ef4444',
borderColor: '#fecaca',
},
})
}
}
return {
isProcessing: batchStatus.isProcessing,
startBatchProcess,
}
}
3. フロントエンドの実装
最後に、バッチ処理を開始するボタンと WebSocket 通知を受け取る部分を実装したいと思います!
WebSocket 接続の管理
まずは、WebSocket 接続を管理するためのカスタムフックを作成します。
'use client'
import { socketAtom } from '@/jotai/socketAtom'
import { useAtom } from 'jotai'
import { useCallback, useEffect } from 'react'
import { io } from 'socket.io-client'
/**
* WebSocket接続を確立する関数
*
* @param userId - ユーザーを識別するためのID
* @returns Socket.IOのインスタンス
*
* この関数は以下の機能を提供します:
* 1. Socket.IOクライアントの初期化
* 2. 再接続の設定
* 3. ユーザー固有のルームへの参加
*/
export const connectSocket = (userId: string) => {
// Socket.IOクライアントの初期化
const newSocket = io('http://localhost:3000', {
reconnection: true,
reconnectionAttempts: 3,
})
// 接続確立時の処理
newSocket.on('connect', () => {
// ユーザー固有のルームに参加
// これにより、このユーザーだけに通知を送信できる
newSocket.emit('joinRoom', userId)
})
return newSocket
}
/**
* WebSocket接続を管理するカスタムフック
*
* @param userId - ユーザーを識別するためのID
* @returns WebSocket関連の操作と状態
*
* このフックは以下の機能を提供します:
* 1. WebSocket接続の確立
* 2. 接続の切断
* 3. コンポーネントのアンマウント時の自動切断
*/
export const useSocket = (userId: string) => {
// WebSocketインスタンスをグローバルに管理
const [socket, setSocket] = useAtom(socketAtom)
// WebSocket接続を確立する関数
const connectSocketWithUser = useCallback(() => {
setSocket(connectSocket(userId))
}, [userId, setSocket])
const disconnectSocket = useCallback(() => {
if (socket) {
socket.disconnect()
setSocket(null)
}
}, [socket, setSocket])
// コンポーネントのアンマウント時に自動的に接続を切断
useEffect(() => {
return () => {
disconnectSocket()
}
}, [disconnectSocket])
return {
socket,
connectSocket: connectSocketWithUser,
disconnectSocket,
}
}
バッチ処理完了の通知リスナー
次に、WebSocket からの通知を受け取るためのリスナーを作成します。
'use client'
import { useSocket } from '@/hooks/useSocket'
import { batchStatusAtom } from '@/jotai/batchAtom'
import { userIdAtom } from '@/jotai/userAtom'
import { useAtom, useAtomValue } from 'jotai'
import { useEffect } from 'react'
import { toast } from 'sonner'
/**
* バッチ処理の状態を監視し、WebSocketを通じて完了通知を受け取るコンポーネント
*
* このコンポーネントは以下の役割を持ちます:
* 1. バッチ処理中のWebSocket接続の管理
* 2. バッチ処理完了イベントの受信と状態更新
* 3. ユーザーへの完了通知(トースト表示)
*/
export const BatchProcessListener = () => {
const [batchStatus, setBatchStatus] = useAtom(batchStatusAtom)
const userId = useAtomValue(userIdAtom)
const { socket, connectSocket, disconnectSocket } = useSocket(userId)
// バッチ処理中のソケット接続管理
// バッチ処理が開始されたときのみWebSocket接続を確立
useEffect(() => {
if (batchStatus.status === 'processing' && !socket) {
connectSocket()
}
}, [batchStatus.status, socket, connectSocket])
// バッチ処理完了イベントの監視
// WebSocketを通じて完了通知を受け取り、状態を更新
useEffect(() => {
if (!socket) return
// バッチ処理完了時の処理
const handleComplete = () => {
// バッチ処理の状態を完了に更新
setBatchStatus({ isProcessing: false, status: 'completed' })
// ユーザーに完了を通知
toast('バッチ処理が完了しました', {
style: {
backgroundColor: '#dcfce7',
color: '#124a28',
borderColor: '#bbf7d0',
},
})
// 完了後はWebSocket接続を切断
disconnectSocket()
}
// 完了イベントのリスナーを登録
socket.on('batchComplete', handleComplete)
// コンポーネントのアンマウント時やsocketの再作成時にリスナーを解除
return () => {
socket.off('batchComplete', handleComplete)
}
}, [socket, setBatchStatus, disconnectSocket])
return null
}
バッチ処理開始ボタン
最後に、バッチ処理を開始するボタンコンポーネントを配置して作成したカスタムフックから必要な値を渡してあげます。
Button コンポーネントの中身
import { Button, type ButtonProps } from '@/components/ui/button'
import { Loader2 } from 'lucide-react'
import type { FC } from 'react'
type Props = ButtonProps & {
isPending: boolean
label: string
pendingLabel: string
}
export const SubmitButton: FC<Props> = ({ isPending, label, pendingLabel, ...props }) => {
return (
<Button type='submit' {...props}>
{isPending ? (
<>
<Loader2 className='mr-1 h-4 w-4 animate-spin' />
{pendingLabel}
</>
) : (
label
)}
</Button>
)
}
'use client'
import { SubmitButton } from '@/components/submitButton'
import { useBatchProcess } from '@/hooks/useBatchProcess'
export default function Home() {
const { isProcessing, startBatchProcess } = useBatchProcess()
return (
<>
<SubmitButton
isPending={isProcessing}
label='バッチ処理を開始'
pendingLabel='処理中'
onClick={startBatchProcess}
disabled={isProcessing}
className='w-40'
/>
</>
)
}
具体的にコードが何をしているのかはコメントに書いている通りです。
全体像をイメージするとこんな感じです。
実際に DevTools で通信を見てみると、以下のように WebSocket 接続がされ、ルームへの参加や batchComplete の通知がされていることが確認できます ✨
DevTools の確認キャプチャ
まとめ
今回は、Socket.IO を使ってバッチ処理の完了をユーザーに即時通知する機能を実装してみました!
実装のポイントをまとめると
-
カスタムサーバー
- Next.js に Socket.IO を組み込み
- ユーザーごとの個別ルームを作成してユーザー毎に通知を受け取れるように
-
バッチ処理
- 非同期処理の開始と完了通知
- バッチ処理の状態を global state で管理することでコンポーネントの制御に使用
-
フロントエンド
- 必要なときだけ接続をする効率的な WebSocket 接続管理
- ユーザーフレンドリーな通知
今回の記事では「こんな方法もあるよ」的な感じで簡単な実装とともに WebSocket の紹介をしてみました。最後までお読みいただきありがとうございました 👋
補足
今回は基本的な使い方を実装しただけで、実際にプロダクトで運用するとなると考慮すべきことが多くあります。なので、最後に注意点をまとめて終わりにしたいと思います。
1. セキュリティ面での考慮
- CORS の適切な設定
- WebSocket 接続時の認証処理の実装
- ユーザーの権限チェック
2. エラーハンドリング
- WebSocket 接続の再接続処理
- バッチ処理のタイムアウト処理
- エラー発生時のユーザーへの適切な通知
3. スケーラビリティ
- 複数の WebSocket サーバーを使用する場合の設定
- コネクション数の制限と管理
4. アーキテクチャ設計
-
バックエンドが別サーバーの場合
- WebSocket サーバーを どこに立てるか
- BE から WebSocket サーバーにイベントを送信する方法の検討
- WebSocket サーバーへの接続情報の管理
5. 監視・ロギング
- WebSocket 接続エラーの検知
- バッチ処理の失敗検知
- クライアントサイドのエラー収集
Discussion