unified "peer" type for upload & download

This commit is contained in:
Milo Turner 2020-03-09 10:46:37 -04:00
parent a372532e96
commit 204aa4998f
3 changed files with 45 additions and 70 deletions

View File

@ -6,7 +6,7 @@ extern crate thiserror;
use hptp::log::Logger; use hptp::log::Logger;
use hptp::msg::Msg; use hptp::msg::Msg;
use hptp::peer::DownloadPeer; use hptp::peer::Peer;
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -42,20 +42,23 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
.await .await
.map_err(|_| Error::NoPortAvail)?; .map_err(|_| Error::NoPortAvail)?;
log.bound(sock.local_addr()?.port()).await; log.bound(sock.local_addr()?.port()).await;
let mut peer = DownloadPeer::new(tokio::io::stdout(), sock); let mut out = tokio::io::stdout();
download(log, &mut peer).await let mut peer = Peer::new(sock);
download(log, &mut peer, &mut out).await
} }
async fn download<OUT>(log: &mut Logger, peer: &mut DownloadPeer<OUT>) -> Result<(), Error> async fn download<OUT>(log: &mut Logger, peer: &mut Peer, out: &mut OUT) -> Result<(), Error>
where where
OUT: AsyncWrite + Unpin, OUT: AsyncWrite + Unpin,
{ {
use tokio::io::AsyncWriteExt;
let mut pos = 0; let mut pos = 0;
loop { loop {
match peer.recv().await? { match peer.recv().await? {
Msg::Data(data) => { Msg::Data(data) => {
let len = data.len(); let len = data.len();
peer.write_output(&data).await; out.write_all(&data).await?;
out.flush().await?;
log.recv_data_accepted(pos, len, hptp::log::InOrder).await; log.recv_data_accepted(pos, len, hptp::log::InOrder).await;
pos += len; pos += len;
} }

View File

@ -6,8 +6,8 @@ extern crate thiserror;
use hptp::log::Logger; use hptp::log::Logger;
use hptp::msg::Msg; use hptp::msg::Msg;
use hptp::peer::UploadPeer; use hptp::peer::Peer;
use std::net::SocketAddrV4; use std::net::SocketAddr;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
// use tokio::net::UdpSocket; // use tokio::net::UdpSocket;
@ -56,24 +56,40 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
.map_err(|_| Error::NoAvailPort)?; .map_err(|_| Error::NoAvailPort)?;
log.debug_msg(format!("bound on {}", sock.local_addr()?)) log.debug_msg(format!("bound on {}", sock.local_addr()?))
.await; .await;
let mut peer = UploadPeer::new(tokio::io::stdin(), sock, targ_addr); let mut out = tokio::io::stdin();
upload(log, &mut peer).await let mut peer = Peer::new(sock);
peer.set_known_target(targ_addr);
upload(log, &mut peer, &mut out).await
} }
fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddrV4, Error> { fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddr, Error> {
args.nth(1) args.nth(1)
.ok_or(Error::InvalidArgs)? .ok_or(Error::InvalidArgs)?
.parse() .parse()
.map_err(|_| Error::InvalidArgs) .map_err(|_| Error::InvalidArgs)
} }
async fn upload<IN>(log: &mut Logger, peer: &mut UploadPeer<IN>) -> Result<(), Error> async fn read_some<IN>(inp: &mut IN) -> Result<Option<Vec<u8>>, Error>
where
IN: AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 512];
let len = inp.read(&mut buf).await?;
Ok(if len > 0 {
Some(buf[..len].into())
} else {
None
})
}
async fn upload<IN>(log: &mut Logger, peer: &mut Peer, inp: &mut IN) -> Result<(), Error>
where where
IN: AsyncRead + Unpin, IN: AsyncRead + Unpin,
{ {
let mut pos = 0; let mut pos = 0;
loop { loop {
match peer.read_input().await { match read_some(inp).await? {
Some(data) => { Some(data) => {
let len = data.len(); let len = data.len();
peer.send(Msg::Data(data)).await?; peer.send(Msg::Data(data)).await?;

View File

@ -1,83 +1,39 @@
use std::net::SocketAddrV4; use std::net::SocketAddr;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use super::msg::Msg; use super::msg::Msg;
pub struct UploadPeer<IN> { pub struct Peer {
input: IN,
sock: UdpSocket,
targ: SocketAddrV4,
}
pub struct DownloadPeer<OUT> {
output: OUT,
sock: UdpSocket, sock: UdpSocket,
targ: Option<SocketAddr>,
} }
const BUFFER_SIZE: usize = 1000; const BUFFER_SIZE: usize = 1000;
impl<IN> UploadPeer<IN> { impl Peer {
pub fn new(input: IN, sock: UdpSocket, targ: SocketAddrV4) -> Self { pub fn new(sock: UdpSocket) -> Self {
UploadPeer { input, sock, targ } Peer { sock, targ: None }
}
pub fn set_known_target(&mut self, addr: SocketAddr) {
self.targ = Some(addr);
} }
pub async fn send(&mut self, msg: Msg) -> Result<(), tokio::io::Error> { pub async fn send(&mut self, msg: Msg) -> Result<(), tokio::io::Error> {
let targ = self.targ.expect("no target to send to");
let bs = msg.ser(); let bs = msg.ser();
let mut i = 0; let mut i = 0;
while i < bs.len() { while i < bs.len() {
let n = self.sock.send_to(&bs[i..], self.targ).await?; let n = self.sock.send_to(&bs[i..], targ).await?;
i += n i += n
} }
Ok(()) Ok(())
} }
}
impl<IN> UploadPeer<IN>
where
IN: AsyncRead + Unpin,
{
pub async fn read_input(&mut self) -> Option<Vec<u8>> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; BUFFER_SIZE];
let len = self
.input
.read(&mut buf)
.await
.expect("failed to read from stdin");
if len == 0 {
None
} else {
Some(buf[..len].into())
}
}
}
impl<OUT> DownloadPeer<OUT> {
pub fn new(output: OUT, sock: UdpSocket) -> Self {
DownloadPeer { output, sock }
}
pub async fn recv(&mut self) -> Result<Msg, tokio::io::Error> { pub async fn recv(&mut self) -> Result<Msg, tokio::io::Error> {
let mut buf = [0u8; BUFFER_SIZE]; let mut buf = [0u8; BUFFER_SIZE];
let (len, _who) = self.sock.recv_from(&mut buf).await?; let (len, who) = self.sock.recv_from(&mut buf).await?;
self.set_known_target(who);
Ok(Msg::des(&buf[..len])) Ok(Msg::des(&buf[..len]))
} }
} }
impl<OUT> DownloadPeer<OUT>
where
OUT: AsyncWrite + Unpin,
{
pub async fn write_output(&mut self, data: &[u8]) {
use tokio::io::AsyncWriteExt;
self.output
.write_all(data)
.await
.expect("failed to write to stdout");
self.output
.flush()
.await
.expect("failed to write to stdout")
}
}