CS3700-project3/hptp-send/src/main.rs

122 lines
3.3 KiB
Rust

#![feature(backtrace)]
extern crate hptp;
extern crate tokio;
#[macro_use]
extern crate thiserror;
use hptp::logger::Logger;
use hptp::msg::{DownMsg, UpMsg};
use hptp::peer::{self, Peer, UpPeer};
use std::net::SocketAddr;
use tokio::io::AsyncRead;
// use tokio::net::UdpSocket;
#[derive(Error, Debug)]
enum Error {
#[error("io error: {source}")]
Io {
#[from]
source: tokio::io::Error,
backtrace: std::backtrace::Backtrace,
},
#[error("invalid command-line arguments")]
InvalidArgs,
}
fn main() {
match entry() {
Err(Error::InvalidArgs) => print_usage(),
Err(e) => {
use std::error::Error;
println!("Error: {:?}", e);
for bt in e.backtrace() {
println!("{}", bt);
}
}
Ok(()) => (),
}
}
fn print_usage() {
print!("Usage:\n./3700send <host-ip>:<host-port>\n")
}
fn entry() -> Result<(), Error> {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut log = Logger::new();
rt.block_on(start(&mut log))
}
async fn start(log: &mut Logger) -> Result<(), Error> {
let targ_addr = parse_args(std::env::args())?;
let sock = tokio::net::UdpSocket::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?;
log.debug_msg(format!("bound on {}", sock.local_addr()?))
.await;
let mut out = tokio::io::stdin();
let mut peer = Peer::new(sock);
peer.set_known_target(targ_addr);
upload(log, &mut peer, &mut out).await
}
fn parse_args(mut args: impl Iterator<Item = String>) -> Result<SocketAddr, Error> {
args.nth(1)
.ok_or(Error::InvalidArgs)?
.parse()
.map_err(|_| Error::InvalidArgs)
}
async fn read_segment<IN>(inp: &mut IN) -> Result<Option<Vec<u8>>, Error>
where
IN: AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;
let mut buf = [0u8; hptp::seg::MAX_SEG_SIZE];
let len = inp.read(&mut buf).await?;
Ok(if len > 0 {
Some(buf[..len].into())
} else {
None
})
}
async fn send(peer: &mut UpPeer, m: UpMsg) -> Result<(), Error> {
match peer.send(m).await {
Ok(()) => Ok(()),
Err(peer::SendError::Io { source }) => Err(source.into()),
Err(peer::SendError::NoTarget) => panic!("tried to send w/ no target!"),
}
}
async fn upload<IN>(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result<(), Error>
where
IN: AsyncRead + Unpin,
{
let mut pos = 0;
let mut next_ack_len: Option<usize> = None;
loop {
if let Some(ack_len) = next_ack_len {
match peer.recv().await {
Ok(DownMsg::Ack { .. }) => {
log.recv_ack(pos).await;
pos += ack_len;
next_ack_len = None;
}
Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await,
Err(peer::RecvError::Io { source }) => return Err(source.into()),
}
} else {
match read_segment(inp).await? {
Some(data) => {
next_ack_len = Some(data.len());
log.send_data(pos, data.len()).await;
send(peer, UpMsg::Data { payload: data }).await?;
}
None => break,
}
}
}
log.completed().await;
Ok(())
}