TWCC fixes, improved logging

* Added `payload` field to recv caps, fixing TWCC!
 * Re-enabled TWCC for audio
 * Added context to TLS error logging
 * Added missing rtcp-fb for RTX
This commit is contained in:
Jasper Hugo 2022-03-07 15:48:45 +07:00
parent d44546e8c6
commit 3557183a37
4 changed files with 59 additions and 11 deletions

View File

@ -50,7 +50,7 @@ impl ColibriChannel {
match tokio_tungstenite::connect_async_tls_with_config( match tokio_tungstenite::connect_async_tls_with_config(
request, request,
None, None,
Some(wss_connector(tls_insecure)?), Some(wss_connector(tls_insecure).context("failed to build TLS connector")?),
) )
.await .await
{ {

View File

@ -3,10 +3,11 @@ use std::{collections::HashMap, fmt, net::SocketAddr};
use anyhow::{anyhow, bail, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use glib::{ObjectExt, ToValue}; use glib::{ObjectExt, ToValue};
use gstreamer::prelude::{ElementExt, GObjectExtManualGst, GstBinExt, PadExt}; use gstreamer::{Element, prelude::{ElementExt, ElementExtManual, GObjectExtManualGst, GstBinExt, GstObjectExt, PadExt}};
use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension}; use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension};
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
use gstreamer_rtp::RTPBuffer; use gstreamer_rtp::RTPBuffer;
use itertools::Itertools;
use jitsi_xmpp_parsers::{ use jitsi_xmpp_parsers::{
jingle::{Action, Content, Description, Jingle, Transport}, jingle::{Action, Content, Description, Jingle, Transport},
jingle_dtls_srtp::Fingerprint, jingle_dtls_srtp::Fingerprint,
@ -128,6 +129,7 @@ impl Codec {
struct ParsedRtpDescription { struct ParsedRtpDescription {
codecs: Vec<Codec>, codecs: Vec<Codec>,
audio_hdrext_ssrc_audio_level: Option<u16>, audio_hdrext_ssrc_audio_level: Option<u16>,
audio_hdrext_transport_cc: Option<u16>,
video_hdrext_transport_cc: Option<u16>, video_hdrext_transport_cc: Option<u16>,
} }
@ -191,6 +193,7 @@ impl JingleSession {
let mut vp8 = None; let mut vp8 = None;
let mut vp9 = None; let mut vp9 = None;
let mut audio_hdrext_ssrc_audio_level = None; let mut audio_hdrext_ssrc_audio_level = None;
let mut audio_hdrext_transport_cc = None;
let mut video_hdrext_transport_cc = None; let mut video_hdrext_transport_cc = None;
if description.media == "audio" { if description.media == "audio" {
@ -209,6 +212,9 @@ impl JingleSession {
if hdrext.uri == RTP_HDREXT_SSRC_AUDIO_LEVEL { if hdrext.uri == RTP_HDREXT_SSRC_AUDIO_LEVEL {
audio_hdrext_ssrc_audio_level = Some(hdrext.id); audio_hdrext_ssrc_audio_level = Some(hdrext.id);
} }
else if hdrext.uri == RTP_HDREXT_TRANSPORT_CC {
audio_hdrext_transport_cc = Some(hdrext.id);
}
} }
} }
else if description.media == "video" { else if description.media == "video" {
@ -320,6 +326,7 @@ impl JingleSession {
Ok(Some(ParsedRtpDescription { Ok(Some(ParsedRtpDescription {
codecs, codecs,
audio_hdrext_ssrc_audio_level, audio_hdrext_ssrc_audio_level,
audio_hdrext_transport_cc,
video_hdrext_transport_cc, video_hdrext_transport_cc,
})) }))
} }
@ -461,6 +468,7 @@ impl JingleSession {
let mut ice_transport = None; let mut ice_transport = None;
let mut codecs = vec![]; let mut codecs = vec![];
let mut audio_hdrext_ssrc_audio_level = None; let mut audio_hdrext_ssrc_audio_level = None;
let mut audio_hdrext_transport_cc = None;
let mut video_hdrext_transport_cc = None; let mut video_hdrext_transport_cc = None;
let mut remote_ssrc_map = HashMap::new(); let mut remote_ssrc_map = HashMap::new();
@ -473,6 +481,8 @@ impl JingleSession {
codecs.extend(description.codecs); codecs.extend(description.codecs);
audio_hdrext_ssrc_audio_level = audio_hdrext_ssrc_audio_level =
audio_hdrext_ssrc_audio_level.or(description.audio_hdrext_ssrc_audio_level); audio_hdrext_ssrc_audio_level.or(description.audio_hdrext_ssrc_audio_level);
audio_hdrext_transport_cc =
audio_hdrext_transport_cc.or(description.audio_hdrext_transport_cc);
video_hdrext_transport_cc = video_hdrext_transport_cc =
video_hdrext_transport_cc.or(description.video_hdrext_transport_cc); video_hdrext_transport_cc.or(description.video_hdrext_transport_cc);
} }
@ -572,7 +582,8 @@ impl JingleSession {
let f = || { let f = || {
debug!("rtpbin request-pt-map {:?}", values); debug!("rtpbin request-pt-map {:?}", values);
let pt = values[2].get::<u32>()? as u8; let pt = values[2].get::<u32>()? as u8;
let mut caps = gstreamer::Caps::builder("application/x-rtp"); let mut caps = gstreamer::Caps::builder("application/x-rtp")
.field("payload", pt as i32);
for codec in codecs.iter() { for codec in codecs.iter() {
if codec.is(pt) { if codec.is(pt) {
if codec.is_audio() { if codec.is_audio() {
@ -583,6 +594,9 @@ impl JingleSession {
if let Some(hdrext) = audio_hdrext_ssrc_audio_level { if let Some(hdrext) = audio_hdrext_ssrc_audio_level {
caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_SSRC_AUDIO_LEVEL); caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_SSRC_AUDIO_LEVEL);
} }
if let Some(hdrext) = audio_hdrext_transport_cc {
caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC);
}
} }
else { else {
// A video codec, as the only audio codec we support is Opus. // A video codec, as the only audio codec we support is Opus.
@ -591,7 +605,7 @@ impl JingleSession {
.field("clock-rate", 90000) .field("clock-rate", 90000)
.field("encoding-name", codec.encoding_name()); .field("encoding-name", codec.encoding_name());
if let Some(hdrext) = video_hdrext_transport_cc { if let Some(hdrext) = video_hdrext_transport_cc {
caps = caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_TRANSPORT_CC); caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC);
} }
} }
return Ok::<_, anyhow::Error>(Some(caps.build())); return Ok::<_, anyhow::Error>(Some(caps.build()));
@ -836,8 +850,11 @@ impl JingleSession {
rtpbin rtpbin
.link_pads(Some(&pad_name), &source_element, None) .link_pads(Some(&pad_name), &source_element, None)
.context(format!("failed to link rtpbin.{} to depayloader", pad_name))?; .context(format!("failed to link rtpbin.{} to depayloader", pad_name))?;
debug!("linked rtpbin.{} to depayloader", pad_name); debug!("linked rtpbin.{} to depayloader", pad_name);
debug!("rtpbin pads:\n{}", dump_pads(&rtpbin));
let src_pad = source_element let src_pad = source_element
.static_pad("src") .static_pad("src")
.context("depayloader has no src pad")?; .context("depayloader has no src pad")?;
@ -1006,6 +1023,8 @@ impl JingleSession {
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
if conference.config.log_rtp { if conference.config.log_rtp {
debug!("setting up RTP/RTCP packet logging");
let make_rtp_logger = |direction: &'static str| { let make_rtp_logger = |direction: &'static str| {
move |values: &[glib::Value]| -> Option<glib::Value> { move |values: &[glib::Value]| -> Option<glib::Value> {
let f = || { let f = || {
@ -1013,9 +1032,11 @@ impl JingleSession {
let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer)?; let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer)?;
debug!( debug!(
ssrc=rtp_buffer.ssrc(), ssrc=rtp_buffer.ssrc(),
pt=rtp_buffer.payload_type(),
seq=rtp_buffer.seq(), seq=rtp_buffer.seq(),
ts=rtp_buffer.timestamp(), ts=rtp_buffer.timestamp(),
marker=rtp_buffer.is_marker(), marker=rtp_buffer.is_marker(),
extension=rtp_buffer.is_extension(),
payload_size=rtp_buffer.payload_size(), payload_size=rtp_buffer.payload_size(),
"RTP {}", "RTP {}",
direction, direction,
@ -1037,7 +1058,7 @@ impl JingleSession {
buffer.copy_to_slice(0, &mut buf[..buffer.size()]).map_err(|_| anyhow!("invalid RTCP packet size"))?; buffer.copy_to_slice(0, &mut buf[..buffer.size()]).map_err(|_| anyhow!("invalid RTCP packet size"))?;
let decoded = rtcp::packet::unmarshal(&mut &buf[..buffer.size()])?; let decoded = rtcp::packet::unmarshal(&mut &buf[..buffer.size()])?;
debug!( debug!(
"RTCP {} size={}\n{:?}", "RTCP {} size={}\n{:#?}",
direction, direction,
buffer.size(), buffer.size(),
decoded, decoded,
@ -1056,7 +1077,7 @@ impl JingleSession {
rtcp_recv_identity.connect("handoff", false, make_rtcp_logger("RECV")); rtcp_recv_identity.connect("handoff", false, make_rtcp_logger("RECV"));
rtcp_send_identity.connect("handoff", false, make_rtcp_logger("SEND")); rtcp_send_identity.connect("handoff", false, make_rtcp_logger("SEND"));
} }
debug!("link dtlssrtpdec -> rtpbin"); debug!("link dtlssrtpdec -> rtpbin");
dtlssrtpdec.link_pads(Some("rtp_src"), &rtp_recv_identity, None)?; dtlssrtpdec.link_pads(Some("rtp_src"), &rtp_recv_identity, None)?;
rtp_recv_identity.link_pads(None, &rtpbin, Some("recv_rtp_sink_0"))?; rtp_recv_identity.link_pads(None, &rtpbin, Some("recv_rtp_sink_0"))?;
@ -1069,6 +1090,8 @@ impl JingleSession {
rtpbin.link_pads(Some("send_rtcp_src_0"), &rtcp_send_identity, None)?; rtpbin.link_pads(Some("send_rtcp_src_0"), &rtcp_send_identity, None)?;
rtcp_send_identity.link_pads(None, &dtlssrtpenc, Some("rtcp_sink_0"))?; rtcp_send_identity.link_pads(None, &dtlssrtpenc, Some("rtcp_sink_0"))?;
debug!("rtpbin pads:\n{}", dump_pads(&rtpbin));
debug!("linking ice src -> dtlssrtpdec"); debug!("linking ice src -> dtlssrtpdec");
nicesrc.link(&dtlssrtpdec)?; nicesrc.link(&dtlssrtpdec)?;
@ -1152,6 +1175,12 @@ impl JingleSession {
name: "apt".to_owned(), name: "apt".to_owned(),
value: codec.pt.to_string(), value: codec.pt.to_string(),
}]; }];
rtx_pt.rtcp_fbs = codec
.rtcp_fbs
.clone()
.into_iter()
.filter(|fb| fb.type_ != "transport-cc")
.collect();
pts.push(rtx_pt); pts.push(rtx_pt);
} }
} }
@ -1215,6 +1244,11 @@ impl JingleSession {
RTP_HDREXT_SSRC_AUDIO_LEVEL.to_owned(), RTP_HDREXT_SSRC_AUDIO_LEVEL.to_owned(),
)); ));
} }
if let Some(hdrext) = audio_hdrext_transport_cc {
description
.hdrexts
.push(RtpHdrext::new(hdrext, RTP_HDREXT_TRANSPORT_CC.to_owned()));
}
} }
else if initiate_content.name.0 == "video" { else if initiate_content.name.0 == "video" {
if let Some(hdrext) = video_hdrext_transport_cc { if let Some(hdrext) = video_hdrext_transport_cc {
@ -1332,3 +1366,17 @@ impl JingleSession {
Ok(()) Ok(())
} }
} }
fn dump_pads(element: &Element) -> String {
element
.pads()
.into_iter()
.map(|pad| format!(
" {}, peer={}.{}, caps=\"{}\"",
pad.name(),
pad.peer().and_then(|peer| peer.parent_element()).map(|element| element.name().to_string()).unwrap_or_default(),
pad.peer().map(|peer| peer.name().to_string()).unwrap_or_default(),
pad.caps().map(|caps| caps.to_string()).unwrap_or_default(),
))
.join("\n")
}

View File

@ -6,14 +6,14 @@ use std::sync::Arc;
#[cfg(not(feature = "tls-insecure"))] #[cfg(not(feature = "tls-insecure"))]
use anyhow::bail; use anyhow::bail;
use anyhow::Result; use anyhow::{Context, Result};
use tokio_tungstenite::Connector; use tokio_tungstenite::Connector;
#[cfg(feature = "tls-rustls-native-roots")] #[cfg(feature = "tls-rustls-native-roots")]
pub(crate) fn wss_connector(insecure: bool) -> Result<tokio_tungstenite::Connector> { pub(crate) fn wss_connector(insecure: bool) -> Result<tokio_tungstenite::Connector> {
let mut roots = rustls::RootCertStore::empty(); let mut roots = rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs()? { for cert in rustls_native_certs::load_native_certs().context("failed to load native root certs")? {
roots.add(&rustls::Certificate(cert.0))?; roots.add(&rustls::Certificate(cert.0)).context("failed to add native root certs")?;
} }
let mut config = rustls::ClientConfig::builder() let mut config = rustls::ClientConfig::builder()
@ -80,7 +80,7 @@ pub(crate) fn wss_connector(insecure: bool) -> Result<tokio_tungstenite::Connect
"Insecure TLS mode can only be enabled if the tls-insecure feature was enabled at compile time." "Insecure TLS mode can only be enabled if the tls-insecure feature was enabled at compile time."
) )
} }
Ok(Connector::NativeTls(builder.build()?)) Ok(Connector::NativeTls(builder.build().context("failed to build native TLS connector")?))
} }
#[cfg(all( #[cfg(all(

View File

@ -100,7 +100,7 @@ impl Connection {
let (websocket, _response) = tokio_tungstenite::connect_async_tls_with_config( let (websocket, _response) = tokio_tungstenite::connect_async_tls_with_config(
request, request,
None, None,
Some(wss_connector(tls_insecure)?), Some(wss_connector(tls_insecure).context("failed to build TLS connector")?),
) )
.await .await
.context("failed to connect XMPP WebSocket")?; .context("failed to connect XMPP WebSocket")?;