From 86af73454270d30acae193f98fb75bb1b8bee71b Mon Sep 17 00:00:00 2001 From: Milo Turner Date: Tue, 10 Mar 2020 12:32:23 -0400 Subject: [PATCH] handle duplicate packets --- hptp-recv/src/main.rs | 29 ++++++++++++++++++++--------- hptp/src/logger.rs | 14 ++++++++++---- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index 54250c4..b29a5a0 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -61,16 +61,27 @@ where loop { match peer.recv().await { Ok(UpMsg::Data { payload, seg_idx }) => { - let len = payload.len(); - segs.entry(seg_idx).or_insert(Some(payload)); - let ack = DownMsg::Ack { - idxs: segs.keys().cloned().collect(), - }; - log.debug_msg(format!("sent ack: {:?}", - segs.keys().collect::>())).await; - send(peer, ack).await?; - log.recv_data_accepted(seg_idx as usize, len, hptp::logger::OutOfOrder) + if segs.contains_key(&seg_idx) { + log.recv_data_ignored(seg_idx as usize, payload.len()).await; + } else { + log.recv_data_accepted( + seg_idx as usize, + payload.len(), + hptp::logger::OutOfOrder, + ) .await; + segs.insert(seg_idx, Some(payload)); + let ack = DownMsg::Ack { + idxs: segs.keys().cloned().collect(), + }; + log.debug_msg(format!("sent ack: {:?}", { + let mut idxs = segs.keys().collect::>(); + idxs.sort_unstable(); + idxs + })) + .await; + send(peer, ack).await?; + } } Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await, diff --git a/hptp/src/logger.rs b/hptp/src/logger.rs index 2376e81..bf8ec0d 100644 --- a/hptp/src/logger.rs +++ b/hptp/src/logger.rs @@ -44,8 +44,9 @@ impl Logger { .await } - pub async fn recv_data_ignored(&mut self) { - self.log_payload(LogPayload::RecvDataIgnored).await + pub async fn recv_data_ignored(&mut self, start: usize, len: usize) { + self.log_payload(LogPayload::RecvDataIgnored { start, len }) + .await } pub async fn recv_ack(&mut self, last_offset: usize) { @@ -91,7 +92,10 @@ enum LogPayload<'a> { len: usize, order: AcceptedOrder, }, - RecvDataIgnored, + RecvDataIgnored { + start: usize, + len: usize, + }, RecvAck { last_offset: usize, }, @@ -134,7 +138,9 @@ impl Display for LogPayload<'_> { LogPayload::RecvDataAccepted { start, len, order } => { write!(f, "[recv data] {} ({}) ACCEPTED ({})", start, len, order) } - LogPayload::RecvDataIgnored => write!(f, "[recv data] IGNORED"), + LogPayload::RecvDataIgnored { start, len } => { + write!(f, "[recv data] {} ({}) IGNORED", start, len) + } LogPayload::RecvCorrupt => write!(f, "[recv corrupt packet]"), LogPayload::RecvAck { last_offset } => write!(f, "[recv ack] {}", last_offset), LogPayload::SendData { start, len } => write!(f, "[send data] {} ({})", start, len),