From a372532e963f886637ee9379beb878d7be4c98fc Mon Sep 17 00:00:00 2001 From: Milo Turner Date: Mon, 9 Mar 2020 01:23:29 -0400 Subject: [PATCH] peer / msg abstractions --- hptp-recv/src/main.rs | 23 ++++++------ hptp-send/src/main.rs | 31 ++++++++-------- hptp/src/lib.rs | 2 ++ hptp/src/msg.rs | 16 +++++++++ hptp/src/peer.rs | 83 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 128 insertions(+), 27 deletions(-) create mode 100644 hptp/src/msg.rs create mode 100644 hptp/src/peer.rs diff --git a/hptp-recv/src/main.rs b/hptp-recv/src/main.rs index e0265ec..7956ae5 100644 --- a/hptp-recv/src/main.rs +++ b/hptp-recv/src/main.rs @@ -5,8 +5,9 @@ extern crate tokio; extern crate thiserror; use hptp::log::Logger; +use hptp::msg::Msg; +use hptp::peer::DownloadPeer; use tokio::io::AsyncWrite; -use tokio::net::UdpSocket; #[derive(Error, Debug)] enum Error { @@ -41,21 +42,23 @@ async fn start(log: &mut Logger) -> Result<(), Error> { .await .map_err(|_| Error::NoPortAvail)?; log.bound(sock.local_addr()?.port()).await; - download(log, sock, tokio::io::stdout()).await + let mut peer = DownloadPeer::new(tokio::io::stdout(), sock); + download(log, &mut peer).await } -async fn download(log: &mut Logger, mut sock: UdpSocket, mut out: OUT) -> Result<(), Error> +async fn download(log: &mut Logger, peer: &mut DownloadPeer) -> Result<(), Error> where OUT: AsyncWrite + Unpin, { - use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 2000]; let mut pos = 0; loop { - let (len, _who) = sock.recv_from(&mut buf).await?; - out.write_all(&buf[..len]).await?; - out.flush().await?; - log.recv_data_accepted(pos, len, hptp::log::InOrder).await; - pos += len; + match peer.recv().await? { + Msg::Data(data) => { + let len = data.len(); + peer.write_output(&data).await; + log.recv_data_accepted(pos, len, hptp::log::InOrder).await; + pos += len; + } + } } } diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index 55192bc..d429eec 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -5,9 +5,11 @@ extern crate tokio; extern crate thiserror; use hptp::log::Logger; +use hptp::msg::Msg; +use hptp::peer::UploadPeer; use std::net::SocketAddrV4; use tokio::io::AsyncRead; -use tokio::net::UdpSocket; +// use tokio::net::UdpSocket; #[derive(Error, Debug)] enum Error { @@ -54,7 +56,8 @@ async fn start(log: &mut Logger) -> Result<(), Error> { .map_err(|_| Error::NoAvailPort)?; log.debug_msg(format!("bound on {}", sock.local_addr()?)) .await; - upload(log, sock, tokio::io::stdin(), targ_addr).await + let mut peer = UploadPeer::new(tokio::io::stdin(), sock, targ_addr); + upload(log, &mut peer).await } fn parse_args(mut args: impl Iterator) -> Result { @@ -64,26 +67,20 @@ fn parse_args(mut args: impl Iterator) -> Result( - log: &mut Logger, - mut sock: UdpSocket, - mut inp: IN, - targ_addr: SocketAddrV4, -) -> Result<(), Error> +async fn upload(log: &mut Logger, peer: &mut UploadPeer) -> Result<(), Error> where IN: AsyncRead + Unpin, { - use tokio::io::AsyncReadExt; - let mut buf = [0u8; 500]; let mut pos = 0; loop { - let len = inp.read(&mut buf).await?; - if len == 0 { - break; + match peer.read_input().await { + Some(data) => { + let len = data.len(); + peer.send(Msg::Data(data)).await?; + log.send_data(pos, len).await; + pos += len; + } + None => return Ok(()), } - sock.send_to(&buf[..len], targ_addr).await?; - log.send_data(pos, len).await; - pos += len; } - Ok(()) } diff --git a/hptp/src/lib.rs b/hptp/src/lib.rs index a510249..0851005 100644 --- a/hptp/src/lib.rs +++ b/hptp/src/lib.rs @@ -1,6 +1,8 @@ extern crate rand; pub mod log; +pub mod msg; +pub mod peer; use std::net::Ipv4Addr; use tokio::net::UdpSocket; diff --git a/hptp/src/msg.rs b/hptp/src/msg.rs new file mode 100644 index 0000000..72a2c9f --- /dev/null +++ b/hptp/src/msg.rs @@ -0,0 +1,16 @@ +#[derive(Clone)] +pub enum Msg { + Data(Vec), +} + +impl Msg { + pub fn des(data: &[u8]) -> Msg { + Msg::Data(data.into()) + } + + pub fn ser(&self) -> Vec { + match self { + Msg::Data(data) => data.clone(), + } + } +} diff --git a/hptp/src/peer.rs b/hptp/src/peer.rs new file mode 100644 index 0000000..7435087 --- /dev/null +++ b/hptp/src/peer.rs @@ -0,0 +1,83 @@ +use std::net::SocketAddrV4; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::UdpSocket; + +use super::msg::Msg; + +pub struct UploadPeer { + input: IN, + sock: UdpSocket, + targ: SocketAddrV4, +} + +pub struct DownloadPeer { + output: OUT, + sock: UdpSocket, +} + +const BUFFER_SIZE: usize = 1000; + +impl UploadPeer { + pub fn new(input: IN, sock: UdpSocket, targ: SocketAddrV4) -> Self { + UploadPeer { input, sock, targ } + } + + pub async fn send(&mut self, msg: Msg) -> Result<(), tokio::io::Error> { + let bs = msg.ser(); + let mut i = 0; + while i < bs.len() { + let n = self.sock.send_to(&bs[i..], self.targ).await?; + i += n + } + Ok(()) + } +} + +impl UploadPeer +where + IN: AsyncRead + Unpin, +{ + pub async fn read_input(&mut self) -> Option> { + use tokio::io::AsyncReadExt; + let mut buf = [0u8; BUFFER_SIZE]; + let len = self + .input + .read(&mut buf) + .await + .expect("failed to read from stdin"); + if len == 0 { + None + } else { + Some(buf[..len].into()) + } + } +} + +impl DownloadPeer { + pub fn new(output: OUT, sock: UdpSocket) -> Self { + DownloadPeer { output, sock } + } + + pub async fn recv(&mut self) -> Result { + let mut buf = [0u8; BUFFER_SIZE]; + let (len, _who) = self.sock.recv_from(&mut buf).await?; + Ok(Msg::des(&buf[..len])) + } +} + +impl DownloadPeer +where + OUT: AsyncWrite + Unpin, +{ + pub async fn write_output(&mut self, data: &[u8]) { + use tokio::io::AsyncWriteExt; + self.output + .write_all(data) + .await + .expect("failed to write to stdout"); + self.output + .flush() + .await + .expect("failed to write to stdout") + } +}