Transmit EndpointStats on the colibri channel

This commit is contained in:
Jasper Hugo 2022-03-09 10:04:45 +07:00 committed by Jasper
parent d9642aa760
commit 6ef974719b
4 changed files with 223 additions and 49 deletions

View File

@ -71,6 +71,11 @@ struct Opt {
help = "The maximum height to receive video at." help = "The maximum height to receive video at."
)] )]
recv_video_height: Option<u16>, recv_video_height: Option<u16>,
#[structopt(
long,
help = "The maximum height we plan to send video at (used for stats only)."
)]
send_video_height: Option<u16>,
#[structopt( #[structopt(
long, long,
help = "The video type to signal that we are sending. One of: camera, desktop" help = "The video type to signal that we are sending. One of: camera, desktop"
@ -198,6 +203,7 @@ async fn main_inner() -> Result<()> {
region, region,
video_codec, video_codec,
recv_pipeline_participant_template, recv_pipeline_participant_template,
send_video_height,
start_bitrate, start_bitrate,
stereo, stereo,
#[cfg(feature = "log-rtp")] #[cfg(feature = "log-rtp")]
@ -223,6 +229,10 @@ async fn main_inner() -> Result<()> {
let conference = JitsiConference::join(connection, main_loop.context(), config) let conference = JitsiConference::join(connection, main_loop.context(), config)
.await .await
.context("failed to join conference")?; .context("failed to join conference")?;
if let Some(height) = send_video_height {
conference.set_send_resolution(height.into()).await;
}
if opt.select_endpoints.is_some() || opt.last_n.is_some() || opt.recv_video_height.is_some() { if opt.select_endpoints.is_some() || opt.last_n.is_some() || opt.recv_video_height.is_some() {
conference conference

View File

@ -2,7 +2,7 @@
name = "lib-gst-meet" name = "lib-gst-meet"
description = "Connect GStreamer pipelines to Jitsi Meet conferences" description = "Connect GStreamer pipelines to Jitsi Meet conferences"
version = "0.5.0" version = "0.5.0"
edition = "2018" edition = "2021"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
readme = "../README.md" readme = "../README.md"
repository = "https://github.com/avstack/gst-meet" repository = "https://github.com/avstack/gst-meet"

View File

@ -1,15 +1,16 @@
use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, sync::Arc}; use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, sync::Arc, time::Duration};
use anyhow::{anyhow, bail, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use colibri::{ColibriMessage, JsonMessage}; use colibri::{ColibriMessage, JsonMessage};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use glib::ObjectExt;
use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt}; use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt};
use jitsi_xmpp_parsers::jingle::{Action, Jingle}; use jitsi_xmpp_parsers::jingle::{Action, Jingle};
use maplit::hashmap; use maplit::hashmap;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::Serialize; use serde::Serialize;
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::{sync::{mpsc, oneshot, Mutex}, time};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
use uuid::Uuid; use uuid::Uuid;
@ -26,7 +27,7 @@ use xmpp_parsers::{
ns, ns,
presence::{self, Presence}, presence::{self, Presence},
stanza_error::{DefinedCondition, ErrorType, StanzaError}, stanza_error::{DefinedCondition, ErrorType, StanzaError},
BareJid, Element, FullJid, Jid, BareJid, FullJid, Jid,
}; };
use crate::{ use crate::{
@ -38,6 +39,8 @@ use crate::{
xmpp::{self, connection::Connection}, xmpp::{self, connection::Connection},
}; };
const SEND_STATS_INTERVAL: Duration = Duration::from_secs(10);
const DISCO_NODE: &str = "https://github.com/avstack/gst-meet"; const DISCO_NODE: &str = "https://github.com/avstack/gst-meet";
static DISCO_INFO: Lazy<DiscoInfoResult> = Lazy::new(|| DiscoInfoResult { static DISCO_INFO: Lazy<DiscoInfoResult> = Lazy::new(|| DiscoInfoResult {
@ -85,7 +88,7 @@ pub struct JitsiConferenceConfig {
pub struct JitsiConference { pub struct JitsiConference {
pub(crate) glib_main_context: glib::MainContext, pub(crate) glib_main_context: glib::MainContext,
pub(crate) jid: FullJid, pub(crate) jid: FullJid,
pub(crate) xmpp_tx: mpsc::Sender<Element>, pub(crate) xmpp_tx: mpsc::Sender<xmpp_parsers::Element>,
pub(crate) config: JitsiConferenceConfig, pub(crate) config: JitsiConferenceConfig,
pub(crate) external_services: Vec<xmpp::extdisco::Service>, pub(crate) external_services: Vec<xmpp::extdisco::Service>,
pub(crate) jingle_session: Arc<Mutex<Option<JingleSession>>>, pub(crate) jingle_session: Arc<Mutex<Option<JingleSession>>>,
@ -120,8 +123,9 @@ pub(crate) struct JitsiConferenceInner {
Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>, Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_colibri_message: on_colibri_message:
Option<Arc<dyn (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync>>, Option<Arc<dyn (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync>>,
presence: Vec<Element>, presence: Vec<xmpp_parsers::Element>,
state: JitsiConferenceState, state: JitsiConferenceState,
send_resolution: Option<i32>,
connected_tx: Option<oneshot::Sender<()>>, connected_tx: Option<oneshot::Sender<()>>,
} }
@ -158,28 +162,28 @@ impl JitsiConference {
Muc::new().into(), Muc::new().into(),
Caps::new(DISCO_NODE, COMPUTED_CAPS_HASH.clone()).into(), Caps::new(DISCO_NODE, COMPUTED_CAPS_HASH.clone()).into(),
ECaps2::new(vec![ecaps2_hash]).into(), ECaps2::new(vec![ecaps2_hash]).into(),
Element::builder("stats-id", ns::DEFAULT_NS) xmpp_parsers::Element::builder("stats-id", ns::DEFAULT_NS)
.append("gst-meet") .append("gst-meet")
.build(), .build(),
Element::builder("jitsi_participant_codecType", ns::DEFAULT_NS) xmpp_parsers::Element::builder("jitsi_participant_codecType", ns::DEFAULT_NS)
.append(config.video_codec.as_str()) .append(config.video_codec.as_str())
.build(), .build(),
Element::builder("audiomuted", ns::DEFAULT_NS) xmpp_parsers::Element::builder("audiomuted", ns::DEFAULT_NS)
.append("false") .append("false")
.build(), .build(),
Element::builder("videomuted", ns::DEFAULT_NS) xmpp_parsers::Element::builder("videomuted", ns::DEFAULT_NS)
.append("false") .append("false")
.build(), .build(),
Element::builder("nick", "http://jabber.org/protocol/nick") xmpp_parsers::Element::builder("nick", "http://jabber.org/protocol/nick")
.append(config.nick.as_str()) .append(config.nick.as_str())
.build(), .build(),
]; ];
if let Some(region) = &config.region { if let Some(region) = &config.region {
presence.extend([ presence.extend([
Element::builder("jitsi_participant_region", ns::DEFAULT_NS) xmpp_parsers::Element::builder("jitsi_participant_region", ns::DEFAULT_NS)
.append(region.as_str()) .append(region.as_str())
.build(), .build(),
Element::builder("region", "http://jitsi.org/jitsi-meet") xmpp_parsers::Element::builder("region", "http://jitsi.org/jitsi-meet")
.attr("id", region) .attr("id", region)
.build(), .build(),
]); ]);
@ -210,6 +214,7 @@ impl JitsiConference {
on_participant: None, on_participant: None,
on_participant_left: None, on_participant_left: None,
on_colibri_message: None, on_colibri_message: None,
send_resolution: None,
connected_tx: Some(tx), connected_tx: Some(tx),
})), })),
tls_insecure: xmpp_connection.tls_insecure, tls_insecure: xmpp_connection.tls_insecure,
@ -245,16 +250,19 @@ impl JitsiConference {
Ok(()) Ok(())
} }
fn jid_in_muc(&self) -> Result<FullJid> { fn endpoint_id(&self) -> Result<&str> {
let resource = self self
.jid .jid
.node .node
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("invalid jid"))? .ok_or_else(|| anyhow!("invalid jid"))?
.split('-') .split('-')
.next() .next()
.ok_or_else(|| anyhow!("invalid jid"))?; .ok_or_else(|| anyhow!("invalid jid"))
Ok(self.config.muc.clone().with_resource(resource)) }
fn jid_in_muc(&self) -> Result<FullJid> {
Ok(self.config.muc.clone().with_resource(self.endpoint_id()?))
} }
pub(crate) fn focus_jid_in_muc(&self) -> Result<FullJid> { pub(crate) fn focus_jid_in_muc(&self) -> Result<FullJid> {
@ -262,7 +270,7 @@ impl JitsiConference {
} }
#[tracing::instrument(level = "debug", err)] #[tracing::instrument(level = "debug", err)]
async fn send_presence(&self, payloads: &[Element]) -> Result<()> { async fn send_presence(&self, payloads: &[xmpp_parsers::Element]) -> Result<()> {
let mut presence = Presence::new(presence::Type::None).with_to(self.jid_in_muc()?); let mut presence = Presence::new(presence::Type::None).with_to(self.jid_in_muc()?);
presence.payloads = payloads.to_owned(); presence.payloads = payloads.to_owned();
self.xmpp_tx.send(presence.into()).await?; self.xmpp_tx.send(presence.into()).await?;
@ -272,7 +280,7 @@ impl JitsiConference {
#[tracing::instrument(level = "debug", err)] #[tracing::instrument(level = "debug", err)]
pub async fn set_muted(&self, media_type: MediaType, muted: bool) -> Result<()> { pub async fn set_muted(&self, media_type: MediaType, muted: bool) -> Result<()> {
let mut locked_inner = self.inner.lock().await; let mut locked_inner = self.inner.lock().await;
let element = Element::builder( let element = xmpp_parsers::Element::builder(
media_type.jitsi_muted_presence_element_name(), media_type.jitsi_muted_presence_element_name(),
ns::DEFAULT_NS, ns::DEFAULT_NS,
) )
@ -339,6 +347,17 @@ impl JitsiConference {
) )
} }
/// Set the max resolution that we are currently sending.
///
/// Setting this is required for browser clients in the same conference to display
/// the stats that we broadcast.
///
/// Note that lib-gst-meet does not encode video (that is the responsibility of your
/// GStreamer pipeline), so this is purely informational.
pub async fn set_send_resolution(&self, height: i32) {
self.inner.lock().await.send_resolution = Some(height);
}
pub async fn send_colibri_message(&self, message: ColibriMessage) -> Result<()> { pub async fn send_colibri_message(&self, message: ColibriMessage) -> Result<()> {
self self
.jingle_session .jingle_session
@ -362,7 +381,7 @@ impl JitsiConference {
bodies: Default::default(), bodies: Default::default(),
subjects: Default::default(), subjects: Default::default(),
thread: None, thread: None,
payloads: vec![Element::try_from(xmpp::jitsi::JsonMessage { payloads: vec![xmpp_parsers::Element::try_from(xmpp::jitsi::JsonMessage {
payload: serde_json::to_value(payload)?, payload: serde_json::to_value(payload)?,
})?], })?],
}; };
@ -449,7 +468,7 @@ impl JitsiConference {
#[async_trait] #[async_trait]
impl StanzaFilter for JitsiConference { impl StanzaFilter for JitsiConference {
#[tracing::instrument(level = "trace")] #[tracing::instrument(level = "trace")]
fn filter(&self, element: &Element) -> bool { fn filter(&self, element: &xmpp_parsers::Element) -> bool {
element.attr("from") == Some(self.config.focus.to_string().as_str()) element.attr("from") == Some(self.config.focus.to_string().as_str())
&& element.is("iq", ns::DEFAULT_NS) && element.is("iq", ns::DEFAULT_NS)
|| element || element
@ -461,7 +480,7 @@ impl StanzaFilter for JitsiConference {
} }
#[tracing::instrument(level = "trace", err)] #[tracing::instrument(level = "trace", err)]
async fn take(&self, element: Element) -> Result<()> { async fn take(&self, element: xmpp_parsers::Element) -> Result<()> {
use JitsiConferenceState::*; use JitsiConferenceState::*;
let state = self.inner.lock().await.state; let state = self.inner.lock().await.state;
match state { match state {
@ -599,45 +618,188 @@ impl StanzaFilter for JitsiConference {
ColibriChannel::new(&colibri_url, self.tls_insecure).await?; ColibriChannel::new(&colibri_url, self.tls_insecure).await?;
let (tx, rx) = mpsc::channel(8); let (tx, rx) = mpsc::channel(8);
colibri_channel.subscribe(tx).await; colibri_channel.subscribe(tx).await;
let colibri_channel_ = colibri_channel.clone(); jingle_session.colibri_channel = Some(colibri_channel.clone());
jingle_session.colibri_channel = Some(colibri_channel);
let self_ = self.clone(); let my_endpoint_id = self.endpoint_id()?.to_owned();
tokio::spawn(async move {
let mut stream = ReceiverStream::new(rx);
while let Some(msg) = stream.next().await {
// Some message types are handled internally rather than passed to the on_colibri_message handler.
// End-to-end ping {
if let ColibriMessage::EndpointMessage { to, from, msg_payload } = &msg { let my_endpoint_id = my_endpoint_id.clone();
if let Some(to) = to { let colibri_channel = colibri_channel.clone();
let my_endpoint_id = self_.jid.to_string().split('-').next().unwrap().to_owned(); let self_ = self.clone();
if to == &my_endpoint_id { jingle_session.stats_handler_task = Some(tokio::spawn(async move {
let mut interval = time::interval(SEND_STATS_INTERVAL);
loop {
let maybe_remote_ssrc_map = self_.jingle_session.lock().await.as_ref().map(|sess| sess.remote_ssrc_map.clone());
let maybe_source_stats: Option<Vec<gstreamer::Structure>> = self_
.pipeline()
.await
.ok()
.and_then(|pipeline| pipeline.by_name("rtpbin"))
.and_then(|rtpbin| rtpbin.try_emit_by_name("get-session", &[&0u32]).ok())
.and_then(|rtpsession: gstreamer::Element| rtpsession.try_property("stats").ok())
.and_then(|stats: gstreamer::Structure| stats.get("source-stats").ok())
.and_then(|stats: glib::ValueArray| stats.into_iter().map(|v| v.get()).collect::<Result<_, _>>().ok());
if let (Some(remote_ssrc_map), Some(source_stats)) = (maybe_remote_ssrc_map, maybe_source_stats) {
debug!("source stats: {:#?}", source_stats);
let audio_recv_bitrate: u64 = source_stats
.iter()
.filter(|stat| {
stat
.get("ssrc")
.ok()
.and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
.map(|source| source.media_type == MediaType::Audio && source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
.unwrap_or_default()
})
.filter_map(|stat| stat.get::<u64>("bitrate").ok())
.sum();
let video_recv_bitrate: u64 = source_stats
.iter()
.filter(|stat| {
stat
.get("ssrc")
.ok()
.and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
.map(|source| source.media_type == MediaType::Video && source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
.unwrap_or_default()
})
.filter_map(|stat| stat.get::<u64>("bitrate").ok())
.sum();
let audio_send_bitrate: u64 = source_stats
.iter()
.find(|stat| {
stat
.get("ssrc")
.ok()
.and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
.map(|source| source.media_type == MediaType::Audio && source.participant_id.as_ref().map(|id| id == &my_endpoint_id).unwrap_or_default())
.unwrap_or_default()
})
.and_then(|stat| stat.get("bitrate").ok())
.unwrap_or_default();
let video_send_bitrate: u64 = source_stats
.iter()
.find(|stat| {
stat
.get("ssrc")
.ok()
.and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
.map(|source| source.media_type == MediaType::Video && source.participant_id.as_ref().map(|id| id == &my_endpoint_id).unwrap_or_default())
.unwrap_or_default()
})
.and_then(|stat| stat.get("bitrate").ok())
.unwrap_or_default();
let recv_packets: u64 = source_stats
.iter()
.filter(|stat| {
stat
.get("ssrc")
.ok()
.and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
.map(|source| source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
.unwrap_or_default()
})
.filter_map(|stat| stat.get::<u64>("packets-received").ok())
.sum();
let recv_lost: u64 = source_stats
.iter()
.filter(|stat| {
stat
.get("ssrc")
.ok()
.and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc))
.map(|source| source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default())
.unwrap_or_default()
})
.filter_map(|stat| stat.get::<i32>("packets-lost").ok())
.sum::<i32>()
// Loss can be negative because of duplicate packets. Clamp it to zero.
.try_into()
.unwrap_or_default();
let recv_loss = recv_lost as f64 / (recv_packets as f64 + recv_lost as f64);
let stats = ColibriMessage::EndpointStats {
from: None,
bitrate: colibri::Bitrates {
audio: colibri::Bitrate {
upload: audio_send_bitrate / 1024,
download: audio_recv_bitrate / 1024,
},
video: colibri::Bitrate {
upload: video_send_bitrate / 1024,
download: video_recv_bitrate / 1024,
},
total: colibri::Bitrate {
upload: (audio_send_bitrate + video_send_bitrate) / 1024,
download: (audio_recv_bitrate + video_recv_bitrate) / 1024,
},
},
packet_loss: colibri::PacketLoss {
total: (recv_loss * 100.) as u64,
download: (recv_loss * 100.) as u64,
upload: 0, // TODO
},
connection_quality: 100.0,
jvb_rtt: Some(0), // TODO
server_region: self_.config.region.clone(),
max_enabled_resolution: self_.inner.lock().await.send_resolution,
};
if let Err(e) = colibri_channel.send(stats).await {
warn!("failed to send stats: {:?}", e);
}
}
else {
warn!("unable to get stats from pipeline");
}
interval.tick().await;
}
}));
}
{
let self_ = self.clone();
tokio::spawn(async move {
let mut stream = ReceiverStream::new(rx);
while let Some(msg) = stream.next().await {
// Some message types are handled internally rather than passed to the on_colibri_message handler.
let handled = match &msg {
ColibriMessage::EndpointMessage { to: Some(to), from, msg_payload } if to == &my_endpoint_id => {
match serde_json::from_value::<JsonMessage>(msg_payload.clone()) { match serde_json::from_value::<JsonMessage>(msg_payload.clone()) {
Ok(JsonMessage::E2ePingRequest { id }) => { Ok(JsonMessage::E2ePingRequest { id }) => {
if let Err(e) = colibri_channel_.send(ColibriMessage::EndpointMessage { if let Err(e) = colibri_channel.send(ColibriMessage::EndpointMessage {
from: Some(my_endpoint_id), from: None,
to: from.clone(), to: from.clone(),
msg_payload: serde_json::to_value(JsonMessage::E2ePingResponse { id }).unwrap(), msg_payload: serde_json::to_value(JsonMessage::E2ePingResponse { id }).unwrap(),
}).await { }).await {
warn!("failed to send e2e ping response: {:?}", e); warn!("failed to send e2e ping response: {:?}", e);
} }
continue; true
}, },
_ => {}, _ => false,
} }
},
_ => false,
};
if handled {
continue;
}
let locked_inner = self_.inner.lock().await;
if let Some(f) = &locked_inner.on_colibri_message {
if let Err(e) = f(self_.clone(), msg).await {
warn!("on_colibri_message failed: {:?}", e);
} }
} }
} }
Ok::<_, anyhow::Error>(())
let locked_inner = self_.inner.lock().await; });
if let Some(f) = &locked_inner.on_colibri_message { }
if let Err(e) = f(self_.clone(), msg).await {
warn!("on_colibri_message failed: {:?}", e);
}
}
}
});
} }
if let Some(connected_tx) = self.inner.lock().await.connected_tx.take() { if let Some(connected_tx) = self.inner.lock().await.connected_tx.take() {

View File

@ -20,7 +20,7 @@ use pem::Pem;
use rand::random; use rand::random;
use rcgen::{Certificate, CertificateParams, PKCS_ECDSA_P256_SHA256}; use rcgen::{Certificate, CertificateParams, PKCS_ECDSA_P256_SHA256};
use ring::digest::{digest, SHA256}; use ring::digest::{digest, SHA256};
use tokio::{net::lookup_host, runtime::Handle, sync::oneshot}; use tokio::{net::lookup_host, runtime::Handle, sync::oneshot, task::JoinHandle};
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use uuid::Uuid; use uuid::Uuid;
use xmpp_parsers::{ use xmpp_parsers::{
@ -137,11 +137,12 @@ pub(crate) struct JingleSession {
pipeline: gstreamer::Pipeline, pipeline: gstreamer::Pipeline,
audio_sink_element: gstreamer::Element, audio_sink_element: gstreamer::Element,
video_sink_element: gstreamer::Element, video_sink_element: gstreamer::Element,
remote_ssrc_map: HashMap<u32, Source>, pub(crate) remote_ssrc_map: HashMap<u32, Source>,
_ice_agent: nice::Agent, _ice_agent: nice::Agent,
pub(crate) accept_iq_id: Option<String>, pub(crate) accept_iq_id: Option<String>,
pub(crate) colibri_url: Option<String>, pub(crate) colibri_url: Option<String>,
pub(crate) colibri_channel: Option<ColibriChannel>, pub(crate) colibri_channel: Option<ColibriChannel>,
pub(crate) stats_handler_task: Option<JoinHandle<()>>,
pipeline_state_null_rx: oneshot::Receiver<()>, pipeline_state_null_rx: oneshot::Receiver<()>,
} }
@ -1320,6 +1321,7 @@ impl JingleSession {
accept_iq_id: Some(accept_iq_id), accept_iq_id: Some(accept_iq_id),
colibri_url: ice_transport.web_socket.clone().map(|ws| ws.url), colibri_url: ice_transport.web_socket.clone().map(|ws| ws.url),
colibri_channel: None, colibri_channel: None,
stats_handler_task: None,
pipeline_state_null_rx, pipeline_state_null_rx,
}) })
} }