From dd6c4ff088f765f8ac7f87f0d02bd8db957f676d Mon Sep 17 00:00:00 2001 From: Milo Turner Date: Tue, 10 Mar 2020 12:01:44 -0400 Subject: [PATCH] *very* primitive segment-index ack's --- Cargo.lock | 7 +++++++ hptp-recv/src/main.rs | 29 ++++++++++++++++++++-------- hptp-send/src/main.rs | 31 +++++++++++++++-------------- hptp/Cargo.toml | 3 ++- hptp/src/lib.rs | 3 ++- hptp/src/msg.rs | 45 ++++++++++++++++++++++++++++++------------- hptp/src/seg.rs | 2 +- 7 files changed, 80 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2880c79..752e980 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + [[package]] name = "bytes" version = "0.5.4" @@ -55,6 +61,7 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" name = "hptp" version = "0.1.0" dependencies = [ + "byteorder", "chrono", "thiserror", "tokio", diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index c12ee10..54250c4 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -7,6 +7,7 @@ extern crate thiserror; use hptp::logger::Logger; use hptp::msg::{DownMsg, UpMsg}; use hptp::peer::{self, DownPeer, Peer}; +use std::collections::HashMap; use tokio::io::AsyncWrite; #[derive(Error, Debug)] @@ -55,22 +56,34 @@ async fn download(log: &mut Logger, peer: &mut DownPeer, out: &mut OUT) -> where OUT: AsyncWrite + Unpin, { - use tokio::io::AsyncWriteExt; - let mut pos = 0; + let mut segs = HashMap::new(); + let mut flush_seg_idx = 0; loop { match peer.recv().await { - Ok(UpMsg::Data { payload, .. }) => { + Ok(UpMsg::Data { payload, seg_idx }) => { let len = payload.len(); - out.write_all(&payload).await?; - out.flush().await?; - log.recv_data_accepted(pos, len, hptp::logger::InOrder) + segs.entry(seg_idx).or_insert(Some(payload)); + let ack = DownMsg::Ack { + idxs: segs.keys().cloned().collect(), + }; + log.debug_msg(format!("sent ack: {:?}", + segs.keys().collect::>())).await; + send(peer, ack).await?; + log.recv_data_accepted(seg_idx as usize, len, hptp::logger::OutOfOrder) .await; - send(peer, DownMsg::Ack {}).await?; - pos += len; } Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await, Err(peer::RecvError::Io { source }) => return Err(source.into()), } + + while let Some(v) = segs.get_mut(&flush_seg_idx) { + if let Some(payload) = v.take() { + use tokio::io::AsyncWriteExt; + out.write_all(&payload).await?; + out.flush().await?; + } + flush_seg_idx += 1; + } } } diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index 76dc5a1..81d8754 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -5,7 +5,7 @@ extern crate tokio; extern crate thiserror; use hptp::logger::Logger; -use hptp::msg::{DownMsg, UpMsg}; +use hptp::msg::UpMsg; use hptp::peer::{self, Peer, UpPeer}; use std::net::SocketAddr; use tokio::io::AsyncRead; @@ -91,30 +91,29 @@ async fn upload(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result where IN: AsyncRead + Unpin, { - let mut pos = 0; - let mut next_ack_len: Option = None; + let mut seg_idx = 0; + // TODO: async recieve acks loop { - if let Some(ack_len) = next_ack_len { + /* if let Some(ack_len) = next_ack_len { match peer.recv().await { - Ok(DownMsg::Ack { .. }) => { - log.recv_ack(pos).await; - pos += ack_len; + Ok(DownMsg::Ack { idxs }) => { + // log.recv_ack(pos).await; next_ack_len = None; } - Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await, Err(peer::RecvError::Io { source }) => return Err(source.into()), } - } else { - match read_segment(inp).await? { - Some(data) => { - next_ack_len = Some(data.len()); - log.send_data(pos, data.len()).await; - send(peer, UpMsg::Data { payload: data }).await?; - } - None => break, + } else {*/ + match read_segment(inp).await? { + Some(payload) => { + log.send_data(seg_idx as usize, payload.len()).await; + let data = UpMsg::Data { payload, seg_idx }; + send(peer, data).await?; + seg_idx += 1; } + None => break, } + // } } log.completed().await; Ok(()) diff --git a/hptp/Cargo.toml b/hptp/Cargo.toml index 3d53c90..b34975f 100644 --- a/hptp/Cargo.toml +++ b/hptp/Cargo.toml @@ -10,4 +10,5 @@ edition = "2018" [dependencies] tokio = {version = "0.2.*", features = ["io-std", "io-util", "udp"]} thiserror = "*" -chrono = "0.4.*" \ No newline at end of file +chrono = "0.4.*" +byteorder = "1.3.*" \ No newline at end of file diff --git a/hptp/src/lib.rs b/hptp/src/lib.rs index ed3c22f..eec3778 100644 --- a/hptp/src/lib.rs +++ b/hptp/src/lib.rs @@ -1,8 +1,9 @@ #[macro_use] extern crate thiserror; +extern crate byteorder; extern crate chrono; pub mod logger; -pub mod seg; pub mod msg; pub mod peer; +pub mod seg; diff --git a/hptp/src/msg.rs b/hptp/src/msg.rs index e70e5a8..99cfa38 100644 --- a/hptp/src/msg.rs +++ b/hptp/src/msg.rs @@ -1,18 +1,20 @@ -pub use super::seg::{MAX_TOTAL_PACKET_SIZE, UP_HEADER_SIZE, DOWN_HEADER_SIZE}; +use super::seg::SegIdx; +pub use super::seg::{DOWN_HEADER_SIZE, MAX_TOTAL_PACKET_SIZE, UP_HEADER_SIZE}; +use byteorder::ByteOrder; +use std::collections::HashSet; #[derive(Clone, Debug)] pub enum UpMsg { Data { payload: Vec, - // seg_idx: SegIdx, - // encoding: SegmentEncoding, + seg_idx: SegIdx, // is_final_packet: bool, }, } #[derive(Clone, Debug)] pub enum DownMsg { - Ack {}, // ackd: SegmentSet + Ack { idxs: HashSet }, } #[derive(Error, Debug)] @@ -26,30 +28,47 @@ pub trait SerDes: Sized { fn ser_to(&self, buf: &mut [u8]) -> usize; } +type BO = byteorder::LE; + impl SerDes for UpMsg { fn des(buf: &[u8]) -> Result { - Ok(UpMsg::Data { - payload: buf.into(), - }) + if buf.len() < UP_HEADER_SIZE { + Err(DesError) + } else { + Ok(UpMsg::Data { + seg_idx: BO::read_u32(&buf[0..4]), + payload: buf[4..].into(), + }) + } } fn ser_to(&self, buf: &mut [u8]) -> usize { match self { - UpMsg::Data { payload, .. } => { + UpMsg::Data { payload, seg_idx } => { let len = payload.len(); - buf[..len].copy_from_slice(&payload[..]); - len + BO::write_u32(&mut buf[0..4], *seg_idx); + buf[4..4 + len].copy_from_slice(&payload[..]); + 4 + len } } } } impl SerDes for DownMsg { - fn des(_buf: &[u8]) -> Result { - Ok(DownMsg::Ack {}) + fn des(buf: &[u8]) -> Result { + let mut idxs = HashSet::new(); + for (i, b) in buf.iter().cloned().enumerate() { + for j in 0..8 { + if b & (1 << j) != 0 { + idxs.insert((i * 8 + j) as SegIdx); + } + } + } + Ok(DownMsg::Ack { idxs }) } fn ser_to(&self, _buf: &mut [u8]) -> usize { - 1 + // TODO: implement this! + 0 } } diff --git a/hptp/src/seg.rs b/hptp/src/seg.rs index 50e2304..040a4ed 100644 --- a/hptp/src/seg.rs +++ b/hptp/src/seg.rs @@ -2,7 +2,7 @@ pub const MAX_TOTAL_PACKET_SIZE: usize = 1472; // TODO: change these based off the decoders -pub const UP_HEADER_SIZE: usize = 0; +pub const UP_HEADER_SIZE: usize = 4; pub const DOWN_HEADER_SIZE: usize = 1; /// This is the maximum amount of segment data we can fit into a packet.