rename SegmentStore => SegmentSource

This commit is contained in:
Milo Turner 2020-03-10 20:46:02 -04:00
parent 924e7d28b3
commit 3820b28964
1 changed files with 18 additions and 9 deletions

View File

@ -6,7 +6,7 @@ extern crate thiserror;
use hptp::logger::Logger;
use hptp::msg::{DownMsg, UpMsg};
use hptp::peer::{Peer, UpPeer};
use hptp::peer::{self, Peer, UpPeer};
use hptp::seg::SegIdx;
use std::collections::HashMap;
use std::net::SocketAddr;
@ -67,7 +67,7 @@ fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddr, Erro
.map_err(|_| Error::InvalidArgs)
}
struct SegmentStore<'i, IN> {
struct SegmentSource<'i, IN> {
inp: &'i mut IN,
unacked_segs: HashMap<SegIdx, UnAcked>,
unacked_upper_bound: u32,
@ -78,12 +78,12 @@ struct UnAcked {
nacks: usize,
}
impl<'i, IN> SegmentStore<'i, IN>
impl<'i, IN> SegmentSource<'i, IN>
where
IN: AsyncRead + Unpin,
{
fn new(inp: &'i mut IN) -> Self {
SegmentStore {
SegmentSource {
inp,
unacked_segs: HashMap::new(),
unacked_upper_bound: 0,
@ -149,7 +149,7 @@ where
Quit,
}
let mut store = SegmentStore::new(inp);
let mut src = SegmentSource::new(inp);
let mut deadline = Instant::now();
const DELAY_MS: u64 = 100;
@ -159,13 +159,22 @@ where
let timer = tokio::time::delay_until(deadline);
let evt = tokio::select!(
_ = timer => Evt::Timer,
Ok(m) = peer.recv() => Evt::Recv(m),
r = peer.recv() => match r {
Ok(m) => Evt::Recv(m),
Err(peer::RecvError::Io { source }) => {
return Err(source.into());
}
Err(peer::RecvError::InvalidMessage { .. }) => {
log.recv_corrupt().await;
continue;
}
}
);
let act = match evt {
Evt::Timer => {
deadline += Duration::from_millis(DELAY_MS);
match store.get_segment().await {
match src.get_segment().await {
Some((seg_idx, payload)) => {
log.send_data(seg_idx as usize, payload.len()).await;
to_send.push(UpMsg::Data {
@ -180,7 +189,7 @@ where
Evt::Recv(DownMsg::Ack { idxs }) => {
log.debug_msg(format!("got acks: {:?}", idxs)).await;
store.ack(&idxs);
src.ack(&idxs);
Action::Continue
}
};
@ -194,8 +203,8 @@ where
}
match act {
Action::Quit => break,
Action::Continue => (),
Action::Quit => break,
}
}