Add comments

This commit is contained in:
xenia 2020-03-14 18:47:17 -04:00
parent 59f6aa1eed
commit d921fcf556
8 changed files with 114 additions and 39 deletions

View File

@ -23,6 +23,7 @@ enum Error {
},
}
// starts start in a new tokio runtime
fn entry() -> Result<(), Error> {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut log = Logger::new();
@ -39,6 +40,7 @@ fn main() {
}
}
// async entry point
async fn start(log: &mut Logger) -> Result<(), Error> {
let sock = tokio::net::UdpSocket::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?;
log.bound(sock.local_addr()?.port()).await;
@ -47,17 +49,26 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
download(log, &mut peer, &mut out).await
}
// keeps track of data we have received
struct SegmentSink<'o, OUT> {
// where to write out the data
out: &'o mut OUT,
// a MeowCoder instance for decoding
meow_coder: MeowCoder,
// a map of currently received segments
segs: HashMap<SegIdx, Option<SegData>>,
// which segments we have flushed to out
n_flushed: u32,
// whether we are complete
complete: bool,
}
// what happened when you put a new segment into a sink
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum Put {
// we already had this segment
Duplicate,
// this is a new segment
Fresh,
}
@ -75,6 +86,7 @@ where
}
}
// Flushes segments that we have all in order so far to out
async fn flush(&mut self) {
use tokio::io::AsyncWriteExt;
while let Some(cache) = self.segs.get_mut(&self.n_flushed) {
@ -96,6 +108,7 @@ where
self.out.flush().await.expect("god help us");
}
// Processes a new received segment
async fn put(&mut self, seg_idx: SegIdx, data: SegData) -> Put {
if seg_idx < self.n_flushed || self.segs.contains_key(&seg_idx) {
Put::Duplicate
@ -106,23 +119,30 @@ where
}
}
// Generates the acks to send back
fn ack_idxs(&self) -> Vec<SegIdx> {
let mut idxs: Vec<_> = self.segs.keys().cloned().collect();
idxs.sort();
idxs
}
// Whether the receiving is complete yet
fn is_file_complete(&self) -> bool {
self.complete
}
}
// Downloads a file from a sender
async fn download<OUT>(log: &mut Logger, peer: &mut DownPeer, out: &mut OUT) -> Result<(), Error>
where
OUT: AsyncWrite + Unpin,
{
use tokio::time::{Duration, Instant};
// General strategy: Listen for segments coming in, and ack once we have seen some number of
// segments or some time has elapsed. Once we see the eof (or detect that we must have passed
// eof and started retransmitting earlier packets) we ack slightly more often.
enum Evt {
Recv(UpMsg),
Timer,
@ -153,12 +173,14 @@ where
match evt {
Evt::Timer => {
// timeout, make sure to send an ack now
deadline += Duration::from_millis(1000);
log.debug_msg("timeout").await;
ack_buildup += 1;
ack_buildup += 10;
}
Evt::Recv(UpMsg::Data { payload, seg_idx }) => {
// got some data, update our data set
let len = payload.len();
let eof = payload.is_last_segment || seg_idx < max_seen_idx;
max_seen_idx = cmp::max(seg_idx + 1, max_seen_idx);
@ -174,6 +196,7 @@ where
}
if eof && !seen_eof {
// eof seen, probably ack now
seen_eof = true;
ack_buildup += 999;
}
@ -181,11 +204,16 @@ where
};
let num_acks = if sink.is_file_complete() {
// spam out 5 acks on shutdown just to be sure the sender gets at least 1
// instead of taking extra time with a closing handshake we make the assumption
// that at least one of these will go through
5
} else if ack_buildup > 10 {
// spam 2 acks at a time to hopefully avoid drops
ack_buildup = 0;
2
} else {
// send no acks right now
0
};
@ -201,6 +229,7 @@ where
}
if sink.is_file_complete() {
// we're done
break;
}
}

View File

@ -29,6 +29,7 @@ enum Error {
InvalidArgs,
}
// Main function
fn main() {
match entry() {
Err(Error::InvalidArgs) => print_usage(),
@ -47,12 +48,14 @@ fn print_usage() {
print!("Usage:\n./3700send <host-ip>:<host-port>\n")
}
// Starts async code in a new tokio runtime
fn entry() -> Result<(), Error> {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut log = Logger::new();
rt.block_on(start(&mut log))
}
// The entry point for the async code
async fn start(log: &mut Logger) -> Result<(), Error> {
let targ_addr = parse_args(std::env::args())?;
let sock = tokio::net::UdpSocket::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?;
@ -64,6 +67,7 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
upload(log, &mut peer, &mut out).await
}
// Parses command line arguments
fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddr, Error> {
args.nth(1)
.ok_or(Error::InvalidArgs)?
@ -71,19 +75,30 @@ fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddr, Erro
.map_err(|_| Error::InvalidArgs)
}
// Tracks information about the ongoing data upload
struct SegmentSource<'i, IN> {
// The source of data to upload
inp: &'i mut IN,
// A buffer holding data we read from inp that hasn't been sent yet
inp_buffer: Vec<u8>,
// Whether we read all data from inp yet
read_input: bool,
// Whether to use MeowCoder or not
is_meow: bool,
// Whether MeowCoder::encode reported cut off data
is_cut: bool,
// ordered by seg idx
// Currently unacknowledged segments ordered by seg idx
unacked_segs: Vec<(SegIdx, UnAcked)>,
// The next segment to send during the initial pass over the data
next_seg_idx: u32,
// The next segment to retransmit
next_retrans_idx: usize, // NOTE: this is an index into *unacked_segs* array
// Whether we have reached the end of data to transmit (note: check unacked_segs.is_empty() as
// well)
eof: bool,
}
// A currently unacknowledged segment with a counter for how many times it's been nacked
struct UnAcked {
payload: SegData,
nacks: usize,
@ -116,6 +131,7 @@ where
}
}
// Reads the next SegData to transmit from inp
async fn read_next(&mut self) -> SegData {
use tokio::io::AsyncReadExt;
if !self.read_input {
@ -147,6 +163,7 @@ where
}
}
// Gets the next segment index to retransmit, if any
fn retrans_seg_idx(&mut self) -> Option<SegIdx> {
if self.unacked_segs.is_empty() {
return None
@ -156,6 +173,7 @@ where
Some(seg_idx)
}
// Gets the next new segment to transmit, if any
async fn fresh_seg_idx(&mut self) -> Option<SegIdx> {
if self.eof {
None
@ -172,6 +190,7 @@ where
}
}
// Gets a segment to transmit next (either a new segment or a retransmit segment)
async fn get_segment(&mut self) -> Option<(SegIdx, &SegData)> {
let seg_idx = {
if let Some(si) = self.fresh_seg_idx().await {
@ -190,7 +209,8 @@ where
Some((seg_idx, &self.unacked_segs[i].1.payload))
}
/// `seg_idxs` should be distinct and in increasing order.
// Processes received acks
// `seg_idxs` should be distinct and in increasing order.
fn ack(&mut self, ack_seg_idxs: &[SegIdx]) {
let mut new_unacked_segs = Vec::new();
for (seg_idx, unack) in self.unacked_segs.drain(..) {
@ -202,20 +222,20 @@ where
self.next_retrans_idx = 0;
}
// Checks if we are completely done with the upload
fn is_file_completed(&self) -> bool {
self.eof && self.unacked_segs.is_empty()
}
}
// Performs upload
async fn upload<IN>(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result<(), Error>
where
IN: AsyncRead + Unpin,
{
// TODO:
// 1. prioritize segments to send
// 2. set a timer for sending the next segment
// 3. recieve ack's in the meantime
// 4. adjust timer delay based on ack information
// The general strategy is to send all the segments in order, then go back and retransmit
// the ones that weren't acked in order until all segments are acked.
// We adjust the send rate according to estimations of drop rate based on estimations of RTT
enum Evt {
Recv(DownMsg),
@ -225,23 +245,29 @@ where
let mut src = SegmentSource::new(inp);
const DEFAULT_IN_FLIGHT: i32 = 66;
// the number of packets per second to limit to
let mut in_flight = DEFAULT_IN_FLIGHT;
let mut deadline = Instant::now();
let mut to_send = vec![];
// this keeps track of stuff that has happened since the last time we adjusted in_flight
// once we have enough data, we make an estimation of rtt and use this to estimate drop rate
let mut send_history: Vec<(SegIdx, Instant)> = vec![];
let mut ack_history: Vec<(SegIdx, Instant)> = vec![];
let mut acked: HashMap<SegIdx, Instant> = HashMap::new();
let mut sent: HashMap<SegIdx, Instant> = HashMap::new();
let mut rtt_est: Duration = Duration::from_millis(1050);
// fixed point representing droprate*100 as usize
let mut drop_est: usize = 0;
let mut drop_est_made: bool = false;
loop {
let now = Instant::now();
log.debug_msg(format!("rtt_est is {}, drop_est is {}", rtt_est.as_millis(), drop_est)).await;
// log.debug_msg(format!("rtt_est is {}, drop_est is {}",
// rtt_est.as_millis(), drop_est)).await;
if drop_est_made {
// we made an estimation, change parameters
if drop_est > 50 {
in_flight = DEFAULT_IN_FLIGHT;
} else if drop_est > 35 {
@ -296,6 +322,7 @@ where
log.debug_msg(format!("got {} acks", idxs.len())).await;
src.ack(&idxs);
// update the histories of what happened and try to make estimations
let mut rtts: Vec<Duration> = vec![];
for idx in idxs.iter().cloned() {
@ -311,13 +338,18 @@ where
}
}
// estimate rtt
if rtts.len() > 10 {
rtts.sort();
// the lower part of the rtts is going to be closer to the real rtt
// since the receiver batching and drops etc may impact the higher
// sections of the rtt array
let slice = &rtts[rtts.len()/5 - 1..=rtts.len()/5 + 1];
let mean = (slice[0] + slice[1] + slice[2]) / 3;
rtt_est = mean;
}
// estimate drops
let mut total_pkts = 0;
let mut acked_pkts = 0;
for (sent_idx, sent_time) in send_history.iter().rev().cloned() {

View File

@ -40,10 +40,12 @@ impl MeowCoder {
}
}
// Creates a new MeowCoder
pub fn new() -> MeowCoder {
MeowCoder { line_index: 0 }
}
// Converts a single hex character to the 4 bits of data it represents
pub fn hex_to_nibble(chr: u8) -> u8 {
const _AL: u8 = 'a' as u8;
const _FL: u8 = 'f' as u8;
@ -59,6 +61,7 @@ impl MeowCoder {
}
}
// Converts any u8 into 2 chars of corresponding hex
pub fn u8_to_hex(val: u8) -> (u8, u8) {
let first = val >> 4;
let second = val & 0xF;
@ -66,6 +69,9 @@ impl MeowCoder {
(LOOKUP[first as usize], LOOKUP[second as usize])
}
// Encodes a buffer using Meow coding
// Returns a coded buffer and a boolean indicating whether the input was cut off in the middle
// of a hex sequence
pub fn encode(input: &Vec<u8>) -> (Vec<u8>, bool) {
let mut out: Vec<u8> = Vec::new();
let mut prev_char: u8 = 0;
@ -93,6 +99,8 @@ impl MeowCoder {
}
}
// Decodes a buffer using Meow coding
// Pass in the input buffer and was_cut from encode()
pub fn decode(&mut self, input: &Vec<u8>, was_cut: bool) -> Vec<u8> {
let mut out: Vec<u8> = Vec::new();
for (pos, byte) in input.iter().enumerate() {

View File

@ -1,3 +1,5 @@
// HPTP core functionality
#[macro_use]
extern crate thiserror;
extern crate byteorder;

View File

@ -3,17 +3,20 @@ extern crate tokio;
use std::fmt::Display;
use tokio::io::{AsyncWriteExt, Stderr};
// Handles logging according to the assignment specs
pub struct Logger {
out: Stderr,
}
impl Logger {
// creates a new Logger sending to stderr
pub fn new() -> Self {
Logger {
out: tokio::io::stderr(),
}
}
// Helper function to log a structured message to out
async fn log_message(&mut self, msg: LogMessage<'_>) {
// These two lines should never fail and we realistically can't do anything about
// it if they do.
@ -24,54 +27,65 @@ impl Logger {
self.out.flush().await.expect("failed to flush stderr");
}
// Timestamps the given payload and logs it with log_message
async fn log_payload(&mut self, payload: LogPayload<'_>) {
self.log_message(LogMessage::now(payload)).await
}
// -----------------------------------------------------------------------------------
// Logs a debug message
pub async fn debug_msg<S: AsRef<str>>(&mut self, what: S) {
let what = what.as_ref();
self.log_payload(LogPayload::Debug { what }).await
}
// Logs a message indicating which port we bound
pub async fn bound(&mut self, port: u16) {
self.log_payload(LogPayload::Bound { port }).await
}
// Logs a message indicating we accepted some data
pub async fn recv_data_accepted(&mut self, start: usize, len: usize, order: AcceptedOrder) {
self.log_payload(LogPayload::RecvDataAccepted { start, len, order })
.await
}
// Logs a message indicating we ignored some received data
pub async fn recv_data_ignored(&mut self, start: usize, len: usize) {
self.log_payload(LogPayload::RecvDataIgnored { start, len })
.await
}
// Logs a message indicating we got an ack
pub async fn recv_ack(&mut self, last_offset: usize) {
self.log_payload(LogPayload::RecvAck { last_offset }).await
}
// Logs a message indicating we got corrupted data
pub async fn recv_corrupt(&mut self) {
self.log_payload(LogPayload::RecvCorrupt).await
}
// Logs a message indicating we sent data
pub async fn send_data(&mut self, start: usize, len: usize) {
self.log_payload(LogPayload::SendData { start, len }).await
}
// Logs a message indicating we completed transmission of data
pub async fn completed(&mut self) {
self.log_payload(LogPayload::Completed).await
}
}
// A single timestamped message
struct LogMessage<'a> {
pub timestamp: chrono::DateTime<chrono::Local>,
pub payload: LogPayload<'a>,
}
impl<'a> LogMessage<'a> {
// Timestamps a given payload
fn now(payload: LogPayload<'a>) -> Self {
LogMessage {
timestamp: chrono::Local::now(),
@ -80,6 +94,7 @@ impl<'a> LogMessage<'a> {
}
}
// Represents different kinds of messages we can log in the assignment-specified format
enum LogPayload<'a> {
Debug {
what: &'a str,
@ -107,6 +122,7 @@ enum LogPayload<'a> {
Completed,
}
// Whether packets were received in order or not
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum AcceptedOrder {
InOrder,
@ -115,6 +131,7 @@ pub enum AcceptedOrder {
pub use AcceptedOrder::*;
// Serializes a LogMessage to string
impl Display for LogMessage<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
use chrono::Timelike;
@ -130,6 +147,7 @@ impl Display for LogMessage<'_> {
}
}
// Serializes a LogPayload to string
impl Display for LogPayload<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
@ -149,6 +167,7 @@ impl Display for LogPayload<'_> {
}
}
// Serializes AcceptedOrder to string
impl Display for AcceptedOrder {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str(match self {

View File

@ -2,11 +2,13 @@ use super::seg::{SegData, SegIdx};
pub use super::seg::{DOWN_HEADER_SIZE, MAX_TOTAL_PACKET_SIZE, UP_HEADER_SIZE, MAX_SEG_SIZE};
use byteorder::ByteOrder;
// A message containing data for the receiver
#[derive(Clone)]
pub enum UpMsg {
Data { payload: SegData, seg_idx: SegIdx },
}
// An ack message containing info about which segments were received
#[derive(Clone)]
pub enum DownMsg {
/// `idxs` must be distinct and in increasing order.
@ -17,6 +19,7 @@ pub enum DownMsg {
#[error("deserialization failed; malformed packet")]
pub struct DesError;
// Something that we can serialize and deserialize with u8 slices
pub trait SerDes: Sized {
// `buf.len()` must be <= `MAX_TOTAL_PACKET_SIZE`
fn des(buf: &[u8]) -> Result<Self, DesError>;
@ -27,6 +30,7 @@ pub trait SerDes: Sized {
type BO = byteorder::LE;
// Masks for additional booleans encoded as bits in the segment index field
const LAST_SEG_MASK: u32 = 1 << 31;
const MEOW_CODED_MASK: u32 = 1 << 30;
const CUT_MASK: u32 = 1 << 29;

View File

@ -4,6 +4,7 @@ use tokio::net::UdpSocket;
use super::msg::{self, SerDes};
// Represents an endpoint we are getting packets from
pub struct Peer<S, R> {
sock: UdpSocket,
targ: Option<SocketAddr>,
@ -39,6 +40,7 @@ pub enum SendError {
}
impl<S, R> Peer<S, R> {
// Wraps a local UDP socket in a peer struct
pub fn new(sock: UdpSocket) -> Self {
Peer {
sock,
@ -47,10 +49,12 @@ impl<S, R> Peer<S, R> {
}
}
// Sets the actual address of the peer, once known
pub fn set_known_target(&mut self, addr: SocketAddr) {
self.targ = Some(addr);
}
// Sends as message to the peer
pub async fn send(&mut self, msg: S) -> Result<(), SendError>
where
S: SerDes,
@ -62,6 +66,7 @@ impl<S, R> Peer<S, R> {
Ok(())
}
// Receives a message from the peer
pub async fn recv(&mut self) -> Result<R, RecvError>
where
R: SerDes,

View File

@ -45,46 +45,22 @@ pub struct SegmentSet {
}
*/
// Represents one segment of data to transfer
#[derive(Clone)]
pub struct SegData {
// the data
pub bytes: Vec<u8>,
// whether this is the last segment in the set
pub is_last_segment: bool,
// was this encoded with MeowCoder
pub is_meow_encoded: bool,
// was this cut off according to MeowCoder::encode
pub is_cut: bool,
}
impl SegData {
// gets the length of the segment data
pub fn len(&self) -> usize {
self.bytes.len()
}
// pub async fn read<IN>(inp: &mut IN) -> Result<SegData, tokio::io::Error>
// where
// IN: AsyncRead + Unpin,
// {
// use tokio::io::AsyncReadExt;
// let mut buf = [0u8; MAX_SEG_SIZE];
// let len = inp.read(&mut buf).await.unwrap_or(0);
// if len > 0 {
// Ok(SegData {
// bytes: Vec::from(&buf[..len]),
// is_last_segment: false,
// is_meow_encoded: false,
// })
// } else {
// Ok(SegData {
// bytes: vec![],
// is_last_segment: true,
// is_meow_encoded: false,
// })
// }
// }
//
// pub async fn write<OUT>(&self, out: &mut OUT) -> Result<(), tokio::io::Error>
// where
// OUT: AsyncWrite + Unpin,
// {
// use tokio::io::AsyncWriteExt;
// out.write_all(&self.bytes).await
// }
}