diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index b957ac3..e0ccfeb 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -10,9 +10,12 @@ use hptp::msg::{DownMsg, UpMsg}; use hptp::peer::{self, Peer, UpPeer}; use hptp::seg::{SegData, SegIdx, MAX_SEG_SIZE}; use std::cmp; +use std::collections::HashMap; use std::net::SocketAddr; use tokio::io::AsyncRead; // use tokio::net::UdpSocket; +use tokio::time::{Duration, Instant}; + #[derive(Error, Debug)] enum Error { @@ -130,10 +133,6 @@ where self.inp_buffer = buf; self.is_cut = is_cut; self.is_meow = true; - } else { - // should always be true with the test cases - // TODO! remove for final submission - panic!(); } } @@ -218,8 +217,6 @@ where // 3. recieve ack's in the meantime // 4. adjust timer delay based on ack information - use tokio::time::{Duration, Instant}; - enum Evt { Recv(DownMsg), Timer, @@ -227,11 +224,29 @@ where let mut src = SegmentSource::new(inp); - let in_flight = 66; + const DEFAULT_IN_FLIGHT: i32 = 66; + let mut in_flight = DEFAULT_IN_FLIGHT; let mut deadline = Instant::now(); let mut to_send = vec![]; + let mut send_history: Vec<(SegIdx, Instant)> = vec![]; + let mut ack_history: Vec<(SegIdx, Instant)> = vec![]; + let mut acked: HashMap = HashMap::new(); + let mut sent: HashMap = HashMap::new(); + let mut rtt_est: Duration = Duration::from_millis(1050); + let mut drop_est: usize = 0; + loop { + log.debug_msg(format!("rtt_est is {}, drop_est is {}", rtt_est.as_millis(), drop_est)).await; + + if drop_est > 50 { + in_flight = DEFAULT_IN_FLIGHT; + } else if drop_est > 35 { + in_flight = std::cmp::max(in_flight - 5, DEFAULT_IN_FLIGHT); + } else { + in_flight += 5; + } + let timer = tokio::time::delay_until(deadline); let evt = tokio::select!( _ = timer => Evt::Timer, @@ -263,10 +278,55 @@ where Evt::Recv(DownMsg::Ack { idxs }) => { log.debug_msg(format!("got {} acks", idxs.len())).await; src.ack(&idxs); + + let now = Instant::now(); + let mut rtts: Vec = vec![]; + + for idx in idxs.iter().cloned() { + if !acked.contains_key(&idx) { + acked.insert(idx, now); + ack_history.push((idx, now)); + } + } + + for (ack, ack_time) in ack_history.iter().rev().cloned() { + if sent.contains_key(&ack) { + rtts.push(ack_time - *sent.get(&ack).unwrap()); + } + } + + if rtts.len() > 10 { + rtts.sort(); + let slice = &rtts[rtts.len()/5 - 1..=rtts.len()/5 + 1]; + let mean = (slice[0] + slice[1] + slice[2]) / 3; + rtt_est = mean; + } + + let mut total_pkts = 0; + let mut acked_pkts = 0; + for (sent_idx, sent_time) in send_history.iter().rev().cloned() { + if sent_time > now - (rtt_est / 2) { + continue; + } + total_pkts += 1; + if acked.contains_key(&sent_idx) { + acked_pkts += 1; + } + } + if total_pkts > 10 { + drop_est = 100 - (acked_pkts * 100 / total_pkts); + } } }; for m in to_send.drain(..) { + match m { + UpMsg::Data{seg_idx, ..} => { + let now = Instant::now(); + send_history.push((seg_idx, now)); + sent.insert(seg_idx, now); + }, + } match peer.send(m).await { Ok(()) => (), Err(hptp::peer::SendError::NoTarget) => unreachable!("no target"),