Conversation

@peterxcli
Member

Which issue does this PR close?

Rationale for this change

The list operation returns a stream, so it previously recorded duration: None and provided no timing insight. Time-to-first-item is a useful metric for list operations, indicating how quickly results start arriving. This PR adds duration tracking by measuring the time until the first item is yielded (or the stream ends).

What changes are included in this PR?

  1. Added TimeToFirstItemStream: A stream wrapper that measures elapsed time from creation until the first item is yielded (or the stream ends if empty). A simplified sketch follows this list.
  2. Updated instrumented_list: Wraps the inner stream with TimeToFirstItemStream to record duration.
  3. Changed requests field: Switched from Mutex<Vec<RequestDetails>> to Arc<Mutex<Vec<RequestDetails>>> to allow sharing across async boundaries (needed for the stream wrapper).
  4. Updated tests: Modified instrumented_store_list to consume at least one stream item and verify that duration is now Some(Duration) instead of None.
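
Here is a rough, simplified sketch of the wrapper idea (not the exact code in the diff; the RequestDetails placeholder and the parking_lot::Mutex import are stand-ins so the sketch is self-contained):

    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};

    use futures::Stream;
    use parking_lot::Mutex; // assumption: the bare .lock() call in the diff suggests parking_lot

    // Placeholder for the real struct in the crate; only the duration field matters here.
    struct RequestDetails {
        duration: Option<Duration>,
    }

    /// Wraps a stream and records the elapsed time until the first item is
    /// yielded (or until the stream ends, if it is empty).
    struct TimeToFirstItemStream<S> {
        inner: S,
        start: Instant,
        first_item_yielded: bool,
        request_index: usize,
        requests: Arc<Mutex<Vec<RequestDetails>>>,
    }

    impl<S: Stream + Unpin> Stream for TimeToFirstItemStream<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.get_mut();
            let poll_result = Pin::new(&mut this.inner).poll_next(cx);

            // Record the duration the first time the inner stream is ready,
            // whether it yields an item or ends.
            if !this.first_item_yielded && poll_result.is_ready() {
                this.first_item_yielded = true;
                let elapsed = this.start.elapsed();
                let mut requests = this.requests.lock();
                if let Some(request) = requests.get_mut(this.request_index) {
                    request.duration = Some(elapsed);
                }
            }

            poll_result
        }
    }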

Are these changes tested?

Yes. The existing test instrumented_store_list was updated to:

  • Consume at least one item from the stream using stream.next().await
  • Assert that request.duration.is_some() (previously is_none())

All tests pass, including the updated list test and other instrumented operation tests.

Are there any user-facing changes?

Users with profiling enabled will see duration values for list operations instead of nothing.

@alamb
Contributor

alamb commented Dec 8, 2025

FYI @BlakeOrth -- are you available to review this PR?

@BlakeOrth
Contributor

@alamb Yes, I will allocate time for a review.

Contributor

@BlakeOrth BlakeOrth left a comment

Thank you for taking this on! Overall I think the strategy of wrapping the stream seems sound.

I do have one small concern about how the timing is being tracked; I've left an inline comment with more details.

Comment on lines +84 to +86
if !self.first_item_yielded && poll_result.is_ready() {
self.first_item_yielded = true;
let elapsed = self.start.elapsed();
Contributor

@BlakeOrth BlakeOrth Dec 8, 2025

I'm somewhat concerned this elapsed calculation could end up generating misleading results in some cases. The concern stems from the fact that self.start is set when the stream is created. While this will probably be accurate in many cases, imagine a scenario where something like the following happens:

let list_stream = TimeToFirstItemStream::new(stream, Instant::now(), 0, requests);
// The stream is created here, but has never been polled because the user has yet
// to await the stream. However, the "timer" is already running.
some_long_running_method().await;
let item = list_stream.next().await.unwrap();

In this case the elapsed duration would effectively measure both the time spent in some_long_running_method() and the time it took to yield the first element on the stream.
I'm wondering if we can set self.start once on the first call to poll_next(...), and then record elapsed the first time an element hits Poll::Ready (as you've already done here), to get more accurate results.
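
For illustration, here is a minimal self-contained sketch of that lazy-start idea, with start switched to an Option that is filled in on the first poll (LazyStartStream and its fields are made-up names, not code from this PR):

    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};

    use futures::Stream;

    struct LazyStartStream<S> {
        inner: S,
        start: Option<Instant>,               // set on the first call to poll_next
        time_to_first_item: Option<Duration>, // set when the first item is ready
    }

    impl<S: Stream + Unpin> Stream for LazyStartStream<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.get_mut();
            // Start the clock here, so time spent before the caller first awaits
            // the stream (e.g. some_long_running_method().await) is not counted.
            let start = *this.start.get_or_insert_with(Instant::now);

            let poll_result = Pin::new(&mut this.inner).poll_next(cx);

            if this.time_to_first_item.is_none() && poll_result.is_ready() {
                this.time_to_first_item = Some(start.elapsed());
            }

            poll_result
        }
    }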

Comment on lines +88 to +91
let mut requests = self.requests.lock();
if let Some(request) = requests.get_mut(self.request_index) {
request.duration = Some(elapsed);
}
Contributor

Based on the current implementation I believe this strategy is currently "safe" (i.e. we won't accidentally modify the duration of a different request, leading to errant data). However, it does rely on the assumption that self.requests never has items removed from the middle of the Vec.

It might be useful to find a place to leave a comment noting that requests should be append-only to make it less likely for this assumption to be broken in the future.
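
For example, something along these lines on the field itself (the surrounding struct name here is made up, and the wording is only a suggestion):

    use std::sync::Arc;

    use parking_lot::Mutex; // assumption, matching the bare .lock() call in the diff

    struct RequestDetails { /* fields elided */ }

    struct InstrumentedState {
        /// Details for each request issued through the instrumented store.
        ///
        /// NOTE: treat this Vec as append-only. TimeToFirstItemStream records a
        /// request's duration by its index in this Vec, so removing or reordering
        /// entries would attach timings to the wrong request.
        requests: Arc<Mutex<Vec<RequestDetails>>>,
    }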

Contributor

How about, instead of relying on a potentially tricky race condition, we use an Arc<AtomicU64> for the duration? Then instead of

    request_index: usize,
    requests: Arc<Mutex<Vec<RequestDetails>>>,

We could just pass in

    request_duration: Arc<AtomicU64>,
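
Roughly, the wrapper could then record the measurement with a single atomic store, and the reporting side could convert it back to a Duration, e.g. by encoding nanoseconds (a sketch only; the helper names below are made up):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // Called from poll_next once the first item (or end of stream) is ready.
    fn record_first_item(request_duration: &AtomicU64, elapsed: Duration) {
        // Store the elapsed time as nanoseconds; 0 means "not measured yet".
        request_duration.store(elapsed.as_nanos() as u64, Ordering::Relaxed);
    }

    // Called when reporting, converting back to a Duration if a value was recorded.
    fn read_duration(request_duration: &Arc<AtomicU64>) -> Option<Duration> {
        match request_duration.load(Ordering::Relaxed) {
            0 => None,
            nanos => Some(Duration::from_nanos(nanos)),
        }
    }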

Contributor

Yes, I like this solution quite a bit better than indexing into an array!

Contributor

@alamb alamb left a comment

Thank you @peterxcli and @BlakeOrth

I think this PR is good from my perspective. I have a suggestion that might make it simpler, but I don't think that is needed.

assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.list(Some(&path));
let mut stream = instrumented.list(Some(&path));
// Consume at least one item from the stream to trigger duration measurement
Contributor

I recommend also testing the duration value -- perhaps

        let start = Instant::now();
        let mut stream = instrumented.list(Some(&path));
        // Consume at least one item from the stream to trigger duration measurement
        let time_before_start = start.elapsed();
        let _ = stream.next().await;
...
        
        assert!(request.duration.unwrap() > time_before_start);

@alamb
Contributor

alamb commented Dec 14, 2025

BTW I tested it out locally and it worked great:

ObjectStore Profile mode set to Trace
> CREATE EXTERNAL TABLE overture_partitioned
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-09-24.0/theme=addresses/';


0 row(s) fetched.
Elapsed 5.442 seconds.

Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: AmazonS3(overturemaps-us-west-2)
2025-12-14T12:45:54.938044+00:00 operation=List duration=0.304596s path=release/2025-09-24.0/theme=addresses
2025-12-14T12:45:55.242976+00:00 operation=List duration=0.092714s path=release/2025-09-24.0/theme=addresses
2025-12-14T12:45:55.338804+00:00 operation=List duration=0.089522s path=release/2025-09-24.0/theme=addresses

Summaries:
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Operation | Metric   | min       | max       | avg       | sum       | count |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| List      | duration | 0.089522s | 0.304596s | 0.162277s | 0.486832s | 3     |
| List      | size     |           |           |           |           | 3     |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
> select * from overture_partitioned limit 10;

@alamb
Contributor

alamb commented Dec 18, 2025

While I think there are some additional improvements possible, let's just merge this PR and do them as follow-on PRs.

@alamb alamb added this pull request to the merge queue Dec 18, 2025
@alamb
Contributor

alamb commented Dec 18, 2025

Thanks @peterxcli and @BlakeOrth

Merged via the queue into apache:main with commit 2c9077a Dec 18, 2025
27 checks passed

Development

Successfully merging this pull request may close these issues.

[datafusion-cli] Implement average LIST duration for object store profiling

3 participants