🤖

honoとD1、Drizzle、react(vite)を使ってPubSubモデルのメッセージングシステムを作る

2024/07/05に公開

オープニング

こんにちは、安藤です。

Medicalforceでは週1回勉強会を行なっております。
今回は「MQTTを使ってPubSubモデルのメッセージングシステムを作る」という件についてお勉強して行きます
チャットシステムの具体例を交えてご紹介します。

イントロダクション

なぜこのテーマを選んだのか?

私は高専〜大学院までIoTとインターネットの研究し、学会をみたり聞いたりしました。
最近、D1というDBがリリースされ、IoTプラットフォームと相性良さそうなので、試してみました。
MQTTと組み合わせてを作ってみました。
特に、D1はリードレプリカを簡単に作成できるため、IoTの分析機能を強化する手段として非常に良さそうですね。

MQTTの基礎

MQTTとは?

MQTT(エムキューキューティーティー)は、IoT向けのOASIS標準メッセージングプロトコルで、非同期に1対1、1対n、n対nの通信が可能です。特に、メモリの制約が厳しかったり、ネットワーク帯域幅が限られている環境でのリアルタイムなメッセージングに適しています。

mqttの構成

主な用語

  1. ブローカー - MQTTの基幹システムで、リアルタイムで双方向通信が可能。
  2. トピック - メッセージのカテゴリ。例: /medicalforce/medicalforce/shibuya
  3. サブスクライブ - 特定のトピックに関する情報を購読すること。
  4. パブリッシュ - 特定のトピックに情報を公開すること。
  5. QoS - メッセージの品質レベル。
    1. QoS 0: 最大一回送信し、失敗してもリトライしない。
    2. QoS 1: 最小一回送信する。
    3. QoS 2: 絶対に一回送信する。

ユースケース紹介

家電メーカーのシステム担当として、家の家電の利用状況をリアルタイムに知りたい場合を考えます。コンセントからの電力使用状況を測る機器からサーバーにデータを送り、管理画面にリアルタイムで情報を表示するシステムをMQTTで実装します。

MQTTブローカーを使い、デバイスからの情報をリアルタイムで管理画面やサーバーに配信します。管理画面から特定のトピックにサブスクライブし、デバイスがパブリッシュした内容を受け取ります。

実践デモ

実践してみました。
構成図

コードの説明

以下は、メッセージ投稿のサンプルコードです。

/*
 * メッセージ 投稿
 */

type CreateMessageDto = {
  message: string;
  roomId: number;
};
app.post("/message", async (c: Context) => {
  const params = await c.req.json<CreateMessageDto>();

  const messageText = params.message;
  const roomId: number = params.roomId;

  const session = await c.get("session");
  const userId = (await session.get("id")) as string;

  const db = drizzle(c.env.DB);
  const result = await db
    .insert(message)
    .values({ roomId, message: messageText, userId })
    .execute();
  const messageId = result.meta.last_row_id;

  // TODO: ハードコーディングしちゃったけど、ここは環境変数とかで設定できるようにしたい
  const client = await mqtt.connectAsync("mqtt://localhost:15675");
  await client.publishAsync(`room/${roomId}`, "you should reload message");

  return c.json({ message: "created", messageId });
});
  useEffect(() => {
    const topic = `room/${roomId}`;

    // TODO: ハードコーディングしちゃったけど、ここは環境変数とかで設定できるようにしたい
    const client = mqtt.connectAsync("mqtt://localhost:15675", {});
    client.then((client) => {
      client.subscribe(topic, function (err) {
        if (!err) {
          client.publish(topic, "Hello mqtt");
        }
      });

      client.on("message", async function (topic, message) {
        // message is Buffer
        const messageText = message.toString();
        console.log({ messageText });
        if (topic !== `room/${roomId}`) return;
        await messages.execute({ roomId });
        const element = document.documentElement;
        const bottom = element.scrollHeight - element.clientHeight;
        window.scroll(0, bottom);
        // client.end();
      });
    });
    return () => {};
  }, [roomId]);

デモ動画

実際にチャットシステムをリアルタイムでやり取りするデモをお見せします。
https://github.com/takashiAg/Mqtt-chat/blob/main/docs/preview.gifからみてください!

なぜmqttに直接メッセージを流さないの?

mqttはtopicさえ指定すれば情報を購読できる
→情報漏洩につながる
なので、mqttにはリロード依頼のみを投げるようにするべきだと考えたからです

成果物

githubにあります!

最後に

まとめ

mqttを使ってシステムのリアルタイム性を向上させることができました!

やらかし

まじで本質と関係ないところに時間を使ってしまった。
bcryptとhashとjwtの関数をフルスクラッチで書いて認証サーバーを立てたこと。
どう考えてもauth0を使えば5分で終わった。

Discussion