Open9

LiveKitの本番運用に向けて気になったとこ色々調べる

SchottMan13SchottMan13

LiveKitは比較的手軽に使用できる(あくまで他のOSSと比べたらだけど)SFUだし、公式ドキュメントも充実しているのでとても助かる。
ただ、ドキュメント読んでも理解しきれない部分や本番で運用するにあたっては詳細を知っておきたいかなって部分もちょこちょこあるのでそんなところを調べてみることにする。

ちなみにバージョンは1.7.2

このスクラップに登場するコンフィグファイルはこれがベース

SchottMan13SchottMan13

疑問:LiveKitが対応しているRedisの冗長構成は制約がある?

LiveKitはRedisの接続先をコンフィグファイルに設定することで自動的にマルチノードルーティングして負荷分散できる。

# ここを設定
redis:
  address: redis.mine.net:6379
  db: 0
  username: myredis
  password: xxxxxxxxx
  # To use sentinel remove the address key above and add the following
  # sentinel_master_name: livekit
  # sentinel_addresses:
  # - livekit-redis-node-0.livekit-redis-headless:26379
  # - livekit-redis-node-1.livekit-redis-headless:26379
  # If you use a different set of credentials for sentinel add
  # sentinel_username: user
  # sentinel_password: pass

  # To use TLS with redis
  tls:
    enabled: true
    # when set to true, LiveKit will not verify the server's certificate, defaults to true
    insecure: true
    # server_name: myserver.com
    # file containing trusted root certificates for verification
    # ca_cert_file: /path/to/ca.crt
    # client_cert_file: /path/to/client.crt
    # client_key_file: /path/to/client.key

  # To use cluster remove the address key above and add the following
  # cluster_addresses:
  # - livekit-redis-node-0.livekit-redis-headless:6379
  # - livekit-redis-node-1.livekit-redis-headless:6380
  # And it will use the password key above as cluster password
  # And the db key will not be used due to cluster mode not support it.

ただし、対応可能なRedisの冗長構成は制限がありそう。

Redisには3種類の冗長構成があるが、そのうちReplicationにのみ対応していない(公式で明言されているわけではない)。

LiveKitのRedisに関わるコンフィグを見るとSentinelとClusterは設定箇所があるが、Replicationはない。本来であればReplicationはMasterに書き込み処理、Replicaに読み込み処理を分散させたいのでそれぞれAddressを設定させる箇所があって欲しいが存在しない。

type RedisConfig struct {
	Address  string `yaml:"address,omitempty"`
	Username string `yaml:"username,omitempty"`
	Password string `yaml:"password,omitempty"`
	DB       int    `yaml:"db,omitempty"`
	// Deprecated: use TLS instead of UseTLS
	UseTLS            bool         `yaml:"use_tls,omitempty"`
	TLS               *xtls.Config `yaml:"tls,omitempty"`
	MasterName        string       `yaml:"sentinel_master_name,omitempty"`
	SentinelUsername  string       `yaml:"sentinel_username,omitempty"`
	SentinelPassword  string       `yaml:"sentinel_password,omitempty"`
	SentinelAddresses []string     `yaml:"sentinel_addresses,omitempty"`
	ClusterAddresses  []string     `yaml:"cluster_addresses,omitempty"`
	DialTimeout       int          `yaml:"dial_timeout,omitempty"`
	ReadTimeout       int          `yaml:"read_timeout,omitempty"`
	WriteTimeout      int          `yaml:"write_timeout,omitempty"`
	// for clustererd mode only, number of redirects to follow, defaults to 2
	MaxRedirects *int          `yaml:"max_redirects,omitempty"`
	PoolTimeout  time.Duration `yaml:"pool_timeout,omitempty"`
	PoolSize     int           `yaml:"pool_size,omitempty"`
}

また、下記はRedisClientを初期化する部分のコードだが、見ての通りReplicationに関する記載はない。

// NewUniversalClient returns a new multi client. The type of the returned client depends
// on the following conditions:
//
// 1. If the MasterName option is specified, a sentinel-backed FailoverClient is returned.
// 2. if the number of Addrs is two or more, a ClusterClient is returned.
// 3. Otherwise, a single-node Client is returned.
func NewUniversalClient(opts *UniversalOptions) UniversalClient {
	if opts.MasterName != "" {
		return NewFailoverClient(opts.Failover())
	} else if len(opts.Addrs) > 1 {
		return NewClusterClient(opts.Cluster())
	}
	return NewClient(opts.Simple())
}

SchottMan13SchottMan13

疑問: LiveKitのProductionセットアップ手順にCaddyでTLS終端しろと書かれているが、LiveKit本体で終端はできない?

LiveKitはシグナリング用のWebSocket接続(DTLSなどのWebRTCの通信は関係ない)に対してLoadBalancerもしくはプロキシでのTLS終端を求めている
LiveKit本体では終端できないのか?

pkg/service/server.goを見る

server.goのNewLivekitServer関数内で各パスに対するハンドラを設定
server.goのStart関数内で指定されたBindAddressごとにTCPでリッスンし、その作成されたリスナーごとにs.httpServer.Serveを実行

	httpGroup := &errgroup.Group{}
	for _, ln := range listeners {
		l := ln
		httpGroup.Go(func() error {
			return s.httpServer.Serve(l)
		})
	}

ServeTLSではないのでTLS終端機能は持たず、wssでアクセスはできない(そのため、ドキュメントにはCaddyでTLS終端をしろと書いていると思われる)

SchottMan13SchottMan13

疑問:DTLSで使用する証明書の設定箇所が見当たらないがどうしている?

LiveKit内部で使用されているpionのNewPeerConnectionからNewDTLSTransportを呼び出してDTLSセッションを作成
明示的に証明書を与えられない場合は証明書を自動作成して使用する(=LiveKitにDTLSで使用する証明書を渡す必要なし)

SchottMan13SchottMan13

疑問:SDKでTrackをPublishする際に最初からMutedにできない?

Issueで聞いてみたところ、Publishの際にはいくつかパケットを送信する必要があるとのことで現状できないらしい。

SchottMan13SchottMan13

疑問:複数のLiveKitノードがある場合、RoomやParticipantの割り振りはどうなる?

マルチノードルーティングの項を見る。以下はDeepLに突っ込んだ訳

クライアントがLiveKitへのシグナル接続を確立すると、インスタンスの1つと持続的なWebSocket接続が作成されます。
そのインスタンスは、ルームがホストされているノードとクライアント間のメッセージをプロキシするシグナルブリッジとして動作します。

マルチノードのセットアップでは、LiveKitは多数の同時ルームをサポートすることができます。
ただし、今のところルームは1つのノードに収まる必要があるため、ルームの参加者数には制限があります。

とりあえず、Roomは必ず一つのLiveKitに収まっている必要があることは分かった(Roomがばらけたノードに存在すると上りは分散できても下りでコネクションが無駄に増えそうだなとは思ってたけど。LiveKit同士がWebRTC接続をバイパスできればいけるのかな)。

では、Roomはどのノードに作られるのか。これにはノードセレクターと呼ばれる機能が関わっている。
設定ファイルでは下記が該当する。

# # node selector is (possibly) for multi region deployment
# node_selector:
#   # default: any. valid values: any, sysload, cpuload, regionaware
#   kind: sysload
#   # priority used for selection of node when multiple are available
#   # default: random. valid values: random, sysload, cpuload, rooms, clients, tracks, bytespersec
#   sort_by: sysload
#   # used in sysload and regionaware
#   # do not assign room to node if load per CPU exceeds sysload_limit
#   sysload_limit: 0.7
#   # used in regionaware
#   # list of regions and their lat/lon coordinates
#   regions:
#     - name: us-west-2
#       lat: 44.19434095976287
#       lon: -123.0674908379146

kindを見るとany, sysload, cpuload, regionawareとある。これによってノード選択の大まかな戦略が決まり、その後のsort_byでさらにノードの優先度づけが行われる。
各Selectorに対するコードのコメントを訳して載せておく。

AnySelector: 利用可能なノードを制限なく選択する
CPULoadSelector: CPU使用率がCPULoadLimitより高いノードを排除し、その後、過負荷でないノードからノードを選択する
SystemLoadSelector: SysloadLimitより高いCPUあたりのノードを持つノードを排除し、その後、過負荷でないノードからノードを選択します。
RegionAwareSelector: 現在のインスタンスのリージョンに最も近い利用可能なノードを優先する。

各NodeSelector構造体にはSelectNodeという関数が備わっている。

// NodeSelector selects an appropriate node to run the current session
type NodeSelector interface {
	SelectNode(nodes []*livekit.Node) (*livekit.Node, error)
}

SelectNodeを実行すると、各Selectorは候補となりうるNodeをフィルタリングして絞り込む。そして、絞り込んだ後のノードの中からsort_byの項目でNodeをソートして最も先頭のNodeを選択する。

例. SysLoadSelector

SysloadLimitより高いCPU利用率のNodeを排除

func (s *SystemLoadSelector) filterNodes(nodes []*livekit.Node) ([]*livekit.Node, error) {
	nodes = GetAvailableNodes(nodes)
	if len(nodes) == 0 {
		return nil, ErrNoAvailableNodes
	}

	nodesLowLoad := make([]*livekit.Node, 0)
	for _, node := range nodes {
		if GetNodeSysload(node) < s.SysloadLimit {
			nodesLowLoad = append(nodesLowLoad, node)
		}
	}
	if len(nodesLowLoad) > 0 {
		nodes = nodesLowLoad
	}
	return nodes, nil
}

func (s *SystemLoadSelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, error) {
	nodes, err := s.filterNodes(nodes)
	if err != nil {
		return nil, err
	}

	return SelectSortedNode(nodes, s.SortBy)
}

SelectSortedNode関数はすべてのSelecotorから共通して呼ばれる

func SelectSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, error) {
	if sortBy == "" {
		return nil, ErrSortByNotSet
	}

	// Return a node based on what it should be sorted by for priority
	switch sortBy {
	case "random":
		idx := funk.RandomInt(0, len(nodes))
		return nodes[idx], nil
	case "sysload":
		sort.Slice(nodes, func(i, j int) bool {
			return GetNodeSysload(nodes[i]) < GetNodeSysload(nodes[j])
		})
		return nodes[0], nil
	case "cpuload":
		sort.Slice(nodes, func(i, j int) bool {
			return nodes[i].Stats.CpuLoad < nodes[j].Stats.CpuLoad
		})
		return nodes[0], nil
	case "rooms":
		sort.Slice(nodes, func(i, j int) bool {
			return nodes[i].Stats.NumRooms < nodes[j].Stats.NumRooms
		})
		return nodes[0], nil
	case "clients":
		sort.Slice(nodes, func(i, j int) bool {
			return nodes[i].Stats.NumClients < nodes[j].Stats.NumClients
		})
		return nodes[0], nil
	case "tracks":
		sort.Slice(nodes, func(i, j int) bool {
			return nodes[i].Stats.NumTracksIn+nodes[i].Stats.NumTracksOut < nodes[j].Stats.NumTracksIn+nodes[j].Stats.NumTracksOut
		})
		return nodes[0], nil
	case "bytespersec":
		sort.Slice(nodes, func(i, j int) bool {
			return nodes[i].Stats.BytesInPerSec+nodes[i].Stats.BytesOutPerSec < nodes[j].Stats.BytesInPerSec+nodes[j].Stats.BytesOutPerSec
		})
		return nodes[0], nil
	default:
		return nil, ErrSortByUnknown
	}
}

SchottMan13SchottMan13

疑問:自前でホストしているLiveKitの負荷試験どうやる?

ヘッドレスブラウザで頑張って殴るか、LiveKitが提供してくれているCLIツールを使う方法がある。

前者はtestRTCのようなサービスもあるがもちろん有料
後者は無料で使用できるしApache License2.0なので改変も可能。ただし機能はシンプル。

以下の仕様で問題ないのならCLIツールで良い

  • 1つのRoomに対してAudio/Video PublisherとSubscriberの数をそれぞれ指定する
  • 指定された時間の間Publisherは音声/映像データを配信し続け、Subscriberはそれを受信し続ける
  • コードを読む感じ非常にシンプル。Participantの数はmax(AudioPublisher, VideoPublisher) + Subscriberとなり、各Publisherはあらかじめ用意されているダミーデータを指定時間中に永遠にループして配信し続ける。音声配信中はアクティブスピーカーが切り替わったことをシミュレートしている。

公式の内容まんまだけどこんな感じで実行できる(音声配信テストシナリオ)

lk load-test \
  --url <YOUR-SERVER-URL> \
  --api-key <YOUR-KEY> \
  --api-secret <YOUR-SECRET> \
  --room load-test \
  --audio-publishers 10 \
  --subscribers 10 \
  --duration 1m

結果

Summary | Tester | Tracks | Bitrate                 | Total Dropped | Error
        | Sub 10 | 6/10   | 121.7kbps               | 0 (0%)        | -
        | Sub 11 | 6/10   | 121.7kbps               | 0 (0%)        | -
        | Sub 12 | 6/10   | 121.6kbps               | 0 (0%)        | -
        | Sub 13 | 6/10   | 121.5kbps               | 0 (0%)        | -
        | Sub 14 | 6/10   | 121.4kbps               | 0 (0%)        | -
        | Sub 15 | 6/10   | 121.5kbps               | 0 (0%)        | -
        | Sub 16 | 6/10   | 119.5kbps               | 0 (0%)        | -
        | Sub 17 | 6/10   | 121.1kbps               | 0 (0%)        | -
        | Sub 18 | 6/10   | 121.2kbps               | 0 (0%)        | -
        | Sub 19 | 6/10   | 120.8kbps               | 0 (0%)        | -
        | Total  | 60/100 | 1.2mbps (119.5kbps avg) | 0 (0%)        | 0

1つのRoomしかシミュレートできないのが要件に合致しなかったので、自分はCLIツールをフォークして複数Roomをシミュレートできるようにした。
master - worker構成でworkerにCLIツールを配置し、mastarはworkerの同期取りや死活監視、テスト実行データの収集と集約する程度のシンプルなものではあるが。

SchottMan13SchottMan13

疑問:STUNサーバーはデフォルト設定がある?

下記のstun_serversを設定しない場合、どういった挙動になる?

# WebRTC configuration
rtc:
  # node_ip: 106.168.167.96
  # UDP ports to use for client traffic.
  # this port range should be open for inbound traffic on the firewall
  port_range_start: 40000
  port_range_end: 65535
  # when set, LiveKit enable WebRTC ICE over TCP when UDP isn't available
  # this port *cannot* be behind load balancer or TLS, and must be exposed on the node
  # WebRTC transports are encrypted and do not require additional encryption
  # only 80/443 on public IP are allowed if less than 1024
  tcp_port: 7881
  # when set to true, attempts to discover the host's public IP via STUN
  # this is useful for cloud environments such as AWS & Google where hosts have an internal IP
  # that maps to an external one
  use_external_ip: true
  # # when set, LiveKit will attempt to use a UDP mux so all UDP traffic goes through
  # # listed port(s). To maximize system performance, we recommend using a range of ports
  # # greater or equal to the number of vCPUs on the machine.
  # # port_range_start & end must not be set for this config to take effect
  # udp_port: 7882-7892
  # # when set to true, server will use a lite ice agent, that will speed up ice connection, but
  # # might cause connect issue if server running behind NAT.
  # use_ice_lite: true
  # # optional STUN servers for LiveKit clients to use. Clients will be configured to use these STUN servers automatically.
  # # by default LiveKit clients use Google's public STUN servers
  # stun_servers:    <-- ここ⭐️
  #    - 

調べたところ、STUNサーバーが明示されない場合はGoogleやtwilioのデフォルトSTUNサーバー群を使用している。

// pkg/config.go

var DefaultStunServers = []string{
	"global.stun.twilio.com:3478",
	"stun.l.google.com:19302",
	"stun1.l.google.com:19302",
}

具体的にどこでこれらのSTUNサーバーは使用されるのか?
まず一箇所目はLiveKit自体のIP(NodeIP)を決めるタイミング。
こいつはLiveKitの起動時にコンフィグをセットしているがその時に実行される。
NodeIPは環境変数やコンフィグファイルから明示的に指定することもできるが、use_external_ipをTrueにしておくことでSTUNを使用して取得させることもできる。
コンフィグのstun_serversはその際に利用されるし、指定がなければ上記のデフォルトが利用される。

// pkg/config.go

func (conf *RTCConfig) Validate(development bool) error {
	// set defaults for ports if none are set
	if !conf.UDPPort.Valid() && conf.ICEPortRangeStart == 0 {
		// to make it easier to run in dev mode/docker, default to single port
		if development {
			conf.UDPPort = PortRange{Start: 7882}
		} else {
			conf.ICEPortRangeStart = 50000
			conf.ICEPortRangeEnd = 60000
		}
	}

	var err error
	if conf.NodeIP == "" || conf.UseExternalIP {
		conf.NodeIP, err = conf.determineIP()
		if err != nil {
			logger.Warnw("could not determine node ip", err)
			return err
		}
		conf.NodeIPAutoGenerated = true
	}

	return nil
}

二箇所目はLiveKit内のParticipantが使用するICEServersを指定する箇所。
ICE Liteを使用していない・NodeIPを明示的に指定していない・use_external_ipオプションをTrueにしていなければstun_serversをICEサーバーとして使用する。

func NewWebRTCConfig(rtcConf *RTCConfig, development bool) (*WebRTCConfig, error) {
...
..
	if rtcConf.UseICELite {
		s.SetLite(true)
	} else if (rtcConf.NodeIP == "" || rtcConf.NodeIPAutoGenerated) && !rtcConf.UseExternalIP {
		// use STUN servers for server to support NAT
		// when deployed in production, we expect UseExternalIP to be used, and ports accessible
		// this is not compatible with ICE Lite
		// Do not automatically add STUN servers if nodeIP is set
		if len(rtcConf.STUNServers) > 0 {
			c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(rtcConf.STUNServers)}
		} else {
			c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(DefaultStunServers)}
		}
	}
...
..

SchottMan13SchottMan13

疑問:公式にはオンプレとKubernetesしかSelfHostingの例がないけどそれ以外でも運用できる?

もちろんできる。
ECS on EC2をhostネットワークモードで運用した。
イメージは公式のものをベースにしてLiveKit実行用ユーザを作成し、そのユーザで実行する(そのまま使うとrootで使うことになる)。
全てのEC2にはパブリックIPを付与する(起動中にIPが固定されていれば良いのでElasticIPである必要はない)。
また、コンフィグのuse_external_ipをtrueにしておく。それ以外の設定はこことかこことか見る。
冗長化が必要な場合はRedisを立ち上げる(ElastiCache for Redisも可)とともに、シグナリング用のWebSocketをLoadBalancingするためにALBも立てる(ここでTLS終端。ドキュメントのCaddyの代わり)。