📑
WebSocketを使用したシグナリングサーバー
はじめに
本記事は、Go言語でWebSocketを使用し、シグナリングを行うプログラムの解説です。
WebRTCにおいてクライアント間の通信を確立するための情報交換を行います。
プログラムの解説
処理の流れ
パッケージと変数
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"github.com/gorilla/websocket"
)
まず、必要なパッケージをインポートします。
-
encoding/json
パッケージは、JSONデータのエンコードとデコードを行うために使用されます。 -
log
パッケージは、ログ出力を行うために使用されます。 -
net/http
パッケージは、HTTPサーバーを作成するために使用されます。 -
strings
パッケージは、文字列操作に使用されます。 -
github.com/gorilla/websocket
パッケージは、WebSocket接続を扱うための機能を提供します。
var (
upgrader = websocket.Upgrader{}
clients = make(map[*websocket.Conn]string)
clientsByID = make(map[string]*websocket.Conn)
broadcast = make(chan []byte)
offerId string = ""
functions = make(map[string]interface{})
sdpData = make(map[string]string)
candidateData = make(map[string][]string)
)
次に、変数の宣言を行います。
-
upgrader
: WebSocket接続をアップグレードするためのwebsocket.Upgrader
構造体。 -
clients
: WebSocket接続のマップ。各クライアントの接続をキーとし、クライアントの識別子を値として持ちます。 -
clientsByID
: クライアントの識別子をキーとし、WebSocket接続を値として持つマップ。 -
broadcast
: メッセージのブロードキャストを行うためのチャネル。 -
offerId
: 現在のオファーの識別子。 -
functions
: メッセージのタイプに対応する関数を格納するマップ。 -
sdpData
: オファーやアンサーのSDP(Session Description Protocol)データを格納するマップ。 -
candidateData
: ICE(Interactive Connectivity Establishment)の候補情報を格納するマップ。
main関数
func main() {
functions["connect"] = connect
functions["offer"] = offer
functions["answer"] = answer
functions["candidateAdd"] = candidateAdd
http
.HandleFunc("/ws", handleWebSocket)
go handleMessages()
http.ListenAndServe(":8080", nil)
}
main
関数では、以下の処理を行います。
- シグナリングメッセージのタイプに対応する関数を
functions
マップに登録します。 -
/ws
パスに対してWebSocket接続を処理するハンドラ関数handleWebSocket
を登録します。 - メッセージの処理を行うためのゴルーチンである
handleMessages
関数を開始します。 - HTTPサーバーを起動し、ポート8080でリクエストを受け付けるようにします。
handleWebSocket関数
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, _ := upgrader.Upgrade(w, r, nil) // error ignored for sake of simplicity
for {
_, message, err := conn.ReadMessage()
if _, ok := clients[conn]; !ok {
// 新規接続
var jsonStr = string(message)
var data map[string]interface{}
err := json.Unmarshal([]byte(jsonStr), &data)
if err != nil {
panic(err)
}
// idの登録
id := data["id"].(string)
clients[conn] = id
clientsByID[id] = conn
}
if err != nil {
log.Println(err)
delete(clientsByID, clients[conn])
delete(clients, conn)
break
}
broadcast <- message
}
offerId = ""
defer conn.Close()
}
handleWebSocket
関数は、WebSocket接続を処理するためのハンドラ関数です。
以下の処理が行われます。
-
upgrader.Upgrade
関数を使用してWebSocket接続をアップグレードします。 - 接続が確立されると、無限ループが開始されます。
- メッセージを読み取り、クライアントが新規接続かどうかを確認します。もし新規接続なら、クライアントの識別子を取得し、
clients
とclientsByID
のマップに登録します。 - エラーが発生した場合は、エラーログを出力し、クライアントを削除してループを終了します。
- メッセージを
broadcast
チャネルに送信します。 - ループを抜けた後、オファーの識別子をリセットし、接続を閉じます。
handleMessages関数
func handleMessages() {
for {
message := <-broadcast
// text -> json
var jsonStr = string(message)
fmt.Println(jsonStr)
var data map[string]interface{}
err := json.Unmarshal([]byte(jsonStr), &data)
if err != nil {
panic(err)
}
// 処理分岐
msgDataType := data["type"].(string)
function := functions[msgDataType].(func(map[string]interface{}))
function(data)
}
}
handleMessages
関数は、broadcast
チャネルからメッセージを受け取り、処理を行うためのゴルーチンです。以下の処理が行われます。
-
broadcast
チャネルからメッセージを受け取ります。 - 受け取ったメッセージをJSON形式に変換し、
data
マップにデコードします。 - メッセージのタイプに基づいて処理を分岐し、対応する関数を呼び出します。
connect関数
func connect(data map[string]interface{}) {
resultData := make(map[string]string)
id := data["id"].(string)
client := clientsByID[id]
// offerを送ってもらう
if len(offerId) == 0 {
offerId = id
resultData["type"] = "offer"
bytes := jsonToBytes(resultData)
sendMessage(client, bytes)
return
} else if id == offerId {
// 重複
return
}
// offerを送る
resultData["type"] = "offer"
resultData["sdp"] = sdpData[offerId]
resultData["target_id"] = offerId
bytes := jsonToBytes(resultData)
sendMessage(client, bytes)
}
connect
関数は、クライアントが接続したときに実行される関数です。以下の処理が行われます。
- クライアントの識別子を取得し、クライアントのWebSocket接続を取得します。
- もし現在のオファーの識別子が空であれば、オファーを要求するためのメッセージを送信します。
- もしクライアントの識別子が現在のオファーの識別子と一致する場合は、重複として処理を終了します。
- それ以外の場合は、オファーを送信するためのメッセージを作成し、クライアントに送信します。
offer関数
func offer(data map[string]interface{}) {
fmt.Println("[Offer]")
id := data["id"].(string)
sdp, _ := json.Marshal(data["sdp"])
sdpData[id] = string(sdp)
}
offer
関数は、オファーの受信時に実行される関数です。以下の処理が行われます。
- オファーの送信元のクライアントの識別子を取得します。
- オファーのSDPデータをJSON形式に変換し、
sdpData
マップに格納します。
answer関数
func answer(data map[string]interface{}) {
fmt.Println("[Answer]")
id := data["id"].(string)
sdp, _ := json.Marshal(data["sdp"])
sdpData[id] = string(sdp)
// 接続を確立する
targetClient := clientsByID[id]
resultData := make(map[string]string)
resultData["type"] = "answer"
resultData["sdp"] = sdpData[offerId]
bytes := jsonToBytes(resultData)
sendMessage(targetClient, bytes)
}
answer
関数は、アンサーの受信時に実行される関数です。以下の処理が行われます。
- アンサーの送信元のクライアントの識別子を取得します。
- アンサーのSDPデータをJSON形式に変換し、
sdpData
マップに格納します。 - オファー元のクライアントに対してアンサーを送信するためのメッセージを作成し、送信します。
candidateAdd関数
func candidateAdd(data map[string]interface{}) {
fmt.Println("[Candidate]")
id := data["id"].(string)
candidates := data["candidates"].([]interface{})
for _, candidate := range candidates {
candidateData[id] = append(candidateData[id], candidate.(string))
}
targetClient := clientsByID[id]
resultData := make(map[string]interface{})
resultData["type"] = "candidate"
resultData["candidates"] = candidateData[offerId]
bytes := jsonToBytes(resultData)
sendMessage(targetClient, bytes)
}
candidateAdd
関数は、ICEの候補情報の受信時に実行される関数です。以下の処理が行われます。
- 候補情報の送信元のクライアントの識別子を取得します。
- 候補情報を
candidateData
マップに追加します。 - オファー元のクライアントに対して候補情報を送信するためのメッセージを作成し、送信します。
sendMessage関数
func sendMessage(client *websocket.Conn, message []byte) {
err := client.WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Println(err)
client.Close()
delete(clientsByID, clients[client])
delete(clients, client)
}
}
sendMessage
関数は、指定されたクライアントにメッセージを送信するための関数です。以下の処理が行われます。
-
client.WriteMessage
関数を使用してメッセージを送信します。 - エラーが発生した場合は、エラーログを出力し、クライアントを削除します。
jsonToBytes関数
func jsonToBytes(data map[string]interface{}) []byte {
bytes, _ := json.Marshal(data)
return bytes
}
jsonToBytes
関数は、指定されたマップをJSON形式に変換してバイト配列に変換するための関数です。
終わりに
私にとって初めてGo言語を使用した実装になるため、間違い等あるかもしれません。
そのときは教えていただけると幸いです。
全体のプログラム
コメント文に中国語が混じっていたり間違っている可能性があります。ご了承ください。
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"github.com/gorilla/websocket"
)
var (
upgrader = websocket.Upgrader{}
clients = make(map[*websocket.Conn]string)
clientsByID = make(map[string]*websocket.Conn)
broadcast = make(chan []byte)
offerId string = ""
functions = make(map[string]interface{})
sdpData = make(map[string]string)
candidateData = make(map[string][]string)
)
func main() {
functions["connect"] = connect
functions["offer"] = offer
functions["answer"] = answer
functions["candidateAdd"] = candidateAdd
http.HandleFunc("/ws", handleWebSocket)
go handleMessages()
http.ListenAndServe(":8080", nil)
}
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, _ := upgrader.Upgrade(w, r, nil) // error ignored for sake of simplicity
for {
_, message, err := conn.ReadMessage()
if _, ok := clients[conn]; !ok {
// 新規接続
var jsonStr = string(message)
var data map[string]interface{}
err := json.Unmarshal([]byte(jsonStr), &data)
if err != nil {
panic(err)
}
// idの登録
id := data["id"].(string)
clients[conn] = id
clientsByID[id] = conn
}
if err != nil {
log.Println(err)
delete(clientsByID, clients[conn])
delete(clients, conn)
break
}
broadcast <- message
}
offerId = ""
defer conn.Close()
}
func handleMessages() {
for {
message := <-broadcast
// text -> json
var jsonStr = string(message)
fmt.Println(jsonStr)
var data map[string]interface{}
err := json.Unmarshal([]byte(jsonStr), &data)
if err != nil {
panic(err)
}
// 処理分岐
msgDataType := data["type"].(string)
function := functions[msgDataType].(func(map[string]interface{}))
function(data)
}
}
func connect(data map[string]interface{}) {
resultData := make(map[string]string)
id := data["id"].(string)
client := clientsByID[id]
// offerを送ってもらう
if len(offerId) == 0 {
offerId = id
resultData["type"] = "offer"
bytes := jsonToBytes(resultData)
sendMessage(client, bytes)
return
} else if id == offerId {
// 重複
return
}
// offerを送る
resultData["type"] = "offer"
resultData["sdp"] = sdpData[offerId]
resultData["target_id"] = offerId
bytes := jsonToBytes(resultData)
sendMessage(client, bytes)
}
func offer(data map[string]interface{}) {
fmt.Println("[Offer]")
id := data["id"].(string)
sdp, _ := json.Marshal(data["sdp"])
sdpData[id] = string(sdp)
}
func answer(data map[string]interface{}) {
// offerの送り主にanswerを返す
sendAnswer(data)
// answerの送り主にcandidateを送る
sendCandidate(data)
}
func sendAnswer(data map[string]interface{}) {
fmt.Println("[Answer]")
resultData := make(map[string]string)
resultData["type"] = "answer"
target_id := data["target_id"].(string)
sdp, _ := json.Marshal(data["sdp"])
resultData["sdp"] = string(sdp)
client := clientsByID[target_id]
bytes := jsonToBytes(resultData)
sendMessage(client, bytes)
}
func sendCandidate(data map[string]interface{}) {
returnData := make(map[string]string)
id := offerId
if _, ok := candidateData[id]; !ok {
return
}
answerId := data["id"].(string)
client := clientsByID[answerId]
fmt.Println("candidate受け取り")
fmt.Println("[Candidate]")
returnData["type"] = "candidate"
returnData["candidate"] = strings.Join(candidateData[id], "|")
bytes := jsonToBytes(returnData)
sendMessage(client, bytes)
}
func candidateAdd(data map[string]interface{}) {
fmt.Println("[Candidate Add]")
resultData := make(map[string]string)
// 相手が已經接続的話、candidateDataに入れずに直接送る
id := data["id"].(string)
candidateByte, _ := json.Marshal(data["candidate"])
candidate := string(candidateByte)
target_id := data["target_id"].(string)
if target_id != "" {
if client, ok2 := clientsByID[target_id]; ok2 {
// 相手が有的話
fmt.Println("[Candidate]")
resultData["type"] = "candidate"
resultData["candidate"] = candidate
bytes := jsonToBytes(resultData)
sendMessage(client, bytes)
return
}
}
// 相手が還沒來 -> 保存
if _, ok := candidateData[id]; !ok {
candidateData[id] = []string{candidate}
} else {
candidateData[id] = append(candidateData[id], candidate)
}
}
// 訊息送信
func sendMessage(client *websocket.Conn, bytes []byte) {
err := client.WriteMessage(websocket.TextMessage, bytes)
if err != nil {
log.Println(err)
client.Close()
delete(clients, client)
}
}
func jsonToBytes(result map[string]string) []byte {
jsonText, err := json.Marshal(result)
if err != nil {
panic(err)
}
bytes := []byte(jsonText)
return bytes
}
Discussion