diff --git a/Cargo.lock b/Cargo.lock index 67e9eff..b6f8400 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,7 +473,7 @@ dependencies = [ [[package]] name = "gst-meet" -version = "0.2.0" +version = "0.2.2" dependencies = [ "anyhow", "cocoa", @@ -639,7 +639,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "lib-gst-meet" -version = "0.2.1" +version = "0.3.0" dependencies = [ "anyhow", "async-stream", @@ -658,6 +658,8 @@ dependencies = [ "rand", "rcgen", "ring", + "serde", + "serde_json", "tokio", "tokio-stream", "tokio-tungstenite", @@ -1153,6 +1155,12 @@ dependencies = [ "webpki", ] +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1174,6 +1182,31 @@ name = "serde" version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f03b9878abf6d14e6779d3f24f07b2cfa90352cfec4acc5aab8f1ac7f146fae8" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.127" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a024926d3432516606328597e0f224a51355a493b49fdd67e9209187cbe55ecc" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" +dependencies = [ + "itoa", + "ryu", + "serde", +] [[package]] name = "sha-1" diff --git a/gst-meet/Cargo.toml b/gst-meet/Cargo.toml index 6e62151..ba8e1ed 100644 --- a/gst-meet/Cargo.toml +++ b/gst-meet/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "gst-meet" description = "A tool for connecting GStreamer pipelines to Jitsi Meet conferences" -version = "0.2.1" +version = "0.2.2" edition = "2018" license = "MIT/Apache-2.0" authors = ["Jasper Hugo "] @@ -11,7 +11,7 @@ anyhow = { version = "1", default-features = false, features = ["std"] } futures = { version = "0.3", default-features = false } glib = { version = "0.14", default-features = false, features = ["log"] } gstreamer = { version = "0.17", default-features = false, features = ["v1_16"] } -lib-gst-meet = { version = "0.2", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] } +lib-gst-meet = { version = "0.3", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] } structopt = { version = "0.3", default-features = false } tokio = { version = "1", default-features = false, features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } tokio-stream = { version = "0.1", default-features = false } diff --git a/gst-meet/src/main.rs b/gst-meet/src/main.rs index 16d473c..1934503 100644 --- a/gst-meet/src/main.rs +++ b/gst-meet/src/main.rs @@ -3,6 +3,7 @@ use std::time::Duration; use anyhow::{Context, Result}; #[cfg(target_os = "macos")] use cocoa::appkit::NSApplication; +use glib::ObjectExt; use gstreamer::{ prelude::{ElementExt, GstBinExt}, GhostPad, @@ -10,7 +11,7 @@ use gstreamer::{ use lib_gst_meet::{init_tracing, JitsiConferenceConfig, JitsiConnection}; use structopt::StructOpt; use tokio::{signal::ctrl_c, task, time::timeout}; -use tracing::{info, warn}; +use tracing::{info, trace, warn}; #[derive(Debug, Clone, StructOpt)] #[structopt( @@ -71,6 +72,12 @@ fn main() { } } +fn init_gstreamer() { + trace!("starting gstreamer init"); + gstreamer::init().expect("gstreamer init failed"); + trace!("finished gstreamer init"); +} + async fn main_inner() -> Result<()> { let opt = Opt::from_args(); @@ -79,10 +86,9 @@ async fn main_inner() -> Result<()> { 1 => tracing::Level::DEBUG, _ => tracing::Level::TRACE, }); - glib::log_set_default_handler(glib::rust_log_handler); - let main_loop = glib::MainLoop::new(None, false); - gstreamer::init().unwrap(); + + init_gstreamer(); let parsed_bin = opt .send_pipeline @@ -127,6 +133,8 @@ async fn main_inner() -> Result<()> { video_codec, }; + let main_loop = glib::MainLoop::new(None, false); + let conference = connection .join_conference(main_loop.context(), config) .await?; @@ -148,7 +156,7 @@ async fn main_inner() -> Result<()> { } conference - .on_participant(move |participant| { + .on_participant(move |conference, participant| { let recv_pipeline_participant_template = recv_pipeline_participant_template.clone(); Box::pin(async move { info!("New participant: {:?}", participant); @@ -186,18 +194,20 @@ async fn main_inner() -> Result<()> { info!("No video sink element found in recv pipeline participant template"); } - Ok(Some(bin)) + bin.set_property("name", format!("participant_{}", participant.muc_jid.resource))?; + conference.add_bin(&bin).await?; } else { info!("No template for handling new participant"); - Ok(None) } + + Ok(()) }) }) .await; conference - .on_participant_left(move |participant| { + .on_participant_left(move |_conference, participant| { Box::pin(async move { info!("Participant left: {:?}", participant); Ok(()) @@ -205,6 +215,15 @@ async fn main_inner() -> Result<()> { }) .await; + conference + .on_colibri_message(move |_conference, message| { + Box::pin(async move { + info!("Colibri message: {:?}", message); + Ok(()) + }) + }) + .await; + conference .set_pipeline_state(gstreamer::State::Playing) .await?; diff --git a/lib-gst-meet-c/Cargo.toml b/lib-gst-meet-c/Cargo.toml index c060b67..5f5ab02 100644 --- a/lib-gst-meet-c/Cargo.toml +++ b/lib-gst-meet-c/Cargo.toml @@ -10,7 +10,7 @@ authors = ["Jasper Hugo "] anyhow = { version = "1", default-features = false } glib = { version = "0.14", default-features = false } gstreamer = { version = "0.17", default-features = false } -lib-gst-meet = { version = "0.2", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] } +lib-gst-meet = { version = "0.3", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } tracing = { version = "0.1", default-features = false } diff --git a/lib-gst-meet-c/src/lib.rs b/lib-gst-meet-c/src/lib.rs index a6feb1b..7abdac7 100644 --- a/lib-gst-meet-c/src/lib.rs +++ b/lib-gst-meet-c/src/lib.rs @@ -187,13 +187,13 @@ pub unsafe extern "C" fn gstmeet_conference_video_sink_element(context: *mut Con pub unsafe extern "C" fn gstmeet_conference_on_participant( context: *mut Context, conference: *mut JitsiConference, - f: unsafe extern "C" fn(Participant, *mut c_void) -> *mut gstreamer::ffi::GstBin, + f: unsafe extern "C" fn(*mut JitsiConference, Participant, *mut c_void) -> *mut gstreamer::ffi::GstBin, ctx: *mut c_void, ) { let ctx = Arc::new(AtomicPtr::new(ctx)); (*context) .runtime - .block_on((*conference).on_participant(move |participant| { + .block_on((*conference).on_participant(move |conference, participant| { let ctx = ctx.clone(); Box::pin(async move { let participant = Participant { @@ -205,13 +205,8 @@ pub unsafe extern "C" fn gstmeet_conference_on_participant( .transpose()? .unwrap_or_else(ptr::null), }; - let maybe_bin = f(participant, ctx.load(Ordering::Relaxed)); - if maybe_bin.is_null() { - Ok(None) - } - else { - Ok(Some(from_glib_full(maybe_bin))) - } + f(Box::into_raw(Box::new(conference)), participant, ctx.load(Ordering::Relaxed)); + Ok(()) }) })); } diff --git a/lib-gst-meet/Cargo.toml b/lib-gst-meet/Cargo.toml index 348cd5d..2a8e543 100644 --- a/lib-gst-meet/Cargo.toml +++ b/lib-gst-meet/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lib-gst-meet" description = "Connect GStreamer pipelines to Jitsi Meet conferences" -version = "0.2.1" +version = "0.3.0" edition = "2018" license = "MIT/Apache-2.0" authors = ["Jasper Hugo "] @@ -24,6 +24,8 @@ pem = { version = "0.8", default-features = false } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } rcgen = { version = "0.8", default-features = false } ring = { version = "0.16", default-features = false } +serde = { version = "1", default-features = false, features = ["derive"] } +serde_json = { version = "1", default-features = false, features = ["std"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros", "sync", "time"] } tokio-stream = { version = "0.1", default-features = false, features = ["time"] } tokio-tungstenite = { version = "0.14", default-features = false, features = ["connect", "rustls-tls"] } diff --git a/lib-gst-meet/src/colibri.rs b/lib-gst-meet/src/colibri.rs new file mode 100644 index 0000000..08e1b79 --- /dev/null +++ b/lib-gst-meet/src/colibri.rs @@ -0,0 +1,203 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Result; +use futures::{ + sink::SinkExt, + stream::{StreamExt, TryStreamExt}, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, Mutex}; +use tokio_stream::wrappers::ReceiverStream; +use tokio_tungstenite::tungstenite::{http::Request, Message}; +use tracing::{debug, error, info, warn}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(tag = "colibriClass")] +pub enum ColibriMessage { + #[serde(rename_all = "camelCase")] + DominantSpeakerEndpointChangeEvent { + dominant_speaker_endpoint: String, + previous_speakers: Vec, + }, + #[serde(rename_all = "camelCase")] + EndpointConnectivityStatusChangeEvent { + endpoint: String, + active: bool, + }, + #[serde(rename_all = "camelCase")] + EndpointMessage { + from: String, + to: Option, + msg_payload: serde_json::Value, + }, + #[serde(rename_all = "camelCase")] + EndpointStats { + from: String, + bitrate: Bitrates, + packet_loss: PacketLoss, + connection_quality: f32, + #[serde(rename = "jvbRTT")] + jvb_rtt: u16, + server_region: String, + max_enabled_resolution: u16, + }, + #[serde(rename_all = "camelCase")] + LastNChangedEvent { + last_n: u16, + }, + #[serde(rename_all = "camelCase")] + LastNEndpointsChangeEvent { + last_n_endpoints: Vec, + }, + #[serde(rename_all = "camelCase")] + ReceiverVideoConstraint { + max_frame_height: u16, + }, + #[serde(rename_all = "camelCase")] + ReceiverVideoConstraints { + last_n: Option, + selected_endpoints: Option>, + on_stage_endpoints: Option>, + default_constraints: Option, + constraints: HashMap, + }, + #[serde(rename_all = "camelCase")] + SelectedEndpointsChangedEvent { + selected_endpoints: Vec, + }, + #[serde(rename_all = "camelCase")] + SenderVideoConstraints { + video_constraints: Constraints, + }, + #[serde(rename_all = "camelCase")] + ServerHello { + version: Option, + }, + #[serde(rename_all = "camelCase")] + VideoTypeMessage { + video_type: String, + }, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Constraints { + ideal_height: Option, + max_height: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Bitrates { + audio: Bitrate, + video: Bitrate, + #[serde(flatten)] + total: Bitrate, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Bitrate { + upload: u32, + download: u32, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PacketLoss { + total: u32, + download: u32, + upload: u32, +} + +pub(crate) struct ColibriChannel { + send_tx: mpsc::Sender, + recv_tx: Arc>>>, +} + +impl ColibriChannel { + pub(crate) async fn new(colibri_url: &str) -> Result { + let request = + Request::get(colibri_url).body(())?; + let (colibri_websocket, _response) = + tokio_tungstenite::connect_async(request).await?; + + info!("Connected Colibri WebSocket"); + + let (mut colibri_sink, mut colibri_stream) = colibri_websocket.split(); + let recv_tx: Arc>>> = Arc::new(Mutex::new(vec![])); + let recv_task = { + let recv_tx = recv_tx.clone(); + tokio::spawn(async move { + while let Some(msg) = colibri_stream.try_next().await? { + match msg { + Message::Text(text) => { + debug!("colibri: {}", text); + match serde_json::from_str::(&text) { + Ok(colibri_msg) => { + let mut txs = recv_tx.lock().await; + let txs_clone = txs.clone(); + for (i, tx) in txs_clone.iter().enumerate().rev() { + if let Err(_) = tx.send(colibri_msg.clone()).await { + debug!("colibri subscriber closed, removing"); + txs.remove(i); + } + } + }, + Err(e) => warn!("failed to parse frame on colibri websocket: {:?}\nframe: {}", e, text), + } + }, + Message::Binary(data) => debug!("received unexpected {} byte binary frame on colibri websocket", data.len()), + Message::Ping(_) | Message::Pong(_) => {}, // handled automatically by tungstenite + Message::Close(_) => { + debug!("received close frame on colibri websocket"); + // TODO reconnect + break; + }, + } + } + Ok::<_, anyhow::Error>(()) + }) + }; + + let (send_tx, send_rx) = mpsc::channel(8); + let send_task = tokio::spawn(async move { + let mut stream = ReceiverStream::new(send_rx); + while let Some(colibri_msg) = stream.next().await { + match serde_json::to_string(&colibri_msg) { + Ok(json) => { + let msg = Message::Text(json); + colibri_sink.send(msg).await?; + }, + Err(e) => warn!("failed to serialise colibri message: {:?}", e), + } + } + Ok::<_, anyhow::Error>(()) + }); + + tokio::spawn(async move { + tokio::select! { + res = recv_task => if let Ok(Err(e)) = res { + error!("colibri recv loop: {:?}", e); + }, + res = send_task => if let Ok(Err(e)) = res { + error!("colibri send loop: {:?}", e); + }, + }; + }); + + Ok(Self { + send_tx, + recv_tx, + }) + } + + pub(crate) async fn subscribe(&self, tx: mpsc::Sender) { + self.recv_tx.lock().await.push(tx); + } + + pub(crate) async fn send(&self, msg: ColibriMessage) -> Result<()> { + self.send_tx.send(msg).await?; + Ok(()) + } +} \ No newline at end of file diff --git a/lib-gst-meet/src/conference.rs b/lib-gst-meet/src/conference.rs index 7f6f5e0..40470c7 100644 --- a/lib-gst-meet/src/conference.rs +++ b/lib-gst-meet/src/conference.rs @@ -3,7 +3,6 @@ use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin, use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; use futures::stream::StreamExt; -use glib::ObjectExt; use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt}; use once_cell::sync::Lazy; use tokio::sync::{mpsc, oneshot, Mutex}; @@ -21,7 +20,13 @@ use xmpp_parsers::{ BareJid, Element, FullJid, Jid, }; -use crate::{jingle::JingleSession, source::MediaType, stanza_filter::StanzaFilter, xmpp}; +use crate::{ + colibri::{ColibriChannel, ColibriMessage}, + jingle::JingleSession, + source::MediaType, + stanza_filter::StanzaFilter, + xmpp, +}; static DISCO_INFO: Lazy = Lazy::new(|| DiscoInfoResult { node: None, @@ -91,17 +96,16 @@ pub struct Participant { pub jid: FullJid, pub muc_jid: FullJid, pub nick: Option, - bin: Option, } -type BoxedBinResultFuture = Pin>> + Send>>; type BoxedResultFuture = Pin> + Send>>; pub(crate) struct JitsiConferenceInner { pub(crate) jingle_session: Option, participants: HashMap, - on_participant: Option BoxedBinResultFuture) + Send + Sync>>, - on_participant_left: Option BoxedResultFuture) + Send + Sync>>, + on_participant: Option BoxedResultFuture) + Send + Sync>>, + on_participant_left: Option BoxedResultFuture) + Send + Sync>>, + on_colibri_message: Option BoxedResultFuture) + Send + Sync>>, state: JitsiConferenceState, connected_tx: Option>, connected_rx: Option>, @@ -136,6 +140,7 @@ impl JitsiConference { participants: HashMap::new(), on_participant: None, on_participant_left: None, + on_colibri_message: None, jingle_session: None, connected_tx: Some(tx), connected_rx: Some(rx), @@ -269,8 +274,23 @@ impl JitsiConference { ) } + pub async fn send_colibri_message(&self, message: ColibriMessage) -> Result<()> { + self + .inner + .lock() + .await + .jingle_session + .as_ref() + .context("not connected (no jingle session)")? + .colibri_channel + .as_ref() + .context("no colibri channel")? + .send(message) + .await + } + #[tracing::instrument(level = "trace", skip(f))] - pub async fn on_participant(&self, f: impl (Fn(Participant) -> BoxedBinResultFuture) + Send + Sync + 'static) { + pub async fn on_participant(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) { let f = Arc::new(f); let f2 = f.clone(); let existing_participants: Vec<_> = { @@ -283,37 +303,21 @@ impl JitsiConference { "calling on_participant with existing participant: {:?}", participant ); - match f(participant.clone()).await { - Ok(Some(bin)) => { - bin - .set_property( - "name", - format!("participant_{}", participant.muc_jid.resource), - ) - .unwrap(); - match self.add_bin(&bin).await { - Ok(_) => { - let mut locked_inner = self.inner.lock().await; - if let Some(p) = locked_inner - .participants - .get_mut(&participant.muc_jid.resource) - { - p.bin = Some(bin); - } - }, - Err(e) => warn!("failed to add participant bin: {:?}", e), - } - }, - Ok(None) => {}, - Err(e) => warn!("on_participant failed: {:?}", e), + if let Err(e) = f(self.clone(), participant.clone()).await { + warn!("on_participant failed: {:?}", e); } } } #[tracing::instrument(level = "trace", skip(f))] - pub async fn on_participant_left(&self, f: impl (Fn(Participant) -> BoxedResultFuture) + Send + Sync + 'static) { + pub async fn on_participant_left(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) { self.inner.lock().await.on_participant_left = Some(Arc::new(f)); } + + #[tracing::instrument(level = "trace", skip(f))] + pub async fn on_colibri_message(&self, f: impl (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync + 'static) { + self.inner.lock().await.on_colibri_message = Some(Arc::new(f)); + } } #[async_trait] @@ -463,37 +467,22 @@ impl StanzaFilter for JitsiConference { if let Some(colibri_url) = colibri_url { info!("Connecting Colibri WebSocket to {}", colibri_url); + let colibri_channel = ColibriChannel::new(&colibri_url).await?; + let (tx, rx) = mpsc::channel(8); + colibri_channel.subscribe(tx).await; + locked_inner.jingle_session.as_mut().unwrap().colibri_channel = Some(colibri_channel); - let request = - tokio_tungstenite::tungstenite::http::Request::get(colibri_url).body(())?; - let (colibri_websocket, _response) = - tokio_tungstenite::connect_async(request).await?; - info!("Connected Colibri WebSocket"); - - let (colibri_sink, mut colibri_stream) = colibri_websocket.split(); - let colibri_receive_task = tokio::spawn(async move { - while let Some(msg) = colibri_stream.next().await { - debug!("colibri: {:?}", msg); - } - Ok::<_, anyhow::Error>(()) - }); - let (colibri_tx, colibri_rx) = mpsc::channel(8); - locked_inner.jingle_session.as_mut().unwrap().colibri_tx = Some(colibri_tx); - let colibri_transmit_task = tokio::spawn(async move { - let stream = ReceiverStream::new(colibri_rx); - stream.forward(colibri_sink).await?; - Ok::<_, anyhow::Error>(()) - }); - + let self_ = self.clone(); tokio::spawn(async move { - tokio::select! { - res = colibri_receive_task => if let Ok(Err(e)) = res { - error!("colibri read loop: {:?}", e); - }, - res = colibri_transmit_task => if let Ok(Err(e)) = res { - error!("colibri write loop: {:?}", e); - }, - }; + let mut stream = ReceiverStream::new(rx); + while let Some(msg) = stream.next().await { + let locked_inner = self_.inner.lock().await; + if let Some(f) = &locked_inner.on_colibri_message { + if let Err(e) = f(self_.clone(), msg).await { + warn!("on_colibri_message failed: {:?}", e); + } + } + } }); } @@ -531,13 +520,12 @@ impl StanzaFilter for JitsiConference { jid: jid.clone(), muc_jid: from.clone(), nick: item.nick, - bin: None, }; if presence.type_ == presence::Type::Unavailable && locked_inner.participants.remove(&from.resource.clone()).is_some() { debug!("participant left: {:?}", jid); if let Some(f) = &locked_inner.on_participant_left { debug!("calling on_participant_left with old participant"); - if let Err(e) = f(participant).await { + if let Err(e) = f(self.clone(), participant).await { warn!("on_participant_left failed: {:?}", e); } } @@ -550,20 +538,8 @@ impl StanzaFilter for JitsiConference { debug!("new participant: {:?}", jid); if let Some(f) = &locked_inner.on_participant { debug!("calling on_participant with new participant"); - match f(participant).await { - Ok(Some(bin)) => { - bin.set_property("name", format!("participant_{}", from.resource))?; - match self.add_bin(&bin).await { - Ok(_) => { - if let Some(p) = locked_inner.participants.get_mut(&from.resource) { - p.bin = Some(bin); - } - }, - Err(e) => warn!("failed to add participant bin: {:?}", e), - } - }, - Ok(None) => {}, - Err(e) => warn!("on_participant failed: {:?}", e), + if let Err(e) = f(self.clone(), participant).await { + warn!("on_participant failed: {:?}", e); } } } diff --git a/lib-gst-meet/src/jingle.rs b/lib-gst-meet/src/jingle.rs index 52f5eb6..1560a15 100644 --- a/lib-gst-meet/src/jingle.rs +++ b/lib-gst-meet/src/jingle.rs @@ -12,7 +12,7 @@ use ring::digest::{digest, SHA256}; use tokio::{ net::lookup_host, runtime::Handle, - sync::{mpsc, oneshot}, + sync::oneshot, }; use tracing::{debug, error, warn}; use uuid::Uuid; @@ -28,6 +28,7 @@ use xmpp_parsers::{ }; use crate::{ + colibri::ColibriChannel, conference::JitsiConference, source::{MediaType, Source}, util::generate_id, @@ -46,11 +47,7 @@ pub(crate) struct JingleSession { ice_component_id: u32, pub(crate) accept_iq_id: Option, pub(crate) colibri_url: Option, - pub(crate) colibri_tx: Option< - mpsc::Sender< - Result, - >, - >, + pub(crate) colibri_channel: Option, pipeline_state_null_rx: oneshot::Receiver<()>, } @@ -801,7 +798,7 @@ impl JingleSession { ice_component_id, accept_iq_id: Some(accept_iq_id), colibri_url, - colibri_tx: None, + colibri_channel: None, pipeline_state_null_rx, }) } diff --git a/lib-gst-meet/src/lib.rs b/lib-gst-meet/src/lib.rs index 785cc76..b93c900 100644 --- a/lib-gst-meet/src/lib.rs +++ b/lib-gst-meet/src/lib.rs @@ -1,16 +1,18 @@ -pub mod conference; -pub mod connection; +mod colibri; +mod conference; +mod connection; mod jingle; mod pinger; -pub mod source; +mod source; mod stanza_filter; mod util; mod xmpp; pub use crate::{ + colibri::ColibriMessage, conference::{JitsiConference, JitsiConferenceConfig, Participant}, connection::JitsiConnection, - source::{MediaType, Source}, + source::MediaType, }; #[cfg(feature = "tracing-subscriber")] diff --git a/lib-gst-meet/src/source.rs b/lib-gst-meet/src/source.rs index ac67277..9be8222 100644 --- a/lib-gst-meet/src/source.rs +++ b/lib-gst-meet/src/source.rs @@ -1,8 +1,8 @@ #[derive(Debug, Clone)] -pub struct Source { +pub(crate) struct Source { pub(crate) ssrc: u32, - pub participant_id: String, - pub media_type: MediaType, + pub(crate) participant_id: String, + pub(crate) media_type: MediaType, } #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] diff --git a/shell.nix b/shell.nix index b6efbeb..c54e988 100644 --- a/shell.nix +++ b/shell.nix @@ -1,17 +1,13 @@ with import {}; -let - gst-plugins-base = gst_all_1.gst-plugins-base.override { - enableCocoa = stdenv.isDarwin; - }; -in mkShell { name = "gst-meet"; buildInputs = [ + cargo pkg-config glib glib-networking gst_all_1.gstreamer - gst-plugins-base + gst_all_1.gst-plugins-base gst_all_1.gst-plugins-good gst_all_1.gst-plugins-bad libnice