This commit is contained in:
Jasper Hugo 2021-08-25 11:21:43 +07:00
parent a868e7e781
commit 4575371d6d
6 changed files with 432 additions and 332 deletions

View File

@ -1,14 +1,20 @@
use std::{
ffi::{CStr, CString, c_void},
ffi::{c_void, CStr, CString},
os::raw::c_char,
ptr,
sync::{Arc, atomic::{AtomicPtr, Ordering}},
sync::{
atomic::{AtomicPtr, Ordering},
Arc,
},
};
use anyhow::Result;
use glib::{ffi::GMainContext, translate::{from_glib, from_glib_full, ToGlibPtr}};
pub use lib_gst_meet::{init_tracing, JitsiConference, JitsiConnection, MediaType};
use glib::{
ffi::GMainContext,
translate::{from_glib, from_glib_full, ToGlibPtr},
};
use lib_gst_meet::JitsiConferenceConfig;
pub use lib_gst_meet::{init_tracing, JitsiConference, JitsiConnection, MediaType};
use tokio::runtime::Runtime;
pub struct Context {
@ -42,7 +48,7 @@ impl<T> ResultExt<T> for Result<T> {
Err(e) => {
eprintln!("lib-gst-meet: {:?}", e);
ptr::null_mut()
}
},
}
}
}
@ -79,7 +85,10 @@ pub unsafe extern "C" fn gstmeet_connection_new(
let xmpp_domain = CStr::from_ptr(xmpp_domain);
(*context)
.runtime
.block_on(JitsiConnection::new(&websocket_url.to_string_lossy(), &xmpp_domain.to_string_lossy()))
.block_on(JitsiConnection::new(
&websocket_url.to_string_lossy(),
&xmpp_domain.to_string_lossy(),
))
.map(|(connection, background)| {
(*context).runtime.spawn(background);
connection
@ -93,7 +102,10 @@ pub unsafe extern "C" fn gstmeet_connection_free(connection: *mut JitsiConnectio
}
#[no_mangle]
pub unsafe extern "C" fn gstmeet_connection_connect(context: *mut Context, connection: *mut JitsiConnection) -> bool {
pub unsafe extern "C" fn gstmeet_connection_connect(
context: *mut Context,
connection: *mut JitsiConnection,
) -> bool {
(*context)
.runtime
.block_on((*connection).connect())
@ -126,8 +138,12 @@ pub unsafe extern "C" fn gstmeet_connection_join_conference(
muc,
focus,
nick: CStr::from_ptr((*config).nick).to_string_lossy().to_string(),
region: CStr::from_ptr((*config).region).to_string_lossy().to_string(),
video_codec: CStr::from_ptr((*config).video_codec).to_string_lossy().to_string(),
region: CStr::from_ptr((*config).region)
.to_string_lossy()
.to_string(),
video_codec: CStr::from_ptr((*config).video_codec)
.to_string_lossy()
.to_string(),
};
(*context)
.runtime
@ -136,7 +152,10 @@ pub unsafe extern "C" fn gstmeet_connection_join_conference(
}
#[no_mangle]
pub unsafe extern "C" fn gstmeet_conference_leave(context: *mut Context, conference: *mut JitsiConference) -> bool {
pub unsafe extern "C" fn gstmeet_conference_leave(
context: *mut Context,
conference: *mut JitsiConference,
) -> bool {
(*context)
.runtime
.block_on(Box::from_raw(conference).leave())
@ -145,7 +164,12 @@ pub unsafe extern "C" fn gstmeet_conference_leave(context: *mut Context, confere
}
#[no_mangle]
pub unsafe extern "C" fn gstmeet_conference_set_muted(context: *mut Context, conference: *mut JitsiConference, media_type: MediaType, muted: bool) -> bool {
pub unsafe extern "C" fn gstmeet_conference_set_muted(
context: *mut Context,
conference: *mut JitsiConference,
media_type: MediaType,
muted: bool,
) -> bool {
(*context)
.runtime
.block_on((*conference).set_muted(media_type, muted))
@ -154,7 +178,10 @@ pub unsafe extern "C" fn gstmeet_conference_set_muted(context: *mut Context, con
}
#[no_mangle]
pub unsafe extern "C" fn gstmeet_conference_pipeline(context: *mut Context, conference: *mut JitsiConference) -> *mut gstreamer::ffi::GstPipeline {
pub unsafe extern "C" fn gstmeet_conference_pipeline(
context: *mut Context,
conference: *mut JitsiConference,
) -> *mut gstreamer::ffi::GstPipeline {
(*context)
.runtime
.block_on((*conference).pipeline())
@ -164,7 +191,10 @@ pub unsafe extern "C" fn gstmeet_conference_pipeline(context: *mut Context, conf
}
#[no_mangle]
pub unsafe extern "C" fn gstmeet_conference_audio_sink_element(context: *mut Context, conference: *mut JitsiConference) -> *mut gstreamer::ffi::GstElement {
pub unsafe extern "C" fn gstmeet_conference_audio_sink_element(
context: *mut Context,
conference: *mut JitsiConference,
) -> *mut gstreamer::ffi::GstElement {
(*context)
.runtime
.block_on((*conference).audio_sink_element())
@ -174,7 +204,10 @@ pub unsafe extern "C" fn gstmeet_conference_audio_sink_element(context: *mut Con
}
#[no_mangle]
pub unsafe extern "C" fn gstmeet_conference_video_sink_element(context: *mut Context, conference: *mut JitsiConference) -> *mut gstreamer::ffi::GstElement {
pub unsafe extern "C" fn gstmeet_conference_video_sink_element(
context: *mut Context,
conference: *mut JitsiConference,
) -> *mut gstreamer::ffi::GstElement {
(*context)
.runtime
.block_on((*conference).video_sink_element())
@ -187,13 +220,16 @@ pub unsafe extern "C" fn gstmeet_conference_video_sink_element(context: *mut Con
pub unsafe extern "C" fn gstmeet_conference_on_participant(
context: *mut Context,
conference: *mut JitsiConference,
f: unsafe extern "C" fn(*mut JitsiConference, Participant, *mut c_void) -> *mut gstreamer::ffi::GstBin,
f: unsafe extern "C" fn(
*mut JitsiConference,
Participant,
*mut c_void,
) -> *mut gstreamer::ffi::GstBin,
ctx: *mut c_void,
) {
let ctx = Arc::new(AtomicPtr::new(ctx));
(*context)
.runtime
.block_on((*conference).on_participant(move |conference, participant| {
(*context).runtime.block_on(
(*conference).on_participant(move |conference, participant| {
let ctx = ctx.clone();
Box::pin(async move {
let participant = Participant {
@ -205,10 +241,15 @@ pub unsafe extern "C" fn gstmeet_conference_on_participant(
.transpose()?
.unwrap_or_else(ptr::null),
};
f(Box::into_raw(Box::new(conference)), participant, ctx.load(Ordering::Relaxed));
f(
Box::into_raw(Box::new(conference)),
participant,
ctx.load(Ordering::Relaxed),
);
Ok(())
})
}));
}),
);
}
#[no_mangle]

View File

@ -13,7 +13,8 @@ async-trait = { version = "0.1", default-features = false }
bytes = { version = "1", default-features = false, features = ["std"] }
futures = { version = "0.3", default-features = false }
glib = { version = "0.14", default-features = false }
gstreamer = { version = "0.17", default-features = false, features = ["v1_16"] }
gstreamer = { version = "0.17", default-features = false, features = ["v1_20"] }
gstreamer-rtp = { version = "0.17", default-features = false, features = ["v1_20"] }
hex = { version = "0.4", default-features = false, features = ["std"] }
itertools = { version = "0.10", default-features = false, features = ["use_std"] }
libc = { version = "0.2", default-features = false }

View File

@ -20,10 +20,7 @@ pub enum ColibriMessage {
previous_speakers: Vec<String>,
},
#[serde(rename_all = "camelCase")]
EndpointConnectivityStatusChangeEvent {
endpoint: String,
active: bool,
},
EndpointConnectivityStatusChangeEvent { endpoint: String, active: bool },
#[serde(rename_all = "camelCase")]
EndpointMessage {
from: String,
@ -42,17 +39,11 @@ pub enum ColibriMessage {
max_enabled_resolution: u16,
},
#[serde(rename_all = "camelCase")]
LastNChangedEvent {
last_n: u16,
},
LastNChangedEvent { last_n: u16 },
#[serde(rename_all = "camelCase")]
LastNEndpointsChangeEvent {
last_n_endpoints: Vec<String>,
},
LastNEndpointsChangeEvent { last_n_endpoints: Vec<String> },
#[serde(rename_all = "camelCase")]
ReceiverVideoConstraint {
max_frame_height: u16,
},
ReceiverVideoConstraint { max_frame_height: u16 },
#[serde(rename_all = "camelCase")]
ReceiverVideoConstraints {
last_n: Option<u16>,
@ -62,21 +53,13 @@ pub enum ColibriMessage {
constraints: Option<HashMap<String, Constraints>>,
},
#[serde(rename_all = "camelCase")]
SelectedEndpointsChangedEvent {
selected_endpoints: Vec<String>,
},
SelectedEndpointsChangedEvent { selected_endpoints: Vec<String> },
#[serde(rename_all = "camelCase")]
SenderVideoConstraints {
video_constraints: Constraints,
},
SenderVideoConstraints { video_constraints: Constraints },
#[serde(rename_all = "camelCase")]
ServerHello {
version: Option<String>,
},
ServerHello { version: Option<String> },
#[serde(rename_all = "camelCase")]
VideoTypeMessage {
video_type: VideoType,
},
VideoTypeMessage { video_type: VideoType },
}
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
@ -124,10 +107,8 @@ pub(crate) struct ColibriChannel {
impl ColibriChannel {
pub(crate) async fn new(colibri_url: &str) -> Result<Self> {
let request =
Request::get(colibri_url).body(())?;
let (colibri_websocket, _response) =
tokio_tungstenite::connect_async(request).await?;
let request = Request::get(colibri_url).body(())?;
let (colibri_websocket, _response) = tokio_tungstenite::connect_async(request).await?;
info!("Connected Colibri WebSocket");
@ -151,11 +132,17 @@ impl ColibriChannel {
}
}
},
Err(e) => warn!("failed to parse frame on colibri websocket: {:?}\nframe: {}", e, text),
Err(e) => warn!(
"failed to parse frame on colibri websocket: {:?}\nframe: {}",
e, text
),
}
},
Message::Binary(data) => debug!("received unexpected {} byte binary frame on colibri websocket", data.len()),
Message::Ping(_) | Message::Pong(_) => {}, // handled automatically by tungstenite
Message::Binary(data) => debug!(
"received unexpected {} byte binary frame on colibri websocket",
data.len()
),
Message::Ping(_) | Message::Pong(_) => {}, // handled automatically by tungstenite
Message::Close(_) => {
debug!("received close frame on colibri websocket");
// TODO reconnect
@ -194,10 +181,7 @@ impl ColibriChannel {
};
});
Ok(Self {
send_tx,
recv_tx,
})
Ok(Self { send_tx, recv_tx })
}
pub(crate) async fn subscribe(&self, tx: mpsc::Sender<ColibriMessage>) {

View File

@ -34,10 +34,9 @@ static DISCO_INFO: Lazy<DiscoInfoResult> = Lazy::new(|| {
Feature::new(ns::JINGLE_RTP_VIDEO),
Feature::new(ns::JINGLE_ICE_UDP),
Feature::new(ns::JINGLE_DTLS),
Feature::new("urn:ietf:rfc:5888"), // BUNDLE
Feature::new("urn:ietf:rfc:5761"), // RTCP-MUX
Feature::new("urn:ietf:rfc:4588"), // RTX
Feature::new("urn:ietf:rfc:5888"), // BUNDLE
Feature::new("urn:ietf:rfc:5761"), // RTCP-MUX
Feature::new("urn:ietf:rfc:4588"), // RTX
];
let gst_version = gstreamer::version();
if gst_version.0 >= 1 && gst_version.1 >= 19 {
@ -105,9 +104,12 @@ type BoxedResultFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
pub(crate) struct JitsiConferenceInner {
pub(crate) jingle_session: Option<JingleSession>,
participants: HashMap<String, Participant>,
on_participant: Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_participant_left: Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_colibri_message: Option<Arc<dyn (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync>>,
on_participant:
Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_participant_left:
Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_colibri_message:
Option<Arc<dyn (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync>>,
state: JitsiConferenceState,
connected_tx: Option<oneshot::Sender<()>>,
connected_rx: Option<oneshot::Receiver<()>>,
@ -292,7 +294,10 @@ impl JitsiConference {
}
#[tracing::instrument(level = "trace", skip(f))]
pub async fn on_participant(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) {
pub async fn on_participant(
&self,
f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static,
) {
let f = Arc::new(f);
let f2 = f.clone();
let existing_participants: Vec<_> = {
@ -312,12 +317,18 @@ impl JitsiConference {
}
#[tracing::instrument(level = "trace", skip(f))]
pub async fn on_participant_left(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) {
pub async fn on_participant_left(
&self,
f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static,
) {
self.inner.lock().await.on_participant_left = Some(Arc::new(f));
}
#[tracing::instrument(level = "trace", skip(f))]
pub async fn on_colibri_message(&self, f: impl (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync + 'static) {
pub async fn on_colibri_message(
&self,
f: impl (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync + 'static,
) {
self.inner.lock().await.on_colibri_message = Some(Arc::new(f));
}
}
@ -472,7 +483,11 @@ impl StanzaFilter for JitsiConference {
let colibri_channel = ColibriChannel::new(&colibri_url).await?;
let (tx, rx) = mpsc::channel(8);
colibri_channel.subscribe(tx).await;
locked_inner.jingle_session.as_mut().unwrap().colibri_channel = Some(colibri_channel);
locked_inner
.jingle_session
.as_mut()
.unwrap()
.colibri_channel = Some(colibri_channel);
let self_ = self.clone();
tokio::spawn(async move {
@ -523,7 +538,12 @@ impl StanzaFilter for JitsiConference {
muc_jid: from.clone(),
nick: item.nick,
};
if presence.type_ == presence::Type::Unavailable && locked_inner.participants.remove(&from.resource.clone()).is_some() {
if presence.type_ == presence::Type::Unavailable
&& locked_inner
.participants
.remove(&from.resource.clone())
.is_some()
{
debug!("participant left: {:?}", jid);
if let Some(f) = &locked_inner.on_participant_left {
debug!("calling on_participant_left with old participant");

View File

@ -4,16 +4,13 @@ use anyhow::{anyhow, bail, Context, Result};
use futures::stream::StreamExt;
use glib::{ObjectExt, ToValue};
use gstreamer::prelude::{ElementExt, GObjectExtManualGst, GstBinExt, PadExt};
use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension};
use nice_gst_meet as nice;
use pem::Pem;
use rand::random;
use rcgen::{Certificate, CertificateParams, PKCS_ECDSA_P256_SHA256};
use ring::digest::{digest, SHA256};
use tokio::{
net::lookup_host,
runtime::Handle,
sync::oneshot,
};
use tokio::{net::lookup_host, runtime::Handle, sync::oneshot};
use tracing::{debug, error, warn};
use uuid::Uuid;
use xmpp_parsers::{
@ -37,7 +34,8 @@ use crate::{
const RTP_HDREXT_SSRC_AUDIO_LEVEL: &str = "urn:ietf:params:rtp-hdrext:ssrc-audio-level";
const RTP_HDREXT_ABS_SEND_TIME: &str = "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time";
const RTP_HDREXT_TRANSPORT_CC: &str = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
const RTP_HDREXT_TRANSPORT_CC: &str =
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
const DEFAULT_STUN_PORT: u16 = 3478;
const DEFAULT_TURNS_PORT: u16 = 5349;
@ -162,14 +160,12 @@ impl JingleSession {
.iter()
.find(|pt| {
pt.name.as_deref() == Some("rtx")
&&
pt
.parameters
.iter()
.any(|param| {
&& pt.parameters.iter().any(|param| {
param.name == "apt"
&&
param.value == h264_payload_type.map(|pt| pt.to_string()).unwrap_or_default()
&& param.value
== h264_payload_type
.map(|pt| pt.to_string())
.unwrap_or_default()
})
})
.map(|pt| pt.id);
@ -188,14 +184,12 @@ impl JingleSession {
.iter()
.find(|pt| {
pt.name.as_deref() == Some("rtx")
&&
pt
.parameters
.iter()
.any(|param| {
&& pt.parameters.iter().any(|param| {
param.name == "apt"
&&
param.value == vp8_payload_type.map(|pt| pt.to_string()).unwrap_or_default()
&& param.value
== vp8_payload_type
.map(|pt| pt.to_string())
.unwrap_or_default()
})
})
.map(|pt| pt.id);
@ -214,14 +208,12 @@ impl JingleSession {
.iter()
.find(|pt| {
pt.name.as_deref() == Some("rtx")
&&
pt
.parameters
.iter()
.any(|param| {
&& pt.parameters.iter().any(|param| {
param.name == "apt"
&&
param.value == vp9_payload_type.map(|pt| pt.to_string()).unwrap_or_default()
&& param.value
== vp9_payload_type
.map(|pt| pt.to_string())
.unwrap_or_default()
})
})
.map(|pt| pt.id);
@ -510,50 +502,59 @@ impl JingleSession {
}
Ok::<_, anyhow::Error>(Some(caps.build()))
}
else if Some(pt) == h264_payload_type || Some(pt) == vp8_payload_type || Some(pt) == vp9_payload_type {
else if Some(pt) == h264_payload_type
|| Some(pt) == vp8_payload_type
|| Some(pt) == vp9_payload_type
{
caps = caps
.field("media", "video")
.field("clock-rate", 90000)
.field("encoding-name", if Some(pt) == h264_payload_type {
"H264"
}
else if Some(pt) == vp8_payload_type {
"VP8"
}
else if Some(pt) == vp9_payload_type {
"VP9"
}
else {
unreachable!()
});
if let Some(hdrext) = video_hdrext_abs_send_time {
caps = caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME);
}
.field(
"encoding-name",
if Some(pt) == h264_payload_type {
"H264"
}
else if Some(pt) == vp8_payload_type {
"VP8"
}
else if Some(pt) == vp9_payload_type {
"VP9"
}
else {
unreachable!()
},
);
// if let Some(hdrext) = video_hdrext_abs_send_time {
// caps = caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME);
// }
if let Some(hdrext) = video_hdrext_transport_cc {
caps = caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_TRANSPORT_CC);
}
Ok(Some(caps.build()))
}
else if Some(pt) == vp8_rtx_payload_type || Some(pt) == vp9_rtx_payload_type || Some(pt) == h264_rtx_payload_type {
else if Some(pt) == vp8_rtx_payload_type
|| Some(pt) == vp9_rtx_payload_type
|| Some(pt) == h264_rtx_payload_type
{
caps = caps
.field("media", "video")
.field("clock-rate", 90000)
.field("encoding-name", "RTX")
.field("apt", if Some(pt) == vp8_rtx_payload_type {
vp8_payload_type
.context("missing VP8 payload type")?
}
else if Some(pt) == vp9_rtx_payload_type {
vp9_payload_type
.context("missing VP9 payload type")?
}
else if Some(pt) == h264_rtx_payload_type {
h264_payload_type
.context("missing H264 payload type")?
}
else {
unreachable!()
});
.field(
"apt",
if Some(pt) == vp8_rtx_payload_type {
vp8_payload_type.context("missing VP8 payload type")?
}
else if Some(pt) == vp9_rtx_payload_type {
vp9_payload_type.context("missing VP9 payload type")?
}
else if Some(pt) == h264_rtx_payload_type {
h264_payload_type.context("missing H264 payload type")?
}
else {
unreachable!()
},
);
Ok(Some(caps.build()))
}
else {
@ -583,7 +584,10 @@ impl JingleSession {
let rtpjitterbuffer: gstreamer::Element = values[1].get()?;
let session: u32 = values[2].get()?;
let ssrc: u32 = values[3].get()?;
debug!("new jitterbuffer created for session {} ssrc {}", session, ssrc);
debug!(
"new jitterbuffer created for session {} ssrc {}",
session, ssrc
);
let source = handle.block_on(async move {
let locked_inner = inner_.lock().await;
@ -633,22 +637,18 @@ impl JingleSession {
rtx_sender.set_property("payload-type-map", pt_map.build())?;
rtx_sender.set_property("ssrc-map", ssrc_map.build())?;
bin.add(&rtx_sender)?;
bin.add_pad(
&gstreamer::GhostPad::with_target(
Some(&format!("src_{}", session)),
&rtx_sender
.static_pad("src")
.context("rtprtxsend has no src pad")?,
)?,
)?;
bin.add_pad(
&gstreamer::GhostPad::with_target(
Some(&format!("sink_{}", session)),
&rtx_sender
.static_pad("sink")
.context("rtprtxsend has no sink pad")?,
)?,
)?;
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("src_{}", session)),
&rtx_sender
.static_pad("src")
.context("rtprtxsend has no src pad")?,
)?)?;
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("sink_{}", session)),
&rtx_sender
.static_pad("sink")
.context("rtprtxsend has no sink pad")?,
)?)?;
Ok::<_, anyhow::Error>(Some(bin.to_value()))
};
match f() {
@ -678,20 +678,18 @@ impl JingleSession {
let rtx_receiver = gstreamer::ElementFactory::make("rtprtxreceive", None)?;
rtx_receiver.set_property("payload-type-map", pt_map.build())?;
bin.add(&rtx_receiver)?;
bin.add_pad(
&gstreamer::GhostPad::with_target(
Some(&format!("src_{}", session)),
&rtx_receiver.static_pad("src")
.context("rtprtxreceive has no src pad")?,
)?,
)?;
bin.add_pad(
&gstreamer::GhostPad::with_target(
Some(&format!("sink_{}", session)),
&rtx_receiver.static_pad("sink")
.context("rtprtxreceive has no sink pad")?,
)?,
)?;
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("src_{}", session)),
&rtx_receiver
.static_pad("src")
.context("rtprtxreceive has no src pad")?,
)?)?;
bin.add_pad(&gstreamer::GhostPad::with_target(
Some(&format!("sink_{}", session)),
&rtx_receiver
.static_pad("sink")
.context("rtprtxreceive has no sink pad")?,
)?)?;
Ok::<_, anyhow::Error>(Some(bin.to_value()))
};
match f() {
@ -703,136 +701,158 @@ impl JingleSession {
}
})?;
let handle = Handle::current();
let inner_ = conference.inner.clone();
let pipeline_ = pipeline.clone();
let rtpbin_ = rtpbin.clone();
rtpbin.connect("pad-added", false, move |values| {
let inner_ = inner_.clone();
let handle = handle.clone();
let pipeline_ = pipeline_.clone();
let rtpbin_ = rtpbin_.clone();
let f = move || {
debug!("rtpbin pad-added {:?}", values);
let pad: gstreamer::Pad = values[1].get()?;
let pad_name: String = pad.property("name")?.get()?;
if pad_name.starts_with("recv_rtp_src_0_") {
let mut parts = pad_name.split('_').skip(4);
let ssrc: u32 = parts.next().context("malformed pad name")?.parse()?;
let pt: u8 = parts.next().context("malformed pad name")?.parse()?;
let source = handle.block_on(async move {
let locked_inner = inner_.lock().await;
let jingle_session = locked_inner
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?;
Ok::<_, anyhow::Error>(
jingle_session
.remote_ssrc_map
.get(&ssrc)
.context(format!("unknown ssrc: {}", ssrc))?
.clone(),
)
})?;
{
let handle = Handle::current();
let inner = conference.inner.clone();
let pipeline = pipeline.clone();
let rtpbin_ = rtpbin.clone();
rtpbin.connect("pad-added", false, move |values| {
let rtpbin = &rtpbin_;
let f = || {
debug!("rtpbin pad-added {:?}", values);
let pad: gstreamer::Pad = values[1].get()?;
let pad_name: String = pad.property("name")?.get()?;
if pad_name.starts_with("recv_rtp_src_0_") {
let mut parts = pad_name.split('_').skip(4);
let ssrc: u32 = parts.next().context("malformed pad name")?.parse()?;
let pt: u8 = parts.next().context("malformed pad name")?.parse()?;
let source = handle.block_on(async {
let locked_inner = inner.lock().await;
let jingle_session = locked_inner
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?;
Ok::<_, anyhow::Error>(
jingle_session
.remote_ssrc_map
.get(&ssrc)
.context(format!("unknown ssrc: {}", ssrc))?
.clone(),
)
})?;
debug!("pad added for remote source: {:?}", source);
debug!("pad added for remote source: {:?}", source);
let element_name = match source.media_type {
MediaType::Audio => {
if Some(pt) == opus_payload_type {
"rtpopusdepay"
}
else {
bail!("received audio with unsupported PT {}", pt);
}
},
MediaType::Video => {
if Some(pt) == h264_payload_type {
"rtph264depay"
}
else if Some(pt) == vp8_payload_type {
"rtpvp8depay"
}
else if Some(pt) == vp9_payload_type {
"rtpvp9depay"
}
else {
bail!("received video with unsupported PT {}", pt);
}
},
};
let source_element = gstreamer::ElementFactory::make(element_name, None)?;
if source_element.has_property("auto-header-extension", None) {
source_element.set_property("auto-header-extension", true)?;
}
if source_element.has_property("request-keyframe", None) {
source_element.set_property("request-keyframe", true)?;
}
pipeline_
.add(&source_element)
.context(format!("failed to add {} to pipeline", element_name))?;
source_element.sync_state_with_parent()?;
debug!("created {} element", element_name);
rtpbin_
.link_pads(Some(&pad_name), &source_element, None)
.context(format!(
"failed to link rtpbin.{} to {}",
pad_name, element_name
))?;
debug!("linked rtpbin.{} to {}", pad_name, element_name);
let src_pad = source_element
.static_pad("src")
.context("depayloader has no src pad")?;
if let Some(participant_bin) =
pipeline_.by_name(&format!("participant_{}", source.participant_id))
{
let sink_pad_name = match source.media_type {
MediaType::Audio => "audio",
MediaType::Video => "video",
let source_element = match source.media_type {
MediaType::Audio => {
if Some(pt) == opus_payload_type {
gstreamer::ElementFactory::make("rtpopusdepay", None)?
}
else {
bail!("received audio with unsupported PT {}", pt);
}
},
MediaType::Video => {
if Some(pt) == h264_payload_type {
let element = gstreamer::ElementFactory::make("rtph264depay", None)?;
element.set_property("request-keyframe", true)?;
element
}
else if Some(pt) == vp8_payload_type {
let element = gstreamer::ElementFactory::make("rtpvp8depay", None)?;
element.set_property("request-keyframe", true)?;
element
}
else if Some(pt) == vp9_payload_type {
let element = gstreamer::ElementFactory::make("rtpvp9depay", None)?;
element.set_property("request-keyframe", true)?;
element
}
else {
bail!("received video with unsupported PT {}", pt);
}
},
};
if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) {
debug!("linking depayloader to participant bin");
src_pad.link(&sink_pad)?;
source_element.set_property("auto-header-extension", false)?;
source_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!("depayloader requested extension: {} {}", ext_id, ext_uri);
let hdrext = RTPHeaderExtension::create_from_uri(&ext_uri)
.context("failed to create hdrext")?;
hdrext.set_id(ext_id);
if ext_uri == RTP_HDREXT_ABS_SEND_TIME {}
if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL {
}
else if ext_uri == RTP_HDREXT_TRANSPORT_CC {
}
else {
bail!("unknown rtp hdrext: {}", ext_uri);
};
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
}
})?;
pipeline
.add(&source_element)
.context("failed to add depayloader to pipeline")?;
source_element.sync_state_with_parent()?;
debug!("created depayloader");
rtpbin
.link_pads(Some(&pad_name), &source_element, None)
.context(format!("failed to link rtpbin.{} to depayloader", pad_name))?;
debug!("linked rtpbin.{} to depayloader", pad_name);
let src_pad = source_element
.static_pad("src")
.context("depayloader has no src pad")?;
if let Some(participant_bin) =
pipeline.by_name(&format!("participant_{}", source.participant_id))
{
let sink_pad_name = match source.media_type {
MediaType::Audio => "audio",
MediaType::Video => "video",
};
if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) {
debug!("linking depayloader to participant bin");
src_pad.link(&sink_pad)?;
}
else {
warn!(
"no {} sink pad in {} participant bin",
sink_pad_name, source.participant_id
);
}
}
else {
warn!(
"no {} sink pad in {} participant bin",
sink_pad_name, source.participant_id
);
debug!("no participant bin for {}", source.participant_id);
}
if !src_pad.is_linked() {
debug!("nothing linked to depayloader, adding fakesink");
let fakesink = gstreamer::ElementFactory::make("fakesink", None)?;
pipeline.add(&fakesink)?;
fakesink.sync_state_with_parent()?;
source_element.link(&fakesink)?;
}
gstreamer::debug_bin_to_dot_file(
&pipeline,
gstreamer::DebugGraphDetails::ALL,
&format!("ssrc-added-{}", ssrc),
);
Ok::<_, anyhow::Error>(())
}
else {
debug!("no participant bin for {}", source.participant_id);
Ok(())
}
if !src_pad.is_linked() {
debug!("nothing linked to {}, adding fakesink", element_name);
let fakesink = gstreamer::ElementFactory::make("fakesink", None)?;
pipeline_.add(&fakesink)?;
fakesink.sync_state_with_parent()?;
source_element.link(&fakesink)?;
}
gstreamer::debug_bin_to_dot_file(
&pipeline_,
gstreamer::DebugGraphDetails::ALL,
&format!("ssrc-added-{}", ssrc),
);
Ok::<_, anyhow::Error>(())
};
if let Err(e) = f() {
error!("handling pad-added: {:?}", e);
}
else {
Ok(())
}
};
if let Err(e) = f() {
error!("handling pad-added: {:?}", e);
}
None
})?;
None
})?;
}
let audio_sink_element = gstreamer::ElementFactory::make("rtpopuspay", None)?;
audio_sink_element.set_property(
@ -841,13 +861,38 @@ impl JingleSession {
)?;
audio_sink_element.set_property("min-ptime", 10i64 * 1000 * 1000)?;
audio_sink_element.set_property("ssrc", audio_ssrc)?;
let audio_hdrext_supported = if audio_sink_element.has_property("auto-header-extension", None) {
audio_sink_element.set_property("auto-header-extension", true)?;
true
}
else {
false
};
audio_sink_element.set_property("auto-header-extension", false)?;
audio_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"audio payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
if ext_uri == RTP_HDREXT_ABS_SEND_TIME {
}
else if ext_uri == RTP_HDREXT_SSRC_AUDIO_LEVEL {
}
else if ext_uri == RTP_HDREXT_TRANSPORT_CC {
// hdrext.set_property("n-streams", 2u32)?;
}
else {
bail!("unknown rtp hdrext: {}", ext_uri);
}
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
}
})?;
pipeline.add(&audio_sink_element)?;
let video_sink_element = match conference.config.video_codec.as_str() {
@ -881,13 +926,36 @@ impl JingleSession {
other => bail!("unsupported video codec: {}", other),
};
video_sink_element.set_property("ssrc", video_ssrc)?;
let video_hdrext_supported = if video_sink_element.has_property("auto-header-extension", None) {
video_sink_element.set_property("auto-header-extension", true)?;
true
}
else {
false
};
video_sink_element.set_property("auto-header-extension", false)?;
video_sink_element.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
debug!(
"video payloader requested extension: {} {}",
ext_id, ext_uri
);
let hdrext =
RTPHeaderExtension::create_from_uri(&ext_uri).context("failed to create hdrext")?;
hdrext.set_id(ext_id);
if ext_uri == RTP_HDREXT_ABS_SEND_TIME {
}
else if ext_uri == RTP_HDREXT_TRANSPORT_CC {
// hdrext.set_property("n-streams", 2u32)?;
}
else {
bail!("unknown rtp hdrext: {}", ext_uri);
}
Ok::<_, anyhow::Error>(hdrext)
};
match f() {
Ok(hdrext) => Some(hdrext.to_value()),
Err(e) => {
warn!("request-extension: {:?}", e);
None
},
}
})?;
pipeline.add(&video_sink_element)?;
let mut audio_caps = gstreamer::Caps::builder("application/x-rtp");
@ -903,9 +971,9 @@ impl JingleSession {
pipeline.add(&audio_capsfilter)?;
let mut video_caps = gstreamer::Caps::builder("application/x-rtp");
if let Some(hdrext) = video_hdrext_abs_send_time {
video_caps = video_caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME);
}
// if let Some(hdrext) = video_hdrext_abs_send_time {
// video_caps = video_caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_ABS_SEND_TIME);
// }
if let Some(hdrext) = video_hdrext_transport_cc {
video_caps = video_caps.field(&format!("extmap-{}", hdrext), &RTP_HDREXT_TRANSPORT_CC);
}
@ -913,19 +981,13 @@ impl JingleSession {
video_capsfilter.set_property("caps", video_caps.build())?;
pipeline.add(&video_capsfilter)?;
let rtpfunnel = gstreamer::ElementFactory::make("funnel", None)?;
pipeline.add(&rtpfunnel)?;
debug!("linking audio payloader -> rtp funnel");
audio_sink_element.link(&audio_capsfilter)?;
audio_capsfilter.link_pads(None, &rtpfunnel, Some("sink_0"))?;
debug!("linking video payloader -> rtp funnel");
debug!("linking video payloader -> rtpbin");
video_sink_element.link(&video_capsfilter)?;
video_capsfilter.link_pads(None, &rtpfunnel, Some("sink_1"))?;
video_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?;
debug!("linking rtp funnel -> rtpbin");
rtpfunnel.link_pads(None, &rtpbin, Some("send_rtp_sink_0"))?;
debug!("linking audio payloader -> rtpbin");
audio_sink_element.link(&audio_capsfilter)?;
audio_capsfilter.link_pads(None, &rtpbin, Some("send_rtp_sink_1"))?;
debug!("link dtlssrtpdec -> rtpbin");
dtlssrtpdec.link_pads(Some("rtp_src"), &rtpbin, Some("recv_rtp_sink_0"))?;
@ -934,6 +996,8 @@ impl JingleSession {
debug!("linking rtpbin -> dtlssrtpenc");
rtpbin.link_pads(Some("send_rtp_src_0"), &dtlssrtpenc, Some("rtp_sink_0"))?;
rtpbin.link_pads(Some("send_rtcp_src_0"), &dtlssrtpenc, Some("rtcp_sink_0"))?;
rtpbin.link_pads(Some("send_rtp_src_1"), &dtlssrtpenc, Some("rtp_sink_1"))?;
rtpbin.link_pads(Some("send_rtcp_src_1"), &dtlssrtpenc, Some("rtcp_sink_1"))?;
debug!("linking ice src -> dtlssrtpdec");
nicesrc.link(&dtlssrtpdec)?;
@ -1059,7 +1123,6 @@ impl JingleSession {
else {
bail!("no vp9 payload type in jingle session-initiate");
}
},
other => bail!("unsupported video codec: {}", other),
}
@ -1124,30 +1187,21 @@ impl JingleSession {
// }
// }
if let Some(hdrext) = audio_hdrext_transport_cc {
if audio_hdrext_supported {
description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_TRANSPORT_CC.to_owned()));
}
else {
debug!("transport-cc hdrext requested, but hdrext not supported by audio payloader");
}
description.hdrexts.push(RtpHdrext::new(
hdrext.to_string(),
RTP_HDREXT_TRANSPORT_CC.to_owned(),
));
}
}
else if initiate_content.name.0 == "video" {
if let Some(hdrext) = video_hdrext_abs_send_time {
if video_hdrext_supported {
description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_ABS_SEND_TIME.to_owned()));
}
else {
debug!("abs-send-time hdrext requested, but hdrext not supported by video payloader");
}
}
// if let Some(hdrext) = video_hdrext_abs_send_time {
// description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_ABS_SEND_TIME.to_owned()));
// }
if let Some(hdrext) = video_hdrext_transport_cc {
if video_hdrext_supported {
description.hdrexts.push(RtpHdrext::new(hdrext.to_string(), RTP_HDREXT_TRANSPORT_CC.to_owned()));
}
else {
debug!("transport-cc hdrext requested, but hdrext not supported by video payloader");
}
description.hdrexts.push(RtpHdrext::new(
hdrext.to_string(),
RTP_HDREXT_TRANSPORT_CC.to_owned(),
));
}
}