📑

WebSocketを使用したシグナリングサーバー

2023/06/15に公開

はじめに

本記事は、Go言語でWebSocketを使用し、シグナリングを行うプログラムの解説です。

WebRTCにおいてクライアント間の通信を確立するための情報交換を行います。

プログラムの解説

処理の流れ

処理の流れ

パッケージと変数

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"

	"github.com/gorilla/websocket"
)

まず、必要なパッケージをインポートします。

  • encoding/jsonパッケージは、JSONデータのエンコードとデコードを行うために使用されます。
  • logパッケージは、ログ出力を行うために使用されます。
  • net/httpパッケージは、HTTPサーバーを作成するために使用されます。
  • stringsパッケージは、文字列操作に使用されます。
  • github.com/gorilla/websocketパッケージは、WebSocket接続を扱うための機能を提供します。
var (
	upgrader = websocket.Upgrader{}

	clients              = make(map[*websocket.Conn]string)
	clientsByID          = make(map[string]*websocket.Conn)
	broadcast            = make(chan []byte)
	offerId       string = ""
	functions            = make(map[string]interface{})
	sdpData              = make(map[string]string)
	candidateData        = make(map[string][]string)
)

次に、変数の宣言を行います。

  • upgrader: WebSocket接続をアップグレードするためのwebsocket.Upgrader構造体。
  • clients: WebSocket接続のマップ。各クライアントの接続をキーとし、クライアントの識別子を値として持ちます。
  • clientsByID: クライアントの識別子をキーとし、WebSocket接続を値として持つマップ。
  • broadcast: メッセージのブロードキャストを行うためのチャネル。
  • offerId: 現在のオファーの識別子。
  • functions: メッセージのタイプに対応する関数を格納するマップ。
  • sdpData: オファーやアンサーのSDP(Session Description Protocol)データを格納するマップ。
  • candidateData: ICE(Interactive Connectivity Establishment)の候補情報を格納するマップ。

main関数

func main() {
	functions["connect"] = connect
	functions["offer"] = offer
	functions["answer"] = answer
	functions["candidateAdd"] = candidateAdd

	http

.HandleFunc("/ws", handleWebSocket)
	go handleMessages()

	http.ListenAndServe(":8080", nil)
}

main関数では、以下の処理を行います。

  1. シグナリングメッセージのタイプに対応する関数をfunctionsマップに登録します。
  2. /wsパスに対してWebSocket接続を処理するハンドラ関数handleWebSocketを登録します。
  3. メッセージの処理を行うためのゴルーチンであるhandleMessages関数を開始します。
  4. HTTPサーバーを起動し、ポート8080でリクエストを受け付けるようにします。

handleWebSocket関数

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
	conn, _ := upgrader.Upgrade(w, r, nil) // error ignored for sake of simplicity

	for {
		_, message, err := conn.ReadMessage()
		if _, ok := clients[conn]; !ok {
			// 新規接続
			var jsonStr = string(message)
			var data map[string]interface{}
			err := json.Unmarshal([]byte(jsonStr), &data)
			if err != nil {
				panic(err)
			}

			// idの登録
			id := data["id"].(string)
			clients[conn] = id
			clientsByID[id] = conn
		}

		if err != nil {
			log.Println(err)
			delete(clientsByID, clients[conn])
			delete(clients, conn)
			break
		}

		broadcast <- message
	}
	offerId = ""
	defer conn.Close()
}

handleWebSocket関数は、WebSocket接続を処理するためのハンドラ関数です。
以下の処理が行われます。

  1. upgrader.Upgrade関数を使用してWebSocket接続をアップグレードします。
  2. 接続が確立されると、無限ループが開始されます。
  3. メッセージを読み取り、クライアントが新規接続かどうかを確認します。もし新規接続なら、クライアントの識別子を取得し、clientsclientsByIDのマップに登録します。
  4. エラーが発生した場合は、エラーログを出力し、クライアントを削除してループを終了します。
  5. メッセージをbroadcastチャネルに送信します。
  6. ループを抜けた後、オファーの識別子をリセットし、接続を閉じます。

handleMessages関数

func handleMessages() {
	for {
		message := <-broadcast
		// text -> json
		var jsonStr = string(message)
		fmt.Println(jsonStr)
		var data map[string]interface{}
		err := json.Unmarshal([]byte(jsonStr), &data)
		if err != nil {
			panic(err)
		}

		

	// 処理分岐
		msgDataType := data["type"].(string)
		function := functions[msgDataType].(func(map[string]interface{}))
		function(data)
	}
}

handleMessages関数は、broadcastチャネルからメッセージを受け取り、処理を行うためのゴルーチンです。以下の処理が行われます。

  1. broadcastチャネルからメッセージを受け取ります。
  2. 受け取ったメッセージをJSON形式に変換し、dataマップにデコードします。
  3. メッセージのタイプに基づいて処理を分岐し、対応する関数を呼び出します。

connect関数

func connect(data map[string]interface{}) {
	resultData := make(map[string]string)
	
	id := data["id"].(string)
	client := clientsByID[id]

	// offerを送ってもらう
	if len(offerId) == 0 {
		offerId = id
		resultData["type"] = "offer"
		bytes := jsonToBytes(resultData)
		sendMessage(client, bytes)
		return
	} else if id == offerId {
		// 重複
		return
	}

	// offerを送る
	resultData["type"] = "offer"
	resultData["sdp"] = sdpData[offerId]
	resultData["target_id"] = offerId
	bytes := jsonToBytes(resultData)
	sendMessage(client, bytes)
}

connect関数は、クライアントが接続したときに実行される関数です。以下の処理が行われます。

  1. クライアントの識別子を取得し、クライアントのWebSocket接続を取得します。
  2. もし現在のオファーの識別子が空であれば、オファーを要求するためのメッセージを送信します。
  3. もしクライアントの識別子が現在のオファーの識別子と一致する場合は、重複として処理を終了します。
  4. それ以外の場合は、オファーを送信するためのメッセージを作成し、クライアントに送信します。

offer関数

func offer(data map[string]interface{}) {
	fmt.Println("[Offer]")
	id := data["id"].(string)
	sdp, _ := json.Marshal(data["sdp"])
	sdpData[id] = string(sdp)
}

offer関数は、オファーの受信時に実行される関数です。以下の処理が行われます。

  1. オファーの送信元のクライアントの識別子を取得します。
  2. オファーのSDPデータをJSON形式に変換し、sdpDataマップに格納します。

answer関数



func answer(data map[string]interface{}) {
	fmt.Println("[Answer]")
	id := data["id"].(string)
	sdp, _ := json.Marshal(data["sdp"])
	sdpData[id] = string(sdp)

	// 接続を確立する
	targetClient := clientsByID[id]
	resultData := make(map[string]string)
	resultData["type"] = "answer"
	resultData["sdp"] = sdpData[offerId]
	bytes := jsonToBytes(resultData)
	sendMessage(targetClient, bytes)
}

answer関数は、アンサーの受信時に実行される関数です。以下の処理が行われます。

  1. アンサーの送信元のクライアントの識別子を取得します。
  2. アンサーのSDPデータをJSON形式に変換し、sdpDataマップに格納します。
  3. オファー元のクライアントに対してアンサーを送信するためのメッセージを作成し、送信します。

candidateAdd関数

func candidateAdd(data map[string]interface{}) {
	fmt.Println("[Candidate]")
	id := data["id"].(string)
	candidates := data["candidates"].([]interface{})

	for _, candidate := range candidates {
		candidateData[id] = append(candidateData[id], candidate.(string))
	}

	targetClient := clientsByID[id]
	resultData := make(map[string]interface{})
	resultData["type"] = "candidate"
	resultData["candidates"] = candidateData[offerId]
	bytes := jsonToBytes(resultData)
	sendMessage(targetClient, bytes)
}

candidateAdd関数は、ICEの候補情報の受信時に実行される関数です。以下の処理が行われます。

  1. 候補情報の送信元のクライアントの識別子を取得します。
  2. 候補情報をcandidateDataマップに追加します。
  3. オファー元のクライアントに対して候補情報を送信するためのメッセージを作成し、送信します。

sendMessage関数

func sendMessage(client *websocket.Conn, message []byte) {
	err := client.WriteMessage(websocket.TextMessage, message)
	if err != nil {
		log.Println(err)
		client.Close()
		delete(clientsByID, clients[client])
		delete(clients, client)
	}
}

sendMessage関数は、指定されたクライアントにメッセージを送信するための関数です。以下の処理が行われます。

  1. client.WriteMessage関数を使用してメッセージを送信します。
  2. エラーが発生した場合は、エラーログを出力し、クライアントを削除します。

jsonToBytes関数

func jsonToBytes(data map[string]interface{}) []byte {
	bytes, _ := json.Marshal(data)
	return bytes
}

jsonToBytes関数は、指定されたマップをJSON形式に変換してバイト配列に変換するための関数です。

終わりに

私にとって初めてGo言語を使用した実装になるため、間違い等あるかもしれません。
そのときは教えていただけると幸いです。

全体のプログラム

コメント文に中国語が混じっていたり間違っている可能性があります。ご了承ください。

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"

	"github.com/gorilla/websocket"
)

var (
	upgrader = websocket.Upgrader{}

	clients              = make(map[*websocket.Conn]string)
	clientsByID          = make(map[string]*websocket.Conn)
	broadcast            = make(chan []byte)
	offerId       string = ""
	functions            = make(map[string]interface{})
	sdpData              = make(map[string]string)
	candidateData        = make(map[string][]string)
)

func main() {
	functions["connect"] = connect
	functions["offer"] = offer
	functions["answer"] = answer
	functions["candidateAdd"] = candidateAdd

	http.HandleFunc("/ws", handleWebSocket)
	go handleMessages()

	http.ListenAndServe(":8080", nil)
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
	conn, _ := upgrader.Upgrade(w, r, nil) // error ignored for sake of simplicity

	for {
		_, message, err := conn.ReadMessage()
		if _, ok := clients[conn]; !ok {
			// 新規接続
			var jsonStr = string(message)
			var data map[string]interface{}
			err := json.Unmarshal([]byte(jsonStr), &data)
			if err != nil {
				panic(err)
			}

			// idの登録
			id := data["id"].(string)
			clients[conn] = id
			clientsByID[id] = conn
		}

		if err != nil {
			log.Println(err)
			delete(clientsByID, clients[conn])
			delete(clients, conn)
			break
		}

		broadcast <- message
	}
	offerId = ""
	defer conn.Close()
}

func handleMessages() {
	for {
		message := <-broadcast
		// text -> json
		var jsonStr = string(message)
		fmt.Println(jsonStr)
		var data map[string]interface{}
		err := json.Unmarshal([]byte(jsonStr), &data)
		if err != nil {
			panic(err)
		}

		// 処理分岐
		msgDataType := data["type"].(string)
		function := functions[msgDataType].(func(map[string]interface{}))
		function(data)
	}
}

func connect(data map[string]interface{}) {
	resultData := make(map[string]string)
	
	id := data["id"].(string)
	client := clientsByID[id]

	// offerを送ってもらう
	if len(offerId) == 0 {
		offerId = id
		resultData["type"] = "offer"
		bytes := jsonToBytes(resultData)
		sendMessage(client, bytes)
		return
	} else if id == offerId {
		// 重複
		return
	}

	// offerを送る
	resultData["type"] = "offer"
	resultData["sdp"] = sdpData[offerId]
	resultData["target_id"] = offerId
	bytes := jsonToBytes(resultData)
	sendMessage(client, bytes)
}

func offer(data map[string]interface{}) {
	fmt.Println("[Offer]")
	id := data["id"].(string)
	sdp, _ := json.Marshal(data["sdp"])
	sdpData[id] = string(sdp)
}

func answer(data map[string]interface{}) {
	// offerの送り主にanswerを返す
	sendAnswer(data)

	// answerの送り主にcandidateを送る
	sendCandidate(data)
}

func sendAnswer(data map[string]interface{}) {
	fmt.Println("[Answer]")
	resultData := make(map[string]string)
	resultData["type"] = "answer"
	target_id := data["target_id"].(string)
	sdp, _ := json.Marshal(data["sdp"])
	resultData["sdp"] = string(sdp)

	client := clientsByID[target_id]
	bytes := jsonToBytes(resultData)
	sendMessage(client, bytes)
}

func sendCandidate(data map[string]interface{}) {
	returnData := make(map[string]string)
	id := offerId
	if _, ok := candidateData[id]; !ok {
		return
	}

	answerId := data["id"].(string)
	client := clientsByID[answerId]
	fmt.Println("candidate受け取り")
	fmt.Println("[Candidate]")
	returnData["type"] = "candidate"
	returnData["candidate"] = strings.Join(candidateData[id], "|")
	bytes := jsonToBytes(returnData)
	sendMessage(client, bytes)

}

func candidateAdd(data map[string]interface{}) {
	fmt.Println("[Candidate Add]")
	resultData := make(map[string]string)

	// 相手が已經接続的話、candidateDataに入れずに直接送る
	id := data["id"].(string)
	candidateByte, _ := json.Marshal(data["candidate"])
	candidate := string(candidateByte)

	target_id := data["target_id"].(string)
	if target_id != "" {
		if client, ok2 := clientsByID[target_id]; ok2 {
			// 相手が有的話
			fmt.Println("[Candidate]")
			resultData["type"] = "candidate"
			resultData["candidate"] = candidate
			bytes := jsonToBytes(resultData)
			sendMessage(client, bytes)
			return
		}
	}

	// 相手が還沒來 -> 保存
	if _, ok := candidateData[id]; !ok {
		candidateData[id] = []string{candidate}
	} else {
		candidateData[id] = append(candidateData[id], candidate)
	}
}

// 訊息送信
func sendMessage(client *websocket.Conn, bytes []byte) {
	err := client.WriteMessage(websocket.TextMessage, bytes)
	if err != nil {
		log.Println(err)
		client.Close()
		delete(clients, client)
	}
}

func jsonToBytes(result map[string]string) []byte {
	jsonText, err := json.Marshal(result)
	if err != nil {
		panic(err)
	}

	bytes := []byte(jsonText)
	return bytes
}

Discussion