diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index 4fa6b28..ca01ff8 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -79,6 +79,8 @@ struct UnAcked { nacks: usize, } +const IN_FLIGHT: usize = 30; + impl<'i, IN> SegmentSource<'i, IN> where IN: AsyncRead + Unpin, @@ -92,7 +94,15 @@ where } } - async fn read_segment(&mut self) -> Option<(SegIdx, &SegData)> { + fn retrans_seg_idx(&self) -> Option { + self.unacked_segs.keys().skip(IN_FLIGHT).next().cloned() + } + + fn any_unacked_seg_idx(&self) -> Option { + self.unacked_segs.keys().next().cloned() + } + + async fn fresh_seg_idx(&mut self) -> Option { if self.eof { None } else { @@ -100,21 +110,28 @@ where let payload = SegData::read(self.inp).await.expect("god help us"); self.eof = payload.is_last_segment; self.unacked_upper_bound += 1; - let unack = self.unacked_segs.entry(seg_idx); - let unack = unack.or_insert(UnAcked { payload, nacks: 0 }); - Some((seg_idx, &unack.payload)) + self.unacked_segs + .insert(seg_idx, UnAcked { payload, nacks: 0 }); + Some(seg_idx) } } async fn get_segment(&mut self) -> Option<(SegIdx, &SegData)> { - if self.unacked_segs.is_empty() { - self.read_segment().await - } else { - self.unacked_segs - .iter() - .next() // get any entry, no ordering guarunteed or needed - .map(|(seg_idx, unack)| (*seg_idx, &unack.payload)) - } + let seg_idx = { + if let Some(si) = self.retrans_seg_idx() { + si + } else if let Some(si) = self.fresh_seg_idx().await { + si + } else if let Some(si) = self.any_unacked_seg_idx() { + si + } else { + // early exit cus there's nothing left + return None; + } + }; + self.unacked_segs + .get(&seg_idx) + .map(|u| (seg_idx, &u.payload)) } /// `seg_idxs` should be distinct and in increasing order. @@ -149,7 +166,7 @@ where let mut src = SegmentSource::new(inp); - const DELAY_MS: u64 = 100; + const DELAY_MS: u64 = (1000 / IN_FLIGHT) as u64; let mut deadline = Instant::now(); let mut to_send = vec![];