From 7b70e802bb1b8b6d175189f1dd5ce1100cb6e903 Mon Sep 17 00:00:00 2001 From: xenia Date: Sun, 8 May 2022 04:26:35 -0400 Subject: [PATCH] implement bosh transport for old jitsi versions --- Cargo.lock | 239 ++++++++++++++++++++++- gst-meet/src/main.rs | 112 ++++++----- lib-gst-meet-c/src/lib.rs | 2 +- lib-gst-meet/Cargo.toml | 1 + lib-gst-meet/src/conference.rs | 4 + lib-gst-meet/src/xmpp/connection.rs | 284 +++++++++++++++++++++++++++- 6 files changed, 595 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d98158b..a7383cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "fastrand" version = "1.7.0" @@ -611,6 +620,31 @@ dependencies = [ "system-deps", ] +[[package]] +name = "h2" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" + [[package]] name = "heck" version = "0.3.3" @@ -652,12 +686,79 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" +dependencies = [ + "http", + "hyper", + "rustls", + "tokio", + "tokio-rustls", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -675,6 +776,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.12" @@ -769,6 +880,7 @@ dependencies = [ "pem", "rand", "rcgen", + "reqwest", "ring", "rtcp", "rustls", @@ -857,6 +969,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "minidom" version = "0.14.0" @@ -1292,6 +1410,47 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile 0.3.0", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tokio-rustls", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -1337,7 +1496,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 0.2.1", "schannel", "security-framework", ] @@ -1351,6 +1510,15 @@ dependencies = [ "base64", ] +[[package]] +name = "rustls-pemfile" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" +dependencies = [ + "base64", +] + [[package]] name = "rustversion" version = "1.0.6" @@ -1443,6 +1611,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_with" version = "1.12.0" @@ -1769,6 +1949,20 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "tokio-util" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "toml" version = "0.5.8" @@ -1778,6 +1972,12 @@ dependencies = [ "serde", ] +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + [[package]] name = "tracing" version = "0.1.31" @@ -1836,6 +2036,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "tungstenite" version = "0.17.2" @@ -1954,6 +2160,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" @@ -1985,6 +2201,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb6ec270a31b1d3c7e266b999739109abce8b6c87e4b31fcfcd788b65267395" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.79" @@ -2130,6 +2358,15 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "xmpp-parsers" version = "0.19.0" diff --git a/gst-meet/src/main.rs b/gst-meet/src/main.rs index 85f29b1..eeec2bb 100644 --- a/gst-meet/src/main.rs +++ b/gst-meet/src/main.rs @@ -23,8 +23,11 @@ use tracing::{error, info, trace, warn}; about = "Connect a GStreamer pipeline to a Jitsi Meet conference." )] struct Opt { - #[structopt(long)] - web_socket_url: String, + #[structopt(long, required_unless="bosh-url")] + web_socket_url: Option, + + #[structopt(long, required_unless="web-socket-url")] + bosh_url: Option, #[structopt( long, @@ -221,7 +224,13 @@ async fn main_inner() -> Result<()> { .transpose() .context("failed to parse recv pipeline")?; - let web_socket_url: Uri = opt.web_socket_url.parse()?; + let (is_ws, url_to_use) = if opt.web_socket_url.is_some() { + (true, opt.web_socket_url.unwrap()) + } else { + (false, opt.bosh_url.unwrap()) + }; + + let web_socket_url: Uri = url_to_use.parse()?; let xmpp_domain = opt .xmpp_domain @@ -229,25 +238,38 @@ async fn main_inner() -> Result<()> { .or_else(|| web_socket_url.host()) .context("invalid WebSocket URL")?; - let (connection, background) = Connection::new( - &opt.web_socket_url, - xmpp_domain, - match opt.xmpp_username { - Some(username) => Authentication::Plain { - username, - password: opt.xmpp_password.context("if xmpp-username is provided, xmpp-password must also be provided")?, - }, - None => Authentication::Anonymous, - }, - #[cfg(feature = "tls-insecure")] - opt.tls_insecure, - #[cfg(not(feature = "tls-insecure"))] - false, - ) - .await - .context("failed to build connection")?; - - tokio::spawn(background); + let connection = if is_ws { + let (connection, background) = + Connection::new_websocket( + &url_to_use, + xmpp_domain, + match opt.xmpp_username { + Some(username) => Authentication::Plain { + username, + password: opt.xmpp_password.context("if xmpp-username is provided, xmpp-password must also be provided")?, + }, + None => Authentication::Anonymous, + }, + #[cfg(feature = "tls-insecure")] + opt.tls_insecure, + #[cfg(not(feature = "tls-insecure"))] + false, + ) + .await + .context("failed to build connection")?; + tokio::spawn(background); + connection + } else { + let (connection, background) = + Connection::new_bosh( + &url_to_use, + xmpp_domain, + Authentication::Anonymous, + false + ).await.context("failed to build BOSH connection")?; + tokio::spawn(background); + connection + }; connection.connect().await?; @@ -311,31 +333,33 @@ async fn main_inner() -> Result<()> { .set_send_resolution(send_video_height.into()) .await; - conference - .send_colibri_message(ColibriMessage::ReceiverVideoConstraints { - last_n: Some(opt.last_n.map(i32::from).unwrap_or(-1)), - selected_endpoints: opt - .select_endpoints - .map(|endpoints| endpoints.split(',').map(ToOwned::to_owned).collect()), - on_stage_endpoints: None, - default_constraints: Some(Constraints { - max_height: Some(opt.recv_video_scale_height.into()), - ideal_height: None, - }), - constraints: None, - }) - .await?; - - if let Some(video_type) = opt.video_type { + if is_ws { conference - .send_colibri_message(ColibriMessage::VideoTypeMessage { - video_type: match video_type.as_str() { - "camera" => VideoType::Camera, - "desktop" => VideoType::Desktop, - other => bail!(format!("invalid video type: {}", other)), - }, + .send_colibri_message(ColibriMessage::ReceiverVideoConstraints { + last_n: Some(opt.last_n.map(i32::from).unwrap_or(-1)), + selected_endpoints: opt + .select_endpoints + .map(|endpoints| endpoints.split(',').map(ToOwned::to_owned).collect()), + on_stage_endpoints: None, + default_constraints: Some(Constraints { + max_height: Some(opt.recv_video_scale_height.into()), + ideal_height: None, + }), + constraints: None, }) .await?; + + if let Some(video_type) = opt.video_type { + conference + .send_colibri_message(ColibriMessage::VideoTypeMessage { + video_type: match video_type.as_str() { + "camera" => VideoType::Camera, + "desktop" => VideoType::Desktop, + other => bail!(format!("invalid video type: {}", other)), + }, + }) + .await?; + } } if let Some(bin) = send_pipeline { diff --git a/lib-gst-meet-c/src/lib.rs b/lib-gst-meet-c/src/lib.rs index 9d4be2d..a66ca07 100644 --- a/lib-gst-meet-c/src/lib.rs +++ b/lib-gst-meet-c/src/lib.rs @@ -86,7 +86,7 @@ pub unsafe extern "C" fn gstmeet_connection_new( let xmpp_domain = CStr::from_ptr(xmpp_domain); (*context) .runtime - .block_on(Connection::new( + .block_on(Connection::new_websocket( &websocket_url.to_string_lossy(), &xmpp_domain.to_string_lossy(), Authentication::Anonymous, diff --git a/lib-gst-meet/Cargo.toml b/lib-gst-meet/Cargo.toml index fd2ad8a..a767d7e 100644 --- a/lib-gst-meet/Cargo.toml +++ b/lib-gst-meet/Cargo.toml @@ -31,6 +31,7 @@ once_cell = { version = "1", default-features = false, features = ["std"] } pem = { version = "1", default-features = false } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } rcgen = { version = "0.9", default-features = false } +reqwest = { version = "0.11", features = ["rustls-tls"] } ring = { version = "0.16", default-features = false } rtcp = { version = "0.6", default-features = false, optional = true } rustls = { version = "0.20", default-features = false, features = ["logging", "tls12"], optional = true } diff --git a/lib-gst-meet/src/conference.rs b/lib-gst-meet/src/conference.rs index 66bb61f..292d8c2 100644 --- a/lib-gst-meet/src/conference.rs +++ b/lib-gst-meet/src/conference.rs @@ -517,6 +517,7 @@ impl StanzaFilter for JitsiConference { async fn take(&self, element: xmpp_parsers::Element) -> Result<()> { use JitsiConferenceState::*; let state = self.inner.lock().await.state; + info!("conference state: {:?}", state); match state { Discovering => { let iq = Iq::try_from(element)?; @@ -539,12 +540,15 @@ impl StanzaFilter for JitsiConference { }, JoiningMuc => { let presence = Presence::try_from(element)?; + info!("joiningmuc presence: {:?}", presence); if let Some(payload) = presence .payloads .iter() .find(|payload| payload.is("x", ns::MUC_USER)) { + info!("joiningmuc payload: {:?}", payload); let muc_user = MucUser::try_from(payload.clone())?; + info!("joining muc user: {:?} status {:?}", muc_user, muc_user.status); if muc_user.status.contains(&MucStatus::SelfPresence) { debug!("Joined MUC: {}", self.config.muc); self.inner.lock().await.state = Idle; diff --git a/lib-gst-meet/src/xmpp/connection.rs b/lib-gst-meet/src/xmpp/connection.rs index a0c5a2a..be25554 100644 --- a/lib-gst-meet/src/xmpp/connection.rs +++ b/lib-gst-meet/src/xmpp/connection.rs @@ -71,7 +71,7 @@ pub enum Authentication { } impl Connection { - pub async fn new( + pub async fn new_websocket( websocket_url: &str, xmpp_domain: &str, authentication: Authentication, @@ -143,6 +143,288 @@ impl Connection { Ok((connection, background)) } + pub async fn new_bosh( + bind_url: &str, + xmpp_domain: &str, + authentication: Authentication, + tls_insecure: bool, + ) -> Result<(Self, impl Future)> { + let bosh_url: reqwest::Url = reqwest::Url::parse(bind_url).context("invalid BOSH url")?; + let xmpp_domain_jid: BareJid = xmpp_domain.parse().context("invalid XMPP domain")?; + let xmpp_domain_str: String = xmpp_domain.into(); + + info!("Connecting XMPP BOSH to {}", bind_url); + + let inner = Arc::new(Mutex::new(ConnectionInner { + state: ConnectionState::OpeningPreAuthentication, + jid: None, + xmpp_domain: xmpp_domain_jid, + authentication, + external_services: vec![], + connected_tx: None, + stanza_filters: vec![], + })); + + let bg_inner = inner.clone(); + + let (tx, rx) = mpsc::channel(64); + + let bg_tx = tx.clone(); + + let background = async move { + let inner = bg_inner; + let tx = bg_tx; + + let tls: rustls::ClientConfig = { + let mut roots = rustls::RootCertStore::empty(); + for cert in + rustls_native_certs::load_native_certs().context("failed to load native root certs")? + { + roots.add(&rustls::Certificate(cert.0)) + .context("failed to add native root certs")? + } + let mut config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); + config.alpn_protocols.push(b"http/1.1".to_vec()); + config + }; + let client = reqwest::Client::builder() + .user_agent("Mozilla/5.0 (X11; Linux aarch64; rv:99.0) Gecko/20100101 Firefox/99.0") + .default_headers({ + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert("content-type", reqwest::header::HeaderValue::from_static("text/xml; charset=utf-8")); + headers + }) + .use_preconfigured_tls(tls).build()?; + + let mut rid: u64 = 13371337; + + let mut rx = ReceiverStream::new(rx); + + if let Some(x) = rx.next().await { + info!("BOSH eating open tag: {:?}", x); + { + let mut locked_inner = inner.lock().await; + locked_inner.state = ConnectionState::ReceivingFeaturesPreAuthentication; + } + } else { + panic!("failed to get open tag"); + } + + let (sid, stream_features): (String, Element) = { + let first_req = format!("", rid, xmpp_domain_str); + info!("making unauth open"); + + let resp = client.post(bosh_url.clone()).body(first_req) .send() + .await.context("failed to post initial BOSH message")?; + + rid += 1; + + let (status, xml_text) = (resp.status(), resp.text().await?); + if status != reqwest::StatusCode::OK { + panic!("status is not 200"); + } + let el: Element = xml_text.parse()?; + + let sid: String = el.attr("sid").unwrap().to_string(); + let stream_features: Element = el.children().next().unwrap().clone(); + info!("sid: {}", sid); + + { + let mut locked_inner = inner.lock().await; + locked_inner.state = ConnectionState::Authenticating; + } + + (sid, stream_features) + }; + + { + let auth_req = format!("", rid, sid); + + let resp = client.post(bosh_url.clone()).body(auth_req).send() + .await.context("failed to post BOSH auth")?; + + rid += 1; + + let (status, xml_text) = (resp.status(), resp.text().await?); + if status != reqwest::StatusCode::OK { + panic!("status is not 200"); + } + + let el: Element = { + let el: Element = xml_text.parse()?; + el.children().next().unwrap().clone() + }; + info!("auth response: {:?}", el); + + { + let mut locked_inner = inner.lock().await; + locked_inner.state = ConnectionState::ReceivingFeaturesPostAuthentication; + } + } + + { + let reopen_req = format!("", rid, sid, xmpp_domain_str); + + let resp = client.post(bosh_url.clone()).body(reopen_req).send() + .await.context("failed to post BOSH auth")?; + + rid += 1; + + let (status, xml_text) = (resp.status(), resp.text().await?); + if status != reqwest::StatusCode::OK { + panic!("status is not 200"); + } + + let el: Element = { + let el: Element = xml_text.parse()?; + el.children().next().unwrap().clone() + }; + info!("authenticated features response: {:?}", el); + + { + let mut locked_inner = inner.lock().await; + locked_inner.state = ConnectionState::Binding; + } + + let iq = Iq::from_set(generate_id(), BindQuery::new(None)); + tx.send(iq.into()).await?; + } + + loop { + let next: Option = tokio::select! { + res = rx.next() => if let Some(res) = res { Some(res) } else { break }, + _ = tokio::time::sleep(std::time::Duration::from_millis(500)) => None, + }; + + let send_str: String = if let Some(element) = next { + let mut bytes = Vec::new(); + element.write_to(&mut bytes)?; + let xml = String::from_utf8(bytes)?; + debug!("XMPPBOSH>>> {}", xml); + format!("{}", rid, sid, xml) + } else { + debug!("XMPPBOSH>>> no content"); + format!("", rid, sid) + }; + + let resp = client.post(bosh_url.clone()).body(send_str).send() + .await.context("failed to post BOSH req")?; + + rid += 1; + + let (status, xml_text) = (resp.status(), resp.text().await?); + + if status != reqwest::StatusCode::OK { + panic!("status is not 200"); + } + + let el: Element = xml_text.parse()?; + info!("element response: {:?}", el); + let mut children = el.children(); + + while let Some(element_ref) = children.next() { + let element = element_ref.clone(); + + let mut locked_inner = inner.lock().await; + use ConnectionState::*; + match locked_inner.state { + Binding => match Iq::try_from(element) { + Ok(iq) => { + info!("doing binding state"); + let jid = if let IqType::Result(Some(element)) = iq.payload { + let bind = BindResponse::try_from(element)?; + FullJid::try_from(bind)? + } + else { + bail!("bind failed"); + }; + info!("My JID: {}", jid); + locked_inner.jid = Some(jid.clone()); + + locked_inner + .stanza_filters + .push(Box::new(Pinger::new(jid.clone(), tx.clone()))); + + let iq = Iq::from_get(generate_id(), DiscoInfoQuery { node: None }) + .with_from(Jid::Full(jid.clone())) + .with_to(Jid::Bare(locked_inner.xmpp_domain.clone())); + tx.send(iq.into()).await?; + locked_inner.state = Discovering; + }, + Err(e) => debug!( + "received unexpected element while waiting for bind response: {}", + e + ), + }, + Discovering => { + info!("doing discovering state"); + let iq = Iq::try_from(element)?; + if let IqType::Result(Some(element)) = iq.payload { + let _disco_info = DiscoInfoResult::try_from(element)?; + } + else { + bail!("disco failed"); + } + + let iq = Iq::from_get(generate_id(), xmpp::extdisco::ServicesQuery {}) + .with_from(Jid::Full( + locked_inner.jid.as_ref().context("missing jid")?.clone(), + )) + .with_to(Jid::Bare(locked_inner.xmpp_domain.clone())); + tx.send(iq.into()).await?; + locked_inner.state = DiscoveringExternalServices; + }, + DiscoveringExternalServices => { + info!("doing discovering ext services state"); + let iq = Iq::try_from(element)?; + if let IqType::Result(Some(element)) = iq.payload { + let services = xmpp::extdisco::ServicesResult::try_from(element)?; + debug!("external services: {:?}", services.services); + locked_inner.external_services = services.services; + } + else { + warn!("discovering external services failed: STUN/TURN will not work"); + } + + if let Some(tx) = locked_inner.connected_tx.take() { + tx.send(Ok(())).map_err(|_| anyhow!("channel closed"))?; + } + locked_inner.state = Idle; + }, + Idle => { + info!("doing idle state"); + for filter in &locked_inner.stanza_filters { + if filter.filter(&element) { + filter.take(element).await?; + break; + } + } + }, + _ => panic!("shouldn't be in this state: {:?}", locked_inner.state), + } + } + } + + Ok(()) + }; + + let bg_wrap = async move { + let x: Result<()> = background.await; + x.unwrap() + }; + + let connection = Self { + tx: tx.clone(), + inner: inner.clone(), + tls_insecure, + }; + + Ok((connection, bg_wrap)) + } + pub async fn add_stanza_filter(&self, stanza_filter: impl StanzaFilter + Send + Sync + 'static) { let mut locked_inner = self.inner.lock().await; locked_inner.stanza_filters.push(Box::new(stanza_filter));