Closed18

Mattermostのソースコード追ってみる

にしやまにしやま

目的:
GoのOSSのコードを読むこと

Mattermostとは:
Slackクローンのチャットアプリ
前職でも使っていたくらい結構出来の良いちゃんとした(スマホ/Web/デスクトップ)アプリです

技術スタック:
API: Go
Webフロント: React
デスクトップ: Electron
モバイルアプリ: React Native

メインで読むのはAPIとWebフロントになると思います
前職の同僚のエンジニアと輪読会シリーズ第二弾でGoのOSS読む会として一緒にやってます

にしやまにしやま

「〜が入力しています」のイベントを検知する仕組み

フロントエンドのコードから探す
webapp/channels/src/i18n/ja.json

"msg_typing.isTyping": "{user}が入力しています..."

が見つかる

webapp/channels/src/components/msg_typing/msg_typing.tsx

return (
    <FormattedMessage
        id='msg_typing.isTyping'
        defaultMessage='{user} is typing...'
        values={{
            user: users[0],
        }}
    />
);

ここで呼ばれている

このファイル内でuseWebSocketを読んでいる箇所がある
ここにおそらくどうサーバを読んでいるかの定義もあるのでそれを探す

にしやまにしやま

webapp/channels/src/utils/use_websocket/hooks.ts

export function useWebSocket({handler}: UseWebSocketOptions) {
    const wsClient = useWebSocketClient();

    useEffect(() => {
        wsClient.addMessageListener(handler);

        return () => {
            wsClient.removeMessageListener(handler);
        };
    }, [wsClient, handler]);
}

hooks.tsからwsClientの定義元のwebapp/platform/client/src/websocket.tsに飛び、initialize(connectionUrl = this.connectionUrl, token?: string)関数で接続先のURLも設定する

this.conn = new WebSocket(`${connectionUrl}?connection_id=${this.connectionId}&sequence_number=${this.serverSequence}`);

WebSocketClient.initialize()を読んでいる箇所を探す
webapp/channels/src/actions/websocket_actions.jsxで読んでいる

WebSocketClient.initialize(connUrl); // 182行目

connUrlの定義は

let connUrl = '';
    if (config.WebsocketURL) {
        connUrl = config.WebsocketURL;
    } else {
        connUrl = new URL(getSiteURL());

        // replace the protocol with a websocket one
        if (connUrl.protocol === 'https:') {
            connUrl.protocol = 'wss:';
        } else {
            connUrl.protocol = 'ws:';
        }

        // append a port number if one isn't already specified
        if (!(/:\d+$/).test(connUrl.host)) {
            if (connUrl.protocol === 'wss:') {
                connUrl.host += ':' + config.WebsocketSecurePort;
            } else {
                connUrl.host += ':' + config.WebsocketPort;
            }
        }

        connUrl = connUrl.toString();
    }

    // Strip any trailing slash before appending the pathname below.
    if (connUrl.length > 0 && connUrl[connUrl.length - 1] === '/') {
        connUrl = connUrl.substring(0, connUrl.length - 1);
    }

    connUrl += Client4.getUrlVersion() + '/websocket';

ここから、{config.WebsocketURL}/api/v4/websocketにアクセスしてるとわかった

にしやまにしやま

次はAPIの方を見に行く
server/channels/api4/websocket.go
にそれらしいファイルを発見

が、ここではコネクション確立以外は大したことをやっていないのでいまいちわからなかった。

再度フロントの方に戻ってみる。

にしやまにしやま

webapp/channels/src/components/msg_typing/msg_typing.tsxに戻ってどうやってサーバにタイピング中の情報を送っているのか調べる
下記のgetTypingTextのprops.typingUsersにデータがあればフロントにタイピング中と表示できるようになっていそう。

const getTypingText = () => {
    let users: string[] = [];
    let numUsers = 0;
    if (props.typingUsers) {
        users = [...props.typingUsers];
        numUsers = users.length;
    }

    if (numUsers === 0) {
        return '';
    }
    // 中略
    return (
        <FormattedMessage
            id='msg_typing.areTyping'
            defaultMessage='{users} and {last} are typing...'
            values={{
                users: (users.join(', ')),
                last,
            }}
        />
    );
};

でもこのコンポーネントの呼び出し元ではprops.typingUsersを渡していなかった。
webapp/channels/src/components/advanced_text_editor/advanced_text_editor.tsx

<MsgTyping
    channelId={channelId}
    postId={postId}
/>
にしやまにしやま

webapp/channels/src/components/msg_typing/index.ts
でStateに登録してコンポーネントに渡してるっぽい
connectって関数はreduxに備わってるやつみたい


function makeMapStateToProps() {
    const getUsersTypingByChannelAndPost = makeGetUsersTypingByChannelAndPost();

    return function mapStateToProps(state: GlobalState, ownProps: OwnProps) {
        const typingUsers = getUsersTypingByChannelAndPost(state, {channelId: ownProps.channelId, postId: ownProps.postId});

        return {
            typingUsers,
        };
    };
}

const mapDispatchToProps = {
    userStartedTyping,
    userStoppedTyping,
};

export default connect(makeMapStateToProps, mapDispatchToProps)(MsgTyping);
にしやまにしやま

上のuserStartedTypingの定義元
dispatch()もreduxの関数らしい
ここではストアにタイピング中イベントの登録と一定時間後にストップのイベントを登録することで実質的にイベントの有効期限を設けている

export function userStartedTyping(userId: string, channelId: string, rootId: string, now: number) {
    return (dispatch: DispatchFunc, getState: GetStateFunc) => {
        const state = getState();

        if (
            isPerformanceDebuggingEnabled(state) &&
            getBool(state, Preferences.CATEGORY_PERFORMANCE_DEBUGGING, Preferences.NAME_DISABLE_TYPING_MESSAGES)
        ) {
            return;
        }

        dispatch({
            type: WebsocketEvents.TYPING,
            data: {
                id: channelId + rootId,
                userId,
                now,
            },
        });

        // Ideally this followup loading would be done by someone else
        dispatch(fillInMissingInfo(userId));

        setTimeout(() => {
            dispatch(userStoppedTyping(userId, channelId, rootId, now));
        }, getTimeBetweenTypingEvents(state));
    };
}
にしやまにしやま

webapp/channels/src/components/msg_typing/msg_typing.tsxにもう一度戻る
useWebSocketとして登録したhandlerがどう使われるのかよくわかってないので追ってみる

useWebSocket({
    handler: useCallback((msg: WebSocketMessage) => {
        if (msg.event === SocketEvents.TYPING) {
            const channelId = msg.broadcast.channel_id;
            const rootId = msg.data.parent_id;
            const userId = msg.data.user_id;

            if (props.channelId === channelId && props.postId === rootId) {
                userStartedTyping(userId, channelId, rootId, Date.now());
            }
        } else if (msg.event === SocketEvents.POSTED) {
            const post = JSON.parse(msg.data.post);

            const channelId = post.channel_id;
            const rootId = post.root_id;
            const userId = post.user_id;

            if (props.channelId === channelId && props.postId === rootId) {
                userStoppedTyping(userId, channelId, rootId, Date.now());
            }
        }
    }, [props.channelId, props.postId, userStartedTyping, userStoppedTyping]),
});

useWebSocketの定義元
webapp/channels/src/utils/use_websocket/hooks.ts
wsClient(見るからにWebSocketのクライアント)にaddMessageListenerとしてhandlerを登録してる
これがどう使われるのか?

export function useWebSocket({handler}: UseWebSocketOptions) {
    const wsClient = useWebSocketClient();

    useEffect(() => {
        wsClient.addMessageListener(handler);

        return () => {
            wsClient.removeMessageListener(handler);
        };
    }, [wsClient, handler]);
}
にしやまにしやま

webapp/platform/client/src/websocket.ts
messageListenersというところに追加している
WebSocketMessageとしてイベントや実際に受け取ったデータが入ってくる様子

export type MessageListener = (msg: WebSocketMessage) => void;
private messageListeners = new Set<MessageListener>();
export type WebSocketMessage<T = any> = {
    event: string;
    data: T;
    broadcast: WebSocketBroadcast;
    seq: number;
};

// 中略

export default class WebSocketClient {
    // 中略
    addMessageListener(listener: MessageListener) {
        this.messageListeners.add(listener);

        if (this.messageListeners.size > 5) {
            // eslint-disable-next-line no-console
            console.warn(
                `WebSocketClient has ${this.messageListeners.size} message listeners registered`
            );
        }
    }
}

実際に受け取ってくるところも確認したい

export default class WebSocketClient {
    initialize(connectionUrl = this.connectionUrl, token?: string) {
    // 中略
    this.conn = new WebSocket(`${connectionUrl}?connection_id=${this.connectionId}&sequence_number=${this.serverSequence}`);
    // 中略
    this.conn.onmessage = (evt) => {
            const msg = JSON.parse(evt.data);
            // たくさん中略
            this.messageListeners.forEach((listener) => listener(msg));
        };
    }
}

WebSocketClientinitializeの中でonmessageイベントに対して処理を登録してる
ここでmessageListenersの個々のデータ(=wsClient.addMessageListener(handler);でaddしたhandler)をループしてmsg(=WebSocketで受け取ったデータ)を引数に関数として実行している。
onmessageはJavaScriptに備わっているWebSocketのイベント

にしやまにしやま

フロント側でどうやって入力中のイベントを受け取って反映させているかは完全に理解したのでもう一度APIの方に戻ってみる
先ほど見ていたserver/channels/api4/websocket.goはそんなに長くない

func (api *API) InitWebSocket() {
	// Optionally supports a trailing slash
	api.BaseRoutes.APIRoot.Handle("/{websocket:websocket(?:\\/)?}", api.APIHandlerTrustRequester(connectWebSocket)).Methods("GET")
}

func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  model.SocketMaxMessageSizeKb,
		WriteBufferSize: model.SocketMaxMessageSizeKb,
		CheckOrigin:     c.App.OriginChecker(),
	}

	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		c.Err = model.NewAppError("connect", "api.web_socket.connect.upgrade.app_error", nil, err.Error(), http.StatusBadRequest)
		return
	}

	// We initialize webconn with all the necessary data.
	// If the queues are empty, they are initialized in the constructor.
	cfg := &platform.WebConnConfig{
		WebSocket: ws,
		Session:   *c.AppContext.Session(),
		TFunc:     c.AppContext.T,
		Locale:    "",
		Active:    true,
	}

	cfg.ConnectionID = r.URL.Query().Get(connectionIDParam)
	if cfg.ConnectionID == "" || c.AppContext.Session().UserId == "" {
		// If not present, we assume client is not capable yet, or it's a fresh connection.
		// We just create a new ID.
		cfg.ConnectionID = model.NewId()
		// In case of fresh connection id, sequence number is already zero.
	} else {
		cfg, err = c.App.Srv().Platform().PopulateWebConnConfig(c.AppContext.Session(), cfg, r.URL.Query().Get(sequenceNumberParam))
		if err != nil {
			c.Logger.Warn("Error while populating webconn config", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
			ws.Close()
			return
		}
	}

	wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())
	if c.AppContext.Session().UserId != "" {
		c.App.Srv().Platform().HubRegister(wc)
	}

	wc.Pump()
}

ざっとみる感じ、

  • ws, err := upgrader.Upgrade(w, r, nil)でhttpをWebSocketにUpgradeする
  • wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())でおそらくWebSocketのインスタンス生成
  • wc.Pump()でコネクションを張る

さっきは見落としていたwc.Pump()が肝っぽいのでこれを追ってみる

にしやまにしやま

server/channels/app/platform/web_conn.go
ぱっと見よくわからんけど並行処理でWebSocketのリクエストを発行しているように見える
コメントのsend/receiveというあたりからもこの辺で送信と受信の処理をやっていそう

// Pump starts the WebConn instance. After this, the websocket
// is ready to send/receive messages.
func (wc *WebConn) Pump() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		wc.writePump()
	}()

	wg.Add(1)
	go wc.pluginPostedConsumer(&wg)

	wc.readPump()
	close(wc.endWritePump)
	close(wc.pluginPosted)
	wg.Wait()
	wc.Platform.HubUnregister(wc)
	close(wc.pumpFinished)

	userID := wc.UserId
	wc.Platform.Go(func() {
		wc.HookRunner.RunMultiHook(func(hooks plugin.Hooks) bool {
			hooks.OnWebSocketDisconnect(wc.GetConnectionID(), userID)
			return true
		}, plugin.OnWebSocketDisconnectID)
	})
}
にしやまにしやま

とりあえず今回はここまで
次回は来週の木曜21:00~22:00の予定(今日21:00~22:00の予定だったのに0時近くまでやってしまった)

にしやまにしやま

server/channels/app/platform/web_conn.go
送信と受信の処理を追ってみる
今回はPump()の中で実行されているwritePump()とreadPump()について。

writePump()

(waitGroupに1addした後)最初にwritePump()を実行している

go func() {
	defer wg.Done()
	wc.writePump()
}()

ここでは下記を主に行なっている

  • deadQueueに関連する処理
    • deadQueueは送信するメッセージを入れる箇所っぽい
  • for文のselect case文でチャネルからデータが送られてくるのを待機している
    • チャネルから送られてきたらwc.writeMessageBuf()でコネクションにメッセージを書き込んでクライアントに送っている
    • writeMessageBuf()の中ではgorilla.WebSocketのライブラリを使ってる
  • goroutineの中でwritePump()を実行することで書き込み待ちのループを並行処理で実施

readPump()

  • readLimitやreadDeadlineを設定している
    • 一度に受信するメッセージサイズの上限やPong(WebSocket接続中にクライアント/サーバ間でリクエストをやりとりする時に送り合う応答確認のようなものらしい)の有効期限を設定
  • for文で無条件ループし、コネクションが切れるまで(=この処理が終了するまで)WebSocketからのメッセージを受信する
for {
	msgType, rd, err := wc.WebSocket.NextReader()
	if err != nil {
		wc.logSocketErr("websocket.NextReader", err)
		return
	}

	var decoder interface {
		Decode(v any) error
	}
	if msgType == websocket.TextMessage {
		decoder = json.NewDecoder(rd)
	} else {
		decoder = msgpack.NewDecoder(rd)
	}
	var req model.WebSocketRequest
	if err = decoder.Decode(&req); err != nil {
		wc.logSocketErr("websocket.Decode", err)
		return
	}

	// Messages which actions are prefixed with the plugin prefix
	// should only be dispatched to the plugins
	if !strings.HasPrefix(req.Action, websocketMessagePluginPrefix) {
		wc.Platform.WebSocketRouter.ServeWebSocket(wc, &req)
	}

	clonedReq, err := req.Clone()
	if err != nil {
		wc.logSocketErr("websocket.cloneRequest", err)
		continue
	}

	wc.pluginPosted <- pluginWSPostedHook{wc.GetConnectionID(), wc.UserId, clonedReq}
}

この2つでWebSocketを介してメッセージを送受信していることはわかった
来週はwritePump()の中でselect caseでチャネルから受信しているメッセージがどこから渡されているのかなどを調べたい

にしやまにしやま

@Kiyo_Karl2さんのメモ

Socket処理
server/channels/app/server.go:275 でStart()を呼んでいる

func (ps *PlatformService) Start(broadcastHooks map[string]BroadcastHook) error {
    ps.hubStart(broadcastHooks)

    ps.configListenerId = ps.AddConfigListener(func(_, _ *model.Config) {
        ps.regenerateClientConfig()

        message := model.NewWebSocketEvent(model.WebsocketEventConfigChanged, "", "", "", nil, "")

        message.Add("config", ps.ClientConfigWithComputed())
        ps.Go(func() {
            ps.Publish(message)
        })

        if err := ps.ReconfigureLogger(); err != nil {
            mlog.Error("Error re-configuring logging after config change", mlog.Err(err))
            return
        }
    })

    ps.licenseListenerId = ps.AddLicenseListener(func(oldLicense, newLicense *model.License) {
        ps.regenerateClientConfig()

        message := model.NewWebSocketEvent(model.WebsocketEventLicenseChanged, "", "", "", nil, "")
        message.Add("license", ps.GetSanitizedClientLicense())
        ps.Go(func() {
            ps.Publish(message)
        })
    })
    return nil
}

このhubStart関数は、WebSocket通信を管理するためのHubオブジェクトを複数生成し、起動するための処理を含んでいます。この関数はPlatformServiceという構造体のメソッドであり、WebSocketサーバーの一部として機能している。

func (ps *PlatformService) hubStart(broadcastHooks map[string]BroadcastHook) {
    // Total number of hubs is twice the number of CPUs.
    numberOfHubs := runtime.NumCPU() * 2
    ps.logger.Info("Starting websocket hubs", mlog.Int("number_of_hubs", numberOfHubs))

    hubs := make([]*Hub, numberOfHubs)

    for i := 0; i < numberOfHubs; i++ {
        hubs[i] = newWebHub(ps)
        hubs[i].connectionIndex = i
        hubs[i].broadcastHooks = broadcastHooks
        hubs[i].Start()
    }
    // Assigning to the hubs slice without any mutex is fine because it is only assigned once
    // during the start of the program and always read from after that.
    ps.hubs = hubs
}

server/channels/app/platform/web_hub.go:370 でhubを起動してチャネルを待機している

server/channels/api4/websocket.go:26 でhubが登録される

func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {  
    upgrader := websocket.Upgrader{  
       ReadBufferSize:  model.SocketMaxMessageSizeKb,  
       WriteBufferSize: model.SocketMaxMessageSizeKb,  
       CheckOrigin:     c.App.OriginChecker(),  
    }  
  
    ws, err := upgrader.Upgrade(w, r, nil)  
    if err != nil {  
       c.Err = model.NewAppError("connect", "api.web_socket.connect.upgrade.app_error", nil, err.Error(), http.StatusBadRequest)  
       return  
    }  
  
    // We initialize webconn with all the necessary data.  
    // If the queues are empty, they are initialized in the constructor.    cfg := &platform.WebConnConfig{  
       WebSocket: ws,  
       Session:   *c.AppContext.Session(),  
       TFunc:     c.AppContext.T,  
       Locale:    "",  
       Active:    true,  
    }  
  
    cfg.ConnectionID = r.URL.Query().Get(connectionIDParam)  
    if cfg.ConnectionID == "" || c.AppContext.Session().UserId == "" {  
       // If not present, we assume client is not capable yet, or it's a fresh connection.  
       // We just create a new ID.       cfg.ConnectionID = model.NewId()  
       // In case of fresh connection id, sequence number is already zero.  
    } else {  
       cfg, err = c.App.Srv().Platform().PopulateWebConnConfig(c.AppContext.Session(), cfg, r.URL.Query().Get(sequenceNumberParam))  
       if err != nil {  
          c.Logger.Warn("Error while populating webconn config", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))  
          ws.Close()  
          return  
       }  
    }  
  
    wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())  
    if c.AppContext.Session().UserId != "" {  
       c.App.Srv().Platform().HubRegister(wc)  
    }  
 
    wc.Pump()  
}
にしやまにしやま

↑を見つつ、
channels/app/platform/web_hub.go を見ていた
Start関数でゴルーチンを回してコネクションにアクションが発生したらそれを伝達する処理を実行していた

// Start starts the hub.
func (h *Hub) Start() {
	var doStart func()
	var doRecoverableStart func()
	var doRecover func()

	doStart = func() {
		mlog.Debug("Hub is starting", mlog.Int("index", h.connectionIndex))

		ticker := time.NewTicker(inactiveConnReaperInterval)
		defer ticker.Stop()

		connIndex := newHubConnectionIndex(inactiveConnReaperInterval)

		for {
			select {
			case webSessionMessage := <-h.checkRegistered:
				conns := connIndex.ForUser(webSessionMessage.userID)
				var isRegistered bool
				for _, conn := range conns {
					if !conn.active.Load() {
						continue
					}
					if conn.GetSessionToken() == webSessionMessage.sessionToken {
						isRegistered = true
					}
				}
				webSessionMessage.isRegistered <- isRegistered
			case req := <-h.checkConn:
				var res *CheckConnResult
				conn := connIndex.RemoveInactiveByConnectionID(req.userID, req.connectionID)
				if conn != nil {
					res = &CheckConnResult{
						ConnectionID:     req.connectionID,
						UserID:           req.userID,
						ActiveQueue:      conn.send,
						DeadQueue:        conn.deadQueue,
						DeadQueuePointer: conn.deadQueuePointer,
						ReuseCount:       conn.reuseCount + 1,
					}
				}
				req.result <- res
			case <-ticker.C:
				connIndex.RemoveInactiveConnections()
			case webConn := <-h.register:
				// Mark the current one as active.
				// There is no need to check if it was inactive or not,
				// we will anyways need to make it active.
				webConn.active.Store(true)

				connIndex.Add(webConn)
				atomic.StoreInt64(&h.connectionCount, int64(connIndex.AllActive()))

				if webConn.IsAuthenticated() && webConn.reuseCount == 0 {
					// The hello message should only be sent when the reuseCount is 0.
					// i.e in server restart, or long timeout, or fresh connection case.
					// In case of seq number not found in dead queue, it is handled by
					// the webconn write pump.
					webConn.send <- webConn.createHelloMessage()
				}
			case webConn := <-h.unregister:
				// If already removed (via queue full), then removing again becomes a noop.
				// But if not removed, mark inactive.
				webConn.active.Store(false)

				atomic.StoreInt64(&h.connectionCount, int64(connIndex.AllActive()))

				if webConn.UserId == "" {
					continue
				}

				conns := connIndex.ForUser(webConn.UserId)
				if len(conns) == 0 || areAllInactive(conns) {
					userID := webConn.UserId
					h.platform.Go(func() {
						h.platform.SetStatusOffline(userID, false)
					})
					continue
				}
				var latestActivity int64
				for _, conn := range conns {
					if !conn.active.Load() {
						continue
					}
					if conn.lastUserActivityAt > latestActivity {
						latestActivity = conn.lastUserActivityAt
					}
				}

				if h.platform.isUserAway(latestActivity) {
					userID := webConn.UserId
					h.platform.Go(func() {
						h.platform.SetStatusLastActivityAt(userID, latestActivity)
					})
				}
			case userID := <-h.invalidateUser:
				for _, webConn := range connIndex.ForUser(userID) {
					webConn.InvalidateCache()
				}
			case activity := <-h.activity:
				for _, webConn := range connIndex.ForUser(activity.userID) {
					if !webConn.active.Load() {
						continue
					}
					if webConn.GetSessionToken() == activity.sessionToken {
						webConn.lastUserActivityAt = activity.activityAt
					}
				}
			case directMsg := <-h.directMsg:
				if !connIndex.Has(directMsg.conn) {
					continue
				}
				select {
				case directMsg.conn.send <- directMsg.msg:
				default:
					// Don't log the warning if it's an inactive connection.
					if directMsg.conn.active.Load() {
						mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
							mlog.String("user_id", directMsg.conn.UserId),
							mlog.String("conn_id", directMsg.conn.GetConnectionID()))
					}
					close(directMsg.conn.send)
					connIndex.Remove(directMsg.conn)
				}
			case msg := <-h.broadcast:
				if metrics := h.platform.metricsIFace; metrics != nil {
					metrics.DecrementWebSocketBroadcastBufferSize(strconv.Itoa(h.connectionIndex), 1)
				}

				// Remove the broadcast hook information before precomputing the JSON so that those aren't included in it
				msg, broadcastHooks, broadcastHookArgs := msg.WithoutBroadcastHooks()

				msg = msg.PrecomputeJSON()

				broadcast := func(webConn *WebConn) {
					if !connIndex.Has(webConn) {
						return
					}
					if webConn.ShouldSendEvent(msg) {
						select {
						case webConn.send <- h.runBroadcastHooks(msg, webConn, broadcastHooks, broadcastHookArgs):
						default:
							// Don't log the warning if it's an inactive connection.
							if webConn.active.Load() {
								mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
									mlog.String("user_id", webConn.UserId),
									mlog.String("conn_id", webConn.GetConnectionID()))
							}
							close(webConn.send)
							connIndex.Remove(webConn)
						}
					}
				}

				if connID := msg.GetBroadcast().ConnectionId; connID != "" {
					if webConn := connIndex.byConnectionId[connID]; webConn != nil {
						broadcast(webConn)
						continue
					}
				} else if msg.GetBroadcast().UserId != "" {
					candidates := connIndex.ForUser(msg.GetBroadcast().UserId)
					for _, webConn := range candidates {
						broadcast(webConn)
					}
					continue
				}

				candidates := connIndex.All()
				for webConn := range candidates {
					broadcast(webConn)
				}
			case <-h.stop:
				for webConn := range connIndex.All() {
					webConn.Close()
					h.platform.SetStatusOffline(webConn.UserId, false)
				}

				h.explicitStop = true
				close(h.didStop)

				return
			}
		}
	}

	doRecoverableStart = func() {
		defer doRecover()
		doStart()
	}

	doRecover = func() {
		if !h.explicitStop {
			if r := recover(); r != nil {
				mlog.Error("Recovering from Hub panic.", mlog.Any("panic", r))
			} else {
				mlog.Error("Webhub stopped unexpectedly. Recovering.")
			}

			mlog.Error(string(debug.Stack()))

			go doRecoverableStart()
		}
	}

	go doRecoverableStart()
}

この中でconn.sendのチャネルにデータを入れているところが、WebSocketでクライアントにメッセージを送ってる箇所として怪しい

で探したが、それっぽいのは見つからず、代わりにmodel.WebsocketEventTypingといういかにもな型を見つけた

type WebsocketEventType string

const (
	WebsocketEventTyping                              WebsocketEventType = "typing"
)

これの使用箇所を探すと、channels/app/user.go でPublishUserTypingという関数があった

func (a *App) PublishUserTyping(userID, channelID, parentId string) *model.AppError {
	omitUsers := make(map[string]bool, 1)
	omitUsers[userID] = true

	event := model.NewWebSocketEvent(model.WebsocketEventTyping, "", channelID, "", omitUsers, "")
	event.Add("parent_id", parentId)
	event.Add("user_id", userID)
	a.Publish(event)

	return nil
}

channels/wsapi/user.go で呼んでいる

func (api *API) InitUser() {
	api.Router.Handle("user_typing", api.APIWebSocketHandler(api.userTyping))
	api.Router.Handle("user_update_active_status", api.APIWebSocketHandler(api.userUpdateActiveStatus))
}

func (api *API) userTyping(req *model.WebSocketRequest) (map[string]any, *model.AppError) {
	api.App.ExtendSessionExpiryIfNeeded(request.EmptyContext(api.App.Log()), &req.Session)

	if api.App.Srv().Platform().Busy.IsBusy() {
		// this is considered a non-critical service and will be disabled when server busy.
		return nil, NewServerBusyWebSocketError(req.Action)
	}

	var ok bool
	var channelId string
	if channelId, ok = req.Data["channel_id"].(string); !ok || !model.IsValidId(channelId) {
		return nil, NewInvalidWebSocketParamError(req.Action, "channel_id")
	}

	if !api.App.SessionHasPermissionToChannel(request.EmptyContext(api.App.Log()), req.Session, channelId, model.PermissionCreatePost) {
		return nil, NewInvalidWebSocketParamError(req.Action, "channel_id")
	}

	var parentId string
	if parentId, ok = req.Data["parent_id"].(string); !ok {
		parentId = ""
	}

	appErr := api.App.PublishUserTyping(req.Session.UserId, channelId, parentId)

	return nil, appErr
}
にしやまにしやま

フロント側でuser_typingが使われている箇所を探す
platform/client/src/websocket.ts

userTyping(channelId: string, parentId: string, callback?: () => void) {
    const data = {
        channel_id: channelId,
        parent_id: parentId,
    };
    this.sendMessage("user_typing", data, callback);
}

channels/src/actions/global_actions.tsx で呼ばれている

export function emitLocalUserTypingEvent(channelId: string, parentPostId: string) {
    const userTyping = async (actionDispatch: DispatchFunc, actionGetState: GetStateFunc) => {
        const state = actionGetState();
        const config = getConfig(state);

        if (
            isPerformanceDebuggingEnabled(state) &&
            getBool(state, Preferences.CATEGORY_PERFORMANCE_DEBUGGING, Preferences.NAME_DISABLE_TYPING_MESSAGES)
        ) {
            return {data: false};
        }

        const t = Date.now();
        const stats = getCurrentChannelStats(state);
        const membersInChannel = stats ? stats.member_count : 0;

        const timeBetweenUserTypingUpdatesMilliseconds = Utils.stringToNumber(config.TimeBetweenUserTypingUpdatesMilliseconds);
        const maxNotificationsPerChannel = Utils.stringToNumber(config.MaxNotificationsPerChannel);

        if (((t - lastTimeTypingSent) > timeBetweenUserTypingUpdatesMilliseconds) &&
            (membersInChannel < maxNotificationsPerChannel) && (config.EnableUserTypingMessages === 'true')) {
            WebSocketClient.userTyping(channelId, parentPostId);
            lastTimeTypingSent = t;
        }

        return {data: true};
    };

    return dispatch(userTyping);
}

channels/src/components/advanced_create_post/advanced_create_post.tsx で呼ばれている
ここがユーザーが入力用の画面を作ってるぽくて、入力時にユーザータイピングしていたらuser_typingのエンドポイントを叩いて(? 叩くではなくWebSocketのコネクションに送るだけかも)、サーバーにタイピング中の情報を送っているぽい

postMsgKeyPress = (e: React.KeyboardEvent<TextboxElement>) => {
    const {ctrlSend, codeBlockOnCtrlEnter} = this.props;

    const {allowSending, withClosedCodeBlock, ignoreKeyPress, message} = postMessageOnKeyPress(
        e,
        this.state.message,
        Boolean(ctrlSend),
        Boolean(codeBlockOnCtrlEnter),
        Date.now(),
        this.lastChannelSwitchAt,
        this.state.caretPosition,
    ) as {
        allowSending: boolean;
        withClosedCodeBlock?: boolean;
        ignoreKeyPress?: boolean;
        message?: string;
    };

    if (ignoreKeyPress) {
        e.preventDefault();
        e.stopPropagation();
        return;
    }

    if (allowSending && this.isValidPersistentNotifications()) {
        if (e.persist) {
            e.persist();
        }
        if (this.textboxRef.current) {
            this.isDraftSubmitting = true;
            this.textboxRef.current.blur();
        }

        if (withClosedCodeBlock && message) {
            this.setState({message}, () => this.handleSubmit(e));
        } else {
            this.handleSubmit(e);
        }

        this.setShowPreview(false);
    }

    this.emitTypingEvent();
};

emitTypingEvent = () => {
    const channelId = this.props.currentChannel.id;
    GlobalActions.emitLocalUserTypingEvent(channelId, '');
};

channels/src/components/advanced_text_editor/advanced_text_editor.tsx でテキストボックスのonKeyPressのイベントで上記の関数を実行してサーバーに情報を送っている
対象コードが長くてどこのコードを書くかはむずいので省略

にしやまにしやま

UserTypingのWebSocket通信処理を"完全に理解した"のでまとめ

  • commands/server.goでapp.NewServer(options...)とapi4.Init(server)とwsapi.Init(server)が呼ばれ、それぞれのサーバが起動(他にも色々)
    • app.NewServer(options...)
      • 色々入ったスーパー構造体が生成される
      • この返り値をapi4.Initやwsapi.Initの引数に渡すことで2つの間でチャネルの通信などができる
      • Hub.Start()でhubを介した各種データのやり取りがループしている
        • その中でh.broadcastを受け取るとwebConn.sendにWebSocketEventを渡している箇所がある
    • api4.Init
      • api.InitWebSocket()を呼び、connectWebSocket()によって、Pump()を実行
        • wc.readPump()とwc.writePump()を実行
        • readPump()はWebSocketでクライアントからのデータを受けとる処理
        • writePump()はchannelのfor文を回してwc.sendに入ってきたらデータが入ってきたら、WebConnectionに書き込みをするように待機している(≒クライアントにデータ送信)
    • wsapi.Init
      • InitUser()によって"/user_typing"のWebSocketのエンドポイントでuserTyping()が動かされている
  • クライアントから"/user_typing"にWebSocketEventが送信される
  • userTyping()が受け取り、api.App.PublishUserTyping()として、WebSocketEventを発行して、Publishしている
    • Publishした中で、下記のようにhub.Broadcast(event)を読んで、Hubに渡している
  • Hub.Start()のチャネルのループにWebSocketEventが渡り、WebConnectionに回される
  • WebConnのwritePump()がそれを受け取り、WebSocketに書き込む
  • WebSocketに書きこれたデータがここでクライアントに受信され、ユーザーの画面に「〜が入力中です」と表示される
このスクラップは2024/05/01にクローズされました