gqlgenでGraphQLのSubscriptionリゾルバを実装する
はじめに
2023年5月頃に実装していたアプリケーションについての話です。
リポジトリはこちらです。
gqlgenを使ってQueryやMutationのリゾルバは実装したことがあるのですが、Subscriptionのリゾルバを実装したことがなかったため、試したいと考えていました。
その時期に会社総会の運営も担当しており、ワークショップとして会社や組織、一緒に働くメンバーに関するクイズを企画していたので、そこで使えるアプリケーションを作ることにしました。
なお、gqlgenのバージョンは以下のとおりです。
github.com/99designs/gqlgen v0.17.35
gqlgenの基本的な使い方については省きます。
そちらに関してはBuilding GraphQL servers in golang — gqlgenをご覧ください。
想定読者
- GraphQLのQuery、Mutation、リゾルバなどの概念が分かる方
- Go言語の基本的な知識をお持ちの方
アプリケーションの概要
実装対象のイメージしやすくなるかと思うのでアプリケーションについて軽くご紹介します。
アプリケーションにアクセスすると下記の画面が表示されます。
画面上部にあるlobbiesのリンクをクリックすると、ロビー一覧画面へと遷移します。
ロビー一覧画面からロビーを選択することで、クイズセッションに参加できます。
クイズセッションの参加者側の画面は以下のようになっています。
問題文が表示され、それに対する回答を入力して送信するシンプルな仕組みになっています。
クイズセッションの管理者は次の管理画面にアクセスできます。
管理画面には
- ロビーステータスの切り替え
- クイズの配信
- 採点
これらの機能があります。
このうち、セッションステータスの切り替えとクイズの配信部分にGraphQLのSubscriptionを使っています。
Subscriptionの実装のポイント
大まかな流れを図に示してみました。
システムの概観
ユーザー
上述したように、参加者と管理者の2種類のユーザが存在します。
システム
図にはApplicationとRedisの2種類が登場しています。
Publish/Subscribeの機構があれば、Redisでなくとも良いです。
実際にはMySQLもありますが、Subscriptionの実装部分に焦点を当てるために割愛しています。
参加者はApplicationに対してsubscriptionを実行し、Applicationからデータを待ち受けます。
ApplicationはRedisのchannelをSubscribeしてメッセージを待ちます。
管理者がApplicationに対してmutationを実行すると、ApplicationはRedisのchannelに対してメッセージを送ります。
これにより、参加者に対してデータの変更がリアルタイムに届くことになります。
WebSocketトランスポートの追加
公式ドキュメントを参考にします。
ドキュメントのとおり、GraphQLサーバのハンドラに対してAddTransport
メソッドでWebSocketを追加します。
srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: &graph.Resolver{}}))
srv.AddTransport(&transport.Websocket{})
GraphQLスキーマ定義とリゾルバの作成
実際のアプリケーションにはcurrentQuestion
とlobbyStatus
の2種類のSubscriptionがありますが、返り値がシンプルなlobbyStatus
を題材に説明していきます。
下記のようなスキーマを定義します。
type Subscription {
lobbyStatus(lobbyId: ID!): LobbyStatus!
}
enum LobbyStatus {
WAITING
ACTIVE
FINISHED
}
その後、gqlgen generate
を実行します。
go run github.com/99designs/gqlgen generate
リゾルバの雛形が生成されますので、これを修正しながら実装を進めていきます。
func (r *subscriptionResolver) LobbyStatus(ctx context.Context) (<-chan model.LobbyStatus, error) {
panic(fmt.Errorf("not implemented: LobbyStatus - lobbyStatus"))
}
リゾルバからチャネルへ値を送信する部分
参加者側のリクエストの処理部分です。
まずはシンプルに実装します。
func (r *subscriptionResolver) LobbyStatus(ctx context.Context) (<-chan model.LobbyStatus, error) {
modelCh := make(chan model.LobbyStatus) // (1)
go func () {
time.Sleep(1 * time.Second)
modelCh <- model.LobbyStatusActive // (2)
}()
return modelCh, nil // (1)
}
LobbyStatusメソッドの返り値は(<-chan model.LobbyStatus, error)
になっているため、適した型のchannelを生成して返却します。(1)
(1)の状態で、このメソッドを呼び出したクライアントはサーバから送られるデータを待ち受ける状態になります。
(2)では実際に値を送信します。値は非同期に取得することになるためgoroutineを使います。(1秒間のスリープ処理は、この後実装する処理のダミーです)
この例では値がmodel.LobbyStatusActive
から変わることがなくリアルタイムに取得できる感覚が掴みづらいかもしれません。
gqlgenのサンプルでは1秒ごとに時刻を返す例が載っているので、そちらを試してみるのも良いと思います。
Subscriptions — gqlgen
LobbyStatusの切り替えイベントを発生させる
管理者側のリクエストの処理部分です。
LobbyStatusを変更するためのmutationをGraphQLスキーマに定義します。
type Mutation {
publishLobbyStatus(lobbyId: ID!, status: LobbyStatus!): PublishLobbyStatusPayload!
}
type PublishLobbyStatusPayload {
lobby: Lobby!
status: LobbyStatus!
}
先程と同じくgqlgen generate
を実行して生成されたリゾルバに修正を加えます。
func (r *mutationResolver) PublishLobbyStatus(ctx context.Context, lobbyID string, status model.LobbyStatus) (*model.PublishLobbyStatusPayload, error) {
s := domain.LobbyStatus(strings.ToLower(string(status)))
if !s.Valid() {
return nil, fmt.Errorf("invalid lobby status: %s", status)
}
err := r.LobbyInteractor.PublishLobbyStatus(ctx, lobbyID, s) // (1)
if err != nil {
return nil, err
}
lobby, err := r.LobbyInteractor.FetchLobby(ctx, lobbyID)
if err != nil {
return nil, err
}
return &model.PublishLobbyStatusPayload{
Lobby: &model.Lobby{
ID: lobby.ID,
Name: lobby.Name,
Public: lobby.IsPublic,
OwnerUID: lobby.OwnerUID,
},
Status: status,
}, nil
}
(1)の部分でLobbyInteractor
のPublishLobbyStatus
メソッドを呼び出しています。
(1)は最終的にLobbyRepository
のPublishLobbyStatus
メソッドに到達し、内部でRedisに対してPublishを実行します。(2)
func (r *LobbyRepository) PublishLobbyStatus(ctx context.Context, id string, status domain.LobbyStatus) error {
key := r.lobbyStatusKey(id)
err := r.RedisHandler.Set(ctx, key, status, 0)
if err != nil {
return fmt.Errorf("failed to publish lobby status: %w", err)
}
err = r.RedisHandler.Publish(ctx, key, status) // (2)
if err != nil {
return fmt.Errorf("failed to publish lobby status: %w", err)
}
return nil
}
リゾルバへ値を伝播する
再びsubscriptionのリゾルバに戻ってきました。
上述したPublishのイベントを検知して値をリゾルバまで届けるための実装です。
func (r *subscriptionResolver) LobbyStatus(ctx context.Context, lobbyID string) (<-chan model.LobbyStatus, error) {
ch := make(chan domain.LobbyStatus)
go func() {
err := r.LobbyInteractor.SubscribeLobbyStatus(ctx, lobbyID, ch) // (1)
if err != nil {
fmt.Errorf(err)
return
}
}()
modelCh := make(chan model.LobbyStatus)
go func() {
for s := range ch { // (2)
m := model.LobbyStatus(strings.ToUpper(string(s)))
select {
case modelCh <- m: // (3)
fmt.Println("sent lobby status")
default:
fmt.Println("channel closed")
return
}
}
}()
return modelCh, nil
}
(1)で、LobbyInteractor
のSubscribeLobbyStatus
メソッドを呼び出します。
LobbyInteractor.SubscribeLobbyStatus
は引数にchannelを受け取り、内部的にRedis Pub/Subからメッセージを受け取るとchannelに値を送信します。
(2)は(1)で渡したchannelのデータを待ち受けています。
subscriptionを実行したクライアントがまだ接続されている場合は(3)のケースに入り、クライアントに対してデータが渡されます。
Redis Pub/Subからメッセージを受け取る
リゾルバへ値を伝播するのセクションで説明を省いてしまったLobbyRepository
のSubscribeLobbyStatus
の実装です。
func (r *LobbyRepository) SubscribeLobbyStatus(ctx context.Context, id string, ch chan<- domain.LobbyStatus) {
channelID := r.lobbyStatusKey(id)
pubsub := r.RedisHandler.Subscribe(ctx, channelID) // (1)
go func() {
event := pubsub.Channel() // (2)
for e := range event {
payload := e.Payload // (3)
select {
case ch <- domain.LobbyStatus(payload): // (4)
default:
fmt.Println("failed to send lobby status")
pubsub.Close() // (5)
return
}
}
}()
}
channelを引数に取り、LobbyStatusのsubscriptionリゾルバと似たような形をしています。
違いとしては、(1)と(2)でRedisの特定のChannelを購読している点です。
管理者側の処理の実装の説明で触れたPublishLobbyStatus
が実行されると、イベントが流れてきます。
(3)でイベントのペイロードという形でstring型のメッセージを受け取ることができます。
メッセージにはLobbyStatus
の値が入っているため、(4)で型変換してデータを送信します。
送信先のchannelが既に閉じていた場合はRedisへのChannel接続も閉じるようにします。
まとめ
gqlgenでのsubscriptionのリゾルバの実装について、実際の成果物をもとにかいつまんでご紹介しました。
subscriptionのリゾルバはchannelを返すというのが面白かったです。
リゾルバがどこでchannelをクローズしているかなども気になるので、gqlgenのソースを追って調べてみようと思っています。
InteractorやRepositoryといったレイヤー分割をしている関係でchannelの引き渡しが発生し、多少複雑な実装となってしまったのは反省点です。
gqlgenではWebSocketの代わりにServer-Side Eventsというサーバプッシュの仕組みを使うこともできるようです。
WebSocketに比べていくつか制約があるようですが、こちらもいずれ試したいと思います。
Discussion