[datafusion-cli] Implement average LIST duration for object store profiling #19127
Conversation
FYI @BlakeOrth -- are you available to review this PR?
@alamb Yes, I will allocate time for a review.
BlakeOrth left a comment:
Thank you for taking this on! Overall I think the strategy of wrapping the stream seems sound.
I do have one small concern about how the timing is being tracked; I've left an inline comment with more details.
```rust
if !self.first_item_yielded && poll_result.is_ready() {
    self.first_item_yielded = true;
    let elapsed = self.start.elapsed();
```
I'm somewhat concerned this elapsed calculation could end up generating misleading results in some cases. The concern stems from the fact that `self.start` is set when the stream is created. While this will probably be pretty accurate in many scenarios, imagine a scenario where something like the following happens:

```rust
let list_stream = TimeToFirstItemStream::new(stream, Instant::now(), 0, requests);
// The stream is created here, but has never been polled because the user has yet
// to await the stream. However, the "timer" is already running.
some_long_running_method().await;
let item = list_stream.next().await.unwrap();
```

In this case the elapsed duration would effectively be measuring the time of both `some_long_running_method()` as well as the time it took to yield the first element on the stream.
I'm wondering if we can set `self.start` once on the first call to `poll_next(...)` and then set `elapsed` the first time an element hits `Poll::Ready` (as you've already done here) to get more accurate results.
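The lazy-start idea can be sketched with std types only; `TimeToFirstItem` and `observe_poll` are illustrative names for this sketch, not the PR's actual code:

```rust
use std::task::Poll;
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Illustrative sketch of the lazy-start idea: the timer starts on the
/// *first poll*, so time spent before the stream is first awaited is
/// excluded from the recorded duration.
struct TimeToFirstItem {
    start: Option<Instant>,
    elapsed: Option<Duration>,
}

impl TimeToFirstItem {
    fn new() -> Self {
        Self { start: None, elapsed: None }
    }

    /// Would be called from `poll_next`; `inner` stands in for the result
    /// of polling the wrapped stream.
    fn observe_poll(&mut self, inner: Poll<Option<u32>>) -> Poll<Option<u32>> {
        // Start the clock on the first poll, not at construction time.
        let start = *self.start.get_or_insert_with(Instant::now);
        if self.elapsed.is_none() && inner.is_ready() {
            // First item yielded (or stream ended): record time-to-first-item.
            self.elapsed = Some(start.elapsed());
        }
        inner
    }
}

fn main() {
    let mut timer = TimeToFirstItem::new();
    // Simulate a long gap between creating the stream and first polling it;
    // with lazy start this gap must not be counted.
    sleep(Duration::from_millis(200));
    let _ = timer.observe_poll(Poll::Pending); // first poll starts the clock
    sleep(Duration::from_millis(10));
    let _ = timer.observe_poll(Poll::Ready(Some(1))); // first item arrives
    let elapsed = timer.elapsed.unwrap();
    assert!(elapsed < Duration::from_millis(150), "pre-poll gap was counted");
    println!("time to first item: {elapsed:?}");
}
```

With eager start (the current code), the same scenario would report roughly 210 ms instead of roughly 10 ms.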
```rust
let mut requests = self.requests.lock();
if let Some(request) = requests.get_mut(self.request_index) {
    request.duration = Some(elapsed);
}
```
Based on the current implementation I believe this strategy is currently "safe" (i.e., we won't accidentally modify the duration of a different request, leading to errant data). However, it does rely on the assumption that `self.requests` never has items removed from the middle of the `Vec`.
It might be useful to find a place to leave a comment noting that requests should be append-only to make it less likely for this assumption to be broken in the future.
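If the indexing approach were kept, the invariant could be documented right at the data-structure site. A minimal sketch (the `RequestDetails` here is a simplified stand-in, not the real struct):

```rust
use std::time::Duration;

// Simplified stand-in for the real RequestDetails.
struct RequestDetails {
    duration: Option<Duration>,
}

fn main() {
    // NOTE: append-only! In-flight streams capture an index into this Vec,
    // so removing or reordering entries would corrupt their durations.
    let mut requests: Vec<RequestDetails> = Vec::new();

    requests.push(RequestDetails { duration: None });
    let request_index = requests.len() - 1; // captured when the request starts

    // Later, when the first stream item is yielded:
    if let Some(request) = requests.get_mut(request_index) {
        request.duration = Some(Duration::from_millis(7));
    }
    assert!(requests[request_index].duration.is_some());
}
```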
How about instead of relying on some potentially tricky race condition, we instead use an `Arc<AtomicU64>` for the duration? Then instead of

```rust
request_index: usize,
requests: Arc<Mutex<Vec<RequestDetails>>>,
```

we could just pass in

```rust
request_duration: Arc<AtomicU64>,
```
Yes, I like this solution quite a bit better than indexing into an array!
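The shared-atomic variant can be sketched with std types: storing the duration as nanoseconds in an `AtomicU64` (with zero meaning "not yet recorded") avoids both the lock and the index. The helper names here are illustrative:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Store an elapsed time as nanoseconds; 0 doubles as "not yet recorded".
fn record_duration(slot: &AtomicU64, elapsed: Duration) {
    slot.store(elapsed.as_nanos() as u64, Ordering::Relaxed);
}

/// Read back the duration, if one has been recorded.
fn read_duration(slot: &AtomicU64) -> Option<Duration> {
    match slot.load(Ordering::Relaxed) {
        0 => None,
        nanos => Some(Duration::from_nanos(nanos)),
    }
}

fn main() {
    let slot = Arc::new(AtomicU64::new(0));
    assert_eq!(read_duration(&slot), None);

    // The stream wrapper would own a clone and write once on first Ready.
    let writer = Arc::clone(&slot);
    record_duration(&writer, Duration::from_millis(42));

    assert_eq!(read_duration(&slot), Some(Duration::from_millis(42)));
}
```

One trade-off of this encoding: a genuine zero-length duration is indistinguishable from "not recorded", which is acceptable for wall-clock request timing.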
alamb left a comment:
Thank you @peterxcli and @BlakeOrth
I think this PR is good from my perspective. I have a suggestion for how to make it simpler, but I don't think that is needed.
```diff
  assert!(instrumented.requests.lock().is_empty());
- let _ = instrumented.list(Some(&path));
+ let mut stream = instrumented.list(Some(&path));
  // Consume at least one item from the stream to trigger duration measurement
```
I recommend also testing the duration value -- perhaps

```rust
let start = Instant::now();
let mut stream = instrumented.list(Some(&path));
// Consume at least one item from the stream to trigger duration measurement
let time_before_start = start.elapsed();
let _ = stream.next().await;
...
assert!(request.duration.unwrap() > time_before_start);
```
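The lower-bound assertion works because `time_before_start` is taken from the same clock before the first item is consumed. A self-contained sketch of that reasoning, with sleeps standing in for the stream creation and `stream.next().await`:

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

fn main() {
    let start = Instant::now();
    sleep(Duration::from_millis(5)); // stand-in for creating the list stream
    let time_before_start = start.elapsed();

    // Stand-in for `stream.next().await` consuming the first item.
    sleep(Duration::from_millis(5));
    let recorded = start.elapsed(); // what the instrumentation would record

    // Any duration measured from the same start, but finishing later,
    // must exceed the lower bound.
    assert!(recorded > time_before_start);
    println!("lower bound {time_before_start:?} < recorded {recorded:?}");
}
```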
BTW I tested it out locally and it worked great:

```
ObjectStore Profile mode set to Trace
> CREATE EXTERNAL TABLE overture_partitioned
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-09-24.0/theme=addresses/';
0 row(s) fetched.
Elapsed 5.442 seconds.

Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: AmazonS3(overturemaps-us-west-2)
2025-12-14T12:45:54.938044+00:00 operation=List duration=0.304596s path=release/2025-09-24.0/theme=addresses
2025-12-14T12:45:55.242976+00:00 operation=List duration=0.092714s path=release/2025-09-24.0/theme=addresses
2025-12-14T12:45:55.338804+00:00 operation=List duration=0.089522s path=release/2025-09-24.0/theme=addresses

Summaries:
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Operation | Metric   | min       | max       | avg       | sum       | count |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| List      | duration | 0.089522s | 0.304596s | 0.162277s | 0.486832s | 3     |
| List      | size     |           |           |           |           | 3     |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
> select * from overture_partitioned limit 10;
```
While I think there are some additional improvements possible, let's just merge this PR and do them as follow-on PRs.
Thanks @peterxcli and @BlakeOrth
Which issue does this PR close?
Rationale for this change
The `list` operation returns a stream, so it previously recorded `duration: None`, missing performance insights. Time-to-first-item is a useful metric for list operations, indicating how quickly results start arriving. This adds duration tracking by measuring the time until the first item is yielded (or the stream ends).

What changes are included in this PR?

- `TimeToFirstItemStream`: a stream wrapper that measures elapsed time from creation until the first item is yielded (or the stream ends if empty).
- `instrumented_list`: wraps the inner stream with `TimeToFirstItemStream` to record duration.
- `requests` field: switched from `Mutex<Vec<RequestDetails>>` to `Arc<Mutex<Vec<RequestDetails>>>` to allow sharing across async boundaries (needed for the stream wrapper).
- Updated the `instrumented_store_list` test to consume at least one stream item and verify that `duration` is now `Some(Duration)` instead of `None`.

Are these changes tested?
Yes. The existing test `instrumented_store_list` was updated to:

- consume at least one item from the stream via `stream.next().await`
- assert `request.duration.is_some()` (previously `is_none()`)

All tests pass, including the updated list test and other instrumented operation tests.
Are there any user-facing changes?
Users with profiling enabled will see duration values for `list` operations instead of nothing.