provide a ctx argument for the C API callbacks

This commit is contained in:
Jasper Hugo 2021-08-17 08:55:17 +07:00
parent 3487596b92
commit 2fd02c7e68
4 changed files with 38 additions and 43 deletions

View File

@ -67,6 +67,7 @@ GstElement *gstmeet_conference_video_sink_element(struct Context *context,
void gstmeet_conference_on_participant(struct Context *context, void gstmeet_conference_on_participant(struct Context *context,
JitsiConference *conference, JitsiConference *conference,
GstBin *(*f)(struct Participant)); GstBin *(*f)(struct Participant, void *),
void *callback_context);
#endif /* gstmeet_h */ #endif /* gstmeet_h */

View File

@ -1,7 +1,8 @@
use std::{ use std::{
ffi::{CStr, CString}, ffi::{CStr, CString, c_void},
os::raw::c_char, os::raw::c_char,
ptr, ptr,
sync::{Arc, atomic::{AtomicPtr, Ordering}},
}; };
use anyhow::Result; use anyhow::Result;
@ -186,11 +187,15 @@ pub unsafe extern "C" fn gstmeet_conference_video_sink_element(context: *mut Con
pub unsafe extern "C" fn gstmeet_conference_on_participant( pub unsafe extern "C" fn gstmeet_conference_on_participant(
context: *mut Context, context: *mut Context,
conference: *mut JitsiConference, conference: *mut JitsiConference,
f: unsafe extern "C" fn(Participant) -> *mut gstreamer::ffi::GstBin, f: unsafe extern "C" fn(Participant, *mut c_void) -> *mut gstreamer::ffi::GstBin,
ctx: *mut c_void,
) { ) {
let ctx = Arc::new(AtomicPtr::new(ctx));
(*context) (*context)
.runtime .runtime
.block_on((*conference).on_participant(move |participant| Box::pin(async move { .block_on((*conference).on_participant(move |participant| {
let ctx = ctx.clone();
Box::pin(async move {
let participant = Participant { let participant = Participant {
jid: CString::new(participant.jid.to_string())?.into_raw() as *const _, jid: CString::new(participant.jid.to_string())?.into_raw() as *const _,
muc_jid: CString::new(participant.muc_jid.to_string())?.into_raw() as *const _, muc_jid: CString::new(participant.muc_jid.to_string())?.into_raw() as *const _,
@ -200,12 +205,13 @@ pub unsafe extern "C" fn gstmeet_conference_on_participant(
.transpose()? .transpose()?
.unwrap_or_else(ptr::null), .unwrap_or_else(ptr::null),
}; };
let maybe_bin = f(participant); let maybe_bin = f(participant, ctx.load(Ordering::Relaxed));
if maybe_bin.is_null() { if maybe_bin.is_null() {
Ok(None) Ok(None)
} }
else { else {
Ok(Some(from_glib_full(maybe_bin))) Ok(Some(from_glib_full(maybe_bin)))
} }
}))); })
}));
} }

View File

@ -94,17 +94,14 @@ pub struct Participant {
bin: Option<gstreamer::Bin>, bin: Option<gstreamer::Bin>,
} }
type BoxedBinResultFuture = Pin<Box<dyn Future<Output = Result<Option<gstreamer::Bin>>> + Send>>;
type BoxedResultFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
pub(crate) struct JitsiConferenceInner { pub(crate) struct JitsiConferenceInner {
pub(crate) jingle_session: Option<JingleSession>, pub(crate) jingle_session: Option<JingleSession>,
participants: HashMap<String, Participant>, participants: HashMap<String, Participant>,
on_participant: Option< on_participant: Option<Arc<dyn (Fn(Participant) -> BoxedBinResultFuture) + Send + Sync>>,
Arc< on_participant_left: Option<Arc<dyn (Fn(Participant) -> BoxedResultFuture) + Send + Sync>>,
dyn (Fn(Participant) -> Pin<Box<dyn Future<Output = Result<Option<gstreamer::Bin>>> + Send>>)
+ Send
+ Sync,
>,
>,
on_participant_left: Option<Arc<dyn (Fn(Participant) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>) + Send + Sync>>,
state: JitsiConferenceState, state: JitsiConferenceState,
connected_tx: Option<oneshot::Sender<()>>, connected_tx: Option<oneshot::Sender<()>>,
connected_rx: Option<oneshot::Receiver<()>>, connected_rx: Option<oneshot::Receiver<()>>,
@ -273,13 +270,7 @@ impl JitsiConference {
} }
#[tracing::instrument(level = "trace", skip(f))] #[tracing::instrument(level = "trace", skip(f))]
pub async fn on_participant( pub async fn on_participant(&self, f: impl (Fn(Participant) -> BoxedBinResultFuture) + Send + Sync + 'static) {
&self,
f: impl (Fn(Participant) -> Pin<Box<dyn Future<Output = Result<Option<gstreamer::Bin>>> + Send>>)
+ Send
+ Sync
+ 'static,
) {
let f = Arc::new(f); let f = Arc::new(f);
let f2 = f.clone(); let f2 = f.clone();
let existing_participants: Vec<_> = { let existing_participants: Vec<_> = {
@ -320,13 +311,7 @@ impl JitsiConference {
} }
#[tracing::instrument(level = "trace", skip(f))] #[tracing::instrument(level = "trace", skip(f))]
pub async fn on_participant_left( pub async fn on_participant_left(&self, f: impl (Fn(Participant) -> BoxedResultFuture) + Send + Sync + 'static) {
&self,
f: impl (Fn(Participant) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>)
+ Send
+ Sync
+ 'static,
) {
self.inner.lock().await.on_participant_left = Some(Arc::new(f)); self.inner.lock().await.on_participant_left = Some(Arc::new(f));
} }
} }

View File

@ -4,6 +4,7 @@ mkShell {
buildInputs = [ buildInputs = [
pkg-config pkg-config
glib glib
glib-networking
gst_all_1.gstreamer gst_all_1.gstreamer
gst_all_1.gst-plugins-base gst_all_1.gst-plugins-base
gst_all_1.gst-plugins-good gst_all_1.gst-plugins-good
@ -13,4 +14,6 @@ mkShell {
darwin.apple_sdk.frameworks.AppKit darwin.apple_sdk.frameworks.AppKit
darwin.apple_sdk.frameworks.Security darwin.apple_sdk.frameworks.Security
] else []); ] else []);
GIO_EXTRA_MODULES = ["${pkgs.glib-networking.out}/lib/gio/modules"];
} }