gst-meet/lib-gst-meet/src/jingle.rs

1550 lines
52 KiB
Rust
Raw Permalink Normal View History

2021-08-13 11:48:59 +00:00
use std::{collections::HashMap, fmt, net::SocketAddr};
use anyhow::{anyhow, bail, Context, Result};
use futures::stream::StreamExt;
use glib::{Cast, ObjectExt, ToValue};
2022-03-09 06:35:00 +00:00
use gstreamer::{
prelude::{ElementExt, ElementExtManual, GObjectExtManualGst, GstBinExt, GstObjectExt, PadExt},
Bin, Element, GhostPad,
};
#[cfg(feature = "log-rtp")]
use gstreamer_rtp::RTPBuffer;
2022-03-09 06:35:00 +00:00
use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension};
use itertools::Itertools;
use jitsi_xmpp_parsers::{
jingle::{Action, Content, Description, Jingle, Transport},
jingle_dtls_srtp::Fingerprint,
jingle_ice_udp::Transport as IceUdpTransport,
jingle_rtp::Description as RtpDescription,
jingle_ssma,
};
2021-08-13 11:48:59 +00:00
use nice_gst_meet as nice;
use pem::Pem;
use rand::random;
use rcgen::{Certificate, CertificateParams, PKCS_ECDSA_P256_SHA256};
use ring::digest::{digest, SHA256};
use tokio::{net::lookup_host, runtime::Handle, sync::oneshot, task::JoinHandle};
use tracing::{debug, error, info, warn};
2021-08-13 11:48:59 +00:00
use uuid::Uuid;
use xmpp_parsers::{
hashes::Algo,
iq::Iq,
jingle::{Creator, Senders},
jingle_dtls_srtp::Setup,
jingle_grouping::{self, Content as GroupContent},
jingle_ice_udp,
jingle_rtcp_fb::RtcpFb,
jingle_rtp::{self, PayloadType, RtcpMux},
jingle_rtp_hdrext::RtpHdrext,
jingle_ssma::{Parameter, Semantics},
2021-08-13 11:48:59 +00:00
Jid,
};
use crate::{
colibri::ColibriChannel,
2021-08-13 11:48:59 +00:00
conference::JitsiConference,
source::{MediaType, Source},
util::generate_id,
};
const RTP_HDREXT_SSRC_AUDIO_LEVEL: &str = "urn:ietf:params:rtp-hdrext:ssrc-audio-level";
2021-08-25 04:21:43 +00:00
const RTP_HDREXT_TRANSPORT_CC: &str =
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
2021-08-13 11:48:59 +00:00
const DEFAULT_STUN_PORT: u16 = 3478;
2021-08-13 16:09:17 +00:00
const DEFAULT_TURNS_PORT: u16 = 5349;
2021-08-13 11:48:59 +00:00
#[derive(Clone, PartialEq)]
enum CodecName {
Opus,
H264,
Vp8,
Vp9,
}
#[derive(Clone)]
struct Codec {
name: CodecName,
pt: u8,
rtx_pt: Option<u8>,
rtcp_fbs: Vec<RtcpFb>,
}
impl Codec {
fn is(&self, pt: u8) -> bool {
self.pt == pt
}
fn is_rtx(&self, rtx_pt: u8) -> bool {
if let Some(pt) = self.rtx_pt {
pt == rtx_pt
2022-03-06 09:24:02 +00:00
}
else {
false
}
}
fn is_audio(&self) -> bool {
self.name == CodecName::Opus
}
fn is_video(&self) -> bool {
self.name != CodecName::Opus
}
fn is_codec(&self, name: &str) -> bool {
match name {
"h264" => self.name == CodecName::H264,
"vp8" => self.name == CodecName::Vp8,
"vp9" => self.name == CodecName::Vp9,
_ => false,
}
}
fn encoding_name(&self) -> &'static str {
match self.name {
CodecName::Opus => "opus",
CodecName::H264 => "H264",
CodecName::Vp8 => "VP8",
CodecName::Vp9 => "VP9",
}
}
fn depayloader_name(&self) -> &'static str {
match self.name {
CodecName::Opus => "rtpopusdepay",
CodecName::H264 => "rtph264depay",
CodecName::Vp8 => "rtpvp8depay",
CodecName::Vp9 => "rtpvp9depay",
}
}
fn decoder_name(&self) -> &'static str {
match self.name {
CodecName::Opus => "opusdec",
CodecName::H264 => "avdec_h264",
CodecName::Vp8 => "vp8dec",
CodecName::Vp9 => "vp9dec",
}
}
fn payloader_name(&self) -> &'static str {
match self.name {
CodecName::Opus => "rtpopuspay",
CodecName::H264 => "rtph264pay",
CodecName::Vp8 => "rtpvp8pay",
CodecName::Vp9 => "rtpvp9pay",
}
}
}
struct ParsedRtpDescription {
2022-03-06 09:24:02 +00:00
codecs: Vec<Codec>,
audio_hdrext_ssrc_audio_level: Option<u16>,
audio_hdrext_transport_cc: Option<u16>,
2022-03-06 09:24:02 +00:00
video_hdrext_transport_cc: Option<u16>,
}
2021-08-13 11:48:59 +00:00
pub(crate) struct JingleSession {
pipeline: gstreamer::Pipeline,
audio_sink_element: gstreamer::Element,
video_sink_element: gstreamer::Element,
pub(crate) remote_ssrc_map: HashMap<u32, Source>,
_ice_agent: nice::Agent,
2021-08-13 11:48:59 +00:00
pub(crate) accept_iq_id: Option<String>,
pub(crate) colibri_url: Option<String>,
pub(crate) colibri_channel: Option<ColibriChannel>,
pub(crate) stats_handler_task: Option<JoinHandle<()>>,
2021-08-13 11:48:59 +00:00
pipeline_state_null_rx: oneshot::Receiver<()>,
}
impl fmt::Debug for JingleSession {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JingleSession").finish()
}
}
impl JingleSession {
pub(crate) fn pipeline(&self) -> gstreamer::Pipeline {
self.pipeline.clone()
}
pub(crate) fn audio_sink_element(&self) -> gstreamer::Element {
self.audio_sink_element.clone()
}
pub(crate) fn video_sink_element(&self) -> gstreamer::Element {
self.video_sink_element.clone()
}
pub(crate) fn pause_all_sinks(&self) {
if let Some(rtpbin) = self.pipeline.by_name("rtpbin") {
rtpbin.foreach_src_pad(|_, pad| {
let pad_name: String = pad.property("name");
2021-08-13 11:48:59 +00:00
if pad_name.starts_with("recv_rtp_src_0_") {
if let Some(peer_pad) = pad.peer() {
if let Some(element) = peer_pad.parent_element() {
element.set_state(gstreamer::State::Paused).unwrap();
}
}
}
true
});
}
}
pub(crate) async fn pipeline_stopped(self) -> Result<()> {
Ok(self.pipeline_state_null_rx.await?)
}
2022-03-06 09:24:02 +00:00
fn parse_rtp_description(
description: &RtpDescription,
remote_ssrc_map: &mut HashMap<u32, Source>,
) -> Result<Option<ParsedRtpDescription>> {
let mut opus = None;
let mut h264 = None;
let mut vp8 = None;
let mut vp9 = None;
let mut audio_hdrext_ssrc_audio_level = None;
let mut audio_hdrext_transport_cc = None;
let mut video_hdrext_transport_cc = None;
if description.media == "audio" {
2021-10-23 15:32:13 +00:00
for pt in description.payload_types.iter() {
// We dont support any static codec, so name MUST be set.
if pt.name.as_deref() == Some("opus") {
opus = Some(Codec {
name: CodecName::Opus,
pt: pt.id,
rtx_pt: None,
rtcp_fbs: pt.rtcp_fbs.clone(),
});
2021-10-23 15:32:13 +00:00
}
}
for hdrext in description.hdrexts.iter() {
if hdrext.uri == RTP_HDREXT_SSRC_AUDIO_LEVEL {
audio_hdrext_ssrc_audio_level = Some(hdrext.id);
2021-10-23 15:32:13 +00:00
}
else if hdrext.uri == RTP_HDREXT_TRANSPORT_CC {
audio_hdrext_transport_cc = Some(hdrext.id);
}
2021-10-23 15:32:13 +00:00
}
}
else if description.media == "video" {
2021-10-23 15:32:13 +00:00
for pt in description.payload_types.iter() {
// We dont support any static codec, so name MUST be set.
if let Some(name) = &pt.name {
match name.as_str() {
"H264" => {
h264 = Some(Codec {
name: CodecName::H264,
pt: pt.id,
rtx_pt: None,
rtcp_fbs: pt.rtcp_fbs.clone(),
});
2022-03-06 09:24:02 +00:00
},
2021-10-23 15:32:13 +00:00
"VP8" => {
vp8 = Some(Codec {
name: CodecName::Vp8,
pt: pt.id,
rtx_pt: None,
rtcp_fbs: pt.rtcp_fbs.clone(),
});
2022-03-06 09:24:02 +00:00
},
2021-10-23 15:32:13 +00:00
"VP9" => {
vp9 = Some(Codec {
name: CodecName::Vp9,
pt: pt.id,
rtx_pt: None,
rtcp_fbs: pt.rtcp_fbs.clone(),
});
2022-03-06 09:24:02 +00:00
},
2021-10-23 15:32:13 +00:00
_ => (),
}
}
}
for pt in description.payload_types.iter() {
if let Some(name) = &pt.name {
if name == "rtx" {
for param in pt.parameters.iter() {
if param.name == "apt" {
let apt_pt: u8 = param.value.parse()?;
if let Some(h264) = &mut h264 {
if apt_pt == h264.pt {
h264.rtx_pt = Some(pt.id);
}
2021-10-23 15:32:13 +00:00
}
if let Some(vp8) = &mut vp8 {
if apt_pt == vp8.pt {
vp8.rtx_pt = Some(pt.id);
}
2021-10-23 15:32:13 +00:00
}
if let Some(vp9) = &mut vp9 {
if apt_pt == vp9.pt {
vp9.rtx_pt = Some(pt.id);
}
2021-10-23 15:32:13 +00:00
}
}
}
}
}
}
for hdrext in description.hdrexts.iter() {
if hdrext.uri == RTP_HDREXT_TRANSPORT_CC {
video_hdrext_transport_cc = Some(hdrext.id);
2021-10-23 15:32:13 +00:00
}
}
}
else {
debug!("skipping media: {}", description.media);
return Ok(None);
}
let codecs = [opus, h264, vp8, vp9].iter().flatten().cloned().collect();
for ssrc in &description.ssrcs {
let owner = ssrc
.info
.as_ref()
.context("missing ssrc-info")?
.owner
.clone();
debug!("adding ssrc to remote_ssrc_map: {:?}", ssrc);
remote_ssrc_map.insert(
ssrc.id,
Source {
ssrc: ssrc.id,
participant_id: if owner == "jvb" {
None
}
else {
Some(
owner
.split('/')
.nth(1)
.context("invalid ssrc-info owner")?
.to_owned(),
)
},
media_type: if description.media == "audio" {
MediaType::Audio
}
else {
MediaType::Video
},
},
);
}
Ok(Some(ParsedRtpDescription {
2022-03-06 09:24:02 +00:00
codecs,
audio_hdrext_ssrc_audio_level,
audio_hdrext_transport_cc,
2022-03-06 09:24:02 +00:00
video_hdrext_transport_cc,
}))
}
2022-03-06 09:24:02 +00:00
async fn setup_ice(
conference: &JitsiConference,
transport: &IceUdpTransport,
) -> Result<(nice::Agent, u32, u32)> {
2021-08-13 16:09:17 +00:00
let ice_agent = nice::Agent::new(&conference.glib_main_context, nice::Compatibility::Rfc5245);
ice_agent.set_ice_tcp(false);
ice_agent.set_upnp(false);
let ice_stream_id = ice_agent.add_stream(1);
let ice_component_id = 1;
2021-08-13 11:48:59 +00:00
let maybe_stun = conference
.external_services
.iter()
.find(|svc| svc.r#type == "stun");
2021-08-13 16:09:17 +00:00
2021-08-13 11:48:59 +00:00
let stun_addr = if let Some(stun) = maybe_stun {
2021-08-13 16:09:17 +00:00
lookup_host(format!(
"{}:{}",
stun.host,
stun.port.unwrap_or(DEFAULT_STUN_PORT)
))
.await?
.next()
2021-08-13 11:48:59 +00:00
}
else {
None
};
debug!("STUN address: {:?}", stun_addr);
if let Some((stun_addr, stun_port)) = stun_addr.map(|sa| (sa.ip().to_string(), sa.port())) {
ice_agent.set_stun_server(Some(&stun_addr));
ice_agent.set_stun_server_port(stun_port as u32);
}
2021-08-13 16:09:17 +00:00
let maybe_turn = conference
.external_services
.iter()
.find(|svc| svc.r#type == "turns");
if let Some(turn_server) = maybe_turn {
let maybe_addr = lookup_host(format!(
"{}:{}",
turn_server.host,
turn_server.port.unwrap_or(DEFAULT_TURNS_PORT)
))
.await?
.next();
if let Some(addr) = maybe_addr {
debug!("TURN address: {:?}", addr);
ice_agent.set_relay_info(
ice_stream_id,
ice_component_id,
&addr.ip().to_string(),
addr.port() as u32,
turn_server.username.as_deref().unwrap_or_default(),
turn_server.password.as_deref().unwrap_or_default(),
nice::RelayType::Tls,
);
}
}
2021-08-13 11:48:59 +00:00
if !ice_agent.attach_recv(
ice_stream_id,
ice_component_id,
&conference.glib_main_context,
|_, _, _, s| debug!("ICE nice_agent_attach_recv cb: {}", s),
) {
warn!("nice_agent_attach_recv failed");
}
debug!("ice_agent={:?}", ice_agent);
debug!("ice_stream_id={}", ice_stream_id);
debug!("ice_component_id={}", ice_component_id);
2021-10-23 14:45:25 +00:00
if let (Some(ufrag), Some(pwd)) = (&transport.ufrag, &transport.pwd) {
2021-08-13 11:48:59 +00:00
debug!("setting ICE remote credentials");
if !ice_agent.set_remote_credentials(ice_stream_id, ufrag, pwd) {
warn!("nice_agent_set_remote_candidates failed");
}
}
ice_agent.connect_candidate_gathering_done(move |_agent, candidates| {
debug!("ICE candidate-gathering-done {:?}", candidates);
2021-08-13 11:48:59 +00:00
});
debug!("gathering ICE candidates");
if !ice_agent.gather_candidates(ice_stream_id) {
warn!("nice_agent_gather_candidates failed");
}
2021-10-23 14:45:25 +00:00
if let (Some(ufrag), Some(pwd), remote_candidates) =
(&transport.ufrag, &transport.pwd, &transport.candidates)
2021-08-13 11:48:59 +00:00
{
debug!("setting ICE remote candidates: {:?}", remote_candidates);
let remote_candidates: Vec<_> = remote_candidates
.iter()
.map(|c| {
let mut candidate = nice::Candidate::new(match c.type_ {
jingle_ice_udp::Type::Host => nice::CandidateType::Host,
jingle_ice_udp::Type::Prflx => nice::CandidateType::PeerReflexive,
jingle_ice_udp::Type::Srflx => nice::CandidateType::ServerReflexive,
jingle_ice_udp::Type::Relay => nice::CandidateType::Relayed,
});
candidate.set_stream_id(ice_stream_id);
candidate.set_component_id(c.component as u32);
candidate.set_foundation(&c.foundation);
candidate.set_addr(SocketAddr::new(c.ip, c.port));
candidate.set_priority(c.priority);
candidate.set_username(ufrag);
candidate.set_password(pwd);
debug!("candidate: {:?}", candidate);
candidate
})
.collect();
let candidate_refs: Vec<_> = remote_candidates.iter().collect();
let res = ice_agent.set_remote_candidates(ice_stream_id, ice_component_id, &candidate_refs);
if res < remote_candidates.len() as i32 {
warn!("some remote candidates failed to add: {}", res);
}
}
2021-10-23 14:45:25 +00:00
Ok((ice_agent, ice_stream_id, ice_component_id))
}
pub(crate) async fn initiate(conference: &JitsiConference, jingle: Jingle) -> Result<Self> {
let initiator = jingle
.initiator
.as_ref()
.ok_or_else(|| anyhow!("session-initiate with no initiator"))?
.clone();
debug!("Received Jingle session-initiate from {}", initiator);
let mut ice_transport = None;
let mut codecs = vec![];
2021-10-23 14:45:25 +00:00
let mut audio_hdrext_ssrc_audio_level = None;
let mut audio_hdrext_transport_cc = None;
2021-10-23 14:45:25 +00:00
let mut video_hdrext_transport_cc = None;
let mut remote_ssrc_map = HashMap::new();
for content in &jingle.contents {
if let Some(Description::Rtp(description)) = &content.description {
2022-03-06 09:24:02 +00:00
if let Some(description) =
JingleSession::parse_rtp_description(description, &mut remote_ssrc_map)?
{
codecs.extend(description.codecs);
2022-03-06 09:24:02 +00:00
audio_hdrext_ssrc_audio_level =
audio_hdrext_ssrc_audio_level.or(description.audio_hdrext_ssrc_audio_level);
audio_hdrext_transport_cc =
audio_hdrext_transport_cc.or(description.audio_hdrext_transport_cc);
2022-03-06 09:24:02 +00:00
video_hdrext_transport_cc =
video_hdrext_transport_cc.or(description.video_hdrext_transport_cc);
2021-10-23 14:45:25 +00:00
}
}
if let Some(Transport::IceUdp(transport)) = &content.transport {
if let Some(fingerprint) = &transport.fingerprint {
if fingerprint.hash != Algo::Sha_256 {
bail!("unsupported fingerprint hash: {:?}", fingerprint.hash);
}
}
ice_transport = Some(transport);
}
}
let ice_transport = ice_transport.context("missing ICE transport")?;
if let Some(remote_fingerprint) = &ice_transport.fingerprint {
warn!(
"Remote DTLS fingerprint (verification not implemented yet): {:?}",
remote_fingerprint
);
}
let mut dtls_cert_params = CertificateParams::new(vec!["gst-meet".to_owned()]);
dtls_cert_params.alg = &PKCS_ECDSA_P256_SHA256;
let dtls_cert = Certificate::from_params(dtls_cert_params)?;
let dtls_cert_der = dtls_cert.serialize_der()?;
let fingerprint = digest(&SHA256, &dtls_cert_der).as_ref().to_vec();
let fingerprint_str =
itertools::join(fingerprint.iter().map(|byte| format!("{:X}", byte)), ":");
let dtls_cert_pem = pem::encode(&Pem {
tag: "CERTIFICATE".to_string(),
contents: dtls_cert_der,
});
let dtls_private_key_pem = pem::encode(&Pem {
tag: "PRIVATE KEY".to_string(),
contents: dtls_cert.serialize_private_key_der(),
});
debug!("Local DTLS certificate:\n{}", dtls_cert_pem);
debug!("Local DTLS fingerprint: {}", fingerprint_str);
let audio_ssrc: u32 = random();
let video_ssrc: u32 = random();
let video_rtx_ssrc: u32 = random();
debug!("audio SSRC: {}", audio_ssrc);
debug!("video SSRC: {}", video_ssrc);
debug!("video RTX SSRC: {}", video_rtx_ssrc);
2022-03-06 09:24:02 +00:00
let (ice_agent, ice_stream_id, ice_component_id) =
JingleSession::setup_ice(conference, ice_transport).await?;
2021-10-23 14:45:25 +00:00
let (ice_local_ufrag, ice_local_pwd) = ice_agent
.local_credentials(ice_stream_id)
.context("no local ICE credentials")?;
debug!("building gstreamer pipeline");
2021-08-13 11:48:59 +00:00
let pipeline = gstreamer::Pipeline::new(None);
2021-08-13 11:48:59 +00:00
let rtpbin = gstreamer::ElementFactory::make("rtpbin", Some("rtpbin"))?;
rtpbin.set_property_from_str("rtp-profile", "savpf");
rtpbin.set_property("autoremove", true);
rtpbin.set_property("do-lost", true);
rtpbin.set_property("do-sync-event", true);
pipeline.add(&rtpbin)?;
2021-08-13 11:48:59 +00:00
let nicesrc = gstreamer::ElementFactory::make("nicesrc", None)?;
nicesrc.set_property("stream", ice_stream_id);
nicesrc.set_property("component", ice_component_id);
nicesrc.set_property("agent", &ice_agent);
pipeline.add(&nicesrc)?;
let nicesink = gstreamer::ElementFactory::make("nicesink", None)?;
nicesink.set_property("stream", ice_stream_id);
nicesink.set_property("component", ice_component_id);
nicesink.set_property("agent", &ice_agent);
pipeline.add(&nicesink)?;
2021-08-13 11:48:59 +00:00
let dtls_srtp_connection_id = "gst-meet";
2021-08-13 11:48:59 +00:00
2021-08-24 08:40:08 +00:00
let dtlssrtpenc = gstreamer::ElementFactory::make("dtlssrtpenc", None)?;
dtlssrtpenc.set_property("connection-id", dtls_srtp_connection_id);
dtlssrtpenc.set_property("is-client", true);
2021-08-24 08:40:08 +00:00
pipeline.add(&dtlssrtpenc)?;
let dtlssrtpdec = gstreamer::ElementFactory::make("dtlssrtpdec", None)?;
dtlssrtpdec.set_property("connection-id", dtls_srtp_connection_id);
dtlssrtpdec.set_property(
"pem",
format!("{}\n{}", dtls_cert_pem, dtls_private_key_pem),
);
pipeline.add(&dtlssrtpdec)?;
2021-08-13 11:48:59 +00:00
{
let codecs = codecs.clone();
rtpbin.connect("request-pt-map", false, move |values| {
let f = || {
debug!("rtpbin request-pt-map {:?}", values);
let pt = values[2].get::<u32>()? as u8;
2022-03-09 06:35:00 +00:00
let mut caps = gstreamer::Caps::builder("application/x-rtp").field("payload", pt as i32);
for codec in codecs.iter() {
if codec.is(pt) {
if codec.is_audio() {
caps = caps
.field("media", "audio")
.field("encoding-name", "OPUS")
.field("clock-rate", 48000);
if let Some(hdrext) = audio_hdrext_ssrc_audio_level {
caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_SSRC_AUDIO_LEVEL);
}
if let Some(hdrext) = audio_hdrext_transport_cc {
caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC);
}
2021-08-25 04:21:43 +00:00
}
else {
// A video codec, as the only audio codec we support is Opus.
caps = caps
.field("media", "video")
.field("clock-rate", 90000)
.field("encoding-name", codec.encoding_name())
.field("rtcp-fb-nack-pli", true);
if let Some(hdrext) = video_hdrext_transport_cc {
caps = caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC);
}
2021-08-25 04:21:43 +00:00
}
return Ok::<_, anyhow::Error>(Some(caps.build()));
}
else if codec.is_rtx(pt) {
caps = caps
.field("media", "video")
.field("clock-rate", 90000)
.field("encoding-name", "RTX")
.field("apt", codec.pt);
return Ok(Some(caps.build()));
}
}
2021-08-13 11:48:59 +00:00
warn!("unknown payload type: {}", pt);
Ok(None)
};
match f() {
Ok(Some(caps)) => {
debug!("mapped pt to caps: {:?}", caps);
Some(caps.to_value())
},
Ok(None) => None,
Err(e) => {
error!("handling request-pt-map: {:?}", e);
None
},
}
});
}
2021-08-13 11:48:59 +00:00
2021-08-24 08:40:08 +00:00
let handle = Handle::current();
let jingle_session = conference.jingle_session.clone();
let buffer_size = conference.config.buffer_size;
2021-08-24 08:40:08 +00:00
rtpbin.connect("new-jitterbuffer", false, move |values| {
let handle = handle.clone();
let jingle_session = jingle_session.clone();
2021-08-24 08:40:08 +00:00
let f = move || {
let rtpjitterbuffer: gstreamer::Element = values[1].get()?;
let session: u32 = values[2].get()?;
let ssrc: u32 = values[3].get()?;
2021-08-25 04:21:43 +00:00
debug!(
"new jitterbuffer created for session {} ssrc {}",
session, ssrc
);
2021-08-24 08:40:08 +00:00
let source = handle.block_on(async move {
Ok::<_, anyhow::Error>(
jingle_session
.lock()
.await
.as_ref()
.context("not connected (no jingle session)")?
2021-08-24 08:40:08 +00:00
.remote_ssrc_map
.get(&ssrc)
.context(format!("unknown ssrc: {}", ssrc))?
.clone(),
)
})?;
debug!("jitterbuffer is for remote source: {:?}", source);
if source.media_type == MediaType::Video && source.participant_id.is_some() {
2021-08-24 08:40:08 +00:00
debug!("enabling RTX for ssrc {}", ssrc);
rtpjitterbuffer.set_property("do-retransmission", true);
rtpjitterbuffer.set_property("drop-on-latency", true);
rtpjitterbuffer.set_property("latency", buffer_size);
2021-08-24 08:40:08 +00:00
}
Ok::<_, anyhow::Error>(())
};
if let Err(e) = f() {
warn!("new-jitterbuffer: {:?}", e);
}
None
});
2021-08-24 08:40:08 +00:00
2022-03-06 09:24:02 +00:00
let pts: Vec<(String, u32)> = codecs
.iter()
.filter(|codec| codec.is_video())
2022-03-06 09:24:02 +00:00
.flat_map(|codec| {
codec
.rtx_pt
.map(|rtx_pt| (codec.pt.to_string(), rtx_pt as u32))
})
.collect();
{
let pts = pts.clone();
rtpbin.connect("request-aux-sender", false, move |values| {
let f = || {
let session: u32 = values[1].get()?;
debug!("creating RTX sender for session {}", session);
let mut pt_map = gstreamer::Structure::builder("application/x-rtp-pt-map");
let mut ssrc_map = gstreamer::Structure::builder("application/x-rtp-ssrc-map");
for (pt, rtx_pt) in pts.iter() {
pt_map = pt_map.field(pt, rtx_pt);
}
ssrc_map = ssrc_map.field(&video_ssrc.to_string(), &(video_rtx_ssrc as u32));
let bin = gstreamer::Bin::new(None);
let rtx_sender = gstreamer::ElementFactory::make("rtprtxsend", None)?;
rtx_sender.set_property("payload-type-map", pt_map.build());
rtx_sender.set_property("ssrc-map", ssrc_map.build());
bin.add(&rtx_sender)?;
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("src_{}", session)),
&rtx_sender
.static_pad("src")
.context("rtprtxsend has no src pad")?,
)?)?;
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("sink_{}", session)),
&rtx_sender
.static_pad("sink")
.context("rtprtxsend has no sink pad")?,
)?)?;
Ok::<_, anyhow::Error>(Some(bin.to_value()))
};
match f() {
Ok(o) => o,
Err(e) => {
warn!("request-aux-sender: {:?}", e);
None
},
}
});
}
rtpbin.connect("request-aux-receiver", false, move |values| {
let f = || {
let session: u32 = values[1].get()?;
debug!("creating RTX receiver for session {}", session);
let mut pt_map = gstreamer::Structure::builder("application/x-rtp-pt-map");
for (pt, rtx_pt) in pts.iter() {
pt_map = pt_map.field(pt, rtx_pt);
}
let bin = gstreamer::Bin::new(None);
let rtx_receiver = gstreamer::ElementFactory::make("rtprtxreceive", None)?;
rtx_receiver.set_property("payload-type-map", pt_map.build());
bin.add(&rtx_receiver)?;
2021-08-25 04:21:43 +00:00
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("src_{}", session)),
&rtx_receiver
.static_pad("src")
.context("rtprtxreceive has no src pad")?,
)?)?;
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("sink_{}", session)),
&rtx_receiver
.static_pad("sink")
.context("rtprtxreceive has no sink pad")?,
)?)?;
Ok::<_, anyhow::Error>(Some(bin.to_value()))
};
match f() {
Ok(o) => o,
Err(e) => {
warn!("request-aux-receiver: {:?}", e);
None
},
}
});
2021-08-25 04:21:43 +00:00
{
let handle = Handle::current();
let conference = conference.clone();
2021-08-25 04:21:43 +00:00
let pipeline = pipeline.clone();
let rtpbin_ = rtpbin.clone();
let codecs = codecs.clone();
2021-08-25 04:21:43 +00:00
rtpbin.connect("pad-added", false, move |values| {
let rtpbin = &rtpbin_;
let f = || {
debug!("rtpbin pad-added {:?}", values);
let pad: gstreamer::Pad = values[1].get()?;
let pad_name: String = pad.property("name");
2021-08-25 04:21:43 +00:00
if pad_name.starts_with("recv_rtp_src_0_") {
let mut parts = pad_name.split('_').skip(4);
let ssrc: u32 = parts.next().context("malformed pad name")?.parse()?;
let pt: u8 = parts.next().context("malformed pad name")?.parse()?;
let source = handle.block_on(async {
Ok::<_, anyhow::Error>(
conference
.jingle_session
.lock()
.await
.as_ref()
.context("not connected (no jingle session)")?
2021-08-25 04:21:43 +00:00
.remote_ssrc_map
.get(&ssrc)
.context(format!("unknown ssrc: {}", ssrc))?
.clone(),
)
})?;
debug!("pad added for remote source: {:?}", source);
let depayloader = match source.media_type {
2021-08-25 04:21:43 +00:00
MediaType::Audio => {
2022-03-06 09:24:02 +00:00
let codec = codecs
.iter()
.filter(|codec| codec.is_audio())
.find(|codec| codec.is(pt));
if let Some(codec) = codec {
gstreamer::ElementFactory::make(codec.depayloader_name(), None)?
2021-08-25 04:21:43 +00:00
}
else {
bail!("received audio with unsupported PT {}", pt);
}
},
MediaType::Video => {
2022-03-06 09:24:02 +00:00
let codec = codecs
.iter()
.filter(|codec| codec.is_video())
.find(|codec| codec.is(pt));
if let Some(codec) = codec {
gstreamer::ElementFactory::make(codec.depayloader_name(), None)?
2021-08-25 04:21:43 +00:00
}
else {
bail!("received video with unsupported PT {}", pt);
}
},
};
depayloader.set_property("auto-header-extension", false);
depayloader.connect("request-extension", false, move |values| {
2021-08-25 04:21:43 +00:00
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!("depayloader requested extension: {} {}", ext_id, ext_uri);
let hdrext = RTPHeaderExtension::create_from_uri(&ext_uri)
.context("failed to create hdrext")?;
hdrext.set_id(ext_id);
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
2021-08-13 11:48:59 +00:00
}
});
2021-08-25 04:21:43 +00:00
pipeline
.add(&depayloader)
2021-08-25 04:21:43 +00:00
.context("failed to add depayloader to pipeline")?;
2022-03-09 06:35:00 +00:00
depayloader.sync_state_with_parent()?;
2021-08-25 04:21:43 +00:00
debug!("created depayloader");
rtpbin
.link_pads(Some(&pad_name), &depayloader, None)
2021-08-25 04:21:43 +00:00
.context(format!("failed to link rtpbin.{} to depayloader", pad_name))?;
2021-08-25 04:21:43 +00:00
debug!("linked rtpbin.{} to depayloader", pad_name);
debug!("rtpbin pads:\n{}", dump_pads(&rtpbin));
let queue = gstreamer::ElementFactory::make("queue", None)?;
2022-03-09 06:35:00 +00:00
pipeline
.add(&queue)
.context("failed to add queue to pipeline")?;
queue.sync_state_with_parent()?;
2022-03-09 06:35:00 +00:00
depayloader
.link(&queue)
.context("failed to link depayloader to queue")?;
let decoder = match source.media_type {
MediaType::Audio => {
let codec = codecs
.iter()
.filter(|codec| codec.is_audio())
.find(|codec| codec.is(pt));
if let Some(codec) = codec {
gstreamer::ElementFactory::make(codec.decoder_name(), None)?
// TODO: fec
}
else {
bail!("received audio with unsupported PT {}", pt);
}
},
MediaType::Video => {
let codec = codecs
.iter()
.filter(|codec| codec.is_video())
.find(|codec| codec.is(pt));
if let Some(codec) = codec {
let decoder = gstreamer::ElementFactory::make(codec.decoder_name(), None)?;
decoder.set_property("automatic-request-sync-points", true);
2022-03-09 06:35:00 +00:00
decoder.set_property_from_str(
"automatic-request-sync-point-flags",
"GST_VIDEO_DECODER_REQUEST_SYNC_POINT_CORRUPT_OUTPUT",
);
decoder
}
else {
bail!("received video with unsupported PT {}", pt);
}
},
};
2022-03-09 06:35:00 +00:00
pipeline
.add(&decoder)
.context("failed to add decoder to pipeline")?;
decoder.sync_state_with_parent()?;
2022-03-09 06:35:00 +00:00
queue
.link(&decoder)
.context("failed to link queue to decoder")?;
let src_pad = match source.media_type {
MediaType::Audio => decoder
.static_pad("src")
.context("decoder has no src pad")?,
MediaType::Video => {
let videoscale = gstreamer::ElementFactory::make("videoscale", None)?;
2022-03-09 06:35:00 +00:00
pipeline
.add(&videoscale)
.context("failed to add videoscale to pipeline")?;
videoscale.sync_state_with_parent()?;
2022-03-09 06:35:00 +00:00
decoder
.link(&videoscale)
.context("failed to link decoder to videoscale")?;
let capsfilter = gstreamer::ElementFactory::make("capsfilter", None)?;
2022-03-09 06:35:00 +00:00
capsfilter.set_property_from_str(
"caps",
&format!(
"video/x-raw, width={}, height={}",
conference.config.recv_video_scale_width,
conference.config.recv_video_scale_height
),
);
pipeline
.add(&capsfilter)
.context("failed to add capsfilter to pipeline")?;
capsfilter.sync_state_with_parent()?;
2022-03-09 06:35:00 +00:00
videoscale
.link(&capsfilter)
.context("failed to link videoscale to capsfilter")?;
let videoconvert = gstreamer::ElementFactory::make("videoconvert", None)?;
2022-03-09 06:35:00 +00:00
pipeline
.add(&videoconvert)
.context("failed to add videoconvert to pipeline")?;
videoconvert.sync_state_with_parent()?;
2022-03-09 06:35:00 +00:00
capsfilter
.link(&videoconvert)
.context("failed to link capsfilter to videoconvert")?;
2022-03-09 06:35:00 +00:00
videoconvert
.static_pad("src")
.context("videoconvert has no src pad")?
},
};
2021-08-25 04:21:43 +00:00
if let Some(participant_id) = source.participant_id {
handle.block_on(conference.ensure_participant(&participant_id))?;
let maybe_sink_element = match source.media_type {
2022-03-09 06:35:00 +00:00
MediaType::Audio => {
handle.block_on(conference.remote_participant_audio_sink_element())
},
MediaType::Video => {
handle.block_on(conference.remote_participant_video_sink_element())
},
};
if let Some(sink_element) = maybe_sink_element {
let sink_pad = sink_element
.request_pad_simple("sink_%u")
.context("no suitable sink pad provided by sink element in recv pipeline")?;
2022-03-09 06:35:00 +00:00
let ghost_pad = GhostPad::with_target(
Some(&format!(
"participant_{}_{:?}",
participant_id, source.media_type
)),
&sink_pad,
)?;
let bin: Bin = sink_element
.parent()
.context("sink element has no parent")?
.downcast()
.map_err(|_| anyhow!("sink element's parent is not a bin"))?;
bin.add_pad(&ghost_pad)?;
2022-03-09 06:35:00 +00:00
src_pad
.link(&ghost_pad)
.context("failed to link decode chain to participant bin from recv pipeline")?;
info!(
"linked {}/{:?} to new pad in recv pipeline",
participant_id, source.media_type
);
}
else if let Some(participant_bin) =
pipeline.by_name(&format!("participant_{}", participant_id))
{
let sink_pad_name = match source.media_type {
MediaType::Audio => "audio",
MediaType::Video => "video",
};
if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) {
2022-03-09 06:35:00 +00:00
src_pad.link(&sink_pad).context(
"failed to link decode chain to participant bin from recv participant pipeline",
)?;
info!(
"linked {}/{:?} to recv participant pipeline",
participant_id, source.media_type
);
}
else {
warn!(
"no {} sink pad on {} participant bin in recv participant pipeline",
sink_pad_name, participant_id
);
}
2021-08-13 11:48:59 +00:00
}
else {
warn!("no pipeline handled new participant: {}", participant_id);
2021-08-13 11:48:59 +00:00
}
}
else {
debug!("not looking for participant bin, source is owned by JVB");
2021-08-13 11:48:59 +00:00
}
2021-08-25 04:21:43 +00:00
if !src_pad.is_linked() {
debug!("nothing linked to decoder, adding fakesink");
2021-08-25 04:21:43 +00:00
let fakesink = gstreamer::ElementFactory::make("fakesink", None)?;
pipeline.add(&fakesink)?;
fakesink.sync_state_with_parent()?;
2022-03-09 06:35:00 +00:00
let sink_pad = fakesink
.static_pad("sink")
.context("fakesink has no sink pad")?;
src_pad.link(&sink_pad)?;
2021-08-25 04:21:43 +00:00
}
2021-08-13 11:48:59 +00:00
2021-08-25 04:21:43 +00:00
gstreamer::debug_bin_to_dot_file(
&pipeline,
gstreamer::DebugGraphDetails::ALL,
&format!("ssrc-added-{}", ssrc),
);
2021-08-13 11:48:59 +00:00
2021-08-25 04:21:43 +00:00
Ok::<_, anyhow::Error>(())
}
else {
Ok(())
}
};
if let Err(e) = f() {
error!("handling pad-added: {:?}", e);
2021-08-13 11:48:59 +00:00
}
2021-08-25 04:21:43 +00:00
None
});
2021-08-25 04:21:43 +00:00
}
2021-08-13 11:48:59 +00:00
let opus = codecs.iter().find(|codec| codec.name == CodecName::Opus);
let audio_sink_element = if let Some(opus) = opus {
let audio_sink_element = gstreamer::ElementFactory::make(opus.payloader_name(), None)?;
audio_sink_element.set_property("pt", opus.pt as u32);
audio_sink_element
2022-03-06 09:24:02 +00:00
}
else {
bail!("no opus payload type in jingle session-initiate");
};
audio_sink_element.set_property("min-ptime", 10i64 * 1000 * 1000);
audio_sink_element.set_property("ssrc", audio_ssrc);
if audio_sink_element.has_property("auto-header-extension", None) {
audio_sink_element.set_property("auto-header-extension", false);
audio_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"audio payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
2021-08-25 04:21:43 +00:00
}
});
}
else {
debug!("audio payloader: no rtp header extension support");
}
2021-08-13 11:48:59 +00:00
pipeline.add(&audio_sink_element)?;
let codec_name = conference.config.video_codec.as_str();
let codec = codecs.iter().find(|codec| codec.is_codec(codec_name));
let video_sink_element = if let Some(codec) = codec {
let element = gstreamer::ElementFactory::make(codec.payloader_name(), None)?;
element.set_property("pt", codec.pt as u32);
if codec.name == CodecName::H264 {
2021-08-13 11:48:59 +00:00
element.set_property_from_str("aggregate-mode", "zero-latency");
}
else {
2021-08-13 11:48:59 +00:00
element.set_property_from_str("picture-id-mode", "15-bit");
}
element
}
else {
bail!("unsupported video codec: {}", codec_name);
2021-08-13 11:48:59 +00:00
};
video_sink_element.set_property("ssrc", video_ssrc);
if video_sink_element.has_property("auto-header-extension", None) {
video_sink_element.set_property("auto-header-extension", false);
video_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"video payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
2021-08-25 04:21:43 +00:00
}
});
}
else {
debug!("video payloader: no rtp header extension support");
}
2021-08-13 11:48:59 +00:00
pipeline.add(&video_sink_element)?;
let rtpfunnel = gstreamer::ElementFactory::make("rtpfunnel", None)?;
pipeline.add(&rtpfunnel)?;
debug!("linking video payloader -> rtpfunnel");
video_sink_element.link(&rtpfunnel)?;
debug!("linking audio payloader -> rtpfunnel");
audio_sink_element.link(&rtpfunnel)?;
debug!("linking rtpfunnel -> rtpbin");
rtpfunnel.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?;
let rtp_recv_identity = gstreamer::ElementFactory::make("identity", None)?;
pipeline.add(&rtp_recv_identity)?;
let rtcp_recv_identity = gstreamer::ElementFactory::make("identity", None)?;
pipeline.add(&rtcp_recv_identity)?;
let rtp_send_identity = gstreamer::ElementFactory::make("identity", None)?;
pipeline.add(&rtp_send_identity)?;
let rtcp_send_identity = gstreamer::ElementFactory::make("identity", None)?;
pipeline.add(&rtcp_send_identity)?;
#[cfg(feature = "log-rtp")]
{
if conference.config.log_rtp {
debug!("setting up RTP packet logging");
let make_rtp_logger = |direction: &'static str| {
move |values: &[glib::Value]| -> Option<glib::Value> {
let f = || {
let buffer: gstreamer::Buffer = values[1].get()?;
let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer)?;
debug!(
2022-03-09 06:35:00 +00:00
ssrc = rtp_buffer.ssrc(),
pt = rtp_buffer.payload_type(),
seq = rtp_buffer.seq(),
ts = rtp_buffer.timestamp(),
marker = rtp_buffer.is_marker(),
extension = rtp_buffer.is_extension(),
payload_size = rtp_buffer.payload_size(),
"RTP {}",
direction,
);
Ok::<_, anyhow::Error>(())
};
if let Err(e) = f() {
warn!("RTP {}: {:?}", direction, e);
}
None
}
};
rtp_recv_identity.connect("handoff", false, make_rtp_logger("RECV"));
rtp_send_identity.connect("handoff", false, make_rtp_logger("SEND"));
}
if conference.config.log_rtcp {
debug!("setting up RTCP packet logging");
let make_rtcp_logger = |direction: &'static str| {
move |values: &[glib::Value]| -> Option<glib::Value> {
let f = || {
let buffer: gstreamer::Buffer = values[1].get()?;
let mut buf = [0u8; 1500];
2022-03-09 06:35:00 +00:00
buffer
.copy_to_slice(0, &mut buf[..buffer.size()])
.map_err(|_| anyhow!("invalid RTCP packet size"))?;
let decoded = rtcp::packet::unmarshal(&mut &buf[..buffer.size()])?;
2022-03-09 06:35:00 +00:00
debug!("RTCP {} size={}\n{:#?}", direction, buffer.size(), decoded,);
Ok::<_, anyhow::Error>(())
};
if let Err(e) = f() {
warn!("RTCP {}: {:?}", direction, e);
}
None
}
};
rtcp_recv_identity.connect("handoff", false, make_rtcp_logger("RECV"));
rtcp_send_identity.connect("handoff", false, make_rtcp_logger("SEND"));
}
}
2022-03-09 06:35:00 +00:00
2021-08-24 08:40:08 +00:00
debug!("link dtlssrtpdec -> rtpbin");
dtlssrtpdec.link_pads(Some("rtp_src"), &rtp_recv_identity, None)?;
rtp_recv_identity.link_pads(None, &rtpbin, Some("recv_rtp_sink_0"))?;
dtlssrtpdec.link_pads(Some("rtcp_src"), &rtcp_recv_identity, None)?;
rtcp_recv_identity.link_pads(None, &rtpbin, Some("recv_rtcp_sink_0"))?;
2021-08-24 08:40:08 +00:00
debug!("linking rtpbin -> dtlssrtpenc");
rtpbin.link_pads(Some("send_rtp_src_0"), &rtp_send_identity, None)?;
rtp_send_identity.link_pads(None, &dtlssrtpenc, Some("rtp_sink_0"))?;
rtpbin.link_pads(Some("send_rtcp_src_0"), &rtcp_send_identity, None)?;
rtcp_send_identity.link_pads(None, &dtlssrtpenc, Some("rtcp_sink_0"))?;
debug!("rtpbin pads:\n{}", dump_pads(&rtpbin));
2021-08-24 08:40:08 +00:00
debug!("linking ice src -> dtlssrtpdec");
nicesrc.link(&dtlssrtpdec)?;
debug!("linking dtlssrtpenc -> ice sink");
dtlssrtpenc.link_pads(Some("src"), &nicesink, Some("sink"))?;
2021-08-13 11:48:59 +00:00
let bus = pipeline.bus().context("failed to get pipeline bus")?;
let (pipeline_state_null_tx, pipeline_state_null_rx) = oneshot::channel();
tokio::spawn(async move {
let mut stream = bus.stream();
while let Some(msg) = stream.next().await {
match msg.view() {
gstreamer::MessageView::Error(e) => {
if let Some(d) = e.debug() {
error!("{}", d);
}
},
gstreamer::MessageView::Warning(e) => {
if let Some(d) = e.debug() {
warn!("{}", d);
}
},
gstreamer::MessageView::StateChanged(state)
if state.current() == gstreamer::State::Null =>
{
debug!("pipeline state is null");
pipeline_state_null_tx.send(()).unwrap();
break;
2022-03-06 09:24:02 +00:00
},
2021-08-13 11:48:59 +00:00
_ => {},
}
}
});
gstreamer::debug_bin_to_dot_file(
&pipeline,
gstreamer::DebugGraphDetails::ALL,
"session-initiate",
);
let local_candidates = ice_agent.local_candidates(ice_stream_id, ice_component_id);
debug!("local candidates: {:?}", local_candidates);
debug!("building Jingle session-accept");
let mut jingle_accept = Jingle::new(Action::SessionAccept, jingle.sid.clone())
.with_initiator(
jingle
.initiator
.as_ref()
.context("jingle session-initiate with no initiator")?
.clone(),
)
.with_responder(Jid::Full(conference.jid.clone()));
for initiate_content in &jingle.contents {
let mut description = RtpDescription::new(initiate_content.name.0.clone());
description.payload_types = if initiate_content.name.0 == "audio" {
let codec = codecs.iter().find(|codec| codec.name == CodecName::Opus);
if let Some(codec) = codec {
2022-03-06 09:24:02 +00:00
let mut pt = PayloadType::new(codec.pt, "opus".to_owned(), 48000, 2);
pt.rtcp_fbs = codec.rtcp_fbs.clone();
vec![pt]
}
else {
bail!("no opus payload type in jingle session-initiate");
}
2021-08-13 11:48:59 +00:00
}
else {
2021-08-24 08:40:08 +00:00
let mut pts = vec![];
let codec_name = conference.config.video_codec.as_str();
let codec = codecs.iter().find(|codec| codec.is_codec(codec_name));
if let Some(codec) = codec {
let mut pt = PayloadType::new(codec.pt, codec.encoding_name().to_owned(), 90000, 1);
pt.rtcp_fbs = codec.rtcp_fbs.clone();
pts.push(pt);
if let Some(rtx_pt) = codec.rtx_pt {
let mut rtx_pt = PayloadType::new(rtx_pt, "rtx".to_owned(), 90000, 1);
rtx_pt.parameters = vec![jingle_rtp::Parameter {
name: "apt".to_owned(),
value: codec.pt.to_string(),
}];
rtx_pt.rtcp_fbs = codec
.rtcp_fbs
.clone()
.into_iter()
.filter(|fb| fb.type_ != "transport-cc")
.collect();
pts.push(rtx_pt);
}
}
else {
bail!("unsupported video codec: {}", codec_name);
2021-08-13 11:48:59 +00:00
}
2021-08-24 08:40:08 +00:00
pts
2021-08-13 11:48:59 +00:00
};
description.rtcp_mux = Some(RtcpMux);
let mslabel = Uuid::new_v4().to_string();
let label = Uuid::new_v4().to_string();
let cname = Uuid::new_v4().to_string();
2021-08-24 08:40:08 +00:00
description.ssrc = Some(if initiate_content.name.0 == "audio" {
audio_ssrc.to_string()
}
else {
video_ssrc.to_string()
});
description.ssrcs = if initiate_content.name.0 == "audio" {
vec![jingle_ssma::Source::new(audio_ssrc)]
2021-08-13 11:48:59 +00:00
}
else {
2021-08-24 08:40:08 +00:00
vec![
jingle_ssma::Source::new(video_ssrc),
jingle_ssma::Source::new(video_rtx_ssrc),
2021-08-24 08:40:08 +00:00
]
};
for ssrc in description.ssrcs.iter_mut() {
ssrc.parameters.push(Parameter {
name: "cname".to_owned(),
value: Some(cname.clone()),
});
ssrc.parameters.push(Parameter {
name: "msid".to_owned(),
value: Some(format!("{} {}", mslabel, label)),
});
}
2021-08-24 08:40:08 +00:00
description.ssrc_groups = if initiate_content.name.0 == "audio" {
vec![]
}
else {
vec![jingle_ssma::Group {
semantics: Semantics::Fid,
2021-08-24 08:40:08 +00:00
sources: vec![
jingle_ssma::Source::new(video_ssrc),
jingle_ssma::Source::new(video_rtx_ssrc),
2021-08-24 08:40:08 +00:00
],
}]
};
if initiate_content.name.0 == "audio" {
if let Some(hdrext) = audio_hdrext_ssrc_audio_level {
description.hdrexts.push(RtpHdrext::new(
hdrext,
RTP_HDREXT_SSRC_AUDIO_LEVEL.to_owned(),
));
}
if let Some(hdrext) = audio_hdrext_transport_cc {
description
.hdrexts
.push(RtpHdrext::new(hdrext, RTP_HDREXT_TRANSPORT_CC.to_owned()));
}
}
else if initiate_content.name.0 == "video" {
if let Some(hdrext) = video_hdrext_transport_cc {
2022-03-06 09:24:02 +00:00
description
.hdrexts
.push(RtpHdrext::new(hdrext, RTP_HDREXT_TRANSPORT_CC.to_owned()));
}
}
2021-08-13 11:48:59 +00:00
2021-08-13 16:09:17 +00:00
let mut transport = IceUdpTransport::new().with_fingerprint(Fingerprint {
hash: Algo::Sha_256,
setup: Some(Setup::Active),
value: fingerprint.clone(),
// required: Some(true.to_string()),
2021-08-13 16:09:17 +00:00
});
2021-08-13 11:48:59 +00:00
transport.ufrag = Some(ice_local_ufrag.clone());
transport.pwd = Some(ice_local_pwd.clone());
transport.candidates = vec![];
for c in &local_candidates {
2021-08-14 06:04:40 +00:00
let addr = c.addr();
let foundation = c.foundation()?;
transport.candidates.push(jingle_ice_udp::Candidate {
component: c.component_id() as u8,
foundation: foundation.to_owned(),
generation: 0,
id: Uuid::new_v4().to_string(),
ip: addr.ip(),
port: addr.port(),
priority: c.priority(),
protocol: "udp".to_owned(),
type_: match c.type_() {
nice::CandidateType::Host => jingle_ice_udp::Type::Host,
nice::CandidateType::PeerReflexive => jingle_ice_udp::Type::Prflx,
nice::CandidateType::ServerReflexive => jingle_ice_udp::Type::Srflx,
nice::CandidateType::Relayed => jingle_ice_udp::Type::Relay,
other => bail!("unsupported candidate type: {:?}", other),
2021-08-13 11:48:59 +00:00
},
2021-08-14 06:04:40 +00:00
rel_addr: None,
rel_port: None,
network: None,
});
2021-08-13 11:48:59 +00:00
}
jingle_accept = jingle_accept.add_content(
Content::new(Creator::Responder, initiate_content.name.clone())
.with_senders(Senders::Both)
.with_description(description)
.with_transport(transport),
);
}
jingle_accept = jingle_accept.set_group(jingle_grouping::Group {
semantics: jingle_grouping::Semantics::Bundle,
2022-03-06 09:24:02 +00:00
contents: vec![GroupContent::new("video"), GroupContent::new("audio")],
});
2021-08-13 11:48:59 +00:00
let accept_iq_id = generate_id();
let session_accept_iq = Iq::from_set(accept_iq_id.clone(), jingle_accept)
.with_to(Jid::Full(conference.focus_jid_in_muc()?))
.with_from(Jid::Full(conference.jid.clone()));
conference.xmpp_tx.send(session_accept_iq.into()).await?;
Ok(Self {
pipeline,
audio_sink_element,
video_sink_element,
remote_ssrc_map,
_ice_agent: ice_agent,
2021-08-13 11:48:59 +00:00
accept_iq_id: Some(accept_iq_id),
2021-10-23 14:45:25 +00:00
colibri_url: ice_transport.web_socket.clone().map(|ws| ws.url),
colibri_channel: None,
stats_handler_task: None,
2021-08-13 11:48:59 +00:00
pipeline_state_null_rx,
})
}
pub(crate) async fn source_add(&mut self, jingle: Jingle) -> Result<()> {
for content in &jingle.contents {
if let Some(Description::Rtp(description)) = &content.description {
for ssrc in &description.ssrcs {
let owner = ssrc
.info
.as_ref()
.context("missing ssrc-info")?
.owner
.clone();
debug!("adding ssrc to remote_ssrc_map: {:?}", ssrc);
self.remote_ssrc_map.insert(
ssrc.id,
2021-08-13 11:48:59 +00:00
Source {
ssrc: ssrc.id,
participant_id: if owner == "jvb" {
None
}
else {
Some(
owner
.split('/')
.nth(1)
.context("invalid ssrc-info owner")?
.to_owned(),
)
},
2021-08-13 11:48:59 +00:00
media_type: if description.media == "audio" {
MediaType::Audio
}
else {
MediaType::Video
},
},
);
}
}
}
Ok(())
}
}
fn dump_pads(element: &Element) -> String {
element
.pads()
.into_iter()
2022-03-09 06:35:00 +00:00
.map(|pad| {
format!(
" {}, peer={}.{}, caps=\"{}\"",
pad.name(),
pad
.peer()
.and_then(|peer| peer.parent_element())
.map(|element| element.name().to_string())
.unwrap_or_default(),
pad
.peer()
.map(|peer| peer.name().to_string())
.unwrap_or_default(),
pad.caps().map(|caps| caps.to_string()).unwrap_or_default(),
)
})
.join("\n")
2022-03-09 06:35:00 +00:00
}