🐺

nestjs + Kubernetes + Redisでwebsocketスケールアウト

2021/06/06に公開

はじめに

本記事の中で Kubernetes をk8sと表記しています。

なお、本記事の成果物を下記リポジトリにて公開しております。必要に応じてご参照ください。
https://github.com/up-tri/redundant_websocket_sample

概要

  • そもそも、websocket 通信(ステートフル通信)を k8s で実現するには、ingress に適切な設定が必要
  • nestjs の公式サンプルはそのままでは動かず、custom-adapter を実装する必要がある
  • Pod をまたいで配信するには Redis を使う

本編

いっとき流行った Websocket プロトコルによる配信サーバを、今更ながら冗長構成で構築する記事です。

Websocket プロトコル

皆さまご存じの通り、通常の HTTP(S)通信はアプリケーションとクライアントの間に ロードバランサ を挟むことで脳死でロードバランシングできますが、Websocket プロトコル通信は一筋縄ではいきません。
Websocket 通信は基本的に配信サーバとクライアント間でコネクションを保ったまま疎通を行うからです。

その他にも下記のような条件がありますが、詳細は RFC などオフィシャルな文献をご参照ください。

  • HTTP/1.1 上でのみ動作する
  • 最初は HTTP によってリクエストされ、付属のUpgradeヘッダーによってプロトコルをスイッチする

① 非冗長構成なサーバサイドの実装

nestjs アプリケーションの構築

バックエンドに nestjs を用いるので、Nest CLIを導入していない場合はインストールします。

yarn global add @nestjs/cli

プロジェクトルートで nest アプリケーションを生成します。

pwd
/path/to/app

❯ nest new deliverer
⚡  We will scaffold your app in a few seconds..

CREATE deliverer/.eslintrc.js (666 bytes)

...


👉  Get started with the following commands:

$ cd deliverer
$ yarn run start


❯ cd deliverer
❯ yarn start

上記までで、 http://localhost:3000 にアクセスすると Hello World! というメッセージが返ると思います。

websocket サーバの実装

websocket サーバ構築に必要なモジュールをインストールします。

pwd
/path/to/app/deliverer

# socket.ioを明示的にインストールしないと2.x系が入ってしまいうまく動きませんyarn add @nestjs/websockets @nestjs/platform-socket.io socket.io
❯ yarn add -D @types/socket.io

続いて Websocket 通信用の Gateway を実装します。

events.gateway.ts
import { Logger } from '@nestjs/common';
import {
  OnGatewayConnection,
  OnGatewayDisconnect,
  OnGatewayInit,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
  WsResponse,
} from '@nestjs/websockets';

import { Server, Socket } from 'socket.io';

@WebSocketGateway()
export class EventsGateway
  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
  @WebSocketServer() server: Server;
  private logger = new Logger(EventsGateway.name);

  @SubscribeMessage('input_event')
  handleEvent(_: Socket, data: string): WsResponse<{ status: string }> {
    this.logger.log(data, 'input_event');
    this.server.emit('broadcast_event', { data });

    return { event: 'input_event', data: { status: 'ok' } };
  }

  @SubscribeMessage('broadcast_event')
  handleBroadcast(): WsResponse<{ status: string }> {
    return { event: 'broadcast_event', data: { status: 'OK' } };
  }

  afterInit(server: Server) {
    this.logger.log('Init');
  }

  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
  }

  handleConnection(client: Socket, ...args: any[]) {
    this.logger.log(`Client connected: ${client.id}`);
  }
}

カスタムアダプタの実装

ここまではほぼ nestjs のサンプルの通りですが
こちらの issue にもあるとおり、現状 nestjs のサンプルは 最新の socket.io で動きません。

そこで、issue にあるとおり下記のカスタムアダプタを実装します。
下記は issue のままなので説明は割愛します。

アダプタの実装内容
src/socket-io.adapter.ts
import { INestApplicationContext } from '@nestjs/common';
import { isFunction, isNil } from '@nestjs/common/utils/shared.utils';
import {
  AbstractWsAdapter,
  MessageMappingProperties,
} from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { fromEvent, Observable } from 'rxjs';
import { filter, first, map, mergeMap, share, takeUntil } from 'rxjs/operators';
import { Server } from 'socket.io';
import { CustomOrigin } from '@nestjs/common/interfaces/external/cors-options.interface';

export class SocketIoAdapter extends AbstractWsAdapter {
  constructor(
    appOrHttpServer?: INestApplicationContext | any,
    private corsOrigin?:
      | boolean
      | string
      | RegExp
      | (string | RegExp)[]
      | CustomOrigin,
  ) {
    super(appOrHttpServer);
  }

  public create(
    port: number,
    options?: any & { namespace?: string; server?: any },
  ): any {
    if (!options) {
      return this.createIOServer(port);
    }
    const { namespace, server, ...opt } = options;
    return server && isFunction(server.of)
      ? server.of(namespace)
      : namespace
      ? this.createIOServer(port, opt).of(namespace)
      : this.createIOServer(port, opt);
  }

  public createIOServer(port: number, options?: any): any {
    if (this.httpServer && port === 0) {
      const s = new Server(this.httpServer, {
        cors: {
          origin: this.corsOrigin,
          methods: ['GET', 'POST'],
          credentials: true,
        },
        cookie: {
          httpOnly: true,
          path: '/',
        },
        maxHttpBufferSize: 1e6,
      });

      return s;
    }
    return new Server(port, options);
  }

  public bindMessageHandlers(
    client: any,
    handlers: MessageMappingProperties[],
    transform: (data: any) => Observable<any>,
  ) {
    const disconnect$ = fromEvent(client, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    handlers.forEach(({ message, callback }) => {
      const source$ = fromEvent(client, message).pipe(
        mergeMap((payload: any) => {
          const { data, ack } = this.mapPayload(payload);
          return transform(callback(data, ack)).pipe(
            filter((response: any) => !isNil(response)),
            map((response: any) => [response, ack]),
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        if (response.event) {
          return client.emit(response.event, response.data);
        }
        isFunction(ack) && ack(response);
      });
    });
  }

  public mapPayload(payload: any): { data: any; ack?: () => any } {
    if (!Array.isArray(payload)) {
      return { data: payload };
    }
    const lastElement = payload[payload.length - 1];
    const isAck = isFunction(lastElement);
    if (isAck) {
      const size = payload.length - 1;
      return {
        data: size === 1 ? payload[0] : payload.slice(0, size),
        ack: lastElement,
      };
    }
    return { data: payload };
  }
}

main.tsでアダプタを適用します。

src/main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { SocketIoAdapter } from './socket-io.adapter';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.useWebSocketAdapter(new SocketIoAdapter(app, true));
  await app.listen(3000);
}
bootstrap();

② クライアントアプリケーションの構築

今回はVue CLIでを用いて、簡素な Vue3 アプリケーションを構築します。
クライアントアプリは k8s の外側に配置するので、任意のディレクトリで構いません。

vue3 アプリケーションの構築

yarn global add @vue/cli
❯ vue create client

# Vue3+ts+class-componentで構築したいので、Manuallyを選択します。
? Please pick a preset: Manually select features
? Check the features needed for your project: Choose Vue version, Babel, TS, Vuex, Linter
? Choose a version of Vue.js that you want to start the project with 3.x
? Use class-style component syntax? Yes
? Use Babel alongside TypeScript (required for modern mode, auto-detected polyfills, transpiling JSX)? Yes
? Pick a linter / formatter config: Basic
? Pick additional lint features: Lint on save
? Where do you prefer placing config for Babel, ESLint, etc.? In dedicated config files


❯ cd client
❯ yarn serve

上記までで、 http://localhost:8080 にアクセスすると Welcome ページが表示されます。

ついでに、後ほど用いる vue-property-decoratorもインストールします。

yarn add vue-property-decorator

おまけ 1:tailwindcss の導入

❯ vue add tailwindcss

しかし、vue cli でインストールされる tailwindcss は v0.6 と古ためうまくビルドできません。(一部のプロパティが効かないような中途半端なレイアウトになります)
↓VSCode にも怒られる...

ので、下記を追加でインストールします。

yarn add -D tailwindcss@latest postcss@8 autoprefixer@9

自動生成された css ファイルを下記内容で上書きます。

src/assets/style/main.css
@tailwind base;
@tailwind components;
@tailwind utilities;
src/main.ts
// 下記を追記
import "./assets/style/main.css";

おまけ 2:vuex の型定義追加

下記を実装しておくと、TypeScript コンパイラ等に怒られなくてすみます。

src/shims-vuex.ts
import Store from "@/store";

declare module "@vue/runtime-core" {
  interface ComponentCustomProperties {
    $store: Store;
  }
}

websocket クライアントの実装

必要モジュールをインストールします。

yarn add socket.io-client
❯ yarn add -D @types/socket.io-client

vuex を用いて、受信したメッセージを記録できるようにしてみました。

src/store/index.ts
import { createStore } from "vuex";

export interface State {
  messages: { message: string }[];
}

export default createStore<State>({
  state: {
    messages: [],
  },
  mutations: {
    addMessage(state, data: { message: string }) {
      state.messages.push(data);
    },
    deleteMessages(state) {
      state.messages = [];
    },
  },
  actions: {
    receiveMessage({ commit }, data: { message: string }) {
      commit("addMessage", data);
    },
    initMessages({ commit }) {
      commit("deleteMessages");
    },
  },
  getters: {
    messages: (state) => {
      return state.messages;
    },
  },
  modules: {},
});

Vue はこんな感じ ↓  大した処理はしていません。

src/App.vue
<template>
  <div class="w-full h-full bg-gray-100 p-2">
    <h1 class="text-3xl font-bold text-gray-800">メッセージ送信テスト</h1>
    <div class="my-4 border-2 border-blue-900 rounded p-4">
      <input type="text" v-model="message" />
      <button
        class="inline-block bg-yellow-400 ml-1 rounded px-5 py-1 shadow"
        @click="sendMessage"
      >
        送信
      </button>
    </div>
    <p v-for="(message, idx) in messages" :key="idx">
      {{ message }}
    </p>
  </div>
</template>

<script lang="ts">
import { Options, Vue } from "vue-class-component";
import io from "socket.io-client";

@Options({
  components: {}
})
export default class Home extends Vue {
  message = "hello!";

  io = io("http://localhost:3000", {
    transports: ["websocket"]
  });
  socket = this.io.connect();

  sendMessage(): void {
    console.log({ message: this.message });
    this.socket.emit("input_event", this.message);
  }

  mounted(): void {
    this.socket.on("broadcast_event", data => {
      console.log("receive", data);
      this.$store.dispatch("receiveMessage", data);
    });
  }

  get messages(): { message: string }[] {
    return this.$store.getters.messages;
  }

  beforeUnmount(): void {
    this.socket.disconnect();
  }
}
</script>

ここまでで、 nestjs アプリケーションと vue アプリケーションを両方起動し、 http://localhost:8080 へアクセスすると
下記画面のようにオウム返しアプリが構築できているかと思います。

サーバログでも確認できますね。

[Nest] 51534   - 2021-04-01 12:34:56   [input_event] hello!

③ k8s の構築

今回は、Docker for Mac に付属する k8s を用います。

コンテキスト設定の確認・namespace の追加

namespace としてwebsocket-sampleを追加します。

❯ kubectl config get-contexts
CURRENT   NAME             CLUSTER          AUTHINFO         NAMESPACE
*         docker-desktop   docker-desktop   docker-desktop

❯ kubectl create namespace websocket-sample
namespace/websocket-sample created

❯ kubectl config set-context --current --namespace=websocket-sample
Context "docker-desktop" modified.

❯ kubectl config get-contexts
CURRENT   NAME             CLUSTER          AUTHINFO         NAMESPACE
*         docker-desktop   docker-desktop   docker-desktop   websocket-sample

設定ファイル群の記述

ingress.yaml

ロードバランサー...とは違い、Ingress は各パスを path-forwarding せずそのまま Node へ流します。

k8s 環境ローカルだったり自前の k8s(??)だったりの場合は、ingress にNGINX Ingress Controllerを用います。

❯ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v0.46.0/deploy/static/provider/cloud/deploy.yaml

ingress に下記設定を施すことで、Websocket に必要な IP,Upgrade ヘッダを後ろへ渡すことができます。

annotations:
  kubernetes.io/ingress.class: nginx
  nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
  nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
  nginx.ingress.kubernetes.io/server-snippets: |
    location / {
      real_ip_header X-Forwarded-For;
      real_ip_recursive on;

      proxy_http_version 1.1;

      proxy_set_header X-Forwarded-Host $http_host;
      proxy_set_header X-Forwarded-Proto $scheme;
      proxy_set_header X-Forwarded-For $remote_addr;
      proxy_set_header Host $host;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "upgrade";
      proxy_cache_bypass $http_upgrade;
    }

レガシーな構成の場合でもロードバランサに nginx を導入しているケース多いかと思います。
その場合は /etc/nginx/conf.d/hoge.conf に上記を参考にヘッダプロキシを設定してあげてください。

k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  namespace: websocket-sample
  name: websocket-sample
  labels:
    name: websocket-sample
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
    nginx.ingress.kubernetes.io/server-snippets: |
      location / {
        real_ip_header X-Forwarded-For;
        real_ip_recursive on;

        proxy_http_version 1.1;

        proxy_set_header X-Forwarded-Host $http_host;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_cache_bypass $http_upgrade;
      }

spec:
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: deliverer-svc
                port:
                  number: 3000
          - path: /socket.io
            pathType: Prefix
            backend:
              service:
                name: deliverer-svc
                port:
                  number: 3000

deployment.yaml

k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: websocket-sample
  name: deliverer
spec:
  replicas: 2
  selector:
    matchLabels:
      app: deliverer
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 50%
  template:
    metadata:
      labels:
        app: deliverer
    spec:
      containers:
        - name: deliverer
          image: deliverer:0.0.1
          imagePullPolicy: IfNotPresent
          resources:
            limits:
              memory: "512Mi"
              cpu: "500m"
          ports:
            - containerPort: 3000

nestjs アプリケーションを Docker image 化し、k8s に乗せます。

deliverer/Dockerfile
# notice: dev環境用の構成です
FROM node:14.17

RUN npm i -g @nestjs/cli

WORKDIR /app/api-server
COPY . /app/api-server

RUN npm i

EXPOSE 3000

CMD [ "npm", "run", "start"]

余計なディレクトリを巻き込まないよう .dockerignoreで除去設定を施します。

deliverer/.dockerignore
node_modules
dist

最後に k8s 用の service.yaml を置きます。

deliverer/service.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: websocket-sample
  name: deliverer-svc
spec:
  selector:
    app: deliverer
  ports:
    - name: default-port
      protocol: TCP
      port: 3000
      targetPort: 3000

k8s 構築と動作確認

下記コマンド群で k8s 環境を構築します。

# ./deliverer をDocker image化docker build --no-cache -t deliverer:0.0.1 ./deliverer
# もろもろapply
❯ kubectl apply -f k8s/deployment.yaml
deployment.apps/deliverer created

❯ kubectl apply -f k8s/ingress.yaml
ingress.networking.k8s.io/websocket-sample created

❯ kubectl apply -f deliverer/service.yaml
service/deliverer-svc created

k8s のエンドポイントは http://localhost/なので、 websocket の宛先 URL を下記に変更します。

client/src/App.vue
  io = io("http://localhost", {
    transports: ["websocket"]
  });

vue アプリケーションを起動し、オウム返しアプリを複数タブで開いてみると、ここで 配信されるメッセージとされないメッセージがある ことに気づくかと思います。

今回、delivererのレプリカ数を 2 にしたので、2 台の Pod が起動することになります。
冒頭で述べましたとおり Websocket 通信は基本的に配信サーバとクライアント間でコネクションを保ったまま疎通を行うので、別の pod とコネクションを確立した端末間ではメッセージの送受信が出来ないのです。

これを解決するために、Websocket アダプタに Redis をかませます。

④ Redis の導入

nestjs 側へ redis を導入します。

pwd
/path/to/app/deliverer

❯ yarn add redis @socket.io/redis-adapter
❯ yarn add -D types/redis

そして、冒頭で追加した deliverer/src/socket-io.adapter.ts で、redis adapter を追加します。
(追記後、コンテナを build するのを忘れずに。バージョンタグも上げ、deployment.yamlも apply しておきましょう)

import { RedisClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";

// 中略...

  public createIOServer(port: number, options?: any): any {
    const s = new Server(this.httpServer, {
      cors: {
        origin: this.corsOrigin,
        methods: ['GET', 'POST'],
        credentials: true,
      },
      cookie: {
        httpOnly: true,
        path: '/',
      },
      maxHttpBufferSize: 1e6,
    });

    // ここから追加実装
    const pubClient = new RedisClient({
      host: "<REDIS_HOST>", // ホスト端末のIPアドレスを入れます
      port: 6379,
    });
    const subClient = pubClient.duplicate();
    s.adapter(createAdapter(pubClient, subClient));
    // ここまで

    return s;
  }

Redis は Docker や docker-compose で立ち上げておきます。

docker-compose.yaml
redis:
  image: "redis:latest"
  ports:
    - "6379:6379"
  volumes:
    - /data
  mem_limit: 1g

これで Redis, k8s, vue アプリを全て起動すれば、無事に Pod をまたいでメッセージを配信できるはずです!!
うまく動かないときは stern を使ってstern delivererでログを参照したり、先述の Dashboard でデプロイ状況を確認してみてください。

長文・駄文にお付き合いくださった皆様、ありがとうございます。

Discussion