receive ACKs

This commit is contained in:
Milo Turner 2020-03-10 18:35:54 -04:00
parent 6b3d8acdf1
commit 924e7d28b3
1 changed files with 10 additions and 11 deletions

View File

@ -119,9 +119,11 @@ where
} }
} }
fn ack(&mut self, seg_idx: SegIdx) { /// `seg_idxs` should be distinct and in increasing order.
self.unacked_segs.remove(&seg_idx); fn ack(&mut self, seg_idxs: &[SegIdx]) {
// TODO: update bounds for seg_idx in seg_idxs {
self.unacked_segs.remove(seg_idx);
}
} }
} }
@ -149,13 +151,12 @@ where
let mut store = SegmentStore::new(inp); let mut store = SegmentStore::new(inp);
let mut deadline = Instant::now(); let mut deadline = Instant::now();
const DELAY_MS: u64 = 250; const DELAY_MS: u64 = 100;
let mut to_send = vec![]; let mut to_send = vec![];
loop { loop {
let timer = tokio::time::delay_until(deadline); let timer = tokio::time::delay_until(deadline);
let evt = tokio::select!( let evt = tokio::select!(
_ = timer => Evt::Timer, _ = timer => Evt::Timer,
Ok(m) = peer.recv() => Evt::Recv(m), Ok(m) = peer.recv() => Evt::Recv(m),
@ -173,15 +174,13 @@ where
}); });
Action::Continue Action::Continue
} }
None => { None => Action::Quit,
log.completed().await;
Action::Quit
}
} }
} }
Evt::Recv(m) => { Evt::Recv(DownMsg::Ack { idxs }) => {
log.debug_msg(format!("{:?}", m)).await; log.debug_msg(format!("got acks: {:?}", idxs)).await;
store.ack(&idxs);
Action::Continue Action::Continue
} }
}; };