From 4fd8a45689e70cb6c28addb1767cd72c859b3752 Mon Sep 17 00:00:00 2001 From: Milo Turner Date: Thu, 12 Mar 2020 17:42:18 -0400 Subject: [PATCH] send acks on timeout --- hptp-recv/src/main.rs | 89 +++++++++++++++++++++++-------------------- hptp-send/src/main.rs | 4 +- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index 28b4f35..449ba68 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -95,10 +95,10 @@ where } } - fn ack_msg(&self) -> DownMsg { - DownMsg::Ack { - idxs: self.segs.keys().cloned().collect(), - } + fn ack_idxs(&self) -> Vec { + let mut idxs: Vec<_> = self.segs.keys().cloned().collect(); + idxs.sort(); + idxs } fn is_file_complete(&self) -> bool { @@ -110,53 +110,61 @@ async fn download(log: &mut Logger, peer: &mut DownPeer, out: &mut OUT) -> where OUT: AsyncWrite + Unpin, { - let mut sink = SegmentSink::new(out); - let mut to_send = vec![]; + use tokio::time::{Duration, Instant}; - enum Action { - Continue, - Quit, + enum Evt { + Recv(UpMsg), + Timer, } - loop { - let msg = match peer.recv().await { - Ok(m) => m, - Err(peer::RecvError::InvalidMessage { .. }) => { - log.recv_corrupt().await; - continue; - } - Err(peer::RecvError::Io { source }) => { - return Err(source.into()); - } - }; + let mut sink = SegmentSink::new(out); - let act; - match msg { - UpMsg::Data { payload, seg_idx } => { + const DELAY_MS: u64 = 1000; + let mut deadline = Instant::now(); + let mut to_send = vec![]; + + loop { + let timer = tokio::time::delay_until(deadline); + let evt = tokio::select!( + _ = timer => Evt::Timer, + r = peer.recv() => match r { + Ok(m) => Evt::Recv(m), + Err(peer::RecvError::InvalidMessage { .. }) => { + log.recv_corrupt().await; + continue; + } + Err(peer::RecvError::Io { source }) => { + return Err(source.into()); + } + } + ); + + match evt { + Evt::Timer => { + deadline += Duration::from_millis(DELAY_MS); + log.debug_msg("timeout").await; + } + + Evt::Recv(UpMsg::Data { payload, seg_idx }) => { let len = payload.len(); match sink.put(seg_idx, payload).await { Put::Duplicate => { log.recv_data_ignored(seg_idx as usize, len).await; } - Put::Fresh => { - log.recv_data_accepted(seg_idx as usize, len, hptp::logger::OutOfOrder) - .await; - log.debug_msg(format!("sending acks: {:?}", { - let mut idxs = sink.segs.keys().cloned().collect::>(); - idxs.sort(); - idxs - })) - .await; - to_send.push(sink.ack_msg()); + let ord = hptp::logger::OutOfOrder; + log.recv_data_accepted(seg_idx as usize, len, ord).await; } } - act = if sink.is_file_complete() { - Action::Quit - } else { - Action::Continue - } } + }; + + { + let idxs = sink.ack_idxs(); + log.debug_msg(format!("send acks {:?}", idxs)) + .await; + to_send.push(DownMsg::Ack { idxs: idxs.clone() }); + to_send.push(DownMsg::Ack { idxs }); } for m in to_send.drain(..) { @@ -167,9 +175,8 @@ where } } - match act { - Action::Continue => (), - Action::Quit => break, + if sink.is_file_complete() { + break } } diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index f063028..4fa6b28 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -148,9 +148,9 @@ where } let mut src = SegmentSource::new(inp); - let mut deadline = Instant::now(); - const DELAY_MS: u64 = 100; + const DELAY_MS: u64 = 100; + let mut deadline = Instant::now(); let mut to_send = vec![]; loop {