peer / msg abstractions
This commit is contained in:
parent
31ec42bf3a
commit
a372532e96
|
@ -5,8 +5,9 @@ extern crate tokio;
|
||||||
extern crate thiserror;
|
extern crate thiserror;
|
||||||
|
|
||||||
use hptp::log::Logger;
|
use hptp::log::Logger;
|
||||||
|
use hptp::msg::Msg;
|
||||||
|
use hptp::peer::DownloadPeer;
|
||||||
use tokio::io::AsyncWrite;
|
use tokio::io::AsyncWrite;
|
||||||
use tokio::net::UdpSocket;
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
enum Error {
|
enum Error {
|
||||||
|
@ -41,21 +42,23 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
|
||||||
.await
|
.await
|
||||||
.map_err(|_| Error::NoPortAvail)?;
|
.map_err(|_| Error::NoPortAvail)?;
|
||||||
log.bound(sock.local_addr()?.port()).await;
|
log.bound(sock.local_addr()?.port()).await;
|
||||||
download(log, sock, tokio::io::stdout()).await
|
let mut peer = DownloadPeer::new(tokio::io::stdout(), sock);
|
||||||
|
download(log, &mut peer).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn download<OUT>(log: &mut Logger, mut sock: UdpSocket, mut out: OUT) -> Result<(), Error>
|
async fn download<OUT>(log: &mut Logger, peer: &mut DownloadPeer<OUT>) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
OUT: AsyncWrite + Unpin,
|
OUT: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
use tokio::io::AsyncWriteExt;
|
|
||||||
let mut buf = [0u8; 2000];
|
|
||||||
let mut pos = 0;
|
let mut pos = 0;
|
||||||
loop {
|
loop {
|
||||||
let (len, _who) = sock.recv_from(&mut buf).await?;
|
match peer.recv().await? {
|
||||||
out.write_all(&buf[..len]).await?;
|
Msg::Data(data) => {
|
||||||
out.flush().await?;
|
let len = data.len();
|
||||||
log.recv_data_accepted(pos, len, hptp::log::InOrder).await;
|
peer.write_output(&data).await;
|
||||||
pos += len;
|
log.recv_data_accepted(pos, len, hptp::log::InOrder).await;
|
||||||
|
pos += len;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,9 +5,11 @@ extern crate tokio;
|
||||||
extern crate thiserror;
|
extern crate thiserror;
|
||||||
|
|
||||||
use hptp::log::Logger;
|
use hptp::log::Logger;
|
||||||
|
use hptp::msg::Msg;
|
||||||
|
use hptp::peer::UploadPeer;
|
||||||
use std::net::SocketAddrV4;
|
use std::net::SocketAddrV4;
|
||||||
use tokio::io::AsyncRead;
|
use tokio::io::AsyncRead;
|
||||||
use tokio::net::UdpSocket;
|
// use tokio::net::UdpSocket;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
enum Error {
|
enum Error {
|
||||||
|
@ -54,7 +56,8 @@ async fn start(log: &mut Logger) -> Result<(), Error> {
|
||||||
.map_err(|_| Error::NoAvailPort)?;
|
.map_err(|_| Error::NoAvailPort)?;
|
||||||
log.debug_msg(format!("bound on {}", sock.local_addr()?))
|
log.debug_msg(format!("bound on {}", sock.local_addr()?))
|
||||||
.await;
|
.await;
|
||||||
upload(log, sock, tokio::io::stdin(), targ_addr).await
|
let mut peer = UploadPeer::new(tokio::io::stdin(), sock, targ_addr);
|
||||||
|
upload(log, &mut peer).await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddrV4, Error> {
|
fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddrV4, Error> {
|
||||||
|
@ -64,26 +67,20 @@ fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddrV4, Er
|
||||||
.map_err(|_| Error::InvalidArgs)
|
.map_err(|_| Error::InvalidArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upload<IN>(
|
async fn upload<IN>(log: &mut Logger, peer: &mut UploadPeer<IN>) -> Result<(), Error>
|
||||||
log: &mut Logger,
|
|
||||||
mut sock: UdpSocket,
|
|
||||||
mut inp: IN,
|
|
||||||
targ_addr: SocketAddrV4,
|
|
||||||
) -> Result<(), Error>
|
|
||||||
where
|
where
|
||||||
IN: AsyncRead + Unpin,
|
IN: AsyncRead + Unpin,
|
||||||
{
|
{
|
||||||
use tokio::io::AsyncReadExt;
|
|
||||||
let mut buf = [0u8; 500];
|
|
||||||
let mut pos = 0;
|
let mut pos = 0;
|
||||||
loop {
|
loop {
|
||||||
let len = inp.read(&mut buf).await?;
|
match peer.read_input().await {
|
||||||
if len == 0 {
|
Some(data) => {
|
||||||
break;
|
let len = data.len();
|
||||||
|
peer.send(Msg::Data(data)).await?;
|
||||||
|
log.send_data(pos, len).await;
|
||||||
|
pos += len;
|
||||||
|
}
|
||||||
|
None => return Ok(()),
|
||||||
}
|
}
|
||||||
sock.send_to(&buf[..len], targ_addr).await?;
|
|
||||||
log.send_data(pos, len).await;
|
|
||||||
pos += len;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
extern crate rand;
|
extern crate rand;
|
||||||
|
|
||||||
pub mod log;
|
pub mod log;
|
||||||
|
pub mod msg;
|
||||||
|
pub mod peer;
|
||||||
|
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum Msg {
|
||||||
|
Data(Vec<u8>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Msg {
|
||||||
|
pub fn des(data: &[u8]) -> Msg {
|
||||||
|
Msg::Data(data.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ser(&self) -> Vec<u8> {
|
||||||
|
match self {
|
||||||
|
Msg::Data(data) => data.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
use std::net::SocketAddrV4;
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio::net::UdpSocket;
|
||||||
|
|
||||||
|
use super::msg::Msg;
|
||||||
|
|
||||||
|
pub struct UploadPeer<IN> {
|
||||||
|
input: IN,
|
||||||
|
sock: UdpSocket,
|
||||||
|
targ: SocketAddrV4,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DownloadPeer<OUT> {
|
||||||
|
output: OUT,
|
||||||
|
sock: UdpSocket,
|
||||||
|
}
|
||||||
|
|
||||||
|
const BUFFER_SIZE: usize = 1000;
|
||||||
|
|
||||||
|
impl<IN> UploadPeer<IN> {
|
||||||
|
pub fn new(input: IN, sock: UdpSocket, targ: SocketAddrV4) -> Self {
|
||||||
|
UploadPeer { input, sock, targ }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send(&mut self, msg: Msg) -> Result<(), tokio::io::Error> {
|
||||||
|
let bs = msg.ser();
|
||||||
|
let mut i = 0;
|
||||||
|
while i < bs.len() {
|
||||||
|
let n = self.sock.send_to(&bs[i..], self.targ).await?;
|
||||||
|
i += n
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<IN> UploadPeer<IN>
|
||||||
|
where
|
||||||
|
IN: AsyncRead + Unpin,
|
||||||
|
{
|
||||||
|
pub async fn read_input(&mut self) -> Option<Vec<u8>> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
let mut buf = [0u8; BUFFER_SIZE];
|
||||||
|
let len = self
|
||||||
|
.input
|
||||||
|
.read(&mut buf)
|
||||||
|
.await
|
||||||
|
.expect("failed to read from stdin");
|
||||||
|
if len == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(buf[..len].into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<OUT> DownloadPeer<OUT> {
|
||||||
|
pub fn new(output: OUT, sock: UdpSocket) -> Self {
|
||||||
|
DownloadPeer { output, sock }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn recv(&mut self) -> Result<Msg, tokio::io::Error> {
|
||||||
|
let mut buf = [0u8; BUFFER_SIZE];
|
||||||
|
let (len, _who) = self.sock.recv_from(&mut buf).await?;
|
||||||
|
Ok(Msg::des(&buf[..len]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<OUT> DownloadPeer<OUT>
|
||||||
|
where
|
||||||
|
OUT: AsyncWrite + Unpin,
|
||||||
|
{
|
||||||
|
pub async fn write_output(&mut self, data: &[u8]) {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
self.output
|
||||||
|
.write_all(data)
|
||||||
|
.await
|
||||||
|
.expect("failed to write to stdout");
|
||||||
|
self.output
|
||||||
|
.flush()
|
||||||
|
.await
|
||||||
|
.expect("failed to write to stdout")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue