diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index 4bd39ea..e9ffbc8 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -91,36 +91,34 @@ async fn send(peer: &mut Peer, m: Msg) -> Result<(), Error> { } } -async fn recv(log: &mut Logger, peer: &mut Peer) -> Result, Error> { - match peer.recv().await { - Ok(m) => Ok(Some(m)), - Err(peer::RecvError::Io { source }) => Err(source.into()), - Err(peer::RecvError::InvalidMessage { .. }) => { - log.recv_corrupt().await; - Ok(None) - } - } -} - async fn upload(log: &mut Logger, peer: &mut Peer, inp: &mut IN) -> Result<(), Error> where IN: AsyncRead + Unpin, { let mut pos = 0; + let mut next_ack_len: Option = None; loop { - match read_data(inp).await? { - Some(data) => { - let len = data.len(); - send(peer, Msg::Data(data)).await?; - log.send_data(pos, len).await; - if let Some(Msg::Ack) = recv(log, peer).await? { + if let Some(ack_len) = next_ack_len { + match peer.recv().await { + Ok(Msg::Ack) => { log.recv_ack(pos).await; - } else { - log.debug_msg("didn't get ack??").await; + pos += ack_len; + next_ack_len = None; } - pos += len; + Ok(_) => log.debug_msg("got some other packet??").await, + + Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await, + Err(peer::RecvError::Io { source }) => return Err(source.into()), + } + } else { + match read_data(inp).await? { + Some(data) => { + next_ack_len = Some(data.len()); + log.send_data(pos, data.len()).await; + send(peer, Msg::Data(data)).await?; + } + None => break, } - None => break, } } log.completed().await;