diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index e358b32..9033742 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -10,7 +10,6 @@ use hptp::msg::{DownMsg, UpMsg}; use hptp::peer::{self, Peer, UpPeer}; use hptp::seg::{SegData, SegIdx, MAX_SEG_SIZE}; use std::cmp; -use std::collections::HashMap; use std::net::SocketAddr; use tokio::io::AsyncRead; // use tokio::net::UdpSocket; @@ -75,8 +74,9 @@ struct SegmentSource<'i, IN> { read_input: bool, is_meow: bool, is_cut: bool, - unacked_segs: HashMap, - unacked_upper_bound: u32, + // ordered by seg idx + unacked_segs: Vec<(SegIdx, UnAcked)>, + next_seg_idx: u32, eof: bool, } @@ -87,6 +87,15 @@ struct UnAcked { const IN_FLIGHT: usize = 30; +impl UnAcked { + fn nack(self) -> Self { + Self { + nacks: self.nacks + 1, + payload: self.payload, + } + } +} + impl<'i, IN> SegmentSource<'i, IN> where IN: AsyncRead + Unpin, @@ -98,20 +107,12 @@ where read_input: false, is_meow: false, is_cut: false, - unacked_segs: HashMap::new(), - unacked_upper_bound: 0, + unacked_segs: Vec::new(), + next_seg_idx: 0, eof: false, } } - fn retrans_seg_idx(&self) -> Option { - self.unacked_segs.keys().skip(IN_FLIGHT).next().cloned() - } - - fn any_unacked_seg_idx(&self) -> Option { - self.unacked_segs.keys().next().cloned() - } - async fn read_next(&mut self) -> SegData { use tokio::io::AsyncReadExt; if !self.read_input { @@ -147,43 +148,53 @@ where } } + fn retrans_seg_idx(&self) -> Option { + self.unacked_segs.iter().map(|(si, _)| *si).next() + } + async fn fresh_seg_idx(&mut self) -> Option { if self.eof { None } else { - let seg_idx = self.unacked_upper_bound; - let payload = self.read_next().await; - self.eof = payload.is_last_segment; - self.unacked_upper_bound += 1; - self.unacked_segs - .insert(seg_idx, UnAcked { payload, nacks: 0 }); + let seg_idx = self.next_seg_idx; + let unack = UnAcked { + payload: self.read_next().await, + nacks: 0, + }; + self.next_seg_idx += 1; + self.eof = unack.payload.is_last_segment; + self.unacked_segs.push((seg_idx, unack)); Some(seg_idx) } } async fn get_segment(&mut self) -> Option<(SegIdx, &SegData)> { let seg_idx = { - if let Some(si) = self.retrans_seg_idx() { + if let Some(si) = self.fresh_seg_idx().await { si - } else if let Some(si) = self.fresh_seg_idx().await { - si - } else if let Some(si) = self.any_unacked_seg_idx() { + } else if let Some(si) = self.retrans_seg_idx() { si } else { // early exit cus there's nothing left return None; } }; - self.unacked_segs - .get(&seg_idx) - .map(|u| (seg_idx, &u.payload)) + let i = self + .unacked_segs + .binary_search_by_key(&seg_idx, |x| x.0) + .expect("seg doesn't exist!"); + Some((seg_idx, &self.unacked_segs[i].1.payload)) } /// `seg_idxs` should be distinct and in increasing order. - fn ack(&mut self, seg_idxs: &[SegIdx]) { - for seg_idx in seg_idxs { - self.unacked_segs.remove(seg_idx); + fn ack(&mut self, ack_seg_idxs: &[SegIdx]) { + let mut new_unacked_segs = Vec::new(); + for (seg_idx, unack) in self.unacked_segs.drain(..) { + if ack_seg_idxs.binary_search(&seg_idx).is_err() { + new_unacked_segs.push((seg_idx, unack.nack())) + } } + self.unacked_segs = new_unacked_segs; } }