begin fleshing out package types

This commit is contained in:
Milo Turner 2020-03-09 16:25:25 -04:00
parent 87fd190378
commit 4ad767e5c4
3 changed files with 75 additions and 27 deletions

View File

@ -63,13 +63,13 @@ where
let mut pos = 0; let mut pos = 0;
loop { loop {
match peer.recv().await { match peer.recv().await {
Ok(UpMsg::Data(data)) => { Ok(UpMsg::Data { payload, .. }) => {
let len = data.len(); let len = payload.len();
out.write_all(&data).await?; out.write_all(&payload).await?;
out.flush().await?; out.flush().await?;
log.recv_data_accepted(pos, len, hptp::logger::InOrder) log.recv_data_accepted(pos, len, hptp::logger::InOrder)
.await; .await;
send(peer, DownMsg::Ack).await?; send(peer, DownMsg::Ack {}).await?;
pos += len; pos += len;
} }

View File

@ -74,7 +74,7 @@ where
IN: AsyncRead + Unpin, IN: AsyncRead + Unpin,
{ {
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
let mut buf = [0u8; msg::MAX_DATA_SIZE]; let mut buf = [0u8; msg::MAX_SEG_SIZE];
let len = inp.read(&mut buf).await?; let len = inp.read(&mut buf).await?;
Ok(if len > 0 { Ok(if len > 0 {
Some(buf[..len].into()) Some(buf[..len].into())
@ -100,7 +100,7 @@ where
loop { loop {
if let Some(ack_len) = next_ack_len { if let Some(ack_len) = next_ack_len {
match peer.recv().await { match peer.recv().await {
Ok(DownMsg::Ack) => { Ok(DownMsg::Ack { .. }) => {
log.recv_ack(pos).await; log.recv_ack(pos).await;
pos += ack_len; pos += ack_len;
next_ack_len = None; next_ack_len = None;
@ -114,7 +114,7 @@ where
Some(data) => { Some(data) => {
next_ack_len = Some(data.len()); next_ack_len = Some(data.len());
log.send_data(pos, data.len()).await; log.send_data(pos, data.len()).await;
send(peer, UpMsg::Data(data)).await?; send(peer, UpMsg::Data { payload: data }).await?;
} }
None => break, None => break,
} }

View File

@ -1,37 +1,89 @@
#[derive(Clone)] /// Per the assignment spec, `1472` is the maximum size packet we're allowed to send.
pub const MAX_SERIALIZED_SIZE: usize = 1472;
// TODO: change these based off the decoders
pub const UP_HEADER_SIZE: usize = 0;
pub const DOWN_HEADER_SIZE: usize = 0;
/// This is the maximum amount of segment data we can fit into a packet.
pub const MAX_SEG_SIZE: usize = MAX_SERIALIZED_SIZE - UP_HEADER_SIZE;
// Note: we can only keep so much file data in RAM, so let's see what would be the
// maximum amount of file data we keep in flight at any one time.
//
// 1456 B (MAX_SEG_SIZE) * 2.03x (expansion factor for PSHex) = 2.96 KB/seg
// 2.96 KB/seg * 11,648 seg (max possible SEG_PER_CHUNK) = 34.5 MB
//
// 34 MB is actually much larger than the maximum test case size.
/// This is calculated based on the max size we would need for a bit-field specifying
/// which segments are present in a chunk.
pub const SEG_PER_CHUNK: usize = (MAX_SERIALIZED_SIZE - DOWN_HEADER_SIZE) * 8;
/// Is `u32` big enough to handle all file sizes?
///
/// 1.46 KB/seg * 2^32 seg = 6.3 TB
///
/// If we used `u16` instead we would be just barely OK:
///
/// 1.46 KB/seg * 2^16 seg = 95.4 MB
///
pub type SegIdx = u32;
#[derive(Clone, Debug)]
pub enum UpMsg { pub enum UpMsg {
Data(Vec<u8>), Data {
payload: Vec<u8>,
// seg_idx: SegIdx,
// encoding: SegmentEncoding,
// is_final_packet: bool,
},
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub enum DownMsg { pub enum DownMsg {
Ack, Ack {}, // ackd: SegmentSet
} }
pub const MAX_DATA_SIZE: usize = 999; /*
pub const MAX_SERIALIZED_SIZE: usize = 1 + MAX_DATA_SIZE; #[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum SegmentEncoding {
/// Un-encoded byte sequence.
Raw,
/// "PostScript style" Hex w/ linebreaks: ([0-9a-f]{60}\n)*.
PSHex,
}
#[derive(Clone, Debug)]
pub struct SegmentSet {
pub latest_seg_idx: SegIdx,
pub other_segs: std::collections::HashSet<SegIdx>,
}
*/
#[derive(Error, Debug)] #[derive(Error, Debug)]
#[error("deserialization failed; malformed packet")] #[error("deserialization failed; malformed packet")]
pub struct DesError; pub struct DesError;
pub trait SerDes: Sized { pub trait SerDes: Sized {
fn des(data: &[u8]) -> Result<Self, DesError>; // For both methods: `buf.len()` must be >= `MAX_SERIALIZED_SIZE`
// `buf.len()` must be >= `MAX_SERIALIZED_SIZE` fn des(buf: &[u8]) -> Result<Self, DesError>;
fn ser_to(&self, buf: &mut [u8]) -> usize; fn ser_to(&self, buf: &mut [u8]) -> usize;
} }
impl SerDes for UpMsg { impl SerDes for UpMsg {
fn des(data: &[u8]) -> Result<Self, DesError> { fn des(buf: &[u8]) -> Result<Self, DesError> {
Ok(UpMsg::Data(data.into())) Ok(UpMsg::Data {
payload: buf.into(),
})
} }
fn ser_to(&self, buf: &mut [u8]) -> usize { fn ser_to(&self, buf: &mut [u8]) -> usize {
match self { match self {
UpMsg::Data(data) => { UpMsg::Data { payload, .. } => {
let len = data.len(); let len = payload.len();
buf[..len].copy_from_slice(&data[..]); buf[..len].copy_from_slice(&payload[..]);
len len
} }
} }
@ -39,15 +91,11 @@ impl SerDes for UpMsg {
} }
impl SerDes for DownMsg { impl SerDes for DownMsg {
fn des(data: &[u8]) -> Result<Self, DesError> { fn des(_buf: &[u8]) -> Result<Self, DesError> {
match data.first() { Ok(DownMsg::Ack {})
Some(0) => Ok(DownMsg::Ack),
_ => Err(DesError),
}
} }
fn ser_to(&self, buf: &mut [u8]) -> usize { fn ser_to(&self, _buf: &mut [u8]) -> usize {
buf[0] = 0;
1 1
} }
} }