Estimate rtt and drop rate for fun and profit

This commit is contained in:
xenia 2020-03-14 17:18:14 -04:00
parent 0b2f61f9fb
commit e5b8d8f8ae
1 changed files with 67 additions and 7 deletions

View File

@ -10,9 +10,12 @@ use hptp::msg::{DownMsg, UpMsg};
use hptp::peer::{self, Peer, UpPeer};
use hptp::seg::{SegData, SegIdx, MAX_SEG_SIZE};
use std::cmp;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::io::AsyncRead;
// use tokio::net::UdpSocket;
use tokio::time::{Duration, Instant};
#[derive(Error, Debug)]
enum Error {
@ -130,10 +133,6 @@ where
self.inp_buffer = buf;
self.is_cut = is_cut;
self.is_meow = true;
} else {
// should always be true with the test cases
// TODO! remove for final submission
panic!();
}
}
@ -218,8 +217,6 @@ where
// 3. recieve ack's in the meantime
// 4. adjust timer delay based on ack information
use tokio::time::{Duration, Instant};
enum Evt {
Recv(DownMsg),
Timer,
@ -227,11 +224,29 @@ where
let mut src = SegmentSource::new(inp);
let in_flight = 66;
const DEFAULT_IN_FLIGHT: i32 = 66;
let mut in_flight = DEFAULT_IN_FLIGHT;
let mut deadline = Instant::now();
let mut to_send = vec![];
let mut send_history: Vec<(SegIdx, Instant)> = vec![];
let mut ack_history: Vec<(SegIdx, Instant)> = vec![];
let mut acked: HashMap<SegIdx, Instant> = HashMap::new();
let mut sent: HashMap<SegIdx, Instant> = HashMap::new();
let mut rtt_est: Duration = Duration::from_millis(1050);
let mut drop_est: usize = 0;
loop {
log.debug_msg(format!("rtt_est is {}, drop_est is {}", rtt_est.as_millis(), drop_est)).await;
if drop_est > 50 {
in_flight = DEFAULT_IN_FLIGHT;
} else if drop_est > 35 {
in_flight = std::cmp::max(in_flight - 5, DEFAULT_IN_FLIGHT);
} else {
in_flight += 5;
}
let timer = tokio::time::delay_until(deadline);
let evt = tokio::select!(
_ = timer => Evt::Timer,
@ -263,10 +278,55 @@ where
Evt::Recv(DownMsg::Ack { idxs }) => {
log.debug_msg(format!("got {} acks", idxs.len())).await;
src.ack(&idxs);
let now = Instant::now();
let mut rtts: Vec<Duration> = vec![];
for idx in idxs.iter().cloned() {
if !acked.contains_key(&idx) {
acked.insert(idx, now);
ack_history.push((idx, now));
}
}
for (ack, ack_time) in ack_history.iter().rev().cloned() {
if sent.contains_key(&ack) {
rtts.push(ack_time - *sent.get(&ack).unwrap());
}
}
if rtts.len() > 10 {
rtts.sort();
let slice = &rtts[rtts.len()/5 - 1..=rtts.len()/5 + 1];
let mean = (slice[0] + slice[1] + slice[2]) / 3;
rtt_est = mean;
}
let mut total_pkts = 0;
let mut acked_pkts = 0;
for (sent_idx, sent_time) in send_history.iter().rev().cloned() {
if sent_time > now - (rtt_est / 2) {
continue;
}
total_pkts += 1;
if acked.contains_key(&sent_idx) {
acked_pkts += 1;
}
}
if total_pkts > 10 {
drop_est = 100 - (acked_pkts * 100 / total_pkts);
}
}
};
for m in to_send.drain(..) {
match m {
UpMsg::Data{seg_idx, ..} => {
let now = Instant::now();
send_history.push((seg_idx, now));
sent.insert(seg_idx, now);
},
}
match peer.send(m).await {
Ok(()) => (),
Err(hptp::peer::SendError::NoTarget) => unreachable!("no target"),