XMPP improvements

- Handle unsolicited XMPP messages gracefully
- Split out generic XMPP connection handler which can be used for
  connecting to brewery MUCs
- Don't deadlock during Jingle handling
This commit is contained in:
Jasper Hugo 2021-09-07 23:52:04 +07:00
parent 5759d5c800
commit 037c2d944f
13 changed files with 458 additions and 317 deletions

2
Cargo.lock generated
View File

@ -1829,8 +1829,6 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "xmpp-parsers-gst-meet"
version = "0.18.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cde8e9611ca7cac569119e4ec1b6fe50da4df91a168fdab786693029ab1482a"
dependencies = [
"base64",
"blake2",

View File

@ -27,6 +27,7 @@ rcgen = { version = "0.8", default-features = false }
ring = { version = "0.16", default-features = false }
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["std"] }
serde_with = { version = "1", default-features = false, features = ["macros"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio-stream = { version = "0.1", default-features = false, features = ["time"] }
tokio-tungstenite = { version = "0.14", default-features = false, features = ["connect", "rustls-tls"] }
@ -39,7 +40,7 @@ tracing-subscriber = { version = "0.2", optional = true, default-features = fals
"tracing-log",
] }
uuid = { version = "0.8", default-features = false, features = ["v4"] }
xmpp-parsers = { package = "xmpp-parsers-gst-meet", version = "0.18", default-features = false }
xmpp-parsers = { path = "../../xmpp-rs/xmpp-parsers", package = "xmpp-parsers-gst-meet", version = "0.18", default-features = false }
[features]
default = []

View File

@ -6,11 +6,13 @@ use futures::{
stream::{StreamExt, TryStreamExt},
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_tungstenite::tungstenite::{http::Request, Message};
use tracing::{debug, error, info, warn};
#[serde_as]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "colibriClass")]
pub enum ColibriMessage {
@ -20,7 +22,11 @@ pub enum ColibriMessage {
previous_speakers: Vec<String>,
},
#[serde(rename_all = "camelCase")]
EndpointConnectivityStatusChangeEvent { endpoint: String, active: bool },
EndpointConnectivityStatusChangeEvent {
endpoint: String,
#[serde_as(as = "DisplayFromStr")]
active: bool,
},
#[serde(rename_all = "camelCase")]
EndpointMessage {
from: String,

View File

@ -4,17 +4,23 @@ use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait;
use futures::stream::StreamExt;
use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt};
use maplit::hashmap;
use once_cell::sync::Lazy;
use serde::Serialize;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;
pub use xmpp_parsers::disco::Feature;
use xmpp_parsers::{
disco::{DiscoInfoQuery, DiscoInfoResult, Feature},
disco::{DiscoInfoQuery, DiscoInfoResult},
ecaps2::{self, ECaps2},
hashes::Algo,
iq::{Iq, IqType},
jingle::{Action, Jingle},
message::{Message, MessageType},
muc::{Muc, MucUser},
nick::Nick,
ns,
presence::{self, Presence},
BareJid, Element, FullJid, Jid,
@ -25,7 +31,8 @@ use crate::{
jingle::JingleSession,
source::MediaType,
stanza_filter::StanzaFilter,
xmpp,
util::generate_id,
xmpp::{self, connection::Connection},
};
static DISCO_INFO: Lazy<DiscoInfoResult> = Lazy::new(|| {
@ -70,6 +77,7 @@ pub struct JitsiConferenceConfig {
pub nick: String,
pub region: String,
pub video_codec: String,
pub extra_muc_features: Vec<String>,
}
#[derive(Clone)]
@ -79,6 +87,7 @@ pub struct JitsiConference {
pub(crate) xmpp_tx: mpsc::Sender<Element>,
pub(crate) config: JitsiConferenceConfig,
pub(crate) external_services: Vec<xmpp::extdisco::Service>,
pub(crate) jingle_session: Arc<Mutex<Option<JingleSession>>>,
pub(crate) inner: Arc<Mutex<JitsiConferenceInner>>,
}
@ -94,7 +103,7 @@ impl fmt::Debug for JitsiConference {
#[derive(Debug, Clone)]
pub struct Participant {
pub jid: FullJid,
pub jid: Option<FullJid>,
pub muc_jid: FullJid,
pub nick: Option<String>,
}
@ -102,7 +111,6 @@ pub struct Participant {
type BoxedResultFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
pub(crate) struct JitsiConferenceInner {
pub(crate) jingle_session: Option<JingleSession>,
participants: HashMap<String, Participant>,
on_participant:
Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
@ -112,7 +120,6 @@ pub(crate) struct JitsiConferenceInner {
Option<Arc<dyn (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync>>,
state: JitsiConferenceState,
connected_tx: Option<oneshot::Sender<()>>,
connected_rx: Option<oneshot::Receiver<()>>,
}
impl fmt::Debug for JitsiConferenceInner {
@ -124,51 +131,60 @@ impl fmt::Debug for JitsiConferenceInner {
}
impl JitsiConference {
#[tracing::instrument(level = "debug", skip(xmpp_tx), err)]
pub(crate) async fn new(
#[tracing::instrument(level = "debug", err)]
pub async fn join(
xmpp_connection: Connection,
glib_main_context: glib::MainContext,
jid: FullJid,
xmpp_tx: mpsc::Sender<Element>,
config: JitsiConferenceConfig,
external_services: Vec<xmpp::extdisco::Service>,
) -> Result<Self> {
let conference_stanza = xmpp::jitsi::Conference {
machine_uid: Uuid::new_v4().to_string(),
room: config.muc.to_string(),
properties: hashmap! {
// Disable voice processing
// TODO put this in config
"stereo".to_string() => "true".to_string(),
"startBitrate".to_string() => "800".to_string(),
},
};
let (tx, rx) = oneshot::channel();
Ok(Self {
let focus = config.focus.clone();
let conference = Self {
glib_main_context,
jid,
xmpp_tx,
jid: xmpp_connection
.jid()
.await
.context("not connected (no JID)")?,
xmpp_tx: xmpp_connection.tx.clone(),
config,
external_services,
external_services: xmpp_connection.external_services().await,
jingle_session: Arc::new(Mutex::new(None)),
inner: Arc::new(Mutex::new(JitsiConferenceInner {
state: JitsiConferenceState::Discovering,
participants: HashMap::new(),
on_participant: None,
on_participant_left: None,
on_colibri_message: None,
jingle_session: None,
connected_tx: Some(tx),
connected_rx: Some(rx),
})),
})
}
pub(crate) async fn connected(&self) -> Result<()> {
let rx = {
let mut locked_inner = self.inner.lock().await;
locked_inner
.connected_rx
.take()
.context("connected() called twice")?
};
xmpp_connection.add_stanza_filter(conference.clone()).await;
let iq = Iq::from_set(generate_id(), conference_stanza).with_to(Jid::Full(focus));
xmpp_connection.tx.send(iq.into()).await?;
rx.await?;
Ok(())
Ok(conference)
}
#[tracing::instrument(level = "debug", err)]
pub async fn leave(self) -> Result<()> {
let mut inner = self.inner.lock().await;
if let Some(jingle_session) = inner.jingle_session.take() {
if let Some(jingle_session) = self.jingle_session.lock().await.take() {
debug!("pausing all sinks");
jingle_session.pause_all_sinks();
@ -224,10 +240,9 @@ impl JitsiConference {
pub async fn pipeline(&self) -> Result<gstreamer::Pipeline> {
Ok(
self
.inner
.jingle_session
.lock()
.await
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?
.pipeline(),
@ -255,10 +270,9 @@ impl JitsiConference {
pub async fn audio_sink_element(&self) -> Result<gstreamer::Element> {
Ok(
self
.inner
.jingle_session
.lock()
.await
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?
.audio_sink_element(),
@ -268,10 +282,9 @@ impl JitsiConference {
pub async fn video_sink_element(&self) -> Result<gstreamer::Element> {
Ok(
self
.inner
.jingle_session
.lock()
.await
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?
.video_sink_element(),
@ -280,10 +293,9 @@ impl JitsiConference {
pub async fn send_colibri_message(&self, message: ColibriMessage) -> Result<()> {
self
.inner
.jingle_session
.lock()
.await
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?
.colibri_channel
@ -293,6 +305,54 @@ impl JitsiConference {
.await
}
pub async fn send_json_message<T: Serialize>(&self, payload: &T) -> Result<()> {
let message = Message {
from: Some(Jid::Full(self.jid.clone())),
to: Some(Jid::Bare(self.config.muc.clone())),
id: Some(Uuid::new_v4().to_string()),
type_: MessageType::Groupchat,
bodies: Default::default(),
subjects: Default::default(),
thread: None,
payloads: vec![Element::try_from(xmpp::jitsi::JsonMessage {
payload: serde_json::to_value(payload)?,
})?],
};
self.xmpp_tx.send(message.into()).await?;
Ok(())
}
pub(crate) async fn ensure_participant(&self, id: &str) -> Result<()> {
if !self.inner.lock().await.participants.contains_key(id) {
let participant = Participant {
jid: None,
muc_jid: self.config.muc.clone().with_resource(id),
nick: None,
};
self
.inner
.lock()
.await
.participants
.insert(id.to_owned(), participant.clone());
if let Some(f) = self.inner.lock().await.on_participant.as_ref().cloned() {
if let Err(e) = f(self.clone(), participant.clone()).await {
warn!("on_participant failed: {:?}", e);
}
else {
if let Ok(pipeline) = self.pipeline().await {
gstreamer::debug_bin_to_dot_file(
&pipeline,
gstreamer::DebugGraphDetails::ALL,
&format!("participant-added-{}", participant.muc_jid.resource),
);
}
}
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(f))]
pub async fn on_participant(
&self,
@ -313,6 +373,15 @@ impl JitsiConference {
if let Err(e) = f(self.clone(), participant.clone()).await {
warn!("on_participant failed: {:?}", e);
}
else {
if let Ok(pipeline) = self.pipeline().await {
gstreamer::debug_bin_to_dot_file(
&pipeline,
gstreamer::DebugGraphDetails::ALL,
&format!("participant-added-{}", participant.muc_jid.resource),
);
}
}
}
}
@ -349,10 +418,9 @@ impl StanzaFilter for JitsiConference {
#[tracing::instrument(level = "trace", err)]
async fn take(&self, element: Element) -> Result<()> {
let mut locked_inner = self.inner.lock().await;
use JitsiConferenceState::*;
match locked_inner.state {
let state = self.inner.lock().await.state;
match state {
Discovering => {
let iq = Iq::try_from(element)?;
if let IqType::Result(Some(element)) = iq.payload {
@ -377,34 +445,40 @@ impl StanzaFilter for JitsiConference {
let jitsi_disco_hash =
ecaps2::hash_ecaps2(&ecaps2::compute_disco(&jitsi_disco_info)?, Algo::Sha_256)?;
self
.send_presence(vec![
Muc::new().into(),
ECaps2::new(vec![jitsi_disco_hash]).into(),
Element::builder("stats-id", "").append("gst-meet").build(),
Element::builder("jitsi_participant_codecType", "")
.append(self.config.video_codec.as_str())
.build(),
Element::builder("jitsi_participant_region", "")
.append(self.config.region.as_str())
.build(),
Element::builder("audiomuted", "").append("false").build(),
Element::builder("videomuted", "").append("false").build(),
Element::builder("nick", "http://jabber.org/protocol/nick")
.append(self.config.nick.as_str())
.build(),
Element::builder("region", "http://jitsi.org/jitsi-meet")
.attr("id", &self.config.region)
.build(),
])
.await?;
locked_inner.state = JoiningMuc;
let mut presence = vec![
Muc::new().into(),
ECaps2::new(vec![jitsi_disco_hash]).into(),
Element::builder("stats-id", "").append("gst-meet").build(),
Element::builder("jitsi_participant_codecType", "")
.append(self.config.video_codec.as_str())
.build(),
Element::builder("jitsi_participant_region", "")
.append(self.config.region.as_str())
.build(),
Element::builder("audiomuted", "").append("false").build(),
Element::builder("videomuted", "").append("false").build(),
Element::builder("nick", "http://jabber.org/protocol/nick")
.append(self.config.nick.as_str())
.build(),
Element::builder("region", "http://jitsi.org/jitsi-meet")
.attr("id", &self.config.region)
.build(),
];
presence.extend(
self
.config
.extra_muc_features
.iter()
.map(|feature| Element::builder("feature", "").attr("var", feature).build()),
);
self.send_presence(presence).await?;
self.inner.lock().await.state = JoiningMuc;
},
JoiningMuc => {
let presence = Presence::try_from(element)?;
if BareJid::from(presence.from.as_ref().unwrap().clone()) == self.config.muc {
debug!("Joined MUC: {}", self.config.muc);
locked_inner.state = Idle;
self.inner.lock().await.state = Idle;
}
},
Idle => {
@ -438,7 +512,7 @@ impl StanzaFilter for JitsiConference {
.with_from(Jid::Full(self.jid.clone()));
self.xmpp_tx.send(result_iq.into()).await?;
locked_inner.jingle_session =
*self.jingle_session.lock().await =
Some(JingleSession::initiate(self, jingle).await?);
}
else {
@ -453,8 +527,10 @@ impl StanzaFilter for JitsiConference {
.with_from(Jid::Full(self.jid.clone()));
self.xmpp_tx.send(result_iq.into()).await?;
locked_inner
self
.jingle_session
.lock()
.await
.as_mut()
.context("not connected (no jingle session")?
.source_add(jingle)
@ -470,11 +546,11 @@ impl StanzaFilter for JitsiConference {
}
},
IqType::Result(_) => {
if let Some(jingle_session) = locked_inner.jingle_session.as_ref() {
if let Some(jingle_session) = self.jingle_session.lock().await.as_mut() {
if Some(iq.id) == jingle_session.accept_iq_id {
let colibri_url = jingle_session.colibri_url.clone();
locked_inner.jingle_session.as_mut().unwrap().accept_iq_id = None;
jingle_session.accept_iq_id = None;
debug!("Focus acknowledged session-accept");
@ -483,11 +559,7 @@ impl StanzaFilter for JitsiConference {
let colibri_channel = ColibriChannel::new(&colibri_url).await?;
let (tx, rx) = mpsc::channel(8);
colibri_channel.subscribe(tx).await;
locked_inner
.jingle_session
.as_mut()
.unwrap()
.colibri_channel = Some(colibri_channel);
jingle_session.colibri_channel = Some(colibri_channel);
let self_ = self.clone();
tokio::spawn(async move {
@ -503,7 +575,7 @@ impl StanzaFilter for JitsiConference {
});
}
if let Some(connected_tx) = locked_inner.connected_tx.take() {
if let Some(connected_tx) = self.inner.lock().await.connected_tx.take() {
connected_tx.send(()).unwrap();
}
}
@ -522,47 +594,77 @@ impl StanzaFilter for JitsiConference {
let bare_from: BareJid = from.clone().into();
if bare_from == self.config.muc && from.resource != "focus" {
trace!("received MUC presence from {}", from.resource);
for payload in presence.payloads {
if !payload.is("x", ns::MUC_USER) {
continue;
}
let muc_user = MucUser::try_from(payload)?;
debug!("MUC user presence: {:?}", muc_user);
let nick_payload = presence
.payloads
.iter()
.find(|e| e.is("nick", ns::NICK))
.map(|e| Nick::try_from(e.clone()))
.transpose()?;
if let Some(muc_user_payload) = presence
.payloads
.into_iter()
.find(|e| e.is("x", ns::MUC_USER))
{
let muc_user = MucUser::try_from(muc_user_payload)?;
for item in muc_user.items {
if let Some(jid) = &item.jid {
if jid == &self.jid {
continue;
}
let participant = Participant {
jid: jid.clone(),
jid: Some(jid.clone()),
muc_jid: from.clone(),
nick: item.nick,
nick: item
.nick
.or_else(|| nick_payload.as_ref().map(|nick| nick.0.clone())),
};
if presence.type_ == presence::Type::Unavailable
&& locked_inner
&& self
.inner
.lock()
.await
.participants
.remove(&from.resource.clone())
.is_some()
{
debug!("participant left: {:?}", jid);
if let Some(f) = &locked_inner.on_participant_left {
if let Some(f) = &self
.inner
.lock()
.await
.on_participant_left
.as_ref()
.cloned()
{
debug!("calling on_participant_left with old participant");
if let Err(e) = f(self.clone(), participant).await {
warn!("on_participant_left failed: {:?}", e);
}
}
}
else if locked_inner
else if self
.inner
.lock()
.await
.participants
.insert(from.resource.clone(), participant.clone())
.is_none()
{
debug!("new participant: {:?}", jid);
if let Some(f) = &locked_inner.on_participant {
if let Some(f) = &self.inner.lock().await.on_participant.as_ref().cloned() {
debug!("calling on_participant with new participant");
if let Err(e) = f(self.clone(), participant).await {
if let Err(e) = f(self.clone(), participant.clone()).await {
warn!("on_participant failed: {:?}", e);
}
else {
if let Some(jingle_session) = self.jingle_session.lock().await.as_ref() {
gstreamer::debug_bin_to_dot_file(
&jingle_session.pipeline(),
gstreamer::DebugGraphDetails::ALL,
&format!("participant-added-{}", participant.muc_jid.resource),
);
}
}
}
}
}

View File

@ -236,6 +236,7 @@ impl JingleSession {
.transpose()?;
}
else {
debug!("skipping media: {}", description.media);
continue;
}
@ -246,20 +247,24 @@ impl JingleSession {
.context("missing ssrc-info")?
.owner
.clone();
if owner == "jvb" {
debug!("skipping ssrc (owner = jvb)");
continue;
}
debug!("adding ssrc to remote_ssrc_map: {:?}", ssrc);
remote_ssrc_map.insert(
ssrc.id.parse()?,
Source {
ssrc: ssrc.id.parse()?,
participant_id: owner
.split('/')
.nth(1)
.context("invalid ssrc-info owner")?
.to_owned(),
participant_id: if owner == "jvb" {
None
}
else {
Some(
owner
.split('/')
.nth(1)
.context("invalid ssrc-info owner")?
.to_owned(),
)
},
media_type: if description.media == "audio" {
MediaType::Audio
}
@ -576,10 +581,10 @@ impl JingleSession {
})?;
let handle = Handle::current();
let inner_ = conference.inner.clone();
let jingle_session = conference.jingle_session.clone();
rtpbin.connect("new-jitterbuffer", false, move |values| {
let handle = handle.clone();
let inner_ = inner_.clone();
let jingle_session = jingle_session.clone();
let f = move || {
let rtpjitterbuffer: gstreamer::Element = values[1].get()?;
let session: u32 = values[2].get()?;
@ -590,13 +595,12 @@ impl JingleSession {
);
let source = handle.block_on(async move {
let locked_inner = inner_.lock().await;
let jingle_session = locked_inner
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?;
Ok::<_, anyhow::Error>(
jingle_session
.lock()
.await
.as_ref()
.context("not connected (no jingle session)")?
.remote_ssrc_map
.get(&ssrc)
.context(format!("unknown ssrc: {}", ssrc))?
@ -604,7 +608,7 @@ impl JingleSession {
)
})?;
debug!("jitterbuffer is for remote source: {:?}", source);
if source.media_type == MediaType::Video {
if source.media_type == MediaType::Video && source.participant_id.is_some() {
debug!("enabling RTX for ssrc {}", ssrc);
rtpjitterbuffer.set_property("do-retransmission", true)?;
}
@ -703,7 +707,7 @@ impl JingleSession {
{
let handle = Handle::current();
let inner = conference.inner.clone();
let conference = conference.clone();
let pipeline = pipeline.clone();
let rtpbin_ = rtpbin.clone();
rtpbin.connect("pad-added", false, move |values| {
@ -717,13 +721,13 @@ impl JingleSession {
let ssrc: u32 = parts.next().context("malformed pad name")?.parse()?;
let pt: u8 = parts.next().context("malformed pad name")?.parse()?;
let source = handle.block_on(async {
let locked_inner = inner.lock().await;
let jingle_session = locked_inner
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?;
Ok::<_, anyhow::Error>(
jingle_session
conference
.jingle_session
.lock()
.await
.as_ref()
.context("not connected (no jingle session)")?
.remote_ssrc_map
.get(&ssrc)
.context(format!("unknown ssrc: {}", ssrc))?
@ -805,26 +809,32 @@ impl JingleSession {
.static_pad("src")
.context("depayloader has no src pad")?;
if let Some(participant_bin) =
pipeline.by_name(&format!("participant_{}", source.participant_id))
{
let sink_pad_name = match source.media_type {
MediaType::Audio => "audio",
MediaType::Video => "video",
};
if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) {
debug!("linking depayloader to participant bin");
src_pad.link(&sink_pad)?;
if let Some(participant_id) = source.participant_id {
handle.block_on(conference.ensure_participant(&participant_id))?;
if let Some(participant_bin) =
pipeline.by_name(&format!("participant_{}", participant_id))
{
let sink_pad_name = match source.media_type {
MediaType::Audio => "audio",
MediaType::Video => "video",
};
if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) {
debug!("linking depayloader to participant bin");
src_pad.link(&sink_pad)?;
}
else {
warn!(
"no {} sink pad in {} participant bin",
sink_pad_name, participant_id
);
}
}
else {
warn!(
"no {} sink pad in {} participant bin",
sink_pad_name, source.participant_id
);
debug!("no participant bin for {}", participant_id);
}
}
else {
debug!("no participant bin for {}", source.participant_id);
debug!("not looking for participant bin, source is owned by JVB");
}
if !src_pad.is_linked() {
@ -861,38 +871,43 @@ impl JingleSession {
)?;
audio_sink_element.set_property("min-ptime", 10i64 * 1000 * 1000)?;
audio_sink_element.set_property("ssrc", audio_ssrc)?;
audio_sink_element.set_property("auto-header-extension", false)?;
audio_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"audio payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
if ext_uri == RTP_HDREXT_ABS_SEND_TIME {
if audio_sink_element.has_property("auto-header-extension", None) {
audio_sink_element.set_property("auto-header-extension", false)?;
audio_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"audio payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
if ext_uri == RTP_HDREXT_ABS_SEND_TIME {
}
else if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL {
}
else if ext_uri == RTP_HDREXT_TRANSPORT_CC {
// hdrext.set_property("n-streams", 2u32)?;
}
else {
bail!("unknown rtp hdrext: {}", ext_uri);
}
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
}
else if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL {
}
else if ext_uri == RTP_HDREXT_TRANSPORT_CC {
// hdrext.set_property("n-streams", 2u32)?;
}
else {
bail!("unknown rtp hdrext: {}", ext_uri);
}
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
}
})?;
})?;
}
else {
debug!("audio payloader: no rtp header extension support");
}
pipeline.add(&audio_sink_element)?;
let video_sink_element = match conference.config.video_codec.as_str() {
@ -926,43 +941,47 @@ impl JingleSession {
other => bail!("unsupported video codec: {}", other),
};
video_sink_element.set_property("ssrc", video_ssrc)?;
video_sink_element.set_property("auto-header-extension", false)?;
video_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"video payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
if ext_uri == RTP_HDREXT_ABS_SEND_TIME {
if video_sink_element.has_property("auto-header-extension", None) {
video_sink_element.set_property("auto-header-extension", false)?;
video_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"video payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
if ext_uri == RTP_HDREXT_ABS_SEND_TIME {
}
else if ext_uri == RTP_HDREXT_TRANSPORT_CC {
// hdrext.set_property("n-streams", 2u32)?;
}
else {
bail!("unknown rtp hdrext: {}", ext_uri);
}
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
}
else if ext_uri == RTP_HDREXT_TRANSPORT_CC {
// hdrext.set_property("n-streams", 2u32)?;
}
else {
bail!("unknown rtp hdrext: {}", ext_uri);
}
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
}
})?;
})?;
}
else {
debug!("video payloader: no rtp header extension support");
}
pipeline.add(&video_sink_element)?;
let mut audio_caps = gstreamer::Caps::builder("application/x-rtp");
// TODO: fails to negotiate
// if let Some(hdrext) = audio_hdrext_ssrc_audio_level {
// audio_caps = audio_caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_SSRC_AUDIO_LEVEL);
// }
if let Some(hdrext) = audio_hdrext_ssrc_audio_level {
audio_caps = audio_caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_SSRC_AUDIO_LEVEL);
}
if let Some(hdrext) = audio_hdrext_transport_cc {
audio_caps = audio_caps.field(&format!("extmap-{}", hdrext), RTP_HDREXT_TRANSPORT_CC);
}
@ -981,13 +1000,19 @@ impl JingleSession {
video_capsfilter.set_property("caps", video_caps.build())?;
pipeline.add(&video_capsfilter)?;
debug!("linking video payloader -> rtpbin");
video_sink_element.link(&video_capsfilter)?;
video_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?;
let rtpfunnel = gstreamer::ElementFactory::make("rtpfunnel", None)?;
pipeline.add(&rtpfunnel)?;
debug!("linking audio payloader -> rtpbin");
debug!("linking video payloader -> rtpfunnel");
video_sink_element.link(&video_capsfilter)?;
video_capsfilter.link(&rtpfunnel)?;
debug!("linking audio payloader -> rtpfunnel");
audio_sink_element.link(&audio_capsfilter)?;
audio_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_1"))?;
audio_capsfilter.link(&rtpfunnel)?;
debug!("linking rtpfunnel -> rtpbin");
rtpfunnel.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?;
debug!("link dtlssrtpdec -> rtpbin");
dtlssrtpdec.link_pads(Some("rtp_src"), &rtpbin, Some("recv_rtp_sink_0"))?;
@ -996,8 +1021,6 @@ impl JingleSession {
debug!("linking rtpbin -> dtlssrtpenc");
rtpbin.link_pads(Some("send_rtp_src_0"), &dtlssrtpenc, Some("rtp_sink_0"))?;
rtpbin.link_pads(Some("send_rtcp_src_0"), &dtlssrtpenc, Some("rtcp_sink_0"))?;
rtpbin.link_pads(Some("send_rtp_src_1"), &dtlssrtpenc, Some("rtp_sink_1"))?;
rtpbin.link_pads(Some("send_rtcp_src_1"), &dtlssrtpenc, Some("rtcp_sink_1"))?;
debug!("linking ice src -> dtlssrtpdec");
nicesrc.link(&dtlssrtpdec)?;
@ -1177,15 +1200,12 @@ impl JingleSession {
};
if initiate_content.name.0 == "audio" {
// TODO: fails to negotiate
// if let Some(hdrext) = audio_hdrext_ssrc_audio_level {
// if audio_hdrext_supported {
// description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_SSRC_AUDIO_LEVEL.to_owned()));
// }
// else {
// debug!("ssrc-audio-level hdrext requested but not supported");
// }
// }
if let Some(hdrext) = audio_hdrext_ssrc_audio_level {
description.hdrexts.push(RtpHdrext::new(
hdrext.to_string(),
RTP_HDREXT_SSRC_AUDIO_LEVEL.to_owned(),
));
}
if let Some(hdrext) = audio_hdrext_transport_cc {
description.hdrexts.push(RtpHdrext::new(
hdrext.to_string(),
@ -1276,21 +1296,24 @@ impl JingleSession {
.context("missing ssrc-info")?
.owner
.clone();
if owner == "jvb" {
debug!("skipping ssrc (owner = jvb)");
continue;
}
debug!("adding ssrc to remote_ssrc_map: {:?}", ssrc);
self.remote_ssrc_map.insert(
ssrc.id.parse()?,
Source {
ssrc: ssrc.id.parse()?,
participant_id: owner
.split('/')
.nth(1)
.context("invalid ssrc-info owner")?
.to_owned(),
participant_id: if owner == "jvb" {
None
}
else {
Some(
owner
.split('/')
.nth(1)
.context("invalid ssrc-info owner")?
.to_owned(),
)
},
media_type: if description.media == "audio" {
MediaType::Audio
}

View File

@ -1,6 +1,5 @@
pub mod colibri;
mod colibri;
mod conference;
mod connection;
mod jingle;
mod pinger;
mod source;
@ -10,9 +9,10 @@ mod xmpp;
pub use crate::{
colibri::ColibriMessage,
conference::{JitsiConference, JitsiConferenceConfig, Participant},
connection::JitsiConnection,
conference::{Feature, JitsiConference, JitsiConferenceConfig, Participant},
source::MediaType,
stanza_filter::StanzaFilter,
xmpp::connection::{Authentication, Connection},
};
#[cfg(feature = "tracing-subscriber")]

View File

@ -1,7 +1,7 @@
#[derive(Debug, Clone)]
pub(crate) struct Source {
pub(crate) ssrc: u32,
pub(crate) participant_id: String,
pub(crate) participant_id: Option<String>,
pub(crate) media_type: MediaType,
}

View File

@ -3,7 +3,7 @@ use async_trait::async_trait;
use xmpp_parsers::Element;
#[async_trait]
pub(crate) trait StanzaFilter {
pub trait StanzaFilter {
fn filter(&self, element: &Element) -> bool;
async fn take(&self, element: Element) -> Result<()>;
}

View File

@ -23,16 +23,10 @@ use xmpp_parsers::{
BareJid, Element, FullJid, Jid,
};
use crate::{
conference::{JitsiConference, JitsiConferenceConfig},
pinger::Pinger,
stanza_filter::StanzaFilter,
util::generate_id,
xmpp,
};
use crate::{pinger::Pinger, stanza_filter::StanzaFilter, util::generate_id, xmpp};
#[derive(Debug, Clone, Copy)]
enum JitsiConnectionState {
enum ConnectionState {
OpeningPreAuthentication,
ReceivingFeaturesPreAuthentication,
Authenticating,
@ -44,35 +38,41 @@ enum JitsiConnectionState {
Idle,
}
struct JitsiConnectionInner {
state: JitsiConnectionState,
xmpp_domain: BareJid,
struct ConnectionInner {
state: ConnectionState,
jid: Option<FullJid>,
xmpp_domain: BareJid,
authentication: Authentication,
external_services: Vec<xmpp::extdisco::Service>,
connected_tx: Option<oneshot::Sender<Result<()>>>,
stanza_filters: Vec<Box<dyn StanzaFilter + Send + Sync>>,
}
impl fmt::Debug for JitsiConnectionInner {
impl fmt::Debug for ConnectionInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JitsiConnectionInner")
f.debug_struct("ConnectionInner")
.field("state", &self.state)
.field("xmpp_domain", &self.xmpp_domain)
.field("jid", &self.jid)
.finish()
}
}
#[derive(Debug, Clone)]
pub struct JitsiConnection {
tx: mpsc::Sender<Element>,
inner: Arc<Mutex<JitsiConnectionInner>>,
pub struct Connection {
pub(crate) tx: mpsc::Sender<Element>,
inner: Arc<Mutex<ConnectionInner>>,
}
impl JitsiConnection {
pub enum Authentication {
Anonymous,
Plain { username: String, password: String },
}
impl Connection {
pub async fn new(
websocket_url: &str,
xmpp_domain: &str,
authentication: Authentication,
) -> Result<(Self, impl Future<Output = ()>)> {
let websocket_url: Uri = websocket_url.parse().context("invalid WebSocket URL")?;
let xmpp_domain: BareJid = xmpp_domain.parse().context("invalid XMPP domain")?;
@ -88,10 +88,11 @@ impl JitsiConnection {
let (sink, stream) = websocket.split();
let (tx, rx) = mpsc::channel(64);
let inner = Arc::new(Mutex::new(JitsiConnectionInner {
state: JitsiConnectionState::OpeningPreAuthentication,
xmpp_domain,
let inner = Arc::new(Mutex::new(ConnectionInner {
state: ConnectionState::OpeningPreAuthentication,
jid: None,
xmpp_domain,
authentication,
external_services: vec![],
connected_tx: None,
stanza_filters: vec![],
@ -102,8 +103,8 @@ impl JitsiConnection {
inner: inner.clone(),
};
let writer = JitsiConnection::write_loop(rx, sink);
let reader = JitsiConnection::read_loop(inner, tx, stream);
let writer = Connection::write_loop(rx, sink);
let reader = Connection::read_loop(inner, tx, stream);
let background = async move {
tokio::select! {
@ -115,6 +116,11 @@ impl JitsiConnection {
Ok((connection, background))
}
pub async fn add_stanza_filter(&self, stanza_filter: impl StanzaFilter + Send + Sync + 'static) {
let mut locked_inner = self.inner.lock().await;
locked_inner.stanza_filters.push(Box::new(stanza_filter));
}
pub async fn connect(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();
@ -128,48 +134,14 @@ impl JitsiConnection {
rx.await?
}
pub async fn join_conference(
&self,
glib_main_context: glib::MainContext,
config: JitsiConferenceConfig,
) -> Result<JitsiConference> {
let conference_stanza = xmpp::jitsi::Conference {
machine_uid: Uuid::new_v4().to_string(),
room: config.muc.to_string(),
properties: hashmap! {
// Disable voice processing
"stereo".to_string() => "true".to_string(),
"startBitrate".to_string() => "800".to_string(),
},
};
pub async fn jid(&self) -> Option<FullJid> {
let mut locked_inner = self.inner.lock().await;
locked_inner.jid.clone()
}
let iq =
Iq::from_set(generate_id(), conference_stanza).with_to(Jid::Full(config.focus.clone()));
self.tx.send(iq.into()).await?;
let conference = {
let mut locked_inner = self.inner.lock().await;
let conference = JitsiConference::new(
glib_main_context,
locked_inner
.jid
.as_ref()
.context("not connected (no jid)")?
.clone(),
self.tx.clone(),
config,
locked_inner.external_services.clone(),
)
.await?;
locked_inner
.stanza_filters
.push(Box::new(conference.clone()));
conference
};
conference.connected().await?;
Ok(conference)
pub async fn external_services(&self) -> Vec<xmpp::extdisco::Service> {
let mut locked_inner = self.inner.lock().await;
locked_inner.external_services.clone()
}
async fn write_loop<S>(rx: mpsc::Receiver<Element>, mut sink: S) -> Result<()>
@ -189,7 +161,7 @@ impl JitsiConnection {
}
async fn read_loop<S>(
inner: Arc<Mutex<JitsiConnectionInner>>,
inner: Arc<Mutex<ConnectionInner>>,
tx: mpsc::Sender<Element>,
mut stream: S,
) -> Result<()>
@ -217,7 +189,7 @@ impl JitsiConnection {
let mut locked_inner = inner.lock().await;
use JitsiConnectionState::*;
use ConnectionState::*;
match locked_inner.state {
OpeningPreAuthentication => {
Open::try_from(element)?;
@ -225,9 +197,22 @@ impl JitsiConnection {
locked_inner.state = ReceivingFeaturesPreAuthentication;
},
ReceivingFeaturesPreAuthentication => {
let auth = Auth {
mechanism: Mechanism::Anonymous,
data: vec![],
let auth = match &locked_inner.authentication {
Authentication::Anonymous => Auth {
mechanism: Mechanism::Anonymous,
data: vec![],
},
Authentication::Plain { username, password } => {
let mut data = Vec::with_capacity(username.len() + password.len() + 2);
data.push(0u8);
data.extend_from_slice(username.as_bytes());
data.push(0u8);
data.extend_from_slice(password.as_bytes());
Auth {
mechanism: Mechanism::Plain,
data,
}
},
};
tx.send(auth.into()).await?;
locked_inner.state = Authenticating;
@ -241,8 +226,10 @@ impl JitsiConnection {
},
OpeningPostAuthentication => {
Open::try_from(element)?;
info!("Logged in anonymously");
match &locked_inner.authentication {
Authentication::Anonymous => info!("Logged in anonymously"),
Authentication::Plain { .. } => info!("Logged in with PLAIN"),
}
locked_inner.state = ReceivingFeaturesPostAuthentication;
},
ReceivingFeaturesPostAuthentication => {
@ -250,28 +237,33 @@ impl JitsiConnection {
tx.send(iq.into()).await?;
locked_inner.state = Binding;
},
Binding => {
let iq = Iq::try_from(element)?;
let jid = if let IqType::Result(Some(element)) = iq.payload {
let bind = BindResponse::try_from(element)?;
FullJid::try_from(bind)?
}
else {
bail!("bind failed");
};
info!("My JID: {}", jid);
locked_inner.jid = Some(jid.clone());
Binding => match Iq::try_from(element) {
Ok(iq) => {
let jid = if let IqType::Result(Some(element)) = iq.payload {
let bind = BindResponse::try_from(element)?;
FullJid::try_from(bind)?
}
else {
bail!("bind failed");
};
info!("My JID: {}", jid);
locked_inner.jid = Some(jid.clone());
locked_inner.stanza_filters.push(Box::new(Pinger {
jid: jid.clone(),
tx: tx.clone(),
}));
locked_inner.stanza_filters.push(Box::new(Pinger {
jid: jid.clone(),
tx: tx.clone(),
}));
let iq = Iq::from_get(generate_id(), DiscoInfoQuery { node: None })
.with_from(Jid::Full(jid.clone()))
.with_to(Jid::Bare(locked_inner.xmpp_domain.clone()));
tx.send(iq.into()).await?;
locked_inner.state = Discovering;
let iq = Iq::from_get(generate_id(), DiscoInfoQuery { node: None })
.with_from(Jid::Full(jid.clone()))
.with_to(Jid::Bare(locked_inner.xmpp_domain.clone()));
tx.send(iq.into()).await?;
locked_inner.state = Discovering;
},
Err(e) => debug!(
"received unexpected element while waiting for bind response: {}",
e
),
},
Discovering => {
let iq = Iq::try_from(element)?;

View File

@ -26,7 +26,7 @@ impl From<ServicesQuery> for Element {
impl IqGetPayload for ServicesQuery {}
#[derive(Debug, Clone)]
pub(crate) struct Service {
pub struct Service {
pub(crate) r#type: String,
pub(crate) name: Option<String>,
pub(crate) host: String,

View File

@ -37,3 +37,19 @@ impl From<Conference> for Element {
builder.build()
}
}
pub(crate) struct JsonMessage {
pub(crate) payload: serde_json::Value,
}
impl TryFrom<JsonMessage> for Element {
type Error = anyhow::Error;
fn try_from(message: JsonMessage) -> Result<Element> {
Ok(
Element::builder("json-message", ns::JITSI_JITMEET)
.append(serde_json::to_string(&message.payload)?)
.build(),
)
}
}

View File

@ -1,3 +1,4 @@
pub mod connection;
pub(crate) mod extdisco;
pub(crate) mod jitsi;
mod ns;

View File

@ -2,3 +2,5 @@
pub(crate) const EXTDISCO: &str = "urn:xmpp:extdisco:2";
pub(crate) const JITSI_FOCUS: &str = "http://jitsi.org/protocol/focus";
pub(crate) const JITSI_JITMEET: &str = "http://jitsi.org/jitmeet";