Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,32 @@ impl TrackedConsumer {
}
}

/// A point-in-time snapshot of a tracked memory consumer's state.
///
/// Returned by [`TrackConsumersPool::metrics()`].
#[derive(Debug, Clone)]
pub struct MemoryConsumerMetrics {
/// The name of the memory consumer
pub name: String,
/// Whether this consumer can spill to disk
pub can_spill: bool,
/// The number of bytes currently reserved by this consumer
pub reserved: usize,
/// The peak number of bytes reserved by this consumer
pub peak: usize,
}

impl From<&TrackedConsumer> for MemoryConsumerMetrics {
fn from(tracked: &TrackedConsumer) -> Self {
Self {
name: tracked.name.clone(),
can_spill: tracked.can_spill,
reserved: tracked.reserved(),
peak: tracked.peak(),
}
}
}

/// A [`MemoryPool`] that tracks the consumers that have
/// reserved memory within the inner memory pool.
///
Expand Down Expand Up @@ -381,6 +407,15 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
}
}

/// Returns a snapshot of all currently tracked consumers.
pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
self.tracked_consumers
.lock()
.values()
.map(Into::into)
.collect()
}

/// Returns a formatted string with the top memory consumers.
pub fn report_top(&self, top: usize) -> String {
let mut consumers = self
Expand Down Expand Up @@ -778,6 +813,54 @@ mod tests {
test_per_pool_type(tracked_greedy_pool);
}

#[test]
fn test_track_consumers_pool_metrics() {
let track_consumers_pool = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(1000),
NonZeroUsize::new(3).unwrap(),
));
let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;

// Empty pool has no metrics
assert!(track_consumers_pool.metrics().is_empty());

// Register consumers with different spill settings
let r1 = MemoryConsumer::new("spilling")
.with_can_spill(true)
.register(&memory_pool);
let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);

// Grow r1 in two steps to verify peak tracking
r1.grow(100);
r1.grow(50);
r1.shrink(50); // reserved=100, peak=150

r2.grow(200); // reserved=200, peak=200

let mut metrics = track_consumers_pool.metrics();
metrics.sort_by_key(|m| m.name.clone());

assert_eq!(metrics.len(), 2);

let m_non = &metrics[0];
assert_eq!(m_non.name, "non-spilling");
assert!(!m_non.can_spill);
assert_eq!(m_non.reserved, 200);
assert_eq!(m_non.peak, 200);

let m_spill = &metrics[1];
assert_eq!(m_spill.name, "spilling");
assert!(m_spill.can_spill);
assert_eq!(m_spill.reserved, 100);
assert_eq!(m_spill.peak, 150);

// Unregistered consumers are removed from metrics
drop(r2);
let metrics = track_consumers_pool.metrics();
assert_eq!(metrics.len(), 1);
assert_eq!(metrics[0].name, "spilling");
}

#[test]
fn test_tracked_consumers_pool_use_beyond_errors() {
let setting = make_settings();
Expand Down
Loading