From 69d37684e5b164ee39014cccc953f70be2aa2ef6 Mon Sep 17 00:00:00 2001 From: Thorrester Date: Wed, 13 May 2026 17:43:25 -0400 Subject: [PATCH] Add object store observability wrapper --- crates/scouter_dataframe/src/caching_store.rs | 215 ++++++- crates/scouter_dataframe/src/parquet/utils.rs | 582 ++++++++++++++++++ 2 files changed, 788 insertions(+), 9 deletions(-) diff --git a/crates/scouter_dataframe/src/caching_store.rs b/crates/scouter_dataframe/src/caching_store.rs index fc2177c13..21fd6268a 100644 --- a/crates/scouter_dataframe/src/caching_store.rs +++ b/crates/scouter_dataframe/src/caching_store.rs @@ -1,3 +1,9 @@ +use crate::parquet::utils::{ + OBJECT_STORE_OPERATION_COPY, OBJECT_STORE_OPERATION_DELETE, OBJECT_STORE_OPERATION_GET_RANGE, + OBJECT_STORE_OPERATION_LIST, OBJECT_STORE_OPERATION_LIST_WITH_DELIMITER, + OBJECT_STORE_OPERATION_PUT, ObjectStoreRequestTelemetry, get_options_range, + observe_object_meta_stream, observed_get_result_bytes, +}; use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; @@ -12,6 +18,7 @@ use object_store::{ use std::fmt; use std::sync::Arc; use std::time::Duration; +use tracing::Instrument; /// Cache key for range reads: (path, start, end). #[derive(Clone, Debug, Hash, Eq, PartialEq)] @@ -25,6 +32,7 @@ struct RangeCacheKey { /// Parquet footers are typically well under this; column data reads are larger /// and will pass through uncached. const MAX_CACHEABLE_BYTES: u64 = 2 * 1024 * 1024; +const CACHING_STORE_BACKEND: &str = "cache"; /// An `ObjectStore` wrapper that caches `head()` and small `get_range()` responses. /// @@ -99,7 +107,25 @@ impl ObjectStore for CachingStore { payload: PutPayload, opts: PutOptions, ) -> Result { - self.inner.put_opts(location, payload, opts).await + let bytes = payload.content_length() as u64; + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + OBJECT_STORE_OPERATION_PUT, + Some(location), + None, + Some(bytes), + None, + ); + let result = self + .inner + .put_opts(location, payload, opts) + .instrument(telemetry.span()) + .await; + match &result { + Ok(_) => telemetry.finish_success(bytes), + Err(error) => telemetry.finish_error(error), + } + result } async fn put_multipart_opts( @@ -107,14 +133,48 @@ impl ObjectStore for CachingStore { location: &Path, opts: PutMultipartOptions, ) -> Result> { - self.inner.put_multipart_opts(location, opts).await + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + OBJECT_STORE_OPERATION_PUT, + Some(location), + None, + None, + None, + ); + let result = self + .inner + .put_multipart_opts(location, opts) + .instrument(telemetry.span()) + .await; + match &result { + Ok(_) => telemetry.finish_success(0), + Err(error) => telemetry.finish_error(error), + } + result } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let key: Arc = location.to_string().into(); + let operation = if options.head && options.range.is_none() { + crate::parquet::utils::OBJECT_STORE_OPERATION_HEAD + } else if options.range.is_some() { + OBJECT_STORE_OPERATION_GET_RANGE + } else { + crate::parquet::utils::OBJECT_STORE_OPERATION_GET + }; + let (range_start, range_len) = get_options_range(&options); if options.head && options.range.is_none() && is_plain_request(&options) { if let Some(meta) = self.head_cache.get(&key) { + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + operation, + Some(location), + range_start, + range_len, + Some(true), + ); + telemetry.finish_success(0); return Ok(GetResult { payload: GetResultPayload::Stream(futures::stream::empty().boxed()), meta, @@ -123,7 +183,26 @@ impl ObjectStore for CachingStore { }); } - let result = self.inner.get_opts(location, options).await?; + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + operation, + Some(location), + range_start, + range_len, + Some(false), + ); + let result = self + .inner + .get_opts(location, options) + .instrument(telemetry.span()) + .await; + match &result { + Ok(result) => { + telemetry.finish_success(observed_get_result_bytes(operation, result)) + } + Err(error) => telemetry.finish_error(error), + } + let result = result?; self.head_cache.insert(key, result.meta.clone()); return Ok(result); } @@ -134,7 +213,24 @@ impl ObjectStore for CachingStore { let meta = match self.head_cache.get(&key) { Some(meta) => meta, None => { - let meta = self.inner.head(location).await?; + let head_telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + crate::parquet::utils::OBJECT_STORE_OPERATION_HEAD, + Some(location), + None, + None, + Some(false), + ); + let meta = self + .inner + .head(location) + .instrument(head_telemetry.span()) + .await; + match &meta { + Ok(_) => head_telemetry.finish_success(0), + Err(error) => head_telemetry.finish_error(error), + } + let meta = meta?; self.head_cache.insert(key.clone(), meta.clone()); meta } @@ -148,6 +244,15 @@ impl ObjectStore for CachingStore { }; if let Some(bytes) = self.range_cache.get(&range_key) { + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + operation, + Some(location), + range_start, + range_len, + Some(true), + ); + telemetry.finish_success(bytes.len() as u64); return Ok(GetResult { payload: GetResultPayload::Stream( futures::stream::once(async move { Ok(bytes) }).boxed(), @@ -158,7 +263,24 @@ impl ObjectStore for CachingStore { }); } - let bytes = self.inner.get_range(location, range.clone()).await?; + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + operation, + Some(location), + range_start, + range_len, + Some(false), + ); + let bytes = self + .inner + .get_range(location, range.clone()) + .instrument(telemetry.span()) + .await; + match &bytes { + Ok(bytes) => telemetry.finish_success(bytes.len() as u64), + Err(error) => telemetry.finish_error(error), + } + let bytes = bytes?; self.range_cache.insert(range_key, bytes.clone()); return Ok(GetResult { payload: GetResultPayload::Stream( @@ -172,7 +294,24 @@ impl ObjectStore for CachingStore { } } - self.inner.get_opts(location, options).await + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + operation, + Some(location), + range_start, + range_len, + Some(false), + ); + let result = self + .inner + .get_opts(location, options) + .instrument(telemetry.span()) + .await; + match &result { + Ok(result) => telemetry.finish_success(observed_get_result_bytes(operation, result)), + Err(error) => telemetry.finish_error(error), + } + result } fn delete_stream( @@ -185,6 +324,16 @@ impl ObjectStore for CachingStore { .delete_stream(locations) .map(move |result| { if let Ok(location) = &result { + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + OBJECT_STORE_OPERATION_DELETE, + Some(location), + None, + None, + None, + ); + let _entered = telemetry.enter(); + telemetry.finish_success(0); let key: Arc = location.to_string().into(); head_cache.invalidate(&key); range_cache.invalidate_all(); @@ -195,15 +344,63 @@ impl ObjectStore for CachingStore { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { - self.inner.list(prefix) + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + OBJECT_STORE_OPERATION_LIST, + prefix, + None, + None, + None, + ); + observe_object_meta_stream(self.inner.list(prefix), telemetry) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - self.inner.list_with_delimiter(prefix).await + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + OBJECT_STORE_OPERATION_LIST_WITH_DELIMITER, + prefix, + None, + None, + None, + ); + let result = self + .inner + .list_with_delimiter(prefix) + .instrument(telemetry.span()) + .await; + match &result { + Ok(result) => { + let object_bytes = result + .objects + .iter() + .fold(0_u64, |bytes, meta| bytes.saturating_add(meta.size)); + telemetry.finish_success(object_bytes); + } + Err(error) => telemetry.finish_error(error), + } + result } async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> { - self.inner.copy_opts(from, to, options).await?; + let telemetry = ObjectStoreRequestTelemetry::new( + CACHING_STORE_BACKEND, + OBJECT_STORE_OPERATION_COPY, + Some(from), + None, + None, + None, + ); + let result = self + .inner + .copy_opts(from, to, options) + .instrument(telemetry.span()) + .await; + match &result { + Ok(_) => telemetry.finish_success(0), + Err(error) => telemetry.finish_error(error), + } + result?; let to_key: Arc = to.to_string().into(); self.head_cache.invalidate(&to_key); self.range_cache.invalidate_all(); diff --git a/crates/scouter_dataframe/src/parquet/utils.rs b/crates/scouter_dataframe/src/parquet/utils.rs index 59fdb92a9..849b0c8b0 100644 --- a/crates/scouter_dataframe/src/parquet/utils.rs +++ b/crates/scouter_dataframe/src/parquet/utils.rs @@ -8,6 +8,7 @@ use arrow_array::RecordBatch; use arrow_array::StringViewArray; use arrow_array::types::Float64Type; use arrow_array::types::TimestampNanosecondType; +use async_trait::async_trait; use chrono::{DateTime, TimeZone, Utc}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::ScalarFunctionArgs; @@ -20,8 +21,24 @@ use deltalake::DeltaResult; use deltalake::logstore::{ LogStore, LogStoreFactory, ObjectStoreRef, StorageConfig, default_logstore, logstore_factories, }; +use futures::Stream; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::path::Path; +use object_store::{ + CopyOptions, Error as ObjectStoreError, GetOptions, GetRange, GetResult, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result as ObjectStoreResult, +}; use scouter_types::{BinnedMetric, BinnedMetricStats, BinnedMetrics}; +use std::collections::hash_map::DefaultHasher; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Instant; +use tracing::{Instrument, Span, field}; use tracing::{debug, error, instrument}; use url::Url; @@ -192,6 +209,481 @@ impl BinnedMetricsExtractor { } } +pub(crate) const OBJECT_STORE_SPAN_NAME: &str = "object_store.request"; +pub(crate) const OBJECT_STORE_STATUS_ATTR: &str = "object_store.status"; +pub(crate) const OBJECT_STORE_ERROR_KIND_ATTR: &str = "object_store.error.kind"; + +pub(crate) const OBJECT_STORE_OPERATION_LIST: &str = "list"; +pub(crate) const OBJECT_STORE_OPERATION_LIST_WITH_DELIMITER: &str = "list_with_delimiter"; +pub(crate) const OBJECT_STORE_OPERATION_HEAD: &str = "head"; +pub(crate) const OBJECT_STORE_OPERATION_GET: &str = "get"; +pub(crate) const OBJECT_STORE_OPERATION_GET_RANGE: &str = "get_range"; +pub(crate) const OBJECT_STORE_OPERATION_PUT: &str = "put"; +pub(crate) const OBJECT_STORE_OPERATION_DELETE: &str = "delete"; +pub(crate) const OBJECT_STORE_OPERATION_COPY: &str = "copy"; + +pub(crate) const OBJECT_STORE_PATH_KIND_DELTA_LOG: &str = "delta_log"; +pub(crate) const OBJECT_STORE_PATH_KIND_PARQUET_DATA: &str = "parquet_data"; +pub(crate) const OBJECT_STORE_PATH_KIND_CHECKPOINT: &str = "checkpoint"; +pub(crate) const OBJECT_STORE_PATH_KIND_UNKNOWN: &str = "unknown"; + +const TRACE_OBJECT_STORE_REQUESTS_TOTAL: &str = "scouter_trace_object_store_requests_total"; +const TRACE_OBJECT_STORE_REQUEST_DURATION_MS: &str = + "scouter_trace_object_store_request_duration_ms"; +const TRACE_OBJECT_STORE_BYTES_TOTAL: &str = "scouter_trace_object_store_bytes_total"; +const CACHE_HIT_UNKNOWN: &str = "unknown"; +const STATUS_OK: &str = "ok"; +const STATUS_ERROR: &str = "error"; +const STATUS_DROPPED: &str = "dropped"; +const PARQUET_FOOTER_CANDIDATE_MAX_BYTES: u64 = 2 * 1024 * 1024; + +pub(crate) fn classify_object_path(location: &Path) -> &'static str { + let path = location.as_ref(); + let file_name = path.rsplit('/').next().unwrap_or(path); + + if path.ends_with("_delta_log/_last_checkpoint") || file_name.contains(".checkpoint.") { + OBJECT_STORE_PATH_KIND_CHECKPOINT + } else if path.split('/').any(|segment| segment == "_delta_log") { + OBJECT_STORE_PATH_KIND_DELTA_LOG + } else if path.ends_with(".parquet") { + OBJECT_STORE_PATH_KIND_PARQUET_DATA + } else { + OBJECT_STORE_PATH_KIND_UNKNOWN + } +} + +fn path_kind(location: Option<&Path>) -> &'static str { + location + .map(classify_object_path) + .unwrap_or(OBJECT_STORE_PATH_KIND_UNKNOWN) +} + +fn path_hash(location: Option<&Path>) -> String { + let mut hasher = DefaultHasher::new(); + location.map(Path::as_ref).unwrap_or("").hash(&mut hasher); + format!("{:016x}", hasher.finish()) +} + +fn backend_from_url(location: &Url) -> &'static str { + match location.scheme() { + "file" => "local", + "gs" => "gcs", + "s3" | "s3a" => "s3", + "az" | "abfs" | "abfss" => "azure", + _ => "unknown", + } +} + +fn get_operation(options: &GetOptions) -> &'static str { + if options.head && options.range.is_none() { + OBJECT_STORE_OPERATION_HEAD + } else if options.range.is_some() { + OBJECT_STORE_OPERATION_GET_RANGE + } else { + OBJECT_STORE_OPERATION_GET + } +} + +pub(crate) fn get_options_range(options: &GetOptions) -> (Option, Option) { + match options.range.as_ref() { + Some(GetRange::Bounded(range)) => ( + Some(range.start), + Some(range.end.saturating_sub(range.start)), + ), + Some(GetRange::Offset(start)) => (Some(*start), None), + Some(GetRange::Suffix(len)) => (None, Some(*len)), + None => (None, None), + } +} + +pub(crate) fn is_parquet_footer_candidate(location: &Path, range_len: Option) -> bool { + classify_object_path(location) == OBJECT_STORE_PATH_KIND_PARQUET_DATA + && range_len + .map(|len| len <= PARQUET_FOOTER_CANDIDATE_MAX_BYTES) + .unwrap_or(false) +} + +fn object_store_error_kind(error: &ObjectStoreError) -> &'static str { + match error { + ObjectStoreError::Generic { .. } => "generic", + ObjectStoreError::NotFound { .. } => "not_found", + ObjectStoreError::InvalidPath { .. } => "invalid_path", + ObjectStoreError::JoinError { .. } => "join_error", + ObjectStoreError::NotSupported { .. } => "not_supported", + ObjectStoreError::AlreadyExists { .. } => "already_exists", + ObjectStoreError::Precondition { .. } => "precondition", + ObjectStoreError::NotModified { .. } => "not_modified", + ObjectStoreError::NotImplemented { .. } => "not_implemented", + ObjectStoreError::PermissionDenied { .. } => "permission_denied", + ObjectStoreError::Unauthenticated { .. } => "unauthenticated", + ObjectStoreError::UnknownConfigurationKey { .. } => "unknown_configuration_key", + _ => "unknown", + } +} + +#[derive(Clone, Debug)] +pub(crate) struct ObjectStoreRequestTelemetry { + backend: Arc, + operation: &'static str, + path_kind: &'static str, + span: Span, + start: Instant, +} + +impl ObjectStoreRequestTelemetry { + pub(crate) fn new( + backend: impl Into>, + operation: &'static str, + location: Option<&Path>, + range_start: Option, + range_len: Option, + cache_hit: Option, + ) -> Self { + let backend = backend.into(); + let path_kind = path_kind(location); + let path_hash = path_hash(location); + let cache_hit_value = cache_hit + .map(|hit| hit.to_string()) + .unwrap_or_else(|| CACHE_HIT_UNKNOWN.to_string()); + let parquet_footer_candidate = location + .map(|path| is_parquet_footer_candidate(path, range_len)) + .unwrap_or(false); + + let span = tracing::info_span!( + OBJECT_STORE_SPAN_NAME, + "object_store.backend" = %backend, + "object_store.operation" = operation, + "object_store.path_kind" = path_kind, + "object_store.path_hash" = %path_hash, + "object_store.range_start" = range_start.map(|value| value as i64), + "object_store.range_len" = range_len.map(|value| value as i64), + "object_store.cache.hit" = %cache_hit_value, + "object_store.status" = field::Empty, + "object_store.error.kind" = field::Empty, + "parquet_footer_candidate" = parquet_footer_candidate, + ); + + Self { + backend, + operation, + path_kind, + span, + start: Instant::now(), + } + } + + pub(crate) fn span(&self) -> Span { + self.span.clone() + } + + pub(crate) fn enter(&self) -> tracing::span::Entered<'_> { + self.span.enter() + } + + pub(crate) fn finish_success(&self, bytes: u64) { + self.finish(STATUS_OK, None, bytes); + } + + pub(crate) fn finish_error(&self, error: &ObjectStoreError) { + self.finish(STATUS_ERROR, Some(object_store_error_kind(error)), 0); + } + + fn finish_dropped(&self, bytes: u64) { + self.finish(STATUS_DROPPED, None, bytes); + } + + fn finish(&self, status: &'static str, error_kind: Option<&'static str>, bytes: u64) { + self.span.record(OBJECT_STORE_STATUS_ATTR, status); + if let Some(error_kind) = error_kind { + self.span.record(OBJECT_STORE_ERROR_KIND_ATTR, error_kind); + } + + let duration_ms = self.start.elapsed().as_secs_f64() * 1000.0; + metrics::counter!( + TRACE_OBJECT_STORE_REQUESTS_TOTAL, + "backend" => self.backend.to_string(), + "operation" => self.operation, + "path_kind" => self.path_kind, + "status" => status, + ) + .increment(1); + metrics::histogram!( + TRACE_OBJECT_STORE_REQUEST_DURATION_MS, + "backend" => self.backend.to_string(), + "operation" => self.operation, + "path_kind" => self.path_kind, + "status" => status, + ) + .record(duration_ms); + + if bytes > 0 { + metrics::counter!( + TRACE_OBJECT_STORE_BYTES_TOTAL, + "backend" => self.backend.to_string(), + "operation" => self.operation, + "path_kind" => self.path_kind, + ) + .increment(bytes); + } + } +} + +pub(crate) fn observed_get_result_bytes(operation: &str, result: &GetResult) -> u64 { + if operation == OBJECT_STORE_OPERATION_HEAD { + 0 + } else if operation == OBJECT_STORE_OPERATION_GET_RANGE { + result.range.end.saturating_sub(result.range.start) + } else { + result.meta.size + } +} + +struct ObservedObjectMetaStream { + inner: BoxStream<'static, ObjectStoreResult>, + telemetry: ObjectStoreRequestTelemetry, + bytes: u64, + complete: bool, +} + +impl Stream for ObservedObjectMetaStream { + type Item = ObjectStoreResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let _entered = this.telemetry.enter(); + match this.inner.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(meta))) => { + this.bytes = this.bytes.saturating_add(meta.size); + Poll::Ready(Some(Ok(meta))) + } + Poll::Ready(Some(Err(error))) => { + this.complete = true; + this.telemetry.finish_error(&error); + Poll::Ready(Some(Err(error))) + } + Poll::Ready(None) => { + this.complete = true; + this.telemetry.finish_success(this.bytes); + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl Drop for ObservedObjectMetaStream { + fn drop(&mut self) { + if !self.complete { + self.telemetry.finish_dropped(self.bytes); + } + } +} + +pub(crate) fn observe_object_meta_stream( + stream: BoxStream<'static, ObjectStoreResult>, + telemetry: ObjectStoreRequestTelemetry, +) -> BoxStream<'static, ObjectStoreResult> { + Box::pin(ObservedObjectMetaStream { + inner: stream, + telemetry, + bytes: 0, + complete: false, + }) +} + +#[derive(Debug, Clone)] +pub(crate) struct ObjectStoreSpanLayer { + inner: ObjectStoreRef, + backend: Arc, +} + +impl ObjectStoreSpanLayer { + pub(crate) fn new(inner: ObjectStoreRef, backend: impl Into>) -> Self { + Self { + inner, + backend: backend.into(), + } + } +} + +impl fmt::Display for ObjectStoreSpanLayer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ObjectStoreSpanLayer({}, {})", self.backend, self.inner) + } +} + +#[async_trait] +impl ObjectStore for ObjectStoreSpanLayer { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> ObjectStoreResult { + let bytes = payload.content_length() as u64; + let telemetry = ObjectStoreRequestTelemetry::new( + self.backend.clone(), + OBJECT_STORE_OPERATION_PUT, + Some(location), + None, + Some(bytes), + None, + ); + let result = self + .inner + .put_opts(location, payload, opts) + .instrument(telemetry.span()) + .await; + match &result { + Ok(_) => telemetry.finish_success(bytes), + Err(error) => telemetry.finish_error(error), + } + result + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> ObjectStoreResult> { + let telemetry = ObjectStoreRequestTelemetry::new( + self.backend.clone(), + OBJECT_STORE_OPERATION_PUT, + Some(location), + None, + None, + None, + ); + let result = self + .inner + .put_multipart_opts(location, opts) + .instrument(telemetry.span()) + .await; + match &result { + Ok(_) => telemetry.finish_success(0), + Err(error) => telemetry.finish_error(error), + } + result + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + let operation = get_operation(&options); + let (range_start, range_len) = get_options_range(&options); + let telemetry = ObjectStoreRequestTelemetry::new( + self.backend.clone(), + operation, + Some(location), + range_start, + range_len, + None, + ); + let result = self + .inner + .get_opts(location, options) + .instrument(telemetry.span()) + .await; + match &result { + Ok(result) => telemetry.finish_success(observed_get_result_bytes(operation, result)), + Err(error) => telemetry.finish_error(error), + } + result + } + + fn delete_stream( + &self, + locations: BoxStream<'static, ObjectStoreResult>, + ) -> BoxStream<'static, ObjectStoreResult> { + let backend = self.backend.clone(); + self.inner + .delete_stream(locations) + .map(move |result| { + if let Ok(location) = &result { + let telemetry = ObjectStoreRequestTelemetry::new( + backend.clone(), + OBJECT_STORE_OPERATION_DELETE, + Some(location), + None, + None, + None, + ); + let _entered = telemetry.enter(); + telemetry.finish_success(0); + } + result + }) + .boxed() + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult> { + let telemetry = ObjectStoreRequestTelemetry::new( + self.backend.clone(), + OBJECT_STORE_OPERATION_LIST, + prefix, + None, + None, + None, + ); + observe_object_meta_stream(self.inner.list(prefix), telemetry) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + let telemetry = ObjectStoreRequestTelemetry::new( + self.backend.clone(), + OBJECT_STORE_OPERATION_LIST_WITH_DELIMITER, + prefix, + None, + None, + None, + ); + let result = self + .inner + .list_with_delimiter(prefix) + .instrument(telemetry.span()) + .await; + match &result { + Ok(result) => { + let object_bytes = result + .objects + .iter() + .fold(0_u64, |bytes, meta| bytes.saturating_add(meta.size)); + telemetry.finish_success(object_bytes); + } + Err(error) => telemetry.finish_error(error), + } + result + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> ObjectStoreResult<()> { + let telemetry = ObjectStoreRequestTelemetry::new( + self.backend.clone(), + OBJECT_STORE_OPERATION_COPY, + Some(from), + None, + None, + None, + ); + let result = self + .inner + .copy_opts(from, to, options) + .instrument(telemetry.span()) + .await; + match &result { + Ok(_) => telemetry.finish_success(0), + Err(error) => telemetry.finish_error(error), + } + result + } +} + +fn object_store_with_spans(store: ObjectStoreRef, backend: &'static str) -> ObjectStoreRef { + Arc::new(ObjectStoreSpanLayer::new(store, backend)) as ObjectStoreRef +} + pub(crate) struct PassthroughLogStoreFactory; impl LogStoreFactory for PassthroughLogStoreFactory { @@ -225,6 +717,9 @@ impl LogStoreFactory for PassthroughLogStoreFactory { } else { prefixed_store }; + let backend = backend_from_url(location); + let store = object_store_with_spans(store, backend); + let root_store = object_store_with_spans(root_store, backend); Ok(default_logstore(store, root_store, location, options)) } } @@ -439,3 +934,90 @@ pub fn create_attr_match_udf() -> ScalarUDF { pub fn match_attr_expr(search_blob: Expr, pattern: Expr) -> Expr { create_attr_match_udf().call(vec![search_blob, pattern]) } + +#[cfg(test)] +mod tests { + use super::*; + use futures::TryStreamExt; + use object_store::ObjectStoreExt; + use object_store::memory::InMemory; + + #[test] + fn classify_object_paths_by_delta_and_parquet_kind() { + assert_eq!( + classify_object_path(&Path::from("traces/_delta_log/00000000000000000001.json")), + OBJECT_STORE_PATH_KIND_DELTA_LOG + ); + assert_eq!( + classify_object_path(&Path::from("traces/_delta_log/_last_checkpoint")), + OBJECT_STORE_PATH_KIND_CHECKPOINT + ); + assert_eq!( + classify_object_path(&Path::from( + "traces/_delta_log/00000000000000000010.checkpoint.parquet" + )), + OBJECT_STORE_PATH_KIND_CHECKPOINT + ); + assert_eq!( + classify_object_path(&Path::from("traces/partition_date=2026-05-13/part.parquet")), + OBJECT_STORE_PATH_KIND_PARQUET_DATA + ); + assert_eq!( + classify_object_path(&Path::from("traces/readme.txt")), + OBJECT_STORE_PATH_KIND_UNKNOWN + ); + } + + #[test] + fn identifies_small_parquet_ranges_as_footer_candidates() { + let path = Path::from("traces/partition_date=2026-05-13/part.parquet"); + + assert!(is_parquet_footer_candidate(&path, Some(64 * 1024))); + assert!(!is_parquet_footer_candidate(&path, Some(4 * 1024 * 1024))); + assert!(!is_parquet_footer_candidate( + &Path::from("traces/_delta_log/00000000000000000001.json"), + Some(64 * 1024) + )); + } + + #[tokio::test] + async fn object_store_span_layer_delegates_core_operations() { + let inner = Arc::new(InMemory::new()) as ObjectStoreRef; + let store = ObjectStoreSpanLayer::new(inner, "memory"); + let path = Path::from("traces/partition_date=2026-05-13/part.parquet"); + let copy_path = Path::from("traces/partition_date=2026-05-13/part-copy.parquet"); + + store + .put(&path, PutPayload::from_static(b"0123456789abcdef")) + .await + .unwrap(); + + let meta = store.head(&path).await.unwrap(); + assert_eq!(meta.size, 16); + + let all_bytes = store.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(&all_bytes[..], b"0123456789abcdef"); + + let range_bytes = store.get_range(&path, 4..10).await.unwrap(); + assert_eq!(&range_bytes[..], b"456789"); + + let listed = store + .list(Some(&Path::from("traces"))) + .try_collect::>() + .await + .unwrap(); + assert_eq!(listed.len(), 1); + + let delimited = store + .list_with_delimiter(Some(&Path::from("traces"))) + .await + .unwrap(); + assert!(!delimited.common_prefixes.is_empty() || !delimited.objects.is_empty()); + + store.copy(&path, ©_path).await.unwrap(); + assert_eq!(store.head(©_path).await.unwrap().size, 16); + + store.delete(©_path).await.unwrap(); + assert!(store.head(©_path).await.is_err()); + } +}