diff --git a/gst-meet/src/main.rs b/gst-meet/src/main.rs index 3a9b210..9e7fe7a 100644 --- a/gst-meet/src/main.rs +++ b/gst-meet/src/main.rs @@ -28,7 +28,7 @@ struct Opt { #[structopt( long, - help = "If not specified, assumed to be the host part of ", + help = "If not specified, assumed to be the host part of " )] xmpp_domain: Option, @@ -37,20 +37,20 @@ struct Opt { #[structopt( long, - help = "If not specified, assumed to be conference.", + help = "If not specified, assumed to be conference." )] muc_domain: Option, #[structopt( long, - help = "If not specified, assumed to be focus@auth./focus", + help = "If not specified, assumed to be focus@auth./focus" )] focus_jid: Option, #[structopt( long, default_value = "vp9", - help = "The video codec to negotiate support for. One of: vp9, vp8, h264", + help = "The video codec to negotiate support for. One of: vp9, vp8, h264" )] video_codec: String, @@ -77,13 +77,13 @@ struct Opt { #[structopt( long, - help = "Comma-separated endpoint IDs to select (prioritise receiving of)", + help = "Comma-separated endpoint IDs to select (prioritise receiving of)" )] select_endpoints: Option, #[structopt( long, - help = "The maximum number of video streams we would like to receive", + help = "The maximum number of video streams we would like to receive" )] last_n: Option, @@ -103,21 +103,21 @@ struct Opt { #[structopt( long, default_value = "1280", - help = "The width to scale received video to before passing it to the recv-pipeline.", + help = "The width to scale received video to before passing it to the recv-pipeline." )] recv_video_scale_width: u16, #[structopt( long, default_value = "720", - help = "The height to scale received video to before passing it to the recv-pipeline. This will also be signalled as the maximum height that JVB should send video to us at.", + help = "The height to scale received video to before passing it to the recv-pipeline. This will also be signalled as the maximum height that JVB should send video to us at." )] recv_video_scale_height: u16, #[structopt( long, default_value = "200", - help = "The size of the jitter buffers in milliseconds. Larger values are more resilient to packet loss and jitter, smaller values give lower latency.", + help = "The size of the jitter buffers in milliseconds. Larger values are more resilient to packet loss and jitter, smaller values give lower latency." )] buffer_size: u32, @@ -138,17 +138,11 @@ struct Opt { tls_insecure: bool, #[cfg(feature = "log-rtp")] - #[structopt( - long, - help = "Log all RTP packets at DEBUG level (extremely verbose)" - )] + #[structopt(long, help = "Log all RTP packets at DEBUG level (extremely verbose)")] log_rtp: bool, #[cfg(feature = "log-rtp")] - #[structopt( - long, - help = "Log all RTCP packets at DEBUG level" - )] + #[structopt(long, help = "Log all RTCP packets at DEBUG level")] log_rtcp: bool, } @@ -210,14 +204,14 @@ async fn main_inner() -> Result<()> { .map(|pipeline| gstreamer::parse_bin_from_description(pipeline, false)) .transpose() .context("failed to parse send pipeline")?; - + let recv_pipeline = opt .recv_pipeline .as_ref() .map(|pipeline| gstreamer::parse_bin_from_description(pipeline, false)) .transpose() .context("failed to parse recv pipeline")?; - + let web_socket_url: Uri = opt.web_socket_url.parse()?; let xmpp_domain = opt @@ -297,8 +291,10 @@ async fn main_inner() -> Result<()> { let conference = JitsiConference::join(connection, main_loop.context(), config) .await .context("failed to join conference")?; - - conference.set_send_resolution(send_video_height.into()).await; + + conference + .set_send_resolution(send_video_height.into()) + .await; conference .send_colibri_message(ColibriMessage::ReceiverVideoConstraints { @@ -357,13 +353,21 @@ async fn main_inner() -> Result<()> { conference.add_bin(&bin).await?; if let Some(audio_element) = bin.by_name("audio") { - info!("recv pipeline has an audio element, a sink pad will be requested from it for each participant"); - conference.set_remote_participant_audio_sink_element(Some(audio_element)).await; + info!( + "recv pipeline has an audio element, a sink pad will be requested from it for each participant" + ); + conference + .set_remote_participant_audio_sink_element(Some(audio_element)) + .await; } if let Some(video_element) = bin.by_name("video") { - info!("recv pipeline has a video element, a sink pad will be requested from it for each participant"); - conference.set_remote_participant_video_sink_element(Some(video_element)).await; + info!( + "recv pipeline has a video element, a sink pad will be requested from it for each participant" + ); + conference + .set_remote_participant_video_sink_element(Some(video_element)) + .await; } } diff --git a/lib-gst-meet/src/conference.rs b/lib-gst-meet/src/conference.rs index 744d98d..66bb61f 100644 --- a/lib-gst-meet/src/conference.rs +++ b/lib-gst-meet/src/conference.rs @@ -1,4 +1,6 @@ -use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, sync::Arc, time::Duration, +}; use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; @@ -10,7 +12,10 @@ use jitsi_xmpp_parsers::jingle::{Action, Jingle}; use maplit::hashmap; use once_cell::sync::Lazy; use serde::Serialize; -use tokio::{sync::{mpsc, oneshot, Mutex}, time}; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + time, +}; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; @@ -78,7 +83,7 @@ pub struct JitsiConferenceConfig { pub region: Option, pub video_codec: String, pub extra_muc_features: Vec, - + pub start_bitrate: u32, pub stereo: bool, @@ -377,10 +382,10 @@ impl JitsiConference { } /// Set the max resolution that we are currently sending. - /// + /// /// Setting this is required for browser clients in the same conference to display /// the stats that we broadcast. - /// + /// /// Note that lib-gst-meet does not encode video (that is the responsibility of your /// GStreamer pipeline), so this is purely informational. pub async fn set_send_resolution(&self, height: i32) { @@ -658,18 +663,35 @@ impl StanzaFilter for JitsiConference { jingle_session.stats_handler_task = Some(tokio::spawn(async move { let mut interval = time::interval(SEND_STATS_INTERVAL); loop { - let maybe_remote_ssrc_map = self_.jingle_session.lock().await.as_ref().map(|sess| sess.remote_ssrc_map.clone()); + let maybe_remote_ssrc_map = self_ + .jingle_session + .lock() + .await + .as_ref() + .map(|sess| sess.remote_ssrc_map.clone()); let maybe_source_stats: Option> = self_ .pipeline() .await .ok() .and_then(|pipeline| pipeline.by_name("rtpbin")) - .and_then(|rtpbin| rtpbin.try_emit_by_name("get-session", &[&0u32]).ok()) - .and_then(|rtpsession: gstreamer::Element| rtpsession.try_property("stats").ok()) + .and_then(|rtpbin| { + rtpbin.try_emit_by_name("get-session", &[&0u32]).ok() + }) + .and_then(|rtpsession: gstreamer::Element| { + rtpsession.try_property("stats").ok() + }) .and_then(|stats: gstreamer::Structure| stats.get("source-stats").ok()) - .and_then(|stats: glib::ValueArray| stats.into_iter().map(|v| v.get()).collect::>().ok()); - - if let (Some(remote_ssrc_map), Some(source_stats)) = (maybe_remote_ssrc_map, maybe_source_stats) { + .and_then(|stats: glib::ValueArray| { + stats + .into_iter() + .map(|v| v.get()) + .collect::>() + .ok() + }); + + if let (Some(remote_ssrc_map), Some(source_stats)) = + (maybe_remote_ssrc_map, maybe_source_stats) + { debug!("source stats: {:#?}", source_stats); let audio_recv_bitrate: u64 = source_stats @@ -679,7 +701,14 @@ impl StanzaFilter for JitsiConference { .get("ssrc") .ok() .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc)) - .map(|source| source.media_type == MediaType::Audio && source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default()) + .map(|source| { + source.media_type == MediaType::Audio + && source + .participant_id + .as_ref() + .map(|id| id != &my_endpoint_id) + .unwrap_or_default() + }) .unwrap_or_default() }) .filter_map(|stat| stat.get::("bitrate").ok()) @@ -692,7 +721,14 @@ impl StanzaFilter for JitsiConference { .get("ssrc") .ok() .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc)) - .map(|source| source.media_type == MediaType::Video && source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default()) + .map(|source| { + source.media_type == MediaType::Video + && source + .participant_id + .as_ref() + .map(|id| id != &my_endpoint_id) + .unwrap_or_default() + }) .unwrap_or_default() }) .filter_map(|stat| stat.get::("bitrate").ok()) @@ -705,7 +741,14 @@ impl StanzaFilter for JitsiConference { .get("ssrc") .ok() .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc)) - .map(|source| source.media_type == MediaType::Audio && source.participant_id.as_ref().map(|id| id == &my_endpoint_id).unwrap_or_default()) + .map(|source| { + source.media_type == MediaType::Audio + && source + .participant_id + .as_ref() + .map(|id| id == &my_endpoint_id) + .unwrap_or_default() + }) .unwrap_or_default() }) .and_then(|stat| stat.get("bitrate").ok()) @@ -717,7 +760,14 @@ impl StanzaFilter for JitsiConference { .get("ssrc") .ok() .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc)) - .map(|source| source.media_type == MediaType::Video && source.participant_id.as_ref().map(|id| id == &my_endpoint_id).unwrap_or_default()) + .map(|source| { + source.media_type == MediaType::Video + && source + .participant_id + .as_ref() + .map(|id| id == &my_endpoint_id) + .unwrap_or_default() + }) .unwrap_or_default() }) .and_then(|stat| stat.get("bitrate").ok()) @@ -730,7 +780,13 @@ impl StanzaFilter for JitsiConference { .get("ssrc") .ok() .and_then(|ssrc: u32| remote_ssrc_map.get(&ssrc)) - .map(|source| source.participant_id.as_ref().map(|id| id != &my_endpoint_id).unwrap_or_default()) + .map(|source| { + source + .participant_id + .as_ref() + .map(|id| id != &my_endpoint_id) + .unwrap_or_default() + }) .unwrap_or_default() }) .filter_map(|stat| stat.get::("packets-received").ok()) @@ -750,7 +806,8 @@ impl StanzaFilter for JitsiConference { // Loss can be negative because of duplicate packets. Clamp it to zero. .try_into() .unwrap_or_default(); - let recv_loss = recv_lost as f64 / (recv_packets as f64 + recv_lost as f64); + let recv_loss = + recv_lost as f64 / (recv_packets as f64 + recv_lost as f64); let stats = ColibriMessage::EndpointStats { from: None, @@ -771,10 +828,10 @@ impl StanzaFilter for JitsiConference { packet_loss: colibri::PacketLoss { total: (recv_loss * 100.) as u64, download: (recv_loss * 100.) as u64, - upload: 0, // TODO + upload: 0, // TODO }, connection_quality: 100.0, - jvb_rtt: Some(0), // TODO + jvb_rtt: Some(0), // TODO server_region: self_.config.region.clone(), max_enabled_resolution: self_.inner.lock().await.send_resolution, }; @@ -786,7 +843,7 @@ impl StanzaFilter for JitsiConference { warn!("unable to get stats from pipeline"); } interval.tick().await; - } + } })); } @@ -797,14 +854,24 @@ impl StanzaFilter for JitsiConference { while let Some(msg) = stream.next().await { // Some message types are handled internally rather than passed to the on_colibri_message handler. let handled = match &msg { - ColibriMessage::EndpointMessage { to: Some(to), from, msg_payload } if to == &my_endpoint_id => { + ColibriMessage::EndpointMessage { + to: Some(to), + from, + msg_payload, + } if to == &my_endpoint_id => { match serde_json::from_value::(msg_payload.clone()) { Ok(JsonMessage::E2ePingRequest { id }) => { - if let Err(e) = colibri_channel.send(ColibriMessage::EndpointMessage { - from: None, - to: from.clone(), - msg_payload: serde_json::to_value(JsonMessage::E2ePingResponse { id }).unwrap(), - }).await { + if let Err(e) = colibri_channel + .send(ColibriMessage::EndpointMessage { + from: None, + to: from.clone(), + msg_payload: serde_json::to_value( + JsonMessage::E2ePingResponse { id }, + ) + .unwrap(), + }) + .await + { warn!("failed to send e2e ping response: {:?}", e); } true diff --git a/lib-gst-meet/src/jingle.rs b/lib-gst-meet/src/jingle.rs index eb76fd2..9003007 100644 --- a/lib-gst-meet/src/jingle.rs +++ b/lib-gst-meet/src/jingle.rs @@ -3,10 +3,13 @@ use std::{collections::HashMap, fmt, net::SocketAddr}; use anyhow::{anyhow, bail, Context, Result}; use futures::stream::StreamExt; use glib::{Cast, ObjectExt, ToValue}; -use gstreamer::{Bin, Element, GhostPad, prelude::{ElementExt, ElementExtManual, GObjectExtManualGst, GstBinExt, GstObjectExt, PadExt}}; -use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension}; +use gstreamer::{ + prelude::{ElementExt, ElementExtManual, GObjectExtManualGst, GstBinExt, GstObjectExt, PadExt}, + Bin, Element, GhostPad, +}; #[cfg(feature = "log-rtp")] use gstreamer_rtp::RTPBuffer; +use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension}; use itertools::Itertools; use jitsi_xmpp_parsers::{ jingle::{Action, Content, Description, Jingle, Transport}, @@ -594,8 +597,7 @@ impl JingleSession { let f = || { debug!("rtpbin request-pt-map {:?}", values); let pt = values[2].get::()? as u8; - let mut caps = gstreamer::Caps::builder("application/x-rtp") - .field("payload", pt as i32); + let mut caps = gstreamer::Caps::builder("application/x-rtp").field("payload", pt as i32); for codec in codecs.iter() { if codec.is(pt) { if codec.is_audio() { @@ -861,7 +863,7 @@ impl JingleSession { pipeline .add(&depayloader) .context("failed to add depayloader to pipeline")?; - depayloader.sync_state_with_parent()?; + depayloader.sync_state_with_parent()?; debug!("created depayloader"); rtpbin .link_pads(Some(&pad_name), &depayloader, None) @@ -872,9 +874,13 @@ impl JingleSession { debug!("rtpbin pads:\n{}", dump_pads(&rtpbin)); let queue = gstreamer::ElementFactory::make("queue", None)?; - pipeline.add(&queue).context("failed to add queue to pipeline")?; + pipeline + .add(&queue) + .context("failed to add queue to pipeline")?; queue.sync_state_with_parent()?; - depayloader.link(&queue).context("failed to link depayloader to queue")?; + depayloader + .link(&queue) + .context("failed to link depayloader to queue")?; let decoder = match source.media_type { MediaType::Audio => { @@ -898,7 +904,10 @@ impl JingleSession { if let Some(codec) = codec { let decoder = gstreamer::ElementFactory::make(codec.decoder_name(), None)?; decoder.set_property("automatic-request-sync-points", true); - decoder.set_property_from_str("automatic-request-sync-point-flags", "GST_VIDEO_DECODER_REQUEST_SYNC_POINT_CORRUPT_OUTPUT"); + decoder.set_property_from_str( + "automatic-request-sync-point-flags", + "GST_VIDEO_DECODER_REQUEST_SYNC_POINT_CORRUPT_OUTPUT", + ); decoder } else { @@ -907,9 +916,13 @@ impl JingleSession { }, }; - pipeline.add(&decoder).context("failed to add decoder to pipeline")?; + pipeline + .add(&decoder) + .context("failed to add decoder to pipeline")?; decoder.sync_state_with_parent()?; - queue.link(&decoder).context("failed to link queue to decoder")?; + queue + .link(&decoder) + .context("failed to link queue to decoder")?; let src_pad = match source.media_type { MediaType::Audio => decoder @@ -917,41 +930,81 @@ impl JingleSession { .context("decoder has no src pad")?, MediaType::Video => { let videoscale = gstreamer::ElementFactory::make("videoscale", None)?; - pipeline.add(&videoscale).context("failed to add videoscale to pipeline")?; + pipeline + .add(&videoscale) + .context("failed to add videoscale to pipeline")?; videoscale.sync_state_with_parent()?; - decoder.link(&videoscale).context("failed to link decoder to videoscale")?; + decoder + .link(&videoscale) + .context("failed to link decoder to videoscale")?; let capsfilter = gstreamer::ElementFactory::make("capsfilter", None)?; - capsfilter.set_property_from_str("caps", &format!("video/x-raw, width={}, height={}", conference.config.recv_video_scale_width, conference.config.recv_video_scale_height)); - pipeline.add(&capsfilter).context("failed to add capsfilter to pipeline")?; + capsfilter.set_property_from_str( + "caps", + &format!( + "video/x-raw, width={}, height={}", + conference.config.recv_video_scale_width, + conference.config.recv_video_scale_height + ), + ); + pipeline + .add(&capsfilter) + .context("failed to add capsfilter to pipeline")?; capsfilter.sync_state_with_parent()?; - videoscale.link(&capsfilter).context("failed to link videoscale to capsfilter")?; + videoscale + .link(&capsfilter) + .context("failed to link videoscale to capsfilter")?; let videoconvert = gstreamer::ElementFactory::make("videoconvert", None)?; - pipeline.add(&videoconvert).context("failed to add videoconvert to pipeline")?; + pipeline + .add(&videoconvert) + .context("failed to add videoconvert to pipeline")?; videoconvert.sync_state_with_parent()?; - capsfilter.link(&videoconvert).context("failed to link capsfilter to videoconvert")?; + capsfilter + .link(&videoconvert) + .context("failed to link capsfilter to videoconvert")?; - videoconvert.static_pad("src").context("videoconvert has no src pad")? + videoconvert + .static_pad("src") + .context("videoconvert has no src pad")? }, }; if let Some(participant_id) = source.participant_id { handle.block_on(conference.ensure_participant(&participant_id))?; let maybe_sink_element = match source.media_type { - MediaType::Audio => handle.block_on(conference.remote_participant_audio_sink_element()), - MediaType::Video => handle.block_on(conference.remote_participant_video_sink_element()), + MediaType::Audio => { + handle.block_on(conference.remote_participant_audio_sink_element()) + }, + MediaType::Video => { + handle.block_on(conference.remote_participant_video_sink_element()) + }, }; if let Some(sink_element) = maybe_sink_element { let sink_pad = sink_element .request_pad_simple("sink_%u") .context("no suitable sink pad provided by sink element in recv pipeline")?; - let ghost_pad = GhostPad::with_target(Some(&format!("participant_{}_{:?}", participant_id, source.media_type)), &sink_pad)?; - let bin: Bin = sink_element.parent().context("sink element has no parent")?.downcast().map_err(|_| anyhow!("sink element's parent is not a bin"))?; + let ghost_pad = GhostPad::with_target( + Some(&format!( + "participant_{}_{:?}", + participant_id, source.media_type + )), + &sink_pad, + )?; + let bin: Bin = sink_element + .parent() + .context("sink element has no parent")? + .downcast() + .map_err(|_| anyhow!("sink element's parent is not a bin"))?; bin.add_pad(&ghost_pad)?; - - src_pad.link(&ghost_pad).context("failed to link decode chain to participant bin from recv pipeline")?; - info!("linked {}/{:?} to new pad in recv pipeline", participant_id, source.media_type); + + src_pad + .link(&ghost_pad) + .context("failed to link decode chain to participant bin from recv pipeline")?; + info!( + "linked {}/{:?} to new pad in recv pipeline", + participant_id, source.media_type + ); } else if let Some(participant_bin) = pipeline.by_name(&format!("participant_{}", participant_id)) @@ -961,8 +1014,13 @@ impl JingleSession { MediaType::Video => "video", }; if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) { - src_pad.link(&sink_pad).context("failed to link decode chain to participant bin from recv participant pipeline")?; - info!("linked {}/{:?} to recv participant pipeline", participant_id, source.media_type); + src_pad.link(&sink_pad).context( + "failed to link decode chain to participant bin from recv participant pipeline", + )?; + info!( + "linked {}/{:?} to recv participant pipeline", + participant_id, source.media_type + ); } else { warn!( @@ -984,7 +1042,9 @@ impl JingleSession { let fakesink = gstreamer::ElementFactory::make("fakesink", None)?; pipeline.add(&fakesink)?; fakesink.sync_state_with_parent()?; - let sink_pad = fakesink.static_pad("sink").context("fakesink has no sink pad")?; + let sink_pad = fakesink + .static_pad("sink") + .context("fakesink has no sink pad")?; src_pad.link(&sink_pad)?; } @@ -1125,13 +1185,13 @@ impl JingleSession { let buffer: gstreamer::Buffer = values[1].get()?; let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer)?; debug!( - ssrc=rtp_buffer.ssrc(), - pt=rtp_buffer.payload_type(), - seq=rtp_buffer.seq(), - ts=rtp_buffer.timestamp(), - marker=rtp_buffer.is_marker(), - extension=rtp_buffer.is_extension(), - payload_size=rtp_buffer.payload_size(), + ssrc = rtp_buffer.ssrc(), + pt = rtp_buffer.payload_type(), + seq = rtp_buffer.seq(), + ts = rtp_buffer.timestamp(), + marker = rtp_buffer.is_marker(), + extension = rtp_buffer.is_extension(), + payload_size = rtp_buffer.payload_size(), "RTP {}", direction, ); @@ -1156,14 +1216,11 @@ impl JingleSession { let f = || { let buffer: gstreamer::Buffer = values[1].get()?; let mut buf = [0u8; 1500]; - buffer.copy_to_slice(0, &mut buf[..buffer.size()]).map_err(|_| anyhow!("invalid RTCP packet size"))?; + buffer + .copy_to_slice(0, &mut buf[..buffer.size()]) + .map_err(|_| anyhow!("invalid RTCP packet size"))?; let decoded = rtcp::packet::unmarshal(&mut &buf[..buffer.size()])?; - debug!( - "RTCP {} size={}\n{:#?}", - direction, - buffer.size(), - decoded, - ); + debug!("RTCP {} size={}\n{:#?}", direction, buffer.size(), decoded,); Ok::<_, anyhow::Error>(()) }; if let Err(e) = f() { @@ -1177,7 +1234,7 @@ impl JingleSession { rtcp_send_identity.connect("handoff", false, make_rtcp_logger("SEND")); } } - + debug!("link dtlssrtpdec -> rtpbin"); dtlssrtpdec.link_pads(Some("rtp_src"), &rtp_recv_identity, None)?; rtp_recv_identity.link_pads(None, &rtpbin, Some("recv_rtp_sink_0"))?; @@ -1472,12 +1529,21 @@ fn dump_pads(element: &Element) -> String { element .pads() .into_iter() - .map(|pad| format!( - " {}, peer={}.{}, caps=\"{}\"", - pad.name(), - pad.peer().and_then(|peer| peer.parent_element()).map(|element| element.name().to_string()).unwrap_or_default(), - pad.peer().map(|peer| peer.name().to_string()).unwrap_or_default(), - pad.caps().map(|caps| caps.to_string()).unwrap_or_default(), - )) + .map(|pad| { + format!( + " {}, peer={}.{}, caps=\"{}\"", + pad.name(), + pad + .peer() + .and_then(|peer| peer.parent_element()) + .map(|element| element.name().to_string()) + .unwrap_or_default(), + pad + .peer() + .map(|peer| peer.name().to_string()) + .unwrap_or_default(), + pad.caps().map(|caps| caps.to_string()).unwrap_or_default(), + ) + }) .join("\n") -} \ No newline at end of file +} diff --git a/lib-gst-meet/src/pinger.rs b/lib-gst-meet/src/pinger.rs index d084de0..fcd3061 100644 --- a/lib-gst-meet/src/pinger.rs +++ b/lib-gst-meet/src/pinger.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use tokio::{sync::mpsc, task::JoinHandle, time}; use tracing::warn; -use xmpp_parsers::{iq::Iq, Element, FullJid, Jid, ping::Ping}; +use xmpp_parsers::{iq::Iq, ping::Ping, Element, FullJid, Jid}; use crate::{stanza_filter::StanzaFilter, util::generate_id}; @@ -29,11 +29,7 @@ impl Pinger { } } }); - Pinger { - jid, - tx, - ping_task, - } + Pinger { jid, tx, ping_task } } } diff --git a/lib-gst-meet/src/tls.rs b/lib-gst-meet/src/tls.rs index bf6cc59..cf98549 100644 --- a/lib-gst-meet/src/tls.rs +++ b/lib-gst-meet/src/tls.rs @@ -12,8 +12,12 @@ use tokio_tungstenite::Connector; #[cfg(feature = "tls-rustls-native-roots")] pub(crate) fn wss_connector(insecure: bool) -> Result { let mut roots = rustls::RootCertStore::empty(); - for cert in rustls_native_certs::load_native_certs().context("failed to load native root certs")? { - roots.add(&rustls::Certificate(cert.0)).context("failed to add native root certs")?; + for cert in + rustls_native_certs::load_native_certs().context("failed to load native root certs")? + { + roots + .add(&rustls::Certificate(cert.0)) + .context("failed to add native root certs")?; } let mut config = rustls::ClientConfig::builder() @@ -80,7 +84,11 @@ pub(crate) fn wss_connector(insecure: bool) -> Result