implement bosh transport for old jitsi versions

This commit is contained in:
xenia 2022-05-08 04:26:35 -04:00
parent 173ace76a6
commit 7b70e802bb
6 changed files with 595 additions and 47 deletions

239
Cargo.lock generated
View File

@ -296,6 +296,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encoding_rs"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if",
]
[[package]]
name = "fastrand"
version = "1.7.0"
@ -611,6 +620,31 @@ dependencies = [
"system-deps",
]
[[package]]
name = "h2"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
version = "0.3.3"
@ -652,12 +686,79 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4"
[[package]]
name = "httpdate"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac"
dependencies = [
"http",
"hyper",
"rustls",
"tokio",
"tokio-rustls",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "ident_case"
version = "1.0.1"
@ -675,6 +776,16 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -769,6 +880,7 @@ dependencies = [
"pem",
"rand",
"rcgen",
"reqwest",
"ring",
"rtcp",
"rustls",
@ -857,6 +969,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minidom"
version = "0.14.0"
@ -1292,6 +1410,47 @@ dependencies = [
"winapi",
]
[[package]]
name = "reqwest"
version = "0.11.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb"
dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
"lazy_static",
"log",
"mime",
"native-tls",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile 0.3.0",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webpki-roots",
"winreg",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -1337,7 +1496,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"rustls-pemfile 0.2.1",
"schannel",
"security-framework",
]
@ -1351,6 +1510,15 @@ dependencies = [
"base64",
]
[[package]]
name = "rustls-pemfile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
dependencies = [
"base64",
]
[[package]]
name = "rustversion"
version = "1.0.6"
@ -1443,6 +1611,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_with"
version = "1.12.0"
@ -1769,6 +1949,20 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "tokio-util"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "toml"
version = "0.5.8"
@ -1778,6 +1972,12 @@ dependencies = [
"serde",
]
[[package]]
name = "tower-service"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.31"
@ -1836,6 +2036,12 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.17.2"
@ -1954,6 +2160,16 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
@ -1985,6 +2201,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb6ec270a31b1d3c7e266b999739109abce8b6c87e4b31fcfcd788b65267395"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.79"
@ -2130,6 +2358,15 @@ version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "xmpp-parsers"
version = "0.19.0"

View File

@ -23,8 +23,11 @@ use tracing::{error, info, trace, warn};
about = "Connect a GStreamer pipeline to a Jitsi Meet conference."
)]
struct Opt {
#[structopt(long)]
web_socket_url: String,
#[structopt(long, required_unless="bosh-url")]
web_socket_url: Option<String>,
#[structopt(long, required_unless="web-socket-url")]
bosh_url: Option<String>,
#[structopt(
long,
@ -221,7 +224,13 @@ async fn main_inner() -> Result<()> {
.transpose()
.context("failed to parse recv pipeline")?;
let web_socket_url: Uri = opt.web_socket_url.parse()?;
let (is_ws, url_to_use) = if opt.web_socket_url.is_some() {
(true, opt.web_socket_url.unwrap())
} else {
(false, opt.bosh_url.unwrap())
};
let web_socket_url: Uri = url_to_use.parse()?;
let xmpp_domain = opt
.xmpp_domain
@ -229,25 +238,38 @@ async fn main_inner() -> Result<()> {
.or_else(|| web_socket_url.host())
.context("invalid WebSocket URL")?;
let (connection, background) = Connection::new(
&opt.web_socket_url,
xmpp_domain,
match opt.xmpp_username {
Some(username) => Authentication::Plain {
username,
password: opt.xmpp_password.context("if xmpp-username is provided, xmpp-password must also be provided")?,
},
None => Authentication::Anonymous,
},
#[cfg(feature = "tls-insecure")]
opt.tls_insecure,
#[cfg(not(feature = "tls-insecure"))]
false,
)
.await
.context("failed to build connection")?;
tokio::spawn(background);
let connection = if is_ws {
let (connection, background) =
Connection::new_websocket(
&url_to_use,
xmpp_domain,
match opt.xmpp_username {
Some(username) => Authentication::Plain {
username,
password: opt.xmpp_password.context("if xmpp-username is provided, xmpp-password must also be provided")?,
},
None => Authentication::Anonymous,
},
#[cfg(feature = "tls-insecure")]
opt.tls_insecure,
#[cfg(not(feature = "tls-insecure"))]
false,
)
.await
.context("failed to build connection")?;
tokio::spawn(background);
connection
} else {
let (connection, background) =
Connection::new_bosh(
&url_to_use,
xmpp_domain,
Authentication::Anonymous,
false
).await.context("failed to build BOSH connection")?;
tokio::spawn(background);
connection
};
connection.connect().await?;
@ -311,31 +333,33 @@ async fn main_inner() -> Result<()> {
.set_send_resolution(send_video_height.into())
.await;
conference
.send_colibri_message(ColibriMessage::ReceiverVideoConstraints {
last_n: Some(opt.last_n.map(i32::from).unwrap_or(-1)),
selected_endpoints: opt
.select_endpoints
.map(|endpoints| endpoints.split(',').map(ToOwned::to_owned).collect()),
on_stage_endpoints: None,
default_constraints: Some(Constraints {
max_height: Some(opt.recv_video_scale_height.into()),
ideal_height: None,
}),
constraints: None,
})
.await?;
if let Some(video_type) = opt.video_type {
if is_ws {
conference
.send_colibri_message(ColibriMessage::VideoTypeMessage {
video_type: match video_type.as_str() {
"camera" => VideoType::Camera,
"desktop" => VideoType::Desktop,
other => bail!(format!("invalid video type: {}", other)),
},
.send_colibri_message(ColibriMessage::ReceiverVideoConstraints {
last_n: Some(opt.last_n.map(i32::from).unwrap_or(-1)),
selected_endpoints: opt
.select_endpoints
.map(|endpoints| endpoints.split(',').map(ToOwned::to_owned).collect()),
on_stage_endpoints: None,
default_constraints: Some(Constraints {
max_height: Some(opt.recv_video_scale_height.into()),
ideal_height: None,
}),
constraints: None,
})
.await?;
if let Some(video_type) = opt.video_type {
conference
.send_colibri_message(ColibriMessage::VideoTypeMessage {
video_type: match video_type.as_str() {
"camera" => VideoType::Camera,
"desktop" => VideoType::Desktop,
other => bail!(format!("invalid video type: {}", other)),
},
})
.await?;
}
}
if let Some(bin) = send_pipeline {

View File

@ -86,7 +86,7 @@ pub unsafe extern "C" fn gstmeet_connection_new(
let xmpp_domain = CStr::from_ptr(xmpp_domain);
(*context)
.runtime
.block_on(Connection::new(
.block_on(Connection::new_websocket(
&websocket_url.to_string_lossy(),
&xmpp_domain.to_string_lossy(),
Authentication::Anonymous,

View File

@ -31,6 +31,7 @@ once_cell = { version = "1", default-features = false, features = ["std"] }
pem = { version = "1", default-features = false }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
rcgen = { version = "0.9", default-features = false }
reqwest = { version = "0.11", features = ["rustls-tls"] }
ring = { version = "0.16", default-features = false }
rtcp = { version = "0.6", default-features = false, optional = true }
rustls = { version = "0.20", default-features = false, features = ["logging", "tls12"], optional = true }

View File

@ -517,6 +517,7 @@ impl StanzaFilter for JitsiConference {
async fn take(&self, element: xmpp_parsers::Element) -> Result<()> {
use JitsiConferenceState::*;
let state = self.inner.lock().await.state;
info!("conference state: {:?}", state);
match state {
Discovering => {
let iq = Iq::try_from(element)?;
@ -539,12 +540,15 @@ impl StanzaFilter for JitsiConference {
},
JoiningMuc => {
let presence = Presence::try_from(element)?;
info!("joiningmuc presence: {:?}", presence);
if let Some(payload) = presence
.payloads
.iter()
.find(|payload| payload.is("x", ns::MUC_USER))
{
info!("joiningmuc payload: {:?}", payload);
let muc_user = MucUser::try_from(payload.clone())?;
info!("joining muc user: {:?} status {:?}", muc_user, muc_user.status);
if muc_user.status.contains(&MucStatus::SelfPresence) {
debug!("Joined MUC: {}", self.config.muc);
self.inner.lock().await.state = Idle;

View File

@ -71,7 +71,7 @@ pub enum Authentication {
}
impl Connection {
pub async fn new(
pub async fn new_websocket(
websocket_url: &str,
xmpp_domain: &str,
authentication: Authentication,
@ -143,6 +143,288 @@ impl Connection {
Ok((connection, background))
}
pub async fn new_bosh(
bind_url: &str,
xmpp_domain: &str,
authentication: Authentication,
tls_insecure: bool,
) -> Result<(Self, impl Future<Output = ()>)> {
let bosh_url: reqwest::Url = reqwest::Url::parse(bind_url).context("invalid BOSH url")?;
let xmpp_domain_jid: BareJid = xmpp_domain.parse().context("invalid XMPP domain")?;
let xmpp_domain_str: String = xmpp_domain.into();
info!("Connecting XMPP BOSH to {}", bind_url);
let inner = Arc::new(Mutex::new(ConnectionInner {
state: ConnectionState::OpeningPreAuthentication,
jid: None,
xmpp_domain: xmpp_domain_jid,
authentication,
external_services: vec![],
connected_tx: None,
stanza_filters: vec![],
}));
let bg_inner = inner.clone();
let (tx, rx) = mpsc::channel(64);
let bg_tx = tx.clone();
let background = async move {
let inner = bg_inner;
let tx = bg_tx;
let tls: rustls::ClientConfig = {
let mut roots = rustls::RootCertStore::empty();
for cert in
rustls_native_certs::load_native_certs().context("failed to load native root certs")?
{
roots.add(&rustls::Certificate(cert.0))
.context("failed to add native root certs")?
}
let mut config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();
config.alpn_protocols.push(b"http/1.1".to_vec());
config
};
let client = reqwest::Client::builder()
.user_agent("Mozilla/5.0 (X11; Linux aarch64; rv:99.0) Gecko/20100101 Firefox/99.0")
.default_headers({
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("content-type", reqwest::header::HeaderValue::from_static("text/xml; charset=utf-8"));
headers
})
.use_preconfigured_tls(tls).build()?;
let mut rid: u64 = 13371337;
let mut rx = ReceiverStream::new(rx);
if let Some(x) = rx.next().await {
info!("BOSH eating open tag: {:?}", x);
{
let mut locked_inner = inner.lock().await;
locked_inner.state = ConnectionState::ReceivingFeaturesPreAuthentication;
}
} else {
panic!("failed to get open tag");
}
let (sid, stream_features): (String, Element) = {
let first_req = format!("<body content=\"text/xml; charset=utf-8\" hold=\"1\" rid=\"{}\" to=\"{}\" ver=\"1.6\" wait=\"2\" xml:lang=\"en\" xmlns=\"http://jabber.org/protocol/httpbind\" xmlns:xmpp=\"urn:xmpp:xbosh\" xmpp:version=\"1.0\"/>", rid, xmpp_domain_str);
info!("making unauth open");
let resp = client.post(bosh_url.clone()).body(first_req) .send()
.await.context("failed to post initial BOSH message")?;
rid += 1;
let (status, xml_text) = (resp.status(), resp.text().await?);
if status != reqwest::StatusCode::OK {
panic!("status is not 200");
}
let el: Element = xml_text.parse()?;
let sid: String = el.attr("sid").unwrap().to_string();
let stream_features: Element = el.children().next().unwrap().clone();
info!("sid: {}", sid);
{
let mut locked_inner = inner.lock().await;
locked_inner.state = ConnectionState::Authenticating;
}
(sid, stream_features)
};
{
let auth_req = format!("<body rid=\"{}\" sid=\"{}\" xmlns=\"http://jabber.org/protocol/httpbind\"><auth mechanism=\"ANONYMOUS\" xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/></body>", rid, sid);
let resp = client.post(bosh_url.clone()).body(auth_req).send()
.await.context("failed to post BOSH auth")?;
rid += 1;
let (status, xml_text) = (resp.status(), resp.text().await?);
if status != reqwest::StatusCode::OK {
panic!("status is not 200");
}
let el: Element = {
let el: Element = xml_text.parse()?;
el.children().next().unwrap().clone()
};
info!("auth response: {:?}", el);
{
let mut locked_inner = inner.lock().await;
locked_inner.state = ConnectionState::ReceivingFeaturesPostAuthentication;
}
}
{
let reopen_req = format!("<body rid=\"{}\" sid=\"{}\" to=\"{}\" xml:lang=\"en\" xmlns=\"http://jabber.org/protocol/httpbind\" xmlns:xmpp=\"urn:xmpp:xbosh\" xmpp:restart=\"true\"/>", rid, sid, xmpp_domain_str);
let resp = client.post(bosh_url.clone()).body(reopen_req).send()
.await.context("failed to post BOSH auth")?;
rid += 1;
let (status, xml_text) = (resp.status(), resp.text().await?);
if status != reqwest::StatusCode::OK {
panic!("status is not 200");
}
let el: Element = {
let el: Element = xml_text.parse()?;
el.children().next().unwrap().clone()
};
info!("authenticated features response: {:?}", el);
{
let mut locked_inner = inner.lock().await;
locked_inner.state = ConnectionState::Binding;
}
let iq = Iq::from_set(generate_id(), BindQuery::new(None));
tx.send(iq.into()).await?;
}
loop {
let next: Option<Element> = tokio::select! {
res = rx.next() => if let Some(res) = res { Some(res) } else { break },
_ = tokio::time::sleep(std::time::Duration::from_millis(500)) => None,
};
let send_str: String = if let Some(element) = next {
let mut bytes = Vec::new();
element.write_to(&mut bytes)?;
let xml = String::from_utf8(bytes)?;
debug!("XMPPBOSH>>> {}", xml);
format!("<body rid=\"{}\" sid=\"{}\" xmlns=\"http://jabber.org/protocol/httpbind\">{}</body>", rid, sid, xml)
} else {
debug!("XMPPBOSH>>> no content");
format!("<body rid=\"{}\" sid=\"{}\" xmlns=\"http://jabber.org/protocol/httpbind\" />", rid, sid)
};
let resp = client.post(bosh_url.clone()).body(send_str).send()
.await.context("failed to post BOSH req")?;
rid += 1;
let (status, xml_text) = (resp.status(), resp.text().await?);
if status != reqwest::StatusCode::OK {
panic!("status is not 200");
}
let el: Element = xml_text.parse()?;
info!("element response: {:?}", el);
let mut children = el.children();
while let Some(element_ref) = children.next() {
let element = element_ref.clone();
let mut locked_inner = inner.lock().await;
use ConnectionState::*;
match locked_inner.state {
Binding => match Iq::try_from(element) {
Ok(iq) => {
info!("doing binding state");
let jid = if let IqType::Result(Some(element)) = iq.payload {
let bind = BindResponse::try_from(element)?;
FullJid::try_from(bind)?
}
else {
bail!("bind failed");
};
info!("My JID: {}", jid);
locked_inner.jid = Some(jid.clone());
locked_inner
.stanza_filters
.push(Box::new(Pinger::new(jid.clone(), tx.clone())));
let iq = Iq::from_get(generate_id(), DiscoInfoQuery { node: None })
.with_from(Jid::Full(jid.clone()))
.with_to(Jid::Bare(locked_inner.xmpp_domain.clone()));
tx.send(iq.into()).await?;
locked_inner.state = Discovering;
},
Err(e) => debug!(
"received unexpected element while waiting for bind response: {}",
e
),
},
Discovering => {
info!("doing discovering state");
let iq = Iq::try_from(element)?;
if let IqType::Result(Some(element)) = iq.payload {
let _disco_info = DiscoInfoResult::try_from(element)?;
}
else {
bail!("disco failed");
}
let iq = Iq::from_get(generate_id(), xmpp::extdisco::ServicesQuery {})
.with_from(Jid::Full(
locked_inner.jid.as_ref().context("missing jid")?.clone(),
))
.with_to(Jid::Bare(locked_inner.xmpp_domain.clone()));
tx.send(iq.into()).await?;
locked_inner.state = DiscoveringExternalServices;
},
DiscoveringExternalServices => {
info!("doing discovering ext services state");
let iq = Iq::try_from(element)?;
if let IqType::Result(Some(element)) = iq.payload {
let services = xmpp::extdisco::ServicesResult::try_from(element)?;
debug!("external services: {:?}", services.services);
locked_inner.external_services = services.services;
}
else {
warn!("discovering external services failed: STUN/TURN will not work");
}
if let Some(tx) = locked_inner.connected_tx.take() {
tx.send(Ok(())).map_err(|_| anyhow!("channel closed"))?;
}
locked_inner.state = Idle;
},
Idle => {
info!("doing idle state");
for filter in &locked_inner.stanza_filters {
if filter.filter(&element) {
filter.take(element).await?;
break;
}
}
},
_ => panic!("shouldn't be in this state: {:?}", locked_inner.state),
}
}
}
Ok(())
};
let bg_wrap = async move {
let x: Result<()> = background.await;
x.unwrap()
};
let connection = Self {
tx: tx.clone(),
inner: inner.clone(),
tls_insecure,
};
Ok((connection, bg_wrap))
}
pub async fn add_stanza_filter(&self, stanza_filter: impl StanzaFilter + Send + Sync + 'static) {
let mut locked_inner = self.inner.lock().await;
locked_inner.stanza_filters.push(Box::new(stanza_filter));