Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 83 additions & 16 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,66 @@ use datafusion::{
error::DataFusionError,
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use futures::stream::BoxStream;
use futures::stream::{BoxStream, Stream};
use object_store::{
path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
};
use parking_lot::{Mutex, RwLock};
use url::Url;

/// A stream wrapper that measures the time until the inner stream first becomes ready
/// (an item, an error, or end of stream) and records it on the matching request entry.
///
/// The measurement is taken relative to `start`, which is supplied by the caller, so it
/// also includes any time that passes between construction and the first poll.
struct TimeToFirstItemStream<S> {
/// The wrapped stream; its poll results are passed through unchanged.
inner: S,
/// Instant the elapsed time is measured from.
start: Instant,
/// Position in `requests` of the entry whose `duration` is updated.
request_index: usize,
/// Shared request log; locked when the first ready poll is observed.
requests: Arc<Mutex<Vec<RequestDetails>>>,
/// Set once the first ready poll has been seen, so the duration is recorded only once.
first_item_yielded: bool,
}

impl<S> TimeToFirstItemStream<S> {
fn new(
inner: S,
start: Instant,
request_index: usize,
requests: Arc<Mutex<Vec<RequestDetails>>>,
) -> Self {
Self {
inner,
start,
request_index,
requests,
first_item_yielded: false,
}
}
}

impl<S> Stream for TimeToFirstItemStream<S>
where
S: Stream<Item = Result<ObjectMeta>> + Unpin,
{
type Item = Result<ObjectMeta>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);

if !self.first_item_yielded && poll_result.is_ready() {
self.first_item_yielded = true;
let elapsed = self.start.elapsed();
Comment on lines +84 to +86
Copy link
Contributor

@BlakeOrth BlakeOrth Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm somewhat concerned this elapsed calculation could end up generating misleading results in some cases. The concern stems from the fact that self.start is set when the stream is created. While this will probably be pretty accurate in many scenarios, imagine a scenario where something like the following happens:

let mut list_stream = TimeToFirstItemStream::new(stream, Instant::now(), 0, requests);
// The stream is created here, but has never been polled because the user has yet
// to await the stream. However, the "timer" is already running.
some_long_running_method().await;
let item = list_stream.next().await.unwrap();

In this case the elapsed duration would effectively be measuring the time of both some_long_running_method() as well as the time it took to yield the first element on the stream.
I'm wondering if we can set self.start once on the first call to poll_next(...) and then set elapsed on the first time an element hits Poll::Ready (as you've already done here) to get more accurate results.


let mut requests = self.requests.lock();
if let Some(request) = requests.get_mut(self.request_index) {
request.duration = Some(elapsed);
}
Comment on lines +88 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the current implementation I believe this strategy is "safe" (i.e. we won't accidentally modify the duration of a different request, leading to errant data). However, it does rely on the assumption that self.requests never has items removed from the middle of the Vec.

It might be useful to find a place to leave a comment noting that requests should be append-only to make it less likely for this assumption to be broken in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about instead of relying on some potentially tricky race condition, we instead use an Arc<AtomicU64> for the duration? Then instead of

    request_index: usize,
    requests: Arc<Mutex<Vec<RequestDetails>>>,

We could just pass in

    request_duration: Arc<AtomicU64>,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I like this solution quite a bit better than indexing into an array!

}

poll_result
}
}

/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
Expand Down Expand Up @@ -91,7 +143,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
pub struct InstrumentedObjectStore {
inner: Arc<dyn ObjectStore>,
instrument_mode: AtomicU8,
requests: Mutex<Vec<RequestDetails>>,
requests: Arc<Mutex<Vec<RequestDetails>>>,
}

impl InstrumentedObjectStore {
Expand All @@ -100,7 +152,7 @@ impl InstrumentedObjectStore {
Self {
inner: object_store,
instrument_mode,
requests: Mutex::new(Vec::new()),
requests: Arc::new(Mutex::new(Vec::new())),
}
}

Expand Down Expand Up @@ -218,19 +270,31 @@ impl InstrumentedObjectStore {
prefix: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta>> {
let timestamp = Utc::now();
let ret = self.inner.list(prefix);
let start = Instant::now();
let inner_stream = self.inner.list(prefix);

let request_index = {
let mut requests = self.requests.lock();
requests.push(RequestDetails {
op: Operation::List,
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
timestamp,
duration: None,
size: None,
range: None,
extra_display: None,
});
requests.len() - 1
};

self.requests.lock().push(RequestDetails {
op: Operation::List,
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
timestamp,
duration: None, // list returns a stream, so the duration isn't meaningful
size: None,
range: None,
extra_display: None,
});
let wrapped_stream = TimeToFirstItemStream::new(
inner_stream,
start,
request_index,
Arc::clone(&self.requests),
);

ret
Box::pin(wrapped_stream)
}

async fn instrumented_list_with_delimiter(
Expand Down Expand Up @@ -758,6 +822,7 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {

#[cfg(test)]
mod tests {
use futures::StreamExt;
use object_store::WriteMultipart;

use super::*;
Expand Down Expand Up @@ -896,13 +961,15 @@ mod tests {

instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.list(Some(&path));
let mut stream = instrumented.list(Some(&path));
// Consume at least one item from the stream to trigger duration measurement
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend also testing the duration value -- perhaps

        let start = Instant::now();
        let mut stream = instrumented.list(Some(&path));
        // Consume at least one item from the stream to trigger duration measurement
        let time_before_start = start.elapsed();
        let _ = stream.next().await;
...
        
        assert!(request.duration.unwrap() > time_before_start);

let _ = stream.next().await;
assert_eq!(instrumented.requests.lock().len(), 1);

let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::List);
assert_eq!(request.path, path);
assert!(request.duration.is_none());
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
Expand Down