cloudflare d1 と DurableObject で firestore の夢を見る
完成形
firestore のメリット・デメリット
メリデメはこんな感じ。firestore を多用する理由はデメリットが大きい反面、圧倒的なスピードを得られるから。(refetch を websocket にすべて任せられるのは強いがよい
メリット
- firestore の realtime 性はとても強力
- 大体のケースにおいて web api server を書く必要がない
- 逆に書くと遅くなるので client side sdk で完結させるように書くことが多い
デメリット
- NoSQL なので変更に弱い
- rules がマジできつい
- client side sdk で DB を変更するのでセキュリティを rules という独自言語で担保するのだが、これが型がないうえに複雑な状態が持てないので roles による可視性制御がめちゃくちゃ大変(できることにはできる)
- web api server を書こうと思ったときに firebase cloudfunction という手段をとることになるが、めちゃくちゃデプロイが遅くて気がくるってしまう
cloudflare workers で firestore 相当のものを作る
- ほしい機能
- 永続化層がある
- DB にスキーマがある
- RDBMS が使える
- web api server が書ける
- websocket による変更のリアルタイム通知ができる
- デプロイが高速である
それぞれ workers にある機能に割り当てる
- 永続化層 --> D1
- DB にスキーマがある --> drizzle によるスキーマファースト開発
- web api server が書ける --> cloudflare workers そのもの
- websocket による変更のリアルタイム通知 --> DurableObjects +
WebSocketPair
,Response
で達成可能 - デプロイが高速である --> cloudflare workers は v8 エンジンを積んでるので js を直でデプロイできることから高速である(コンテナ技術を使用していない)
いけそう
cloudfalre workers ってサーバーレスだから global に状態持てなくない?
websocket で client side とコネクションを張りっぱなしにしておくためには WebSocketPair
を接続が破棄される or 接続を破棄するまでは保持しておかないといけない。
しかし、普通にサーバーレスでは in-memory で状態を持っていても次々に別のプロセスでサーバーが起動するので同じ in-memory を参照できないので connection を保持することができない。
そこで登場するのが DurableObjects という機能。
workers が stateless な function だとすると DurableObjects は a singleton の class instance とみることができる。(実際に class で記述するし....
アプリケーションがそのクラスの名前付きインスタンス(Cloudflareネットワーク全体を通して必ずユニークなものになります)を作成できます。そのインスタンスが1つのDurable Objectであり、Workers(と他のDurable Objects)はもとのDurable Objectに対し、ID経由でメッセージを送ることができます。Durable Objectは送られてきたメッセージを順番にシングルスレッドで処理し、メッセージ間の調整を行います。
DurableObjects の強みは強整合であること、トランザクション系に強そうだなと思っている。DB を使わずに状態を表現できるので counter を簡単に作ることができる。
singleton とみることができるといったがブラウザの window みたいなところに生えるわけではなく、fetch
を返して network 越しに singleton instance の method をたたくという I/F になっている。
つまり Workers KV は結果整合、 Durable Objects は強整合とのこと。地理的に離れてる場合はどうなるんだろう?と思ったんですが、次のような記述があります。
適当な Counter を作るならこう(下にコードを用意した)。動かしたかったらこちらで
Counter
import { Hono } from "hono";
export { Counter } from "./counter";
type Bindings = {
COUNTER: DurableObjectNamespace;
};
const app = new Hono<{ Bindings: Bindings }>();
app.get("*", async (c) => {
const id = c.env.COUNTER.idFromName("A");
const obj = c.env.COUNTER.get(id);
const resp = await obj.fetch(c.req.url);
if (resp.status === 404) {
return c.text("404 Not Found", 404);
}
const count = parseInt(await resp.text());
return c.text(`Count is ${count}`);
});
export default app;
import { Hono } from "hono";
export class Counter {
value: number = 0;
state: DurableObjectState;
app: Hono = new Hono();
constructor(state: DurableObjectState) {
this.state = state;
this.state.blockConcurrencyWhile(async () => {
const stored = await this.state.storage?.get<number>("value");
this.value = stored || 0;
});
this.app.get("/increment", async (c) => {
const currentValue = ++this.value;
await this.state.storage?.put("value", this.value);
return c.text(currentValue.toString());
});
this.app.get("/decrement", async (c) => {
const currentValue = --this.value;
await this.state.storage?.put("value", this.value);
return c.text(currentValue.toString());
});
this.app.get("/", async (c) => {
return c.text(this.value.toString());
});
}
async fetch(request: Request) {
return this.app.fetch(request);
}
}
firestore 相当のものを作る
- Hono と drizzle で CRUD を作る
a. drizzle-orm + d1 で table と client を作る
b. middleware で認証入れたり cache する - subscription api を作って websocket で変更を通知する
- 変更通知対象は DurableObjects で保持する
1. Hono で CRUD を作る
import { zValidator } from "@hono/zod-validator";
import { drizzle } from "drizzle-orm/d1";
import { Hono } from "hono";
import { cors } from "hono/cors";
type Bindings = {
readonly DB: D1Database;
};
type Environment = {
readonly Bindings: Bindings;
};
const app = new Hono<Environment>();
app.use("/api/*", cors());
app.get("/api/posts", async (c) => {
const db = drizzle(c.env.DB);
const data = await db.select().from(posts).all();
return c.json(data);
});
app.post("/api/post", zValidator("json", z.object({ title: z.string(), body: z.string() })), async (c) => {
const res = c.req.valid("json");
const db = drizzle(c.env.DB);
await db
.insert(posts)
.values({
title: res.title,
body: res.body,
createdAt: new Date(),
})
.run();
const result = await db.select().from(posts).all();
return c.json(result);
});
export default app;
2. subscription api を作って websocket で変更を通知する
type Bindings = {
readonly DB: D1Database;
+ readonly SHARED_EVENT: DurableObjectNamespace;
};
const sharedEvent = (c: Context<Environment>) => (type: string) => {
const doId = c.env.SHARED_EVENT.idFromName(type);
return c.env.SHARED_EVENT.get(doId);
};
app.get("/subscribe/posts", async (c) => {
const obj = sharedEvent(c)("posts");
const response = await obj.fetch(new URL("/events", c.req.url), {
headers: c.req.headers,
});
return response;
});
3. 変更通知対象は DurableObjects で保持する
WebSocketPair
を保持する DurableObjects を作る
import { Hono } from "hono";
import { wsupgrade } from "../middleware";
export class SharedEvent implements DurableObject {
private readonly app = new Hono();
private readonly sessions = new Set<WebSocket>();
constructor(private readonly state: DurableObjectState) {
this.app.get("/events", wsupgrade(), async (c) => {
const pair = new WebSocketPair();
this.handleSession(pair[1]);
return new Response(null, { status: 101, webSocket: pair[0] });
});
this.app.post("/event", async (c) => {
const data = await c.req.json();
const json = JSON.stringify(data);
for (const socket of this.sessions) {
socket.send(json);
}
});
}
private handleSession(socket: WebSocket): void {
socket.accept();
this.sessions.add(socket);
socket.addEventListener("close", () => {
this.sessions.delete(socket);
socket.close();
});
}
fetch(request: Request) {
return this.app.fetch(request);
}
}
あとは workers の endpoint に post されたときに SHARED_EVENT
に対して /event
を post することで /subscribe/xxx
から client side に websocket で通知が送られる。
余談
- websocket の connection を何個同時接続できるのか実際にリクエストしてみた結果。7000 個くらいが限界っぽい。詳しくは調べてはない。
/* eslint-disable import/no-extraneous-dependencies */
/* eslint-disable no-console */
import { client as WebSocket } from "websocket";
import type { connection as Connection } from "websocket";
const main = async () => {
const waitlist = new Set<string>();
const connections = new Set<Connection>();
process.on("SIGINT", () => {
for (const connection of connections) {
connection.close();
console.log(connection.state);
}
process.exit(0);
});
const receiveCountMap = new Map<string, number>();
{
const id = setInterval(() => {
console.log("waitlist", waitlist.size);
if (waitlist.size === 0) {
clearInterval(id);
}
}, 5000);
}
{
const id = setInterval(() => {
if (receiveCountMap.size === 0) return;
console.log("size", receiveCountMap.size);
}, 5000);
}
new Array(10000).fill(null).forEach(async (_, idx) => {
const key = `client[${idx}]`;
waitlist.add(key);
const client = new WebSocket();
client.connect("wss://<worker-url>/subscribe/posts");
// client が connected になるまで待機
const connection = await new Promise<Connection>((resolve, reject) => {
client.on("connect", (con) => resolve(con));
// client.on("connectFailed", (error) => reject(error));
});
connections.add(connection);
waitlist.delete(key);
connection.on("message", (message) => {
if (message.type === "utf8") {
const count = receiveCountMap.get(key) ?? 0;
receiveCountMap.set(key, count + 1);
}
});
connection.on("error", (error) => {
console.error(`error[${key}]`, error);
});
connection.on("close", (close) => {
console.error(`close[${key}]`, close);
});
});
};
void main();
実装によるかもだけど同時接続 7000 くらいで durable object の fetch が止まってる?
waitlist 9540
waitlist 6741
waitlist 3667
waitlist 2517
waitlist 2487
waitlist 2487
waitlist 2437
waitlist 2435
waitlist 2434
waitlist 2434
waitlist 2434
waitlist 2434
waitlist 2426
waitlist 2425
waitlist 2425
waitlist 2425
10000 件 websocket のコネクション飛ばして残り 2425 個以降進まない
size 7533 帰ってきた message の数は 7533 個だから結構落ちるなぁ
connected すらも行かないのかー
Discussion
面白い記事ありがとうございます!おそらくタイポかと思います!
WebSocketPair
ありがとうございます!直しておきます