write hptp-send more like a state machine

This commit is contained in:
Milo Turner 2020-03-09 13:59:49 -04:00
parent 39fc9a5a48
commit 9553c25b26
1 changed files with 19 additions and 21 deletions

View File

@ -91,36 +91,34 @@ async fn send(peer: &mut Peer, m: Msg) -> Result<(), Error> {
} }
} }
async fn recv(log: &mut Logger, peer: &mut Peer) -> Result<Option<Msg>, Error> {
match peer.recv().await {
Ok(m) => Ok(Some(m)),
Err(peer::RecvError::Io { source }) => Err(source.into()),
Err(peer::RecvError::InvalidMessage { .. }) => {
log.recv_corrupt().await;
Ok(None)
}
}
}
async fn upload<IN>(log: &mut Logger, peer: &mut Peer, inp: &mut IN) -> Result<(), Error> async fn upload<IN>(log: &mut Logger, peer: &mut Peer, inp: &mut IN) -> Result<(), Error>
where where
IN: AsyncRead + Unpin, IN: AsyncRead + Unpin,
{ {
let mut pos = 0; let mut pos = 0;
let mut next_ack_len: Option<usize> = None;
loop { loop {
match read_data(inp).await? { if let Some(ack_len) = next_ack_len {
Some(data) => { match peer.recv().await {
let len = data.len(); Ok(Msg::Ack) => {
send(peer, Msg::Data(data)).await?;
log.send_data(pos, len).await;
if let Some(Msg::Ack) = recv(log, peer).await? {
log.recv_ack(pos).await; log.recv_ack(pos).await;
} else { pos += ack_len;
log.debug_msg("didn't get ack??").await; next_ack_len = None;
} }
pos += len; Ok(_) => log.debug_msg("got some other packet??").await,
Err(peer::RecvError::InvalidMessage { .. }) => log.recv_corrupt().await,
Err(peer::RecvError::Io { source }) => return Err(source.into()),
}
} else {
match read_data(inp).await? {
Some(data) => {
next_ack_len = Some(data.len());
log.send_data(pos, data.len()).await;
send(peer, Msg::Data(data)).await?;
}
None => break,
} }
None => break,
} }
} }
log.completed().await; log.completed().await;