implement other log functions and use them
This commit is contained in:
parent
ebd3c8333e
commit
ca8a1b6884
|
@ -4,7 +4,7 @@ extern crate tokio;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate thiserror;
|
extern crate thiserror;
|
||||||
|
|
||||||
use hptp::log::Logger;
|
use hptp::logger::Logger;
|
||||||
use hptp::msg::Msg;
|
use hptp::msg::Msg;
|
||||||
use hptp::peer::{self, Peer};
|
use hptp::peer::{self, Peer};
|
||||||
use tokio::io::AsyncWrite;
|
use tokio::io::AsyncWrite;
|
||||||
|
@ -63,21 +63,19 @@ where
|
||||||
let mut pos = 0;
|
let mut pos = 0;
|
||||||
loop {
|
loop {
|
||||||
match peer.recv().await {
|
match peer.recv().await {
|
||||||
Ok(Msg::Ack) => log.debug_msg("not expected an ack...").await,
|
Ok(Msg::Ack) => log.debug_msg("not expecting an ack...").await,
|
||||||
|
|
||||||
Ok(Msg::Data(data)) => {
|
Ok(Msg::Data(data)) => {
|
||||||
let len = data.len();
|
let len = data.len();
|
||||||
out.write_all(&data).await?;
|
out.write_all(&data).await?;
|
||||||
out.flush().await?;
|
out.flush().await?;
|
||||||
log.recv_data_accepted(pos, len, hptp::log::InOrder).await;
|
log.recv_data_accepted(pos, len, hptp::logger::InOrder)
|
||||||
|
.await;
|
||||||
send(peer, Msg::Ack).await?;
|
send(peer, Msg::Ack).await?;
|
||||||
pos += len;
|
pos += len;
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(peer::RecvError::InvalidMessage { .. }) => {
|
Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await,
|
||||||
log.debug_msg(format!("got an invalid message; discarding"))
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(peer::RecvError::Io { source }) => return Err(source.into()),
|
Err(peer::RecvError::Io { source }) => return Err(source.into()),
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ extern crate tokio;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate thiserror;
|
extern crate thiserror;
|
||||||
|
|
||||||
use hptp::log::Logger;
|
use hptp::logger::Logger;
|
||||||
use hptp::msg::{self, Msg};
|
use hptp::msg::{self, Msg};
|
||||||
use hptp::peer::{self, Peer};
|
use hptp::peer::{self, Peer};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
@ -96,7 +96,7 @@ async fn recv(log: &mut Logger, peer: &mut Peer) -> Result<Option<Msg>, Error> {
|
||||||
Ok(m) => Ok(Some(m)),
|
Ok(m) => Ok(Some(m)),
|
||||||
Err(peer::RecvError::Io { source }) => Err(source.into()),
|
Err(peer::RecvError::Io { source }) => Err(source.into()),
|
||||||
Err(peer::RecvError::InvalidMessage { .. }) => {
|
Err(peer::RecvError::InvalidMessage { .. }) => {
|
||||||
log.debug_msg("invalid message; discarding").await;
|
log.recv_corrupt().await;
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,13 +114,15 @@ where
|
||||||
send(peer, Msg::Data(data)).await?;
|
send(peer, Msg::Data(data)).await?;
|
||||||
log.send_data(pos, len).await;
|
log.send_data(pos, len).await;
|
||||||
if let Some(Msg::Ack) = recv(log, peer).await? {
|
if let Some(Msg::Ack) = recv(log, peer).await? {
|
||||||
log.debug_msg("got ack").await;
|
log.recv_ack(pos).await;
|
||||||
} else {
|
} else {
|
||||||
log.debug_msg("didn't get ack??").await;
|
log.debug_msg("didn't get ack??").await;
|
||||||
}
|
}
|
||||||
pos += len;
|
pos += len;
|
||||||
}
|
}
|
||||||
None => return Ok(()),
|
None => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.completed().await;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ extern crate rand;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate thiserror;
|
extern crate thiserror;
|
||||||
|
|
||||||
pub mod log;
|
pub mod logger;
|
||||||
pub mod msg;
|
pub mod msg;
|
||||||
pub mod peer;
|
pub mod peer;
|
||||||
|
|
||||||
|
|
|
@ -53,9 +53,21 @@ impl Logger {
|
||||||
self.log_payload(LogPayload::RecvDataIgnored).await
|
self.log_payload(LogPayload::RecvDataIgnored).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn recv_ack(&mut self, last_offset: usize) {
|
||||||
|
self.log_payload(LogPayload::RecvAck { last_offset }).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn recv_corrupt(&mut self) {
|
||||||
|
self.log_payload(LogPayload::RecvCorrupt).await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn send_data(&mut self, start: usize, len: usize) {
|
pub async fn send_data(&mut self, start: usize, len: usize) {
|
||||||
self.log_payload(LogPayload::SendData { start, len }).await
|
self.log_payload(LogPayload::SendData { start, len }).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn completed(&mut self) {
|
||||||
|
self.log_payload(LogPayload::Completed).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LogMessage<'a> {
|
struct LogMessage<'a> {
|
||||||
|
@ -76,10 +88,15 @@ enum LogPayload<'a> {
|
||||||
order: AcceptedOrder,
|
order: AcceptedOrder,
|
||||||
},
|
},
|
||||||
RecvDataIgnored,
|
RecvDataIgnored,
|
||||||
|
RecvAck {
|
||||||
|
last_offset: usize,
|
||||||
|
},
|
||||||
|
RecvCorrupt,
|
||||||
SendData {
|
SendData {
|
||||||
start: usize,
|
start: usize,
|
||||||
len: usize,
|
len: usize,
|
||||||
},
|
},
|
||||||
|
Completed,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Eq, PartialEq)]
|
#[derive(Copy, Clone, Eq, PartialEq)]
|
||||||
|
@ -106,7 +123,10 @@ impl Display for LogPayload<'_> {
|
||||||
write!(f, "[recv data] {} ({}) ACCEPTED ({})", start, len, order)
|
write!(f, "[recv data] {} ({}) ACCEPTED ({})", start, len, order)
|
||||||
}
|
}
|
||||||
LogPayload::RecvDataIgnored => write!(f, "[recv data] IGNORED"),
|
LogPayload::RecvDataIgnored => write!(f, "[recv data] IGNORED"),
|
||||||
|
LogPayload::RecvCorrupt => write!(f, "[recv corrupt packet]"),
|
||||||
|
LogPayload::RecvAck { last_offset } => write!(f, "[recv ack] {}", last_offset),
|
||||||
LogPayload::SendData { start, len } => write!(f, "[send data] {} ({})", start, len),
|
LogPayload::SendData { start, len } => write!(f, "[send data] {} ({})", start, len),
|
||||||
|
LogPayload::Completed => write!(f, "[completed]"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue