quit on file completed

This commit is contained in:
Milo Turner 2020-03-14 15:41:55 -04:00
parent 7d3715ab02
commit c37b767cb9
1 changed files with 13 additions and 6 deletions

View File

@ -86,8 +86,6 @@ struct UnAcked {
nacks: usize,
}
const IN_FLIGHT: usize = 30;
impl UnAcked {
fn nack(self) -> Self {
Self {
@ -204,6 +202,10 @@ where
self.unacked_segs = new_unacked_segs;
self.next_retrans_idx = 0;
}
fn is_file_completed(&self) -> bool {
self.eof && self.unacked_segs.is_empty()
}
}
async fn upload<IN>(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result<(), Error>
@ -230,7 +232,7 @@ where
let mut src = SegmentSource::new(inp);
const DELAY_MS: u64 = (1000 / IN_FLIGHT) as u64;
let in_flight = 66;
let mut deadline = Instant::now();
let mut to_send = vec![];
@ -252,7 +254,8 @@ where
let act = match evt {
Evt::Timer => {
deadline += Duration::from_millis(DELAY_MS);
let delay_ms = (1000 / in_flight) as u64;
deadline += Duration::from_millis(delay_ms);
match src.get_segment().await {
Some((seg_idx, payload)) => {
log.send_data(seg_idx as usize, payload.len()).await;
@ -267,10 +270,14 @@ where
}
Evt::Recv(DownMsg::Ack { idxs }) => {
log.debug_msg(format!("got acks: {:?}", idxs)).await;
log.debug_msg(format!("got {} acks", idxs.len())).await;
src.ack(&idxs);
if src.is_file_completed() {
Action::Quit
} else {
Action::Continue
}
}
};
for m in to_send.drain(..) {