diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index d9b0184..f0bc835 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -63,13 +63,13 @@ where let mut pos = 0; loop { match peer.recv().await { - Ok(UpMsg::Data(data)) => { - let len = data.len(); - out.write_all(&data).await?; + Ok(UpMsg::Data { payload, .. }) => { + let len = payload.len(); + out.write_all(&payload).await?; out.flush().await?; log.recv_data_accepted(pos, len, hptp::logger::InOrder) .await; - send(peer, DownMsg::Ack).await?; + send(peer, DownMsg::Ack {}).await?; pos += len; } diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index 9f8b9d6..7c0236e 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -74,7 +74,7 @@ where IN: AsyncRead + Unpin, { use tokio::io::AsyncReadExt; - let mut buf = [0u8; msg::MAX_DATA_SIZE]; + let mut buf = [0u8; msg::MAX_SEG_SIZE]; let len = inp.read(&mut buf).await?; Ok(if len > 0 { Some(buf[..len].into()) @@ -100,7 +100,7 @@ where loop { if let Some(ack_len) = next_ack_len { match peer.recv().await { - Ok(DownMsg::Ack) => { + Ok(DownMsg::Ack { .. }) => { log.recv_ack(pos).await; pos += ack_len; next_ack_len = None; @@ -114,7 +114,7 @@ where Some(data) => { next_ack_len = Some(data.len()); log.send_data(pos, data.len()).await; - send(peer, UpMsg::Data(data)).await?; + send(peer, UpMsg::Data { payload: data }).await?; } None => break, } diff --git a/hptp/src/msg.rs b/hptp/src/msg.rs index b18f326..2314ac9 100644 --- a/hptp/src/msg.rs +++ b/hptp/src/msg.rs @@ -1,37 +1,89 @@ -#[derive(Clone)] +/// Per the assignment spec, `1472` is the maximum size packet we're allowed to send. +pub const MAX_SERIALIZED_SIZE: usize = 1472; + +// TODO: change these based off the decoders +pub const UP_HEADER_SIZE: usize = 0; +pub const DOWN_HEADER_SIZE: usize = 0; + +/// This is the maximum amount of segment data we can fit into a packet. 
+pub const MAX_SEG_SIZE: usize = MAX_SERIALIZED_SIZE - UP_HEADER_SIZE; + +// Note: we can only keep so much file data in RAM, so let's see what would be the +// maximum amount of file data we keep in flux. +// +// 1456 B (MAX_SEG_SIZE) * 2.03 (expansion factor for PSHex) = 2.96 KB/seg +// 2.96 KB/seg * 11,648 seg (max possible SEG_PER_CHUNK) = 34.5 MB +// +// 34 MB is actually much larger than the maximum test case size. + +/// This is calculated based on the max size we would need for a bit-field specifying +/// which segments are present in a chunk. +pub const SEG_PER_CHUNK: usize = (MAX_SERIALIZED_SIZE - DOWN_HEADER_SIZE) * 8; + +/// Is `u32` big enough to handle all file sizes? +/// +/// 1.46 KB/seg * 2^32 seg = 6.3 TB +/// +/// If we used `u16` instead we would be just barely OK: +/// +/// 1.46 KB/seg * 2^16 seg = 95.4 MB +/// +pub type SegIdx = u32; + +#[derive(Clone, Debug)] pub enum UpMsg { - Data(Vec<u8>), + Data { + payload: Vec<u8>, + // seg_idx: SegIdx, + // encoding: SegmentEncoding, + // is_final_packet: bool, + }, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum DownMsg { - Ack, + Ack {}, // ackd: SegmentSet } -pub const MAX_DATA_SIZE: usize = 999; -pub const MAX_SERIALIZED_SIZE: usize = 1 + MAX_DATA_SIZE; +/* +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum SegmentEncoding { + /// Un-encoded byte sequence. + Raw, + /// "PostScript style" Hex w/ linebreaks: ([0-9a-f]{60}\n)*. 
+ PSHex, +} + +#[derive(Clone, Debug)] +pub struct SegmentSet { + pub latest_seg_idx: SegIdx, + pub other_segs: std::collections::HashSet<SegIdx>, +} + */ #[derive(Error, Debug)] #[error("deserialization failed; malformed packet")] pub struct DesError; pub trait SerDes: Sized { - fn des(data: &[u8]) -> Result<Self, DesError>; + // For both methods: `buf.len()` must be >= `MAX_SERIALIZED_SIZE` - // `buf.len()` must be >= `MAX_SERIALIZED_SIZE` + fn des(buf: &[u8]) -> Result<Self, DesError>; fn ser_to(&self, buf: &mut [u8]) -> usize; } impl SerDes for UpMsg { - fn des(data: &[u8]) -> Result<Self, DesError> { - Ok(UpMsg::Data(data.into())) + fn des(buf: &[u8]) -> Result<Self, DesError> { + Ok(UpMsg::Data { + payload: buf.into(), + }) } fn ser_to(&self, buf: &mut [u8]) -> usize { match self { - UpMsg::Data(data) => { - let len = data.len(); - buf[..len].copy_from_slice(&data[..]); + UpMsg::Data { payload, .. } => { + let len = payload.len(); + buf[..len].copy_from_slice(&payload[..]); len } } @@ -39,15 +91,11 @@ impl SerDes for UpMsg { } } impl SerDes for DownMsg { - fn des(data: &[u8]) -> Result<Self, DesError> { - match data.first() { - Some(0) => Ok(DownMsg::Ack), - _ => Err(DesError), - } + fn des(_buf: &[u8]) -> Result<Self, DesError> { + Ok(DownMsg::Ack {}) } - fn ser_to(&self, buf: &mut [u8]) -> usize { - buf[0] = 0; + fn ser_to(&self, _buf: &mut [u8]) -> usize { 1 } }