mediasoup で録画する方法
この記事は Rust アドベントカレンダー 2023 の19日目の記事です。
mediasoup で録画する方法
この記事では mediasoup の Rust 実装を使って、ビデオチャットに参加している参加者の映像を録画する方法について紹介します。[1]
もし間違いがあればコメントで指摘いただければ幸いです。
サンプルプロジェクト
上のリポジトリを clone して、 README に従ってプロジェクトを起動すると、下のような画面が表示され、ビデオチャットルームに入室できます。
チャットルームで録画ボタンを押すと、録画ボタンを押した人の音声と映像を録画できます。録画したファイルは backend/recordings ディレクトリに mp4 ファイルとして保存されます。[2]
mediasoup について
mediasoup はオープンソースの SFU で、 C++, JavaScript, Rust から利用できるインターフェースが提供されています。
mediasoup の概要については以前に Rust、何もわからない... #10 というイベントで発表したスライドがあるのでそちらもご参照ください。
あるいは https://leaysgur.github.io/posts/2020/03/24/152051/ の解説記事も非常に詳しくて参考になるのでおすすめです。
mediasoup の通信の仕組み
mediasoup でビデオ通信/データ通信を行うコンポーネントは Transport と呼ばれます。
複数の Transport は Router というコンポーネントによって管理されます。(シンプルな実装では一つの Router がビデオチャットでいうところのチャットルームに相当しますが、複数の Router 同士を繋いでより複雑な構成を作ることも可能です。)
mediasoup では一つのクライアントに対してデータの送信用と受信用でそれぞれ別の Transport を使う仕組みになっていて、それぞれ Producer, Consumer と呼ばれます。
また Transport は用途に合わせて次の4種類が用意されています。
名前 | 説明 |
---|---|
WebRtcTransport | WebRTC通信を行うためのトランスポート |
PlainTransport | 特定のホスト/ポートに対してRTP, RTCPパケットを送受信するためのトランスポート |
PipeTransport | 2つの Router 同士で通信を繋ぐためのトランスポート |
DirectTransport | データ通信を行ったり、RTP/RTCPパケットを注入するためのトランスポート |
このあたりは先に紹介したこちらの記事も詳しいのでこちらもご参照ください。
簡単なビデオチャットを実装するときは、サーバー上でチャットルームに相当する Router を作り、参加者ごとに Producer と Consumer に使用される WebRTCTransport を 1セット作成します。
同じくクライアント側でもそれと対応する Producer と Consumer を作成してそれらを繋ぐことでクライアント/SFU 間で WebRTC 通信の送受信ができるようになります。
録画処理の解説
mediasoup は SFU なので、ビデオチャットの各参加者の映像/音声は一度サーバーに集められます。このデータを FFmpeg や GStreamer に送ることで、各参加者の映像/音声を録画できます。(ただし SFU という特性上、 mediasoup は受信した映像/音声データをエンコードや mix は行いません。)
録画をするには以下のようにします。
- create_plain_transport で PlainTransport を作成し、録画対象 Transport のデータを、PlainTransport を経由して指定のポートに投げる。
- そのパケットを FFmpeg や GStreamer で録画してファイルに書き出す。
今回は FFmpeg で録画する仕組みを実装しました。
以下にそのコードと解説コメントを記載します。
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::process::Child;
use std::process::Stdio;
use std::sync::Mutex;
use std::thread;
use crate::room::media_codecs;
use mediasoup::plain_transport::*;
use mediasoup::prelude::*;
use mediasoup::rtp_parameters::RtpCodecCapabilityFinalized;
use std::process::Command;
static RECORDING_PORT_MIN: u16 = 12000;
static RECORDING_PORT_MAX: u16 = 13000;
static RECORDING_PORT: Mutex<u16> = Mutex::new(RECORDING_PORT_MIN);
// (1)
#[derive(Default, Debug)]
pub struct Recorder {
pub audio_transport: Option<PlainTransport>,
pub video_transport: Option<PlainTransport>,
pub audio_consumer: Option<Consumer>,
pub video_consumer: Option<Consumer>,
pub process: Option<Child>,
pub is_recording: bool,
pub filename: String,
pub sdp_filename: String,
pub port_number: u16,
}
impl Recorder {
// (2)
pub async fn new(
router: &Router,
audio_producer: Option<&Producer>,
video_producer: Option<&Producer>,
) -> Result<Self, String> {
let mut tmp_self = Recorder::default();
// (3)
let mut rp_guard = RECORDING_PORT.lock().expect("lock mutex");
let port_number = *rp_guard;
*rp_guard = if port_number >= RECORDING_PORT_MAX {
RECORDING_PORT_MIN
} else {
port_number + 4
};
std::mem::drop(rp_guard);
tmp_self.port_number = port_number;
// (4)-1
if let Some(ap) = audio_producer {
let mut transport_options = PlainTransportOptions::new(ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
});
transport_options.comedia = false;
transport_options.rtcp_mux = false;
let transport = router
.create_plain_transport(transport_options)
.await
.map_err(|error| format!("Failed to create audio transport: {error}"))?;
let remote_params = PlainTransportRemoteParameters {
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
port: Some(port_number),
rtcp_port: Some(port_number + 1),
srtp_parameters: None,
};
transport
.connect(remote_params)
.await
.map_err(|error| format!("Failed to connect audio transport: {error}"))?;
log::debug!("audio transport tuple: {:?}", &transport.tuple());
log::debug!("audio transport rtcp tuple: {:?}", &transport.rtcp_tuple());
let src_cap = convert_rtp_capabilities(router.rtp_capabilities());
log::debug!("audio capabilities: {:?}", &src_cap);
let mut cap = RtpCapabilities::default();
cap.header_extensions = src_cap.header_extensions.clone();
let codec = media_codecs()[0].clone();
cap.codecs.push(codec);
let mut consume_options = ConsumerOptions::new(ap.id(), cap);
consume_options.paused = true;
let consumer = transport
.consume(consume_options)
.await
.map_err(|error| format!("Failed to consume audio transport: {error}"))?;
tmp_self.audio_transport = Some(transport);
tmp_self.audio_consumer = Some(consumer);
}
// (4)-2
if let Some(vp) = video_producer {
let mut transport_options = PlainTransportOptions::new(ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
});
transport_options.comedia = false;
transport_options.rtcp_mux = false;
let transport = router
.create_plain_transport(transport_options)
.await
.map_err(|error| format!("Failed to create video transport: {error}"))?;
let remote_params = PlainTransportRemoteParameters {
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
port: Some(port_number + 2),
rtcp_port: Some(port_number + 3),
srtp_parameters: None,
};
transport
.connect(remote_params)
.await
.map_err(|error| format!("Failed to connect video transport: {error}"))?;
log::debug!("video transport tuple: {:?}", &transport.tuple());
log::debug!("video transport rtcp tuple: {:?}", &transport.rtcp_tuple());
let src_cap = convert_rtp_capabilities(router.rtp_capabilities());
log::debug!("video capabilities: {:?}", &src_cap);
let mut cap = RtpCapabilities::default();
cap.header_extensions = src_cap.header_extensions.clone();
let codec = media_codecs()[1].clone();
cap.codecs.push(codec);
let mut consume_options = ConsumerOptions::new(vp.id(), cap);
consume_options.paused = true;
let consumer = transport
.consume(consume_options)
.await
.map_err(|error| format!("Failed to consume video transport: {error}"))?;
tmp_self.video_transport = Some(transport);
tmp_self.video_consumer = Some(consumer);
}
Ok(tmp_self)
}
// (5)
pub async fn start_recording(&mut self, output_name: &str) -> Result<(), String> {
self.start_recording_process(output_name).await?;
if let Some(c) = self.audio_consumer.as_ref() {
c.resume()
.await
.map_err(|e| format!("Failed to start audio consumer: {e}"))?;
log::debug!("redume audio consumer");
}
if let Some(c) = self.video_consumer.as_ref() {
c.resume()
.await
.map_err(|e| format!("Failed to start video consumer: {e}"))?;
log::debug!("redume video consumer");
}
self.is_recording = true;
self.filename = output_name.to_string();
Ok(())
}
// (6)
fn create_sdp_file(&mut self, sdp_filename: &str, port_number: u16) -> Result<(), String> {
let text = format!(
r#"
v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
c=IN IP4 127.0.0.1
t=0 0
m=audio {} RTP/AVPF 111
a=rtcp:{}
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
m=video {} RTP/AVPF 125
a=rtcp:{}
a=rtpmap:125 H264/90000
"#,
port_number,
port_number + 1,
port_number + 2,
port_number + 3
);
let _ = std::fs::write(sdp_filename, text);
Ok(())
}
// (7)
async fn start_recording_process(&mut self, output_name: &str) -> Result<(), String> {
let sdp_filename = format!("./profiles/{}.sdp", output_name);
self.create_sdp_file(&sdp_filename, self.port_number)?;
let cmd_program = "ffmpeg";
let output_path = format!("./recordings/{}_tmp.mp4", output_name);
let video_format = vec!["-f", "mp4", "-strict", "experimental"];
// Run process
let cmd_args = [
vec![
"-protocol_whitelist",
"file,rtp,udp",
"-probesize",
"50M",
"-fflags",
"+genpts",
"-i",
&sdp_filename,
],
video_format,
vec!["-y", output_path.as_ref()],
]
.concat();
log::info!("spawn ffmpeg: {:?}", &cmd_program);
let mut proc = Command::new(cmd_program)
.args(cmd_args)
.stderr(Stdio::piped())
.stdin(Stdio::piped())
.spawn()
.map_err(|error| format!("Failed to consume audio transport: {error}"))?;
log::info!("get ffmpeg handle");
let stderr = proc
.stderr
.take()
.ok_or("Failed to take stdout".to_owned())?;
log::info!("take stderr");
let mut r = BufReader::with_capacity(10000000, stderr);
log::info!("get buf reader");
loop {
log::info!("get ffmpeg output");
let mut line = String::new();
let result = r.read_line(&mut line);
if let Err(e) = result {
return Err(format!("Failed to read line: {e}"));
}
if let Ok(0) = result {
return Err("FFmpeg is quit".to_owned());
}
log::debug!("line: {}", &line);
if line.starts_with("ffmpeg version") {
break;
}
}
thread::spawn(move || {
log::info!("read lines.");
loop {
let mut buf = String::new();
let result = r.read_line(&mut buf);
if result.is_err() || result.unwrap_or(1) == 0 {
break;
}
log::info!("{}", &buf);
}
});
log::debug!("ffmpeg has been started.");
self.process = Some(proc);
self.sdp_filename = sdp_filename;
Ok(())
}
// (8)
pub fn stop_recording_process(&mut self) -> Result<(), String> {
let proc = std::mem::replace(&mut self.process, None);
let Some(mut c) = proc else {
return Err("proc is none".to_owned());
};
if let Some(stream) = c.stdin.as_mut() {
stream.write(b"q\n");
stream.flush();
}
let _ = c
.wait()
.map_err(|e| format!("FFmpeg failed to exit: {e}"))?;
Ok(())
}
// (9)
pub async fn stop_recording(&mut self) -> Result<(), String> {
if self.is_recording == false {
return Ok(());
}
self.stop_recording_process();
let filename = self.filename.clone();
let src_path = format!("./recordings/{}_tmp.mp4", &filename);
let dest_path = format!("./recordings/{}.mp4", &filename);
let _ = std::fs::rename(src_path, dest_path);
let _ = std::fs::remove_file(&self.sdp_filename);
if let Some(c) = self.audio_consumer.as_ref() {
c.pause()
.await
.map_err(|e| "Failed to pause audio consumer: {e}".to_owned())?;
}
if let Some(c) = self.video_consumer.as_ref() {
c.pause()
.await
.map_err(|e| "Failed to pause video consumer: {e}".to_owned())?;
}
Ok(())
}
}
fn convert_rtp_codec_capability(src: &RtpCodecCapabilityFinalized) -> RtpCodecCapability {
match src {
RtpCodecCapabilityFinalized::Audio {
mime_type,
preferred_payload_type,
clock_rate,
channels,
parameters,
rtcp_feedback,
} => RtpCodecCapability::Audio {
mime_type: mime_type.clone(),
preferred_payload_type: Some(preferred_payload_type.clone()),
clock_rate: clock_rate.clone(),
channels: channels.clone(),
parameters: parameters.clone(),
rtcp_feedback: rtcp_feedback.clone(),
},
RtpCodecCapabilityFinalized::Video {
mime_type,
preferred_payload_type,
clock_rate,
parameters,
rtcp_feedback,
} => RtpCodecCapability::Video {
mime_type: mime_type.clone(),
preferred_payload_type: Some(preferred_payload_type.clone()),
clock_rate: clock_rate.clone(),
parameters: parameters.clone(),
rtcp_feedback: rtcp_feedback.clone(),
},
_ => panic!("Unknown type"),
}
}
fn convert_rtp_capabilities(src: &RtpCapabilitiesFinalized) -> RtpCapabilities {
let mut dest = RtpCapabilities::default();
dest.header_extensions = src.header_extensions.clone();
dest.codecs = src
.codecs
.iter()
.map(convert_rtp_codec_capability)
.collect();
dest
}
(1) Recorder 構造体
#[derive(Default, Debug)]
pub struct Recorder {
pub audio_transport: Option<PlainTransport>,
pub video_transport: Option<PlainTransport>,
pub audio_consumer: Option<Consumer>,
pub video_consumer: Option<Consumer>,
pub process: Option<Child>,
pub is_recording: bool,
pub filename: String,
pub sdp_filename: String,
pub port_number: u16,
}
Recorder 構造体は録画用のデータを保持する構造体です。
このプロジェクトでは room.rs というファイルに Client 構造体(参加者一人分の情報を管理する構造体)があり、その中に Recorder のインスタンスを配置しています。
(2) Recorder のコンストラクタ
impl Recorder {
pub async fn new(
router: &Router,
audio_producer: Option<&Producer>,
video_producer: Option<&Producer>,
) -> Result<Self, String> {
let mut tmp_self = Recorder::default();
Recorder オブジェクトは、ユーザーがブラウザ上で録画ボタンを押したときに、録画開始に先立って構築されます。
コンストラクタには現在使用している音声と映像データを受け取るための Producer を渡しています。コンストラクタの中で、この Producer に繋ぐ形で PlainTransport を生成することで、 FFmpeg(が動作しているローカルホスト)に向けて RTP パケットを飛ばせるようになります。
(3) ポートの切り替え
let mut rp_guard = RECORDING_PORT.lock().expect("lock mutex");
let port_number = *rp_guard;
*rp_guard = if port_number >= RECORDING_PORT_MAX {
RECORDING_PORT_MIN
} else {
port_number + 4
};
std::mem::drop(rp_guard);
(3) の箇所では、 Recorder クラスごとにポートを切り替える処理を行っています。
RTP パケットをローカルホストのポートに投げて録画を行う使用上、複数の録画処理で同じポートを使用すると不整合が発生します。それを避けるため、各録画で別々のポートを使用するようにポートを切り替えています。
(4) PlainTransport の作成
if let Some(ap) = audio_producer {
let mut transport_options = PlainTransportOptions::new(ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
});
transport_options.comedia = false;
transport_options.rtcp_mux = false;
let transport = router
.create_plain_transport(transport_options)
.await
.map_err(|error| format!("Failed to create audio transport: {error}"))?;
if let Some(vp) = video_producer {
let mut transport_options = PlainTransportOptions::new(ListenIp {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
});
transport_options.comedia = false;
transport_options.rtcp_mux = false;
let transport = router
.create_plain_transport(transport_options)
.await
.map_err(|error| format!("Failed to create video transport: {error}"))?;
(4)-1, (4)-2 では、オーディオとビデオそれぞれの PlainTransport を作成しています。
(5) 録画の開始
pub async fn start_recording(&mut self, output_name: &str) -> Result<(), String> {
self.start_recording_process(output_name).await?;
if let Some(c) = self.audio_consumer.as_ref() {
c.resume()
.await
.map_err(|e| format!("Failed to start audio consumer: {e}"))?;
log::debug!("redume audio consumer");
}
if let Some(c) = self.video_consumer.as_ref() {
c.resume()
.await
.map_err(|e| format!("Failed to start video consumer: {e}"))?;
log::debug!("redume video consumer");
}
self.is_recording = true;
self.filename = output_name.to_string();
Ok(())
}
(5) では録画を開始する関数を定義しています。ブラウザ上で録画の start ボタンを押すと、 Recorder オブジェクトが作成され、その次にこの関数が呼び出されます。
この関数では、先に FFmpeg のプロセスを起動し、その後 PlainTransport から生成した consumer の処理を開始して、ローカルホストに向けて RTP パケットを送信しています。
(6) SDP ファイルの生成
fn create_sdp_file(&mut self, sdp_filename: &str, port_number: u16) -> Result<(), String> {
let text = format!(
r#"
v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
c=IN IP4 127.0.0.1
t=0 0
m=audio {} RTP/AVPF 111
a=rtcp:{}
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
m=video {} RTP/AVPF 125
a=rtcp:{}
a=rtpmap:125 H264/90000
"#,
port_number,
port_number + 1,
port_number + 2,
port_number + 3
);
let _ = std::fs::write(sdp_filename, text);
Ok(())
}
(6) の箇所では SDP ファイル生成してファイルに書き出しています。
録画ごとにポートを切り替える必要があるので、 FFmpeg が使用する SDP もそれに合わせた内容にする必要があります。そのため SDP ファイルは Recorder オブジェクトごとに固有のものを作り、録画が完了したタイミングでファイルを削除するようにしています。
(7) FFmpeg プロセスの開始
async fn start_recording_process(&mut self, output_name: &str) -> Result<(), String> {
let sdp_filename = format!("./profiles/{}.sdp", output_name);
self.create_sdp_file(&sdp_filename, self.port_number)?;
let cmd_program = "ffmpeg";
let output_path = format!("./recordings/{}_tmp.mp4", output_name);
let video_format = vec!["-f", "mp4", "-strict", "experimental"];
// Run process
let cmd_args = [
(7) では録画を行う FFmpeg のプロセスを開始する関数を定義しています。
PlainTransport から送信した RTP パケットは FFmpeg を使用して録画します。
(6) で作った SDP ファイルを読み込んで FFmpeg を起動しておくことで、そのポートに投げられた RTP パケット読み込んで mp4 ファイルを生成できます。
(8) FFmpeg プロセスの停止
pub fn stop_recording_process(&mut self) -> Result<(), String> {
let proc = std::mem::replace(&mut self.process, None);
let Some(mut c) = proc else {
return Err("proc is none".to_owned());
};
if let Some(stream) = c.stdin.as_mut() {
stream.write(b"q\n");
stream.flush();
}
let _ = c
.wait()
.map_err(|e| format!("FFmpeg failed to exit: {e}"))?;
Ok(())
}
(8) 録画を行う FFmpeg のプロセスを停止する関数を定義しています。
ここで注意するべきは、FFmpeg は標準入力に q
を渡すことで処理を停止できるのですが、 RTP パケットを送る Transport が先に停止していると FFmpeg もそれに合わせて処理を停止してしまい、標準入力経由で処理を停止できなくなってしまうことです。
そのため、このあと (9) で PlainTransport のパケット送信を停止するのに先立ってこの関数で FFmpeg を終了させるようにしています。
(9) 録画の終了
pub async fn stop_recording(&mut self) -> Result<(), String> {
if self.is_recording == false {
return Ok(());
}
self.stop_recording_process();
let filename = self.filename.clone();
let src_path = format!("./recordings/{}_tmp.mp4", &filename);
let dest_path = format!("./recordings/{}.mp4", &filename);
let _ = std::fs::rename(src_path, dest_path);
let _ = std::fs::remove_file(&self.sdp_filename);
if let Some(c) = self.audio_consumer.as_ref() {
c.pause()
.await
.map_err(|e| "Failed to pause audio consumer: {e}".to_owned())?;
}
if let Some(c) = self.video_consumer.as_ref() {
c.pause()
.await
.map_err(|e| "Failed to pause video consumer: {e}".to_owned())?;
}
Ok(())
}
(9) では録画の終了処理を行っています。
はじめに stop_recording_process()
関数を呼び出し FFmpeg のプロセスを終了させた後、テンポラリの録画ファイルを正式版としてリネームし、不要になった SDP ファイルを削除しています。
その後、 PlainTransport からのパケット送信を停止して録画処理が終了します。
録画の制限
ポートの使用について
PlainTransport は指定したポートに RTP パケットを投げる仕組みです。ある録画処理で使用しているポートに他の録画用のデータが流れると不整合が起きるため、録画に使用しているポートは他の録画と共有できません。(また、録画をするためには RTP/RTCP 用のポートが映像/音声それぞれで必要になるため、一つの録画のために4つのポートを使用します)
このため複数の録画を同時に行うためには次のどちらかの対応が必要になります。
- 録画ごとに別々のポートが使用されるようにする。
- FFmpeg などを使用せずに、mediasoup のプロセス内で RTP パケットをそのまま取得して適切にフォーマット変換して動画ファイルに書き出す。(Comsumer の on_rtp() メソッドを使用すれば可能?)
録画対象のデータについて
先に記載した通り、 mediasoup は SFU であるため、サーバー側で受信した映像/音声データをエンコードや mix はしません。
このため、録画するデータは各参加者ごとに別々になり、 zoom のようにその瞬間に話している人にフォーカスして録画するような機能はデフォルトでは用意されていません。
これに対しては、現在話している人を検知して、その人の RTP パケットだけを PlainTransport で送信して録画するという対応ができるかもしれません。
mediasoup は通信しているデータの RTP パケットを別の場所に送るという非常にプリミティブな機能だけを提供しているので、よりリッチな録画機能については mediasoup の利用者が頑張って実装する必要があります。
おわりに
今回は mediasoup を使って録画する方法について調査しました。
RTP パケットを別の場所に送れるということで自由度は高いですが、リッチな機能は自分で実装する必要があり、きちんとしたものを作るには胆力が必要になりそうです。
別のポートにデータを送るのではなく、 mediasoup が受信した RTP パケットをそのまま使って録画できるようになると便利なので、その辺りについては引き続き調査していこうと思います。
Discussion