keep multiple un-ack'd packets in flight
This commit is contained in:
parent
4fd8a45689
commit
954e25b055
|
@ -79,6 +79,8 @@ struct UnAcked {
|
||||||
nacks: usize,
|
nacks: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const IN_FLIGHT: usize = 30;
|
||||||
|
|
||||||
impl<'i, IN> SegmentSource<'i, IN>
|
impl<'i, IN> SegmentSource<'i, IN>
|
||||||
where
|
where
|
||||||
IN: AsyncRead + Unpin,
|
IN: AsyncRead + Unpin,
|
||||||
|
@ -92,7 +94,15 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_segment(&mut self) -> Option<(SegIdx, &SegData)> {
|
fn retrans_seg_idx(&self) -> Option<SegIdx> {
|
||||||
|
self.unacked_segs.keys().skip(IN_FLIGHT).next().cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn any_unacked_seg_idx(&self) -> Option<SegIdx> {
|
||||||
|
self.unacked_segs.keys().next().cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fresh_seg_idx(&mut self) -> Option<SegIdx> {
|
||||||
if self.eof {
|
if self.eof {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
|
@ -100,21 +110,28 @@ where
|
||||||
let payload = SegData::read(self.inp).await.expect("god help us");
|
let payload = SegData::read(self.inp).await.expect("god help us");
|
||||||
self.eof = payload.is_last_segment;
|
self.eof = payload.is_last_segment;
|
||||||
self.unacked_upper_bound += 1;
|
self.unacked_upper_bound += 1;
|
||||||
let unack = self.unacked_segs.entry(seg_idx);
|
self.unacked_segs
|
||||||
let unack = unack.or_insert(UnAcked { payload, nacks: 0 });
|
.insert(seg_idx, UnAcked { payload, nacks: 0 });
|
||||||
Some((seg_idx, &unack.payload))
|
Some(seg_idx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_segment(&mut self) -> Option<(SegIdx, &SegData)> {
|
async fn get_segment(&mut self) -> Option<(SegIdx, &SegData)> {
|
||||||
if self.unacked_segs.is_empty() {
|
let seg_idx = {
|
||||||
self.read_segment().await
|
if let Some(si) = self.retrans_seg_idx() {
|
||||||
} else {
|
si
|
||||||
self.unacked_segs
|
} else if let Some(si) = self.fresh_seg_idx().await {
|
||||||
.iter()
|
si
|
||||||
.next() // get any entry, no ordering guarunteed or needed
|
} else if let Some(si) = self.any_unacked_seg_idx() {
|
||||||
.map(|(seg_idx, unack)| (*seg_idx, &unack.payload))
|
si
|
||||||
}
|
} else {
|
||||||
|
// early exit cus there's nothing left
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.unacked_segs
|
||||||
|
.get(&seg_idx)
|
||||||
|
.map(|u| (seg_idx, &u.payload))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `seg_idxs` should be distinct and in increasing order.
|
/// `seg_idxs` should be distinct and in increasing order.
|
||||||
|
@ -149,7 +166,7 @@ where
|
||||||
|
|
||||||
let mut src = SegmentSource::new(inp);
|
let mut src = SegmentSource::new(inp);
|
||||||
|
|
||||||
const DELAY_MS: u64 = 100;
|
const DELAY_MS: u64 = (1000 / IN_FLIGHT) as u64;
|
||||||
let mut deadline = Instant::now();
|
let mut deadline = Instant::now();
|
||||||
let mut to_send = vec![];
|
let mut to_send = vec![];
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue