#![feature(backtrace)] extern crate hptp; extern crate tokio; #[macro_use] extern crate thiserror; use hptp::logger::Logger; use hptp::msg::{self, Msg}; use hptp::peer::{self, Peer}; use std::net::SocketAddr; use tokio::io::AsyncRead; // use tokio::net::UdpSocket; #[derive(Error, Debug)] enum Error { #[error("io error: {source}")] Io { #[from] source: tokio::io::Error, backtrace: std::backtrace::Backtrace, }, #[error("invalid command-line arguments")] InvalidArgs, #[error("did not find an available local port")] NoAvailPort, } fn main() { match entry() { Err(Error::InvalidArgs) => print_usage(), Err(e) => { use std::error::Error; println!("Error: {:?}", e); for bt in e.backtrace() { println!("{}", bt); } } Ok(()) => (), } } fn print_usage() { print!("Usage:\n./3700send :\n") } fn entry() -> Result<(), Error> { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut log = Logger::new(); rt.block_on(start(&mut log)) } async fn start(log: &mut Logger) -> Result<(), Error> { let targ_addr = parse_args(std::env::args())?; let sock = hptp::udp_retrying_bind(16, hptp::random_port) .await .map_err(|_| Error::NoAvailPort)?; log.debug_msg(format!("bound on {}", sock.local_addr()?)) .await; let mut out = tokio::io::stdin(); let mut peer = Peer::new(sock); peer.set_known_target(targ_addr); upload(log, &mut peer, &mut out).await } fn parse_args(mut args: impl Iterator) -> Result { args.nth(1) .ok_or(Error::InvalidArgs)? .parse() .map_err(|_| Error::InvalidArgs) } async fn read_data(inp: &mut IN) -> Result>, Error> where IN: AsyncRead + Unpin, { use tokio::io::AsyncReadExt; let mut buf = [0u8; msg::MAX_DATA_SIZE]; let len = inp.read(&mut buf).await?; Ok(if len > 0 { Some(buf[..len].into()) } else { None }) } async fn send(peer: &mut Peer, m: Msg) -> Result<(), Error> { match peer.send(m).await { Ok(()) => Ok(()), Err(peer::SendError::Io { source }) => Err(source.into()), Err(peer::SendError::NoTarget) => panic!("tried to send w/ no target!"), } } async fn recv(log: &mut Logger, peer: &mut Peer) -> Result, Error> { match peer.recv().await { Ok(m) => Ok(Some(m)), Err(peer::RecvError::Io { source }) => Err(source.into()), Err(peer::RecvError::InvalidMessage { .. }) => { log.recv_corrupt().await; Ok(None) } } } async fn upload(log: &mut Logger, peer: &mut Peer, inp: &mut IN) -> Result<(), Error> where IN: AsyncRead + Unpin, { let mut pos = 0; loop { match read_data(inp).await? { Some(data) => { let len = data.len(); send(peer, Msg::Data(data)).await?; log.send_data(pos, len).await; if let Some(Msg::Ack) = recv(log, peer).await? { log.recv_ack(pos).await; } else { log.debug_msg("didn't get ack??").await; } pos += len; } None => break, } } log.completed().await; Ok(()) }