✍🏼

CloudflareWorkers で共同編集

2024/02/20に公開

はじめに

フロントエンドエンジニアなら一度は憧れる共同編集。ただ websocket や webrtc のような stateful な通信が必須であるために、手軽なものとは言えない。できればサーバーレスでデータベースなども使わずに実装したい。そんなときに CloudflareWorkers の DurableObjects が役に立つ。

CloudflareWorkers の Durable Objects には Hibernation API と transaction storage があり、websocket の接続とデータの永続化ができる機能がある。これが共同編集と非常に相性が良い。
またスケーリングの面[1]から見ても、優れているのではないかと思っている。

しかし、共同編集ライブラリとして有名な Yjs は Node.js での実装が前提であり、そのままでは CloudflareWorkers で使用することができない。そこで、Yjs の websocket 通信を再実装し、Durable Objects を使用することで CloudflareWorkers 上で共同編集を実現するライブラリを作成した。

https://developers.cloudflare.com/durable-objects/

https://developers.cloudflare.com/durable-objects/reference/websockets/

共同編集デモ

Yjs on Cloudflare Workers with Durable Objects Demo Movie

デモサイトはこちら

CloudflareWorkers DurableObjects について

https://developers.cloudflare.com/durable-objects/

Use Durable Objects to build collaborative editing tools, interactive chat, multiplayer games and applications that need coordination among multiple clients, without requiring you to build serialization and coordination primitives on your own.

共同編集やチャットゲームのために作られたものらしいので、今回のやりたいことに最適。

Hibernation API

https://developers.cloudflare.com/durable-objects/reference/websockets/#websocket-hibernation

The Hibernatable WebSockets API allows a Durable Object that is not currently running an event handler (such as handling a WebSocket message, HTTP request, or alarms) to be removed from memory while keeping its WebSockets connected (“hibernation”)

従来の Durable Objects は websocket の connection を in-memory state として持たせる必要があり、Durable Objects が揮発した段階で状態が消えてしまうという問題があった。しかし、Hibernation API が追加されたことで、Websocket の connection を維持したまま「休止状態」にすることができ、長い期間接続できるようになった。

Yjsについて

Yjs は共同編集を実現するためのライブラリであり、CRDT に基づいて構築されている。これにより、複数のユーザーが同時に文書を編集しても、データの衝突や破損を防ぐことができる。

https://docs.Yjs.dev/

サーバー実装である y-websocket は、Node.js での実装が前提であるため、CloudflareWorkers で使用することができない。

https://github.com/Yjs/y-websocket/blob/master/bin/server.js#L6-L7

y-websocket を構築するために必要なライブラリ群の lib0y-protocols は動かすことができるので utils.js を参考にして Cloudflare Workers 仕様に実装しなおせばよい。

https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L1-L9

Yjs を CloudflareWorkers で動かす

y-websocket ベースに再実装を行う。以下のことが行えれば十分である。

  • 接続の保持
  • 変更の通知
  • 変更の永続化

y-websocket はすべての接続状態を一括で管理し、id と connection の対応を保持しているが Durable Objects の場合は id ごとにインスタンスを立ち上げるため必要ない。

こちらが完成したもの。どのように作っていったかは以下に記載する。

https://github.com/napolab/y-durableobjects/blob/main/src/yjs/index.ts#L10-L102

接続の保持

Hibernation API の acceptWebSocket と getWebSockets を使用する。

https://developers.cloudflare.com/durable-objects/api/websockets/#acceptwebsocket

https://developers.cloudflare.com/durable-objects/api/websockets/#getwebsockets

実装例
import { Hono, Env } from "hono";

class Connection<E extends Env> implements DurableObject {
  private readonly app = new Hono();
  private session = new Set<WebSocket>();

  constructor(
    private readonly state: DurableObjectState,
    private readonly env: E["Bindings"],
  ) {
    void this.state.blockConcurrencyWhile(async () => {
      for (const ws of this.state.getWebSocket()) {
        this.session.add(ws);
      }
    });

    // upgrade header の確認は必要
    this.app.get("/", async (c) => {
      const pair = new WebSocketPair();
      const client = pair[0];
      const server = pair[1];

      this.state.acceptWebSocket(server);

      return new Response(null, { webSocket: client, status: 101 });
    });
  }

  async fetch(request: Request): Promise<Response> {
    return this.app.request(request, undefined, this.env);
  }

  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    // Hibernation API での message 受信
  }

  async webSocketError(ws: WebSocket): Promise<void> {
    // Hibernation API での error
  }

  async webSocketClose(ws: WebSocket): Promise<void> {
    // Hibernation API での close
  }
}

変更の通知

y-websocket で変更の受信・通知をしているのはこの部分。YDoc は buffer や base64 に永続化でき、websocket connection は Hibernation API で保持できるため、これらの実装は in-memory state でもよい。永続化のことを考えずに実装する。

YDoc の変更を通知
https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L87-L136

client から送られてくる変更を受け取り server の YDoc を更新
https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L162-L187

実装例

WSSharedDoc に connection の関心は必要ないと思ったため、doc の更新と通知のインターフェースに変更し、使用する側で websocket send を行うようにした。

import { createDecoder, readVarUint, readVarUint8Array } from "lib0/decoding";
import { createEncoder, length, toUint8Array, writeVarUint, writeVarUint8Array } from "lib0/encoding";
import { applyAwarenessUpdate, Awareness, encodeAwarenessUpdate } from "y-protocols/awareness";
import { readSyncMessage, writeUpdate } from "y-protocols/sync";
import { Doc } from "yjs";

const messageSync = 0;
const messageAwareness = 1;

type Changes = {
  added: number[];
  updated: number[];
  removed: number[];
};
type Listener<T> = (message: T) => void;
type Unsubscribe = () => void;
interface Notification<T> {
  notify(cb: Listener<T>): Unsubscribe;
}

export class WSSharedDoc extends Doc implements Notification<Uint8Array> {
  private listeners = new Set<Listener<Uint8Array>>();
  readonly awareness = new Awareness(this);

  constructor(gc = true) {
    super({ gc });
    this.awareness.setLocalState(null);
    this.setup();

    this.awareness.on("update", (changes: Changes) => {
      this.awarenessChangeHandler(changes);
    });
    this.on("update", (update: Uint8Array) => {
      this.syncMessageHandler(update);
    });
  }

  update(message: Uint8Array) {
    const encoder = createEncoder();
    const decoder = createDecoder(new Uint8Array(message));
    const messageType = readVarUint(decoder);

    switch (messageType) {
      case messageSync: {
        writeVarUint(encoder, messageSync);
        readSyncMessage(decoder, encoder, this, null);

        if (length(encoder) > 1) {
          this._notify(toUint8Array(encoder));
        }
        break;
      }
      case messageAwareness: {
        applyAwarenessUpdate(this.awareness, readVarUint8Array(decoder), null);
        break;
      }
    }
  }

  notify(listener: Listener<Uint8Array>) {
    this.listeners.add(listener);

    return () => {
      this.listeners.delete(listener);
    };
  }

  private setup() {
    const encoder = createEncoder();
    writeVarUint(encoder, messageSync);
    this._notify(toUint8Array(encoder));

    const awarenessStates = this.awareness.getStates();
    if (awarenessStates.size > 0) {
      writeVarUint(encoder, messageAwareness);
      const message = encodeAwarenessUpdate(this.awareness, Array.from(awarenessStates.keys()), this.awareness.states);
      writeVarUint8Array(encoder, message);

      this._notify(toUint8Array(encoder));
    }
  }

  private syncMessageHandler(update: Uint8Array) {
    const encoder = createEncoder();
    writeVarUint(encoder, messageSync);
    writeUpdate(encoder, update);

    this._notify(toUint8Array(encoder));
  }
  private awarenessChangeHandler({ added, updated, removed }: Changes) {
    const changedClients = [...added, ...updated, ...removed];
    const encoder = createEncoder();
    writeVarUint(encoder, messageAwareness);
    writeVarUint8Array(encoder, encodeAwarenessUpdate(this.awareness, changedClients, this.awareness.states));

    this._notify(toUint8Array(encoder));
  }

  private _notify(message: Uint8Array) {
    for (const subscriber of this.listeners) {
      subscriber(message);
    }
  }
}

変更の永続化

y-websocket は Leveldb に永続化しているが、Durable Objects は transaction storage があるため、これを使用する。

https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L28-L47

また保存タイミングは connection が 0 になったときとなっていたが、デモをしばらく運用してみた結果たまに揮発していたような挙動をしたので connection の数が 1 以下の時に close や error が起きたときに永続化した。

https://github.com/yjs/y-websocket/blob/master/bin/utils.js#L203-L206

できたもの

https://github.com/napolab/y-durableobjects

https://www.npmjs.com/package/y-durableobjects

まとめ

CloudflareWorkers の DurableObjects を用いて共同編集を実現することができた。

Hibernation API と transaction storage を使用することで、簡単に websocket の接続とデータの永続化ができた。共同編集の他にも簡単なゲームも作ってみたいと思った。フロントエンドで表現できる幅が大きく広がるようなプロダクトだと思う。ありがとう Cloudflare。

僕は実際にスケーリングするところやデプロイ直後の不安定さ、計測監視のようなプロダクション投入できるか否かまでは検証しきれてないため、どのような問題があるか気になるところ。

もし実際にライブラリを本番に適用してみたい・やってみた等あればコメントや Twitter[2] 等で教えていただけると嬉しいです。

脚注
  1. https://developers.cloudflare.com/durable-objects/platform/limits/ ↩︎

  2. https://twitter.com/naporin24690 ↩︎

Discussion