🔖

WebSocket 接続のハンドリング (2) Fanout を動かしてみる

2024/12/09に公開

この記事は Fastly Compute 一人アドベントカレンダー 2024 9 日目の記事です。

Fastly Compute の特徴的な機能である WebSocket の利用方法について紹介するシリーズ(WebSocket 接続のハンドリング)の第二回目です。本稿ではリアルタイムメッセージング処理について豊富な機能を持つ Fanout について、特に WebSocket のハンドリングに焦点を当てて概要を紹介します。

仕組み

Fanout をおおまかに一言で説明すると、「WebScoket や HTTP Long polling などのリアルタイムメッセージングの開発・実装を容易にするリバプロ。そのオープンソース実装である Pushpin の商用版」です。動作の概要図は以下のようなイメージで、 Pushpin とバックエンドサーバのやり取りは Generic Realtime Intermediary Protocol (GRIP) というプロトコルで定義された REST API を通して実現しています。

例えば以下のような特徴があります;

  • 図の右手に示されるバックエンドサーバは WebSocket を喋らずに REST API のみを喋れれば OK
  • WebSocket サーバの責務を分離できるので、バックエンド側から見ると接続管理やルーティング等の WebSocket 固有の問題を Pushpin(Fanout) にオフロードできる

WebSocket というと過去の C10K 問題をはじめとした運用面での懸念や議論が印象にある方も中にはいらっしゃるかと思うのですが(著者がまさにそうです)、Fanout のような仕組みがあるおかげでスケーラビリティを担保する部分をオフロードできる(そしてオフロード先が同時接続や負荷に対して耐性の高いエッジネットワークである)ことで WebSocket を導入する敷居が下げられる点は特筆しておきたいポイントです。

最小限のコードをデプロイしてみる

ここで Pushpin の詳細に入っていく前に、一度実際にサンプルコードを通して動かしながら仕組みを学んでみましょう。下記コードは Go の Fanout starter kit のうち最小限の動作に必要なコードに絞った簡易版のサンプルです(エラー処理など基本的なフローの理解に必要な処理を省いています)。仮にこのコードを含む新規の Compute サービスをビルド・デプロイしたとします。

package main

import (
  "context"
  "fmt"
  "io"
  "strings"

  "github.com/fastly/compute-sdk-go/fsthttp"
  "github.com/fastly/compute-sdk-go/x/exp/handoff"
)

func wsText(msg string) []byte {
  return []byte(fmt.Sprintf("TEXT %02x\r\n%s\r\n", len(msg), msg))
}

func main() {
  fsthttp.ServeFunc(func(ctx context.Context, w fsthttp.ResponseWriter, r *fsthttp.Request) {
    // Request is a test request - from client, or from Fanout
    if strings.HasSuffix(r.URL.Host, ".edgecompute.app") && r.URL.Path == "/test/websocket" {
      if len(r.Header.Get("Grip-Sig")) > 0 {
        reqBody, err := io.ReadAll(r.Body)
        if err != nil {
          w.WriteHeader(fsthttp.StatusBadRequest)
          fmt.Fprintf(w, "Failed to read body.\n")
          return
        }
        respBody := []byte("")
        w.Header().Set("Content-Type", "application/websocket-events")

        if strings.HasPrefix(string(reqBody[:6]), "OPEN\r\n") {
          w.Header().Set("Sec-WebSocket-Extensions", "grip; message-prefix=\"\"")
          respBody = append(respBody, []byte("OPEN\r\n")...)
          respBody = append(respBody, wsText("c:{\"type\":\"subscribe\",\"channel\":\"my_test_channel\"}")...)
        } else if strings.HasPrefix(string(reqBody[:5]), "TEXT ") {
          respBody = append(respBody, wsText(fmt.Sprintf("You said: %s", reqBody))...)
        }
        w.WriteHeader(fsthttp.StatusOK)
        w.Write(respBody)
        return

      } else {
        // Not from Fanout, route it through Fanout first
        handoff.Fanout("self")
        return
      }
    }
    return
  })
}

上記コードでまず気がつくのは、コードの後半に handoff.Fanout("self") という処理があることです。コメントと、先ほどサービスをデプロイする際に self という名前で自分自身を指す Backend を設定したことを踏まえると、これは Fanout に対して処理をハンドオフ(移譲)した後、Fanout によって自分自身が再度呼び出される(そしてコードから類推すると、その際に Grip-Sig ヘッダが付加される)ようです。実はこれでこのコードの大まかな処理が追えたことになります。すなわち、

  • "https://your-fanout-example.edgecompute.app/test/websocket" というバックエンドに対して、WebSocket クライアントから接続があるとまずは Fanout に処理が移譲される
  • 移譲される際に Grip-Sig ヘッダが付加された状態で自分自身が呼び出しされ、上記コードの中ほどにある "application/websocket-events" 型の Content-Type をもつレスポンスボディを生成して返却する

wscat を使って動作をみていく

ざっくりとコードの流れが追えたところで、次に wscat コマンドを使って実際に WebSocket のメッセージをブロードキャストされる様子を紹介します。初めに一つ目のターミナルで以下のように wscat に接続先 URL として上記コードがデプロイされたサービスを指定してコマンドを実行すると WebSocket の接続が張られた様子が確認できます。

$ wscat -c https://your-fanout-example.edgecompute.app/test/websocket
Connected (press CTRL+C to quit)
> 

この以下の状態でターミナルに何か適当な文字列を入力すると、以下のように You said:... という文字列が入力した文字列とともに表示されることが確認できます。前述のコードの else if strings.HasPrefix(string(reqBody[:5]), "TEXT ") という分岐の処理で該当する You said:... という文字列処理をしている部分が対応する処理になります。ということは、これを使うと WebSocket クライアントからの入力(例えばチャットのメッセージ)を受けつけて、DB に保存するなりバックエンドサーバに別の API コールで何か処理をキックする、といったコードもこのタイミングに差し込むことができそうです。

$ wscat -c https://your-fanout-example.edgecompute.app/test/websocket
Connected (press CTRL+C to quit)
> hi
< You said: TEXT b
hi

> 

さきほどはユーザからのチャットを受け付けるようなイメージのサンプルを紹介しましたが、次にバックエンドサーバから何かメッセージを特定の channel に送信する場合をイメージして、Pushpin(ここでは Fanout)に対して REST API を呼び出すことで WebSocket メッセージを送信してみます。具体的にどのようにメッセージを送れるかについては、Fanout の場合はこちらに API Endpoint の定義があります。今回はこれに従って、以下のようなサンプルのメッセージを別ターミナルから送ります。

$ curl -X POST "https://api.fastly.com/service/<YOUR_SERVICE_ID>/publish/" \
-H "Fastly-Key: <YOUR_API_KEY>" \
-H "Content-Type: application/json" \
-H "Accept: text/plain" \
-d "{\"items\":[{\"channel\":\"test\",\"formats\":{\"ws-message\":{\"content\":\"hello from backend server\"}}}]}"
Published

すると、先に開いていたターミナルで実行されている wscat の方でメッセージが受信され、以下のようにターミナルの表示が変わっていることが確認できます。メッセージをブロードキャストするようなイメージの処理例となりますが、REST API 越しに json 形式のメッセージを送ることで WebSocket 上でブロードキャストができていますね。もし興味があれば他の PC や端末で同様に WebSocket 接続を確立して本当にメッセージがブロードキャストされているか確認してもいいかもしれません。

$ wscat -c https://your-fanout-example.edgecompute.app/test/websocket
Connected (press CTRL+C to quit)
> hi
< You said: TEXT b
hi

< hello from backend server
> 

使い込む時の手引き

本稿では簡単な動作例を通して Fanout の動作のイメージを説明してきましたが、GRIP の詳細(ヘッダの意味など)や送受信した json 形式のメッセージの定義、WebSocket 以外のクライアントとの通信手段等(Server-Sent EventsやLong Polling等) Pushpin の豊富な機能についてまだ触れていない部分が数多くあります。本稿でその全てを紹介することは紙面と時間の都合で叶いませんが、本節では実際にアプリケーションを構築する際に参考になるリソースや資料を紹介していきます。

まず下記 Developer Doc では Subscribe や Publishing(ブロードキャスティング) のより詳しい動作や本稿では詳しく触れてこなかった GRIP 関係のヘッダの取り扱いなどが記載されています。

https://www.fastly.com/documentation/guides/concepts/real-time-messaging/fanout/

併せて読みたいのが Fanout の日本人開発者の @harmony7 さんの下記記事です。英語の記事にはなりますが、Fanout の仕組みについて WebSocket に留まらずにどのように活用してリアルタイムメッセージングのインフラとして活用できるかについてわかりやすくかつ詳しく説明されています。

https://dev.to/fastly/supercharge-your-api-with-realtime-push-powers-using-fastly-fanout-3ba0

また @harmony7 さんは Fanout のサンプルアプリも複数開発・公開されているのでこちらも参考になる部分が多いかと思います。

Fastly 公式のスターターキットも Rust, Go, JavaScript の各言語版が用意されていますのでこちらもリンクを貼っておきますね。

まとめ

本稿ではリアルタイムメッセージングの豊富な機能を持つ Fanout について、特に WebSocket のハンドリングに焦点を当てて概要を紹介しました。

次回は Fastly が昨年リリースした KV Store について、今年 API の追加があり更に便利に使えるようになったので、その概要について紹介します。

Discussion