diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index dbd7a98..0f2f80d 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -6,7 +6,7 @@ extern crate thiserror; use hptp::logger::Logger; use hptp::msg::{DownMsg, UpMsg}; -use hptp::peer::{Peer, UpPeer}; +use hptp::peer::{self, Peer, UpPeer}; use hptp::seg::SegIdx; use std::collections::HashMap; use std::net::SocketAddr; @@ -67,7 +67,7 @@ fn parse_args(mut args: impl Iterator) -> Result { +struct SegmentSource<'i, IN> { inp: &'i mut IN, unacked_segs: HashMap, unacked_upper_bound: u32, @@ -78,12 +78,12 @@ struct UnAcked { nacks: usize, } -impl<'i, IN> SegmentStore<'i, IN> +impl<'i, IN> SegmentSource<'i, IN> where IN: AsyncRead + Unpin, { fn new(inp: &'i mut IN) -> Self { - SegmentStore { + SegmentSource { inp, unacked_segs: HashMap::new(), unacked_upper_bound: 0, @@ -149,7 +149,7 @@ where Quit, } - let mut store = SegmentStore::new(inp); + let mut src = SegmentSource::new(inp); let mut deadline = Instant::now(); const DELAY_MS: u64 = 100; @@ -159,13 +159,22 @@ where let timer = tokio::time::delay_until(deadline); let evt = tokio::select!( _ = timer => Evt::Timer, - Ok(m) = peer.recv() => Evt::Recv(m), + r = peer.recv() => match r { + Ok(m) => Evt::Recv(m), + Err(peer::RecvError::Io { source }) => { + return Err(source.into()); + } + Err(peer::RecvError::InvalidMessage { .. }) => { + log.recv_corrupt().await; + continue; + } + } ); let act = match evt { Evt::Timer => { deadline += Duration::from_millis(DELAY_MS); - match store.get_segment().await { + match src.get_segment().await { Some((seg_idx, payload)) => { log.send_data(seg_idx as usize, payload.len()).await; to_send.push(UpMsg::Data { @@ -180,7 +189,7 @@ where Evt::Recv(DownMsg::Ack { idxs }) => { log.debug_msg(format!("got acks: {:?}", idxs)).await; - store.ack(&idxs); + src.ack(&idxs); Action::Continue } }; @@ -194,8 +203,8 @@ where } match act { - Action::Quit => break, Action::Continue => (), + Action::Quit => break, } }