diff --git a/hptp-send/src/main.rs b/hptp-send/src/main.rs index 81d8754..475e3dd 100644 --- a/hptp-send/src/main.rs +++ b/hptp-send/src/main.rs @@ -7,6 +7,8 @@ extern crate thiserror; use hptp::logger::Logger; use hptp::msg::UpMsg; use hptp::peer::{self, Peer, UpPeer}; +use hptp::seg::SegIdx; +use std::collections::HashMap; use std::net::SocketAddr; use tokio::io::AsyncRead; // use tokio::net::UdpSocket; @@ -87,6 +89,64 @@ async fn send(peer: &mut UpPeer, m: UpMsg) -> Result<(), Error> { } } +struct SegmentStore<'i, IN> { + inp: &'i mut IN, + unacked_segs: HashMap, + unacked_upper_bound: u32, +} + +struct UnAcked { + payload: Vec, + nacks: usize, +} + +impl<'i, IN> SegmentStore<'i, IN> +where + IN: AsyncRead + Unpin, +{ + fn new(inp: &'i mut IN) -> Self { + SegmentStore { + inp, + unacked_segs: HashMap::new(), + unacked_upper_bound: 0, + } + } + + async fn read_segment(&mut self) -> Option<(SegIdx, &[u8])> { + use tokio::io::AsyncReadExt; + let mut buf = [0u8; hptp::seg::MAX_SEG_SIZE]; + let len = self.inp.read(&mut buf).await.unwrap_or(0); + if len > 0 { + let seg_idx = self.unacked_upper_bound; + self.unacked_upper_bound += 1; + let unack = self.unacked_segs.entry(seg_idx); + let unack = unack.or_insert(UnAcked { + payload: buf[..len].into(), + nacks: 0, + }); + Some((seg_idx, unack.payload.as_ref())) + } else { + None + } + } + + async fn get_segment(&mut self) -> Option<(SegIdx, &[u8])> { + if self.unacked_segs.is_empty() { + self.read_segment().await + } else { + self.unacked_segs + .iter() + .next() // get any entry, no ordering guarunteed or needed + .map(|(seg_idx, unack)| (*seg_idx, unack.payload.as_ref())) + } + } + + fn ack(&mut self, seg_idx: SegIdx) { + self.unacked_segs.remove(&seg_idx); + // TODO: update bounds + } +} + async fn upload(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result<(), Error> where IN: AsyncRead + Unpin,