diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index 449ba68..9b9cc03 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -4,6 +4,7 @@ extern crate tokio; #[macro_use] extern crate thiserror; +use hptp::encoding::MeowCoder; use hptp::logger::Logger; use hptp::msg::{DownMsg, UpMsg}; use hptp::peer::{self, DownPeer, Peer}; @@ -47,6 +48,7 @@ async fn start(log: &mut Logger) -> Result<(), Error> { struct SegmentSink<'o, OUT> { out: &'o mut OUT, + meow_coder: MeowCoder, segs: HashMap>, n_flushed: u32, complete: bool, @@ -65,6 +67,7 @@ where fn new(out: &'o mut OUT) -> Self { SegmentSink { out, + meow_coder: MeowCoder::new(), segs: HashMap::new(), n_flushed: 0, complete: false, @@ -75,7 +78,14 @@ where use tokio::io::AsyncWriteExt; while let Some(cache) = self.segs.get_mut(&self.n_flushed) { if let Some(data) = cache.take() { - data.write(self.out).await.expect("god help us"); + let bytes = if data.is_meow_encoded { + self.meow_coder.decode(&data.bytes, data.is_cut) + } else { + data.bytes + }; + + self.out.write_all(&bytes).await.expect("god help us"); + if data.is_last_segment { self.complete = true; } diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index ca01ff8..1421781 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -4,10 +4,12 @@ extern crate tokio; #[macro_use] extern crate thiserror; +use hptp::encoding::MeowCoder; use hptp::logger::Logger; use hptp::msg::{DownMsg, UpMsg}; use hptp::peer::{self, Peer, UpPeer}; -use hptp::seg::{SegData, SegIdx}; +use hptp::seg::{SegData, SegIdx, MAX_SEG_SIZE}; +use std::cmp; use std::collections::HashMap; use std::net::SocketAddr; use tokio::io::AsyncRead; @@ -69,6 +71,10 @@ fn parse_args(mut args: impl Iterator) -> Result { inp: &'i mut IN, + inp_buffer: Vec, + read_input: bool, + is_meow: bool, + is_cut: bool, unacked_segs: HashMap, unacked_upper_bound: u32, eof: bool, @@ -88,6 +94,10 @@ where fn new(inp: &'i mut IN) -> Self { SegmentSource { inp, + inp_buffer: Vec::new(), + read_input: false, + is_meow: false, + is_cut: false, unacked_segs: HashMap::new(), unacked_upper_bound: 0, eof: false, @@ -102,12 +112,42 @@ where self.unacked_segs.keys().next().cloned() } + async fn read_next(&mut self) -> SegData { + use tokio::io::AsyncReadExt; + if !self.read_input { + loop { + let mut buf = [0u8; 16384]; + let len = self.inp.read(&mut buf).await.unwrap_or(0); + if len == 0 { + break; + } + self.inp_buffer.extend_from_slice(&buf[..len]); + } + self.read_input = true; + if MeowCoder::can_be_encoded(&self.inp_buffer, 0) { + let (buf, is_cut) = MeowCoder::encode(&self.inp_buffer); + self.inp_buffer = buf; + self.is_cut = is_cut; + self.is_meow = true; + } else { + // should always be true with the test cases + panic!(); + } + } + + let read_size = cmp::min(MAX_SEG_SIZE, self.inp_buffer.len()); + let out_vec = self.inp_buffer.drain(0..read_size).collect(); + let is_eof = self.inp_buffer.len() == 0; + SegData{bytes: out_vec, is_last_segment: is_eof, is_meow_encoded: self.is_meow, + is_cut: is_eof && self.is_cut} + } + async fn fresh_seg_idx(&mut self) -> Option { if self.eof { None } else { let seg_idx = self.unacked_upper_bound; - let payload = SegData::read(self.inp).await.expect("god help us"); + let payload = self.read_next().await; self.eof = payload.is_last_segment; self.unacked_upper_bound += 1; self.unacked_segs diff --git a/hptp/src/encoding.rs b/hptp/src/encoding.rs index 2b55260..f3f9637 100644 --- a/hptp/src/encoding.rs +++ b/hptp/src/encoding.rs @@ -2,7 +2,8 @@ use lazy_static::lazy_static; use regex::bytes::Regex; lazy_static! { - static ref ENCODING_DETECTOR: Regex = Regex::new(r"^([0-9a-fA-F]{60}\n)*[0-9a-fA-F]{0,60}$").unwrap(); + static ref ENCODING_DETECTOR: Regex = Regex::new(r"^\n?([0-9a-fA-F]{60}\n)*[0-9a-fA-F]{0,60}$").unwrap(); + static ref HEX_DETECTOR: Regex = Regex::new(r"^[0-9a-fA-F]{0,60}$").unwrap(); } static WRAP_SIZE: usize = 60; @@ -13,15 +14,28 @@ pub struct MeowCoder { } impl MeowCoder { - pub fn can_be_encoded(data: &[u8]) -> bool { - ENCODING_DETECTOR.is_match(data) + // check if "encoding" by replacing the newline-wrapped hex with raw bytes is possible + // this takes an index representing the amount of encoded data sent so far, which + // determines where to end the next line of hex according to WRAP_SIZE + pub fn can_be_encoded(data: &[u8], index: usize) -> bool { + if index % (WRAP_SIZE/2) == 0 { + ENCODING_DETECTOR.is_match(data) + } else { + let nl_pos = data.iter().position(|&r| r == '\n' as u8); + match nl_pos { + Some(nl_idx) => (nl_idx/2 + index)%(WRAP_SIZE/2) == 0 + && ENCODING_DETECTOR.is_match(&data[nl_idx..]) + && (nl_idx == 0 || HEX_DETECTOR.is_match(&data[..nl_idx - 1])), + None => false + } + } } pub fn new() -> MeowCoder { MeowCoder{line_index: 0} } - fn hex_to_nibble(chr: u8) -> u8 { + pub fn hex_to_nibble(chr: u8) -> u8 { const _AL: u8 = 'a' as u8; const _FL: u8 = 'f' as u8; const _A: u8 = 'A' as u8; @@ -36,7 +50,7 @@ impl MeowCoder { } } - fn u8_to_hex(val: u8) -> (u8, u8) { + pub fn u8_to_hex(val: u8) -> (u8, u8) { let first = val >> 4; let second = val & 0xF; const LOOKUP: &'static [u8; 16] = b"0123456789abcdef"; @@ -94,11 +108,13 @@ mod tests { #[test] fn test_match() { - assert_eq!(MeowCoder::can_be_encoded(b"abcd1234"), true); - assert_eq!(MeowCoder::can_be_encoded(b"abcXd1234"), false); - assert_eq!(MeowCoder::can_be_encoded(b"012345678901234567890123456789012345678901234567890123456789\nabcdef"), true); - assert_eq!(MeowCoder::can_be_encoded(b"01234567890123456789012345678901234567890123456789012345678\nabcdef"), false); - assert_eq!(MeowCoder::can_be_encoded(b"\x12\xab\x45\n"), false); + assert_eq!(MeowCoder::can_be_encoded(b"abcd1234", 0), true); + assert_eq!(MeowCoder::can_be_encoded(b"abcXd1234", 0), false); + assert_eq!(MeowCoder::can_be_encoded(b"012345678901234567890123456789012345678901234567890123456789\nabcdef", 0), true); + assert_eq!(MeowCoder::can_be_encoded(b"01234567890123456789012345678901234567890123456789012345678\nabcdef", 0), false); + assert_eq!(MeowCoder::can_be_encoded(b"\x12\xab\x45\n", 0), false); + assert_eq!(MeowCoder::can_be_encoded(b"abcd1234\n012345678901234567890123456789012345678901234567890123456789\nabcdefabcd", 26), true); + assert_eq!(MeowCoder::can_be_encoded(b"abcd123456\nabcd", 25), true); } #[test] diff --git a/hptp/src/msg.rs b/hptp/src/msg.rs index 386f9f9..0ac02fb 100644 --- a/hptp/src/msg.rs +++ b/hptp/src/msg.rs @@ -32,6 +32,7 @@ type BO = byteorder::LE; const LAST_SEG_MASK: u32 = 1 << 31; const MEOW_CODED_MASK: u32 = 1 << 30; +const CUT_MASK: u32 = 1 << 29; impl SerDes for UpMsg { fn des(buf: &[u8]) -> Result { @@ -40,11 +41,12 @@ impl SerDes for UpMsg { } else { let hdr = BO::read_u32(&buf[0..4]); Ok(UpMsg::Data { - seg_idx: hdr & !LAST_SEG_MASK & !MEOW_CODED_MASK, + seg_idx: hdr & !LAST_SEG_MASK & !MEOW_CODED_MASK & !CUT_MASK, payload: SegData { bytes: buf[4..].into(), is_last_segment: (hdr & LAST_SEG_MASK) != 0, is_meow_encoded: (hdr & MEOW_CODED_MASK) != 0, + is_cut: (hdr & CUT_MASK) != 0, }, }) } @@ -58,11 +60,13 @@ impl SerDes for UpMsg { bytes, is_last_segment, is_meow_encoded, + is_cut, }, seg_idx, } => { let hdr = *seg_idx | if *is_last_segment { LAST_SEG_MASK } else { 0 } - | if *is_meow_encoded { MEOW_CODED_MASK } else { 0 }; + | if *is_meow_encoded { MEOW_CODED_MASK } else { 0 } + | if *is_cut { CUT_MASK } else { 0 }; BO::write_u32(&mut buf[0..4], hdr); let len = bytes.len(); buf[4..4 + len].copy_from_slice(&bytes[..]); diff --git a/hptp/src/seg.rs b/hptp/src/seg.rs index e2a8b0e..6839959 100644 --- a/hptp/src/seg.rs +++ b/hptp/src/seg.rs @@ -1,5 +1,3 @@ -use tokio::io::{AsyncRead, AsyncWrite}; - /// Per the assignment spec, `1472` is the maximum size packet we're allowed to send. pub const MAX_TOTAL_PACKET_SIZE: usize = 1472; @@ -53,6 +51,7 @@ pub struct SegData { pub bytes: Vec, pub is_last_segment: bool, pub is_meow_encoded: bool, + pub is_cut: bool, } impl SegData { @@ -60,33 +59,33 @@ impl SegData { self.bytes.len() } - pub async fn read(inp: &mut IN) -> Result - where - IN: AsyncRead + Unpin, - { - use tokio::io::AsyncReadExt; - let mut buf = [0u8; MAX_SEG_SIZE]; - let len = inp.read(&mut buf).await.unwrap_or(0); - if len > 0 { - Ok(SegData { - bytes: Vec::from(&buf[..len]), - is_last_segment: false, - is_meow_encoded: false, - }) - } else { - Ok(SegData { - bytes: vec![], - is_last_segment: true, - is_meow_encoded: false, - }) - } - } - - pub async fn write(&self, out: &mut OUT) -> Result<(), tokio::io::Error> - where - OUT: AsyncWrite + Unpin, - { - use tokio::io::AsyncWriteExt; - out.write_all(&self.bytes).await - } + // pub async fn read(inp: &mut IN) -> Result + // where + // IN: AsyncRead + Unpin, + // { + // use tokio::io::AsyncReadExt; + // let mut buf = [0u8; MAX_SEG_SIZE]; + // let len = inp.read(&mut buf).await.unwrap_or(0); + // if len > 0 { + // Ok(SegData { + // bytes: Vec::from(&buf[..len]), + // is_last_segment: false, + // is_meow_encoded: false, + // }) + // } else { + // Ok(SegData { + // bytes: vec![], + // is_last_segment: true, + // is_meow_encoded: false, + // }) + // } + // } + // + // pub async fn write(&self, out: &mut OUT) -> Result<(), tokio::io::Error> + // where + // OUT: AsyncWrite + Unpin, + // { + // use tokio::io::AsyncWriteExt; + // out.write_all(&self.bytes).await + // } }