From 4575371d6db6f01177279d43c01350a1d41724c3 Mon Sep 17 00:00:00 2001 From: Jasper Hugo Date: Wed, 25 Aug 2021 11:21:43 +0700 Subject: [PATCH] fmt --- lib-gst-meet-c/src/lib.rs | 83 +++-- lib-gst-meet/Cargo.toml | 3 +- lib-gst-meet/src/colibri.rs | 60 ++-- lib-gst-meet/src/conference.rs | 44 ++- lib-gst-meet/src/jingle.rs | 572 ++++++++++++++++++--------------- lib-gst-meet/src/lib.rs | 2 +- 6 files changed, 432 insertions(+), 332 deletions(-) diff --git a/lib-gst-meet-c/src/lib.rs b/lib-gst-meet-c/src/lib.rs index 7abdac7..6f86162 100644 --- a/lib-gst-meet-c/src/lib.rs +++ b/lib-gst-meet-c/src/lib.rs @@ -1,14 +1,20 @@ use std::{ - ffi::{CStr, CString, c_void}, + ffi::{c_void, CStr, CString}, os::raw::c_char, ptr, - sync::{Arc, atomic::{AtomicPtr, Ordering}}, + sync::{ + atomic::{AtomicPtr, Ordering}, + Arc, + }, }; use anyhow::Result; -use glib::{ffi::GMainContext, translate::{from_glib, from_glib_full, ToGlibPtr}}; -pub use lib_gst_meet::{init_tracing, JitsiConference, JitsiConnection, MediaType}; +use glib::{ + ffi::GMainContext, + translate::{from_glib, from_glib_full, ToGlibPtr}, +}; use lib_gst_meet::JitsiConferenceConfig; +pub use lib_gst_meet::{init_tracing, JitsiConference, JitsiConnection, MediaType}; use tokio::runtime::Runtime; pub struct Context { @@ -42,7 +48,7 @@ impl ResultExt for Result { Err(e) => { eprintln!("lib-gst-meet: {:?}", e); ptr::null_mut() - } + }, } } } @@ -79,7 +85,10 @@ pub unsafe extern "C" fn gstmeet_connection_new( let xmpp_domain = CStr::from_ptr(xmpp_domain); (*context) .runtime - .block_on(JitsiConnection::new(&websocket_url.to_string_lossy(), &xmpp_domain.to_string_lossy())) + .block_on(JitsiConnection::new( + &websocket_url.to_string_lossy(), + &xmpp_domain.to_string_lossy(), + )) .map(|(connection, background)| { (*context).runtime.spawn(background); connection @@ -93,7 +102,10 @@ pub unsafe extern "C" fn gstmeet_connection_free(connection: *mut JitsiConnectio } #[no_mangle] -pub unsafe extern "C" fn gstmeet_connection_connect(context: *mut Context, connection: *mut JitsiConnection) -> bool { +pub unsafe extern "C" fn gstmeet_connection_connect( + context: *mut Context, + connection: *mut JitsiConnection, +) -> bool { (*context) .runtime .block_on((*connection).connect()) @@ -126,8 +138,12 @@ pub unsafe extern "C" fn gstmeet_connection_join_conference( muc, focus, nick: CStr::from_ptr((*config).nick).to_string_lossy().to_string(), - region: CStr::from_ptr((*config).region).to_string_lossy().to_string(), - video_codec: CStr::from_ptr((*config).video_codec).to_string_lossy().to_string(), + region: CStr::from_ptr((*config).region) + .to_string_lossy() + .to_string(), + video_codec: CStr::from_ptr((*config).video_codec) + .to_string_lossy() + .to_string(), }; (*context) .runtime @@ -136,7 +152,10 @@ pub unsafe extern "C" fn gstmeet_connection_join_conference( } #[no_mangle] -pub unsafe extern "C" fn gstmeet_conference_leave(context: *mut Context, conference: *mut JitsiConference) -> bool { +pub unsafe extern "C" fn gstmeet_conference_leave( + context: *mut Context, + conference: *mut JitsiConference, +) -> bool { (*context) .runtime .block_on(Box::from_raw(conference).leave()) @@ -145,7 +164,12 @@ pub unsafe extern "C" fn gstmeet_conference_leave(context: *mut Context, confere } #[no_mangle] -pub unsafe extern "C" fn gstmeet_conference_set_muted(context: *mut Context, conference: *mut JitsiConference, media_type: MediaType, muted: bool) -> bool { +pub unsafe extern "C" fn gstmeet_conference_set_muted( + context: *mut Context, + conference: *mut JitsiConference, + media_type: MediaType, + muted: bool, +) -> bool { (*context) .runtime .block_on((*conference).set_muted(media_type, muted)) @@ -154,7 +178,10 @@ pub unsafe extern "C" fn gstmeet_conference_set_muted(context: *mut Context, con } #[no_mangle] -pub unsafe extern "C" fn gstmeet_conference_pipeline(context: *mut Context, conference: *mut JitsiConference) -> *mut gstreamer::ffi::GstPipeline { +pub unsafe extern "C" fn gstmeet_conference_pipeline( + context: *mut Context, + conference: *mut JitsiConference, +) -> *mut gstreamer::ffi::GstPipeline { (*context) .runtime .block_on((*conference).pipeline()) @@ -164,7 +191,10 @@ pub unsafe extern "C" fn gstmeet_conference_pipeline(context: *mut Context, conf } #[no_mangle] -pub unsafe extern "C" fn gstmeet_conference_audio_sink_element(context: *mut Context, conference: *mut JitsiConference) -> *mut gstreamer::ffi::GstElement { +pub unsafe extern "C" fn gstmeet_conference_audio_sink_element( + context: *mut Context, + conference: *mut JitsiConference, +) -> *mut gstreamer::ffi::GstElement { (*context) .runtime .block_on((*conference).audio_sink_element()) @@ -174,7 +204,10 @@ pub unsafe extern "C" fn gstmeet_conference_audio_sink_element(context: *mut Con } #[no_mangle] -pub unsafe extern "C" fn gstmeet_conference_video_sink_element(context: *mut Context, conference: *mut JitsiConference) -> *mut gstreamer::ffi::GstElement { +pub unsafe extern "C" fn gstmeet_conference_video_sink_element( + context: *mut Context, + conference: *mut JitsiConference, +) -> *mut gstreamer::ffi::GstElement { (*context) .runtime .block_on((*conference).video_sink_element()) @@ -187,13 +220,16 @@ pub unsafe extern "C" fn gstmeet_conference_video_sink_element(context: *mut Con pub unsafe extern "C" fn gstmeet_conference_on_participant( context: *mut Context, conference: *mut JitsiConference, - f: unsafe extern "C" fn(*mut JitsiConference, Participant, *mut c_void) -> *mut gstreamer::ffi::GstBin, + f: unsafe extern "C" fn( + *mut JitsiConference, + Participant, + *mut c_void, + ) -> *mut gstreamer::ffi::GstBin, ctx: *mut c_void, ) { let ctx = Arc::new(AtomicPtr::new(ctx)); - (*context) - .runtime - .block_on((*conference).on_participant(move |conference, participant| { + (*context).runtime.block_on( + (*conference).on_participant(move |conference, participant| { let ctx = ctx.clone(); Box::pin(async move { let participant = Participant { @@ -205,10 +241,15 @@ pub unsafe extern "C" fn gstmeet_conference_on_participant( .transpose()? .unwrap_or_else(ptr::null), }; - f(Box::into_raw(Box::new(conference)), participant, ctx.load(Ordering::Relaxed)); + f( + Box::into_raw(Box::new(conference)), + participant, + ctx.load(Ordering::Relaxed), + ); Ok(()) }) - })); + }), + ); } #[no_mangle] @@ -222,4 +263,4 @@ pub unsafe extern "C" fn gstmeet_conference_set_pipeline_state( .block_on((*conference).set_pipeline_state(from_glib(state))) .map_err(|e| eprintln!("lib-gst-meet: {:?}", e)) .is_ok() -} \ No newline at end of file +} diff --git a/lib-gst-meet/Cargo.toml b/lib-gst-meet/Cargo.toml index bd46715..3df4899 100644 --- a/lib-gst-meet/Cargo.toml +++ b/lib-gst-meet/Cargo.toml @@ -13,7 +13,8 @@ async-trait = { version = "0.1", default-features = false } bytes = { version = "1", default-features = false, features = ["std"] } futures = { version = "0.3", default-features = false } glib = { version = "0.14", default-features = false } -gstreamer = { version = "0.17", default-features = false, features = ["v1_16"] } +gstreamer = { version = "0.17", default-features = false, features = ["v1_20"] } +gstreamer-rtp = { version = "0.17", default-features = false, features = ["v1_20"] } hex = { version = "0.4", default-features = false, features = ["std"] } itertools = { version = "0.10", default-features = false, features = ["use_std"] } libc = { version = "0.2", default-features = false } diff --git a/lib-gst-meet/src/colibri.rs b/lib-gst-meet/src/colibri.rs index 0d6f233..3b07b61 100644 --- a/lib-gst-meet/src/colibri.rs +++ b/lib-gst-meet/src/colibri.rs @@ -20,10 +20,7 @@ pub enum ColibriMessage { previous_speakers: Vec, }, #[serde(rename_all = "camelCase")] - EndpointConnectivityStatusChangeEvent { - endpoint: String, - active: bool, - }, + EndpointConnectivityStatusChangeEvent { endpoint: String, active: bool }, #[serde(rename_all = "camelCase")] EndpointMessage { from: String, @@ -42,17 +39,11 @@ pub enum ColibriMessage { max_enabled_resolution: u16, }, #[serde(rename_all = "camelCase")] - LastNChangedEvent { - last_n: u16, - }, + LastNChangedEvent { last_n: u16 }, #[serde(rename_all = "camelCase")] - LastNEndpointsChangeEvent { - last_n_endpoints: Vec, - }, + LastNEndpointsChangeEvent { last_n_endpoints: Vec }, #[serde(rename_all = "camelCase")] - ReceiverVideoConstraint { - max_frame_height: u16, - }, + ReceiverVideoConstraint { max_frame_height: u16 }, #[serde(rename_all = "camelCase")] ReceiverVideoConstraints { last_n: Option, @@ -62,21 +53,13 @@ pub enum ColibriMessage { constraints: Option>, }, #[serde(rename_all = "camelCase")] - SelectedEndpointsChangedEvent { - selected_endpoints: Vec, - }, + SelectedEndpointsChangedEvent { selected_endpoints: Vec }, #[serde(rename_all = "camelCase")] - SenderVideoConstraints { - video_constraints: Constraints, - }, + SenderVideoConstraints { video_constraints: Constraints }, #[serde(rename_all = "camelCase")] - ServerHello { - version: Option, - }, + ServerHello { version: Option }, #[serde(rename_all = "camelCase")] - VideoTypeMessage { - video_type: VideoType, - }, + VideoTypeMessage { video_type: VideoType }, } #[derive(Clone, Copy, Debug, Deserialize, Serialize)] @@ -124,11 +107,9 @@ pub(crate) struct ColibriChannel { impl ColibriChannel { pub(crate) async fn new(colibri_url: &str) -> Result { - let request = - Request::get(colibri_url).body(())?; - let (colibri_websocket, _response) = - tokio_tungstenite::connect_async(request).await?; - + let request = Request::get(colibri_url).body(())?; + let (colibri_websocket, _response) = tokio_tungstenite::connect_async(request).await?; + info!("Connected Colibri WebSocket"); let (mut colibri_sink, mut colibri_stream) = colibri_websocket.split(); @@ -151,11 +132,17 @@ impl ColibriChannel { } } }, - Err(e) => warn!("failed to parse frame on colibri websocket: {:?}\nframe: {}", e, text), + Err(e) => warn!( + "failed to parse frame on colibri websocket: {:?}\nframe: {}", + e, text + ), } }, - Message::Binary(data) => debug!("received unexpected {} byte binary frame on colibri websocket", data.len()), - Message::Ping(_) | Message::Pong(_) => {}, // handled automatically by tungstenite + Message::Binary(data) => debug!( + "received unexpected {} byte binary frame on colibri websocket", + data.len() + ), + Message::Ping(_) | Message::Pong(_) => {}, // handled automatically by tungstenite Message::Close(_) => { debug!("received close frame on colibri websocket"); // TODO reconnect @@ -194,10 +181,7 @@ impl ColibriChannel { }; }); - Ok(Self { - send_tx, - recv_tx, - }) + Ok(Self { send_tx, recv_tx }) } pub(crate) async fn subscribe(&self, tx: mpsc::Sender) { @@ -208,4 +192,4 @@ impl ColibriChannel { self.send_tx.send(msg).await?; Ok(()) } -} \ No newline at end of file +} diff --git a/lib-gst-meet/src/conference.rs b/lib-gst-meet/src/conference.rs index 0aadb39..cc3e387 100644 --- a/lib-gst-meet/src/conference.rs +++ b/lib-gst-meet/src/conference.rs @@ -34,10 +34,9 @@ static DISCO_INFO: Lazy = Lazy::new(|| { Feature::new(ns::JINGLE_RTP_VIDEO), Feature::new(ns::JINGLE_ICE_UDP), Feature::new(ns::JINGLE_DTLS), - Feature::new("urn:ietf:rfc:5888"), // BUNDLE - Feature::new("urn:ietf:rfc:5761"), // RTCP-MUX - Feature::new("urn:ietf:rfc:4588"), // RTX - + Feature::new("urn:ietf:rfc:5888"), // BUNDLE + Feature::new("urn:ietf:rfc:5761"), // RTCP-MUX + Feature::new("urn:ietf:rfc:4588"), // RTX ]; let gst_version = gstreamer::version(); if gst_version.0 >= 1 && gst_version.1 >= 19 { @@ -105,9 +104,12 @@ type BoxedResultFuture = Pin> + Send>>; pub(crate) struct JitsiConferenceInner { pub(crate) jingle_session: Option, participants: HashMap, - on_participant: Option BoxedResultFuture) + Send + Sync>>, - on_participant_left: Option BoxedResultFuture) + Send + Sync>>, - on_colibri_message: Option BoxedResultFuture) + Send + Sync>>, + on_participant: + Option BoxedResultFuture) + Send + Sync>>, + on_participant_left: + Option BoxedResultFuture) + Send + Sync>>, + on_colibri_message: + Option BoxedResultFuture) + Send + Sync>>, state: JitsiConferenceState, connected_tx: Option>, connected_rx: Option>, @@ -292,7 +294,10 @@ impl JitsiConference { } #[tracing::instrument(level = "trace", skip(f))] - pub async fn on_participant(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) { + pub async fn on_participant( + &self, + f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static, + ) { let f = Arc::new(f); let f2 = f.clone(); let existing_participants: Vec<_> = { @@ -312,12 +317,18 @@ impl JitsiConference { } #[tracing::instrument(level = "trace", skip(f))] - pub async fn on_participant_left(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) { + pub async fn on_participant_left( + &self, + f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static, + ) { self.inner.lock().await.on_participant_left = Some(Arc::new(f)); } #[tracing::instrument(level = "trace", skip(f))] - pub async fn on_colibri_message(&self, f: impl (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync + 'static) { + pub async fn on_colibri_message( + &self, + f: impl (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync + 'static, + ) { self.inner.lock().await.on_colibri_message = Some(Arc::new(f)); } } @@ -472,7 +483,11 @@ impl StanzaFilter for JitsiConference { let colibri_channel = ColibriChannel::new(&colibri_url).await?; let (tx, rx) = mpsc::channel(8); colibri_channel.subscribe(tx).await; - locked_inner.jingle_session.as_mut().unwrap().colibri_channel = Some(colibri_channel); + locked_inner + .jingle_session + .as_mut() + .unwrap() + .colibri_channel = Some(colibri_channel); let self_ = self.clone(); tokio::spawn(async move { @@ -523,7 +538,12 @@ impl StanzaFilter for JitsiConference { muc_jid: from.clone(), nick: item.nick, }; - if presence.type_ == presence::Type::Unavailable && locked_inner.participants.remove(&from.resource.clone()).is_some() { + if presence.type_ == presence::Type::Unavailable + && locked_inner + .participants + .remove(&from.resource.clone()) + .is_some() + { debug!("participant left: {:?}", jid); if let Some(f) = &locked_inner.on_participant_left { debug!("calling on_participant_left with old participant"); diff --git a/lib-gst-meet/src/jingle.rs b/lib-gst-meet/src/jingle.rs index 455ba08..143287b 100644 --- a/lib-gst-meet/src/jingle.rs +++ b/lib-gst-meet/src/jingle.rs @@ -4,16 +4,13 @@ use anyhow::{anyhow, bail, Context, Result}; use futures::stream::StreamExt; use glib::{ObjectExt, ToValue}; use gstreamer::prelude::{ElementExt, GObjectExtManualGst, GstBinExt, PadExt}; +use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension}; use nice_gst_meet as nice; use pem::Pem; use rand::random; use rcgen::{Certificate, CertificateParams, PKCS_ECDSA_P256_SHA256}; use ring::digest::{digest, SHA256}; -use tokio::{ - net::lookup_host, - runtime::Handle, - sync::oneshot, -}; +use tokio::{net::lookup_host, runtime::Handle, sync::oneshot}; use tracing::{debug, error, warn}; use uuid::Uuid; use xmpp_parsers::{ @@ -37,7 +34,8 @@ use crate::{ const RTP_HDREXT_SSRC_AUDIO_LEVEL: &str = "urn:ietf:params:rtp-hdrext:ssrc-audio-level"; const RTP_HDREXT_ABS_SEND_TIME: &str = "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time"; -const RTP_HDREXT_TRANSPORT_CC: &str = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; +const RTP_HDREXT_TRANSPORT_CC: &str = + "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; const DEFAULT_STUN_PORT: u16 = 3478; const DEFAULT_TURNS_PORT: u16 = 5349; @@ -162,14 +160,12 @@ impl JingleSession { .iter() .find(|pt| { pt.name.as_deref() == Some("rtx") - && - pt - .parameters - .iter() - .any(|param| { + && pt.parameters.iter().any(|param| { param.name == "apt" - && - param.value == h264_payload_type.map(|pt| pt.to_string()).unwrap_or_default() + && param.value + == h264_payload_type + .map(|pt| pt.to_string()) + .unwrap_or_default() }) }) .map(|pt| pt.id); @@ -188,14 +184,12 @@ impl JingleSession { .iter() .find(|pt| { pt.name.as_deref() == Some("rtx") - && - pt - .parameters - .iter() - .any(|param| { + && pt.parameters.iter().any(|param| { param.name == "apt" - && - param.value == vp8_payload_type.map(|pt| pt.to_string()).unwrap_or_default() + && param.value + == vp8_payload_type + .map(|pt| pt.to_string()) + .unwrap_or_default() }) }) .map(|pt| pt.id); @@ -214,14 +208,12 @@ impl JingleSession { .iter() .find(|pt| { pt.name.as_deref() == Some("rtx") - && - pt - .parameters - .iter() - .any(|param| { + && pt.parameters.iter().any(|param| { param.name == "apt" - && - param.value == vp9_payload_type.map(|pt| pt.to_string()).unwrap_or_default() + && param.value + == vp9_payload_type + .map(|pt| pt.to_string()) + .unwrap_or_default() }) }) .map(|pt| pt.id); @@ -510,50 +502,59 @@ impl JingleSession { } Ok::<_, anyhow::Error>(Some(caps.build())) } - else if Some(pt) == h264_payload_type || Some(pt) == vp8_payload_type || Some(pt) == vp9_payload_type { + else if Some(pt) == h264_payload_type + || Some(pt) == vp8_payload_type + || Some(pt) == vp9_payload_type + { caps = caps .field("media", "video") .field("clock-rate", 90000) - .field("encoding-name", if Some(pt) == h264_payload_type { - "H264" - } - else if Some(pt) == vp8_payload_type { - "VP8" - } - else if Some(pt) == vp9_payload_type { - "VP9" - } - else { - unreachable!() - }); - if let Some(hdrext) = video_hdrext_abs_send_time { - caps = caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME); - } + .field( + "encoding-name", + if Some(pt) == h264_payload_type { + "H264" + } + else if Some(pt) == vp8_payload_type { + "VP8" + } + else if Some(pt) == vp9_payload_type { + "VP9" + } + else { + unreachable!() + }, + ); + // if let Some(hdrext) = video_hdrext_abs_send_time { + // caps = caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME); + // } if let Some(hdrext) = video_hdrext_transport_cc { caps = caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_TRANSPORT_CC); } Ok(Some(caps.build())) } - else if Some(pt) == vp8_rtx_payload_type || Some(pt) == vp9_rtx_payload_type || Some(pt) == h264_rtx_payload_type { + else if Some(pt) == vp8_rtx_payload_type + || Some(pt) == vp9_rtx_payload_type + || Some(pt) == h264_rtx_payload_type + { caps = caps .field("media", "video") .field("clock-rate", 90000) .field("encoding-name", "RTX") - .field("apt", if Some(pt) == vp8_rtx_payload_type { - vp8_payload_type - .context("missing VP8 payload type")? - } - else if Some(pt) == vp9_rtx_payload_type { - vp9_payload_type - .context("missing VP9 payload type")? - } - else if Some(pt) == h264_rtx_payload_type { - h264_payload_type - .context("missing H264 payload type")? - } - else { - unreachable!() - }); + .field( + "apt", + if Some(pt) == vp8_rtx_payload_type { + vp8_payload_type.context("missing VP8 payload type")? + } + else if Some(pt) == vp9_rtx_payload_type { + vp9_payload_type.context("missing VP9 payload type")? + } + else if Some(pt) == h264_rtx_payload_type { + h264_payload_type.context("missing H264 payload type")? + } + else { + unreachable!() + }, + ); Ok(Some(caps.build())) } else { @@ -583,7 +584,10 @@ impl JingleSession { let rtpjitterbuffer: gstreamer::Element = values[1].get()?; let session: u32 = values[2].get()?; let ssrc: u32 = values[3].get()?; - debug!("new jitterbuffer created for session {} ssrc {}", session, ssrc); + debug!( + "new jitterbuffer created for session {} ssrc {}", + session, ssrc + ); let source = handle.block_on(async move { let locked_inner = inner_.lock().await; @@ -633,22 +637,18 @@ impl JingleSession { rtx_sender.set_property("payload-type-map", pt_map.build())?; rtx_sender.set_property("ssrc-map", ssrc_map.build())?; bin.add(&rtx_sender)?; - bin.add_pad( - &gstreamer::GhostPad::with_target( - Some(&format!("src_{}", session)), - &rtx_sender - .static_pad("src") - .context("rtprtxsend has no src pad")?, - )?, - )?; - bin.add_pad( - &gstreamer::GhostPad::with_target( - Some(&format!("sink_{}", session)), - &rtx_sender - .static_pad("sink") - .context("rtprtxsend has no sink pad")?, - )?, - )?; + bin.add_pad(&gstreamer::GhostPad::with_target( + Some(&format!("src_{}", session)), + &rtx_sender + .static_pad("src") + .context("rtprtxsend has no src pad")?, + )?)?; + bin.add_pad(&gstreamer::GhostPad::with_target( + Some(&format!("sink_{}", session)), + &rtx_sender + .static_pad("sink") + .context("rtprtxsend has no sink pad")?, + )?)?; Ok::<_, anyhow::Error>(Some(bin.to_value())) }; match f() { @@ -678,20 +678,18 @@ impl JingleSession { let rtx_receiver = gstreamer::ElementFactory::make("rtprtxreceive", None)?; rtx_receiver.set_property("payload-type-map", pt_map.build())?; bin.add(&rtx_receiver)?; - bin.add_pad( - &gstreamer::GhostPad::with_target( - Some(&format!("src_{}", session)), - &rtx_receiver.static_pad("src") - .context("rtprtxreceive has no src pad")?, - )?, - )?; - bin.add_pad( - &gstreamer::GhostPad::with_target( - Some(&format!("sink_{}", session)), - &rtx_receiver.static_pad("sink") - .context("rtprtxreceive has no sink pad")?, - )?, - )?; + bin.add_pad(&gstreamer::GhostPad::with_target( + Some(&format!("src_{}", session)), + &rtx_receiver + .static_pad("src") + .context("rtprtxreceive has no src pad")?, + )?)?; + bin.add_pad(&gstreamer::GhostPad::with_target( + Some(&format!("sink_{}", session)), + &rtx_receiver + .static_pad("sink") + .context("rtprtxreceive has no sink pad")?, + )?)?; Ok::<_, anyhow::Error>(Some(bin.to_value())) }; match f() { @@ -703,136 +701,158 @@ impl JingleSession { } })?; - let handle = Handle::current(); - let inner_ = conference.inner.clone(); - let pipeline_ = pipeline.clone(); - let rtpbin_ = rtpbin.clone(); - rtpbin.connect("pad-added", false, move |values| { - let inner_ = inner_.clone(); - let handle = handle.clone(); - let pipeline_ = pipeline_.clone(); - let rtpbin_ = rtpbin_.clone(); - let f = move || { - debug!("rtpbin pad-added {:?}", values); - let pad: gstreamer::Pad = values[1].get()?; - let pad_name: String = pad.property("name")?.get()?; - if pad_name.starts_with("recv_rtp_src_0_") { - let mut parts = pad_name.split('_').skip(4); - let ssrc: u32 = parts.next().context("malformed pad name")?.parse()?; - let pt: u8 = parts.next().context("malformed pad name")?.parse()?; - let source = handle.block_on(async move { - let locked_inner = inner_.lock().await; - let jingle_session = locked_inner - .jingle_session - .as_ref() - .context("not connected (no jingle session)")?; - Ok::<_, anyhow::Error>( - jingle_session - .remote_ssrc_map - .get(&ssrc) - .context(format!("unknown ssrc: {}", ssrc))? - .clone(), - ) - })?; + { + let handle = Handle::current(); + let inner = conference.inner.clone(); + let pipeline = pipeline.clone(); + let rtpbin_ = rtpbin.clone(); + rtpbin.connect("pad-added", false, move |values| { + let rtpbin = &rtpbin_; + let f = || { + debug!("rtpbin pad-added {:?}", values); + let pad: gstreamer::Pad = values[1].get()?; + let pad_name: String = pad.property("name")?.get()?; + if pad_name.starts_with("recv_rtp_src_0_") { + let mut parts = pad_name.split('_').skip(4); + let ssrc: u32 = parts.next().context("malformed pad name")?.parse()?; + let pt: u8 = parts.next().context("malformed pad name")?.parse()?; + let source = handle.block_on(async { + let locked_inner = inner.lock().await; + let jingle_session = locked_inner + .jingle_session + .as_ref() + .context("not connected (no jingle session)")?; + Ok::<_, anyhow::Error>( + jingle_session + .remote_ssrc_map + .get(&ssrc) + .context(format!("unknown ssrc: {}", ssrc))? + .clone(), + ) + })?; - debug!("pad added for remote source: {:?}", source); + debug!("pad added for remote source: {:?}", source); - let element_name = match source.media_type { - MediaType::Audio => { - if Some(pt) == opus_payload_type { - "rtpopusdepay" - } - else { - bail!("received audio with unsupported PT {}", pt); - } - }, - MediaType::Video => { - if Some(pt) == h264_payload_type { - "rtph264depay" - } - else if Some(pt) == vp8_payload_type { - "rtpvp8depay" - } - else if Some(pt) == vp9_payload_type { - "rtpvp9depay" - } - else { - bail!("received video with unsupported PT {}", pt); - } - }, - }; - - let source_element = gstreamer::ElementFactory::make(element_name, None)?; - if source_element.has_property("auto-header-extension", None) { - source_element.set_property("auto-header-extension", true)?; - } - if source_element.has_property("request-keyframe", None) { - source_element.set_property("request-keyframe", true)?; - } - pipeline_ - .add(&source_element) - .context(format!("failed to add {} to pipeline", element_name))?; - source_element.sync_state_with_parent()?; - debug!("created {} element", element_name); - rtpbin_ - .link_pads(Some(&pad_name), &source_element, None) - .context(format!( - "failed to link rtpbin.{} to {}", - pad_name, element_name - ))?; - debug!("linked rtpbin.{} to {}", pad_name, element_name); - - let src_pad = source_element - .static_pad("src") - .context("depayloader has no src pad")?; - - if let Some(participant_bin) = - pipeline_.by_name(&format!("participant_{}", source.participant_id)) - { - let sink_pad_name = match source.media_type { - MediaType::Audio => "audio", - MediaType::Video => "video", + let source_element = match source.media_type { + MediaType::Audio => { + if Some(pt) == opus_payload_type { + gstreamer::ElementFactory::make("rtpopusdepay", None)? + } + else { + bail!("received audio with unsupported PT {}", pt); + } + }, + MediaType::Video => { + if Some(pt) == h264_payload_type { + let element = gstreamer::ElementFactory::make("rtph264depay", None)?; + element.set_property("request-keyframe", true)?; + element + } + else if Some(pt) == vp8_payload_type { + let element = gstreamer::ElementFactory::make("rtpvp8depay", None)?; + element.set_property("request-keyframe", true)?; + element + } + else if Some(pt) == vp9_payload_type { + let element = gstreamer::ElementFactory::make("rtpvp9depay", None)?; + element.set_property("request-keyframe", true)?; + element + } + else { + bail!("received video with unsupported PT {}", pt); + } + }, }; - if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) { - debug!("linking depayloader to participant bin"); - src_pad.link(&sink_pad)?; + + source_element.set_property("auto-header-extension", false)?; + source_element.connect("request-extension", false, move |values| { + let f = || { + let ext_id: u32 = values[1].get()?; + let ext_uri: String = values[2].get()?; + debug!("depayloader requested extension: {} {}", ext_id, ext_uri); + let hdrext = RTPHeaderExtension::create_from_uri(&ext_uri) + .context("failed to create hdrext")?; + hdrext.set_id(ext_id); + if ext_uri == RTP_HDREXT_ABS_SEND_TIME {} + if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL { + } + else if ext_uri == RTP_HDREXT_TRANSPORT_CC { + } + else { + bail!("unknown rtp hdrext: {}", ext_uri); + }; + Ok::<_, anyhow::Error>(hdrext) + }; + match f() { + Ok(hdrext) => Some(hdrext.to_value()), + Err(e) => { + warn!("request-extension: {:?}", e); + None + }, + } + })?; + pipeline + .add(&source_element) + .context("failed to add depayloader to pipeline")?; + source_element.sync_state_with_parent()?; + debug!("created depayloader"); + rtpbin + .link_pads(Some(&pad_name), &source_element, None) + .context(format!("failed to link rtpbin.{} to depayloader", pad_name))?; + debug!("linked rtpbin.{} to depayloader", pad_name); + + let src_pad = source_element + .static_pad("src") + .context("depayloader has no src pad")?; + + if let Some(participant_bin) = + pipeline.by_name(&format!("participant_{}", source.participant_id)) + { + let sink_pad_name = match source.media_type { + MediaType::Audio => "audio", + MediaType::Video => "video", + }; + if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) { + debug!("linking depayloader to participant bin"); + src_pad.link(&sink_pad)?; + } + else { + warn!( + "no {} sink pad in {} participant bin", + sink_pad_name, source.participant_id + ); + } } else { - warn!( - "no {} sink pad in {} participant bin", - sink_pad_name, source.participant_id - ); + debug!("no participant bin for {}", source.participant_id); } + + if !src_pad.is_linked() { + debug!("nothing linked to depayloader, adding fakesink"); + let fakesink = gstreamer::ElementFactory::make("fakesink", None)?; + pipeline.add(&fakesink)?; + fakesink.sync_state_with_parent()?; + source_element.link(&fakesink)?; + } + + gstreamer::debug_bin_to_dot_file( + &pipeline, + gstreamer::DebugGraphDetails::ALL, + &format!("ssrc-added-{}", ssrc), + ); + + Ok::<_, anyhow::Error>(()) } else { - debug!("no participant bin for {}", source.participant_id); + Ok(()) } - - if !src_pad.is_linked() { - debug!("nothing linked to {}, adding fakesink", element_name); - let fakesink = gstreamer::ElementFactory::make("fakesink", None)?; - pipeline_.add(&fakesink)?; - fakesink.sync_state_with_parent()?; - source_element.link(&fakesink)?; - } - - gstreamer::debug_bin_to_dot_file( - &pipeline_, - gstreamer::DebugGraphDetails::ALL, - &format!("ssrc-added-{}", ssrc), - ); - - Ok::<_, anyhow::Error>(()) + }; + if let Err(e) = f() { + error!("handling pad-added: {:?}", e); } - else { - Ok(()) - } - }; - if let Err(e) = f() { - error!("handling pad-added: {:?}", e); - } - None - })?; + None + })?; + } let audio_sink_element = gstreamer::ElementFactory::make("rtpopuspay", None)?; audio_sink_element.set_property( @@ -841,13 +861,38 @@ impl JingleSession { )?; audio_sink_element.set_property("min-ptime", 10i64 * 1000 * 1000)?; audio_sink_element.set_property("ssrc", audio_ssrc)?; - let audio_hdrext_supported = if audio_sink_element.has_property("auto-header-extension", None) { - audio_sink_element.set_property("auto-header-extension", true)?; - true - } - else { - false - }; + audio_sink_element.set_property("auto-header-extension", false)?; + audio_sink_element.connect("request-extension", false, move |values| { + let f = || { + let ext_id: u32 = values[1].get()?; + let ext_uri: String = values[2].get()?; + debug!( + "audio payloader requested extension: {} {}", + ext_id, ext_uri + ); + let hdrext = + RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?; + hdrext.set_id(ext_id); + if ext_uri == RTP_HDREXT_ABS_SEND_TIME { + } + else if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL { + } + else if ext_uri == RTP_HDREXT_TRANSPORT_CC { + // hdrext.set_property("n-streams", 2u32)?; + } + else { + bail!("unknown rtp hdrext: {}", ext_uri); + } + Ok::<_, anyhow::Error>(hdrext) + }; + match f() { + Ok(hdrext) => Some(hdrext.to_value()), + Err(e) => { + warn!("request-extension: {:?}", e); + None + }, + } + })?; pipeline.add(&audio_sink_element)?; let video_sink_element = match conference.config.video_codec.as_str() { @@ -881,13 +926,36 @@ impl JingleSession { other => bail!("unsupported video codec: {}", other), }; video_sink_element.set_property("ssrc", video_ssrc)?; - let video_hdrext_supported = if video_sink_element.has_property("auto-header-extension", None) { - video_sink_element.set_property("auto-header-extension", true)?; - true - } - else { - false - }; + video_sink_element.set_property("auto-header-extension", false)?; + video_sink_element.connect("request-extension", false, move |values| { + let f = || { + let ext_id: u32 = values[1].get()?; + let ext_uri: String = values[2].get()?; + debug!( + "video payloader requested extension: {} {}", + ext_id, ext_uri + ); + let hdrext = + RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?; + hdrext.set_id(ext_id); + if ext_uri == RTP_HDREXT_ABS_SEND_TIME { + } + else if ext_uri == RTP_HDREXT_TRANSPORT_CC { + // hdrext.set_property("n-streams", 2u32)?; + } + else { + bail!("unknown rtp hdrext: {}", ext_uri); + } + Ok::<_, anyhow::Error>(hdrext) + }; + match f() { + Ok(hdrext) => Some(hdrext.to_value()), + Err(e) => { + warn!("request-extension: {:?}", e); + None + }, + } + })?; pipeline.add(&video_sink_element)?; let mut audio_caps = gstreamer::Caps::builder("application/x-rtp"); @@ -903,9 +971,9 @@ impl JingleSession { pipeline.add(&audio_capsfilter)?; let mut video_caps = gstreamer::Caps::builder("application/x-rtp"); - if let Some(hdrext) = video_hdrext_abs_send_time { - video_caps = video_caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME); - } + // if let Some(hdrext) = video_hdrext_abs_send_time { + // video_caps = video_caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME); + // } if let Some(hdrext) = video_hdrext_transport_cc { video_caps = video_caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_TRANSPORT_CC); } @@ -913,19 +981,13 @@ impl JingleSession { video_capsfilter.set_property("caps", video_caps.build())?; pipeline.add(&video_capsfilter)?; - let rtpfunnel = gstreamer::ElementFactory::make("funnel", None)?; - pipeline.add(&rtpfunnel)?; - - debug!("linking audio payloader -> rtp funnel"); - audio_sink_element.link(&audio_capsfilter)?; - audio_capsfilter.link_pads(None, &rtpfunnel, Some("sink_0"))?; - - debug!("linking video payloader -> rtp funnel"); + debug!("linking video payloader -> rtpbin"); video_sink_element.link(&video_capsfilter)?; - video_capsfilter.link_pads(None, &rtpfunnel, Some("sink_1"))?; + video_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?; - debug!("linking rtp funnel -> rtpbin"); - rtpfunnel.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?; + debug!("linking audio payloader -> rtpbin"); + audio_sink_element.link(&audio_capsfilter)?; + audio_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_1"))?; debug!("link dtlssrtpdec -> rtpbin"); dtlssrtpdec.link_pads(Some("rtp_src"), &rtpbin, Some("recv_rtp_sink_0"))?; @@ -934,6 +996,8 @@ impl JingleSession { debug!("linking rtpbin -> dtlssrtpenc"); rtpbin.link_pads(Some("send_rtp_src_0"), &dtlssrtpenc, Some("rtp_sink_0"))?; rtpbin.link_pads(Some("send_rtcp_src_0"), &dtlssrtpenc, Some("rtcp_sink_0"))?; + rtpbin.link_pads(Some("send_rtp_src_1"), &dtlssrtpenc, Some("rtp_sink_1"))?; + rtpbin.link_pads(Some("send_rtcp_src_1"), &dtlssrtpenc, Some("rtcp_sink_1"))?; debug!("linking ice src -> dtlssrtpdec"); nicesrc.link(&dtlssrtpdec)?; @@ -1059,7 +1123,6 @@ impl JingleSession { else { bail!("no vp9 payload type in jingle session-initiate"); } - }, other => bail!("unsupported video codec: {}", other), } @@ -1124,30 +1187,21 @@ impl JingleSession { // } // } if let Some(hdrext) = audio_hdrext_transport_cc { - if audio_hdrext_supported { - description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_TRANSPORT_CC.to_owned())); - } - else { - debug!("transport-cc hdrext requested, but hdrext not supported by audio payloader"); - } + description.hdrexts.push(RtpHdrext::new( + hdrext.to_string(), + RTP_HDREXT_TRANSPORT_CC.to_owned(), + )); } } else if initiate_content.name.0 == "video" { - if let Some(hdrext) = video_hdrext_abs_send_time { - if video_hdrext_supported { - description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_ABS_SEND_TIME.to_owned())); - } - else { - debug!("abs-send-time hdrext requested, but hdrext not supported by video payloader"); - } - } + // if let Some(hdrext) = video_hdrext_abs_send_time { + // description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_ABS_SEND_TIME.to_owned())); + // } if let Some(hdrext) = video_hdrext_transport_cc { - if video_hdrext_supported { - description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_TRANSPORT_CC.to_owned())); - } - else { - debug!("transport-cc hdrext requested, but hdrext not supported by video payloader"); - } + description.hdrexts.push(RtpHdrext::new( + hdrext.to_string(), + RTP_HDREXT_TRANSPORT_CC.to_owned(), + )); } } diff --git a/lib-gst-meet/src/lib.rs b/lib-gst-meet/src/lib.rs index 75b23ad..1ccd5a0 100644 --- a/lib-gst-meet/src/lib.rs +++ b/lib-gst-meet/src/lib.rs @@ -22,4 +22,4 @@ pub fn init_tracing(level: tracing::Level) { .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) .with_target(false) .init(); -} \ No newline at end of file +}