diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index a3643f7..a53fac8 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -23,6 +23,7 @@ enum Error { }, } +// starts start in a new tokio runtime fn entry() -> Result<(), Error> { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut log = Logger::new(); @@ -39,6 +40,7 @@ fn main() { } } +// async entry point async fn start(log: &mut Logger) -> Result<(), Error> { let sock = tokio::net::UdpSocket::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?; log.bound(sock.local_addr()?.port()).await; @@ -47,17 +49,26 @@ async fn start(log: &mut Logger) -> Result<(), Error> { download(log, &mut peer, &mut out).await } +// keeps track of data we have received struct SegmentSink<'o, OUT> { + // where to write out the data out: &'o mut OUT, + // a MeowCoder instance for decoding meow_coder: MeowCoder, + // a map of currently received segments segs: HashMap>, + // which segments we have flushed to out n_flushed: u32, + // whether we are complete complete: bool, } +// what happened when you put a new segment into a sink #[derive(Copy, Clone, Eq, PartialEq, Debug)] enum Put { + // we already had this segment Duplicate, + // this is a new segment Fresh, } @@ -75,6 +86,7 @@ where } } + // Flushes segments that we have all in order so far to out async fn flush(&mut self) { use tokio::io::AsyncWriteExt; while let Some(cache) = self.segs.get_mut(&self.n_flushed) { @@ -96,6 +108,7 @@ where self.out.flush().await.expect("god help us"); } + // Processes a new received segment async fn put(&mut self, seg_idx: SegIdx, data: SegData) -> Put { if seg_idx < self.n_flushed || self.segs.contains_key(&seg_idx) { Put::Duplicate @@ -106,23 +119,30 @@ where } } + // Generates the acks to send back fn ack_idxs(&self) -> Vec { let mut idxs: Vec<_> = self.segs.keys().cloned().collect(); idxs.sort(); idxs } + // Whether the receiving is complete yet fn is_file_complete(&self) -> bool { self.complete } } +// Downloads a file from a sender async fn download(log: &mut Logger, peer: &mut DownPeer, out: &mut OUT) -> Result<(), Error> where OUT: AsyncWrite + Unpin, { use tokio::time::{Duration, Instant}; + // General strategy: Listen for segments coming in, and ack once we have seen some number of + // segments or some time has elapsed. Once we see the eof (or detect that we must have passed + // eof and started retransmitting earlier packets) we ack slightly more often. + enum Evt { Recv(UpMsg), Timer, @@ -153,12 +173,14 @@ where match evt { Evt::Timer => { + // timeout, make sure to send an ack now deadline += Duration::from_millis(1000); log.debug_msg("timeout").await; - ack_buildup += 1; + ack_buildup += 10; } Evt::Recv(UpMsg::Data { payload, seg_idx }) => { + // got some data, update our data set let len = payload.len(); let eof = payload.is_last_segment || seg_idx < max_seen_idx; max_seen_idx = cmp::max(seg_idx + 1, max_seen_idx); @@ -174,6 +196,7 @@ where } if eof && !seen_eof { + // eof seen, probably ack now seen_eof = true; ack_buildup += 999; } @@ -181,11 +204,16 @@ where }; let num_acks = if sink.is_file_complete() { + // spam out 5 acks on shutdown just to be sure the sender gets at least 1 + // instead of taking extra time with a closing handshake we make the assumption + // that at least one of these will go through 5 } else if ack_buildup > 10 { + // spam 2 acks at a time to hopefully avoid drops ack_buildup = 0; 2 } else { + // send no acks right now 0 }; @@ -201,6 +229,7 @@ where } if sink.is_file_complete() { + // we're done break; } } diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index a43002f..cf00aa5 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -29,6 +29,7 @@ enum Error { InvalidArgs, } +// Main function fn main() { match entry() { Err(Error::InvalidArgs) => print_usage(), @@ -47,12 +48,14 @@ fn print_usage() { print!("Usage:\n./3700send :\n") } +// Starts async code in a new tokio runtime fn entry() -> Result<(), Error> { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut log = Logger::new(); rt.block_on(start(&mut log)) } +// The entry point for the async code async fn start(log: &mut Logger) -> Result<(), Error> { let targ_addr = parse_args(std::env::args())?; let sock = tokio::net::UdpSocket::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?; @@ -64,6 +67,7 @@ async fn start(log: &mut Logger) -> Result<(), Error> { upload(log, &mut peer, &mut out).await } +// Parses command line arguments fn parse_args(mut args: impl Iterator) -> Result { args.nth(1) .ok_or(Error::InvalidArgs)? @@ -71,19 +75,30 @@ fn parse_args(mut args: impl Iterator) -> Result { + // The source of data to upload inp: &'i mut IN, + // A buffer holding data we read from inp that hasn't been sent yet inp_buffer: Vec, + // Whether we read all data from inp yet read_input: bool, + // Whether to use MeowCoder or not is_meow: bool, + // Whether MeowCoder::encode reported cut off data is_cut: bool, - // ordered by seg idx + // Currently unacknowledged segments ordered by seg idx unacked_segs: Vec<(SegIdx, UnAcked)>, + // The next segment to send during the initial pass over the data next_seg_idx: u32, + // The next segment to retransmit next_retrans_idx: usize, // NOTE: this is an index into *unacked_segs* array + // Whether we have reached the end of data to transmit (note: check unacked_segs.is_empty() as + // well) eof: bool, } +// A currently unacknowledged segment with a counter for how many times it's been nacked struct UnAcked { payload: SegData, nacks: usize, @@ -116,6 +131,7 @@ where } } + // Reads the next SegData to transmit from inp async fn read_next(&mut self) -> SegData { use tokio::io::AsyncReadExt; if !self.read_input { @@ -147,6 +163,7 @@ where } } + // Gets the next segment index to retransmit, if any fn retrans_seg_idx(&mut self) -> Option { if self.unacked_segs.is_empty() { return None @@ -156,6 +173,7 @@ where Some(seg_idx) } + // Gets the next new segment to transmit, if any async fn fresh_seg_idx(&mut self) -> Option { if self.eof { None @@ -172,6 +190,7 @@ where } } + // Gets a segment to transmit next (either a new segment or a retransmit segment) async fn get_segment(&mut self) -> Option<(SegIdx, &SegData)> { let seg_idx = { if let Some(si) = self.fresh_seg_idx().await { @@ -190,7 +209,8 @@ where Some((seg_idx, &self.unacked_segs[i].1.payload)) } - /// `seg_idxs` should be distinct and in increasing order. + // Processes received acks + // `seg_idxs` should be distinct and in increasing order. fn ack(&mut self, ack_seg_idxs: &[SegIdx]) { let mut new_unacked_segs = Vec::new(); for (seg_idx, unack) in self.unacked_segs.drain(..) { @@ -202,20 +222,20 @@ where self.next_retrans_idx = 0; } + // Checks if we are completely done with the upload fn is_file_completed(&self) -> bool { self.eof && self.unacked_segs.is_empty() } } +// Performs upload async fn upload(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result<(), Error> where IN: AsyncRead + Unpin, { - // TODO: - // 1. prioritize segments to send - // 2. set a timer for sending the next segment - // 3. recieve ack's in the meantime - // 4. adjust timer delay based on ack information + // The general strategy is to send all the segments in order, then go back and retransmit + // the ones that weren't acked in order until all segments are acked. + // We adjust the send rate according to estimations of drop rate based on estimations of RTT enum Evt { Recv(DownMsg), @@ -225,23 +245,29 @@ where let mut src = SegmentSource::new(inp); const DEFAULT_IN_FLIGHT: i32 = 66; + // the number of packets per second to limit to let mut in_flight = DEFAULT_IN_FLIGHT; let mut deadline = Instant::now(); let mut to_send = vec![]; + // this keeps track of stuff that has happened since the last time we adjusted in_flight + // once we have enough data, we make an estimation of rtt and use this to estimate drop rate let mut send_history: Vec<(SegIdx, Instant)> = vec![]; let mut ack_history: Vec<(SegIdx, Instant)> = vec![]; let mut acked: HashMap = HashMap::new(); let mut sent: HashMap = HashMap::new(); let mut rtt_est: Duration = Duration::from_millis(1050); + // fixed point representing droprate*100 as usize let mut drop_est: usize = 0; let mut drop_est_made: bool = false; loop { let now = Instant::now(); - log.debug_msg(format!("rtt_est is {}, drop_est is {}", rtt_est.as_millis(), drop_est)).await; + // log.debug_msg(format!("rtt_est is {}, drop_est is {}", + // rtt_est.as_millis(), drop_est)).await; if drop_est_made { + // we made an estimation, change parameters if drop_est > 50 { in_flight = DEFAULT_IN_FLIGHT; } else if drop_est > 35 { @@ -296,6 +322,7 @@ where log.debug_msg(format!("got {} acks", idxs.len())).await; src.ack(&idxs); + // update the histories of what happened and try to make estimations let mut rtts: Vec = vec![]; for idx in idxs.iter().cloned() { @@ -311,13 +338,18 @@ where } } + // estimate rtt if rtts.len() > 10 { rtts.sort(); + // the lower part of the rtts is going to be closer to the real rtt + // since the receiver batching and drops etc may impact the higher + // sections of the rtt array let slice = &rtts[rtts.len()/5 - 1..=rtts.len()/5 + 1]; let mean = (slice[0] + slice[1] + slice[2]) / 3; rtt_est = mean; } + // estimate drops let mut total_pkts = 0; let mut acked_pkts = 0; for (sent_idx, sent_time) in send_history.iter().rev().cloned() { diff --git a/hptp/src/encoding.rs b/hptp/src/encoding.rs index 487a3fd..7f1f94e 100644 --- a/hptp/src/encoding.rs +++ b/hptp/src/encoding.rs @@ -40,10 +40,12 @@ impl MeowCoder { } } + // Creates a new MeowCoder pub fn new() -> MeowCoder { MeowCoder { line_index: 0 } } + // Converts a single hex character to the 4 bits of data it represents pub fn hex_to_nibble(chr: u8) -> u8 { const _AL: u8 = 'a' as u8; const _FL: u8 = 'f' as u8; @@ -59,6 +61,7 @@ impl MeowCoder { } } + // Converts any u8 into 2 chars of corresponding hex pub fn u8_to_hex(val: u8) -> (u8, u8) { let first = val >> 4; let second = val & 0xF; @@ -66,6 +69,9 @@ impl MeowCoder { (LOOKUP[first as usize], LOOKUP[second as usize]) } + // Encodes a buffer using Meow coding + // Returns a coded buffer and a boolean indicating whether the input was cut off in the middle + // of a hex sequence pub fn encode(input: &Vec) -> (Vec, bool) { let mut out: Vec = Vec::new(); let mut prev_char: u8 = 0; @@ -93,6 +99,8 @@ impl MeowCoder { } } + // Decodes a buffer using Meow coding + // Pass in the input buffer and was_cut from encode() pub fn decode(&mut self, input: &Vec, was_cut: bool) -> Vec { let mut out: Vec = Vec::new(); for (pos, byte) in input.iter().enumerate() { diff --git a/hptp/src/lib.rs b/hptp/src/lib.rs index 849a036..f65314f 100644 --- a/hptp/src/lib.rs +++ b/hptp/src/lib.rs @@ -1,3 +1,5 @@ +// HPTP core functionality + #[macro_use] extern crate thiserror; extern crate byteorder; diff --git a/hptp/src/logger.rs b/hptp/src/logger.rs index 5a02a05..ce237a5 100644 --- a/hptp/src/logger.rs +++ b/hptp/src/logger.rs @@ -3,17 +3,20 @@ extern crate tokio; use std::fmt::Display; use tokio::io::{AsyncWriteExt, Stderr}; +// Handles logging according to the assignment specs pub struct Logger { out: Stderr, } impl Logger { + // creates a new Logger sending to stderr pub fn new() -> Self { Logger { out: tokio::io::stderr(), } } + // Helper function to log a structured message to out async fn log_message(&mut self, msg: LogMessage<'_>) { // These two lines should never fail and we realistically can't do anything about // it if they do. @@ -24,54 +27,65 @@ impl Logger { self.out.flush().await.expect("failed to flush stderr"); } + // Timestamps the given payload and logs it with log_message async fn log_payload(&mut self, payload: LogPayload<'_>) { self.log_message(LogMessage::now(payload)).await } // ----------------------------------------------------------------------------------- + // Logs a debug message pub async fn debug_msg>(&mut self, what: S) { let what = what.as_ref(); self.log_payload(LogPayload::Debug { what }).await } + // Logs a message indicating which port we bound pub async fn bound(&mut self, port: u16) { self.log_payload(LogPayload::Bound { port }).await } + // Logs a message indicating we accepted some data pub async fn recv_data_accepted(&mut self, start: usize, len: usize, order: AcceptedOrder) { self.log_payload(LogPayload::RecvDataAccepted { start, len, order }) .await } + // Logs a message indicating we ignored some received data pub async fn recv_data_ignored(&mut self, start: usize, len: usize) { self.log_payload(LogPayload::RecvDataIgnored { start, len }) .await } + // Logs a message indicating we got an ack pub async fn recv_ack(&mut self, last_offset: usize) { self.log_payload(LogPayload::RecvAck { last_offset }).await } + // Logs a message indicating we got corrupted data pub async fn recv_corrupt(&mut self) { self.log_payload(LogPayload::RecvCorrupt).await } + // Logs a message indicating we sent data pub async fn send_data(&mut self, start: usize, len: usize) { self.log_payload(LogPayload::SendData { start, len }).await } + // Logs a message indicating we completed transmission of data pub async fn completed(&mut self) { self.log_payload(LogPayload::Completed).await } } +// A single timestamped message struct LogMessage<'a> { pub timestamp: chrono::DateTime, pub payload: LogPayload<'a>, } impl<'a> LogMessage<'a> { + // Timestamps a given payload fn now(payload: LogPayload<'a>) -> Self { LogMessage { timestamp: chrono::Local::now(), @@ -80,6 +94,7 @@ impl<'a> LogMessage<'a> { } } +// Represents different kinds of messages we can log in the assignment-specified format enum LogPayload<'a> { Debug { what: &'a str, @@ -107,6 +122,7 @@ enum LogPayload<'a> { Completed, } +// Whether packets were received in order or not #[derive(Copy, Clone, Eq, PartialEq)] pub enum AcceptedOrder { InOrder, @@ -115,6 +131,7 @@ pub enum AcceptedOrder { pub use AcceptedOrder::*; +// Serializes a LogMessage to string impl Display for LogMessage<'_> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { use chrono::Timelike; @@ -130,6 +147,7 @@ impl Display for LogMessage<'_> { } } +// Serializes a LogPayload to string impl Display for LogPayload<'_> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { @@ -149,6 +167,7 @@ impl Display for LogPayload<'_> { } } +// Serializes AcceptedOrder to string impl Display for AcceptedOrder { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.write_str(match self { diff --git a/hptp/src/msg.rs b/hptp/src/msg.rs index d87d319..766cf1a 100644 --- a/hptp/src/msg.rs +++ b/hptp/src/msg.rs @@ -2,11 +2,13 @@ use super::seg::{SegData, SegIdx}; pub use super::seg::{DOWN_HEADER_SIZE, MAX_TOTAL_PACKET_SIZE, UP_HEADER_SIZE, MAX_SEG_SIZE}; use byteorder::ByteOrder; +// A message containing data for the receiver #[derive(Clone)] pub enum UpMsg { Data { payload: SegData, seg_idx: SegIdx }, } +// An ack message containing info about which segments were received #[derive(Clone)] pub enum DownMsg { /// `idxs` must be distinct and in increasing order. @@ -17,6 +19,7 @@ pub enum DownMsg { #[error("deserialization failed; malformed packet")] pub struct DesError; +// Something that we can serialize and deserialize with u8 slices pub trait SerDes: Sized { // `buf.len()` must be <= `MAX_TOTAL_PACKET_SIZE` fn des(buf: &[u8]) -> Result; @@ -27,6 +30,7 @@ pub trait SerDes: Sized { type BO = byteorder::LE; +// Masks for additional booleans encoded as bits in the segment index field const LAST_SEG_MASK: u32 = 1 << 31; const MEOW_CODED_MASK: u32 = 1 << 30; const CUT_MASK: u32 = 1 << 29; diff --git a/hptp/src/peer.rs b/hptp/src/peer.rs index cb5d48d..b966718 100644 --- a/hptp/src/peer.rs +++ b/hptp/src/peer.rs @@ -4,6 +4,7 @@ use tokio::net::UdpSocket; use super::msg::{self, SerDes}; +// Represents an endpoint we are getting packets from pub struct Peer { sock: UdpSocket, targ: Option, @@ -39,6 +40,7 @@ pub enum SendError { } impl Peer { + // Wraps a local UDP socket in a peer struct pub fn new(sock: UdpSocket) -> Self { Peer { sock, @@ -47,10 +49,12 @@ impl Peer { } } + // Sets the actual address of the peer, once known pub fn set_known_target(&mut self, addr: SocketAddr) { self.targ = Some(addr); } + // Sends as message to the peer pub async fn send(&mut self, msg: S) -> Result<(), SendError> where S: SerDes, @@ -62,6 +66,7 @@ impl Peer { Ok(()) } + // Receives a message from the peer pub async fn recv(&mut self) -> Result where R: SerDes, diff --git a/hptp/src/seg.rs b/hptp/src/seg.rs index 5066093..f2386e3 100644 --- a/hptp/src/seg.rs +++ b/hptp/src/seg.rs @@ -45,46 +45,22 @@ pub struct SegmentSet { } */ +// Represents one segment of data to transfer #[derive(Clone)] pub struct SegData { + // the data pub bytes: Vec, + // whether this is the last segment in the set pub is_last_segment: bool, + // was this encoded with MeowCoder pub is_meow_encoded: bool, + // was this cut off according to MeowCoder::encode pub is_cut: bool, } impl SegData { + // gets the length of the segment data pub fn len(&self) -> usize { self.bytes.len() } - - // pub async fn read(inp: &mut IN) -> Result - // where - // IN: AsyncRead + Unpin, - // { - // use tokio::io::AsyncReadExt; - // let mut buf = [0u8; MAX_SEG_SIZE]; - // let len = inp.read(&mut buf).await.unwrap_or(0); - // if len > 0 { - // Ok(SegData { - // bytes: Vec::from(&buf[..len]), - // is_last_segment: false, - // is_meow_encoded: false, - // }) - // } else { - // Ok(SegData { - // bytes: vec![], - // is_last_segment: true, - // is_meow_encoded: false, - // }) - // } - // } - // - // pub async fn write(&self, out: &mut OUT) -> Result<(), tokio::io::Error> - // where - // OUT: AsyncWrite + Unpin, - // { - // use tokio::io::AsyncWriteExt; - // out.write_all(&self.bytes).await - // } }