lib-gst-meet 0.3: colibri message support, more flexible recv handling

This commit is contained in:
Jasper Hugo 2021-08-19 17:06:12 +07:00
parent e31c81b68e
commit 6f67e2536d
12 changed files with 342 additions and 119 deletions

37
Cargo.lock generated
View File

@ -473,7 +473,7 @@ dependencies = [
[[package]] [[package]]
name = "gst-meet" name = "gst-meet"
version = "0.2.0" version = "0.2.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"cocoa", "cocoa",
@ -639,7 +639,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "lib-gst-meet" name = "lib-gst-meet"
version = "0.2.1" version = "0.3.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -658,6 +658,8 @@ dependencies = [
"rand", "rand",
"rcgen", "rcgen",
"ring", "ring",
"serde",
"serde_json",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-tungstenite", "tokio-tungstenite",
@ -1153,6 +1155,12 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
@ -1174,6 +1182,31 @@ name = "serde"
version = "1.0.127" version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f03b9878abf6d14e6779d3f24f07b2cfa90352cfec4acc5aab8f1ac7f146fae8" checksum = "f03b9878abf6d14e6779d3f24f07b2cfa90352cfec4acc5aab8f1ac7f146fae8"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a024926d3432516606328597e0f224a51355a493b49fdd67e9209187cbe55ecc"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]] [[package]]
name = "sha-1" name = "sha-1"

View File

@ -1,7 +1,7 @@
[package] [package]
name = "gst-meet" name = "gst-meet"
description = "A tool for connecting GStreamer pipelines to Jitsi Meet conferences" description = "A tool for connecting GStreamer pipelines to Jitsi Meet conferences"
version = "0.2.1" version = "0.2.2"
edition = "2018" edition = "2018"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
authors = ["Jasper Hugo <jasper@avstack.io>"] authors = ["Jasper Hugo <jasper@avstack.io>"]
@ -11,7 +11,7 @@ anyhow = { version = "1", default-features = false, features = ["std"] }
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
glib = { version = "0.14", default-features = false, features = ["log"] } glib = { version = "0.14", default-features = false, features = ["log"] }
gstreamer = { version = "0.17", default-features = false, features = ["v1_16"] } gstreamer = { version = "0.17", default-features = false, features = ["v1_16"] }
lib-gst-meet = { version = "0.2", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] } lib-gst-meet = { version = "0.3", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] }
structopt = { version = "0.3", default-features = false } structopt = { version = "0.3", default-features = false }
tokio = { version = "1", default-features = false, features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } tokio = { version = "1", default-features = false, features = ["macros", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", default-features = false } tokio-stream = { version = "0.1", default-features = false }

View File

@ -3,6 +3,7 @@ use std::time::Duration;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
use cocoa::appkit::NSApplication; use cocoa::appkit::NSApplication;
use glib::ObjectExt;
use gstreamer::{ use gstreamer::{
prelude::{ElementExt, GstBinExt}, prelude::{ElementExt, GstBinExt},
GhostPad, GhostPad,
@ -10,7 +11,7 @@ use gstreamer::{
use lib_gst_meet::{init_tracing, JitsiConferenceConfig, JitsiConnection}; use lib_gst_meet::{init_tracing, JitsiConferenceConfig, JitsiConnection};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::{signal::ctrl_c, task, time::timeout}; use tokio::{signal::ctrl_c, task, time::timeout};
use tracing::{info, warn}; use tracing::{info, trace, warn};
#[derive(Debug, Clone, StructOpt)] #[derive(Debug, Clone, StructOpt)]
#[structopt( #[structopt(
@ -71,6 +72,12 @@ fn main() {
} }
} }
fn init_gstreamer() {
trace!("starting gstreamer init");
gstreamer::init().expect("gstreamer init failed");
trace!("finished gstreamer init");
}
async fn main_inner() -> Result<()> { async fn main_inner() -> Result<()> {
let opt = Opt::from_args(); let opt = Opt::from_args();
@ -79,10 +86,9 @@ async fn main_inner() -> Result<()> {
1 => tracing::Level::DEBUG, 1 => tracing::Level::DEBUG,
_ => tracing::Level::TRACE, _ => tracing::Level::TRACE,
}); });
glib::log_set_default_handler(glib::rust_log_handler); glib::log_set_default_handler(glib::rust_log_handler);
let main_loop = glib::MainLoop::new(None, false);
gstreamer::init().unwrap(); init_gstreamer();
let parsed_bin = opt let parsed_bin = opt
.send_pipeline .send_pipeline
@ -127,6 +133,8 @@ async fn main_inner() -> Result<()> {
video_codec, video_codec,
}; };
let main_loop = glib::MainLoop::new(None, false);
let conference = connection let conference = connection
.join_conference(main_loop.context(), config) .join_conference(main_loop.context(), config)
.await?; .await?;
@ -148,7 +156,7 @@ async fn main_inner() -> Result<()> {
} }
conference conference
.on_participant(move |participant| { .on_participant(move |conference, participant| {
let recv_pipeline_participant_template = recv_pipeline_participant_template.clone(); let recv_pipeline_participant_template = recv_pipeline_participant_template.clone();
Box::pin(async move { Box::pin(async move {
info!("New participant: {:?}", participant); info!("New participant: {:?}", participant);
@ -186,18 +194,20 @@ async fn main_inner() -> Result<()> {
info!("No video sink element found in recv pipeline participant template"); info!("No video sink element found in recv pipeline participant template");
} }
Ok(Some(bin)) bin.set_property("name", format!("participant_{}", participant.muc_jid.resource))?;
conference.add_bin(&bin).await?;
} }
else { else {
info!("No template for handling new participant"); info!("No template for handling new participant");
Ok(None)
} }
Ok(())
}) })
}) })
.await; .await;
conference conference
.on_participant_left(move |participant| { .on_participant_left(move |_conference, participant| {
Box::pin(async move { Box::pin(async move {
info!("Participant left: {:?}", participant); info!("Participant left: {:?}", participant);
Ok(()) Ok(())
@ -205,6 +215,15 @@ async fn main_inner() -> Result<()> {
}) })
.await; .await;
conference
.on_colibri_message(move |_conference, message| {
Box::pin(async move {
info!("Colibri message: {:?}", message);
Ok(())
})
})
.await;
conference conference
.set_pipeline_state(gstreamer::State::Playing) .set_pipeline_state(gstreamer::State::Playing)
.await?; .await?;

View File

@ -10,7 +10,7 @@ authors = ["Jasper Hugo <jasper@avstack.io>"]
anyhow = { version = "1", default-features = false } anyhow = { version = "1", default-features = false }
glib = { version = "0.14", default-features = false } glib = { version = "0.14", default-features = false }
gstreamer = { version = "0.17", default-features = false } gstreamer = { version = "0.17", default-features = false }
lib-gst-meet = { version = "0.2", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] } lib-gst-meet = { version = "0.3", path = "../lib-gst-meet", default-features = false, features = ["tracing-subscriber"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] }
tracing = { version = "0.1", default-features = false } tracing = { version = "0.1", default-features = false }

View File

@ -187,13 +187,13 @@ pub unsafe extern "C" fn gstmeet_conference_video_sink_element(context: *mut Con
pub unsafe extern "C" fn gstmeet_conference_on_participant( pub unsafe extern "C" fn gstmeet_conference_on_participant(
context: *mut Context, context: *mut Context,
conference: *mut JitsiConference, conference: *mut JitsiConference,
f: unsafe extern "C" fn(Participant, *mut c_void) -> *mut gstreamer::ffi::GstBin, f: unsafe extern "C" fn(*mut JitsiConference, Participant, *mut c_void) -> *mut gstreamer::ffi::GstBin,
ctx: *mut c_void, ctx: *mut c_void,
) { ) {
let ctx = Arc::new(AtomicPtr::new(ctx)); let ctx = Arc::new(AtomicPtr::new(ctx));
(*context) (*context)
.runtime .runtime
.block_on((*conference).on_participant(move |participant| { .block_on((*conference).on_participant(move |conference, participant| {
let ctx = ctx.clone(); let ctx = ctx.clone();
Box::pin(async move { Box::pin(async move {
let participant = Participant { let participant = Participant {
@ -205,13 +205,8 @@ pub unsafe extern "C" fn gstmeet_conference_on_participant(
.transpose()? .transpose()?
.unwrap_or_else(ptr::null), .unwrap_or_else(ptr::null),
}; };
let maybe_bin = f(participant, ctx.load(Ordering::Relaxed)); f(Box::into_raw(Box::new(conference)), participant, ctx.load(Ordering::Relaxed));
if maybe_bin.is_null() { Ok(())
Ok(None)
}
else {
Ok(Some(from_glib_full(maybe_bin)))
}
}) })
})); }));
} }

View File

@ -1,7 +1,7 @@
[package] [package]
name = "lib-gst-meet" name = "lib-gst-meet"
description = "Connect GStreamer pipelines to Jitsi Meet conferences" description = "Connect GStreamer pipelines to Jitsi Meet conferences"
version = "0.2.1" version = "0.3.0"
edition = "2018" edition = "2018"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
authors = ["Jasper Hugo <jasper@avstack.io>"] authors = ["Jasper Hugo <jasper@avstack.io>"]
@ -24,6 +24,8 @@ pem = { version = "0.8", default-features = false }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
rcgen = { version = "0.8", default-features = false } rcgen = { version = "0.8", default-features = false }
ring = { version = "0.16", default-features = false } ring = { version = "0.16", default-features = false }
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros", "sync", "time"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio-stream = { version = "0.1", default-features = false, features = ["time"] } tokio-stream = { version = "0.1", default-features = false, features = ["time"] }
tokio-tungstenite = { version = "0.14", default-features = false, features = ["connect", "rustls-tls"] } tokio-tungstenite = { version = "0.14", default-features = false, features = ["connect", "rustls-tls"] }

203
lib-gst-meet/src/colibri.rs Normal file
View File

@ -0,0 +1,203 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::Result;
use futures::{
sink::SinkExt,
stream::{StreamExt, TryStreamExt},
};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_tungstenite::tungstenite::{http::Request, Message};
use tracing::{debug, error, info, warn};
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "colibriClass")]
pub enum ColibriMessage {
#[serde(rename_all = "camelCase")]
DominantSpeakerEndpointChangeEvent {
dominant_speaker_endpoint: String,
previous_speakers: Vec<String>,
},
#[serde(rename_all = "camelCase")]
EndpointConnectivityStatusChangeEvent {
endpoint: String,
active: bool,
},
#[serde(rename_all = "camelCase")]
EndpointMessage {
from: String,
to: Option<String>,
msg_payload: serde_json::Value,
},
#[serde(rename_all = "camelCase")]
EndpointStats {
from: String,
bitrate: Bitrates,
packet_loss: PacketLoss,
connection_quality: f32,
#[serde(rename = "jvbRTT")]
jvb_rtt: u16,
server_region: String,
max_enabled_resolution: u16,
},
#[serde(rename_all = "camelCase")]
LastNChangedEvent {
last_n: u16,
},
#[serde(rename_all = "camelCase")]
LastNEndpointsChangeEvent {
last_n_endpoints: Vec<String>,
},
#[serde(rename_all = "camelCase")]
ReceiverVideoConstraint {
max_frame_height: u16,
},
#[serde(rename_all = "camelCase")]
ReceiverVideoConstraints {
last_n: Option<u16>,
selected_endpoints: Option<Vec<String>>,
on_stage_endpoints: Option<Vec<String>>,
default_constraints: Option<Constraints>,
constraints: HashMap<String, Constraints>,
},
#[serde(rename_all = "camelCase")]
SelectedEndpointsChangedEvent {
selected_endpoints: Vec<String>,
},
#[serde(rename_all = "camelCase")]
SenderVideoConstraints {
video_constraints: Constraints,
},
#[serde(rename_all = "camelCase")]
ServerHello {
version: Option<String>,
},
#[serde(rename_all = "camelCase")]
VideoTypeMessage {
video_type: String,
},
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Constraints {
ideal_height: Option<u16>,
max_height: Option<u16>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Bitrates {
audio: Bitrate,
video: Bitrate,
#[serde(flatten)]
total: Bitrate,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Bitrate {
upload: u32,
download: u32,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PacketLoss {
total: u32,
download: u32,
upload: u32,
}
pub(crate) struct ColibriChannel {
send_tx: mpsc::Sender<ColibriMessage>,
recv_tx: Arc<Mutex<Vec<mpsc::Sender<ColibriMessage>>>>,
}
impl ColibriChannel {
pub(crate) async fn new(colibri_url: &str) -> Result<Self> {
let request =
Request::get(colibri_url).body(())?;
let (colibri_websocket, _response) =
tokio_tungstenite::connect_async(request).await?;
info!("Connected Colibri WebSocket");
let (mut colibri_sink, mut colibri_stream) = colibri_websocket.split();
let recv_tx: Arc<Mutex<Vec<mpsc::Sender<ColibriMessage>>>> = Arc::new(Mutex::new(vec![]));
let recv_task = {
let recv_tx = recv_tx.clone();
tokio::spawn(async move {
while let Some(msg) = colibri_stream.try_next().await? {
match msg {
Message::Text(text) => {
debug!("colibri: {}", text);
match serde_json::from_str::<ColibriMessage>(&text) {
Ok(colibri_msg) => {
let mut txs = recv_tx.lock().await;
let txs_clone = txs.clone();
for (i, tx) in txs_clone.iter().enumerate().rev() {
if let Err(_) = tx.send(colibri_msg.clone()).await {
debug!("colibri subscriber closed, removing");
txs.remove(i);
}
}
},
Err(e) => warn!("failed to parse frame on colibri websocket: {:?}\nframe: {}", e, text),
}
},
Message::Binary(data) => debug!("received unexpected {} byte binary frame on colibri websocket", data.len()),
Message::Ping(_) | Message::Pong(_) => {}, // handled automatically by tungstenite
Message::Close(_) => {
debug!("received close frame on colibri websocket");
// TODO reconnect
break;
},
}
}
Ok::<_, anyhow::Error>(())
})
};
let (send_tx, send_rx) = mpsc::channel(8);
let send_task = tokio::spawn(async move {
let mut stream = ReceiverStream::new(send_rx);
while let Some(colibri_msg) = stream.next().await {
match serde_json::to_string(&colibri_msg) {
Ok(json) => {
let msg = Message::Text(json);
colibri_sink.send(msg).await?;
},
Err(e) => warn!("failed to serialise colibri message: {:?}", e),
}
}
Ok::<_, anyhow::Error>(())
});
tokio::spawn(async move {
tokio::select! {
res = recv_task => if let Ok(Err(e)) = res {
error!("colibri recv loop: {:?}", e);
},
res = send_task => if let Ok(Err(e)) = res {
error!("colibri send loop: {:?}", e);
},
};
});
Ok(Self {
send_tx,
recv_tx,
})
}
pub(crate) async fn subscribe(&self, tx: mpsc::Sender<ColibriMessage>) {
self.recv_tx.lock().await.push(tx);
}
pub(crate) async fn send(&self, msg: ColibriMessage) -> Result<()> {
self.send_tx.send(msg).await?;
Ok(())
}
}

View File

@ -3,7 +3,6 @@ use std::{collections::HashMap, convert::TryFrom, fmt, future::Future, pin::Pin,
use anyhow::{anyhow, bail, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use glib::ObjectExt;
use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt}; use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::sync::{mpsc, oneshot, Mutex};
@ -21,7 +20,13 @@ use xmpp_parsers::{
BareJid, Element, FullJid, Jid, BareJid, Element, FullJid, Jid,
}; };
use crate::{jingle::JingleSession, source::MediaType, stanza_filter::StanzaFilter, xmpp}; use crate::{
colibri::{ColibriChannel, ColibriMessage},
jingle::JingleSession,
source::MediaType,
stanza_filter::StanzaFilter,
xmpp,
};
static DISCO_INFO: Lazy<DiscoInfoResult> = Lazy::new(|| DiscoInfoResult { static DISCO_INFO: Lazy<DiscoInfoResult> = Lazy::new(|| DiscoInfoResult {
node: None, node: None,
@ -91,17 +96,16 @@ pub struct Participant {
pub jid: FullJid, pub jid: FullJid,
pub muc_jid: FullJid, pub muc_jid: FullJid,
pub nick: Option<String>, pub nick: Option<String>,
bin: Option<gstreamer::Bin>,
} }
type BoxedBinResultFuture = Pin<Box<dyn Future<Output = Result<Option<gstreamer::Bin>>> + Send>>;
type BoxedResultFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>; type BoxedResultFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
pub(crate) struct JitsiConferenceInner { pub(crate) struct JitsiConferenceInner {
pub(crate) jingle_session: Option<JingleSession>, pub(crate) jingle_session: Option<JingleSession>,
participants: HashMap<String, Participant>, participants: HashMap<String, Participant>,
on_participant: Option<Arc<dyn (Fn(Participant) -> BoxedBinResultFuture) + Send + Sync>>, on_participant: Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_participant_left: Option<Arc<dyn (Fn(Participant) -> BoxedResultFuture) + Send + Sync>>, on_participant_left: Option<Arc<dyn (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync>>,
on_colibri_message: Option<Arc<dyn (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync>>,
state: JitsiConferenceState, state: JitsiConferenceState,
connected_tx: Option<oneshot::Sender<()>>, connected_tx: Option<oneshot::Sender<()>>,
connected_rx: Option<oneshot::Receiver<()>>, connected_rx: Option<oneshot::Receiver<()>>,
@ -136,6 +140,7 @@ impl JitsiConference {
participants: HashMap::new(), participants: HashMap::new(),
on_participant: None, on_participant: None,
on_participant_left: None, on_participant_left: None,
on_colibri_message: None,
jingle_session: None, jingle_session: None,
connected_tx: Some(tx), connected_tx: Some(tx),
connected_rx: Some(rx), connected_rx: Some(rx),
@ -269,8 +274,23 @@ impl JitsiConference {
) )
} }
pub async fn send_colibri_message(&self, message: ColibriMessage) -> Result<()> {
self
.inner
.lock()
.await
.jingle_session
.as_ref()
.context("not connected (no jingle session)")?
.colibri_channel
.as_ref()
.context("no colibri channel")?
.send(message)
.await
}
#[tracing::instrument(level = "trace", skip(f))] #[tracing::instrument(level = "trace", skip(f))]
pub async fn on_participant(&self, f: impl (Fn(Participant) -> BoxedBinResultFuture) + Send + Sync + 'static) { pub async fn on_participant(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) {
let f = Arc::new(f); let f = Arc::new(f);
let f2 = f.clone(); let f2 = f.clone();
let existing_participants: Vec<_> = { let existing_participants: Vec<_> = {
@ -283,37 +303,21 @@ impl JitsiConference {
"calling on_participant with existing participant: {:?}", "calling on_participant with existing participant: {:?}",
participant participant
); );
match f(participant.clone()).await { if let Err(e) = f(self.clone(), participant.clone()).await {
Ok(Some(bin)) => { warn!("on_participant failed: {:?}", e);
bin
.set_property(
"name",
format!("participant_{}", participant.muc_jid.resource),
)
.unwrap();
match self.add_bin(&bin).await {
Ok(_) => {
let mut locked_inner = self.inner.lock().await;
if let Some(p) = locked_inner
.participants
.get_mut(&participant.muc_jid.resource)
{
p.bin = Some(bin);
}
},
Err(e) => warn!("failed to add participant bin: {:?}", e),
}
},
Ok(None) => {},
Err(e) => warn!("on_participant failed: {:?}", e),
} }
} }
} }
#[tracing::instrument(level = "trace", skip(f))] #[tracing::instrument(level = "trace", skip(f))]
pub async fn on_participant_left(&self, f: impl (Fn(Participant) -> BoxedResultFuture) + Send + Sync + 'static) { pub async fn on_participant_left(&self, f: impl (Fn(JitsiConference, Participant) -> BoxedResultFuture) + Send + Sync + 'static) {
self.inner.lock().await.on_participant_left = Some(Arc::new(f)); self.inner.lock().await.on_participant_left = Some(Arc::new(f));
} }
#[tracing::instrument(level = "trace", skip(f))]
pub async fn on_colibri_message(&self, f: impl (Fn(JitsiConference, ColibriMessage) -> BoxedResultFuture) + Send + Sync + 'static) {
self.inner.lock().await.on_colibri_message = Some(Arc::new(f));
}
} }
#[async_trait] #[async_trait]
@ -463,37 +467,22 @@ impl StanzaFilter for JitsiConference {
if let Some(colibri_url) = colibri_url { if let Some(colibri_url) = colibri_url {
info!("Connecting Colibri WebSocket to {}", colibri_url); info!("Connecting Colibri WebSocket to {}", colibri_url);
let colibri_channel = ColibriChannel::new(&colibri_url).await?;
let (tx, rx) = mpsc::channel(8);
colibri_channel.subscribe(tx).await;
locked_inner.jingle_session.as_mut().unwrap().colibri_channel = Some(colibri_channel);
let request = let self_ = self.clone();
tokio_tungstenite::tungstenite::http::Request::get(colibri_url).body(())?;
let (colibri_websocket, _response) =
tokio_tungstenite::connect_async(request).await?;
info!("Connected Colibri WebSocket");
let (colibri_sink, mut colibri_stream) = colibri_websocket.split();
let colibri_receive_task = tokio::spawn(async move {
while let Some(msg) = colibri_stream.next().await {
debug!("colibri: {:?}", msg);
}
Ok::<_, anyhow::Error>(())
});
let (colibri_tx, colibri_rx) = mpsc::channel(8);
locked_inner.jingle_session.as_mut().unwrap().colibri_tx = Some(colibri_tx);
let colibri_transmit_task = tokio::spawn(async move {
let stream = ReceiverStream::new(colibri_rx);
stream.forward(colibri_sink).await?;
Ok::<_, anyhow::Error>(())
});
tokio::spawn(async move { tokio::spawn(async move {
tokio::select! { let mut stream = ReceiverStream::new(rx);
res = colibri_receive_task => if let Ok(Err(e)) = res { while let Some(msg) = stream.next().await {
error!("colibri read loop: {:?}", e); let locked_inner = self_.inner.lock().await;
}, if let Some(f) = &locked_inner.on_colibri_message {
res = colibri_transmit_task => if let Ok(Err(e)) = res { if let Err(e) = f(self_.clone(), msg).await {
error!("colibri write loop: {:?}", e); warn!("on_colibri_message failed: {:?}", e);
}, }
}; }
}
}); });
} }
@ -531,13 +520,12 @@ impl StanzaFilter for JitsiConference {
jid: jid.clone(), jid: jid.clone(),
muc_jid: from.clone(), muc_jid: from.clone(),
nick: item.nick, nick: item.nick,
bin: None,
}; };
if presence.type_ == presence::Type::Unavailable && locked_inner.participants.remove(&from.resource.clone()).is_some() { if presence.type_ == presence::Type::Unavailable && locked_inner.participants.remove(&from.resource.clone()).is_some() {
debug!("participant left: {:?}", jid); debug!("participant left: {:?}", jid);
if let Some(f) = &locked_inner.on_participant_left { if let Some(f) = &locked_inner.on_participant_left {
debug!("calling on_participant_left with old participant"); debug!("calling on_participant_left with old participant");
if let Err(e) = f(participant).await { if let Err(e) = f(self.clone(), participant).await {
warn!("on_participant_left failed: {:?}", e); warn!("on_participant_left failed: {:?}", e);
} }
} }
@ -550,20 +538,8 @@ impl StanzaFilter for JitsiConference {
debug!("new participant: {:?}", jid); debug!("new participant: {:?}", jid);
if let Some(f) = &locked_inner.on_participant { if let Some(f) = &locked_inner.on_participant {
debug!("calling on_participant with new participant"); debug!("calling on_participant with new participant");
match f(participant).await { if let Err(e) = f(self.clone(), participant).await {
Ok(Some(bin)) => { warn!("on_participant failed: {:?}", e);
bin.set_property("name", format!("participant_{}", from.resource))?;
match self.add_bin(&bin).await {
Ok(_) => {
if let Some(p) = locked_inner.participants.get_mut(&from.resource) {
p.bin = Some(bin);
}
},
Err(e) => warn!("failed to add participant bin: {:?}", e),
}
},
Ok(None) => {},
Err(e) => warn!("on_participant failed: {:?}", e),
} }
} }
} }

View File

@ -12,7 +12,7 @@ use ring::digest::{digest, SHA256};
use tokio::{ use tokio::{
net::lookup_host, net::lookup_host,
runtime::Handle, runtime::Handle,
sync::{mpsc, oneshot}, sync::oneshot,
}; };
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use uuid::Uuid; use uuid::Uuid;
@ -28,6 +28,7 @@ use xmpp_parsers::{
}; };
use crate::{ use crate::{
colibri::ColibriChannel,
conference::JitsiConference, conference::JitsiConference,
source::{MediaType, Source}, source::{MediaType, Source},
util::generate_id, util::generate_id,
@ -46,11 +47,7 @@ pub(crate) struct JingleSession {
ice_component_id: u32, ice_component_id: u32,
pub(crate) accept_iq_id: Option<String>, pub(crate) accept_iq_id: Option<String>,
pub(crate) colibri_url: Option<String>, pub(crate) colibri_url: Option<String>,
pub(crate) colibri_tx: Option< pub(crate) colibri_channel: Option<ColibriChannel>,
mpsc::Sender<
Result<tokio_tungstenite::tungstenite::Message, tokio_tungstenite::tungstenite::Error>,
>,
>,
pipeline_state_null_rx: oneshot::Receiver<()>, pipeline_state_null_rx: oneshot::Receiver<()>,
} }
@ -801,7 +798,7 @@ impl JingleSession {
ice_component_id, ice_component_id,
accept_iq_id: Some(accept_iq_id), accept_iq_id: Some(accept_iq_id),
colibri_url, colibri_url,
colibri_tx: None, colibri_channel: None,
pipeline_state_null_rx, pipeline_state_null_rx,
}) })
} }

View File

@ -1,16 +1,18 @@
pub mod conference; mod colibri;
pub mod connection; mod conference;
mod connection;
mod jingle; mod jingle;
mod pinger; mod pinger;
pub mod source; mod source;
mod stanza_filter; mod stanza_filter;
mod util; mod util;
mod xmpp; mod xmpp;
pub use crate::{ pub use crate::{
colibri::ColibriMessage,
conference::{JitsiConference, JitsiConferenceConfig, Participant}, conference::{JitsiConference, JitsiConferenceConfig, Participant},
connection::JitsiConnection, connection::JitsiConnection,
source::{MediaType, Source}, source::MediaType,
}; };
#[cfg(feature = "tracing-subscriber")] #[cfg(feature = "tracing-subscriber")]

View File

@ -1,8 +1,8 @@
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Source { pub(crate) struct Source {
pub(crate) ssrc: u32, pub(crate) ssrc: u32,
pub participant_id: String, pub(crate) participant_id: String,
pub media_type: MediaType, pub(crate) media_type: MediaType,
} }
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]

View File

@ -1,17 +1,13 @@
with import <nixpkgs> {}; with import <nixpkgs> {};
let
gst-plugins-base = gst_all_1.gst-plugins-base.override {
enableCocoa = stdenv.isDarwin;
};
in
mkShell { mkShell {
name = "gst-meet"; name = "gst-meet";
buildInputs = [ buildInputs = [
cargo
pkg-config pkg-config
glib glib
glib-networking glib-networking
gst_all_1.gstreamer gst_all_1.gstreamer
gst-plugins-base gst_all_1.gst-plugins-base
gst_all_1.gst-plugins-good gst_all_1.gst-plugins-good
gst_all_1.gst-plugins-bad gst_all_1.gst-plugins-bad
libnice libnice