Skip to content

Commit 2c9077a

Browse files
authored
[datafusion-cli] Implement average LIST duration for object store profiling (#19127)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18138 ## Rationale for this change The `list` operation returns a stream, so it previously recorded `duration: None`, missing performance insights. Time-to-first-item is a useful metric for list operations, indicating how quickly results start. This adds duration tracking by measuring time until the first item is yielded (or the stream ends). ## What changes are included in this PR? 1. Added `TimeToFirstItemStream`: A stream wrapper that measures elapsed time from creation until the first item is yielded (or the stream ends if empty). 2. Updated `instrumented_list`: Wraps the inner stream with `TimeToFirstItemStream` to record duration. 3. Changed `requests` field: Switched from `Mutex<Vec<RequestDetails>>` to `Arc<Mutex<Vec<RequestDetails>>>` to allow sharing across async boundaries (needed for the stream wrapper). 4. Updated tests: Modified `instrumented_store_list` to consume at least one stream item and verify that `duration` is now `Some(Duration)` instead of `None`. ## Are these changes tested? Yes. The existing test `instrumented_store_list` was updated to: - Consume at least one item from the stream using `stream.next().await` - Assert that `request.duration.is_some()` (previously `is_none()`) All tests pass, including the updated list test and other instrumented operation tests. ## Are there any user-facing changes? Users with profiling enabled will see duration values for `list` operations instead of nothing.
1 parent 0a0416d commit 2c9077a

File tree

1 file changed

+83
-16
lines changed

1 file changed

+83
-16
lines changed

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion::{
3535
error::DataFusionError,
3636
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
3737
};
38-
use futures::stream::BoxStream;
38+
use futures::stream::{BoxStream, Stream};
3939
use object_store::{
4040
GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
4141
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
@@ -44,6 +44,58 @@ use object_store::{
4444
use parking_lot::{Mutex, RwLock};
4545
use url::Url;
4646

47+
/// A stream wrapper that measures the time until the first response(item or end of stream) is yielded
48+
struct TimeToFirstItemStream<S> {
49+
inner: S,
50+
start: Instant,
51+
request_index: usize,
52+
requests: Arc<Mutex<Vec<RequestDetails>>>,
53+
first_item_yielded: bool,
54+
}
55+
56+
impl<S> TimeToFirstItemStream<S> {
57+
fn new(
58+
inner: S,
59+
start: Instant,
60+
request_index: usize,
61+
requests: Arc<Mutex<Vec<RequestDetails>>>,
62+
) -> Self {
63+
Self {
64+
inner,
65+
start,
66+
request_index,
67+
requests,
68+
first_item_yielded: false,
69+
}
70+
}
71+
}
72+
73+
impl<S> Stream for TimeToFirstItemStream<S>
74+
where
75+
S: Stream<Item = Result<ObjectMeta>> + Unpin,
76+
{
77+
type Item = Result<ObjectMeta>;
78+
79+
fn poll_next(
80+
mut self: std::pin::Pin<&mut Self>,
81+
cx: &mut std::task::Context<'_>,
82+
) -> std::task::Poll<Option<Self::Item>> {
83+
let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
84+
85+
if !self.first_item_yielded && poll_result.is_ready() {
86+
self.first_item_yielded = true;
87+
let elapsed = self.start.elapsed();
88+
89+
let mut requests = self.requests.lock();
90+
if let Some(request) = requests.get_mut(self.request_index) {
91+
request.duration = Some(elapsed);
92+
}
93+
}
94+
95+
poll_result
96+
}
97+
}
98+
4799
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
48100
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
49101
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
@@ -92,7 +144,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
92144
pub struct InstrumentedObjectStore {
93145
inner: Arc<dyn ObjectStore>,
94146
instrument_mode: AtomicU8,
95-
requests: Mutex<Vec<RequestDetails>>,
147+
requests: Arc<Mutex<Vec<RequestDetails>>>,
96148
}
97149

98150
impl InstrumentedObjectStore {
@@ -101,7 +153,7 @@ impl InstrumentedObjectStore {
101153
Self {
102154
inner: object_store,
103155
instrument_mode,
104-
requests: Mutex::new(Vec::new()),
156+
requests: Arc::new(Mutex::new(Vec::new())),
105157
}
106158
}
107159

@@ -219,19 +271,31 @@ impl InstrumentedObjectStore {
219271
prefix: Option<&Path>,
220272
) -> BoxStream<'static, Result<ObjectMeta>> {
221273
let timestamp = Utc::now();
222-
let ret = self.inner.list(prefix);
274+
let start = Instant::now();
275+
let inner_stream = self.inner.list(prefix);
276+
277+
let request_index = {
278+
let mut requests = self.requests.lock();
279+
requests.push(RequestDetails {
280+
op: Operation::List,
281+
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
282+
timestamp,
283+
duration: None,
284+
size: None,
285+
range: None,
286+
extra_display: None,
287+
});
288+
requests.len() - 1
289+
};
223290

224-
self.requests.lock().push(RequestDetails {
225-
op: Operation::List,
226-
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
227-
timestamp,
228-
duration: None, // list returns a stream, so the duration isn't meaningful
229-
size: None,
230-
range: None,
231-
extra_display: None,
232-
});
291+
let wrapped_stream = TimeToFirstItemStream::new(
292+
inner_stream,
293+
start,
294+
request_index,
295+
Arc::clone(&self.requests),
296+
);
233297

234-
ret
298+
Box::pin(wrapped_stream)
235299
}
236300

237301
async fn instrumented_list_with_delimiter(
@@ -759,6 +823,7 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
759823

760824
#[cfg(test)]
761825
mod tests {
826+
use futures::StreamExt;
762827
use object_store::WriteMultipart;
763828

764829
use super::*;
@@ -899,13 +964,15 @@ mod tests {
899964

900965
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
901966
assert!(instrumented.requests.lock().is_empty());
902-
let _ = instrumented.list(Some(&path));
967+
let mut stream = instrumented.list(Some(&path));
968+
// Consume at least one item from the stream to trigger duration measurement
969+
let _ = stream.next().await;
903970
assert_eq!(instrumented.requests.lock().len(), 1);
904971

905972
let request = instrumented.take_requests().pop().unwrap();
906973
assert_eq!(request.op, Operation::List);
907974
assert_eq!(request.path, path);
908-
assert!(request.duration.is_none());
975+
assert!(request.duration.is_some());
909976
assert!(request.size.is_none());
910977
assert!(request.range.is_none());
911978
assert!(request.extra_display.is_none());

0 commit comments

Comments
 (0)