WebSocket通信と同期するReduxアプリケーションの実装パターン
こんにちは、PortalKeyの植森です。
今回は、PortalKeyで実装しているWebSocketやWebRTCといったリアルタイムイベントとReduxを組み合わせた状態管理のアーキテクチャについて紹介します。
SlackやDiscordのようなリアルタイムアプリケーションでは、他のユーザーの行動をリアルタイムに画面に反映する必要があります。
このような要件に対して、UIコンポーネントに直接WebSocketやWebRTCの処理を書くと更新タイミングが散らばり、コードが煩雑になってしまいます。
PortalKeyでは、データ操作とUIコンポーネントを完全に分離し、リアルタイムイベントをReduxのActionとして扱うことで、シンプルで保守性の高いアーキテクチャを実現しています。
課題と背景
リアルタイムアプリケーションでの状態管理には以下のような課題があります:
- 非同期イベントの管理: 他のユーザーの行動による状態変更が頻繁に発生
- UIとデータの分離: コンポーネントに直接リアルタイム処理を書くと複雑化
- 型安全性: リアルタイムイベントの型定義が曖昧になりがち
- 副作用の管理: 音声通話の制御やサウンド再生などの副作用が散らばる
従来のアプローチでは、useEffect
やContext
を使ってリアルタイムイベントを処理することが多いですが、これらは以下の問題があります:
- コンポーネントのライフサイクルに依存した処理
- イベントハンドラが散らばり、全体像が把握しにくい
- 型安全性の確保が困難
特に、WebSocket経由でユーザのインタラクション経由ではないデータ更新をグローバルステートに反映する場合、Providerを使ってグローバルにデータを更新することになります。
しかし、データ更新のみを行うコンポーネントは責務が曖昧になりやすかったり、可読性が悪くなりがちです。
こういった問題を解決するために、PortalKeyではReduxを採用したことによって、データ操作とUIコンポーネントを完全に分離し、WebSocketイベントをReduxのActionとして扱うことで、シンプルで保守性の高いアーキテクチャを実現しています。
アーキテクチャの設計思想
PortalKeyでは、以下の設計思想に基づいてアーキテクチャを構築しています:
Reduxを採用した理由
- データ操作とUIの完全分離: 状態変更のロジックをUIコンポーネントから分離
- 予測可能な状態管理: すべての状態変更がActionを通じて行われることで、状態変更のロジックを一元管理できる
- デバッグのしやすさ: Redux DevToolsによる状態変更の追跡
ドメインごとのコロケーションによる関心の分離
機能ごとにドメインを分離し、それぞれに独立した状態管理を実装することで、コードの保守性を向上させています。関連する状態、アクション、リデューサー、リスナー、オブザーバーを同じディレクトリに配置することで、変更時の影響範囲を最小限に抑えています。
リアルタイムイベントのReduxへの統合
PortalKeyでは、WebSocketイベントを扱うWebSocketClient、WebRTCイベントを扱うRTCClient、デバイスなどWebRTC以外の音声通話イベントを扱うVoiceClientなど、リアルタイムイベントを扱うクラスを用意しています。
これらのクラスのイベントはReduxのActionとしてdispatchすることで、通常のユーザーインタラクションと同じフローで状態を管理できるようにしています。
また、ReduxのselectorやreducerのようにWebSocketイベントを記述できるレイヤーを用意し、ドメイン内にコロケーションすることで、関連する処理を一箇所にまとめるようにしています。
PortalKey Redux
PortalKeyでは、リアルタイムアプリケーションを柔軟に開発できるように、通常のReduxを拡張した実装をしています。
以下は全体の構成図です。通常のFluxアーキテクチャに加えて、ListenerとObserverと書いてあるものが拡張要素です。
Listener
Listener は、Gateway や Voice といった外部からの通信を受け取り、処理を実行します。
Listener では EventListner のように、外部からの通信に対してイベントハンドラとしてコールバックを登録することができます。
Listener では state を参照することはできませんが、async/await といった非同期処理や、副作用のある処理を記述することができます。
Observer
Observer は、Action の発行を監視し、トリガーとなる Action が発行された際に処理を実行する関数を記述します。
また、Observer には async/await をはじめとした副作用のある処理を記述することができます。
実装パターンの詳細
createListenerによるWebSocketイベントのAction変換
WebSocketイベントをReduxのActionに変換するために、createListener
という独自のユーティリティを実装しています。このユーティリティは、WebSocketイベントリスナー内で手動でdispatchを呼び出すための仕組みを提供します。
// libs/flux/createListener.ts
export function createListener<M extends {} = Record<string | number | symbol, unknown[]>, D extends Dispatch = Dispatch>(
callback: (builder: EventEmitterProxy<M>) => void
) {
const listeners: Record<keyof M, Function> = {} as Record<keyof M, Function>
const builder: EventEmitterProxy<M> = {
on<K extends keyof M>(event: K, listener: (dispatch: D, ...payload: M[K]) => Promise<void> | void) {
listeners[event] = listener
}
}
callback(builder)
// React側へ統合するためのhook
const useListener = () => {
const dispatch = useDispatch<D>()
return {
subscribe: (emitter: AsyncEventEmitter<M>) => {
for (const event in listeners) {
emitter.on(event, (...args) => {
// イベントリスナー内で手動でdispatchを呼び出し
listeners[event](dispatch, ...args)
})
}
},
unsubscribe: (emitter: AsyncEventEmitter<M>) => {
for (const event in listeners) {
emitter.off(event, listeners[event])
}
}
}
}
return { useListener }
}
ドメイン別のListener実装
各ドメインでWebSocketイベントに対する処理を定義します。イベントリスナー内で手動でdispatchを呼び出すことで、Reduxのドメインと同じレイヤーで処理を実装できます:
// domains/gateway/listeners.ts
export const gatewayListener = createListener<ClientEventMap>((emitter) => {
emitter.on(GatewayMessageCode.READY, (dispatch, message) => {
// 手動でdispatchを呼び出し
dispatch(GatewayActions.ready(message))
})
emitter.on(GatewayMessageCode.NOTIFY_VOICE_STATE, (dispatch, message) => {
// 手動でdispatchを呼び出し
dispatch(GatewayActions.notifyVoiceState(message.payload))
})
})
ここで使用しているClientEventMap
は、GatewayMessageCode
に対するメッセージの型を型安全に扱うための型定義です。Protocol Buffersから自動生成された型を使用して、各メッセージコードに対応するペイロードの型を正確に定義しています:
// libs/portalkey-api/portalkey/v1/gateway_pb_ext.ts
export interface GatewayMessageMap extends Record<gatewayPb.GatewayMessageCode, [GatewayMessage]> {
[gatewayPb.GatewayMessageCode.READY]: [GatewayMessageReady]
[gatewayPb.GatewayMessageCode.NOTIFY_VOICE_STATE]: [GatewayMessageNotifyVoiceState]
// ... 他のメッセージコード
}
export type ClientEventMap = GatewayEventMap & GatewayMessageMap
この型定義により、emitter.on(GatewayMessageCode.READY, ...)
で指定したメッセージコードに対応する正しいペイロード型が自動的に推論され、型安全性が保証されます。
createObserverによる副作用の管理
状態変更に伴う副作用(音声通話の制御、サウンド再生など)を管理するために、createObserver
を実装しています:
// libs/flux/createObserver.ts
export function createObserver<S, C extends {}, A extends Action = UnknownAction, D extends Dispatch = Dispatch<A>>(
callback: (builder: ObserverBuilder<S, C, D>) => void
): Observer<S, C, A, D> {
const observers: Record<string, ObserverFunc<S, C, any, D>> = {}
const builder: ObserverBuilder<S, C, D> = {
addCase<ActionCreator extends TypedActionCreator<string>>(
actionCreator: ActionCreator,
callback: ObserverFunc<S, C, ReturnType<ActionCreator>, D>
) {
observers[actionCreator.type] = callback
}
}
callback(builder)
return {
onNext: ({ action, state, dispatch, context }) => {
const observer = observers[action.type]
if (observer !== undefined) {
observer({ action, state, dispatch, context })
}
}
}
}
ドメイン別のObserver実装
// domains/voice/observers.ts
export const voiceObserver = createObserver<UserVoiceState, ObserverContextMap>((builder) => {
builder.addCase(VoiceActions.moveRoom, async ({ action, context }) => {
const portalkeyClient = context.resolve(PortalkeyClientToken)
// ユーザの参加しているルームを変更する
await portalkeyClient.voice.moveRoom({
roomUid: action.payload.voiceRoom.room.roomUid,
roomType: action.payload.voiceRoom.roomType
})
})
builder.addCase(VoiceActions.changeDeviceState, ({ action, context }) => {
const soundPlayer = context.resolve(SoundPlayerToken)
// デバイスの変更イベントを検知して、サウンドを再生する
switch (action.payload.changed.device) {
case "mic":
if (action.payload.changed.isEnable) {
soundPlayer.playSound("micUnmute")
} else {
soundPlayer.playSound("micMute")
}
break
}
})
})
Protocol Buffersによる型安全なイベント定義
WebSocketイベントの型安全性を確保するために、Protocol Buffersを使用しています:
// proto/portalkey/v1/gateway.proto
message NotifyVoiceStatePayload {
string voice_chat_uid = 1;
VoiceState voice_state = 2;
}
enum GatewayMessageCode {
READY = 0;
NOTIFY_VOICE_STATE = 1;
}
ListenerとObserverのRedux統合
これまで説明したListenerとObserverをReduxに統合する仕組みについて説明します。
Listenerの統合
各ドメインで定義したListenerを統合し、WebSocketクライアントに接続する仕組みを実装しています:
// domains/listeners.ts
const useListeners = [
chatListener.useListener,
userListener.useListener,
voiceListener.useListener,
workspaceListener.useListener,
gatewayListener.useListener
]
export const useAppListener = () => {
const listeners = useListeners.map((observer) => observer())
const subscribe = useCallback(
(emitter: PortalKeyClient) => {
listeners.forEach((listener) => listener.subscribe(emitter))
},
[listeners]
)
const unsubscribe = useCallback(
(emitter: PortalKeyClient) => {
listeners.forEach((listener) => listener.unsubscribe(emitter))
},
[listeners]
)
return { subscribe, unsubscribe }
}
Observerの統合
各ドメインで定義したObserverを統合し、ReduxのMiddlewareで実行する仕組みを実装しています:
// domains/observers.ts
export const observer = combineObservers({
chat: chatObserver,
user: userObserver,
voice: voiceObserver,
workspace: workspaceObserver,
gateway: gatewayObserver
})
Redux Middlewareでの統合
ReduxのMiddlewareを使用して、ActionがdispatchされるたびにObserverを実行します。まず、ReduxのMiddlewareについて簡単に説明します。
Redux Middlewareとは
ReduxのMiddlewareは、ActionがdispatchされてからReducerが実行されるまでの間に、追加の処理を挟むことができる仕組みです。ログ出力、非同期処理、エラーハンドリングなど、様々な用途で使用されます。
// domains/portalkeyMiddleware.ts
export const createPortalkeyMiddleware = (context: ObserverContext) => {
return createMiddleware<RootState>((middlewareApi) => (next) => (action) => {
// ActionがdispatchされるたびにObserverを実行
observer.onNext({
action,
state: middlewareApi.getState(),
dispatch: middlewareApi.dispatch,
context
})
return next(action)
})
}
このMiddlewareは以下の流れで動作します:
- Actionがdispatchされる
- Middlewareが実行され、Observerを呼び出す
-
next(action)
で次のMiddlewareまたはReducerに処理を渡す - Reducerが状態を更新する
StoreとProviderでの統合
ObserverとMiddlewareを実際にアプリケーションで使用するために、Storeの作成とProviderでの統合を行います:
// domains/store.ts
export const createStore = <T extends {}>(context: ObserverContext<T>) =>
configureStore({
reducer,
middleware: (getDefaultMiddleware) =>
getDefaultMiddleware().concat(createPortalkeyMiddleware(context))
})
// components/providers/ObserverContextProvider/ObserverContextProvider.tsx
export const ObserverContextProvider = ({ children }: { children: React.ReactNode }) => {
// Observerのコンテキストを作成
const context = new ObserverRegistry<ObserverContextMap>()
// Storeを作成し、Middlewareにcontextを渡す
const store = createStore<ObserverContextMap>(context)
// 初期化処理
store.dispatch(AppActions.changePlatform(isElectron() ? "desktop" : "web"))
return (
<Provider value={{ context }}>
<ReactReduxProvider store={store}>
{children}
</ReactReduxProvider>
</Provider>
)
}
このProviderにより、以下のことが実現されます:
- Observerのコンテキスト: 音声通話クライアントやサウンドプレイヤーなどの依存関係をObserverに提供
- Redux Store: Middlewareを含むStoreを作成し、Observerが自動的に実行される
- React統合: React ReduxのProviderでStoreをアプリケーション全体に提供
ObserverContextによる依存関係の管理
ObserverはReactと分離されているため、Reactのコンポーネントやフックに直接アクセスすることができません。そのため、サウンドプレイヤーやWebSocketクライアントなどの外部依存関係を副作用なしに操作するために、ObserverContextという仕組みを使用しています。
ObserverContextの実装
ObserverContextは、トークンベースの依存性注入システムとして実装されています:
// types/observer.ts
export const PortalkeyClientToken = Symbol("PortalkeyClient")
export const SoundPlayerToken = Symbol("SoundPlayer")
export const PushNotifierToken = Symbol("PushNotifier")
export interface ObserverContextMap {
[PortalkeyClientToken]: PortalKeyClient
[SoundPlayerToken]: SoundPlayer
[PushNotifierToken]: PushNotifier
}
// libs/flux/observerRegistry.ts
export class ObserverRegistry<T extends {} = Record<ContextToken, unknown>> {
private _registry: Map<keyof Types, any> = new Map()
public register<K extends keyof Types, T extends Types[K]>(token: K, value: T): void {
if (this._registry.has(token)) {
throw new Error(`Key ${token.toString()} already exists in the registry`)
}
this._registry.set(token, value)
}
public resolve<K extends keyof Types, T extends Types[K]>(token: K): T {
const registration = this._registry.get(token)
if (registration == null) {
throw new Error(`No registration found for token: ${token.toString()}`)
}
return registration as T
}
}
ObserverContextの使用例
Observer内でObserverContextを使用することで、外部依存関係にアクセスできます:
// domains/voice/observers.ts
export const voiceObserver = createObserver<UserVoiceState, ObserverContextMap>((builder) => {
builder.addCase(VoiceActions.changeDeviceState, ({ action, context }) => {
// ObserverContextからSoundPlayerを取得
const soundPlayer = context.resolve(SoundPlayerToken)
switch (action.payload.changed.device) {
case "mic":
if (action.payload.changed.isEnable) {
// 副作用なしにサウンドを再生
soundPlayer.playSound("micUnmute")
} else {
soundPlayer.playSound("micMute")
}
break
}
})
})
ObserverContextの利点
- React分離: ObserverがReactのコンポーネントやフックに依存しない
- 副作用の分離: 外部依存関係の操作をObserver内に集約
- テスタビリティ: モックオブジェクトを注入してテストが容易
- 型安全性: TypeScriptの型システムによる依存関係の型安全性
統合の流れ
- WebSocketイベント受信: WebSocketクライアントがイベントを受信
- Listener実行: 対応するListenerがイベントを検知し、ReduxのActionをdispatch
- Reducer実行: Actionに応じてReducerが状態を更新
- Observer実行: MiddlewareがObserverを実行し、副作用を処理
この仕組みにより、WebSocketイベントから状態更新、副作用処理まで一貫したフローで管理できます。
具体的な実装例
ワークスペースの状態管理
ユーザーがワークスペースに参加した際や、チームが作成された際の状態管理の例です:
// domains/workspace/reducers.ts
export const workspaceReducer = createReducer(workspaceInitialState, (builder) => {
builder.addCase(WorkspaceActions.workspaceUpdated, (state, action) => {
const workspace = action.payload.payload.workspace
if (workspace == null) {
return
}
// ワークスペース情報を更新
const result = getWorkspace(state, workspace.id).andThen(updateWorkspace(workspace))
if (result.ok) {
workspaceAdapter.upsertOne(state, result.value)
}
})
builder.addCase(WorkspaceActions.workspaceJoined, (state, action) => {
const member = action.payload.payload.member
if (member == null) {
return
}
// 新しいメンバーをワークスペースに追加
const result = getWorkspace(state, member.workspaceId).andThen(addWorkspaceMember(member))
if (result.ok) {
workspaceAdapter.upsertOne(state, result.value)
}
})
})
ユーザーの参加/退出イベントの処理
// domains/workspace/actions.ts
export const WorkspaceActions = {
workspaceUpdated: createAction<GatewayMessageWorkspaceUpdated, "workspace/workspaceUpdated">("workspace/workspaceUpdated"),
workspaceJoined: createAction<GatewayMessageWorkspaceJoined, "workspace/workspaceJoined">("workspace/workspaceJoined"),
workspaceMemberUpdated: createAction<GatewayMessageWorkspaceMemberUpdated, "workspace/workspaceMemberUpdated">("workspace/workspaceMemberUpdated")
}
メリットとデメリット
メリット
- 開発効率の向上: 状態管理のロジックが集約され、新機能の追加が容易
- デバッグのしやすさ: Redux DevToolsで状態変更を追跡可能
- 型安全性: Protocol Buffersによる型安全なイベント定義
- テストのしやすさ: 状態と副作用が分離されているため、単体テストが書きやすい
- コロケーション: 関連する処理が同じディレクトリに配置され、保守性が向上
デメリット
- 学習コスト: 独自のFluxライブラリの理解が必要
- 複雑性: ObserverパターンとListenerパターンの組み合わせによる複雑性
- ボイラープレート: 各ドメインでListenerとObserverの実装が必要
まとめ
今回は、PortalKeyのフロントエンドアーキテクチャのコアであるReduxの実装について紹介しました。
コア部分の実装は少し複雑になっていますが、その分データとリアルタイム実装のコロケーションが可能となった上で、機能実装をする際は簡潔かつ型安全に記述できるようになっています。
このアーキテクチャは、以下のような場面で応用できると考えています。
- チャットアプリケーション: メッセージの送受信と状態管理
- ゲーム: プレイヤーの行動とゲーム状態の同期
- コラボレーションツール: 複数ユーザーの編集状態の同期
あまりこういったアプリケーションを開発する機会はないかもしれませんが、もし実装する機会があれば参考になれば嬉しいです。
Discussion