From 037c2d944fd00b810c68fb48eebfa172c1726d7c Mon Sep 17 00:00:00 2001 From: Jasper Hugo Date: Tue, 7 Sep 2021 23:52:04 +0700 Subject: [PATCH] XMPP improvements - Handle unsolicited XMPP messages gracefully - Split out generic XMPP connection handler which can be used for connecting to brewery MUCs - Don't deadlock during Jingle handling --- Cargo.lock | 2 - lib-gst-meet/Cargo.toml | 3 +- lib-gst-meet/src/colibri.rs | 8 +- lib-gst-meet/src/conference.rs | 276 ++++++++++++++------- lib-gst-meet/src/jingle.rs | 279 ++++++++++++---------- lib-gst-meet/src/lib.rs | 8 +- lib-gst-meet/src/source.rs | 2 +- lib-gst-meet/src/stanza_filter.rs | 2 +- lib-gst-meet/src/{ => xmpp}/connection.rs | 174 +++++++------- lib-gst-meet/src/xmpp/extdisco.rs | 2 +- lib-gst-meet/src/xmpp/jitsi.rs | 16 ++ lib-gst-meet/src/xmpp/mod.rs | 1 + lib-gst-meet/src/xmpp/ns.rs | 2 + 13 files changed, 458 insertions(+), 317 deletions(-) rename lib-gst-meet/src/{ => xmpp}/connection.rs (67%) diff --git a/Cargo.lock b/Cargo.lock index fd6ee9e..207bd9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1829,8 +1829,6 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "xmpp-parsers-gst-meet" version = "0.18.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cde8e9611ca7cac569119e4ec1b6fe50da4df91a168fdab786693029ab1482a" dependencies = [ "base64", "blake2", diff --git a/lib-gst-meet/Cargo.toml b/lib-gst-meet/Cargo.toml index 3df4899..ca7d623 100644 --- a/lib-gst-meet/Cargo.toml +++ b/lib-gst-meet/Cargo.toml @@ -27,6 +27,7 @@ rcgen = { version = "0.8", default-features = false } ring = { version = "0.16", default-features = false } serde = { version = "1", default-features = false, features = ["derive"] } serde_json = { version = "1", default-features = false, features = ["std"] } +serde_with = { version = "1", default-features = false, features = ["macros"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros", "sync", "time"] } tokio-stream = { version = "0.1", default-features = false, features = ["time"] } tokio-tungstenite = { version = "0.14", default-features = false, features = ["connect", "rustls-tls"] } @@ -39,7 +40,7 @@ tracing-subscriber = { version = "0.2", optional = true, default-features = fals "tracing-log", ] } uuid = { version = "0.8", default-features = false, features = ["v4"] } -xmpp-parsers = { package = "xmpp-parsers-gst-meet", version = "0.18", default-features = false } +xmpp-parsers = { path = "../../xmpp-rs/xmpp-parsers", package = "xmpp-parsers-gst-meet", version = "0.18", default-features = false } [features] default = [] diff --git a/lib-gst-meet/src/colibri.rs b/lib-gst-meet/src/colibri.rs index 3b07b61..388267a 100644 --- a/lib-gst-meet/src/colibri.rs +++ b/lib-gst-meet/src/colibri.rs @@ -6,11 +6,13 @@ use futures::{ stream::{StreamExt, TryStreamExt}, }; use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tokio_tungstenite::tungstenite::{http::Request, Message}; use tracing::{debug, error, info, warn}; +#[serde_as] #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "colibriClass")] pub enum ColibriMessage { @@ -20,7 +22,11 @@ pub enum ColibriMessage { previous_speakers: Vec, }, #[serde(rename_all = "camelCase")] - EndpointConnectivityStatusChangeEvent { endpoint: String, active: bool }, + EndpointConnectivityStatusChangeEvent { + endpoint: String, + #[serde_as(as = "DisplayFromStr")] + active: bool, + }, #[serde(rename_all = "camelCase")] EndpointMessage { from: String, diff --git a/lib-gst-meet/src/conference.rs b/lib-gst-meet/src/conference.rs index cc3e387..931ca62 100644 --- a/lib-gst-meet/src/conference.rs +++ b/lib-gst-meet/src/conference.rs @@ -4,17 +4,23 @@ use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; use futures::stream::StreamExt; use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt}; +use maplit::hashmap; use once_cell::sync::Lazy; +use serde::Serialize; use tokio::sync::{mpsc, oneshot, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, error, info, trace, warn}; +use uuid::Uuid; +pub use xmpp_parsers::disco::Feature; use xmpp_parsers::{ - disco::{DiscoInfoQuery, DiscoInfoResult, Feature}, + disco::{DiscoInfoQuery, DiscoInfoResult}, ecaps2::{self, ECaps2}, hashes::Algo, iq::{Iq, IqType}, jingle::{Action, Jingle}, + message::{Message, MessageType}, muc::{Muc, MucUser}, + nick::Nick, ns, presence::{self, Presence}, BareJid, Element, FullJid, Jid, @@ -25,7 +31,8 @@ use crate::{ jingle::JingleSession, source::MediaType, stanza_filter::StanzaFilter, - xmpp, + util::generate_id, + xmpp::{self, connection::Connection}, }; static DISCO_INFO: Lazy = Lazy::new(|| { @@ -70,6 +77,7 @@ pub struct JitsiConferenceConfig { pub nick: String, pub region: String, pub video_codec: String, + pub extra_muc_features: Vec, } #[derive(Clone)] @@ -79,6 +87,7 @@ pub struct JitsiConference { pub(crate) xmpp_tx: mpsc::Sender, pub(crate) config: JitsiConferenceConfig, pub(crate) external_services: Vec, + pub(crate) jingle_session: Arc>>, pub(crate) inner: Arc>, } @@ -94,7 +103,7 @@ impl fmt::Debug for JitsiConference { #[derive(Debug, Clone)] pub struct Participant { - pub jid: FullJid, + pub jid: Option, pub muc_jid: FullJid, pub nick: Option, } @@ -102,7 +111,6 @@ pub struct Participant { type BoxedResultFuture = Pin> + Send>>; pub(crate) struct JitsiConferenceInner { - pub(crate) jingle_session: Option, participants: HashMap, on_participant: Option BoxedResultFuture) + Send + Sync>>, @@ -112,7 +120,6 @@ pub(crate) struct JitsiConferenceInner { Option BoxedResultFuture) + Send + Sync>>, state: JitsiConferenceState, connected_tx: Option>, - connected_rx: Option>, } impl fmt::Debug for JitsiConferenceInner { @@ -124,51 +131,60 @@ impl fmt::Debug for JitsiConferenceInner { } impl JitsiConference { - #[tracing::instrument(level = "debug", skip(xmpp_tx), err)] - pub(crate) async fn new( + #[tracing::instrument(level = "debug", err)] + pub async fn join( + xmpp_connection: Connection, glib_main_context: glib::MainContext, - jid: FullJid, - xmpp_tx: mpsc::Sender, config: JitsiConferenceConfig, - external_services: Vec, ) -> Result { + let conference_stanza = xmpp::jitsi::Conference { + machine_uid: Uuid::new_v4().to_string(), + room: config.muc.to_string(), + properties: hashmap! { + // Disable voice processing + // TODO put this in config + "stereo".to_string() => "true".to_string(), + "startBitrate".to_string() => "800".to_string(), + }, + }; + let (tx, rx) = oneshot::channel(); - Ok(Self { + + let focus = config.focus.clone(); + + let conference = Self { glib_main_context, - jid, - xmpp_tx, + jid: xmpp_connection + .jid() + .await + .context("not connected (no JID)")?, + xmpp_tx: xmpp_connection.tx.clone(), config, - external_services, + external_services: xmpp_connection.external_services().await, + jingle_session: Arc::new(Mutex::new(None)), inner: Arc::new(Mutex::new(JitsiConferenceInner { state: JitsiConferenceState::Discovering, participants: HashMap::new(), on_participant: None, on_participant_left: None, on_colibri_message: None, - jingle_session: None, connected_tx: Some(tx), - connected_rx: Some(rx), })), - }) - } - - pub(crate) async fn connected(&self) -> Result<()> { - let rx = { - let mut locked_inner = self.inner.lock().await; - locked_inner - .connected_rx - .take() - .context("connected() called twice")? }; + + xmpp_connection.add_stanza_filter(conference.clone()).await; + + let iq = Iq::from_set(generate_id(), conference_stanza).with_to(Jid::Full(focus)); + xmpp_connection.tx.send(iq.into()).await?; + rx.await?; - Ok(()) + + Ok(conference) } #[tracing::instrument(level = "debug", err)] pub async fn leave(self) -> Result<()> { - let mut inner = self.inner.lock().await; - - if let Some(jingle_session) = inner.jingle_session.take() { + if let Some(jingle_session) = self.jingle_session.lock().await.take() { debug!("pausing all sinks"); jingle_session.pause_all_sinks(); @@ -224,10 +240,9 @@ impl JitsiConference { pub async fn pipeline(&self) -> Result { Ok( self - .inner + .jingle_session .lock() .await - .jingle_session .as_ref() .context("not connected (no jingle session)")? .pipeline(), @@ -255,10 +270,9 @@ impl JitsiConference { pub async fn audio_sink_element(&self) -> Result { Ok( self - .inner + .jingle_session .lock() .await - .jingle_session .as_ref() .context("not connected (no jingle session)")? .audio_sink_element(), @@ -268,10 +282,9 @@ impl JitsiConference { pub async fn video_sink_element(&self) -> Result { Ok( self - .inner + .jingle_session .lock() .await - .jingle_session .as_ref() .context("not connected (no jingle session)")? .video_sink_element(), @@ -280,10 +293,9 @@ impl JitsiConference { pub async fn send_colibri_message(&self, message: ColibriMessage) -> Result<()> { self - .inner + .jingle_session .lock() .await - .jingle_session .as_ref() .context("not connected (no jingle session)")? .colibri_channel @@ -293,6 +305,54 @@ impl JitsiConference { .await } + pub async fn send_json_message(&self, payload: &T) -> Result<()> { + let message = Message { + from: Some(Jid::Full(self.jid.clone())), + to: Some(Jid::Bare(self.config.muc.clone())), + id: Some(Uuid::new_v4().to_string()), + type_: MessageType::Groupchat, + bodies: Default::default(), + subjects: Default::default(), + thread: None, + payloads: vec![Element::try_from(xmpp::jitsi::JsonMessage { + payload: serde_json::to_value(payload)?, + })?], + }; + self.xmpp_tx.send(message.into()).await?; + Ok(()) + } + + pub(crate) async fn ensure_participant(&self, id: &str) -> Result<()> { + if !self.inner.lock().await.participants.contains_key(id) { + let participant = Participant { + jid: None, + muc_jid: self.config.muc.clone().with_resource(id), + nick: None, + }; + self + .inner + .lock() + .await + .participants + .insert(id.to_owned(), participant.clone()); + if let Some(f) = self.inner.lock().await.on_participant.as_ref().cloned() { + if let Err(e) = f(self.clone(), participant.clone()).await { + warn!("on_participant failed: {:?}", e); + } + else { + if let Ok(pipeline) = self.pipeline().await { + gstreamer::debug_bin_to_dot_file( + &pipeline, + gstreamer::DebugGraphDetails::ALL, + &format!("participant-added-{}", participant.muc_jid.resource), + ); + } + } + } + } + Ok(()) + } + #[tracing::instrument(level = "trace", skip(f))] pub async fn on_participant( &self, @@ -313,6 +373,15 @@ impl JitsiConference { if let Err(e) = f(self.clone(), participant.clone()).await { warn!("on_participant failed: {:?}", e); } + else { + if let Ok(pipeline) = self.pipeline().await { + gstreamer::debug_bin_to_dot_file( + &pipeline, + gstreamer::DebugGraphDetails::ALL, + &format!("participant-added-{}", participant.muc_jid.resource), + ); + } + } } } @@ -349,10 +418,9 @@ impl StanzaFilter for JitsiConference { #[tracing::instrument(level = "trace", err)] async fn take(&self, element: Element) -> Result<()> { - let mut locked_inner = self.inner.lock().await; - use JitsiConferenceState::*; - match locked_inner.state { + let state = self.inner.lock().await.state; + match state { Discovering => { let iq = Iq::try_from(element)?; if let IqType::Result(Some(element)) = iq.payload { @@ -377,34 +445,40 @@ impl StanzaFilter for JitsiConference { let jitsi_disco_hash = ecaps2::hash_ecaps2(&ecaps2::compute_disco(&jitsi_disco_info)?, Algo::Sha_256)?; - self - .send_presence(vec![ - Muc::new().into(), - ECaps2::new(vec![jitsi_disco_hash]).into(), - Element::builder("stats-id", "").append("gst-meet").build(), - Element::builder("jitsi_participant_codecType", "") - .append(self.config.video_codec.as_str()) - .build(), - Element::builder("jitsi_participant_region", "") - .append(self.config.region.as_str()) - .build(), - Element::builder("audiomuted", "").append("false").build(), - Element::builder("videomuted", "").append("false").build(), - Element::builder("nick", "http://jabber.org/protocol/nick") - .append(self.config.nick.as_str()) - .build(), - Element::builder("region", "http://jitsi.org/jitsi-meet") - .attr("id", &self.config.region) - .build(), - ]) - .await?; - locked_inner.state = JoiningMuc; + let mut presence = vec![ + Muc::new().into(), + ECaps2::new(vec![jitsi_disco_hash]).into(), + Element::builder("stats-id", "").append("gst-meet").build(), + Element::builder("jitsi_participant_codecType", "") + .append(self.config.video_codec.as_str()) + .build(), + Element::builder("jitsi_participant_region", "") + .append(self.config.region.as_str()) + .build(), + Element::builder("audiomuted", "").append("false").build(), + Element::builder("videomuted", "").append("false").build(), + Element::builder("nick", "http://jabber.org/protocol/nick") + .append(self.config.nick.as_str()) + .build(), + Element::builder("region", "http://jitsi.org/jitsi-meet") + .attr("id", &self.config.region) + .build(), + ]; + presence.extend( + self + .config + .extra_muc_features + .iter() + .map(|feature| Element::builder("feature", "").attr("var", feature).build()), + ); + self.send_presence(presence).await?; + self.inner.lock().await.state = JoiningMuc; }, JoiningMuc => { let presence = Presence::try_from(element)?; if BareJid::from(presence.from.as_ref().unwrap().clone()) == self.config.muc { debug!("Joined MUC: {}", self.config.muc); - locked_inner.state = Idle; + self.inner.lock().await.state = Idle; } }, Idle => { @@ -438,7 +512,7 @@ impl StanzaFilter for JitsiConference { .with_from(Jid::Full(self.jid.clone())); self.xmpp_tx.send(result_iq.into()).await?; - locked_inner.jingle_session = + *self.jingle_session.lock().await = Some(JingleSession::initiate(self, jingle).await?); } else { @@ -453,8 +527,10 @@ impl StanzaFilter for JitsiConference { .with_from(Jid::Full(self.jid.clone())); self.xmpp_tx.send(result_iq.into()).await?; - locked_inner + self .jingle_session + .lock() + .await .as_mut() .context("not connected (no jingle session")? .source_add(jingle) @@ -470,11 +546,11 @@ impl StanzaFilter for JitsiConference { } }, IqType::Result(_) => { - if let Some(jingle_session) = locked_inner.jingle_session.as_ref() { + if let Some(jingle_session) = self.jingle_session.lock().await.as_mut() { if Some(iq.id) == jingle_session.accept_iq_id { let colibri_url = jingle_session.colibri_url.clone(); - locked_inner.jingle_session.as_mut().unwrap().accept_iq_id = None; + jingle_session.accept_iq_id = None; debug!("Focus acknowledged session-accept"); @@ -483,11 +559,7 @@ impl StanzaFilter for JitsiConference { let colibri_channel = ColibriChannel::new(&colibri_url).await?; let (tx, rx) = mpsc::channel(8); colibri_channel.subscribe(tx).await; - locked_inner - .jingle_session - .as_mut() - .unwrap() - .colibri_channel = Some(colibri_channel); + jingle_session.colibri_channel = Some(colibri_channel); let self_ = self.clone(); tokio::spawn(async move { @@ -503,7 +575,7 @@ impl StanzaFilter for JitsiConference { }); } - if let Some(connected_tx) = locked_inner.connected_tx.take() { + if let Some(connected_tx) = self.inner.lock().await.connected_tx.take() { connected_tx.send(()).unwrap(); } } @@ -522,47 +594,77 @@ impl StanzaFilter for JitsiConference { let bare_from: BareJid = from.clone().into(); if bare_from == self.config.muc && from.resource != "focus" { trace!("received MUC presence from {}", from.resource); - for payload in presence.payloads { - if !payload.is("x", ns::MUC_USER) { - continue; - } - let muc_user = MucUser::try_from(payload)?; - debug!("MUC user presence: {:?}", muc_user); + let nick_payload = presence + .payloads + .iter() + .find(|e| e.is("nick", ns::NICK)) + .map(|e| Nick::try_from(e.clone())) + .transpose()?; + if let Some(muc_user_payload) = presence + .payloads + .into_iter() + .find(|e| e.is("x", ns::MUC_USER)) + { + let muc_user = MucUser::try_from(muc_user_payload)?; for item in muc_user.items { if let Some(jid) = &item.jid { if jid == &self.jid { continue; } let participant = Participant { - jid: jid.clone(), + jid: Some(jid.clone()), muc_jid: from.clone(), - nick: item.nick, + nick: item + .nick + .or_else(|| nick_payload.as_ref().map(|nick| nick.0.clone())), }; if presence.type_ == presence::Type::Unavailable - && locked_inner + && self + .inner + .lock() + .await .participants .remove(&from.resource.clone()) .is_some() { debug!("participant left: {:?}", jid); - if let Some(f) = &locked_inner.on_participant_left { + if let Some(f) = &self + .inner + .lock() + .await + .on_participant_left + .as_ref() + .cloned() + { debug!("calling on_participant_left with old participant"); if let Err(e) = f(self.clone(), participant).await { warn!("on_participant_left failed: {:?}", e); } } } - else if locked_inner + else if self + .inner + .lock() + .await .participants .insert(from.resource.clone(), participant.clone()) .is_none() { debug!("new participant: {:?}", jid); - if let Some(f) = &locked_inner.on_participant { + if let Some(f) = &self.inner.lock().await.on_participant.as_ref().cloned() { debug!("calling on_participant with new participant"); - if let Err(e) = f(self.clone(), participant).await { + if let Err(e) = f(self.clone(), participant.clone()).await { warn!("on_participant failed: {:?}", e); } + else { + if let Some(jingle_session) = self.jingle_session.lock().await.as_ref() { + gstreamer::debug_bin_to_dot_file( + &jingle_session.pipeline(), + gstreamer::DebugGraphDetails::ALL, + &format!("participant-added-{}", participant.muc_jid.resource), + ); + } + } } } } diff --git a/lib-gst-meet/src/jingle.rs b/lib-gst-meet/src/jingle.rs index 143287b..554520a 100644 --- a/lib-gst-meet/src/jingle.rs +++ b/lib-gst-meet/src/jingle.rs @@ -236,6 +236,7 @@ impl JingleSession { .transpose()?; } else { + debug!("skipping media: {}", description.media); continue; } @@ -246,20 +247,24 @@ impl JingleSession { .context("missing ssrc-info")? .owner .clone(); - if owner == "jvb" { - debug!("skipping ssrc (owner = jvb)"); - continue; - } + debug!("adding ssrc to remote_ssrc_map: {:?}", ssrc); remote_ssrc_map.insert( ssrc.id.parse()?, Source { ssrc: ssrc.id.parse()?, - participant_id: owner - .split('/') - .nth(1) - .context("invalid ssrc-info owner")? - .to_owned(), + participant_id: if owner == "jvb" { + None + } + else { + Some( + owner + .split('/') + .nth(1) + .context("invalid ssrc-info owner")? + .to_owned(), + ) + }, media_type: if description.media == "audio" { MediaType::Audio } @@ -576,10 +581,10 @@ impl JingleSession { })?; let handle = Handle::current(); - let inner_ = conference.inner.clone(); + let jingle_session = conference.jingle_session.clone(); rtpbin.connect("new-jitterbuffer", false, move |values| { let handle = handle.clone(); - let inner_ = inner_.clone(); + let jingle_session = jingle_session.clone(); let f = move || { let rtpjitterbuffer: gstreamer::Element = values[1].get()?; let session: u32 = values[2].get()?; @@ -590,13 +595,12 @@ impl JingleSession { ); let source = handle.block_on(async move { - let locked_inner = inner_.lock().await; - let jingle_session = locked_inner - .jingle_session - .as_ref() - .context("not connected (no jingle session)")?; Ok::<_, anyhow::Error>( jingle_session + .lock() + .await + .as_ref() + .context("not connected (no jingle session)")? .remote_ssrc_map .get(&ssrc) .context(format!("unknown ssrc: {}", ssrc))? @@ -604,7 +608,7 @@ impl JingleSession { ) })?; debug!("jitterbuffer is for remote source: {:?}", source); - if source.media_type == MediaType::Video { + if source.media_type == MediaType::Video && source.participant_id.is_some() { debug!("enabling RTX for ssrc {}", ssrc); rtpjitterbuffer.set_property("do-retransmission", true)?; } @@ -703,7 +707,7 @@ impl JingleSession { { let handle = Handle::current(); - let inner = conference.inner.clone(); + let conference = conference.clone(); let pipeline = pipeline.clone(); let rtpbin_ = rtpbin.clone(); rtpbin.connect("pad-added", false, move |values| { @@ -717,13 +721,13 @@ impl JingleSession { let ssrc: u32 = parts.next().context("malformed pad name")?.parse()?; let pt: u8 = parts.next().context("malformed pad name")?.parse()?; let source = handle.block_on(async { - let locked_inner = inner.lock().await; - let jingle_session = locked_inner - .jingle_session - .as_ref() - .context("not connected (no jingle session)")?; Ok::<_, anyhow::Error>( - jingle_session + conference + .jingle_session + .lock() + .await + .as_ref() + .context("not connected (no jingle session)")? .remote_ssrc_map .get(&ssrc) .context(format!("unknown ssrc: {}", ssrc))? @@ -805,26 +809,32 @@ impl JingleSession { .static_pad("src") .context("depayloader has no src pad")?; - if let Some(participant_bin) = - pipeline.by_name(&format!("participant_{}", source.participant_id)) - { - let sink_pad_name = match source.media_type { - MediaType::Audio => "audio", - MediaType::Video => "video", - }; - if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) { - debug!("linking depayloader to participant bin"); - src_pad.link(&sink_pad)?; + if let Some(participant_id) = source.participant_id { + handle.block_on(conference.ensure_participant(&participant_id))?; + if let Some(participant_bin) = + pipeline.by_name(&format!("participant_{}", participant_id)) + { + let sink_pad_name = match source.media_type { + MediaType::Audio => "audio", + MediaType::Video => "video", + }; + if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) { + debug!("linking depayloader to participant bin"); + src_pad.link(&sink_pad)?; + } + else { + warn!( + "no {} sink pad in {} participant bin", + sink_pad_name, participant_id + ); + } } else { - warn!( - "no {} sink pad in {} participant bin", - sink_pad_name, source.participant_id - ); + debug!("no participant bin for {}", participant_id); } } else { - debug!("no participant bin for {}", source.participant_id); + debug!("not looking for participant bin, source is owned by JVB"); } if !src_pad.is_linked() { @@ -861,38 +871,43 @@ impl JingleSession { )?; audio_sink_element.set_property("min-ptime", 10i64 * 1000 * 1000)?; audio_sink_element.set_property("ssrc", audio_ssrc)?; - audio_sink_element.set_property("auto-header-extension", false)?; - audio_sink_element.connect("request-extension", false, move |values| { - let f = || { - let ext_id: u32 = values[1].get()?; - let ext_uri: String = values[2].get()?; - debug!( - "audio payloader requested extension: {} {}", - ext_id, ext_uri - ); - let hdrext = - RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?; - hdrext.set_id(ext_id); - if ext_uri == RTP_HDREXT_ABS_SEND_TIME { + if audio_sink_element.has_property("auto-header-extension", None) { + audio_sink_element.set_property("auto-header-extension", false)?; + audio_sink_element.connect("request-extension", false, move |values| { + let f = || { + let ext_id: u32 = values[1].get()?; + let ext_uri: String = values[2].get()?; + debug!( + "audio payloader requested extension: {} {}", + ext_id, ext_uri + ); + let hdrext = + RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?; + hdrext.set_id(ext_id); + if ext_uri == RTP_HDREXT_ABS_SEND_TIME { + } + else if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL { + } + else if ext_uri == RTP_HDREXT_TRANSPORT_CC { + // hdrext.set_property("n-streams", 2u32)?; + } + else { + bail!("unknown rtp hdrext: {}", ext_uri); + } + Ok::<_, anyhow::Error>(hdrext) + }; + match f() { + Ok(hdrext) => Some(hdrext.to_value()), + Err(e) => { + warn!("request-extension: {:?}", e); + None + }, } - else if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL { - } - else if ext_uri == RTP_HDREXT_TRANSPORT_CC { - // hdrext.set_property("n-streams", 2u32)?; - } - else { - bail!("unknown rtp hdrext: {}", ext_uri); - } - Ok::<_, anyhow::Error>(hdrext) - }; - match f() { - Ok(hdrext) => Some(hdrext.to_value()), - Err(e) => { - warn!("request-extension: {:?}", e); - None - }, - } - })?; + })?; + } + else { + debug!("audio payloader: no rtp header extension support"); + } pipeline.add(&audio_sink_element)?; let video_sink_element = match conference.config.video_codec.as_str() { @@ -926,43 +941,47 @@ impl JingleSession { other => bail!("unsupported video codec: {}", other), }; video_sink_element.set_property("ssrc", video_ssrc)?; - video_sink_element.set_property("auto-header-extension", false)?; - video_sink_element.connect("request-extension", false, move |values| { - let f = || { - let ext_id: u32 = values[1].get()?; - let ext_uri: String = values[2].get()?; - debug!( - "video payloader requested extension: {} {}", - ext_id, ext_uri - ); - let hdrext = - RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?; - hdrext.set_id(ext_id); - if ext_uri == RTP_HDREXT_ABS_SEND_TIME { + if video_sink_element.has_property("auto-header-extension", None) { + video_sink_element.set_property("auto-header-extension", false)?; + video_sink_element.connect("request-extension", false, move |values| { + let f = || { + let ext_id: u32 = values[1].get()?; + let ext_uri: String = values[2].get()?; + debug!( + "video payloader requested extension: {} {}", + ext_id, ext_uri + ); + let hdrext = + RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?; + hdrext.set_id(ext_id); + if ext_uri == RTP_HDREXT_ABS_SEND_TIME { + } + else if ext_uri == RTP_HDREXT_TRANSPORT_CC { + // hdrext.set_property("n-streams", 2u32)?; + } + else { + bail!("unknown rtp hdrext: {}", ext_uri); + } + Ok::<_, anyhow::Error>(hdrext) + }; + match f() { + Ok(hdrext) => Some(hdrext.to_value()), + Err(e) => { + warn!("request-extension: {:?}", e); + None + }, } - else if ext_uri == RTP_HDREXT_TRANSPORT_CC { - // hdrext.set_property("n-streams", 2u32)?; - } - else { - bail!("unknown rtp hdrext: {}", ext_uri); - } - Ok::<_, anyhow::Error>(hdrext) - }; - match f() { - Ok(hdrext) => Some(hdrext.to_value()), - Err(e) => { - warn!("request-extension: {:?}", e); - None - }, - } - })?; + })?; + } + else { + debug!("video payloader: no rtp header extension support"); + } pipeline.add(&video_sink_element)?; let mut audio_caps = gstreamer::Caps::builder("application/x-rtp"); - // TODO: fails to negotiate - // if let Some(hdrext) = audio_hdrext_ssrc_audio_level { - // audio_caps = audio_caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_SSRC_AUDIO_LEVEL); - // } + if let Some(hdrext) = audio_hdrext_ssrc_audio_level { + audio_caps = audio_caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_SSRC_AUDIO_LEVEL); + } if let Some(hdrext) = audio_hdrext_transport_cc { audio_caps = audio_caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC); } @@ -981,13 +1000,19 @@ impl JingleSession { video_capsfilter.set_property("caps", video_caps.build())?; pipeline.add(&video_capsfilter)?; - debug!("linking video payloader -> rtpbin"); - video_sink_element.link(&video_capsfilter)?; - video_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?; + let rtpfunnel = gstreamer::ElementFactory::make("rtpfunnel", None)?; + pipeline.add(&rtpfunnel)?; - debug!("linking audio payloader -> rtpbin"); + debug!("linking video payloader -> rtpfunnel"); + video_sink_element.link(&video_capsfilter)?; + video_capsfilter.link(&rtpfunnel)?; + + debug!("linking audio payloader -> rtpfunnel"); audio_sink_element.link(&audio_capsfilter)?; - audio_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_1"))?; + audio_capsfilter.link(&rtpfunnel)?; + + debug!("linking rtpfunnel -> rtpbin"); + rtpfunnel.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?; debug!("link dtlssrtpdec -> rtpbin"); dtlssrtpdec.link_pads(Some("rtp_src"), &rtpbin, Some("recv_rtp_sink_0"))?; @@ -996,8 +1021,6 @@ impl JingleSession { debug!("linking rtpbin -> dtlssrtpenc"); rtpbin.link_pads(Some("send_rtp_src_0"), &dtlssrtpenc, Some("rtp_sink_0"))?; rtpbin.link_pads(Some("send_rtcp_src_0"), &dtlssrtpenc, Some("rtcp_sink_0"))?; - rtpbin.link_pads(Some("send_rtp_src_1"), &dtlssrtpenc, Some("rtp_sink_1"))?; - rtpbin.link_pads(Some("send_rtcp_src_1"), &dtlssrtpenc, Some("rtcp_sink_1"))?; debug!("linking ice src -> dtlssrtpdec"); nicesrc.link(&dtlssrtpdec)?; @@ -1177,15 +1200,12 @@ impl JingleSession { }; if initiate_content.name.0 == "audio" { - // TODO: fails to negotiate - // if let Some(hdrext) = audio_hdrext_ssrc_audio_level { - // if audio_hdrext_supported { - // description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_SSRC_AUDIO_LEVEL.to_owned())); - // } - // else { - // debug!("ssrc-audio-level hdrext requested but not supported"); - // } - // } + if let Some(hdrext) = audio_hdrext_ssrc_audio_level { + description.hdrexts.push(RtpHdrext::new( + hdrext.to_string(), + RTP_HDREXT_SSRC_AUDIO_LEVEL.to_owned(), + )); + } if let Some(hdrext) = audio_hdrext_transport_cc { description.hdrexts.push(RtpHdrext::new( hdrext.to_string(), @@ -1276,21 +1296,24 @@ impl JingleSession { .context("missing ssrc-info")? .owner .clone(); - if owner == "jvb" { - debug!("skipping ssrc (owner = jvb)"); - continue; - } debug!("adding ssrc to remote_ssrc_map: {:?}", ssrc); self.remote_ssrc_map.insert( ssrc.id.parse()?, Source { ssrc: ssrc.id.parse()?, - participant_id: owner - .split('/') - .nth(1) - .context("invalid ssrc-info owner")? - .to_owned(), + participant_id: if owner == "jvb" { + None + } + else { + Some( + owner + .split('/') + .nth(1) + .context("invalid ssrc-info owner")? + .to_owned(), + ) + }, media_type: if description.media == "audio" { MediaType::Audio } diff --git a/lib-gst-meet/src/lib.rs b/lib-gst-meet/src/lib.rs index 1ccd5a0..f641b48 100644 --- a/lib-gst-meet/src/lib.rs +++ b/lib-gst-meet/src/lib.rs @@ -1,6 +1,5 @@ -pub mod colibri; +mod colibri; mod conference; -mod connection; mod jingle; mod pinger; mod source; @@ -10,9 +9,10 @@ mod xmpp; pub use crate::{ colibri::ColibriMessage, - conference::{JitsiConference, JitsiConferenceConfig, Participant}, - connection::JitsiConnection, + conference::{Feature, JitsiConference, JitsiConferenceConfig, Participant}, source::MediaType, + stanza_filter::StanzaFilter, + xmpp::connection::{Authentication, Connection}, }; #[cfg(feature = "tracing-subscriber")] diff --git a/lib-gst-meet/src/source.rs b/lib-gst-meet/src/source.rs index 9be8222..8dbec9a 100644 --- a/lib-gst-meet/src/source.rs +++ b/lib-gst-meet/src/source.rs @@ -1,7 +1,7 @@ #[derive(Debug, Clone)] pub(crate) struct Source { pub(crate) ssrc: u32, - pub(crate) participant_id: String, + pub(crate) participant_id: Option, pub(crate) media_type: MediaType, } diff --git a/lib-gst-meet/src/stanza_filter.rs b/lib-gst-meet/src/stanza_filter.rs index c66d6bd..6eaca3e 100644 --- a/lib-gst-meet/src/stanza_filter.rs +++ b/lib-gst-meet/src/stanza_filter.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use xmpp_parsers::Element; #[async_trait] -pub(crate) trait StanzaFilter { +pub trait StanzaFilter { fn filter(&self, element: &Element) -> bool; async fn take(&self, element: Element) -> Result<()>; } diff --git a/lib-gst-meet/src/connection.rs b/lib-gst-meet/src/xmpp/connection.rs similarity index 67% rename from lib-gst-meet/src/connection.rs rename to lib-gst-meet/src/xmpp/connection.rs index 22b930b..d9f82e6 100644 --- a/lib-gst-meet/src/connection.rs +++ b/lib-gst-meet/src/xmpp/connection.rs @@ -23,16 +23,10 @@ use xmpp_parsers::{ BareJid, Element, FullJid, Jid, }; -use crate::{ - conference::{JitsiConference, JitsiConferenceConfig}, - pinger::Pinger, - stanza_filter::StanzaFilter, - util::generate_id, - xmpp, -}; +use crate::{pinger::Pinger, stanza_filter::StanzaFilter, util::generate_id, xmpp}; #[derive(Debug, Clone, Copy)] -enum JitsiConnectionState { +enum ConnectionState { OpeningPreAuthentication, ReceivingFeaturesPreAuthentication, Authenticating, @@ -44,35 +38,41 @@ enum JitsiConnectionState { Idle, } -struct JitsiConnectionInner { - state: JitsiConnectionState, - xmpp_domain: BareJid, +struct ConnectionInner { + state: ConnectionState, jid: Option, + xmpp_domain: BareJid, + authentication: Authentication, external_services: Vec, connected_tx: Option>>, stanza_filters: Vec>, } -impl fmt::Debug for JitsiConnectionInner { +impl fmt::Debug for ConnectionInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("JitsiConnectionInner") + f.debug_struct("ConnectionInner") .field("state", &self.state) - .field("xmpp_domain", &self.xmpp_domain) .field("jid", &self.jid) .finish() } } #[derive(Debug, Clone)] -pub struct JitsiConnection { - tx: mpsc::Sender, - inner: Arc>, +pub struct Connection { + pub(crate) tx: mpsc::Sender, + inner: Arc>, } -impl JitsiConnection { +pub enum Authentication { + Anonymous, + Plain { username: String, password: String }, +} + +impl Connection { pub async fn new( websocket_url: &str, xmpp_domain: &str, + authentication: Authentication, ) -> Result<(Self, impl Future)> { let websocket_url: Uri = websocket_url.parse().context("invalid WebSocket URL")?; let xmpp_domain: BareJid = xmpp_domain.parse().context("invalid XMPP domain")?; @@ -88,10 +88,11 @@ impl JitsiConnection { let (sink, stream) = websocket.split(); let (tx, rx) = mpsc::channel(64); - let inner = Arc::new(Mutex::new(JitsiConnectionInner { - state: JitsiConnectionState::OpeningPreAuthentication, - xmpp_domain, + let inner = Arc::new(Mutex::new(ConnectionInner { + state: ConnectionState::OpeningPreAuthentication, jid: None, + xmpp_domain, + authentication, external_services: vec![], connected_tx: None, stanza_filters: vec![], @@ -102,8 +103,8 @@ impl JitsiConnection { inner: inner.clone(), }; - let writer = JitsiConnection::write_loop(rx, sink); - let reader = JitsiConnection::read_loop(inner, tx, stream); + let writer = Connection::write_loop(rx, sink); + let reader = Connection::read_loop(inner, tx, stream); let background = async move { tokio::select! { @@ -115,6 +116,11 @@ impl JitsiConnection { Ok((connection, background)) } + pub async fn add_stanza_filter(&self, stanza_filter: impl StanzaFilter + Send + Sync + 'static) { + let mut locked_inner = self.inner.lock().await; + locked_inner.stanza_filters.push(Box::new(stanza_filter)); + } + pub async fn connect(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); @@ -128,48 +134,14 @@ impl JitsiConnection { rx.await? } - pub async fn join_conference( - &self, - glib_main_context: glib::MainContext, - config: JitsiConferenceConfig, - ) -> Result { - let conference_stanza = xmpp::jitsi::Conference { - machine_uid: Uuid::new_v4().to_string(), - room: config.muc.to_string(), - properties: hashmap! { - // Disable voice processing - "stereo".to_string() => "true".to_string(), - "startBitrate".to_string() => "800".to_string(), - }, - }; + pub async fn jid(&self) -> Option { + let mut locked_inner = self.inner.lock().await; + locked_inner.jid.clone() + } - let iq = - Iq::from_set(generate_id(), conference_stanza).with_to(Jid::Full(config.focus.clone())); - self.tx.send(iq.into()).await?; - - let conference = { - let mut locked_inner = self.inner.lock().await; - let conference = JitsiConference::new( - glib_main_context, - locked_inner - .jid - .as_ref() - .context("not connected (no jid)")? - .clone(), - self.tx.clone(), - config, - locked_inner.external_services.clone(), - ) - .await?; - locked_inner - .stanza_filters - .push(Box::new(conference.clone())); - conference - }; - - conference.connected().await?; - - Ok(conference) + pub async fn external_services(&self) -> Vec { + let mut locked_inner = self.inner.lock().await; + locked_inner.external_services.clone() } async fn write_loop(rx: mpsc::Receiver, mut sink: S) -> Result<()> @@ -189,7 +161,7 @@ impl JitsiConnection { } async fn read_loop( - inner: Arc>, + inner: Arc>, tx: mpsc::Sender, mut stream: S, ) -> Result<()> @@ -217,7 +189,7 @@ impl JitsiConnection { let mut locked_inner = inner.lock().await; - use JitsiConnectionState::*; + use ConnectionState::*; match locked_inner.state { OpeningPreAuthentication => { Open::try_from(element)?; @@ -225,9 +197,22 @@ impl JitsiConnection { locked_inner.state = ReceivingFeaturesPreAuthentication; }, ReceivingFeaturesPreAuthentication => { - let auth = Auth { - mechanism: Mechanism::Anonymous, - data: vec![], + let auth = match &locked_inner.authentication { + Authentication::Anonymous => Auth { + mechanism: Mechanism::Anonymous, + data: vec![], + }, + Authentication::Plain { username, password } => { + let mut data = Vec::with_capacity(username.len() + password.len() + 2); + data.push(0u8); + data.extend_from_slice(username.as_bytes()); + data.push(0u8); + data.extend_from_slice(password.as_bytes()); + Auth { + mechanism: Mechanism::Plain, + data, + } + }, }; tx.send(auth.into()).await?; locked_inner.state = Authenticating; @@ -241,8 +226,10 @@ impl JitsiConnection { }, OpeningPostAuthentication => { Open::try_from(element)?; - info!("Logged in anonymously"); - + match &locked_inner.authentication { + Authentication::Anonymous => info!("Logged in anonymously"), + Authentication::Plain { .. } => info!("Logged in with PLAIN"), + } locked_inner.state = ReceivingFeaturesPostAuthentication; }, ReceivingFeaturesPostAuthentication => { @@ -250,28 +237,33 @@ impl JitsiConnection { tx.send(iq.into()).await?; locked_inner.state = Binding; }, - Binding => { - let iq = Iq::try_from(element)?; - let jid = if let IqType::Result(Some(element)) = iq.payload { - let bind = BindResponse::try_from(element)?; - FullJid::try_from(bind)? - } - else { - bail!("bind failed"); - }; - info!("My JID: {}", jid); - locked_inner.jid = Some(jid.clone()); + Binding => match Iq::try_from(element) { + Ok(iq) => { + let jid = if let IqType::Result(Some(element)) = iq.payload { + let bind = BindResponse::try_from(element)?; + FullJid::try_from(bind)? + } + else { + bail!("bind failed"); + }; + info!("My JID: {}", jid); + locked_inner.jid = Some(jid.clone()); - locked_inner.stanza_filters.push(Box::new(Pinger { - jid: jid.clone(), - tx: tx.clone(), - })); + locked_inner.stanza_filters.push(Box::new(Pinger { + jid: jid.clone(), + tx: tx.clone(), + })); - let iq = Iq::from_get(generate_id(), DiscoInfoQuery { node: None }) - .with_from(Jid::Full(jid.clone())) - .with_to(Jid::Bare(locked_inner.xmpp_domain.clone())); - tx.send(iq.into()).await?; - locked_inner.state = Discovering; + let iq = Iq::from_get(generate_id(), DiscoInfoQuery { node: None }) + .with_from(Jid::Full(jid.clone())) + .with_to(Jid::Bare(locked_inner.xmpp_domain.clone())); + tx.send(iq.into()).await?; + locked_inner.state = Discovering; + }, + Err(e) => debug!( + "received unexpected element while waiting for bind response: {}", + e + ), }, Discovering => { let iq = Iq::try_from(element)?; diff --git a/lib-gst-meet/src/xmpp/extdisco.rs b/lib-gst-meet/src/xmpp/extdisco.rs index e7390e5..c1388eb 100644 --- a/lib-gst-meet/src/xmpp/extdisco.rs +++ b/lib-gst-meet/src/xmpp/extdisco.rs @@ -26,7 +26,7 @@ impl From for Element { impl IqGetPayload for ServicesQuery {} #[derive(Debug, Clone)] -pub(crate) struct Service { +pub struct Service { pub(crate) r#type: String, pub(crate) name: Option, pub(crate) host: String, diff --git a/lib-gst-meet/src/xmpp/jitsi.rs b/lib-gst-meet/src/xmpp/jitsi.rs index 8aae939..17e1cf9 100644 --- a/lib-gst-meet/src/xmpp/jitsi.rs +++ b/lib-gst-meet/src/xmpp/jitsi.rs @@ -37,3 +37,19 @@ impl From for Element { builder.build() } } + +pub(crate) struct JsonMessage { + pub(crate) payload: serde_json::Value, +} + +impl TryFrom for Element { + type Error = anyhow::Error; + + fn try_from(message: JsonMessage) -> Result { + Ok( + Element::builder("json-message", ns::JITSI_JITMEET) + .append(serde_json::to_string(&message.payload)?) + .build(), + ) + } +} diff --git a/lib-gst-meet/src/xmpp/mod.rs b/lib-gst-meet/src/xmpp/mod.rs index 9a5ae3c..47cc282 100644 --- a/lib-gst-meet/src/xmpp/mod.rs +++ b/lib-gst-meet/src/xmpp/mod.rs @@ -1,3 +1,4 @@ +pub mod connection; pub(crate) mod extdisco; pub(crate) mod jitsi; mod ns; diff --git a/lib-gst-meet/src/xmpp/ns.rs b/lib-gst-meet/src/xmpp/ns.rs index 44b9bf5..9d3022f 100644 --- a/lib-gst-meet/src/xmpp/ns.rs +++ b/lib-gst-meet/src/xmpp/ns.rs @@ -2,3 +2,5 @@ pub(crate) const EXTDISCO: &str = "urn:xmpp:extdisco:2"; pub(crate) const JITSI_FOCUS: &str = "http://jitsi.org/protocol/focus"; + +pub(crate) const JITSI_JITMEET: &str = "http://jitsi.org/jitmeet";