Configurable JB size, send PLIs, send loss events, sep RTP/RTCP logging

This commit is contained in:
Jasper Hugo 2022-03-09 10:56:21 +07:00 committed by Jasper
parent 6ef974719b
commit d7e2320fe5
4 changed files with 109 additions and 52 deletions

View File

@ -25,80 +25,113 @@ use tracing::{error, info, trace, warn};
struct Opt { struct Opt {
#[structopt(long)] #[structopt(long)]
web_socket_url: String, web_socket_url: String,
#[structopt( #[structopt(
long, long,
help = "If not specified, assumed to be the host part of <web-socket-url>", help = "If not specified, assumed to be the host part of <web-socket-url>",
)] )]
xmpp_domain: Option<String>, xmpp_domain: Option<String>,
#[structopt(long)] #[structopt(long)]
room_name: String, room_name: String,
#[structopt( #[structopt(
long, long,
help = "If not specified, assumed to be conference.<xmpp-domain>", help = "If not specified, assumed to be conference.<xmpp-domain>",
)] )]
muc_domain: Option<String>, muc_domain: Option<String>,
#[structopt( #[structopt(
long, long,
help = "If not specified, assumed to be focus@auth.<xmpp-domain>/focus", help = "If not specified, assumed to be focus@auth.<xmpp-domain>/focus",
)] )]
focus_jid: Option<String>, focus_jid: Option<String>,
#[structopt( #[structopt(
long, long,
default_value = "vp8", default_value = "vp8",
help = "The video codec to negotiate support for. One of: vp8, vp9, h264", help = "The video codec to negotiate support for. One of: vp8, vp9, h264",
)] )]
video_codec: String, video_codec: String,
#[structopt(long, default_value = "gst-meet")] #[structopt(long, default_value = "gst-meet")]
nick: String, nick: String,
#[structopt(long)] #[structopt(long)]
region: Option<String>, region: Option<String>,
#[structopt(long)] #[structopt(long)]
send_pipeline: Option<String>, send_pipeline: Option<String>,
#[structopt(long)] #[structopt(long)]
recv_pipeline_participant_template: Option<String>, recv_pipeline_participant_template: Option<String>,
#[structopt( #[structopt(
long, long,
help = "Comma-separated endpoint IDs to select (prioritise receiving of)", help = "Comma-separated endpoint IDs to select (prioritise receiving of)",
)] )]
select_endpoints: Option<String>, select_endpoints: Option<String>,
#[structopt( #[structopt(
long, long,
help = "The maximum number of video streams we would like to receive", help = "The maximum number of video streams we would like to receive",
)] )]
last_n: Option<u16>, last_n: Option<u16>,
#[structopt( #[structopt(
long, long,
help = "The maximum height to receive video at." help = "The maximum height to receive video at."
)] )]
recv_video_height: Option<u16>, recv_video_height: Option<u16>,
#[structopt( #[structopt(
long, long,
help = "The maximum height we plan to send video at (used for stats only)." help = "The maximum height we plan to send video at (used for stats only)."
)] )]
send_video_height: Option<u16>, send_video_height: Option<u16>,
#[structopt( #[structopt(
long, long,
help = "The video type to signal that we are sending. One of: camera, desktop" help = "The video type to signal that we are sending. One of: camera, desktop"
)] )]
video_type: Option<String>, video_type: Option<String>,
#[structopt(
long,
default_value = "200",
help = "The size of the jitter buffers in milliseconds. Larger values are more resilient to packet loss and jitter, smaller values give lower latency.",
)]
buffer_size: u32,
#[structopt(long)] #[structopt(long)]
start_bitrate: Option<u32>, start_bitrate: Option<u32>,
#[structopt(long)] #[structopt(long)]
stereo: Option<bool>, stereo: Option<bool>,
#[structopt(short, long, parse(from_occurrences))] #[structopt(short, long, parse(from_occurrences))]
verbose: u8, verbose: u8,
#[cfg(feature = "tls-insecure")] #[cfg(feature = "tls-insecure")]
#[structopt( #[structopt(
long, long,
help = "Disable TLS certificate verification (use with extreme caution)" help = "Disable TLS certificate verification (use with extreme caution)"
)] )]
tls_insecure: bool, tls_insecure: bool,
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
#[structopt( #[structopt(
long, long,
help = "Log all RTP/RTCP packets at DEBUG level" help = "Log all RTP packets at DEBUG level (extremely verbose)"
)] )]
log_rtp: bool, log_rtp: bool,
#[cfg(feature = "log-rtp")]
#[structopt(
long,
help = "Log all RTCP packets at DEBUG level"
)]
log_rtcp: bool,
} }
#[cfg(not(target_os = "macos"))] #[cfg(not(target_os = "macos"))]
@ -204,10 +237,13 @@ async fn main_inner() -> Result<()> {
video_codec, video_codec,
recv_pipeline_participant_template, recv_pipeline_participant_template,
send_video_height, send_video_height,
buffer_size,
start_bitrate, start_bitrate,
stereo, stereo,
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
log_rtp, log_rtp,
#[cfg(feature = "log-rtp")]
log_rtcp,
.. ..
} = opt; } = opt;
@ -220,8 +256,11 @@ async fn main_inner() -> Result<()> {
extra_muc_features: vec![], extra_muc_features: vec![],
start_bitrate: start_bitrate.unwrap_or(800), start_bitrate: start_bitrate.unwrap_or(800),
stereo: stereo.unwrap_or_default(), stereo: stereo.unwrap_or_default(),
buffer_size,
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
log_rtp, log_rtp,
#[cfg(feature = "log-rtp")]
log_rtcp,
}; };
let main_loop = glib::MainLoop::new(None, false); let main_loop = glib::MainLoop::new(None, false);

View File

@ -160,9 +160,11 @@ pub unsafe extern "C" fn gstmeet_connection_join_conference(
// TODO // TODO
start_bitrate: 800, start_bitrate: 800,
stereo: false, stereo: false,
buffer_size: 200,
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
log_rtp: false, log_rtp: false,
#[cfg(feature = "log-rtp")]
log_rtcp: false,
}; };
(*context) (*context)
.runtime .runtime

View File

@ -80,8 +80,11 @@ pub struct JitsiConferenceConfig {
pub extra_muc_features: Vec<String>, pub extra_muc_features: Vec<String>,
pub start_bitrate: u32, pub start_bitrate: u32,
pub stereo: bool, pub stereo: bool,
pub buffer_size: u32,
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
pub log_rtp: bool, pub log_rtp: bool,
#[cfg(feature = "log-rtp")]
pub log_rtcp: bool,
} }
#[derive(Clone)] #[derive(Clone)]

View File

@ -548,6 +548,8 @@ impl JingleSession {
let rtpbin = gstreamer::ElementFactory::make("rtpbin", Some("rtpbin"))?; let rtpbin = gstreamer::ElementFactory::make("rtpbin", Some("rtpbin"))?;
rtpbin.set_property_from_str("rtp-profile", "savpf"); rtpbin.set_property_from_str("rtp-profile", "savpf");
rtpbin.set_property("autoremove", true); rtpbin.set_property("autoremove", true);
rtpbin.set_property("do-lost", true);
rtpbin.set_property("do-sync-event", true);
pipeline.add(&rtpbin)?; pipeline.add(&rtpbin)?;
let nicesrc = gstreamer::ElementFactory::make("nicesrc", None)?; let nicesrc = gstreamer::ElementFactory::make("nicesrc", None)?;
@ -604,7 +606,8 @@ impl JingleSession {
caps = caps caps = caps
.field("media", "video") .field("media", "video")
.field("clock-rate", 90000) .field("clock-rate", 90000)
.field("encoding-name", codec.encoding_name()); .field("encoding-name", codec.encoding_name())
.field("rtcp-fb-nack-pli", true);
if let Some(hdrext) = video_hdrext_transport_cc { if let Some(hdrext) = video_hdrext_transport_cc {
caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC); caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC);
} }
@ -640,6 +643,7 @@ impl JingleSession {
let handle = Handle::current(); let handle = Handle::current();
let jingle_session = conference.jingle_session.clone(); let jingle_session = conference.jingle_session.clone();
let buffer_size = conference.config.buffer_size;
rtpbin.connect("new-jitterbuffer", false, move |values| { rtpbin.connect("new-jitterbuffer", false, move |values| {
let handle = handle.clone(); let handle = handle.clone();
let jingle_session = jingle_session.clone(); let jingle_session = jingle_session.clone();
@ -669,6 +673,8 @@ impl JingleSession {
if source.media_type == MediaType::Video && source.participant_id.is_some() { if source.media_type == MediaType::Video && source.participant_id.is_some() {
debug!("enabling RTX for ssrc {}", ssrc); debug!("enabling RTX for ssrc {}", ssrc);
rtpjitterbuffer.set_property("do-retransmission", true); rtpjitterbuffer.set_property("do-retransmission", true);
rtpjitterbuffer.set_property("drop-on-latency", true);
rtpjitterbuffer.set_property("latency", buffer_size);
} }
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())
}; };
@ -1023,60 +1029,67 @@ impl JingleSession {
pipeline.add(&rtcp_send_identity)?; pipeline.add(&rtcp_send_identity)?;
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
if conference.config.log_rtp { {
debug!("setting up RTP/RTCP packet logging"); if conference.config.log_rtp {
debug!("setting up RTP packet logging");
let make_rtp_logger = |direction: &'static str| { let make_rtp_logger = |direction: &'static str| {
move |values: &[glib::Value]| -> Option<glib::Value> { move |values: &[glib::Value]| -> Option<glib::Value> {
let f = || { let f = || {
let buffer: gstreamer::Buffer = values[1].get()?; let buffer: gstreamer::Buffer = values[1].get()?;
let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer)?; let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer)?;
debug!( debug!(
ssrc=rtp_buffer.ssrc(), ssrc=rtp_buffer.ssrc(),
pt=rtp_buffer.payload_type(), pt=rtp_buffer.payload_type(),
seq=rtp_buffer.seq(), seq=rtp_buffer.seq(),
ts=rtp_buffer.timestamp(), ts=rtp_buffer.timestamp(),
marker=rtp_buffer.is_marker(), marker=rtp_buffer.is_marker(),
extension=rtp_buffer.is_extension(), extension=rtp_buffer.is_extension(),
payload_size=rtp_buffer.payload_size(), payload_size=rtp_buffer.payload_size(),
"RTP {}", "RTP {}",
direction, direction,
); );
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())
}; };
if let Err(e) = f() { if let Err(e) = f() {
warn!("RTP {}: {:?}", direction, e); warn!("RTP {}: {:?}", direction, e);
}
None
} }
None };
}
};
let make_rtcp_logger = |direction: &'static str| { rtp_recv_identity.connect("handoff", false, make_rtp_logger("RECV"));
move |values: &[glib::Value]| -> Option<glib::Value> { rtp_send_identity.connect("handoff", false, make_rtp_logger("SEND"));
let f = || { }
let buffer: gstreamer::Buffer = values[1].get()?;
let mut buf = [0u8; 1500]; if conference.config.log_rtcp {
buffer.copy_to_slice(0, &mut buf[..buffer.size()]).map_err(|_| anyhow!("invalid RTCP packet size"))?; debug!("setting up RTCP packet logging");
let decoded = rtcp::packet::unmarshal(&mut &buf[..buffer.size()])?;
debug!( let make_rtcp_logger = |direction: &'static str| {
"RTCP {} size={}\n{:#?}", move |values: &[glib::Value]| -> Option<glib::Value> {
direction, let f = || {
buffer.size(), let buffer: gstreamer::Buffer = values[1].get()?;
decoded, let mut buf = [0u8; 1500];
); buffer.copy_to_slice(0, &mut buf[..buffer.size()]).map_err(|_| anyhow!("invalid RTCP packet size"))?;
Ok::<_, anyhow::Error>(()) let decoded = rtcp::packet::unmarshal(&mut &buf[..buffer.size()])?;
}; debug!(
if let Err(e) = f() { "RTCP {} size={}\n{:#?}",
warn!("RTCP {}: {:?}", direction, e); direction,
buffer.size(),
decoded,
);
Ok::<_, anyhow::Error>(())
};
if let Err(e) = f() {
warn!("RTCP {}: {:?}", direction, e);
}
None
} }
None };
}
};
rtp_recv_identity.connect("handoff", false, make_rtp_logger("RECV")); rtcp_recv_identity.connect("handoff", false, make_rtcp_logger("RECV"));
rtp_send_identity.connect("handoff", false, make_rtp_logger("SEND")); rtcp_send_identity.connect("handoff", false, make_rtcp_logger("SEND"));
rtcp_recv_identity.connect("handoff", false, make_rtcp_logger("RECV")); }
rtcp_send_identity.connect("handoff", false, make_rtcp_logger("SEND"));
} }
debug!("link dtlssrtpdec -> rtpbin"); debug!("link dtlssrtpdec -> rtpbin");