Mattermostのソースコード追ってみる
目的:
GoのOSSのコードを読むこと
Mattermostとは:
Slackクローンのチャットアプリ
前職でも使っていたくらい結構出来の良いちゃんとした(スマホ/Web/デスクトップ)アプリです
技術スタック:
API: Go
Webフロント: React
デスクトップ: Electron
モバイルアプリ: React Native
メインで読むのはAPIとWebフロントになると思います
前職の同僚のエンジニアと輪読会シリーズ第二弾でGoのOSS読む会として一緒にやってます
「〜が入力しています」のイベントを検知する仕組み
フロントエンドのコードから探す
webapp/channels/src/i18n/ja.json
"msg_typing.isTyping": "{user}が入力しています..."
が見つかる
webapp/channels/src/components/msg_typing/msg_typing.tsx
return (
<FormattedMessage
id='msg_typing.isTyping'
defaultMessage='{user} is typing...'
values={{
user: users[0],
}}
/>
);
ここで呼ばれている
このファイル内でuseWebSocket
を読んでいる箇所がある
ここにおそらくどうサーバを読んでいるかの定義もあるのでそれを探す
webapp/channels/src/utils/use_websocket/hooks.ts
export function useWebSocket({handler}: UseWebSocketOptions) {
const wsClient = useWebSocketClient();
useEffect(() => {
wsClient.addMessageListener(handler);
return () => {
wsClient.removeMessageListener(handler);
};
}, [wsClient, handler]);
}
hooks.tsからwsClient
の定義元のwebapp/platform/client/src/websocket.tsに飛び、initialize(connectionUrl = this.connectionUrl, token?: string)
関数で接続先のURLも設定する
this.conn = new WebSocket(`${connectionUrl}?connection_id=${this.connectionId}&sequence_number=${this.serverSequence}`);
WebSocketClient.initialize()を読んでいる箇所を探す
webapp/channels/src/actions/websocket_actions.jsxで読んでいる
WebSocketClient.initialize(connUrl); // 182行目
connUrlの定義は
let connUrl = '';
if (config.WebsocketURL) {
connUrl = config.WebsocketURL;
} else {
connUrl = new URL(getSiteURL());
// replace the protocol with a websocket one
if (connUrl.protocol === 'https:') {
connUrl.protocol = 'wss:';
} else {
connUrl.protocol = 'ws:';
}
// append a port number if one isn't already specified
if (!(/:\d+$/).test(connUrl.host)) {
if (connUrl.protocol === 'wss:') {
connUrl.host += ':' + config.WebsocketSecurePort;
} else {
connUrl.host += ':' + config.WebsocketPort;
}
}
connUrl = connUrl.toString();
}
// Strip any trailing slash before appending the pathname below.
if (connUrl.length > 0 && connUrl[connUrl.length - 1] === '/') {
connUrl = connUrl.substring(0, connUrl.length - 1);
}
connUrl += Client4.getUrlVersion() + '/websocket';
ここから、{config.WebsocketURL}/api/v4/websocket
にアクセスしてるとわかった
次はAPIの方を見に行く
server/channels/api4/websocket.go
にそれらしいファイルを発見
が、ここではコネクション確立以外は大したことをやっていないのでいまいちわからなかった。
再度フロントの方に戻ってみる。
webapp/channels/src/components/msg_typing/msg_typing.tsxに戻ってどうやってサーバにタイピング中の情報を送っているのか調べる
下記のgetTypingTextのprops.typingUsersにデータがあればフロントにタイピング中と表示できるようになっていそう。
const getTypingText = () => {
let users: string[] = [];
let numUsers = 0;
if (props.typingUsers) {
users = [...props.typingUsers];
numUsers = users.length;
}
if (numUsers === 0) {
return '';
}
// 中略
return (
<FormattedMessage
id='msg_typing.areTyping'
defaultMessage='{users} and {last} are typing...'
values={{
users: (users.join(', ')),
last,
}}
/>
);
};
でもこのコンポーネントの呼び出し元ではprops.typingUsersを渡していなかった。
webapp/channels/src/components/advanced_text_editor/advanced_text_editor.tsx
<MsgTyping
channelId={channelId}
postId={postId}
/>
webapp/channels/src/components/msg_typing/index.ts
でStateに登録してコンポーネントに渡してるっぽい
connectって関数はreduxに備わってるやつみたい
function makeMapStateToProps() {
const getUsersTypingByChannelAndPost = makeGetUsersTypingByChannelAndPost();
return function mapStateToProps(state: GlobalState, ownProps: OwnProps) {
const typingUsers = getUsersTypingByChannelAndPost(state, {channelId: ownProps.channelId, postId: ownProps.postId});
return {
typingUsers,
};
};
}
const mapDispatchToProps = {
userStartedTyping,
userStoppedTyping,
};
export default connect(makeMapStateToProps, mapDispatchToProps)(MsgTyping);
上のuserStartedTyping
の定義元
dispatch()もreduxの関数らしい
ここではストアにタイピング中イベントの登録と一定時間後にストップのイベントを登録することで実質的にイベントの有効期限を設けている
export function userStartedTyping(userId: string, channelId: string, rootId: string, now: number) {
return (dispatch: DispatchFunc, getState: GetStateFunc) => {
const state = getState();
if (
isPerformanceDebuggingEnabled(state) &&
getBool(state, Preferences.CATEGORY_PERFORMANCE_DEBUGGING, Preferences.NAME_DISABLE_TYPING_MESSAGES)
) {
return;
}
dispatch({
type: WebsocketEvents.TYPING,
data: {
id: channelId + rootId,
userId,
now,
},
});
// Ideally this followup loading would be done by someone else
dispatch(fillInMissingInfo(userId));
setTimeout(() => {
dispatch(userStoppedTyping(userId, channelId, rootId, now));
}, getTimeBetweenTypingEvents(state));
};
}
webapp/channels/src/components/msg_typing/msg_typing.tsxにもう一度戻る
useWebSocketとして登録したhandlerがどう使われるのかよくわかってないので追ってみる
useWebSocket({
handler: useCallback((msg: WebSocketMessage) => {
if (msg.event === SocketEvents.TYPING) {
const channelId = msg.broadcast.channel_id;
const rootId = msg.data.parent_id;
const userId = msg.data.user_id;
if (props.channelId === channelId && props.postId === rootId) {
userStartedTyping(userId, channelId, rootId, Date.now());
}
} else if (msg.event === SocketEvents.POSTED) {
const post = JSON.parse(msg.data.post);
const channelId = post.channel_id;
const rootId = post.root_id;
const userId = post.user_id;
if (props.channelId === channelId && props.postId === rootId) {
userStoppedTyping(userId, channelId, rootId, Date.now());
}
}
}, [props.channelId, props.postId, userStartedTyping, userStoppedTyping]),
});
useWebSocketの定義元
webapp/channels/src/utils/use_websocket/hooks.ts
wsClient(見るからにWebSocketのクライアント)にaddMessageListenerとしてhandlerを登録してる
これがどう使われるのか?
export function useWebSocket({handler}: UseWebSocketOptions) {
const wsClient = useWebSocketClient();
useEffect(() => {
wsClient.addMessageListener(handler);
return () => {
wsClient.removeMessageListener(handler);
};
}, [wsClient, handler]);
}
webapp/platform/client/src/websocket.ts
messageListeners
というところに追加している
WebSocketMessage
としてイベントや実際に受け取ったデータが入ってくる様子
export type MessageListener = (msg: WebSocketMessage) => void;
private messageListeners = new Set<MessageListener>();
export type WebSocketMessage<T = any> = {
event: string;
data: T;
broadcast: WebSocketBroadcast;
seq: number;
};
// 中略
export default class WebSocketClient {
// 中略
addMessageListener(listener: MessageListener) {
this.messageListeners.add(listener);
if (this.messageListeners.size > 5) {
// eslint-disable-next-line no-console
console.warn(
`WebSocketClient has ${this.messageListeners.size} message listeners registered`
);
}
}
}
実際に受け取ってくるところも確認したい
export default class WebSocketClient {
initialize(connectionUrl = this.connectionUrl, token?: string) {
// 中略
this.conn = new WebSocket(`${connectionUrl}?connection_id=${this.connectionId}&sequence_number=${this.serverSequence}`);
// 中略
this.conn.onmessage = (evt) => {
const msg = JSON.parse(evt.data);
// たくさん中略
this.messageListeners.forEach((listener) => listener(msg));
};
}
}
WebSocketClient
のinitialize
の中でonmessageイベントに対して処理を登録してる
ここでmessageListeners
の個々のデータ(=wsClient.addMessageListener(handler);
でaddしたhandler)をループしてmsg(=WebSocketで受け取ったデータ)を引数に関数として実行している。
onmessageはJavaScriptに備わっているWebSocketのイベント
フロント側でどうやって入力中のイベントを受け取って反映させているかは完全に理解したのでもう一度APIの方に戻ってみる
先ほど見ていたserver/channels/api4/websocket.goはそんなに長くない
func (api *API) InitWebSocket() {
// Optionally supports a trailing slash
api.BaseRoutes.APIRoot.Handle("/{websocket:websocket(?:\\/)?}", api.APIHandlerTrustRequester(connectWebSocket)).Methods("GET")
}
func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
ReadBufferSize: model.SocketMaxMessageSizeKb,
WriteBufferSize: model.SocketMaxMessageSizeKb,
CheckOrigin: c.App.OriginChecker(),
}
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
c.Err = model.NewAppError("connect", "api.web_socket.connect.upgrade.app_error", nil, err.Error(), http.StatusBadRequest)
return
}
// We initialize webconn with all the necessary data.
// If the queues are empty, they are initialized in the constructor.
cfg := &platform.WebConnConfig{
WebSocket: ws,
Session: *c.AppContext.Session(),
TFunc: c.AppContext.T,
Locale: "",
Active: true,
}
cfg.ConnectionID = r.URL.Query().Get(connectionIDParam)
if cfg.ConnectionID == "" || c.AppContext.Session().UserId == "" {
// If not present, we assume client is not capable yet, or it's a fresh connection.
// We just create a new ID.
cfg.ConnectionID = model.NewId()
// In case of fresh connection id, sequence number is already zero.
} else {
cfg, err = c.App.Srv().Platform().PopulateWebConnConfig(c.AppContext.Session(), cfg, r.URL.Query().Get(sequenceNumberParam))
if err != nil {
c.Logger.Warn("Error while populating webconn config", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
ws.Close()
return
}
}
wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())
if c.AppContext.Session().UserId != "" {
c.App.Srv().Platform().HubRegister(wc)
}
wc.Pump()
}
ざっとみる感じ、
-
ws, err := upgrader.Upgrade(w, r, nil)
でhttpをWebSocketにUpgradeする -
wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())
でおそらくWebSocketのインスタンス生成 -
wc.Pump()
でコネクションを張る
さっきは見落としていたwc.Pump()
が肝っぽいのでこれを追ってみる
server/channels/app/platform/web_conn.go
ぱっと見よくわからんけど並行処理でWebSocketのリクエストを発行しているように見える
コメントのsend/receiveというあたりからもこの辺で送信と受信の処理をやっていそう
// Pump starts the WebConn instance. After this, the websocket
// is ready to send/receive messages.
func (wc *WebConn) Pump() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wc.writePump()
}()
wg.Add(1)
go wc.pluginPostedConsumer(&wg)
wc.readPump()
close(wc.endWritePump)
close(wc.pluginPosted)
wg.Wait()
wc.Platform.HubUnregister(wc)
close(wc.pumpFinished)
userID := wc.UserId
wc.Platform.Go(func() {
wc.HookRunner.RunMultiHook(func(hooks plugin.Hooks) bool {
hooks.OnWebSocketDisconnect(wc.GetConnectionID(), userID)
return true
}, plugin.OnWebSocketDisconnectID)
})
}
とりあえず今回はここまで
次回は来週の木曜21:00~22:00の予定(今日21:00~22:00の予定だったのに0時近くまでやってしまった)
server/channels/app/platform/web_conn.go
送信と受信の処理を追ってみる
今回はPump()の中で実行されているwritePump()とreadPump()について。
writePump()
(waitGroupに1addした後)最初にwritePump()を実行している
go func() {
defer wg.Done()
wc.writePump()
}()
ここでは下記を主に行なっている
- deadQueueに関連する処理
- deadQueueは送信するメッセージを入れる箇所っぽい
- for文のselect case文でチャネルからデータが送られてくるのを待機している
- チャネルから送られてきたらwc.writeMessageBuf()でコネクションにメッセージを書き込んでクライアントに送っている
- writeMessageBuf()の中ではgorilla.WebSocketのライブラリを使ってる
- goroutineの中でwritePump()を実行することで書き込み待ちのループを並行処理で実施
readPump()
- readLimitやreadDeadlineを設定している
- 一度に受信するメッセージサイズの上限やPong(WebSocket接続中にクライアント/サーバ間でリクエストをやりとりする時に送り合う応答確認のようなものらしい)の有効期限を設定
- for文で無条件ループし、コネクションが切れるまで(=この処理が終了するまで)WebSocketからのメッセージを受信する
for {
msgType, rd, err := wc.WebSocket.NextReader()
if err != nil {
wc.logSocketErr("websocket.NextReader", err)
return
}
var decoder interface {
Decode(v any) error
}
if msgType == websocket.TextMessage {
decoder = json.NewDecoder(rd)
} else {
decoder = msgpack.NewDecoder(rd)
}
var req model.WebSocketRequest
if err = decoder.Decode(&req); err != nil {
wc.logSocketErr("websocket.Decode", err)
return
}
// Messages which actions are prefixed with the plugin prefix
// should only be dispatched to the plugins
if !strings.HasPrefix(req.Action, websocketMessagePluginPrefix) {
wc.Platform.WebSocketRouter.ServeWebSocket(wc, &req)
}
clonedReq, err := req.Clone()
if err != nil {
wc.logSocketErr("websocket.cloneRequest", err)
continue
}
wc.pluginPosted <- pluginWSPostedHook{wc.GetConnectionID(), wc.UserId, clonedReq}
}
この2つでWebSocketを介してメッセージを送受信していることはわかった
来週はwritePump()の中でselect caseでチャネルから受信しているメッセージがどこから渡されているのかなどを調べたい
@Kiyo_Karl2さんのメモ
Socket処理
server/channels/app/server.go:275 でStart()を呼んでいる
func (ps *PlatformService) Start(broadcastHooks map[string]BroadcastHook) error {
ps.hubStart(broadcastHooks)
ps.configListenerId = ps.AddConfigListener(func(_, _ *model.Config) {
ps.regenerateClientConfig()
message := model.NewWebSocketEvent(model.WebsocketEventConfigChanged, "", "", "", nil, "")
message.Add("config", ps.ClientConfigWithComputed())
ps.Go(func() {
ps.Publish(message)
})
if err := ps.ReconfigureLogger(); err != nil {
mlog.Error("Error re-configuring logging after config change", mlog.Err(err))
return
}
})
ps.licenseListenerId = ps.AddLicenseListener(func(oldLicense, newLicense *model.License) {
ps.regenerateClientConfig()
message := model.NewWebSocketEvent(model.WebsocketEventLicenseChanged, "", "", "", nil, "")
message.Add("license", ps.GetSanitizedClientLicense())
ps.Go(func() {
ps.Publish(message)
})
})
return nil
}
このhubStart関数は、WebSocket通信を管理するためのHubオブジェクトを複数生成し、起動するための処理を含んでいます。この関数はPlatformServiceという構造体のメソッドであり、WebSocketサーバーの一部として機能している。
func (ps *PlatformService) hubStart(broadcastHooks map[string]BroadcastHook) {
// Total number of hubs is twice the number of CPUs.
numberOfHubs := runtime.NumCPU() * 2
ps.logger.Info("Starting websocket hubs", mlog.Int("number_of_hubs", numberOfHubs))
hubs := make([]*Hub, numberOfHubs)
for i := 0; i < numberOfHubs; i++ {
hubs[i] = newWebHub(ps)
hubs[i].connectionIndex = i
hubs[i].broadcastHooks = broadcastHooks
hubs[i].Start()
}
// Assigning to the hubs slice without any mutex is fine because it is only assigned once
// during the start of the program and always read from after that.
ps.hubs = hubs
}
server/channels/app/platform/web_hub.go:370 でhubを起動してチャネルを待機している
server/channels/api4/websocket.go:26 でhubが登録される
func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
ReadBufferSize: model.SocketMaxMessageSizeKb,
WriteBufferSize: model.SocketMaxMessageSizeKb,
CheckOrigin: c.App.OriginChecker(),
}
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
c.Err = model.NewAppError("connect", "api.web_socket.connect.upgrade.app_error", nil, err.Error(), http.StatusBadRequest)
return
}
// We initialize webconn with all the necessary data.
// If the queues are empty, they are initialized in the constructor. cfg := &platform.WebConnConfig{
WebSocket: ws,
Session: *c.AppContext.Session(),
TFunc: c.AppContext.T,
Locale: "",
Active: true,
}
cfg.ConnectionID = r.URL.Query().Get(connectionIDParam)
if cfg.ConnectionID == "" || c.AppContext.Session().UserId == "" {
// If not present, we assume client is not capable yet, or it's a fresh connection.
// We just create a new ID. cfg.ConnectionID = model.NewId()
// In case of fresh connection id, sequence number is already zero.
} else {
cfg, err = c.App.Srv().Platform().PopulateWebConnConfig(c.AppContext.Session(), cfg, r.URL.Query().Get(sequenceNumberParam))
if err != nil {
c.Logger.Warn("Error while populating webconn config", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
ws.Close()
return
}
}
wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())
if c.AppContext.Session().UserId != "" {
c.App.Srv().Platform().HubRegister(wc)
}
wc.Pump()
}
↑を見つつ、
channels/app/platform/web_hub.go を見ていた
Start関数でゴルーチンを回してコネクションにアクションが発生したらそれを伝達する処理を実行していた
// Start starts the hub.
func (h *Hub) Start() {
var doStart func()
var doRecoverableStart func()
var doRecover func()
doStart = func() {
mlog.Debug("Hub is starting", mlog.Int("index", h.connectionIndex))
ticker := time.NewTicker(inactiveConnReaperInterval)
defer ticker.Stop()
connIndex := newHubConnectionIndex(inactiveConnReaperInterval)
for {
select {
case webSessionMessage := <-h.checkRegistered:
conns := connIndex.ForUser(webSessionMessage.userID)
var isRegistered bool
for _, conn := range conns {
if !conn.active.Load() {
continue
}
if conn.GetSessionToken() == webSessionMessage.sessionToken {
isRegistered = true
}
}
webSessionMessage.isRegistered <- isRegistered
case req := <-h.checkConn:
var res *CheckConnResult
conn := connIndex.RemoveInactiveByConnectionID(req.userID, req.connectionID)
if conn != nil {
res = &CheckConnResult{
ConnectionID: req.connectionID,
UserID: req.userID,
ActiveQueue: conn.send,
DeadQueue: conn.deadQueue,
DeadQueuePointer: conn.deadQueuePointer,
ReuseCount: conn.reuseCount + 1,
}
}
req.result <- res
case <-ticker.C:
connIndex.RemoveInactiveConnections()
case webConn := <-h.register:
// Mark the current one as active.
// There is no need to check if it was inactive or not,
// we will anyways need to make it active.
webConn.active.Store(true)
connIndex.Add(webConn)
atomic.StoreInt64(&h.connectionCount, int64(connIndex.AllActive()))
if webConn.IsAuthenticated() && webConn.reuseCount == 0 {
// The hello message should only be sent when the reuseCount is 0.
// i.e in server restart, or long timeout, or fresh connection case.
// In case of seq number not found in dead queue, it is handled by
// the webconn write pump.
webConn.send <- webConn.createHelloMessage()
}
case webConn := <-h.unregister:
// If already removed (via queue full), then removing again becomes a noop.
// But if not removed, mark inactive.
webConn.active.Store(false)
atomic.StoreInt64(&h.connectionCount, int64(connIndex.AllActive()))
if webConn.UserId == "" {
continue
}
conns := connIndex.ForUser(webConn.UserId)
if len(conns) == 0 || areAllInactive(conns) {
userID := webConn.UserId
h.platform.Go(func() {
h.platform.SetStatusOffline(userID, false)
})
continue
}
var latestActivity int64
for _, conn := range conns {
if !conn.active.Load() {
continue
}
if conn.lastUserActivityAt > latestActivity {
latestActivity = conn.lastUserActivityAt
}
}
if h.platform.isUserAway(latestActivity) {
userID := webConn.UserId
h.platform.Go(func() {
h.platform.SetStatusLastActivityAt(userID, latestActivity)
})
}
case userID := <-h.invalidateUser:
for _, webConn := range connIndex.ForUser(userID) {
webConn.InvalidateCache()
}
case activity := <-h.activity:
for _, webConn := range connIndex.ForUser(activity.userID) {
if !webConn.active.Load() {
continue
}
if webConn.GetSessionToken() == activity.sessionToken {
webConn.lastUserActivityAt = activity.activityAt
}
}
case directMsg := <-h.directMsg:
if !connIndex.Has(directMsg.conn) {
continue
}
select {
case directMsg.conn.send <- directMsg.msg:
default:
// Don't log the warning if it's an inactive connection.
if directMsg.conn.active.Load() {
mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
mlog.String("user_id", directMsg.conn.UserId),
mlog.String("conn_id", directMsg.conn.GetConnectionID()))
}
close(directMsg.conn.send)
connIndex.Remove(directMsg.conn)
}
case msg := <-h.broadcast:
if metrics := h.platform.metricsIFace; metrics != nil {
metrics.DecrementWebSocketBroadcastBufferSize(strconv.Itoa(h.connectionIndex), 1)
}
// Remove the broadcast hook information before precomputing the JSON so that those aren't included in it
msg, broadcastHooks, broadcastHookArgs := msg.WithoutBroadcastHooks()
msg = msg.PrecomputeJSON()
broadcast := func(webConn *WebConn) {
if !connIndex.Has(webConn) {
return
}
if webConn.ShouldSendEvent(msg) {
select {
case webConn.send <- h.runBroadcastHooks(msg, webConn, broadcastHooks, broadcastHookArgs):
default:
// Don't log the warning if it's an inactive connection.
if webConn.active.Load() {
mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
mlog.String("user_id", webConn.UserId),
mlog.String("conn_id", webConn.GetConnectionID()))
}
close(webConn.send)
connIndex.Remove(webConn)
}
}
}
if connID := msg.GetBroadcast().ConnectionId; connID != "" {
if webConn := connIndex.byConnectionId[connID]; webConn != nil {
broadcast(webConn)
continue
}
} else if msg.GetBroadcast().UserId != "" {
candidates := connIndex.ForUser(msg.GetBroadcast().UserId)
for _, webConn := range candidates {
broadcast(webConn)
}
continue
}
candidates := connIndex.All()
for webConn := range candidates {
broadcast(webConn)
}
case <-h.stop:
for webConn := range connIndex.All() {
webConn.Close()
h.platform.SetStatusOffline(webConn.UserId, false)
}
h.explicitStop = true
close(h.didStop)
return
}
}
}
doRecoverableStart = func() {
defer doRecover()
doStart()
}
doRecover = func() {
if !h.explicitStop {
if r := recover(); r != nil {
mlog.Error("Recovering from Hub panic.", mlog.Any("panic", r))
} else {
mlog.Error("Webhub stopped unexpectedly. Recovering.")
}
mlog.Error(string(debug.Stack()))
go doRecoverableStart()
}
}
go doRecoverableStart()
}
この中でconn.send
のチャネルにデータを入れているところが、WebSocketでクライアントにメッセージを送ってる箇所として怪しい
で探したが、それっぽいのは見つからず、代わりにmodel.WebsocketEventTyping
といういかにもな型を見つけた
type WebsocketEventType string
const (
WebsocketEventTyping WebsocketEventType = "typing"
)
これの使用箇所を探すと、channels/app/user.go でPublishUserTypingという関数があった
func (a *App) PublishUserTyping(userID, channelID, parentId string) *model.AppError {
omitUsers := make(map[string]bool, 1)
omitUsers[userID] = true
event := model.NewWebSocketEvent(model.WebsocketEventTyping, "", channelID, "", omitUsers, "")
event.Add("parent_id", parentId)
event.Add("user_id", userID)
a.Publish(event)
return nil
}
channels/wsapi/user.go で呼んでいる
func (api *API) InitUser() {
api.Router.Handle("user_typing", api.APIWebSocketHandler(api.userTyping))
api.Router.Handle("user_update_active_status", api.APIWebSocketHandler(api.userUpdateActiveStatus))
}
func (api *API) userTyping(req *model.WebSocketRequest) (map[string]any, *model.AppError) {
api.App.ExtendSessionExpiryIfNeeded(request.EmptyContext(api.App.Log()), &req.Session)
if api.App.Srv().Platform().Busy.IsBusy() {
// this is considered a non-critical service and will be disabled when server busy.
return nil, NewServerBusyWebSocketError(req.Action)
}
var ok bool
var channelId string
if channelId, ok = req.Data["channel_id"].(string); !ok || !model.IsValidId(channelId) {
return nil, NewInvalidWebSocketParamError(req.Action, "channel_id")
}
if !api.App.SessionHasPermissionToChannel(request.EmptyContext(api.App.Log()), req.Session, channelId, model.PermissionCreatePost) {
return nil, NewInvalidWebSocketParamError(req.Action, "channel_id")
}
var parentId string
if parentId, ok = req.Data["parent_id"].(string); !ok {
parentId = ""
}
appErr := api.App.PublishUserTyping(req.Session.UserId, channelId, parentId)
return nil, appErr
}
フロント側でuser_typing
が使われている箇所を探す
platform/client/src/websocket.ts
userTyping(channelId: string, parentId: string, callback?: () => void) {
const data = {
channel_id: channelId,
parent_id: parentId,
};
this.sendMessage("user_typing", data, callback);
}
channels/src/actions/global_actions.tsx で呼ばれている
export function emitLocalUserTypingEvent(channelId: string, parentPostId: string) {
const userTyping = async (actionDispatch: DispatchFunc, actionGetState: GetStateFunc) => {
const state = actionGetState();
const config = getConfig(state);
if (
isPerformanceDebuggingEnabled(state) &&
getBool(state, Preferences.CATEGORY_PERFORMANCE_DEBUGGING, Preferences.NAME_DISABLE_TYPING_MESSAGES)
) {
return {data: false};
}
const t = Date.now();
const stats = getCurrentChannelStats(state);
const membersInChannel = stats ? stats.member_count : 0;
const timeBetweenUserTypingUpdatesMilliseconds = Utils.stringToNumber(config.TimeBetweenUserTypingUpdatesMilliseconds);
const maxNotificationsPerChannel = Utils.stringToNumber(config.MaxNotificationsPerChannel);
if (((t - lastTimeTypingSent) > timeBetweenUserTypingUpdatesMilliseconds) &&
(membersInChannel < maxNotificationsPerChannel) && (config.EnableUserTypingMessages === 'true')) {
WebSocketClient.userTyping(channelId, parentPostId);
lastTimeTypingSent = t;
}
return {data: true};
};
return dispatch(userTyping);
}
channels/src/components/advanced_create_post/advanced_create_post.tsx で呼ばれている
ここがユーザーが入力用の画面を作ってるぽくて、入力時にユーザータイピングしていたらuser_typing
のエンドポイントを叩いて(? 叩くではなくWebSocketのコネクションに送るだけかも)、サーバーにタイピング中の情報を送っているぽい
postMsgKeyPress = (e: React.KeyboardEvent<TextboxElement>) => {
const {ctrlSend, codeBlockOnCtrlEnter} = this.props;
const {allowSending, withClosedCodeBlock, ignoreKeyPress, message} = postMessageOnKeyPress(
e,
this.state.message,
Boolean(ctrlSend),
Boolean(codeBlockOnCtrlEnter),
Date.now(),
this.lastChannelSwitchAt,
this.state.caretPosition,
) as {
allowSending: boolean;
withClosedCodeBlock?: boolean;
ignoreKeyPress?: boolean;
message?: string;
};
if (ignoreKeyPress) {
e.preventDefault();
e.stopPropagation();
return;
}
if (allowSending && this.isValidPersistentNotifications()) {
if (e.persist) {
e.persist();
}
if (this.textboxRef.current) {
this.isDraftSubmitting = true;
this.textboxRef.current.blur();
}
if (withClosedCodeBlock && message) {
this.setState({message}, () => this.handleSubmit(e));
} else {
this.handleSubmit(e);
}
this.setShowPreview(false);
}
this.emitTypingEvent();
};
emitTypingEvent = () => {
const channelId = this.props.currentChannel.id;
GlobalActions.emitLocalUserTypingEvent(channelId, '');
};
channels/src/components/advanced_text_editor/advanced_text_editor.tsx でテキストボックスのonKeyPressのイベントで上記の関数を実行してサーバーに情報を送っている
対象コードが長くてどこのコードを書くかはむずいので省略
UserTypingのWebSocket通信処理を"完全に理解した"のでまとめ
- commands/server.goでapp.NewServer(options...)とapi4.Init(server)とwsapi.Init(server)が呼ばれ、それぞれのサーバが起動(他にも色々)
- app.NewServer(options...)
- 色々入ったスーパー構造体が生成される
- この返り値をapi4.Initやwsapi.Initの引数に渡すことで2つの間でチャネルの通信などができる
- Hub.Start()でhubを介した各種データのやり取りがループしている
- その中でh.broadcastを受け取るとwebConn.sendにWebSocketEventを渡している箇所がある
- api4.Init
- api.InitWebSocket()を呼び、connectWebSocket()によって、Pump()を実行
- wc.readPump()とwc.writePump()を実行
- readPump()はWebSocketでクライアントからのデータを受けとる処理
- writePump()はchannelのfor文を回してwc.sendに入ってきたらデータが入ってきたら、WebConnectionに書き込みをするように待機している(≒クライアントにデータ送信)
- api.InitWebSocket()を呼び、connectWebSocket()によって、Pump()を実行
- wsapi.Init
- InitUser()によって
"/user_typing"
のWebSocketのエンドポイントでuserTyping()が動かされている
- InitUser()によって
- app.NewServer(options...)
- クライアントから
"/user_typing"
にWebSocketEventが送信される- 前回追っていた箇所
- userTyping()が受け取り、api.App.PublishUserTyping()として、WebSocketEventを発行して、Publishしている
- Publishした中で、下記のようにhub.Broadcast(event)を読んで、Hubに渡している
- Hub.Start()のチャネルのループにWebSocketEventが渡り、WebConnectionに回される
- WebConnのwritePump()がそれを受け取り、WebSocketに書き込む
- WebSocketに書きこれたデータがここでクライアントに受信され、ユーザーの画面に「〜が入力中です」と表示される