send acks on timeout

This commit is contained in:
Milo Turner 2020-03-12 17:42:18 -04:00
parent 038fd9037a
commit 4fd8a45689
2 changed files with 50 additions and 43 deletions

View File

@ -95,10 +95,10 @@ where
} }
} }
fn ack_msg(&self) -> DownMsg { fn ack_idxs(&self) -> Vec<SegIdx> {
DownMsg::Ack { let mut idxs: Vec<_> = self.segs.keys().cloned().collect();
idxs: self.segs.keys().cloned().collect(), idxs.sort();
} idxs
} }
fn is_file_complete(&self) -> bool { fn is_file_complete(&self) -> bool {
@ -110,17 +110,25 @@ async fn download<OUT>(log: &mut Logger, peer: &mut DownPeer, out: &mut OUT) ->
where where
OUT: AsyncWrite + Unpin, OUT: AsyncWrite + Unpin,
{ {
let mut sink = SegmentSink::new(out); use tokio::time::{Duration, Instant};
let mut to_send = vec![];
enum Action { enum Evt {
Continue, Recv(UpMsg),
Quit, Timer,
} }
let mut sink = SegmentSink::new(out);
const DELAY_MS: u64 = 1000;
let mut deadline = Instant::now();
let mut to_send = vec![];
loop { loop {
let msg = match peer.recv().await { let timer = tokio::time::delay_until(deadline);
Ok(m) => m, let evt = tokio::select!(
_ = timer => Evt::Timer,
r = peer.recv() => match r {
Ok(m) => Evt::Recv(m),
Err(peer::RecvError::InvalidMessage { .. }) => { Err(peer::RecvError::InvalidMessage { .. }) => {
log.recv_corrupt().await; log.recv_corrupt().await;
continue; continue;
@ -128,35 +136,35 @@ where
Err(peer::RecvError::Io { source }) => { Err(peer::RecvError::Io { source }) => {
return Err(source.into()); return Err(source.into());
} }
}; }
);
let act; match evt {
match msg { Evt::Timer => {
UpMsg::Data { payload, seg_idx } => { deadline += Duration::from_millis(DELAY_MS);
log.debug_msg("timeout").await;
}
Evt::Recv(UpMsg::Data { payload, seg_idx }) => {
let len = payload.len(); let len = payload.len();
match sink.put(seg_idx, payload).await { match sink.put(seg_idx, payload).await {
Put::Duplicate => { Put::Duplicate => {
log.recv_data_ignored(seg_idx as usize, len).await; log.recv_data_ignored(seg_idx as usize, len).await;
} }
Put::Fresh => { Put::Fresh => {
log.recv_data_accepted(seg_idx as usize, len, hptp::logger::OutOfOrder) let ord = hptp::logger::OutOfOrder;
log.recv_data_accepted(seg_idx as usize, len, ord).await;
}
}
}
};
{
let idxs = sink.ack_idxs();
log.debug_msg(format!("send acks {:?}", idxs))
.await; .await;
log.debug_msg(format!("sending acks: {:?}", { to_send.push(DownMsg::Ack { idxs: idxs.clone() });
let mut idxs = sink.segs.keys().cloned().collect::<Vec<_>>(); to_send.push(DownMsg::Ack { idxs });
idxs.sort();
idxs
}))
.await;
to_send.push(sink.ack_msg());
}
}
act = if sink.is_file_complete() {
Action::Quit
} else {
Action::Continue
}
}
} }
for m in to_send.drain(..) { for m in to_send.drain(..) {
@ -167,9 +175,8 @@ where
} }
} }
match act { if sink.is_file_complete() {
Action::Continue => (), break
Action::Quit => break,
} }
} }

View File

@ -148,9 +148,9 @@ where
} }
let mut src = SegmentSource::new(inp); let mut src = SegmentSource::new(inp);
let mut deadline = Instant::now();
const DELAY_MS: u64 = 100;
const DELAY_MS: u64 = 100;
let mut deadline = Instant::now();
let mut to_send = vec![]; let mut to_send = vec![];
loop { loop {