diff --git a/gst-meet/src/main.rs b/gst-meet/src/main.rs
index e6cf9ba..e770982 100644
--- a/gst-meet/src/main.rs
+++ b/gst-meet/src/main.rs
@@ -71,6 +71,11 @@ struct Opt {
     help = "The maximum height to receive video at."
   )]
   recv_video_height: Option,
+  #[structopt(
+    long,
+    help = "The maximum height we plan to send video at (used for stats only)."
+  )]
+  send_video_height: Option,
   #[structopt(
     long,
     help = "The video type to signal that we are sending. One of: camera, desktop"
   )]
@@ -198,6 +203,7 @@ async fn main_inner() -> Result<()> {
     region,
     video_codec,
     recv_pipeline_participant_template,
+    send_video_height,
     start_bitrate,
     stereo,
     #[cfg(feature = "log-rtp")]
@@ -223,6 +229,10 @@ async fn main_inner() -> Result<()> {
   let conference = JitsiConference::join(connection, main_loop.context(), config)
     .await
     .context("failed to join conference")?;
+
+  if let Some(height) = send_video_height {
+    conference.set_send_resolution(height.into()).await;
+  }
 
   if opt.select_endpoints.is_some() || opt.last_n.is_some() || opt.recv_video_height.is_some() {
     conference
diff --git a/lib-gst-meet/Cargo.toml b/lib-gst-meet/Cargo.toml
index e4e5706..d1715dc 100644
--- a/lib-gst-meet/Cargo.toml
+++ b/lib-gst-meet/Cargo.toml
@@ -2,7 +2,7 @@
 name = "lib-gst-meet"
 description = "Connect GStreamer pipelines to Jitsi Meet conferences"
 version = "0.5.0"
-edition = "2018"
+edition = "2021"
 license = "MIT/Apache-2.0"
 readme = "../README.md"
 repository = "https://github.com/avstack/gst-meet"
diff --git a/lib-gst-meet/src/conference.rs b/lib-gst-meet/src/conference.rs
index 5d1246a..4d8121b 100644
--- a/lib-gst-meet/src/conference.rs
+++ b/lib-gst-meet/src/conference.rs
@@ -1,15 +1,16 @@
-use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, sync::Arc};
+use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, sync::Arc, time::Duration};
 
 use anyhow::{anyhow, bail, Context, Result};
 use async_trait::async_trait;
 use colibri::{ColibriMessage, JsonMessage};
 use futures::stream::StreamExt;
+use glib::ObjectExt;
 use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt};
 use jitsi_xmpp_parsers::jingle::{Action, Jingle};
 use maplit::hashmap;
 use once_cell::sync::Lazy;
 use serde::Serialize;
-use tokio::sync::{mpsc, oneshot, Mutex};
+use tokio::{sync::{mpsc, oneshot, Mutex}, time};
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error, info, trace, warn};
 use uuid::Uuid;
@@ -26,7 +27,7 @@ use xmpp_parsers::{
   ns,
   presence::{self, Presence},
   stanza_error::{DefinedCondition, ErrorType, StanzaError},
-  BareJid, Element, FullJid, Jid,
+  BareJid, FullJid, Jid,
 };
 
 use crate::{
@@ -38,6 +39,8 @@ use crate::{
   xmpp::{self, connection::Connection},
 };
 
+const SEND_STATS_INTERVAL: Duration = Duration::from_secs(10);
+
 const DISCO_NODE: &str = "https://github.com/avstack/gst-meet";
 
 static DISCO_INFO: Lazy<DiscoInfoResult> = Lazy::new(|| DiscoInfoResult {
@@ -85,7 +88,7 @@ pub struct JitsiConferenceConfig {
 pub struct JitsiConference {
   pub(crate) glib_main_context: glib::MainContext,
   pub(crate) jid: FullJid,
-  pub(crate) xmpp_tx: mpsc::Sender<Element>,
+  pub(crate) xmpp_tx: mpsc::Sender<xmpp_parsers::Element>,
   pub(crate) config: JitsiConferenceConfig,
   pub(crate) external_services: Vec,
   pub(crate) jingle_session: Arc<Mutex<Option<JingleSession>>>,
@@ -120,8 +123,9 @@ pub(crate) struct JitsiConferenceInner {
     Option BoxedResultFuture) + Send + Sync>>,
   on_colibri_message:
     Option BoxedResultFuture) + Send + Sync>>,
-  presence: Vec<Element>,
+  presence: Vec<xmpp_parsers::Element>,
   state: JitsiConferenceState,
+  send_resolution: Option<i32>,
   connected_tx: Option<oneshot::Sender<()>>,
 }
 
@@ -158,28 +162,28 @@ impl JitsiConference {
       Muc::new().into(),
       Caps::new(DISCO_NODE, COMPUTED_CAPS_HASH.clone()).into(),
       ECaps2::new(vec![ecaps2_hash]).into(),
-      Element::builder("stats-id", ns::DEFAULT_NS)
+      xmpp_parsers::Element::builder("stats-id", ns::DEFAULT_NS)
        .append("gst-meet")
        .build(),
-      Element::builder("jitsi_participant_codecType", ns::DEFAULT_NS)
+      xmpp_parsers::Element::builder("jitsi_participant_codecType", ns::DEFAULT_NS)
        .append(config.video_codec.as_str())
        .build(),
-      Element::builder("audiomuted", ns::DEFAULT_NS)
+      xmpp_parsers::Element::builder("audiomuted", ns::DEFAULT_NS)
        .append("false")
        .build(),
-      Element::builder("videomuted", ns::DEFAULT_NS)
+      xmpp_parsers::Element::builder("videomuted", ns::DEFAULT_NS)
        .append("false")
        .build(),
-      Element::builder("nick", "http://jabber.org/protocol/nick")
+      xmpp_parsers::Element::builder("nick", "http://jabber.org/protocol/nick")
        .append(config.nick.as_str())
        .build(),
     ];
     if let Some(region) = &config.region {
       presence.extend([
-        Element::builder("jitsi_participant_region", ns::DEFAULT_NS)
+        xmpp_parsers::Element::builder("jitsi_participant_region", ns::DEFAULT_NS)
          .append(region.as_str())
          .build(),
-        Element::builder("region", "http://jitsi.org/jitsi-meet")
+        xmpp_parsers::Element::builder("region", "http://jitsi.org/jitsi-meet")
          .attr("id", region)
          .build(),
       ]);
@@ -210,6 +214,7 @@ impl JitsiConference {
         on_participant: None,
         on_participant_left: None,
         on_colibri_message: None,
+        send_resolution: None,
         connected_tx: Some(tx),
       })),
       tls_insecure: xmpp_connection.tls_insecure,
@@ -245,16 +250,19 @@ impl JitsiConference {
     Ok(())
   }
 
-  fn jid_in_muc(&self) -> Result<FullJid> {
-    let resource = self
+  fn endpoint_id(&self) -> Result<&str> {
+    self
       .jid
       .node
       .as_ref()
       .ok_or_else(|| anyhow!("invalid jid"))?
       .split('-')
       .next()
-      .ok_or_else(|| anyhow!("invalid jid"))?;
-    Ok(self.config.muc.clone().with_resource(resource))
+      .ok_or_else(|| anyhow!("invalid jid"))
+  }
+
+  fn jid_in_muc(&self) -> Result<FullJid> {
+    Ok(self.config.muc.clone().with_resource(self.endpoint_id()?))
   }
 
   pub(crate) fn focus_jid_in_muc(&self) -> Result<FullJid> {
@@ -262,7 +270,7 @@ impl JitsiConference {
   }
 
   #[tracing::instrument(level = "debug", err)]
-  async fn send_presence(&self, payloads: &[Element]) -> Result<()> {
+  async fn send_presence(&self, payloads: &[xmpp_parsers::Element]) -> Result<()> {
     let mut presence = Presence::new(presence::Type::None).with_to(self.jid_in_muc()?);
     presence.payloads = payloads.to_owned();
     self.xmpp_tx.send(presence.into()).await?;
@@ -272,7 +280,7 @@ impl JitsiConference {
   #[tracing::instrument(level = "debug", err)]
   pub async fn set_muted(&self, media_type: MediaType, muted: bool) -> Result<()> {
     let mut locked_inner = self.inner.lock().await;
-    let element = Element::builder(
+    let element = xmpp_parsers::Element::builder(
       media_type.jitsi_muted_presence_element_name(),
       ns::DEFAULT_NS,
     )
@@ -339,6 +347,17 @@ impl JitsiConference {
     )
   }
 
+  /// Set the max resolution that we are currently sending.
+  ///
+  /// Setting this is required for browser clients in the same conference to display
+  /// the stats that we broadcast.
+  ///
+  /// Note that lib-gst-meet does not encode video (that is the responsibility of your
+  /// GStreamer pipeline), so this is purely informational.
+  pub async fn set_send_resolution(&self, height: i32) {
+    self.inner.lock().await.send_resolution = Some(height);
+  }
+
   pub async fn send_colibri_message(&self, message: ColibriMessage) -> Result<()> {
     self
       .jingle_session
@@ -362,7 +381,7 @@ impl JitsiConference {
       bodies: Default::default(),
       subjects: Default::default(),
       thread: None,
-      payloads: vec![Element::try_from(xmpp::jitsi::JsonMessage {
+      payloads: vec![xmpp_parsers::Element::try_from(xmpp::jitsi::JsonMessage {
        payload: serde_json::to_value(payload)?,
       })?],
     };
@@ -449,7 +468,7 @@ impl JitsiConference {
 #[async_trait]
 impl StanzaFilter for JitsiConference {
   #[tracing::instrument(level = "trace")]
-  fn filter(&self, element: &Element) -> bool {
+  fn filter(&self, element: &xmpp_parsers::Element) -> bool {
     element.attr("from") == Some(self.config.focus.to_string().as_str())
       && element.is("iq", ns::DEFAULT_NS)
       || element
@@ -461,7 +480,7 @@ impl StanzaFilter for JitsiConference {
   }
 
   #[tracing::instrument(level = "trace", err)]
-  async fn take(&self, element: Element) -> Result<()> {
+  async fn take(&self, element: xmpp_parsers::Element) -> Result<()> {
     use JitsiConferenceState::*;
     let state = self.inner.lock().await.state;
     match state {
@@ -599,45 +618,188 @@ impl StanzaFilter for JitsiConference {
             ColibriChannel::new(&colibri_url, self.tls_insecure).await?;
           let (tx, rx) = mpsc::channel(8);
           colibri_channel.subscribe(tx).await;
-          let colibri_channel_ = colibri_channel.clone();
-          jingle_session.colibri_channel = Some(colibri_channel);
+          jingle_session.colibri_channel = Some(colibri_channel.clone());
 
-          let self_ = self.clone();
-          tokio::spawn(async move {
-            let mut stream = ReceiverStream::new(rx);
-            while let Some(msg) = stream.next().await {
-              // Some message types are handled internally rather than passed to the on_colibri_message handler.
+          let my_endpoint_id = self.endpoint_id()?.to_owned();
 
-              // End-to-end ping
-              if let ColibriMessage::EndpointMessage { to, from, msg_payload } = &msg {
-                if let Some(to) = to {
-                  let my_endpoint_id = self_.jid.to_string().split('-').next().unwrap().to_owned();
-                  if to == &my_endpoint_id {
+          {
+            let my_endpoint_id = my_endpoint_id.clone();
+            let colibri_channel = colibri_channel.clone();
+            let self_ = self.clone();
+            jingle_session.stats_handler_task = Some(tokio::spawn(async move {
+              let mut interval = time::interval(SEND_STATS_INTERVAL);
+              loop {
+                let maybe_remote_ssrc_map = self_.jingle_session.lock().await.as_ref().map(|sess| sess.remote_ssrc_map.clone());
+                let maybe_source_stats: Option<Vec<gstreamer::Structure>> = self_
+                  .pipeline()
+                  .await
+                  .ok()
+                  .and_then(|pipeline| pipeline.by_name("rtpbin"))
+                  .and_then(|rtpbin| rtpbin.try_emit_by_name("get-session", &[&0u32]).ok())
+                  .and_then(|rtpsession: gstreamer::Element| rtpsession.try_property("stats").ok())
+                  .and_then(|stats: gstreamer::Structure| stats.get("source-stats").ok())
+                  .and_then(|stats: glib::ValueArray| stats.into_iter().map(|v| v.get()).collect::<Result<_, _>>().ok());
+
+                if let (Some(remote_ssrc_map), Some(source_stats)) = (maybe_remote_ssrc_map, maybe_source_stats) {
+                  debug!("source stats: {:#?}", source_stats);
+
+                  let audio_recv_bitrate: u64 = source_stats
+                    .iter()
+                    .filter(|stat| {
+                      stat
+                        .get("ssrc")
+                        .ok()
+                        .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
+                        .map(|source| source.media_type == MediaType::Audio && source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
+                        .unwrap_or_default()
+                    })
+                    .filter_map(|stat| stat.get::<u64>("bitrate").ok())
+                    .sum();
+
+                  let video_recv_bitrate: u64 = source_stats
+                    .iter()
+                    .filter(|stat| {
+                      stat
+                        .get("ssrc")
+                        .ok()
+                        .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
+                        .map(|source| source.media_type == MediaType::Video && source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
+                        .unwrap_or_default()
+                    })
+                    .filter_map(|stat| stat.get::<u64>("bitrate").ok())
+                    .sum();
+
+                  let audio_send_bitrate: u64 = source_stats
+                    .iter()
+                    .find(|stat| {
+                      stat
+                        .get("ssrc")
+                        .ok()
+                        .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
+                        .map(|source| source.media_type == MediaType::Audio && source.participant_id.as_ref().map(|id| id == &my_endpoint_id).unwrap_or_default())
+                        .unwrap_or_default()
+                    })
+                    .and_then(|stat| stat.get("bitrate").ok())
+                    .unwrap_or_default();
+                  let video_send_bitrate: u64 = source_stats
+                    .iter()
+                    .find(|stat| {
+                      stat
+                        .get("ssrc")
+                        .ok()
+                        .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
+                        .map(|source| source.media_type == MediaType::Video && source.participant_id.as_ref().map(|id| id == &my_endpoint_id).unwrap_or_default())
+                        .unwrap_or_default()
+                    })
+                    .and_then(|stat| stat.get("bitrate").ok())
+                    .unwrap_or_default();
+
+                  let recv_packets: u64 = source_stats
+                    .iter()
+                    .filter(|stat| {
+                      stat
+                        .get("ssrc")
+                        .ok()
+                        .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
+                        .map(|source| source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
+                        .unwrap_or_default()
+                    })
+                    .filter_map(|stat| stat.get::<u64>("packets-received").ok())
+                    .sum();
+                  let recv_lost: u64 = source_stats
+                    .iter()
+                    .filter(|stat| {
+                      stat
+                        .get("ssrc")
+                        .ok()
+                        .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
+                        .map(|source| source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
+                        .unwrap_or_default()
+                    })
+                    .filter_map(|stat| stat.get::<i64>("packets-lost").ok())
+                    .sum::<i64>()
+                    // Loss can be negative because of duplicate packets. Clamp it to zero.
+                    .try_into()
+                    .unwrap_or_default();
+                  let recv_loss = recv_lost as f64 / (recv_packets as f64 + recv_lost as f64);
+
+                  let stats = ColibriMessage::EndpointStats {
+                    from: None,
+                    bitrate: colibri::Bitrates {
+                      audio: colibri::Bitrate {
+                        upload: audio_send_bitrate / 1024,
+                        download: audio_recv_bitrate / 1024,
+                      },
+                      video: colibri::Bitrate {
+                        upload: video_send_bitrate / 1024,
+                        download: video_recv_bitrate / 1024,
+                      },
+                      total: colibri::Bitrate {
+                        upload: (audio_send_bitrate + video_send_bitrate) / 1024,
+                        download: (audio_recv_bitrate + video_recv_bitrate) / 1024,
+                      },
+                    },
+                    packet_loss: colibri::PacketLoss {
+                      total: (recv_loss * 100.) as u64,
+                      download: (recv_loss * 100.) as u64,
+                      upload: 0, // TODO
+                    },
+                    connection_quality: 100.0,
+                    jvb_rtt: Some(0), // TODO
+                    server_region: self_.config.region.clone(),
+                    max_enabled_resolution: self_.inner.lock().await.send_resolution,
+                  };
+                  if let Err(e) = colibri_channel.send(stats).await {
+                    warn!("failed to send stats: {:?}", e);
+                  }
+                }
+                else {
+                  warn!("unable to get stats from pipeline");
+                }
+                interval.tick().await;
+              }
+            }));
+          }
+
+          {
+            let self_ = self.clone();
+            tokio::spawn(async move {
+              let mut stream = ReceiverStream::new(rx);
+              while let Some(msg) = stream.next().await {
+                // Some message types are handled internally rather than passed to the on_colibri_message handler.
+                let handled = match &msg {
+                  ColibriMessage::EndpointMessage { to: Some(to), from, msg_payload } if to == &my_endpoint_id => {
                     match serde_json::from_value::<JsonMessage>(msg_payload.clone()) {
                       Ok(JsonMessage::E2ePingRequest { id }) => {
-                        if let Err(e) = colibri_channel_.send(ColibriMessage::EndpointMessage {
-                          from: Some(my_endpoint_id),
+                        if let Err(e) = colibri_channel.send(ColibriMessage::EndpointMessage {
+                          from: None,
                           to: from.clone(),
                           msg_payload: serde_json::to_value(JsonMessage::E2ePingResponse { id }).unwrap(),
                         }).await {
                           warn!("failed to send e2e ping response: {:?}", e);
                         }
-                        continue;
+                        true
                       },
-                      _ => {},
+                      _ => false,
                     }
+                  },
+                  _ => false,
+                };
+
+                if handled {
+                  continue;
+                }
+
+                let locked_inner = self_.inner.lock().await;
+                if let Some(f) = &locked_inner.on_colibri_message {
+                  if let Err(e) = f(self_.clone(), msg).await {
+                    warn!("on_colibri_message failed: {:?}", e);
                   }
                 }
               }
-
-              let locked_inner = self_.inner.lock().await;
-              if let Some(f) = &locked_inner.on_colibri_message {
-                if let Err(e) = f(self_.clone(), msg).await {
-                  warn!("on_colibri_message failed: {:?}", e);
-                }
-              }
-            }
-          });
+              Ok::<_, anyhow::Error>(())
+            });
+          }
         }
 
         if let Some(connected_tx) = self.inner.lock().await.connected_tx.take() {
diff --git a/lib-gst-meet/src/jingle.rs b/lib-gst-meet/src/jingle.rs
index ca18948..59687b7 100644
--- a/lib-gst-meet/src/jingle.rs
+++ b/lib-gst-meet/src/jingle.rs
@@ -20,7 +20,7 @@ use pem::Pem;
 use rand::random;
 use rcgen::{Certificate, CertificateParams, PKCS_ECDSA_P256_SHA256};
 use ring::digest::{digest, SHA256};
-use tokio::{net::lookup_host, runtime::Handle, sync::oneshot};
+use tokio::{net::lookup_host, runtime::Handle, sync::oneshot, task::JoinHandle};
 use tracing::{debug, error, warn};
 use uuid::Uuid;
 use xmpp_parsers::{
@@ -137,11 +137,12 @@ pub(crate) struct JingleSession {
   pipeline: gstreamer::Pipeline,
   audio_sink_element: gstreamer::Element,
   video_sink_element: gstreamer::Element,
-  remote_ssrc_map: HashMap<u32, Source>,
+  pub(crate) remote_ssrc_map: HashMap<u32, Source>,
   _ice_agent: nice::Agent,
   pub(crate) accept_iq_id: Option<String>,
   pub(crate) colibri_url: Option<String>,
   pub(crate) colibri_channel: Option<ColibriChannel>,
+  pub(crate) stats_handler_task: Option<JoinHandle<()>>,
   pipeline_state_null_rx: oneshot::Receiver<()>,
 }
 
@@ -1320,6 +1321,7 @@ impl JingleSession {
       accept_iq_id: Some(accept_iq_id),
       colibri_url: ice_transport.web_socket.clone().map(|ws| ws.url),
       colibri_channel: None,
+      stats_handler_task: None,
       pipeline_state_null_rx,
     })
   }
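A minimal sketch of how the new knob is expected to be driven, mirroring the main.rs hunk above; the `connection`, `main_loop`, and `config` values and the `720` height here are placeholders, not anything defined by this patch:

```rust
// Join as gst-meet already does, then advertise the height our pipeline sends.
let conference = JitsiConference::join(connection, main_loop.context(), config)
  .await
  .context("failed to join conference")?;

// Informational only: lib-gst-meet does not encode or scale video, so this should
// match the height the application's GStreamer pipeline actually produces. The value
// is reported to other clients as max_enabled_resolution in the EndpointStats message
// that the new stats task sends every SEND_STATS_INTERVAL (10 seconds).
conference.set_send_resolution(720).await;
```

On the command line, structopt derives the flag name from the field, so the new option surfaces as `--send-video-height` and is forwarded to `set_send_resolution` by the main.rs change above.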