-
Notifications
You must be signed in to change notification settings - Fork 1.8k
[datafusion-cli] Implement average LIST duration for object store profiling #19127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,14 +35,66 @@ use datafusion::{ | |
| error::DataFusionError, | ||
| execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, | ||
| }; | ||
| use futures::stream::BoxStream; | ||
| use futures::stream::{BoxStream, Stream}; | ||
| use object_store::{ | ||
| path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, | ||
| ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, | ||
| }; | ||
| use parking_lot::{Mutex, RwLock}; | ||
| use url::Url; | ||
|
|
||
| /// A stream wrapper that measures the time until the first response(item or end of stream) is yielded | ||
| struct TimeToFirstItemStream<S> { | ||
| inner: S, | ||
| start: Instant, | ||
| request_index: usize, | ||
| requests: Arc<Mutex<Vec<RequestDetails>>>, | ||
| first_item_yielded: bool, | ||
| } | ||
|
|
||
| impl<S> TimeToFirstItemStream<S> { | ||
| fn new( | ||
| inner: S, | ||
| start: Instant, | ||
| request_index: usize, | ||
| requests: Arc<Mutex<Vec<RequestDetails>>>, | ||
| ) -> Self { | ||
| Self { | ||
| inner, | ||
| start, | ||
| request_index, | ||
| requests, | ||
| first_item_yielded: false, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<S> Stream for TimeToFirstItemStream<S> | ||
| where | ||
| S: Stream<Item = Result<ObjectMeta>> + Unpin, | ||
| { | ||
| type Item = Result<ObjectMeta>; | ||
|
|
||
| fn poll_next( | ||
| mut self: std::pin::Pin<&mut Self>, | ||
| cx: &mut std::task::Context<'_>, | ||
| ) -> std::task::Poll<Option<Self::Item>> { | ||
| let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx); | ||
|
|
||
| if !self.first_item_yielded && poll_result.is_ready() { | ||
| self.first_item_yielded = true; | ||
| let elapsed = self.start.elapsed(); | ||
|
|
||
| let mut requests = self.requests.lock(); | ||
| if let Some(request) = requests.get_mut(self.request_index) { | ||
| request.duration = Some(elapsed); | ||
| } | ||
|
Comment on lines
+88
to
+91
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Based on the current implementation I believe this strategy is currently "safe" (i.e. we won't accidentally modify the duration of a different request, leading to errant data). However, it does rely on the assumption that [inline code lost in extraction; presumably that an entry's index in `requests` stays valid for as long as its stream is alive]. It might be useful to find a place to leave a comment noting that [inline code lost in extraction; presumably the same invariant].
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
How about, instead of relying on some potentially tricky race condition, we instead use an `Arc<AtomicU64>`? Rather than
```rust
request_index: usize,
requests: Arc<Mutex<Vec<RequestDetails>>>,
```
we could just pass in
```rust
request_duration: Arc<AtomicU64>,
```
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I like this solution quite a bit better than indexing into an array! |
||
| } | ||
|
|
||
| poll_result | ||
| } | ||
| } | ||
|
|
||
| /// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling | ||
| /// data will have a small negative impact on both CPU and memory usage. Default is `Disabled` | ||
| #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] | ||
|
|
@@ -91,7 +143,7 @@ impl From<u8> for InstrumentedObjectStoreMode { | |
| pub struct InstrumentedObjectStore { | ||
| inner: Arc<dyn ObjectStore>, | ||
| instrument_mode: AtomicU8, | ||
| requests: Mutex<Vec<RequestDetails>>, | ||
| requests: Arc<Mutex<Vec<RequestDetails>>>, | ||
| } | ||
|
|
||
| impl InstrumentedObjectStore { | ||
|
|
@@ -100,7 +152,7 @@ impl InstrumentedObjectStore { | |
| Self { | ||
| inner: object_store, | ||
| instrument_mode, | ||
| requests: Mutex::new(Vec::new()), | ||
| requests: Arc::new(Mutex::new(Vec::new())), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -218,19 +270,31 @@ impl InstrumentedObjectStore { | |
| prefix: Option<&Path>, | ||
| ) -> BoxStream<'static, Result<ObjectMeta>> { | ||
| let timestamp = Utc::now(); | ||
| let ret = self.inner.list(prefix); | ||
| let start = Instant::now(); | ||
| let inner_stream = self.inner.list(prefix); | ||
|
|
||
| let request_index = { | ||
| let mut requests = self.requests.lock(); | ||
| requests.push(RequestDetails { | ||
| op: Operation::List, | ||
| path: prefix.cloned().unwrap_or_else(|| Path::from("")), | ||
| timestamp, | ||
| duration: None, | ||
| size: None, | ||
| range: None, | ||
| extra_display: None, | ||
| }); | ||
| requests.len() - 1 | ||
| }; | ||
|
|
||
| self.requests.lock().push(RequestDetails { | ||
| op: Operation::List, | ||
| path: prefix.cloned().unwrap_or_else(|| Path::from("")), | ||
| timestamp, | ||
| duration: None, // list returns a stream, so the duration isn't meaningful | ||
| size: None, | ||
| range: None, | ||
| extra_display: None, | ||
| }); | ||
| let wrapped_stream = TimeToFirstItemStream::new( | ||
| inner_stream, | ||
| start, | ||
| request_index, | ||
| Arc::clone(&self.requests), | ||
| ); | ||
|
|
||
| ret | ||
| Box::pin(wrapped_stream) | ||
| } | ||
|
|
||
| async fn instrumented_list_with_delimiter( | ||
|
|
@@ -758,6 +822,7 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { | |
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use futures::StreamExt; | ||
| use object_store::WriteMultipart; | ||
|
|
||
| use super::*; | ||
|
|
@@ -896,13 +961,15 @@ mod tests { | |
|
|
||
| instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace); | ||
| assert!(instrumented.requests.lock().is_empty()); | ||
| let _ = instrumented.list(Some(&path)); | ||
| let mut stream = instrumented.list(Some(&path)); | ||
| // Consume at least one item from the stream to trigger duration measurement | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I recommend also testing the duration value -- perhaps:
```rust
let start = Instant::now();
let mut stream = instrumented.list(Some(&path));
// Consume at least one item from the stream to trigger duration measurement
let time_before_start = start.elapsed();
let _ = stream.next().await;
...
assert!(request.duration.unwrap() > time_before_start);
```
|
||
| let _ = stream.next().await; | ||
| assert_eq!(instrumented.requests.lock().len(), 1); | ||
|
|
||
| let request = instrumented.take_requests().pop().unwrap(); | ||
| assert_eq!(request.op, Operation::List); | ||
| assert_eq!(request.path, path); | ||
| assert!(request.duration.is_none()); | ||
| assert!(request.duration.is_some()); | ||
| assert!(request.size.is_none()); | ||
| assert!(request.range.is_none()); | ||
| assert!(request.extra_display.is_none()); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm somewhat concerned this elapsed calculation could end up generating misleading results in some cases. The concern stems from the fact that `self.start` is set when the stream is created. While this will probably be pretty accurate in many scenarios, imagine a scenario where something like the following happens: the stream is created, `some_long_running_method()` runs, and only then is the stream polled for its first item. In this case the elapsed duration would effectively be measuring the time of both `some_long_running_method()` as well as the time it took to yield the first element on the stream.
I'm wondering if we can set `self.start` once on the first call to `poll_next(...)` and then set elapsed on the first time an element hits `Poll::Ready` (as you've already done here) to get more accurate results.