🎉

gqlgenによるGraphQL Subscription実装

2023/07/04に公開

はじめに

こんにちは株式会社スマートショッピング ソフトウェアエンジニアの葛西伸樹です。
現在のプロジェクトでGraphQLライブラリのgqlgenを使用しているのですが、GraphQLにおけるリアルタイム通信用のコマンドであるSubscriptionに関する知識がなかったので簡単に実装してその内容をまとめました。

この記事で説明しないこと

この記事ではSubscriptionの仕組みと実装方法に焦点を当てているので

  • GraphQLの概要
  • gqlgenによるQuery, Mutationの実装方法

については説明を省略します。

Subscriptionとは

SubscriptionはGraphQLのデータ取得操作の一つで、リクエストを投げて結果を取得するQueryとは違いコネクションを接続することでサーバー側からのプッシュ通知によるデータ取得が可能になっています。
SubscriptionではWebSocket通信によってクライアント側とサーバー側を接続し、サーバーからクライアントに対してデータをプッシュ通知します。
参考:Apollo Docs/Subscriptions

実装

ここからは弊社のプロダクトであるスマートマットを題材にしてgqlgenによる実装について説明していきます。
スマートマットは重量を計測して閾値を下回ったときに自動で発注をする機能などを有するIoTプロダクトで、今回はスマートマットで計測された最新の重量を取得できるSubscriptionを実装していきます。
コードはこちらで公開しているのでよければ手元で動かしてみて下さい。

説明としてはマット登録、重量取得、重量更新それぞれのQuery及びMutationは実装済みの前提で、重量が更新された時に最新の重量を取得できるSubscriptionを追加していきます。

マット登録Mutation
mutation {
  createSmartMat(currentWeight: $weight) {
    id,
    currentWeight
  }
}
重量取得Query
query {
  smartMats {
    id
    currentWeight
  }
}
重量更新Mutation
mutation {
    updateSmartMatWeight(id: $id, currentWeight: $weight) {
        id
        currentWeight
    }
}

処理の流れ

  1. GraphQLサーバー起動時にマットIDに対応するチャネルを保持するmapを作成する
  2. Subscriptionリクエスト時に重量更新イベントを受け取るチャネルを作成してmapに格納する
  3. 重量更新mutationがリクエストされた時にチャネルに対して重量データをpublishする
  4. subscribeしているクライアントに対して最新重量が送信される

コード

GraphQLサーバー起動処理

サーバー起動時にマットIDに対応するチャネルを保持するmapを作成しています。
またSubscriptionを使用する場合はNewDefaultServerの引数を修正する必要があるので注意です。

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = defaultPort
	}

	// Subscriptionを使うときはNewDefaultServerの引数を修正する
	// ここで作成しているChannelsByMatIDでマットIDに対応するチャネルを保持している
	srv := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{Resolvers: &graph.Resolver{
		ChannelsByMatID: make(map[int64][]chan<- *model.SmartMat),
		Mutex:           sync.Mutex{},
	}}))

	http.Handle("/", playground.Handler("GraphQL playground", "/query"))
	http.Handle("/query", srv)

	log.Printf("connect to http://localhost:%s/ for GraphQL playground", port)
	log.Fatal(http.ListenAndServe(":"+port, nil))
}

Subscription

SubscriptionではMutexで排他制御をした上でmapへのチャネルの追加とコネクション終了時のチャネル削除を行っています。

func (r *subscriptionResolver) SmartMatWeightUpdated(ctx context.Context, id int64) (<-chan *model.SmartMat, error) {
	// Mutex で ChannelsByMatID の操作を排他制御
	r.Mutex.Lock()
	defer r.Mutex.Unlock()

	// マットIDに対応するチャネルを追加
	ch := make(chan *model.SmartMat, 1)
	r.ChannelsByMatID[id] = append(r.ChannelsByMatID[id], ch)

	// コネクション終了時にチャネルを削除
	go func() {
		<-ctx.Done()
		r.Mutex.Lock()
		defer r.Mutex.Unlock()
		for i, c := range r.ChannelsByMatID[id] {
			if c == ch {
				r.ChannelsByMatID[id] = append(r.ChannelsByMatID[id][:i], r.ChannelsByMatID[id][i+1:]...)
				break
			}
		}
	}()

	return ch, nil
}

Mutation

Mutationではdb更新後にmapに格納しているチャネルを取得し、最新の重量情報をpublishしています。

func (r *mutationResolver) UpdateSmartMatWeight(ctx context.Context, id int64, currentWeight float64) (*model.SmartMat, error) {
	// db更新処理
	dbDriver := "mysql"
	dsn := "root:root@tcp(db:3306)/smart_shopping"
	db, err := sql.Open(dbDriver, dsn)
	if err != nil {
		return &model.SmartMat{}, err
	}
	defer db.Close()
	_, err = db.Exec("UPDATE smart_mats SET current_weight = ? WHERE id = ?", currentWeight, id)
	if err != nil {![](https://storage.googleapis.com/zenn-user-upload/74eac11f30a7-20230620.gif)
		return &model.SmartMat{}, err
	}
	if err != nil {
		return &model.SmartMat{}, err
	}

	// サブスクリプション更新処理
	r.Mutex.Lock()
	defer r.Mutex.Unlock()
	// 対応するMatIDのチャネルにPublish
	for _, ch := range r.ChannelsByMatID[id] {
		ch <- &model.SmartMat{
			ID:            id,
			CurrentWeight: currentWeight,
		}
	}

	return &model.SmartMat{
		ID:            id,
		CurrentWeight: currentWeight,
	}, err
}

動作確認

id:1のマットが作成済みの状態で、Subscriptionを実行した後に重量を更新するとSubscriptionの画面で重量が変更されています。

まとめ

以上がgqlgenによる簡単なSubscriptionの実装例になります。
弊社のプロダクトではより短い秒単位でのリアルタイムに在庫数を把握できるシステムを目指しており、Subscriptionが有効的に使える場面があると考えています。
ただし、Apollo Docs/Subscriptions/When to use subscriptionsでも言及されているようにほとんどの場合は定期ポーリングなどで解決できるので、機能要件や保守コストを考慮した上で選択する必要があります。

株式会社エスマット

Discussion