nestjs + Kubernetes + Redisでwebsocketスケールアウト
はじめに
本記事の中で Kubernetes をk8s
と表記しています。
なお、本記事の成果物を下記リポジトリにて公開しております。必要に応じてご参照ください。
概要
- そもそも、websocket 通信(ステートフル通信)を k8s で実現するには、ingress に適切な設定が必要
- nestjs の公式サンプルはそのままでは動かず、custom-adapter を実装する必要がある
- Pod をまたいで配信するには Redis を使う
本編
いっとき流行った Websocket プロトコルによる配信サーバを、今更ながら冗長構成で構築する記事です。
Websocket プロトコル
皆さまご存じの通り、通常の HTTP(S)通信はアプリケーションとクライアントの間に ロードバランサ を挟むことで脳死でロードバランシングできますが、Websocket プロトコル通信は一筋縄ではいきません。
Websocket 通信は基本的に配信サーバとクライアント間でコネクションを保ったまま疎通を行うからです。
その他にも下記のような条件がありますが、詳細は RFC などオフィシャルな文献をご参照ください。
- HTTP/1.1 上でのみ動作する
- 最初は HTTP によってリクエストされ、付属の
Upgrade
ヘッダーによってプロトコルをスイッチする
① 非冗長構成なサーバサイドの実装
nestjs アプリケーションの構築
バックエンドに nestjs を用いるので、Nest CLIを導入していない場合はインストールします。
❯ yarn global add @nestjs/cli
プロジェクトルートで nest アプリケーションを生成します。
❯ pwd
/path/to/app
❯ nest new deliverer
⚡ We will scaffold your app in a few seconds..
CREATE deliverer/.eslintrc.js (666 bytes)
...
👉 Get started with the following commands:
$ cd deliverer
$ yarn run start
❯ cd deliverer
❯ yarn start
上記までで、 http://localhost:3000 にアクセスすると Hello World!
というメッセージが返ると思います。
websocket サーバの実装
websocket サーバ構築に必要なモジュールをインストールします。
❯ pwd
/path/to/app/deliverer
# socket.ioを明示的にインストールしないと2.x系が入ってしまいうまく動きません
❯ yarn add @nestjs/websockets @nestjs/platform-socket.io socket.io
❯ yarn add -D @types/socket.io
続いて Websocket 通信用の Gateway を実装します。
import { Logger } from '@nestjs/common';
import {
OnGatewayConnection,
OnGatewayDisconnect,
OnGatewayInit,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
WsResponse,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@WebSocketGateway()
export class EventsGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
@WebSocketServer() server: Server;
private logger = new Logger(EventsGateway.name);
@SubscribeMessage('input_event')
handleEvent(_: Socket, data: string): WsResponse<{ status: string }> {
this.logger.log(data, 'input_event');
this.server.emit('broadcast_event', { data });
return { event: 'input_event', data: { status: 'ok' } };
}
@SubscribeMessage('broadcast_event')
handleBroadcast(): WsResponse<{ status: string }> {
return { event: 'broadcast_event', data: { status: 'OK' } };
}
afterInit(server: Server) {
this.logger.log('Init');
}
handleDisconnect(client: Socket) {
this.logger.log(`Client disconnected: ${client.id}`);
}
handleConnection(client: Socket, ...args: any[]) {
this.logger.log(`Client connected: ${client.id}`);
}
}
カスタムアダプタの実装
ここまではほぼ nestjs のサンプルの通りですが
こちらの issue にもあるとおり、現状 nestjs のサンプルは 最新の socket.io で動きません。
そこで、issue にあるとおり下記のカスタムアダプタを実装します。
下記は issue のままなので説明は割愛します。
アダプタの実装内容
import { INestApplicationContext } from '@nestjs/common';
import { isFunction, isNil } from '@nestjs/common/utils/shared.utils';
import {
AbstractWsAdapter,
MessageMappingProperties,
} from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { fromEvent, Observable } from 'rxjs';
import { filter, first, map, mergeMap, share, takeUntil } from 'rxjs/operators';
import { Server } from 'socket.io';
import { CustomOrigin } from '@nestjs/common/interfaces/external/cors-options.interface';
export class SocketIoAdapter extends AbstractWsAdapter {
constructor(
appOrHttpServer?: INestApplicationContext | any,
private corsOrigin?:
| boolean
| string
| RegExp
| (string | RegExp)[]
| CustomOrigin,
) {
super(appOrHttpServer);
}
public create(
port: number,
options?: any & { namespace?: string; server?: any },
): any {
if (!options) {
return this.createIOServer(port);
}
const { namespace, server, ...opt } = options;
return server && isFunction(server.of)
? server.of(namespace)
: namespace
? this.createIOServer(port, opt).of(namespace)
: this.createIOServer(port, opt);
}
public createIOServer(port: number, options?: any): any {
if (this.httpServer && port === 0) {
const s = new Server(this.httpServer, {
cors: {
origin: this.corsOrigin,
methods: ['GET', 'POST'],
credentials: true,
},
cookie: {
httpOnly: true,
path: '/',
},
maxHttpBufferSize: 1e6,
});
return s;
}
return new Server(port, options);
}
public bindMessageHandlers(
client: any,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>,
) {
const disconnect$ = fromEvent(client, DISCONNECT_EVENT).pipe(
share(),
first(),
);
handlers.forEach(({ message, callback }) => {
const source$ = fromEvent(client, message).pipe(
mergeMap((payload: any) => {
const { data, ack } = this.mapPayload(payload);
return transform(callback(data, ack)).pipe(
filter((response: any) => !isNil(response)),
map((response: any) => [response, ack]),
);
}),
takeUntil(disconnect$),
);
source$.subscribe(([response, ack]) => {
if (response.event) {
return client.emit(response.event, response.data);
}
isFunction(ack) && ack(response);
});
});
}
public mapPayload(payload: any): { data: any; ack?: () => any } {
if (!Array.isArray(payload)) {
return { data: payload };
}
const lastElement = payload[payload.length - 1];
const isAck = isFunction(lastElement);
if (isAck) {
const size = payload.length - 1;
return {
data: size === 1 ? payload[0] : payload.slice(0, size),
ack: lastElement,
};
}
return { data: payload };
}
}
main.ts
でアダプタを適用します。
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { SocketIoAdapter } from './socket-io.adapter';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new SocketIoAdapter(app, true));
await app.listen(3000);
}
bootstrap();
② クライアントアプリケーションの構築
今回はVue CLIでを用いて、簡素な Vue3 アプリケーションを構築します。
クライアントアプリは k8s の外側に配置するので、任意のディレクトリで構いません。
vue3 アプリケーションの構築
❯ yarn global add @vue/cli
❯ vue create client
# Vue3+ts+class-componentで構築したいので、Manuallyを選択します。
? Please pick a preset: Manually select features
? Check the features needed for your project: Choose Vue version, Babel, TS, Vuex, Linter
? Choose a version of Vue.js that you want to start the project with 3.x
? Use class-style component syntax? Yes
? Use Babel alongside TypeScript (required for modern mode, auto-detected polyfills, transpiling JSX)? Yes
? Pick a linter / formatter config: Basic
? Pick additional lint features: Lint on save
? Where do you prefer placing config for Babel, ESLint, etc.? In dedicated config files
❯ cd client
❯ yarn serve
上記までで、 http://localhost:8080 にアクセスすると Welcome ページが表示されます。
ついでに、後ほど用いる vue-property-decorator
もインストールします。
❯ yarn add vue-property-decorator
おまけ 1:tailwindcss の導入
❯ vue add tailwindcss
しかし、vue cli でインストールされる tailwindcss は v0.6 と古ためうまくビルドできません。(一部のプロパティが効かないような中途半端なレイアウトになります)
↓VSCode にも怒られる...
ので、下記を追加でインストールします。
❯ yarn add -D tailwindcss@latest postcss@8 autoprefixer@9
自動生成された css ファイルを下記内容で上書きます。
@tailwind base;
@tailwind components;
@tailwind utilities;
// 下記を追記
import "./assets/style/main.css";
おまけ 2:vuex の型定義追加
下記を実装しておくと、TypeScript コンパイラ等に怒られなくてすみます。
import Store from "@/store";
declare module "@vue/runtime-core" {
interface ComponentCustomProperties {
$store: Store;
}
}
websocket クライアントの実装
必要モジュールをインストールします。
❯ yarn add socket.io-client
❯ yarn add -D @types/socket.io-client
vuex を用いて、受信したメッセージを記録できるようにしてみました。
import { createStore } from "vuex";
export interface State {
messages: { message: string }[];
}
export default createStore<State>({
state: {
messages: [],
},
mutations: {
addMessage(state, data: { message: string }) {
state.messages.push(data);
},
deleteMessages(state) {
state.messages = [];
},
},
actions: {
receiveMessage({ commit }, data: { message: string }) {
commit("addMessage", data);
},
initMessages({ commit }) {
commit("deleteMessages");
},
},
getters: {
messages: (state) => {
return state.messages;
},
},
modules: {},
});
Vue はこんな感じ ↓ 大した処理はしていません。
<template>
<div class="w-full h-full bg-gray-100 p-2">
<h1 class="text-3xl font-bold text-gray-800">メッセージ送信テスト</h1>
<div class="my-4 border-2 border-blue-900 rounded p-4">
<input type="text" v-model="message" />
<button
class="inline-block bg-yellow-400 ml-1 rounded px-5 py-1 shadow"
@click="sendMessage"
>
送信
</button>
</div>
<p v-for="(message, idx) in messages" :key="idx">
{{ message }}
</p>
</div>
</template>
<script lang="ts">
import { Options, Vue } from "vue-class-component";
import io from "socket.io-client";
@Options({
components: {}
})
export default class Home extends Vue {
message = "hello!";
io = io("http://localhost:3000", {
transports: ["websocket"]
});
socket = this.io.connect();
sendMessage(): void {
console.log({ message: this.message });
this.socket.emit("input_event", this.message);
}
mounted(): void {
this.socket.on("broadcast_event", data => {
console.log("receive", data);
this.$store.dispatch("receiveMessage", data);
});
}
get messages(): { message: string }[] {
return this.$store.getters.messages;
}
beforeUnmount(): void {
this.socket.disconnect();
}
}
</script>
ここまでで、 nestjs アプリケーションと vue アプリケーションを両方起動し、 http://localhost:8080 へアクセスすると
下記画面のようにオウム返しアプリが構築できているかと思います。
サーバログでも確認できますね。
[Nest] 51534 - 2021-04-01 12:34:56 [input_event] hello!
③ k8s の構築
今回は、Docker for Mac に付属する k8s を用います。
コンテキスト設定の確認・namespace の追加
namespace としてwebsocket-sample
を追加します。
❯ kubectl config get-contexts
CURRENT NAME CLUSTER AUTHINFO NAMESPACE
* docker-desktop docker-desktop docker-desktop
❯ kubectl create namespace websocket-sample
namespace/websocket-sample created
❯ kubectl config set-context --current --namespace=websocket-sample
Context "docker-desktop" modified.
❯ kubectl config get-contexts
CURRENT NAME CLUSTER AUTHINFO NAMESPACE
* docker-desktop docker-desktop docker-desktop websocket-sample
設定ファイル群の記述
ingress.yaml
ロードバランサー...とは違い、Ingress は各パスを path-forwarding せずそのまま Node へ流します。
k8s 環境ローカルだったり自前の k8s(??)だったりの場合は、ingress にNGINX Ingress Controllerを用います。
❯ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v0.46.0/deploy/static/provider/cloud/deploy.yaml
ingress に下記設定を施すことで、Websocket に必要な IP,Upgrade ヘッダを後ろへ渡すことができます。
annotations:
kubernetes.io/ingress.class: nginx
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
nginx.ingress.kubernetes.io/server-snippets: |
location / {
real_ip_header X-Forwarded-For;
real_ip_recursive on;
proxy_http_version 1.1;
proxy_set_header X-Forwarded-Host $http_host;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header Host $host;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_cache_bypass $http_upgrade;
}
レガシーな構成の場合でもロードバランサに nginx を導入しているケース多いかと思います。
その場合は /etc/nginx/conf.d/hoge.conf
に上記を参考にヘッダプロキシを設定してあげてください。
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
namespace: websocket-sample
name: websocket-sample
labels:
name: websocket-sample
annotations:
kubernetes.io/ingress.class: nginx
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
nginx.ingress.kubernetes.io/server-snippets: |
location / {
real_ip_header X-Forwarded-For;
real_ip_recursive on;
proxy_http_version 1.1;
proxy_set_header X-Forwarded-Host $http_host;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header Host $host;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_cache_bypass $http_upgrade;
}
spec:
rules:
- http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: deliverer-svc
port:
number: 3000
- path: /socket.io
pathType: Prefix
backend:
service:
name: deliverer-svc
port:
number: 3000
deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: websocket-sample
name: deliverer
spec:
replicas: 2
selector:
matchLabels:
app: deliverer
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 50%
template:
metadata:
labels:
app: deliverer
spec:
containers:
- name: deliverer
image: deliverer:0.0.1
imagePullPolicy: IfNotPresent
resources:
limits:
memory: "512Mi"
cpu: "500m"
ports:
- containerPort: 3000
nestjs アプリケーションを Docker image 化し、k8s に乗せます。
# notice: dev環境用の構成です
FROM node:14.17
RUN npm i -g @nestjs/cli
WORKDIR /app/api-server
COPY . /app/api-server
RUN npm i
EXPOSE 3000
CMD [ "npm", "run", "start"]
余計なディレクトリを巻き込まないよう .dockerignore
で除去設定を施します。
node_modules
dist
最後に k8s 用の service.yaml を置きます。
apiVersion: v1
kind: Service
metadata:
namespace: websocket-sample
name: deliverer-svc
spec:
selector:
app: deliverer
ports:
- name: default-port
protocol: TCP
port: 3000
targetPort: 3000
k8s 構築と動作確認
下記コマンド群で k8s 環境を構築します。
# ./deliverer をDocker image化
❯ docker build --no-cache -t deliverer:0.0.1 ./deliverer
# もろもろapply
❯ kubectl apply -f k8s/deployment.yaml
deployment.apps/deliverer created
❯ kubectl apply -f k8s/ingress.yaml
ingress.networking.k8s.io/websocket-sample created
❯ kubectl apply -f deliverer/service.yaml
service/deliverer-svc created
k8s のエンドポイントは http://localhost/
なので、 websocket の宛先 URL を下記に変更します。
io = io("http://localhost", {
transports: ["websocket"]
});
vue アプリケーションを起動し、オウム返しアプリを複数タブで開いてみると、ここで 配信されるメッセージとされないメッセージがある ことに気づくかと思います。
今回、deliverer
のレプリカ数を 2 にしたので、2 台の Pod が起動することになります。
冒頭で述べましたとおり Websocket 通信は基本的に配信サーバとクライアント間でコネクションを保ったまま疎通を行うので、別の pod とコネクションを確立した端末間ではメッセージの送受信が出来ないのです。
これを解決するために、Websocket アダプタに Redis をかませます。
④ Redis の導入
nestjs 側へ redis を導入します。
❯ pwd
/path/to/app/deliverer
❯ yarn add redis @socket.io/redis-adapter
❯ yarn add -D types/redis
そして、冒頭で追加した deliverer/src/socket-io.adapter.ts
で、redis adapter を追加します。
(追記後、コンテナを build するのを忘れずに。バージョンタグも上げ、deployment.yaml
も apply しておきましょう)
import { RedisClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
// 中略...
public createIOServer(port: number, options?: any): any {
const s = new Server(this.httpServer, {
cors: {
origin: this.corsOrigin,
methods: ['GET', 'POST'],
credentials: true,
},
cookie: {
httpOnly: true,
path: '/',
},
maxHttpBufferSize: 1e6,
});
// ここから追加実装
const pubClient = new RedisClient({
host: "<REDIS_HOST>", // ホスト端末のIPアドレスを入れます
port: 6379,
});
const subClient = pubClient.duplicate();
s.adapter(createAdapter(pubClient, subClient));
// ここまで
return s;
}
Redis は Docker や docker-compose で立ち上げておきます。
redis:
image: "redis:latest"
ports:
- "6379:6379"
volumes:
- /data
mem_limit: 1g
これで Redis, k8s, vue アプリを全て起動すれば、無事に Pod をまたいでメッセージを配信できるはずです!!
うまく動かないときは stern を使ってstern deliverer
でログを参照したり、先述の Dashboard でデプロイ状況を確認してみてください。
長文・駄文にお付き合いくださった皆様、ありがとうございます。
Discussion