diff --git a/Cargo.lock b/Cargo.lock index 32bd284..d6eca84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,9 +175,9 @@ dependencies = [ [[package]] name = "colibri" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2a17c805592bcb654dc14d11cd8f3ccb9d34ab2af7bdbde6e437aa055092df8" +checksum = "3c3218f52768bd253336854a154bda06c4e9a10bfea4186d6e7a86dad86704d9" dependencies = [ "serde", "serde_json", diff --git a/lib-gst-meet/src/colibri.rs b/lib-gst-meet/src/colibri.rs index c44383d..6599460 100644 --- a/lib-gst-meet/src/colibri.rs +++ b/lib-gst-meet/src/colibri.rs @@ -23,6 +23,7 @@ use crate::tls::wss_connector; const MAX_CONNECT_RETRIES: u8 = 3; const CONNECT_RETRY_SLEEP: Duration = Duration::from_secs(3); +#[derive(Clone)] pub(crate) struct ColibriChannel { send_tx: mpsc::Sender, recv_tx: Arc>>>, diff --git a/lib-gst-meet/src/conference.rs b/lib-gst-meet/src/conference.rs index 34458fd..ad04780 100644 --- a/lib-gst-meet/src/conference.rs +++ b/lib-gst-meet/src/conference.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; -use colibri::ColibriMessage; +use colibri::{ColibriMessage, JsonMessage}; use futures::stream::StreamExt; use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt}; use jitsi_xmpp_parsers::jingle::{Action, Jingle}; @@ -597,6 +597,7 @@ impl StanzaFilter for JitsiConference { ColibriChannel::new(&colibri_url, self.tls_insecure).await?; let (tx, rx) = mpsc::channel(8); colibri_channel.subscribe(tx).await; + let colibri_channel_ = colibri_channel.clone(); jingle_session.colibri_channel = Some(colibri_channel); let self_ = self.clone(); @@ -606,8 +607,25 @@ impl StanzaFilter for JitsiConference { // Some message types are handled internally rather than passed to the on_colibri_message handler. // End-to-end ping - if let ColibriMessage::EndpointMessage { to, .. } = &msg { - // if to == + if let ColibriMessage::EndpointMessage { to, from, msg_payload } = &msg { + if let Some(to) = to { + let my_endpoint_id = self_.jid.to_string().split('-').next().unwrap().to_owned(); + if to == &my_endpoint_id { + match serde_json::from_value::(msg_payload.clone()) { + Ok(JsonMessage::E2ePingRequest { id }) => { + if let Err(e) = colibri_channel_.send(ColibriMessage::EndpointMessage { + from: Some(my_endpoint_id), + to: from.clone(), + msg_payload: serde_json::to_value(JsonMessage::E2ePingResponse { id }).unwrap(), + }).await { + warn!("failed to send e2e ping response: {:?}", e); + } + continue; + }, + _ => {}, + } + } + } } let locked_inner = self_.inner.lock().await;