diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index e0ccfeb..1483ce5 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -235,16 +235,33 @@ where let mut sent: HashMap = HashMap::new(); let mut rtt_est: Duration = Duration::from_millis(1050); let mut drop_est: usize = 0; + let mut drop_est_made: bool = false; loop { + let now = Instant::now(); log.debug_msg(format!("rtt_est is {}, drop_est is {}", rtt_est.as_millis(), drop_est)).await; - if drop_est > 50 { - in_flight = DEFAULT_IN_FLIGHT; - } else if drop_est > 35 { - in_flight = std::cmp::max(in_flight - 5, DEFAULT_IN_FLIGHT); - } else { - in_flight += 5; + if drop_est_made { + if drop_est > 50 { + in_flight = DEFAULT_IN_FLIGHT; + } else if drop_est > 35 { + in_flight = std::cmp::max(in_flight - 5, DEFAULT_IN_FLIGHT); + } else { + in_flight += 150; + } + drop_est_made = false; + let mut drain_idx = send_history.len(); + for entry in send_history.iter().rev() { + if entry.1 >= now - (rtt_est / 2) { + drain_idx -= 1; + } else { + break; + } + } + send_history.drain(0..drain_idx); + ack_history.clear(); + sent.clear(); + acked.clear(); } let timer = tokio::time::delay_until(deadline); @@ -279,7 +296,6 @@ where log.debug_msg(format!("got {} acks", idxs.len())).await; src.ack(&idxs); - let now = Instant::now(); let mut rtts: Vec = vec![]; for idx in idxs.iter().cloned() { @@ -315,6 +331,7 @@ where } if total_pkts > 10 { drop_est = 100 - (acked_pkts * 100 / total_pkts); + drop_est_made = true; } } }; @@ -322,7 +339,6 @@ where for m in to_send.drain(..) { match m { UpMsg::Data{seg_idx, ..} => { - let now = Instant::now(); send_history.push((seg_idx, now)); sent.insert(seg_idx, now); },