Don't use HashMap

This commit is contained in:
Milo Turner 2020-03-14 14:34:52 -04:00
parent 2e6c441194
commit b5b2cd44df
1 changed file with 40 additions and 29 deletions

View File

@ -10,7 +10,6 @@ use hptp::msg::{DownMsg, UpMsg};
use hptp::peer::{self, Peer, UpPeer};
use hptp::seg::{SegData, SegIdx, MAX_SEG_SIZE};
use std::cmp;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::io::AsyncRead;
// use tokio::net::UdpSocket;
@ -75,8 +74,9 @@ struct SegmentSource<'i, IN> {
read_input: bool,
is_meow: bool,
is_cut: bool,
unacked_segs: HashMap<SegIdx, UnAcked>,
unacked_upper_bound: u32,
// ordered by seg idx
unacked_segs: Vec<(SegIdx, UnAcked)>,
next_seg_idx: u32,
eof: bool,
}
@ -87,6 +87,15 @@ struct UnAcked {
const IN_FLIGHT: usize = 30;
impl UnAcked {
fn nack(self) -> Self {
Self {
nacks: self.nacks + 1,
payload: self.payload,
}
}
}
impl<'i, IN> SegmentSource<'i, IN>
where
IN: AsyncRead + Unpin,
@ -98,20 +107,12 @@ where
read_input: false,
is_meow: false,
is_cut: false,
unacked_segs: HashMap::new(),
unacked_upper_bound: 0,
unacked_segs: Vec::new(),
next_seg_idx: 0,
eof: false,
}
}
fn retrans_seg_idx(&self) -> Option<SegIdx> {
self.unacked_segs.keys().skip(IN_FLIGHT).next().cloned()
}
fn any_unacked_seg_idx(&self) -> Option<SegIdx> {
self.unacked_segs.keys().next().cloned()
}
async fn read_next(&mut self) -> SegData {
use tokio::io::AsyncReadExt;
if !self.read_input {
@ -147,44 +148,54 @@ where
}
}
fn retrans_seg_idx(&self) -> Option<SegIdx> {
self.unacked_segs.iter().map(|(si, _)| *si).next()
}
async fn fresh_seg_idx(&mut self) -> Option<SegIdx> {
if self.eof {
None
} else {
let seg_idx = self.unacked_upper_bound;
let payload = self.read_next().await;
self.eof = payload.is_last_segment;
self.unacked_upper_bound += 1;
self.unacked_segs
.insert(seg_idx, UnAcked { payload, nacks: 0 });
let seg_idx = self.next_seg_idx;
let unack = UnAcked {
payload: self.read_next().await,
nacks: 0,
};
self.next_seg_idx += 1;
self.eof = unack.payload.is_last_segment;
self.unacked_segs.push((seg_idx, unack));
Some(seg_idx)
}
}
async fn get_segment(&mut self) -> Option<(SegIdx, &SegData)> {
let seg_idx = {
if let Some(si) = self.retrans_seg_idx() {
if let Some(si) = self.fresh_seg_idx().await {
si
} else if let Some(si) = self.fresh_seg_idx().await {
si
} else if let Some(si) = self.any_unacked_seg_idx() {
} else if let Some(si) = self.retrans_seg_idx() {
si
} else {
// early exit cus there's nothing left
return None;
}
};
self.unacked_segs
.get(&seg_idx)
.map(|u| (seg_idx, &u.payload))
let i = self
.unacked_segs
.binary_search_by_key(&seg_idx, |x| x.0)
.expect("seg doesn't exist!");
Some((seg_idx, &self.unacked_segs[i].1.payload))
}
/// `ack_seg_idxs` should be distinct and in increasing order.
fn ack(&mut self, seg_idxs: &[SegIdx]) {
for seg_idx in seg_idxs {
self.unacked_segs.remove(seg_idx);
fn ack(&mut self, ack_seg_idxs: &[SegIdx]) {
let mut new_unacked_segs = Vec::new();
for (seg_idx, unack) in self.unacked_segs.drain(..) {
if ack_seg_idxs.binary_search(&seg_idx).is_err() {
new_unacked_segs.push((seg_idx, unack.nack()))
}
}
self.unacked_segs = new_unacked_segs;
}
}
async fn upload<IN>(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result<(), Error>