Automatically decode audio & video rather than requiring the pipeline to do so

Jasper Hugo 2022-03-09 13:33:10 +07:00 committed by Jasper
parent d7e2320fe5
commit ea73eec7ee
6 changed files with 238 additions and 86 deletions

View File

@ -40,9 +40,13 @@ Install the dependencies described above, along with their `-dev` packages if yo
You can pass three different pipeline fragments to gst-meet.
`--send-pipeline` is for sending audio and video. If it contains an element named `audio`, this audio will be streamed to the conference. The audio codec must be 48kHz Opus. If it contains an element named `video`, this video will be streamed to the conference. The video codec must match the codec passed to `--video-codec`, which is VP9 by default.
`--recv-pipeline` is for receiving audio and video, if you want a single pipeline to handle all participants. If it contains an element named `audio`, a sink pad is requested on that element for each new participant, and decoded audio is sent to that pad. Similarly, if it contains an element named `video`, a sink pad is requested on that element for each new participant, and decoded & scaled video is sent to that pad.
`--recv-pipeline-participant-template` is for receiving audio and video, if you want a separate pipeline for each participant. This pipeline will be created once for each other participant in the conference. If it contains an element named `audio`, the participant's decoded audio will be sent to that element. If it contains an element named `video`, the participant's decoded & scaled video will be sent to that element. The strings `{jid}`, `{jid_user}`, `{participant_id}` and `{nick}` are replaced in the template with the participant's full JID, user part, MUC JID resource part (a.k.a. participant/occupant ID) and nickname respectively.
You can use `--recv-pipeline` and `--recv-pipeline-participant-template` together, for example to handle all the audio with a single `audiomixer` element but handle each video stream separately. If an `audio` or `video` element is found in both `--recv-pipeline` and `--recv-pipeline-participant-template`, then the one in `--recv-pipeline` is used.
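For example, a command along the following lines (a sketch, not an example from this README; the domain, room name, and output locations are placeholders) would mix every participant's audio into the default audio output while re-encoding each participant's scaled video into its own Matroska file:
```
gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \
  --room-name=roomname \
  --recv-pipeline="audiomixer name=audio ! autoaudiosink" \
  --recv-pipeline-participant-template="vp9enc name=video ! matroskamux ! filesink location={participant_id}.mkv"
```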
## Examples
@ -66,7 +70,7 @@ gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \
--send-pipeline="filesrc location=shake-it-off.flac ! queue ! flacdec ! audioconvert ! audioresample ! opusenc name=audio" --send-pipeline="filesrc location=shake-it-off.flac ! queue ! flacdec ! audioconvert ! audioresample ! opusenc name=audio"
``` ```
Stream a .webm file containing VP8 video and Vorbis audio to the conference. This pipeline passes the VP8 stream through efficiently without transcoding, and transcodes the audio from Vorbis to Opus: Stream a .webm file containing VP9 video and Vorbis audio to the conference. This pipeline passes the VP9 stream through efficiently without transcoding, and transcodes the audio from Vorbis to Opus:
``` ```
gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \ gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \
@ -76,35 +80,28 @@ gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \
demuxer.audio_0 ! queue ! vorbisdec ! audioconvert ! audioresample ! opusenc name=audio"
```
Stream the default video & audio inputs to the conference, encoding as VP9 and Opus, display up to two remote participants' video streams composited side-by-side at 360p each, and play back all incoming audio mixed together (a very basic, but completely native, Jitsi Meet conference!):
```
gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \
--room-name=roomname \
--send-pipeline="autovideosrc ! queue ! videoconvert ! vp8enc buffer-size=1000 deadline=1 name=video --recv-video-scale-width=640 \
--recv-video-scale-height=360 \
--send-pipeline="autovideosrc ! queue ! videoscale ! video/x-raw,width=640,height=360 ! videoconvert ! vp9enc buffer-size=1000 deadline=1 name=video
autoaudiosrc ! queue ! audioconvert ! audioresample ! opusenc name=audio" \
--recv-pipeline="audiomixer name=audio ! autoaudiosink
compositor name=video sink_1::xpos=640 ! autovideosink"
```
Record a .webm file for each other participant, containing VP9 video and Opus audio:
```
gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \
--room-name=roomname \
--video-codec=vp9 \
--recv-pipeline-participant-template="webmmux name=muxer ! queue ! filesink location={participant_id}.webm
opusenc name=audio ! muxer.audio_0
vp9enc name=video ! muxer.video_0"
```
Play a YouTube video in the conference. By requesting Opus audio and VP9 video from YouTube, and setting the Jitsi Meet video codec to VP9, no transcoding is necessary. Note that not every YouTube video has VP9 and Opus available, so the pipeline may need adjusting for other videos.
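A sketch of how this can look (the youtube-dl format filters and URLs below are assumptions, not taken from this README; the direct stream URLs are resolved out of band and the streams are passed through without transcoding):
```
AUDIO_URL=$(youtube-dl -g -f 'bestaudio[acodec=opus]' 'https://www.youtube.com/watch?v=VIDEO_ID')
VIDEO_URL=$(youtube-dl -g -f 'bestvideo[vcodec=vp9]' 'https://www.youtube.com/watch?v=VIDEO_ID')
gst-meet --web-socket-url=wss://your.jitsi.domain/xmpp-websocket \
  --room-name=roomname \
  --video-codec=vp9 \
  --send-pipeline="souphttpsrc location=\"$AUDIO_URL\" ! queue ! matroskademux ! queue ! identity name=audio
                   souphttpsrc location=\"$VIDEO_URL\" ! queue ! matroskademux ! queue ! identity name=video"
```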

View File

@ -49,8 +49,8 @@ struct Opt {
#[structopt(
long,
default_value = "vp9",
help = "The video codec to negotiate support for. One of: vp9, vp8, h264",
)]
video_codec: String,
@ -63,7 +63,16 @@ struct Opt {
#[structopt(long)]
send_pipeline: Option<String>,
#[structopt(
long,
help = "A GStreamer pipeline which will be instantiated at startup. If an element named 'audio' is found, every remote participant's audio will be linked to it (and any 'audio' element in the recv-pipeline-participant-template will be ignored). If an element named 'video' is found, every remote participant's video will be linked to it (and any 'video' element in the recv-pipeline-participant-template will be ignored)."
)]
recv_pipeline: Option<String>,
#[structopt(
long,
help = "A GStreamer pipeline which will be instantiated for each remote participant. If an element named 'audio' is found, the participant's audio will be linked to it. If an element named 'video' is found, the participant's video will be linked to it."
)]
recv_pipeline_participant_template: Option<String>,
#[structopt(
@ -80,15 +89,10 @@ struct Opt {
#[structopt(
long,
default_value = "720",
help = "The maximum height we plan to send video at (used for stats only)."
)]
send_video_height: u16,
#[structopt(
long,
@ -96,6 +100,20 @@ struct Opt {
)]
video_type: Option<String>,
#[structopt(
long,
default_value = "1280",
help = "The width to scale received video to before passing it to the recv-pipeline.",
)]
recv_video_scale_width: u16,
#[structopt(
long,
default_value = "720",
help = "The height to scale received video to before passing it to the recv-pipeline. This will also be signalled as the maximum height that JVB should send video to us at.",
)]
recv_video_scale_height: u16,
#[structopt(
long,
default_value = "200",
@ -184,7 +202,7 @@ async fn main_inner() -> Result<()> {
init_gstreamer()?;
// Parse pipelines early so that we don't bother connecting to the conference if they're invalid.
let send_pipeline = opt
.send_pipeline
@ -193,6 +211,13 @@ async fn main_inner() -> Result<()> {
.transpose()
.context("failed to parse send pipeline")?;
let recv_pipeline = opt
.recv_pipeline
.as_ref()
.map(|pipeline| gstreamer::parse_bin_from_description(pipeline, false))
.transpose()
.context("failed to parse recv pipeline")?;
let web_socket_url: Uri = opt.web_socket_url.parse()?;
let xmpp_domain = opt
@ -237,6 +262,8 @@ async fn main_inner() -> Result<()> {
video_codec,
recv_pipeline_participant_template,
send_video_height,
recv_video_scale_height,
recv_video_scale_width,
buffer_size,
start_bitrate,
stereo,
@ -256,6 +283,8 @@ async fn main_inner() -> Result<()> {
extra_muc_features: vec![],
start_bitrate: start_bitrate.unwrap_or(800),
stereo: stereo.unwrap_or_default(),
recv_video_scale_height,
recv_video_scale_width,
buffer_size,
#[cfg(feature = "log-rtp")]
log_rtp,
@ -269,26 +298,22 @@ async fn main_inner() -> Result<()> {
.await
.context("failed to join conference")?;
conference.set_send_resolution(send_video_height.into()).await;
conference
.send_colibri_message(ColibriMessage::ReceiverVideoConstraints {
last_n: Some(opt.last_n.map(i32::from).unwrap_or(-1)),
selected_endpoints: opt
.select_endpoints
.map(|endpoints| endpoints.split(',').map(ToOwned::to_owned).collect()),
on_stage_endpoints: None,
default_constraints: Some(Constraints {
max_height: Some(opt.recv_video_scale_height.into()),
ideal_height: None,
}),
constraints: None,
})
.await?;
if let Some(video_type) = opt.video_type {
conference
@ -328,6 +353,20 @@ async fn main_inner() -> Result<()> {
conference.set_muted(MediaType::Video, true).await?;
}
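// If a shared recv pipeline was supplied, add it to the conference and register its audio/video elements as the sinks for every remote participant's decoded media.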
if let Some(bin) = recv_pipeline {
conference.add_bin(&bin).await?;
if let Some(audio_element) = bin.by_name("audio") {
info!("recv pipeline has an audio element, a sink pad will be requested from it for each participant");
conference.set_remote_participant_audio_sink_element(Some(audio_element)).await;
}
if let Some(video_element) = bin.by_name("video") {
info!("recv pipeline has a video element, a sink pad will be requested from it for each participant");
conference.set_remote_participant_video_sink_element(Some(video_element)).await;
}
}
conference
.on_participant(move |conference, participant| {
let recv_pipeline_participant_template = recv_pipeline_participant_template.clone();
@ -364,9 +403,6 @@ async fn main_inner() -> Result<()> {
)?;
bin.add_pad(&GhostPad::with_target(Some("audio"), &sink_pad)?)?;
}
if let Some(video_sink_element) = bin.by_name("video") {
let sink_pad = video_sink_element.static_pad("sink").context(
@ -374,9 +410,6 @@ async fn main_inner() -> Result<()> {
)?;
bin.add_pad(&GhostPad::with_target(Some("video"), &sink_pad)?)?;
}
bin.set_property(
"name",
@ -384,9 +417,6 @@ async fn main_inner() -> Result<()> {
);
conference.add_bin(&bin).await?;
}
Ok(())
})

View File

@ -160,7 +160,12 @@ pub unsafe extern "C" fn gstmeet_connection_join_conference(
// TODO
start_bitrate: 800,
stereo: false,
recv_video_scale_width: 1280,
recv_video_scale_height: 720,
buffer_size: 200,
#[cfg(feature = "log-rtp")]
log_rtp: false,
#[cfg(feature = "log-rtp")]

View File

@ -78,9 +78,15 @@ pub struct JitsiConferenceConfig {
pub region: Option<String>,
pub video_codec: String,
pub extra_muc_features: Vec<String>,
pub start_bitrate: u32,
pub stereo: bool,
pub recv_video_scale_width: u16,
pub recv_video_scale_height: u16,
pub buffer_size: u32,
#[cfg(feature = "log-rtp")]
pub log_rtp: bool,
#[cfg(feature = "log-rtp")]
@ -120,6 +126,8 @@ type BoxedResultFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
pub(crate) struct JitsiConferenceInner {
participants: HashMap<String, Participant>,
audio_sink: Option<gstreamer::Element>,
video_sink: Option<gstreamer::Element>,
on_participant:
Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_participant_left:
@ -214,6 +222,8 @@ impl JitsiConference {
state: JitsiConferenceState::Discovering,
presence,
participants: HashMap::new(),
audio_sink: None,
video_sink: None,
on_participant: None,
on_participant_left: None,
on_colibri_message: None,
@ -326,6 +336,22 @@ impl JitsiConference {
Ok(())
}
pub async fn remote_participant_audio_sink_element(&self) -> Option<gstreamer::Element> {
self.inner.lock().await.audio_sink.as_ref().cloned()
}
pub async fn set_remote_participant_audio_sink_element(&self, sink: Option<gstreamer::Element>) {
self.inner.lock().await.audio_sink = sink;
}
pub async fn remote_participant_video_sink_element(&self) -> Option<gstreamer::Element> {
self.inner.lock().await.video_sink.as_ref().cloned()
}
pub async fn set_remote_participant_video_sink_element(&self, sink: Option<gstreamer::Element>) {
self.inner.lock().await.video_sink = sink;
}
pub async fn audio_sink_element(&self) -> Result<gstreamer::Element> {
Ok(
self

View File

@ -2,8 +2,8 @@ use std::{collections::HashMap, fmt, net::SocketAddr};
use anyhow::{anyhow, bail, Context, Result};
use futures::stream::StreamExt;
use glib::{Cast, ObjectExt, ToValue};
use gstreamer::{Bin, Element, GhostPad, prelude::{ElementExt, ElementExtManual, GObjectExtManualGst, GstBinExt, GstObjectExt, PadExt}};
use gstreamer_rtp::{prelude::RTPHeaderExtensionExt, RTPHeaderExtension};
#[cfg(feature = "log-rtp")]
use gstreamer_rtp::RTPBuffer;
@ -21,7 +21,7 @@ use rand::random;
use rcgen::{Certificate, CertificateParams, PKCS_ECDSA_P256_SHA256};
use ring::digest::{digest, SHA256};
use tokio::{net::lookup_host, runtime::Handle, sync::oneshot, task::JoinHandle};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use xmpp_parsers::{
hashes::Algo,
@ -107,7 +107,7 @@ impl Codec {
}
}
fn depayloader_name(&self) -> &'static str {
match self.name {
CodecName::Opus => "rtpopusdepay",
CodecName::H264 => "rtph264depay",
@ -116,7 +116,16 @@ impl Codec {
}
}
fn decoder_name(&self) -> &'static str {
match self.name {
CodecName::Opus => "opusdec",
CodecName::H264 => "avdec_h264",
CodecName::Vp8 => "vp8dec",
CodecName::Vp9 => "vp9dec",
}
}
fn payloader_name(&self) -> &'static str {
match self.name {
CodecName::Opus => "rtpopuspay",
CodecName::H264 => "rtph264pay",
@ -802,14 +811,14 @@ impl JingleSession {
debug!("pad added for remote source: {:?}", source); debug!("pad added for remote source: {:?}", source);
let source_element = match source.media_type { let depayloader = match source.media_type {
MediaType::Audio => { MediaType::Audio => {
let codec = codecs let codec = codecs
.iter() .iter()
.filter(|codec| codec.is_audio()) .filter(|codec| codec.is_audio())
.find(|codec| codec.is(pt)); .find(|codec| codec.is(pt));
if let Some(codec) = codec { if let Some(codec) = codec {
gstreamer::ElementFactory::make(codec.make_depay_name(), None)? gstreamer::ElementFactory::make(codec.depayloader_name(), None)?
} }
else { else {
bail!("received audio with unsupported PT {}", pt); bail!("received audio with unsupported PT {}", pt);
@ -821,7 +830,7 @@ impl JingleSession {
.filter(|codec| codec.is_video())
.find(|codec| codec.is(pt));
if let Some(codec) = codec {
gstreamer::ElementFactory::make(codec.depayloader_name(), None)?
}
else {
bail!("received video with unsupported PT {}", pt);
@ -829,8 +838,8 @@ impl JingleSession {
},
};
depayloader.set_property("auto-header-extension", false);
depayloader.connect("request-extension", false, move |values| {
let f = || {
let ext_id: u32 = values[1].get()?;
let ext_uri: String = values[2].get()?;
@ -850,25 +859,101 @@ impl JingleSession {
});
pipeline
.add(&depayloader)
.context("failed to add depayloader to pipeline")?;
depayloader.sync_state_with_parent()?;
debug!("created depayloader");
rtpbin
.link_pads(Some(&pad_name), &depayloader, None)
.context(format!("failed to link rtpbin.{} to depayloader", pad_name))?;
debug!("linked rtpbin.{} to depayloader", pad_name);
debug!("rtpbin pads:\n{}", dump_pads(&rtpbin));
let queue = gstreamer::ElementFactory::make("queue", None)?;
pipeline.add(&queue).context("failed to add queue to pipeline")?;
queue.sync_state_with_parent()?;
depayloader.link(&queue).context("failed to link depayloader to queue")?;
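// Select a decoder for the negotiated codec: incoming media is now decoded here so that recv pipelines are handed raw audio/video.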
let decoder = match source.media_type {
MediaType::Audio => {
let codec = codecs
.iter()
.filter(|codec| codec.is_audio())
.find(|codec| codec.is(pt));
if let Some(codec) = codec {
gstreamer::ElementFactory::make(codec.decoder_name(), None)?
// TODO: fec
}
else {
bail!("received audio with unsupported PT {}", pt);
}
},
MediaType::Video => {
let codec = codecs
.iter()
.filter(|codec| codec.is_video())
.find(|codec| codec.is(pt));
if let Some(codec) = codec {
let decoder = gstreamer::ElementFactory::make(codec.decoder_name(), None)?;
decoder.set_property("automatic-request-sync-points", true);
decoder.set_property_from_str("automatic-request-sync-point-flags", "GST_VIDEO_DECODER_REQUEST_SYNC_POINT_CORRUPT_OUTPUT");
decoder
}
else {
bail!("received video with unsupported PT {}", pt);
}
},
};
pipeline.add(&decoder).context("failed to add decoder to pipeline")?;
decoder.sync_state_with_parent()?;
queue.link(&decoder).context("failed to link queue to decoder")?;
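// Audio leaves the decode chain straight from the decoder; video is additionally scaled to the configured receive size and converted before being handed to a sink.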
let src_pad = match source.media_type {
MediaType::Audio => decoder
.static_pad("src")
.context("decoder has no src pad")?,
MediaType::Video => {
let videoscale = gstreamer::ElementFactory::make("videoscale", None)?;
pipeline.add(&videoscale).context("failed to add videoscale to pipeline")?;
videoscale.sync_state_with_parent()?;
decoder.link(&videoscale).context("failed to link decoder to videoscale")?;
let capsfilter = gstreamer::ElementFactory::make("capsfilter", None)?;
capsfilter.set_property_from_str("caps", &format!("video/x-raw, width={}, height={}", conference.config.recv_video_scale_width, conference.config.recv_video_scale_height));
pipeline.add(&capsfilter).context("failed to add capsfilter to pipeline")?;
capsfilter.sync_state_with_parent()?;
videoscale.link(&capsfilter).context("failed to link videoscale to capsfilter")?;
let videoconvert = gstreamer::ElementFactory::make("videoconvert", None)?;
pipeline.add(&videoconvert).context("failed to add videoconvert to pipeline")?;
videoconvert.sync_state_with_parent()?;
capsfilter.link(&videoconvert).context("failed to link capsfilter to videoconvert")?;
videoconvert.static_pad("src").context("videoconvert has no src pad")?
},
};
if let Some(participant_id) = source.participant_id {
handle.block_on(conference.ensure_participant(&participant_id))?;
let maybe_sink_element = match source.media_type {
MediaType::Audio => handle.block_on(conference.remote_participant_audio_sink_element()),
MediaType::Video => handle.block_on(conference.remote_participant_video_sink_element()),
};
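// A sink element from the shared recv pipeline, if one was configured, takes precedence over any per-participant template bin.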
if let Some(sink_element) = maybe_sink_element {
let sink_pad = sink_element
.request_pad_simple("sink_%u")
.context("no suitable sink pad provided by sink element in recv pipeline")?;
let ghost_pad = GhostPad::with_target(Some(&format!("participant_{}_{:?}", participant_id, source.media_type)), &sink_pad)?;
let bin: Bin = sink_element.parent().context("sink element has no parent")?.downcast().map_err(|_| anyhow!("sink element's parent is not a bin"))?;
bin.add_pad(&ghost_pad)?;
src_pad.link(&ghost_pad).context("failed to link decode chain to participant bin from recv pipeline")?;
info!("linked {}/{:?} to new pad in recv pipeline", participant_id, source.media_type);
}
else if let Some(participant_bin) =
pipeline.by_name(&format!("participant_{}", participant_id))
{
let sink_pad_name = match source.media_type {
@ -876,18 +961,18 @@ impl JingleSession {
MediaType::Video => "video", MediaType::Video => "video",
}; };
if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) { if let Some(sink_pad) = participant_bin.static_pad(sink_pad_name) {
debug!("linking depayloader to participant bin"); src_pad.link(&sink_pad).context("failed to link decode chain to participant bin from recv participant pipeline")?;
src_pad.link(&sink_pad)?; info!("linked {}/{:?} to recv participant pipeline", participant_id, source.media_type);
} }
else { else {
warn!( warn!(
"no {} sink pad in {} participant bin", "no {} sink pad on {} participant bin in recv participant pipeline",
sink_pad_name, participant_id sink_pad_name, participant_id
); );
} }
} }
else { else {
debug!("no participant bin for {}", participant_id); warn!("no pipeline handled new participant: {}", participant_id);
} }
} }
else { else {
@ -895,11 +980,12 @@ impl JingleSession {
}
if !src_pad.is_linked() {
debug!("nothing linked to decoder, adding fakesink");
let fakesink = gstreamer::ElementFactory::make("fakesink", None)?;
pipeline.add(&fakesink)?;
fakesink.sync_state_with_parent()?;
let sink_pad = fakesink.static_pad("sink").context("fakesink has no sink pad")?;
src_pad.link(&sink_pad)?;
}
gstreamer::debug_bin_to_dot_file(
@ -923,7 +1009,7 @@ impl JingleSession {
let opus = codecs.iter().find(|codec| codec.name == CodecName::Opus);
let audio_sink_element = if let Some(opus) = opus {
let audio_sink_element = gstreamer::ElementFactory::make(opus.payloader_name(), None)?;
audio_sink_element.set_property("pt", opus.pt as u32);
audio_sink_element
}
@ -964,7 +1050,7 @@ impl JingleSession {
let codec_name = conference.config.video_codec.as_str();
let codec = codecs.iter().find(|codec| codec.is_codec(codec_name));
let video_sink_element = if let Some(codec) = codec {
let element = gstreamer::ElementFactory::make(codec.payloader_name(), None)?;
element.set_property("pt", codec.pt as u32);
if codec.name == CodecName::H264 {
element.set_property_from_str("aggregate-mode", "zero-latency");

View File

@ -10,6 +10,13 @@ let
mesonFlags = old.mesonFlags ++ ["-Dgupnp=disabled" "-Dgtk_doc=disabled"];
meta.platforms = lib.platforms.unix;
});
gst-plugins-bad-patched = gst_all_1.gst-plugins-bad.override {
faacSupport = true;
};
gst-plugins-ugly-patched = gst_all_1.gst-plugins-ugly.overrideAttrs(old: rec {
buildInputs = lib.lists.subtractLists [a52dec] old.buildInputs;
mesonFlags = old.mesonFlags ++ ["-Da52dec=disabled"];
});
in
mkShell {
name = "gst-meet";
@ -22,7 +29,8 @@ mkShell {
gst_all_1.gstreamer
gst_all_1.gst-plugins-base
gst_all_1.gst-plugins-good
gst-plugins-bad-patched
gst-plugins-ugly-patched
libnice-patched
] ++ (if stdenv.isDarwin then [
darwin.apple_sdk.frameworks.AppKit