segment store

This commit is contained in:
Milo Turner 2020-03-10 17:46:10 -04:00
parent 86af734542
commit dfd33acf1e
1 changed files with 60 additions and 0 deletions

View File

@ -7,6 +7,8 @@ extern crate thiserror;
use hptp::logger::Logger;
use hptp::msg::UpMsg;
use hptp::peer::{self, Peer, UpPeer};
use hptp::seg::SegIdx;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::io::AsyncRead;
// use tokio::net::UdpSocket;
@ -87,6 +89,64 @@ async fn send(peer: &mut UpPeer, m: UpMsg) -> Result<(), Error> {
}
}
struct SegmentStore<'i, IN> {
inp: &'i mut IN,
unacked_segs: HashMap<SegIdx, UnAcked>,
unacked_upper_bound: u32,
}
struct UnAcked {
payload: Vec<u8>,
nacks: usize,
}
impl<'i, IN> SegmentStore<'i, IN>
where
IN: AsyncRead + Unpin,
{
fn new(inp: &'i mut IN) -> Self {
SegmentStore {
inp,
unacked_segs: HashMap::new(),
unacked_upper_bound: 0,
}
}
async fn read_segment(&mut self) -> Option<(SegIdx, &[u8])> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; hptp::seg::MAX_SEG_SIZE];
let len = self.inp.read(&mut buf).await.unwrap_or(0);
if len > 0 {
let seg_idx = self.unacked_upper_bound;
self.unacked_upper_bound += 1;
let unack = self.unacked_segs.entry(seg_idx);
let unack = unack.or_insert(UnAcked {
payload: buf[..len].into(),
nacks: 0,
});
Some((seg_idx, unack.payload.as_ref()))
} else {
None
}
}
async fn get_segment(&mut self) -> Option<(SegIdx, &[u8])> {
if self.unacked_segs.is_empty() {
self.read_segment().await
} else {
self.unacked_segs
.iter()
.next() // get any entry, no ordering guarunteed or needed
.map(|(seg_idx, unack)| (*seg_idx, unack.payload.as_ref()))
}
}
fn ack(&mut self, seg_idx: SegIdx) {
self.unacked_segs.remove(&seg_idx);
// TODO: update bounds
}
}
async fn upload<IN>(log: &mut Logger, peer: &mut UpPeer, inp: &mut IN) -> Result<(), Error>
where
IN: AsyncRead + Unpin,