work on batched ack's

This commit is contained in:
Milo Turner 2020-03-14 15:52:03 -04:00
parent f6cddca534
commit 0b2f61f9fb
1 changed files with 25 additions and 5 deletions

View File

@ -11,6 +11,7 @@ use hptp::peer::{self, DownPeer, Peer};
use hptp::seg::{SegData, SegIdx}; use hptp::seg::{SegData, SegIdx};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
use std::cmp;
#[derive(Error, Debug)] #[derive(Error, Debug)]
enum Error { enum Error {
@ -128,10 +129,12 @@ where
} }
let mut sink = SegmentSink::new(out); let mut sink = SegmentSink::new(out);
const DELAY_MS: u64 = 1000;
let mut deadline = Instant::now(); let mut deadline = Instant::now();
let mut ack_buildup: usize = 0;
let mut seen_eof = false;
let mut max_seen_idx: SegIdx = 0;
loop { loop {
let timer = tokio::time::delay_until(deadline); let timer = tokio::time::delay_until(deadline);
let evt = tokio::select!( let evt = tokio::select!(
@ -150,12 +153,15 @@ where
match evt { match evt {
Evt::Timer => { Evt::Timer => {
deadline += Duration::from_millis(DELAY_MS); deadline += Duration::from_millis(1000);
log.debug_msg("timeout").await; log.debug_msg("timeout").await;
ack_buildup += 1;
} }
Evt::Recv(UpMsg::Data { payload, seg_idx }) => { Evt::Recv(UpMsg::Data { payload, seg_idx }) => {
let len = payload.len(); let len = payload.len();
let eof = payload.is_last_segment || seg_idx < max_seen_idx;
max_seen_idx = cmp::max(seg_idx + 1, max_seen_idx);
match sink.put(seg_idx, payload).await { match sink.put(seg_idx, payload).await {
Put::Duplicate => { Put::Duplicate => {
log.recv_data_ignored(seg_idx as usize, len).await; log.recv_data_ignored(seg_idx as usize, len).await;
@ -163,15 +169,29 @@ where
Put::Fresh => { Put::Fresh => {
let ord = hptp::logger::OutOfOrder; let ord = hptp::logger::OutOfOrder;
log.recv_data_accepted(seg_idx as usize, len, ord).await; log.recv_data_accepted(seg_idx as usize, len, ord).await;
ack_buildup += if eof { 10 } else { 2 };
} }
} }
if eof && !seen_eof {
seen_eof = true;
ack_buildup += 999;
}
} }
}; };
let num_acks = if sink.is_file_complete() { 8 } else { 1 }; let num_acks = if sink.is_file_complete() {
5
} else if ack_buildup > 10 {
ack_buildup = 0;
2
} else {
0
};
for _ in 0..num_acks { for _ in 0..num_acks {
let idxs = sink.ack_idxs(); let idxs = sink.ack_idxs();
log.debug_msg(format!("send acks {:?}", idxs)).await; log.debug_msg(format!("send {} acks", idxs.len())).await;
let m = DownMsg::Ack { idxs }; let m = DownMsg::Ack { idxs };
match peer.send(m).await { match peer.send(m).await {
Ok(()) => (), Ok(()) => (),