116 changes: 78 additions & 38 deletions cas_client/src/download_utils.rs
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::io::Write;
use std::sync::{Arc, Mutex, RwLock};
@@ -15,10 +16,12 @@ use http::header::RANGE;
use merklehash::MerkleHash;
use reqwest::Response;
use reqwest_middleware::ClientWithMiddleware;
use tokio::io::AsyncWriteExt;
use tracing::{debug, error, info, trace, warn};
use url::Url;
use utils::singleflight::Group;

use crate::SequentialOutput;
use crate::error::{CasClientError, Result};
use crate::http_client::Api;
use crate::output_provider::SeekingOutputProvider;
@@ -281,7 +284,7 @@ impl FetchTermDownload {
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct ChunkRangeWrite {
pub chunk_range: ChunkRange,
pub unpacked_length: u32,
@@ -290,58 +293,95 @@ pub(crate) struct ChunkRangeWrite {
pub writer_offset: u64,
}

impl ChunkRangeWrite {
pub fn write_to_seek_writer(&self, download: &TermDownloadOutput, writer: &SeekingOutputProvider) -> Result<u64> {
let mut writer = writer.get_writer_at(self.writer_offset)?;
let data_sub_range_sliced = self.derive_write_bytes(download);
writer.write_all(data_sub_range_sliced)?;
writer.flush()?;
Ok(self.take)
}

pub async fn write_to_sequential_writer(
&self,
download: &TermDownloadOutput,
writer: &mut SequentialOutput,
) -> Result<u64> {
let data_sub_range_sliced = self.derive_write_bytes(download);
writer.write_all(data_sub_range_sliced).await?;
writer.flush().await?;
Ok(self.take)
}

fn derive_write_bytes<'a>(&self, download: &'a TermDownloadOutput) -> &'a [u8] {
let TermDownloadOutput {
data,
chunk_byte_indices,
chunk_range,
} = download;
debug_assert_eq!(chunk_byte_indices.len(), (chunk_range.end - chunk_range.start + 1) as usize);
debug_assert_eq!(*chunk_byte_indices.last().expect("length checked above") as usize, data.len());

debug_assert!(self.chunk_range.start >= chunk_range.start);
debug_assert!(self.chunk_range.end > chunk_range.start);
debug_assert!(
self.chunk_range.start < chunk_range.end,
"{} < {} ;;; write {:?} term {:?}",
self.chunk_range.start,
chunk_range.end,
self.chunk_range,
chunk_range
);
debug_assert!(self.chunk_range.end <= chunk_range.end);

let start_chunk_offset_index = self.chunk_range.start - chunk_range.start;
let end_chunk_offset_index = self.chunk_range.end - chunk_range.start;
let start_chunk_offset = chunk_byte_indices[start_chunk_offset_index as usize] as usize;
let end_chunk_offset = chunk_byte_indices[end_chunk_offset_index as usize] as usize;
let data_sub_range = &data[start_chunk_offset..end_chunk_offset];
debug_assert_eq!(data_sub_range.len(), self.unpacked_length as usize);

debug_assert!(data_sub_range.len() as u64 >= self.skip_bytes + self.take);
&data_sub_range[(self.skip_bytes as usize)..((self.skip_bytes + self.take) as usize)]
}
}
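
Aside: the chunk arithmetic in derive_write_bytes is easier to see with concrete numbers. The standalone sketch below replays the same indexing with invented values — slice_write_bytes and everything in it are hypothetical illustrations, not code from this PR. chunk_byte_indices holds one entry per chunk boundary, so a term covering chunks [start, end) carries end - start + 1 entries, and a write selects a boundary-to-boundary window before trimming it with skip_bytes and take:

// Hypothetical standalone replay of the slicing in derive_write_bytes.
fn slice_write_bytes<'a>(
    data: &'a [u8],
    chunk_byte_indices: &[u32], // one entry per chunk boundary of the term
    term_start: u32,  // first chunk index covered by the downloaded term
    write_start: u32, // first chunk index this write needs
    write_end: u32,   // one past the last chunk index this write needs
    skip_bytes: u64,  // bytes to skip inside the selected chunk window
    take: u64,        // bytes to emit after skipping
) -> &'a [u8] {
    let lo = chunk_byte_indices[(write_start - term_start) as usize] as usize;
    let hi = chunk_byte_indices[(write_end - term_start) as usize] as usize;
    let window = &data[lo..hi];
    &window[skip_bytes as usize..(skip_bytes + take) as usize]
}

fn main() {
    // A term covering chunks [4, 8) whose unpacked bytes are 0..=9:
    // chunk 4 -> bytes [0, 3), chunk 5 -> [3, 6), chunk 6 -> [6, 8), chunk 7 -> [8, 10).
    let data: Vec<u8> = (0..10).collect();
    let chunk_byte_indices = [0u32, 3, 6, 8, 10];
    // A write wanting chunks [5, 7), skipping 1 byte and taking 3:
    let out = slice_write_bytes(&data, &chunk_byte_indices, 4, 5, 7, 1, 3);
    assert_eq!(out, &[4u8, 5, 6][..]); // bytes 3..8 of data, minus skip, capped at take
}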

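/// Pairs a term download with the xorb hash and the pending chunk-range writes;
/// `run` performs the download and hands the payload, hash, and writes back to
/// the caller to apply in order.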
#[derive(Debug)]
pub(crate) struct DownloadAndWriteAllSequential {
pub download: FetchTermDownload,
pub xorb_hash: MerkleHash,
pub writes: Vec<ChunkRangeWrite>,
}

impl DownloadAndWriteAllSequential {
pub async fn run(self) -> Result<TermDownloadResult<(TermDownloadOutput, MerkleHash, Vec<ChunkRangeWrite>)>> {
let download_result = self.download.run().await?;
Ok(TermDownloadResult {
payload: (download_result.payload, self.xorb_hash, self.writes),
duration: download_result.duration,
n_retries_on_403: download_result.n_retries_on_403,
})
}
}
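
A possible caller-side shape for the sequential path (assumed for illustration — the actual call site is not in this diff) is to drain the returned writes in order against a single async sink. apply_writes and its (offset, len) pairs below are hypothetical stand-ins for ChunkRangeWrite::write_to_sequential_writer and SequentialOutput:

use tokio::io::{AsyncWrite, AsyncWriteExt};

// Hypothetical driver: apply pending (offset, len) slices of the unpacked
// term data, in order, to one sequential sink.
async fn apply_writes<W: AsyncWrite + Unpin>(
    data: &[u8],
    writes: &[(usize, usize)],
    sink: &mut W,
) -> std::io::Result<u64> {
    let mut total = 0u64;
    for &(off, len) in writes {
        sink.write_all(&data[off..off + len]).await?;
        total += len as u64;
    }
    sink.flush().await?;
    Ok(total)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let data = b"hello world";
    let mut sink = Vec::new(); // Vec<u8> implements tokio's AsyncWrite
    let n = apply_writes(data, &[(0, 5), (6, 5)], &mut sink).await?;
    assert_eq!((n, sink.as_slice()), (10, &b"helloworld"[..]));
    Ok(())
}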

/// Helper holding the pieces needed to download a term and write it out in parallel
/// during reconstruction.
#[derive(Debug)]
pub(crate) struct FetchTermDownloadOnceAndWriteEverywhereUsed {
pub(crate) struct DownloadAndWriteAllParallel {
pub download: FetchTermDownload,
// pub write_offset: u64, // start position of the writer to write to
pub output: SeekingOutputProvider,
pub writes: Vec<ChunkRangeWrite>,
}

impl FetchTermDownloadOnceAndWriteEverywhereUsed {
impl DownloadAndWriteAllParallel {
/// Download the term and write it to the underlying storage, retrying on 403
pub async fn run(self) -> Result<TermDownloadResult<u64>> {
let download_result = self.download.run().await?;
let TermDownloadOutput {
data,
chunk_byte_indices,
chunk_range,
} = download_result.payload;
debug_assert_eq!(chunk_byte_indices.len(), (chunk_range.end - chunk_range.start + 1) as usize);
debug_assert_eq!(*chunk_byte_indices.last().expect("checked len is something") as usize, data.len());

// write out the data
let mut total_written = 0;
for write in self.writes {
debug_assert!(write.chunk_range.start >= chunk_range.start);
debug_assert!(write.chunk_range.end > chunk_range.start);
debug_assert!(
write.chunk_range.start < chunk_range.end,
"{} < {} ;;; write {:?} term {:?}",
write.chunk_range.start,
chunk_range.end,
write.chunk_range,
chunk_range
);
debug_assert!(write.chunk_range.end <= chunk_range.end);

let start_chunk_offset_index = write.chunk_range.start - chunk_range.start;
let end_chunk_offset_index = write.chunk_range.end - chunk_range.start;
let start_chunk_offset = chunk_byte_indices[start_chunk_offset_index as usize] as usize;
let end_chunk_offset = chunk_byte_indices[end_chunk_offset_index as usize] as usize;
let data_sub_range = &data[start_chunk_offset..end_chunk_offset];
debug_assert_eq!(data_sub_range.len(), write.unpacked_length as usize);

debug_assert!(data_sub_range.len() as u64 >= write.skip_bytes + write.take);
let data_sub_range_sliced =
&data_sub_range[(write.skip_bytes as usize)..((write.skip_bytes + write.take) as usize)];

let mut writer = self.output.get_writer_at(write.writer_offset)?;
writer.write_all(data_sub_range_sliced)?;
writer.flush()?;
total_written += write.take;
total_written += write.write_to_seek_writer(&download_result.payload, &self.output)?;
}

Ok(TermDownloadResult {
1 change: 1 addition & 0 deletions cas_client/src/lib.rs
@@ -20,6 +20,7 @@ mod http_client;
mod interface;
#[cfg(not(target_family = "wasm"))]
mod local_client;
mod memory_cache;
#[cfg(not(target_family = "wasm"))]
mod output_provider;
pub mod remote_client;
42 changes: 42 additions & 0 deletions cas_client/src/memory_cache.rs
@@ -0,0 +1,42 @@
// A single-threaded cache with Bélády's optimal replacement policy, with optional disk backup.

use std::collections::HashMap;
use std::fmt::Display;

use bytes::Bytes;
use serde::Serialize;
use tempfile::TempDir;

pub struct ClairvoyantHybridCache<K, V: Serialize> {
memory: HashMap<K, V>,
disk: Option<DiskStorage>,
}

impl<K, V: Serialize> ClairvoyantHybridCache<K, V> {}

struct DiskStorage {
_tempdir: TempDir,
}

impl DiskStorage {
fn put<K>(&self, k: K, v: &[u8]) -> std::io::Result<()>
where
K: Display,
{
todo!()
}

fn get<K>(&self, k: &K) -> std::io::Result<Bytes>
where
K: Display,
{
todo!()
}

fn remove<K>(&self, k: &K)
where
K: Display,
{
todo!()
}
}
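
Bélády's policy evicts the resident entry whose next access lies farthest in the future (or never comes at all), which is only computable when the access sequence is known up front — hence "clairvoyant". A minimal sketch of just that eviction choice, using a hypothetical belady_victim helper that is not part of this stub's planned API:

// Pick the eviction victim per Bélády's optimal policy: among the cached
// keys, evict the one whose next access in the known future sequence is
// farthest away, treating "never accessed again" as farthest of all.
fn belady_victim<'a>(cached: &[&'a str], future: &[&str]) -> &'a str {
    *cached
        .iter()
        .max_by_key(|k| future.iter().position(|f| f == *k).unwrap_or(usize::MAX))
        .expect("cache must be non-empty")
}

fn main() {
    let cached = ["a", "b", "c"];
    let future = ["b", "a", "b", "c"];
    // "b" is needed immediately and "a" soon after; "c" is the right victim.
    assert_eq!(belady_victim(&cached, &future), "c");
}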