diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index bc5f142..8a6878d 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -11,6 +11,7 @@ use hptp::peer::{self, DownPeer, Peer}; use hptp::seg::{SegData, SegIdx}; use std::collections::HashMap; use tokio::io::AsyncWrite; +use std::cmp; #[derive(Error, Debug)] enum Error { @@ -128,10 +129,12 @@ where } let mut sink = SegmentSink::new(out); - - const DELAY_MS: u64 = 1000; let mut deadline = Instant::now(); + let mut ack_buildup: usize = 0; + let mut seen_eof = false; + let mut max_seen_idx: SegIdx = 0; + loop { let timer = tokio::time::delay_until(deadline); let evt = tokio::select!( @@ -150,12 +153,15 @@ where match evt { Evt::Timer => { - deadline += Duration::from_millis(DELAY_MS); + deadline += Duration::from_millis(1000); log.debug_msg("timeout").await; + ack_buildup += 1; } Evt::Recv(UpMsg::Data { payload, seg_idx }) => { let len = payload.len(); + let eof = payload.is_last_segment || seg_idx < max_seen_idx; + max_seen_idx = cmp::max(seg_idx + 1, max_seen_idx); match sink.put(seg_idx, payload).await { Put::Duplicate => { log.recv_data_ignored(seg_idx as usize, len).await; @@ -163,15 +169,29 @@ where Put::Fresh => { let ord = hptp::logger::OutOfOrder; log.recv_data_accepted(seg_idx as usize, len, ord).await; + ack_buildup += if eof { 10 } else { 2 }; } } + + if eof && !seen_eof { + seen_eof = true; + ack_buildup += 999; + } } }; - let num_acks = if sink.is_file_complete() { 8 } else { 1 }; + let num_acks = if sink.is_file_complete() { + 5 + } else if ack_buildup > 10 { + ack_buildup = 0; + 2 + } else { + 0 + }; + for _ in 0..num_acks { let idxs = sink.ack_idxs(); - log.debug_msg(format!("send acks {:?}", idxs)).await; + log.debug_msg(format!("send {} acks", idxs.len())).await; let m = DownMsg::Ack { idxs }; match peer.send(m).await { Ok(()) => (),