From 634389125d292b15ec85b2a3ad58234b3e28e541 Mon Sep 17 00:00:00 2001 From: Milo Turner Date: Tue, 10 Mar 2020 17:49:28 -0400 Subject: [PATCH] hptp-send "receives ACKs", but doesn't act on them --- Cargo.lock | 13 +++++ hptp-send/Cargo.toml | 2 +- hptp-send/src/main.rs | 119 +++++++++++++++++++++++++----------------- 3 files changed, 86 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 752e980..cfc767e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,6 +281,19 @@ dependencies = [ "memchr", "mio", "pin-project-lite", + "slab", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/hptp-send/Cargo.toml b/hptp-send/Cargo.toml index 70ae0ae..4b72592 100644 --- a/hptp-send/Cargo.toml +++ b/hptp-send/Cargo.toml @@ -9,5 +9,5 @@ edition = "2018" [dependencies] hptp = { path = "../hptp" } -tokio = { version = "0.2.*", features = ["rt-core", "udp"] } +tokio = { version = "0.2.*", features = ["rt-core", "udp", "time", "macros"] } thiserror = "*" \ No newline at end of file diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index 475e3dd..5461f73 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -5,8 +5,8 @@ extern crate tokio; extern crate thiserror; use hptp::logger::Logger; -use hptp::msg::UpMsg; -use hptp::peer::{self, Peer, UpPeer}; +use hptp::msg::{DownMsg, UpMsg}; +use hptp::peer::{Peer, UpPeer}; use hptp::seg::SegIdx; use std::collections::HashMap; use std::net::SocketAddr; @@ -67,28 +67,6 @@ fn parse_args(mut args: impl Iterator) -> Result(inp: &mut IN) -> Result>, Error> -where - IN: AsyncRead + Unpin, -{ - use tokio::io::AsyncReadExt; - let mut buf = [0u8; hptp::seg::MAX_SEG_SIZE]; - let len = inp.read(&mut buf).await?; - Ok(if len > 0 { - Some(buf[..len].into()) - } else { - None - }) -} - -async fn send(peer: &mut UpPeer, m: UpMsg) -> Result<(), Error> { - match peer.send(m).await { - Ok(()) => Ok(()), - Err(peer::SendError::Io { source }) => Err(source.into()), - Err(peer::SendError::NoTarget) => panic!("tried to send w/ no target!"), - } -} - struct SegmentStore<'i, IN> { inp: &'i mut IN, unacked_segs: HashMap, @@ -151,30 +129,77 @@ async fn upload(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result where IN: AsyncRead + Unpin, { - let mut seg_idx = 0; - // TODO: async recieve acks - loop { - /* if let Some(ack_len) = next_ack_len { - match peer.recv().await { - Ok(DownMsg::Ack { idxs }) => { - // log.recv_ack(pos).await; - next_ack_len = None; - } - Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await, - Err(peer::RecvError::Io { source }) => return Err(source.into()), - } - } else {*/ - match read_segment(inp).await? { - Some(payload) => { - log.send_data(seg_idx as usize, payload.len()).await; - let data = UpMsg::Data { payload, seg_idx }; - send(peer, data).await?; - seg_idx += 1; - } - None => break, - } - // } + // TODO: + // 1. prioritize segments to send + // 2. set a timer for sending the next segment + // 3. recieve ack's in the meantime + // 4. adjust timer delay based on ack information + + use tokio::time::{Duration, Instant}; + + enum Evt { + Recv(DownMsg), + Timer, } + + enum Action { + Continue, + Quit, + } + + let mut store = SegmentStore::new(inp); + let mut deadline = Instant::now(); + const DELAY_MS: u64 = 250; + + let mut to_send = vec![]; + + loop { + let timer = tokio::time::delay_until(deadline); + + let evt = tokio::select!( + _ = timer => Evt::Timer, + Ok(m) = peer.recv() => Evt::Recv(m), + ); + + let act = match evt { + Evt::Timer => { + deadline += Duration::from_millis(DELAY_MS); + match store.get_segment().await { + Some((seg_idx, payload)) => { + log.send_data(seg_idx as usize, payload.len()).await; + to_send.push(UpMsg::Data { + seg_idx, + payload: Vec::from(payload), + }); + Action::Continue + } + None => { + log.completed().await; + Action::Quit + } + } + } + + Evt::Recv(m) => { + log.debug_msg(format!("{:?}", m)).await; + Action::Continue + } + }; + + for m in to_send.drain(..) { + match peer.send(m).await { + Ok(()) => (), + Err(hptp::peer::SendError::NoTarget) => unreachable!("no target"), + Err(hptp::peer::SendError::Io { source }) => return Err(source.into()), + } + } + + match act { + Action::Quit => break, + Action::Continue => (), + } + } + log.completed().await; Ok(()) }