*very* primitive segment-index acks

Milo Turner 2020-03-10 12:01:44 -04:00
parent f0481e350d
commit dd6c4ff088
7 changed files with 80 additions and 40 deletions

Cargo.lock (generated)

@@ -12,6 +12,12 @@ version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
 
+[[package]]
+name = "byteorder"
+version = "1.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
+
 [[package]]
 name = "bytes"
 version = "0.5.4"
@@ -55,6 +61,7 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
 name = "hptp"
 version = "0.1.0"
 dependencies = [
+ "byteorder",
  "chrono",
  "thiserror",
  "tokio",


@@ -7,6 +7,7 @@ extern crate thiserror;
 use hptp::logger::Logger;
 use hptp::msg::{DownMsg, UpMsg};
 use hptp::peer::{self, DownPeer, Peer};
+use std::collections::HashMap;
 use tokio::io::AsyncWrite;
 
 #[derive(Error, Debug)]
@@ -55,22 +56,34 @@ async fn download<OUT>(log: &mut Logger, peer: &mut DownPeer, out: &mut OUT) ->
 where
     OUT: AsyncWrite + Unpin,
 {
-    use tokio::io::AsyncWriteExt;
-    let mut pos = 0;
+    let mut segs = HashMap::new();
+    let mut flush_seg_idx = 0;
     loop {
         match peer.recv().await {
-            Ok(UpMsg::Data { payload, .. }) => {
+            Ok(UpMsg::Data { payload, seg_idx }) => {
                 let len = payload.len();
-                out.write_all(&payload).await?;
-                out.flush().await?;
-                log.recv_data_accepted(pos, len, hptp::logger::InOrder)
+                segs.entry(seg_idx).or_insert(Some(payload));
+                let ack = DownMsg::Ack {
+                    idxs: segs.keys().cloned().collect(),
+                };
+                log.debug_msg(format!("sent ack: {:?}",
+                    segs.keys().collect::<Vec<_>>())).await;
+                send(peer, ack).await?;
+                log.recv_data_accepted(seg_idx as usize, len, hptp::logger::OutOfOrder)
                     .await;
-                send(peer, DownMsg::Ack {}).await?;
-                pos += len;
             }
             Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await,
             Err(peer::RecvError::Io { source }) => return Err(source.into()),
         }
+
+        while let Some(v) = segs.get_mut(&flush_seg_idx) {
+            if let Some(payload) = v.take() {
+                use tokio::io::AsyncWriteExt;
+                out.write_all(&payload).await?;
+                out.flush().await?;
+            }
+            flush_seg_idx += 1;
+        }
     }
 }
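The receive loop above acks every index seen so far, and only writes output once segments become contiguous: flushed entries are left in the map as None so their keys still count toward the cumulative ack set. A minimal std-only sketch of that buffer-and-flush pattern in isolation (names are illustrative, not the crate's API):

    use std::collections::HashMap;

    /// Buffers out-of-order segments and drains them in index order.
    struct Reassembler {
        segs: HashMap<u32, Option<Vec<u8>>>,
        flush_seg_idx: u32,
    }

    impl Reassembler {
        fn new() -> Self {
            Reassembler { segs: HashMap::new(), flush_seg_idx: 0 }
        }

        /// Record a segment; duplicates of an already-seen index are ignored.
        fn insert(&mut self, seg_idx: u32, payload: Vec<u8>) {
            self.segs.entry(seg_idx).or_insert(Some(payload));
        }

        /// Pop every payload that is now contiguous with what was already flushed.
        /// Keys stay behind (as None) so they can still be reported in acks.
        fn flush(&mut self) -> Vec<Vec<u8>> {
            let mut out = Vec::new();
            while let Some(slot) = self.segs.get_mut(&self.flush_seg_idx) {
                if let Some(payload) = slot.take() {
                    out.push(payload);
                }
                self.flush_seg_idx += 1;
            }
            out
        }
    }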


@@ -5,7 +5,7 @@ extern crate tokio;
 extern crate thiserror;
 
 use hptp::logger::Logger;
-use hptp::msg::{DownMsg, UpMsg};
+use hptp::msg::UpMsg;
 use hptp::peer::{self, Peer, UpPeer};
 use std::net::SocketAddr;
 use tokio::io::AsyncRead;
@@ -91,30 +91,29 @@ async fn upload<IN>(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result
 where
     IN: AsyncRead + Unpin,
 {
-    let mut pos = 0;
-    let mut next_ack_len: Option<usize> = None;
+    let mut seg_idx = 0;
+    // TODO: async receive acks
     loop {
-        if let Some(ack_len) = next_ack_len {
+        /* if let Some(ack_len) = next_ack_len {
             match peer.recv().await {
-                Ok(DownMsg::Ack { .. }) => {
-                    log.recv_ack(pos).await;
-                    pos += ack_len;
+                Ok(DownMsg::Ack { idxs }) => {
+                    // log.recv_ack(pos).await;
                     next_ack_len = None;
                 }
                 Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await,
                 Err(peer::RecvError::Io { source }) => return Err(source.into()),
             }
-        } else {
-            match read_segment(inp).await? {
-                Some(data) => {
-                    next_ack_len = Some(data.len());
-                    log.send_data(pos, data.len()).await;
-                    send(peer, UpMsg::Data { payload: data }).await?;
-                }
-                None => break,
-            }
-        }
+        } else {*/
+        match read_segment(inp).await? {
+            Some(payload) => {
+                log.send_data(seg_idx as usize, payload.len()).await;
+                let data = UpMsg::Data { payload, seg_idx };
+                send(peer, data).await?;
+                seg_idx += 1;
+            }
+            None => break,
+        }
+        // }
     }
     log.completed().await;
     Ok(())
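With the ack branch commented out, the sender now fires segments without ever reading a DownMsg::Ack. Once that TODO is picked up, the sender will need bookkeeping for which indices came back in Ack { idxs }; a possible std-only sketch (a hypothetical helper, not part of this commit):

    use std::collections::HashSet;

    /// Tracks which segment indices the receiver has acknowledged.
    struct AckTracker {
        acked: HashSet<u32>,
    }

    impl AckTracker {
        fn new() -> Self {
            AckTracker { acked: HashSet::new() }
        }

        /// Merge the idxs set carried by a DownMsg::Ack.
        fn on_ack(&mut self, idxs: &HashSet<u32>) {
            self.acked.extend(idxs.iter().cloned());
        }

        /// Any sent index not yet acked is a candidate for retransmission.
        fn needs_resend(&self, seg_idx: u32) -> bool {
            !self.acked.contains(&seg_idx)
        }
    }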


@@ -10,4 +10,5 @@ edition = "2018"
 [dependencies]
 tokio = {version = "0.2.*", features = ["io-std", "io-util", "udp"]}
 thiserror = "*"
-chrono = "0.4.*"
\ No newline at end of file
+chrono = "0.4.*"
+byteorder = "1.3.*"


@@ -1,8 +1,9 @@
 #[macro_use]
 extern crate thiserror;
+extern crate byteorder;
 extern crate chrono;
 
 pub mod logger;
-pub mod seg;
 pub mod msg;
 pub mod peer;
+pub mod seg;


@@ -1,18 +1,20 @@
-pub use super::seg::{MAX_TOTAL_PACKET_SIZE, UP_HEADER_SIZE, DOWN_HEADER_SIZE};
+use super::seg::SegIdx;
+pub use super::seg::{DOWN_HEADER_SIZE, MAX_TOTAL_PACKET_SIZE, UP_HEADER_SIZE};
+use byteorder::ByteOrder;
+use std::collections::HashSet;
 
 #[derive(Clone, Debug)]
 pub enum UpMsg {
     Data {
         payload: Vec<u8>,
-        // seg_idx: SegIdx,
-        // encoding: SegmentEncoding,
+        seg_idx: SegIdx,
         // is_final_packet: bool,
     },
 }
 
 #[derive(Clone, Debug)]
 pub enum DownMsg {
-    Ack {}, // ackd: SegmentSet
+    Ack { idxs: HashSet<SegIdx> },
 }
 
 #[derive(Error, Debug)]
@@ -26,30 +28,47 @@ pub trait SerDes: Sized {
     fn ser_to(&self, buf: &mut [u8]) -> usize;
 }
 
+type BO = byteorder::LE;
+
 impl SerDes for UpMsg {
     fn des(buf: &[u8]) -> Result<Self, DesError> {
-        Ok(UpMsg::Data {
-            payload: buf.into(),
-        })
+        if buf.len() < UP_HEADER_SIZE {
+            Err(DesError)
+        } else {
+            Ok(UpMsg::Data {
+                seg_idx: BO::read_u32(&buf[0..4]),
+                payload: buf[4..].into(),
+            })
+        }
     }
 
     fn ser_to(&self, buf: &mut [u8]) -> usize {
         match self {
-            UpMsg::Data { payload, .. } => {
+            UpMsg::Data { payload, seg_idx } => {
                 let len = payload.len();
-                buf[..len].copy_from_slice(&payload[..]);
-                len
+                BO::write_u32(&mut buf[0..4], *seg_idx);
+                buf[4..4 + len].copy_from_slice(&payload[..]);
+                4 + len
             }
         }
     }
 }
 
 impl SerDes for DownMsg {
-    fn des(_buf: &[u8]) -> Result<Self, DesError> {
-        Ok(DownMsg::Ack {})
+    fn des(buf: &[u8]) -> Result<Self, DesError> {
+        let mut idxs = HashSet::new();
+        for (i, b) in buf.iter().cloned().enumerate() {
+            for j in 0..8 {
+                if b & (1 << j) != 0 {
+                    idxs.insert((i * 8 + j) as SegIdx);
+                }
+            }
+        }
+        Ok(DownMsg::Ack { idxs })
     }
 
     fn ser_to(&self, _buf: &mut [u8]) -> usize {
-        1
+        // TODO: implement this!
+        0
    }
 }
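Note that DownMsg::ser_to is stubbed (it writes nothing and returns 0), so acks currently go out empty even though des can read a bitmap. A sketch of the matching encoder, assuming the same layout des expects (bit j of byte i set means segment i * 8 + j is acked):

    fn ser_to(&self, buf: &mut [u8]) -> usize {
        match self {
            DownMsg::Ack { idxs } => {
                // Empty set: send a zero-length bitmap.
                let max = match idxs.iter().max() {
                    Some(&m) => m as usize,
                    None => return 0,
                };
                let n_bytes = max / 8 + 1;
                for b in buf[..n_bytes].iter_mut() {
                    *b = 0; // clear before setting bits
                }
                for &idx in idxs {
                    buf[idx as usize / 8] |= 1 << (idx % 8);
                }
                n_bytes
            }
        }
    }

A bitmap like this can only name 8 * (MAX_TOTAL_PACKET_SIZE - DOWN_HEADER_SIZE) segment indices per packet, so a real encoder would also have to window or truncate the set.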


@@ -2,7 +2,7 @@
 pub const MAX_TOTAL_PACKET_SIZE: usize = 1472;
 
 // TODO: change these based off the decoders
-pub const UP_HEADER_SIZE: usize = 0;
+pub const UP_HEADER_SIZE: usize = 4;
 pub const DOWN_HEADER_SIZE: usize = 1;
 
 /// This is the maximum amount of segment data we can fit into a packet.
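With UP_HEADER_SIZE now 4 (the little-endian u32 segment index), each up packet carries 4 fewer payload bytes. Assuming the constant cut off below that doc comment is derived by simple subtraction, the arithmetic works out as:

    // Hypothetical derivation (the actual constant is truncated above):
    // 1472 total - 4 header = 1468 bytes of segment data per up packet.
    pub const MAX_SEG_SIZE: usize = MAX_TOTAL_PACKET_SIZE - UP_HEADER_SIZE;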