Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 206 additions & 9 deletions crates/scouter_dataframe/src/caching_store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use crate::parquet::utils::{
OBJECT_STORE_OPERATION_COPY, OBJECT_STORE_OPERATION_DELETE, OBJECT_STORE_OPERATION_GET_RANGE,
OBJECT_STORE_OPERATION_LIST, OBJECT_STORE_OPERATION_LIST_WITH_DELIMITER,
OBJECT_STORE_OPERATION_PUT, ObjectStoreRequestTelemetry, get_options_range,
observe_object_meta_stream, observed_get_result_bytes,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
Expand All @@ -12,6 +18,7 @@ use object_store::{
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tracing::Instrument;

/// Cache key for range reads: (path, start, end).
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
Expand All @@ -25,6 +32,7 @@ struct RangeCacheKey {
/// Parquet footers are typically well under this; column data reads are larger
/// and will pass through uncached.
const MAX_CACHEABLE_BYTES: u64 = 2 * 1024 * 1024;
const CACHING_STORE_BACKEND: &str = "cache";

/// An `ObjectStore` wrapper that caches `head()` and small `get_range()` responses.
///
Expand Down Expand Up @@ -99,22 +107,74 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
self.inner.put_opts(location, payload, opts).await
let bytes = payload.content_length() as u64;
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
OBJECT_STORE_OPERATION_PUT,
Some(location),
None,
Some(bytes),
None,
);
let result = self
.inner
.put_opts(location, payload, opts)
.instrument(telemetry.span())
.await;
match &result {
Ok(_) => telemetry.finish_success(bytes),
Err(error) => telemetry.finish_error(error),
}
result
}

/// Opens a multipart upload on the wrapped store and records request telemetry.
///
/// The request is forwarded to `self.inner` exactly once; neither the head
/// cache nor the range cache is touched here.
///
/// # Errors
///
/// Returns whatever error the wrapped store's `put_multipart_opts` produces.
/// The error is passed to the telemetry recorder before being returned.
async fn put_multipart_opts(
    &self,
    location: &Path,
    opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
    // NOTE(review): the three trailing `None`s appear to be range start, byte
    // length and cache-hit flag (judging by the other call sites) — confirm
    // against `ObjectStoreRequestTelemetry::new`.
    let telemetry = ObjectStoreRequestTelemetry::new(
        CACHING_STORE_BACKEND,
        OBJECT_STORE_OPERATION_PUT,
        Some(location),
        None,
        None,
        None,
    );
    // Run the inner call inside the telemetry span so it is attributed to
    // this request.
    let result = self
        .inner
        .put_multipart_opts(location, opts)
        .instrument(telemetry.span())
        .await;
    match &result {
        // Zero bytes are reported: this call only returns the upload handle.
        Ok(_) => telemetry.finish_success(0),
        Err(error) => telemetry.finish_error(error),
    }
    result
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let key: Arc<str> = location.to_string().into();
let operation = if options.head && options.range.is_none() {
crate::parquet::utils::OBJECT_STORE_OPERATION_HEAD
} else if options.range.is_some() {
OBJECT_STORE_OPERATION_GET_RANGE
} else {
crate::parquet::utils::OBJECT_STORE_OPERATION_GET
};
let (range_start, range_len) = get_options_range(&options);

if options.head && options.range.is_none() && is_plain_request(&options) {
if let Some(meta) = self.head_cache.get(&key) {
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
operation,
Some(location),
range_start,
range_len,
Some(true),
);
telemetry.finish_success(0);
return Ok(GetResult {
payload: GetResultPayload::Stream(futures::stream::empty().boxed()),
meta,
Expand All @@ -123,7 +183,26 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
});
}

let result = self.inner.get_opts(location, options).await?;
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
operation,
Some(location),
range_start,
range_len,
Some(false),
);
let result = self
.inner
.get_opts(location, options)
.instrument(telemetry.span())
.await;
match &result {
Ok(result) => {
telemetry.finish_success(observed_get_result_bytes(operation, result))
}
Err(error) => telemetry.finish_error(error),
}
let result = result?;
self.head_cache.insert(key, result.meta.clone());
return Ok(result);
}
Expand All @@ -134,7 +213,24 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
let meta = match self.head_cache.get(&key) {
Some(meta) => meta,
None => {
let meta = self.inner.head(location).await?;
let head_telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
crate::parquet::utils::OBJECT_STORE_OPERATION_HEAD,
Some(location),
None,
None,
Some(false),
);
let meta = self
.inner
.head(location)
.instrument(head_telemetry.span())
.await;
match &meta {
Ok(_) => head_telemetry.finish_success(0),
Err(error) => head_telemetry.finish_error(error),
}
let meta = meta?;
self.head_cache.insert(key.clone(), meta.clone());
meta
}
Expand All @@ -148,6 +244,15 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
};

if let Some(bytes) = self.range_cache.get(&range_key) {
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
operation,
Some(location),
range_start,
range_len,
Some(true),
);
telemetry.finish_success(bytes.len() as u64);
return Ok(GetResult {
payload: GetResultPayload::Stream(
futures::stream::once(async move { Ok(bytes) }).boxed(),
Expand All @@ -158,7 +263,24 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
});
}

let bytes = self.inner.get_range(location, range.clone()).await?;
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
operation,
Some(location),
range_start,
range_len,
Some(false),
);
let bytes = self
.inner
.get_range(location, range.clone())
.instrument(telemetry.span())
.await;
match &bytes {
Ok(bytes) => telemetry.finish_success(bytes.len() as u64),
Err(error) => telemetry.finish_error(error),
}
let bytes = bytes?;
self.range_cache.insert(range_key, bytes.clone());
return Ok(GetResult {
payload: GetResultPayload::Stream(
Expand All @@ -172,7 +294,24 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
}
}

self.inner.get_opts(location, options).await
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
operation,
Some(location),
range_start,
range_len,
Some(false),
);
let result = self
.inner
.get_opts(location, options)
.instrument(telemetry.span())
.await;
match &result {
Ok(result) => telemetry.finish_success(observed_get_result_bytes(operation, result)),
Err(error) => telemetry.finish_error(error),
}
result
}

fn delete_stream(
Expand All @@ -185,6 +324,16 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
.delete_stream(locations)
.map(move |result| {
if let Ok(location) = &result {
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
OBJECT_STORE_OPERATION_DELETE,
Some(location),
None,
None,
None,
);
let _entered = telemetry.enter();
telemetry.finish_success(0);
let key: Arc<str> = location.to_string().into();
head_cache.invalidate(&key);
range_cache.invalidate_all();
Expand All @@ -195,15 +344,63 @@ impl<T: ObjectStore> ObjectStore for CachingStore<T> {
}

/// Lists objects under `prefix` from the wrapped store, with telemetry attached.
///
/// The inner stream is created exactly once and handed, together with the
/// telemetry handle, to `observe_object_meta_stream`, whose result is returned.
/// No cache is read or written by this method.
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
    // The prefix (if any) is recorded as the request location.
    let telemetry = ObjectStoreRequestTelemetry::new(
        CACHING_STORE_BACKEND,
        OBJECT_STORE_OPERATION_LIST,
        prefix,
        None,
        None,
        None,
    );
    observe_object_meta_stream(self.inner.list(prefix), telemetry)
}

/// Performs a delimited listing on the wrapped store and records telemetry.
///
/// The request is forwarded to `self.inner` exactly once. On success, the
/// byte count reported to telemetry is the saturating sum of `size` over the
/// returned `objects`. No cache is read or written by this method.
///
/// # Errors
///
/// Returns whatever error the wrapped store's `list_with_delimiter` produces.
/// The error is passed to the telemetry recorder before being returned.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
    let telemetry = ObjectStoreRequestTelemetry::new(
        CACHING_STORE_BACKEND,
        OBJECT_STORE_OPERATION_LIST_WITH_DELIMITER,
        prefix,
        None,
        None,
        None,
    );
    // Run the inner call inside the telemetry span so it is attributed to
    // this request.
    let result = self
        .inner
        .list_with_delimiter(prefix)
        .instrument(telemetry.span())
        .await;
    match &result {
        Ok(listing) => {
            // Saturating add keeps the total from wrapping on overflow.
            let object_bytes = listing
                .objects
                .iter()
                .fold(0_u64, |bytes, meta| bytes.saturating_add(meta.size));
            telemetry.finish_success(object_bytes);
        }
        Err(error) => telemetry.finish_error(error),
    }
    result
}

async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
self.inner.copy_opts(from, to, options).await?;
let telemetry = ObjectStoreRequestTelemetry::new(
CACHING_STORE_BACKEND,
OBJECT_STORE_OPERATION_COPY,
Some(from),
None,
None,
None,
);
let result = self
.inner
.copy_opts(from, to, options)
.instrument(telemetry.span())
.await;
match &result {
Ok(_) => telemetry.finish_success(0),
Err(error) => telemetry.finish_error(error),
}
result?;
let to_key: Arc<str> = to.to_string().into();
self.head_cache.invalidate(&to_key);
self.range_cache.invalidate_all();
Expand Down
Loading
Loading