dumb simple Ack messages and peer error handling

This commit is contained in:
Milo Turner 2020-03-09 11:33:19 -04:00
parent a569737ba2
commit ebd3c8333e
7 changed files with 100 additions and 14 deletions

1
Cargo.lock generated
View File

@ -59,6 +59,7 @@ name = "hptp"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"rand", "rand",
"thiserror",
"tokio", "tokio",
] ]

View File

@ -6,7 +6,7 @@ extern crate thiserror;
use hptp::log::Logger; use hptp::log::Logger;
use hptp::msg::Msg; use hptp::msg::Msg;
use hptp::peer::Peer; use hptp::peer::{self, Peer};
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -47,6 +47,14 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
download(log, &mut peer, &mut out).await download(log, &mut peer, &mut out).await
} }
async fn send(peer: &mut Peer, m: Msg) -> Result<(), Error> {
match peer.send(m).await {
Ok(()) => Ok(()),
Err(peer::SendError::Io { source }) => Err(source.into()),
Err(peer::SendError::NoTarget) => unreachable!(),
}
}
async fn download<OUT>(log: &mut Logger, peer: &mut Peer, out: &mut OUT) -> Result<(), Error> async fn download<OUT>(log: &mut Logger, peer: &mut Peer, out: &mut OUT) -> Result<(), Error>
where where
OUT: AsyncWrite + Unpin, OUT: AsyncWrite + Unpin,
@ -54,14 +62,24 @@ where
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
let mut pos = 0; let mut pos = 0;
loop { loop {
match peer.recv().await? { match peer.recv().await {
Msg::Data(data) => { Ok(Msg::Ack) => log.debug_msg("not expected an ack...").await,
Ok(Msg::Data(data)) => {
let len = data.len(); let len = data.len();
out.write_all(&data).await?; out.write_all(&data).await?;
out.flush().await?; out.flush().await?;
log.recv_data_accepted(pos, len, hptp::log::InOrder).await; log.recv_data_accepted(pos, len, hptp::log::InOrder).await;
send(peer, Msg::Ack).await?;
pos += len; pos += len;
} }
Err(peer::RecvError::InvalidMessage { .. }) => {
log.debug_msg(format!("got an invalid message; discarding"))
.await
}
Err(peer::RecvError::Io { source }) => return Err(source.into()),
} }
} }
} }

View File

@ -6,7 +6,7 @@ extern crate thiserror;
use hptp::log::Logger; use hptp::log::Logger;
use hptp::msg::{self, Msg}; use hptp::msg::{self, Msg};
use hptp::peer::Peer; use hptp::peer::{self, Peer};
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
// use tokio::net::UdpSocket; // use tokio::net::UdpSocket;
@ -83,6 +83,25 @@ where
}) })
} }
async fn send(peer: &mut Peer, m: Msg) -> Result<(), Error> {
match peer.send(m).await {
Ok(()) => Ok(()),
Err(peer::SendError::Io { source }) => Err(source.into()),
Err(peer::SendError::NoTarget) => panic!("tried to send w/ no target!"),
}
}
async fn recv(log: &mut Logger, peer: &mut Peer) -> Result<Option<Msg>, Error> {
match peer.recv().await {
Ok(m) => Ok(Some(m)),
Err(peer::RecvError::Io { source }) => Err(source.into()),
Err(peer::RecvError::InvalidMessage { .. }) => {
log.debug_msg("invalid message; discarding").await;
Ok(None)
}
}
}
async fn upload<IN>(log: &mut Logger, peer: &mut Peer, inp: &mut IN) -> Result<(), Error> async fn upload<IN>(log: &mut Logger, peer: &mut Peer, inp: &mut IN) -> Result<(), Error>
where where
IN: AsyncRead + Unpin, IN: AsyncRead + Unpin,
@ -92,8 +111,13 @@ where
match read_data(inp).await? { match read_data(inp).await? {
Some(data) => { Some(data) => {
let len = data.len(); let len = data.len();
peer.send(Msg::Data(data)).await?; send(peer, Msg::Data(data)).await?;
log.send_data(pos, len).await; log.send_data(pos, len).await;
if let Some(Msg::Ack) = recv(log, peer).await? {
log.debug_msg("got ack").await;
} else {
log.debug_msg("didn't get ack??").await;
}
pos += len; pos += len;
} }
None => return Ok(()), None => return Ok(()),

View File

@ -9,4 +9,5 @@ edition = "2018"
[dependencies] [dependencies]
tokio = {version = "0.2.*", features = ["io-std", "io-util", "udp"]} tokio = {version = "0.2.*", features = ["io-std", "io-util", "udp"]}
rand = "0.7.*" rand = "0.7.*"
thiserror = "*"

View File

@ -1,4 +1,6 @@
extern crate rand; extern crate rand;
#[macro_use]
extern crate thiserror;
pub mod log; pub mod log;
pub mod msg; pub mod msg;

View File

@ -1,19 +1,34 @@
#[derive(Clone)] #[derive(Clone)]
pub enum Msg { pub enum Msg {
Data(Vec<u8>), Data(Vec<u8>),
Ack,
} }
pub const MAX_DATA_SIZE: usize = 999; pub const MAX_DATA_SIZE: usize = 999;
pub const MAX_SERIALIZED_SIZE: usize = MAX_DATA_SIZE; pub const MAX_SERIALIZED_SIZE: usize = 1 + MAX_DATA_SIZE;
#[derive(Error, Debug)]
#[error("deserialization failed; malformed packet")]
pub struct DesError;
impl Msg { impl Msg {
pub fn des(data: &[u8]) -> Msg { pub fn des(data: &[u8]) -> Result<Msg, DesError> {
Msg::Data(data.into()) match data.first() {
Some(0) => Ok(Msg::Data(data[1..].into())),
Some(1) => Ok(Msg::Ack),
_ => Err(DesError),
}
} }
pub fn ser(&self) -> Vec<u8> { pub fn ser(&self) -> Vec<u8> {
let mut buf = Vec::new();
match self { match self {
Msg::Data(data) => data.clone(), Msg::Data(data) => {
buf.push(0);
buf.extend_from_slice(data);
}
Msg::Ack => buf.push(1),
} }
buf
} }
} }

View File

@ -8,6 +8,31 @@ pub struct Peer {
targ: Option<SocketAddr>, targ: Option<SocketAddr>,
} }
#[derive(Error, Debug)]
pub enum RecvError {
#[error("io error: {source}")]
Io {
#[from]
source: tokio::io::Error,
},
#[error("{source}")]
InvalidMessage {
#[from]
source: msg::DesError,
},
}
#[derive(Error, Debug)]
pub enum SendError {
#[error("io error: {source}")]
Io {
#[from]
source: tokio::io::Error,
},
#[error("no target to send to")]
NoTarget,
}
impl Peer { impl Peer {
pub fn new(sock: UdpSocket) -> Self { pub fn new(sock: UdpSocket) -> Self {
Peer { sock, targ: None } Peer { sock, targ: None }
@ -17,17 +42,17 @@ impl Peer {
self.targ = Some(addr); self.targ = Some(addr);
} }
pub async fn send(&mut self, msg: Msg) -> Result<(), tokio::io::Error> { pub async fn send(&mut self, msg: Msg) -> Result<(), SendError> {
let targ = self.targ.expect("no target to send to"); let targ = self.targ.ok_or(SendError::NoTarget)?;
let bs = msg.ser(); let bs = msg.ser();
let _n_sent = self.sock.send_to(&bs, targ).await?; let _n_sent = self.sock.send_to(&bs, targ).await?;
Ok(()) Ok(())
} }
pub async fn recv(&mut self) -> Result<Msg, tokio::io::Error> { pub async fn recv(&mut self) -> Result<Msg, RecvError> {
let mut buf = [0u8; msg::MAX_SERIALIZED_SIZE]; let mut buf = [0u8; msg::MAX_SERIALIZED_SIZE];
let (len, who) = self.sock.recv_from(&mut buf).await?; let (len, who) = self.sock.recv_from(&mut buf).await?;
self.set_known_target(who); self.set_known_target(who);
Ok(Msg::des(&buf[..len])) Ok(Msg::des(&buf[..len])?)
} }
} }