hptp-send "receives ACKs", but doesn't act on them

This commit is contained in:
Milo Turner 2020-03-10 17:49:28 -04:00
parent dfd33acf1e
commit 634389125d
3 changed files with 86 additions and 48 deletions

13
Cargo.lock generated
View File

@ -281,6 +281,19 @@ dependencies = [
"memchr", "memchr",
"mio", "mio",
"pin-project-lite", "pin-project-lite",
"slab",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [
"proc-macro2",
"quote",
"syn",
] ]
[[package]] [[package]]

View File

@ -9,5 +9,5 @@ edition = "2018"
[dependencies] [dependencies]
hptp = { path = "../hptp" } hptp = { path = "../hptp" }
tokio = { version = "0.2.*", features = ["rt-core", "udp"] } tokio = { version = "0.2.*", features = ["rt-core", "udp", "time", "macros"] }
thiserror = "*" thiserror = "*"

View File

@ -5,8 +5,8 @@ extern crate tokio;
extern crate thiserror; extern crate thiserror;
use hptp::logger::Logger; use hptp::logger::Logger;
use hptp::msg::UpMsg; use hptp::msg::{DownMsg, UpMsg};
use hptp::peer::{self, Peer, UpPeer}; use hptp::peer::{Peer, UpPeer};
use hptp::seg::SegIdx; use hptp::seg::SegIdx;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -67,28 +67,6 @@ fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddr, Erro
.map_err(|_| Error::InvalidArgs) .map_err(|_| Error::InvalidArgs)
} }
async fn read_segment<IN>(inp: &mut IN) -> Result<Option<Vec<u8>>, Error>
where
IN: AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;
let mut buf = [0u8; hptp::seg::MAX_SEG_SIZE];
let len = inp.read(&mut buf).await?;
Ok(if len > 0 {
Some(buf[..len].into())
} else {
None
})
}
async fn send(peer: &mut UpPeer, m: UpMsg) -> Result<(), Error> {
match peer.send(m).await {
Ok(()) => Ok(()),
Err(peer::SendError::Io { source }) => Err(source.into()),
Err(peer::SendError::NoTarget) => panic!("tried to send w/ no target!"),
}
}
struct SegmentStore<'i, IN> { struct SegmentStore<'i, IN> {
inp: &'i mut IN, inp: &'i mut IN,
unacked_segs: HashMap<SegIdx, UnAcked>, unacked_segs: HashMap<SegIdx, UnAcked>,
@ -151,30 +129,77 @@ async fn upload<IN>(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result
where where
IN: AsyncRead + Unpin, IN: AsyncRead + Unpin,
{ {
let mut seg_idx = 0; // TODO:
// TODO: async recieve acks // 1. prioritize segments to send
loop { // 2. set a timer for sending the next segment
/* if let Some(ack_len) = next_ack_len { // 3. recieve ack's in the meantime
match peer.recv().await { // 4. adjust timer delay based on ack information
Ok(DownMsg::Ack { idxs }) => {
// log.recv_ack(pos).await; use tokio::time::{Duration, Instant};
next_ack_len = None;
} enum Evt {
Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await, Recv(DownMsg),
Err(peer::RecvError::Io { source }) => return Err(source.into()), Timer,
}
} else {*/
match read_segment(inp).await? {
Some(payload) => {
log.send_data(seg_idx as usize, payload.len()).await;
let data = UpMsg::Data { payload, seg_idx };
send(peer, data).await?;
seg_idx += 1;
}
None => break,
}
// }
} }
enum Action {
Continue,
Quit,
}
let mut store = SegmentStore::new(inp);
let mut deadline = Instant::now();
const DELAY_MS: u64 = 250;
let mut to_send = vec![];
loop {
let timer = tokio::time::delay_until(deadline);
let evt = tokio::select!(
_ = timer => Evt::Timer,
Ok(m) = peer.recv() => Evt::Recv(m),
);
let act = match evt {
Evt::Timer => {
deadline += Duration::from_millis(DELAY_MS);
match store.get_segment().await {
Some((seg_idx, payload)) => {
log.send_data(seg_idx as usize, payload.len()).await;
to_send.push(UpMsg::Data {
seg_idx,
payload: Vec::from(payload),
});
Action::Continue
}
None => {
log.completed().await;
Action::Quit
}
}
}
Evt::Recv(m) => {
log.debug_msg(format!("{:?}", m)).await;
Action::Continue
}
};
for m in to_send.drain(..) {
match peer.send(m).await {
Ok(()) => (),
Err(hptp::peer::SendError::NoTarget) => unreachable!("no target"),
Err(hptp::peer::SendError::Io { source }) => return Err(source.into()),
}
}
match act {
Action::Quit => break,
Action::Continue => (),
}
}
log.completed().await; log.completed().await;
Ok(()) Ok(())
} }