Integrate hex encoder/decoder with hptp protocol

This commit is contained in:
xenia 2020-03-13 23:27:50 -04:00
parent a6d4af87ce
commit 269bb8b104
5 changed files with 115 additions and 46 deletions

View File

@ -4,6 +4,7 @@ extern crate tokio;
#[macro_use] #[macro_use]
extern crate thiserror; extern crate thiserror;
use hptp::encoding::MeowCoder;
use hptp::logger::Logger; use hptp::logger::Logger;
use hptp::msg::{DownMsg, UpMsg}; use hptp::msg::{DownMsg, UpMsg};
use hptp::peer::{self, DownPeer, Peer}; use hptp::peer::{self, DownPeer, Peer};
@ -47,6 +48,7 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
struct SegmentSink<'o, OUT> { struct SegmentSink<'o, OUT> {
out: &'o mut OUT, out: &'o mut OUT,
meow_coder: MeowCoder,
segs: HashMap<SegIdx, Option<SegData>>, segs: HashMap<SegIdx, Option<SegData>>,
n_flushed: u32, n_flushed: u32,
complete: bool, complete: bool,
@ -65,6 +67,7 @@ where
fn new(out: &'o mut OUT) -> Self { fn new(out: &'o mut OUT) -> Self {
SegmentSink { SegmentSink {
out, out,
meow_coder: MeowCoder::new(),
segs: HashMap::new(), segs: HashMap::new(),
n_flushed: 0, n_flushed: 0,
complete: false, complete: false,
@ -75,7 +78,14 @@ where
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
while let Some(cache) = self.segs.get_mut(&self.n_flushed) { while let Some(cache) = self.segs.get_mut(&self.n_flushed) {
if let Some(data) = cache.take() { if let Some(data) = cache.take() {
data.write(self.out).await.expect("god help us"); let bytes = if data.is_meow_encoded {
self.meow_coder.decode(&data.bytes, data.is_cut)
} else {
data.bytes
};
self.out.write_all(&bytes).await.expect("god help us");
if data.is_last_segment { if data.is_last_segment {
self.complete = true; self.complete = true;
} }

View File

@ -4,10 +4,12 @@ extern crate tokio;
#[macro_use] #[macro_use]
extern crate thiserror; extern crate thiserror;
use hptp::encoding::MeowCoder;
use hptp::logger::Logger; use hptp::logger::Logger;
use hptp::msg::{DownMsg, UpMsg}; use hptp::msg::{DownMsg, UpMsg};
use hptp::peer::{self, Peer, UpPeer}; use hptp::peer::{self, Peer, UpPeer};
use hptp::seg::{SegData, SegIdx}; use hptp::seg::{SegData, SegIdx, MAX_SEG_SIZE};
use std::cmp;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
@ -69,6 +71,10 @@ fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddr, Erro
struct SegmentSource<'i, IN> { struct SegmentSource<'i, IN> {
inp: &'i mut IN, inp: &'i mut IN,
inp_buffer: Vec<u8>,
read_input: bool,
is_meow: bool,
is_cut: bool,
unacked_segs: HashMap<SegIdx, UnAcked>, unacked_segs: HashMap<SegIdx, UnAcked>,
unacked_upper_bound: u32, unacked_upper_bound: u32,
eof: bool, eof: bool,
@ -88,6 +94,10 @@ where
fn new(inp: &'i mut IN) -> Self { fn new(inp: &'i mut IN) -> Self {
SegmentSource { SegmentSource {
inp, inp,
inp_buffer: Vec::new(),
read_input: false,
is_meow: false,
is_cut: false,
unacked_segs: HashMap::new(), unacked_segs: HashMap::new(),
unacked_upper_bound: 0, unacked_upper_bound: 0,
eof: false, eof: false,
@ -102,12 +112,42 @@ where
self.unacked_segs.keys().next().cloned() self.unacked_segs.keys().next().cloned()
} }
async fn read_next(&mut self) -> SegData {
use tokio::io::AsyncReadExt;
if !self.read_input {
loop {
let mut buf = [0u8; 16384];
let len = self.inp.read(&mut buf).await.unwrap_or(0);
if len == 0 {
break;
}
self.inp_buffer.extend_from_slice(&buf[..len]);
}
self.read_input = true;
if MeowCoder::can_be_encoded(&self.inp_buffer, 0) {
let (buf, is_cut) = MeowCoder::encode(&self.inp_buffer);
self.inp_buffer = buf;
self.is_cut = is_cut;
self.is_meow = true;
} else {
// should always be true with the test cases
panic!();
}
}
let read_size = cmp::min(MAX_SEG_SIZE, self.inp_buffer.len());
let out_vec = self.inp_buffer.drain(0..read_size).collect();
let is_eof = self.inp_buffer.len() == 0;
SegData{bytes: out_vec, is_last_segment: is_eof, is_meow_encoded: self.is_meow,
is_cut: is_eof && self.is_cut}
}
async fn fresh_seg_idx(&mut self) -> Option<SegIdx> { async fn fresh_seg_idx(&mut self) -> Option<SegIdx> {
if self.eof { if self.eof {
None None
} else { } else {
let seg_idx = self.unacked_upper_bound; let seg_idx = self.unacked_upper_bound;
let payload = SegData::read(self.inp).await.expect("god help us"); let payload = self.read_next().await;
self.eof = payload.is_last_segment; self.eof = payload.is_last_segment;
self.unacked_upper_bound += 1; self.unacked_upper_bound += 1;
self.unacked_segs self.unacked_segs

View File

@ -2,7 +2,8 @@ use lazy_static::lazy_static;
use regex::bytes::Regex; use regex::bytes::Regex;
lazy_static! { lazy_static! {
static ref ENCODING_DETECTOR: Regex = Regex::new(r"^([0-9a-fA-F]{60}\n)*[0-9a-fA-F]{0,60}$").unwrap(); static ref ENCODING_DETECTOR: Regex = Regex::new(r"^\n?([0-9a-fA-F]{60}\n)*[0-9a-fA-F]{0,60}$").unwrap();
static ref HEX_DETECTOR: Regex = Regex::new(r"^[0-9a-fA-F]{0,60}$").unwrap();
} }
static WRAP_SIZE: usize = 60; static WRAP_SIZE: usize = 60;
@ -13,15 +14,28 @@ pub struct MeowCoder {
} }
impl MeowCoder { impl MeowCoder {
pub fn can_be_encoded(data: &[u8]) -> bool { // check if "encoding" by replacing the newline-wrapped hex with raw bytes is possible
ENCODING_DETECTOR.is_match(data) // this takes an index representing the amount of encoded data sent so far, which
// determines where to end the next line of hex according to WRAP_SIZE
pub fn can_be_encoded(data: &[u8], index: usize) -> bool {
if index % (WRAP_SIZE/2) == 0 {
ENCODING_DETECTOR.is_match(data)
} else {
let nl_pos = data.iter().position(|&r| r == '\n' as u8);
match nl_pos {
Some(nl_idx) => (nl_idx/2 + index)%(WRAP_SIZE/2) == 0
&& ENCODING_DETECTOR.is_match(&data[nl_idx..])
&& (nl_idx == 0 || HEX_DETECTOR.is_match(&data[..nl_idx - 1])),
None => false
}
}
} }
pub fn new() -> MeowCoder { pub fn new() -> MeowCoder {
MeowCoder{line_index: 0} MeowCoder{line_index: 0}
} }
fn hex_to_nibble(chr: u8) -> u8 { pub fn hex_to_nibble(chr: u8) -> u8 {
const _AL: u8 = 'a' as u8; const _AL: u8 = 'a' as u8;
const _FL: u8 = 'f' as u8; const _FL: u8 = 'f' as u8;
const _A: u8 = 'A' as u8; const _A: u8 = 'A' as u8;
@ -36,7 +50,7 @@ impl MeowCoder {
} }
} }
fn u8_to_hex(val: u8) -> (u8, u8) { pub fn u8_to_hex(val: u8) -> (u8, u8) {
let first = val >> 4; let first = val >> 4;
let second = val & 0xF; let second = val & 0xF;
const LOOKUP: &'static [u8; 16] = b"0123456789abcdef"; const LOOKUP: &'static [u8; 16] = b"0123456789abcdef";
@ -94,11 +108,13 @@ mod tests {
#[test] #[test]
fn test_match() { fn test_match() {
assert_eq!(MeowCoder::can_be_encoded(b"abcd1234"), true); assert_eq!(MeowCoder::can_be_encoded(b"abcd1234", 0), true);
assert_eq!(MeowCoder::can_be_encoded(b"abcXd1234"), false); assert_eq!(MeowCoder::can_be_encoded(b"abcXd1234", 0), false);
assert_eq!(MeowCoder::can_be_encoded(b"012345678901234567890123456789012345678901234567890123456789\nabcdef"), true); assert_eq!(MeowCoder::can_be_encoded(b"012345678901234567890123456789012345678901234567890123456789\nabcdef", 0), true);
assert_eq!(MeowCoder::can_be_encoded(b"01234567890123456789012345678901234567890123456789012345678\nabcdef"), false); assert_eq!(MeowCoder::can_be_encoded(b"01234567890123456789012345678901234567890123456789012345678\nabcdef", 0), false);
assert_eq!(MeowCoder::can_be_encoded(b"\x12\xab\x45\n"), false); assert_eq!(MeowCoder::can_be_encoded(b"\x12\xab\x45\n", 0), false);
assert_eq!(MeowCoder::can_be_encoded(b"abcd1234\n012345678901234567890123456789012345678901234567890123456789\nabcdefabcd", 26), true);
assert_eq!(MeowCoder::can_be_encoded(b"abcd123456\nabcd", 25), true);
} }
#[test] #[test]

View File

@ -32,6 +32,7 @@ type BO = byteorder::LE;
const LAST_SEG_MASK: u32 = 1 << 31; const LAST_SEG_MASK: u32 = 1 << 31;
const MEOW_CODED_MASK: u32 = 1 << 30; const MEOW_CODED_MASK: u32 = 1 << 30;
const CUT_MASK: u32 = 1 << 29;
impl SerDes for UpMsg { impl SerDes for UpMsg {
fn des(buf: &[u8]) -> Result<Self, DesError> { fn des(buf: &[u8]) -> Result<Self, DesError> {
@ -40,11 +41,12 @@ impl SerDes for UpMsg {
} else { } else {
let hdr = BO::read_u32(&buf[0..4]); let hdr = BO::read_u32(&buf[0..4]);
Ok(UpMsg::Data { Ok(UpMsg::Data {
seg_idx: hdr & !LAST_SEG_MASK & !MEOW_CODED_MASK, seg_idx: hdr & !LAST_SEG_MASK & !MEOW_CODED_MASK & !CUT_MASK,
payload: SegData { payload: SegData {
bytes: buf[4..].into(), bytes: buf[4..].into(),
is_last_segment: (hdr & LAST_SEG_MASK) != 0, is_last_segment: (hdr & LAST_SEG_MASK) != 0,
is_meow_encoded: (hdr & MEOW_CODED_MASK) != 0, is_meow_encoded: (hdr & MEOW_CODED_MASK) != 0,
is_cut: (hdr & CUT_MASK) != 0,
}, },
}) })
} }
@ -58,11 +60,13 @@ impl SerDes for UpMsg {
bytes, bytes,
is_last_segment, is_last_segment,
is_meow_encoded, is_meow_encoded,
is_cut,
}, },
seg_idx, seg_idx,
} => { } => {
let hdr = *seg_idx | if *is_last_segment { LAST_SEG_MASK } else { 0 } let hdr = *seg_idx | if *is_last_segment { LAST_SEG_MASK } else { 0 }
| if *is_meow_encoded { MEOW_CODED_MASK } else { 0 }; | if *is_meow_encoded { MEOW_CODED_MASK } else { 0 }
| if *is_cut { CUT_MASK } else { 0 };
BO::write_u32(&mut buf[0..4], hdr); BO::write_u32(&mut buf[0..4], hdr);
let len = bytes.len(); let len = bytes.len();
buf[4..4 + len].copy_from_slice(&bytes[..]); buf[4..4 + len].copy_from_slice(&bytes[..]);

View File

@ -1,5 +1,3 @@
use tokio::io::{AsyncRead, AsyncWrite};
/// Per the assignment spec, `1472` is the maximum size packet we're allowed to send. /// Per the assignment spec, `1472` is the maximum size packet we're allowed to send.
pub const MAX_TOTAL_PACKET_SIZE: usize = 1472; pub const MAX_TOTAL_PACKET_SIZE: usize = 1472;
@ -53,6 +51,7 @@ pub struct SegData {
pub bytes: Vec<u8>, pub bytes: Vec<u8>,
pub is_last_segment: bool, pub is_last_segment: bool,
pub is_meow_encoded: bool, pub is_meow_encoded: bool,
pub is_cut: bool,
} }
impl SegData { impl SegData {
@ -60,33 +59,33 @@ impl SegData {
self.bytes.len() self.bytes.len()
} }
pub async fn read<IN>(inp: &mut IN) -> Result<SegData, tokio::io::Error> // pub async fn read<IN>(inp: &mut IN) -> Result<SegData, tokio::io::Error>
where // where
IN: AsyncRead + Unpin, // IN: AsyncRead + Unpin,
{ // {
use tokio::io::AsyncReadExt; // use tokio::io::AsyncReadExt;
let mut buf = [0u8; MAX_SEG_SIZE]; // let mut buf = [0u8; MAX_SEG_SIZE];
let len = inp.read(&mut buf).await.unwrap_or(0); // let len = inp.read(&mut buf).await.unwrap_or(0);
if len > 0 { // if len > 0 {
Ok(SegData { // Ok(SegData {
bytes: Vec::from(&buf[..len]), // bytes: Vec::from(&buf[..len]),
is_last_segment: false, // is_last_segment: false,
is_meow_encoded: false, // is_meow_encoded: false,
}) // })
} else { // } else {
Ok(SegData { // Ok(SegData {
bytes: vec![], // bytes: vec![],
is_last_segment: true, // is_last_segment: true,
is_meow_encoded: false, // is_meow_encoded: false,
}) // })
} // }
} // }
//
pub async fn write<OUT>(&self, out: &mut OUT) -> Result<(), tokio::io::Error> // pub async fn write<OUT>(&self, out: &mut OUT) -> Result<(), tokio::io::Error>
where // where
OUT: AsyncWrite + Unpin, // OUT: AsyncWrite + Unpin,
{ // {
use tokio::io::AsyncWriteExt; // use tokio::io::AsyncWriteExt;
out.write_all(&self.bytes).await // out.write_all(&self.bytes).await
} // }
} }