From 1b4731fa576ea4a45a75c95774e27b5ce0b0f4eb Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Mon, 22 Jun 2026 07:05:49 -0600 Subject: [PATCH 1/3] add PartitionExtrema + SortExec observer --- .../physical-plan/src/execution_plan.rs | 106 +++++- datafusion/physical-plan/src/lib.rs | 7 +- datafusion/physical-plan/src/sorts/sort.rs | 349 +++++++++++++++++- 3 files changed, 455 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 76abf73e0ebbe..4500e3a0775aa 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -29,7 +29,7 @@ use arrow_schema::Schema; pub use datafusion_common::hash_utils; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; pub use datafusion_common::utils::project_schema; -pub use datafusion_common::{ColumnStatistics, Statistics, internal_err}; +pub use datafusion_common::{ColumnStatistics, ScalarValue, Statistics, internal_err}; pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; @@ -531,6 +531,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { self.partition_statistics(args.partition()) } + /// Runtime-derived endpoints of `partition`'s output along this + /// operator's declared output ordering (i.e. each `PhysicalSortExpr` in + /// `equivalence_properties().output_ordering()`). + /// + /// Default implementation returns `Ok(None)` — appropriate for any + /// streaming operator that cannot know its own min/max before emitting + /// its first batch. + /// + /// Operators that are pipeline-breaking along their output ordering + /// (e.g. `SortExec`) may override to expose the lex-min / lex-max tuple + /// once the partition's input has been fully consumed. See the + /// [`PartitionExtrema`] type-level doc for the dual *observed*-vs-*intended* + /// semantics, the caller's progress contract, and passthrough guidance. + /// + /// Callers must drive `execute(partition)` to the point where the + /// implementing operator has filled in its slot before relying on the + /// result; until then `Ok(None)` is returned. Reading too early silently + /// degrades to "no extrema" rather than panicking. + fn runtime_partition_extrema( + &self, + partition: usize, + ) -> Result> { + let _ = partition; + Ok(None) + } + /// Returns `true` if a limit can be safely pushed down through this /// `ExecutionPlan` node. /// @@ -759,6 +785,84 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { } } +/// Whether [`PartitionExtrema::min`] / [`PartitionExtrema::max`] describe +/// the data the partition actually carries, or a (typically narrower) +/// primary range that excludes deliberately-routed extra rows. +/// +/// Implementing operators must document which variant they return so +/// consumers can pick the right interpretation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ExtremaKind { + /// The reported `min` / `max` are the literal lex-min and lex-max of + /// the rows this partition has produced (or will produce, by the time + /// the upstream is fully consumed). No row in the partition lies + /// outside `[min, max]`. `SortExec` is the canonical implementer: + /// once it has read all input rows and sorted them, it knows the + /// endpoints exactly. + Observed, + /// The reported `min` / `max` describe the partition's *intended + /// primary range*. The partition's data has been **expanded** with + /// extra rows outside that range — typically a "halo" routed in by an + /// upstream range-shuffle so a downstream window or filter pass can + /// see the full neighborhood at each seam. A downstream operator is + /// contracted to read the primary range from + /// [`ExecutionPlan::runtime_partition_extrema`] and discard the halo + /// rows lying outside it. No operator in this PR returns `Expanded`; + /// the planned `RangeRepartitionExec` will. + Expanded, +} + +/// Endpoints of a single partition's output expressed in the operator's +/// declared output ordering (the `PhysicalSortExpr` list returned by +/// `equivalence_properties().output_ordering()`). +/// +/// "Extrema" is the plural of *extremum* — the lex-min and lex-max rows of +/// the partition along its output ordering. `min` and `max` are tuples of +/// values, one entry per sort key, taken from those two rows. For the +/// leading sort key these are exactly the natural min/max of that key +/// (after honoring `ASC`/`DESC` and `NULLS FIRST/LAST`). For trailing sort +/// keys the entries hold the value of that key at the lex-extreme row, not +/// that key's own natural extremum. +/// +/// `kind` distinguishes whether the reported `min` / `max` bound the data +/// the partition actually carries ([`ExtremaKind::Observed`]) or describe a +/// narrower primary range that the partition's data extends beyond +/// ([`ExtremaKind::Expanded`]). See [`ExtremaKind`] for the contract each +/// implementer signs. +/// +/// # Caller contract +/// +/// - **The upstream must have made enough progress** before +/// `runtime_partition_extrema` returns a meaningful value. For +/// pipeline-breaking implementations like `SortExec`, this means at +/// least one `execute(partition)` poll must have reached the point where +/// the input is fully consumed and the first sorted chunk is produced. +/// Until then `Ok(None)` is returned. Reading too early silently +/// degrades to "no extrema" rather than panicking. +/// - **`Ok(None)` is the streaming-operator answer.** Operators that emit +/// batches as soon as input arrives cannot know their extrema ahead of +/// time and should return `Ok(None)` — which is the default trait +/// implementation, so they need do nothing. +/// - **Passthrough**: an operator whose output ordering is the same as its +/// input's leading-key ordering may forward +/// `self.input.runtime_partition_extrema(partition)` after its own first +/// batch is emitted. Whether it does so is the operator's choice; the +/// trait does not enforce it. A passthrough preserves the upstream +/// `kind` unchanged. +#[derive(Debug, Clone)] +pub struct PartitionExtrema { + /// Whether the reported `min` / `max` are observed from the data + /// (no row lies outside the range) or an expanded interpretation + /// (rows outside the range exist as halo). See [`ExtremaKind`]. + pub kind: ExtremaKind, + /// Sort-key values at the lex-smallest row across the partition. + pub min: Vec, + /// Sort-key values at the lex-largest row across the partition. + pub max: Vec, + /// Total non-empty rows that contributed to `min`/`max`. + pub row_count: usize, +} + impl dyn ExecutionPlan { /// Returns `true` if the plan is of type `T`. /// diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6cc6e44c32cc3..f628068a3edc1 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -42,9 +42,10 @@ pub use datafusion_physical_expr::{ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; pub use crate::execution_plan::{ - ExecutionPlan, ExecutionPlanProperties, PlanProperties, collect, collect_partitioned, - displayable, execute_input_stream, execute_stream, execute_stream_partitioned, - get_plan_string, with_new_children_if_necessary, + ExecutionPlan, ExecutionPlanProperties, ExtremaKind, PartitionExtrema, + PlanProperties, collect, collect_partitioned, displayable, execute_input_stream, + execute_stream, execute_stream_partitioned, get_plan_string, + with_new_children_if_necessary, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index f48bb15a7beae..227b1758aaabd 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -52,13 +52,14 @@ use crate::topk::TopK; use crate::topk::TopKDynamicFilters; use crate::{ DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, - ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, - Statistics, + ExecutionPlanProperties, ExtremaKind, PartitionExtrema, Partitioning, PlanProperties, + SendableRecordBatchStream, Statistics, }; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; use arrow::datatypes::SchemaRef; +use datafusion_common::ScalarValue; use datafusion_common::config::SpillCompression; use datafusion_common::{ DataFusionError, Result, assert_or_internal_err, internal_datafusion_err, @@ -73,6 +74,138 @@ use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; +use std::sync::Mutex; + +/// One slot per SortExec output partition. Populated inside the sort code +/// path each time `sort_batch_chunked` produces sorted batches, and surfaced +/// via [`ExecutionPlan::runtime_partition_extrema`]. By the time downstream +/// sees the first output batch the slot for that partition is already +/// populated. +type PartitionExtremaSlot = Arc>>; +type PartitionExtremaSlots = Arc>; + +fn make_partition_extrema_slots(n_partitions: usize) -> PartitionExtremaSlots { + Arc::new( + (0..n_partitions) + .map(|_| Arc::new(Mutex::new(None))) + .collect(), + ) +} + +/// Pull row values for each sort expression at `row` of an already-sorted +/// chunk. Used to read the lex-smallest (row 0) and lex-largest (row n-1) +/// candidate tuples from each `sort_batch_chunked` invocation. +/// +/// `RecordBatch::slice` is zero-copy (bumps offsets on the underlying +/// arrays without copying data), so the cost of an evaluation is O(1) in +/// the chunk size rather than O(rows-in-chunk). +fn evaluate_sort_row( + expressions: &LexOrdering, + batch: &RecordBatch, + row: usize, +) -> Result> { + let row_batch = batch.slice(row, 1); + expressions + .iter() + .map(|sort_expr| { + let arr: ArrayRef = sort_expr.expr.evaluate(&row_batch)?.into_array(1)?; + ScalarValue::try_from_array(&arr, 0) + }) + .collect() +} + +/// Compare two `ScalarValue` tuples using `SortOptions` per key (descending +/// and `nulls_first`), so callers can pick the lex-smaller or lex-larger of +/// two endpoint candidates without an arrow round-trip. +fn lex_compare( + a: &[ScalarValue], + b: &[ScalarValue], + expressions: &LexOrdering, +) -> std::cmp::Ordering { + use std::cmp::Ordering; + for ((va, vb), sort_expr) in a.iter().zip(b.iter()).zip(expressions.iter()) { + let cmp = match (va.is_null(), vb.is_null()) { + (true, true) => Ordering::Equal, + (true, false) => { + if sort_expr.options.nulls_first { + Ordering::Less + } else { + Ordering::Greater + } + } + (false, true) => { + if sort_expr.options.nulls_first { + Ordering::Greater + } else { + Ordering::Less + } + } + (false, false) => va.partial_cmp(vb).unwrap_or(Ordering::Equal), + }; + let cmp = if sort_expr.options.descending { + cmp.reverse() + } else { + cmp + }; + if cmp != Ordering::Equal { + return cmp; + } + } + Ordering::Equal +} + +/// Fold the endpoints of one already-sorted chunk produced by +/// `sort_batch_chunked` into the running [`PartitionExtrema`] for one +/// partition. Multi-chunk sorts (sort/spill/merge path) call this once per +/// chunk; the final slot holds the global lex-min / lex-max. +fn merge_chunk_into_extrema_slot( + slot: &Mutex>, + expressions: &LexOrdering, + sorted_chunks: &[RecordBatch], +) -> Result<()> { + let total_rows: usize = sorted_chunks.iter().map(|b| b.num_rows()).sum(); + if total_rows == 0 { + return Ok(()); + } + let Some(first_chunk) = sorted_chunks.iter().find(|b| b.num_rows() > 0) else { + return Ok(()); + }; + let Some(last_chunk) = sorted_chunks.iter().rev().find(|b| b.num_rows() > 0) else { + return Ok(()); + }; + let chunk_min = evaluate_sort_row(expressions, first_chunk, 0)?; + let chunk_max = + evaluate_sort_row(expressions, last_chunk, last_chunk.num_rows() - 1)?; + + let mut guard = slot.lock().unwrap(); + *guard = Some(match guard.take() { + None => PartitionExtrema { + kind: ExtremaKind::Observed, + min: chunk_min, + max: chunk_max, + row_count: total_rows, + }, + Some(prev) => { + let min = if lex_compare(&chunk_min, &prev.min, expressions).is_lt() { + chunk_min + } else { + prev.min + }; + let max = if lex_compare(&chunk_max, &prev.max, expressions).is_gt() { + chunk_max + } else { + prev.max + }; + PartitionExtrema { + kind: ExtremaKind::Observed, + min, + max, + row_count: prev.row_count + total_rows, + } + } + }); + Ok(()) +} struct ExternalSorterMetrics { /// metrics @@ -262,6 +395,12 @@ struct ExternalSorter { /// How much memory to reserve for performing in-memory sort/merges /// prior to spilling. sort_spill_reservation_bytes: usize, + + /// Optional slot that `sort_batch_stream` updates after each + /// `sort_batch_chunked` call with the leading-key endpoints of the sorted + /// output. `SortExec` provides this so consumers can fetch the observed + /// extrema via [`ExecutionPlan::runtime_partition_extrema`]. + extrema_slot: Option, } impl ExternalSorter { @@ -310,9 +449,18 @@ impl ExternalSorter { batch_size, sort_spill_reservation_bytes, sort_in_place_threshold_bytes, + extrema_slot: None, }) } + /// Provide a slot for `sort_batch_stream` to publish runtime endpoints + /// into. Used by `SortExec` so its `runtime_partition_extrema` override + /// has a value to return. + fn with_extrema_slot(mut self, slot: PartitionExtremaSlot) -> Self { + self.extrema_slot = Some(slot); + self + } + /// Appends an unsorted [`RecordBatch`] to `in_mem_batches` /// /// Updates memory usage metrics, and possibly triggers spilling to disk @@ -670,6 +818,7 @@ impl ExternalSorter { let expressions = self.expr.clone(); let batch_size = self.batch_size; let output_row_metrics = metrics.output_rows().clone(); + let extrema_slot = self.extrema_slot.clone(); let stream = futures::stream::once(async move { let schema = batch.schema(); @@ -677,6 +826,13 @@ impl ExternalSorter { // Sort the batch immediately and get all output batches let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?; + // Publish leading-key endpoints from this sorted chunk into the + // partition's extrema slot. Multi-chunk folds (sort/spill/merge + // path) happen inside `merge_chunk_into_extrema_slot`. + if let Some(slot) = &extrema_slot { + merge_chunk_into_extrema_slot(slot, &expressions, &sorted_batches)?; + } + // Resize the reservation to match the actual sorted output size. // Using try_resize avoids a release-then-reacquire cycle, which // matters for MemoryPool implementations where grow/shrink have @@ -867,6 +1023,10 @@ pub struct SortExec { /// If `fetch` is `Some`, this will also be set and a TopK operator may be used. /// If `fetch` is `None`, this will be `None`. filter: Option>>, + /// Per-output-partition slot populated by the sort code path. Surfaced via + /// [`ExecutionPlan::runtime_partition_extrema`] so range-shuffle callers + /// can derive global boundaries from runtime data. + runtime_extrema: PartitionExtremaSlots, } impl SortExec { @@ -877,6 +1037,8 @@ impl SortExec { let (cache, sort_prefix) = Self::compute_properties(&input, expr.clone(), preserve_partitioning) .unwrap(); + let runtime_extrema = + make_partition_extrema_slots(cache.partitioning.partition_count()); Self { expr, input, @@ -886,6 +1048,7 @@ impl SortExec { common_sort_prefix: sort_prefix, cache: Arc::new(cache), filter: None, + runtime_extrema, } } @@ -905,6 +1068,8 @@ impl SortExec { self.preserve_partitioning = preserve_partitioning; Arc::make_mut(&mut self.cache).partitioning = Self::output_partitioning_helper(&self.input, self.preserve_partitioning); + self.runtime_extrema = + make_partition_extrema_slots(self.cache.partitioning.partition_count()); if self.fetch.is_some() { self.rebuild_filter_for_current_partitioning(); } @@ -962,6 +1127,7 @@ impl SortExec { fetch: self.fetch, cache: Arc::clone(&self.cache), filter: self.filter.clone(), + runtime_extrema: Arc::clone(&self.runtime_extrema), } } @@ -1211,6 +1377,9 @@ impl ExecutionPlan for SortExec { )?; new_sort.cache = Arc::new(cache); new_sort.common_sort_prefix = sort_prefix; + new_sort.runtime_extrema = make_partition_extrema_slots( + new_sort.cache.partitioning.partition_count(), + ); if new_sort.fetch.is_some() { new_sort.rebuild_filter_for_current_partitioning(); } @@ -1305,6 +1474,9 @@ impl ExecutionPlan for SortExec { &self.metrics_set, context.runtime_env(), )?; + if let Some(slot) = self.runtime_extrema.get(partition) { + sorter = sorter.with_extrema_slot(Arc::clone(slot)); + } Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { @@ -1336,6 +1508,20 @@ impl ExecutionPlan for SortExec { Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } + /// Returns the lex-min/lex-max observed by this sort for the requested + /// output partition, or `None` if execution has not yet folded any + /// chunk into the slot. The caller is expected to have driven enough of + /// `execute(partition)` to reach the first `sort_batch_chunked` call. + fn runtime_partition_extrema( + &self, + partition: usize, + ) -> Result> { + Ok(self + .runtime_extrema + .get(partition) + .and_then(|slot| slot.lock().unwrap().clone())) + } + fn with_fetch(&self, limit: Option) -> Option> { Some(Arc::new(SortExec::with_fetch(self, limit))) } @@ -3144,4 +3330,161 @@ mod tests { assert_eq!(desc.self_filters()[0].len(), 1); Ok(()) } + + #[tokio::test] + async fn test_runtime_partition_extrema_before_execute_is_none() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef], + )?; + let input = + TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?; + + let sort_exec = SortExec::new( + [PhysicalSortExpr { + expr: col("v", &schema)?, + options: SortOptions::default(), + }] + .into(), + input, + ); + + // Per the trait contract, reading before any execute() poll returns + // `Ok(None)` — the slot has not been populated yet. + assert!(sort_exec.runtime_partition_extrema(0)?.is_none()); + Ok(()) + } + + #[tokio::test] + async fn test_runtime_partition_extrema_after_full_sort() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)])); + // Two batches with overlapping ranges so the in-memory merge path + // actually has work to do; min is -4, max is 9. + let batches = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![3, 1, 9, -2])) as ArrayRef], + )?, + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![5, -4, 0, 7])) as ArrayRef], + )?, + ]; + let input = TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?; + + let sort_exec = Arc::new(SortExec::new( + [PhysicalSortExpr { + expr: col("v", &schema)?, + options: SortOptions::default(), + }] + .into(), + input, + )); + + let result = collect(Arc::clone(&sort_exec) as _, task_ctx).await?; + assert_eq!(result.iter().map(|b| b.num_rows()).sum::(), 8); + + let extrema = sort_exec + .runtime_partition_extrema(0)? + .expect("extrema slot populated after collect()"); + assert_eq!(extrema.kind, ExtremaKind::Observed); + assert_eq!(extrema.row_count, 8); + assert_eq!(extrema.min, vec![ScalarValue::Int32(Some(-4))]); + assert_eq!(extrema.max, vec![ScalarValue::Int32(Some(9))]); + Ok(()) + } + + #[tokio::test] + async fn test_runtime_partition_extrema_descending_swaps_min_max() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![3, 1, 9, -2, 5])) as ArrayRef], + )?; + let input = + TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?; + + // Descending order: the lex-smallest row is the largest value, and + // the lex-largest row is the smallest value. + let sort_exec = Arc::new(SortExec::new( + [PhysicalSortExpr { + expr: col("v", &schema)?, + options: SortOptions { + descending: true, + nulls_first: false, + }, + }] + .into(), + input, + )); + + collect(Arc::clone(&sort_exec) as _, task_ctx).await?; + let extrema = sort_exec.runtime_partition_extrema(0)?.unwrap(); + assert_eq!(extrema.kind, ExtremaKind::Observed); + assert_eq!(extrema.row_count, 5); + assert_eq!(extrema.min, vec![ScalarValue::Int32(Some(9))]); + assert_eq!(extrema.max, vec![ScalarValue::Int32(Some(-2))]); + Ok(()) + } + + #[tokio::test] + async fn test_runtime_partition_extrema_per_partition() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)])); + let p0 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef], + )?; + let p1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![-5, -1, -3])) as ArrayRef], + )?; + // Two input partitions, preserve_partitioning=true → two sorted outputs. + let input = TestMemoryExec::try_new_exec( + &[vec![p0], vec![p1]], + Arc::clone(&schema), + None, + )?; + + let sort_exec = Arc::new( + SortExec::new( + [PhysicalSortExpr { + expr: col("v", &schema)?, + options: SortOptions::default(), + }] + .into(), + input, + ) + .with_preserve_partitioning(true), + ); + + // Drive both partitions to completion. + for part in 0..2 { + let mut stream = sort_exec.execute(part, Arc::clone(&task_ctx))?; + while stream.next().await.is_some() {} + } + + let e0 = sort_exec.runtime_partition_extrema(0)?.unwrap(); + let e1 = sort_exec.runtime_partition_extrema(1)?.unwrap(); + assert_eq!(e0.kind, ExtremaKind::Observed); + assert_eq!(e1.kind, ExtremaKind::Observed); + assert_eq!(e0.min, vec![ScalarValue::Int32(Some(10))]); + assert_eq!(e0.max, vec![ScalarValue::Int32(Some(30))]); + assert_eq!(e1.min, vec![ScalarValue::Int32(Some(-5))]); + assert_eq!(e1.max, vec![ScalarValue::Int32(Some(-1))]); + Ok(()) + } + + #[tokio::test] + async fn test_runtime_partition_extrema_default_is_none() -> Result<()> { + // Generic operators should pick up the default `Ok(None)` and never + // need to override. + let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)])); + let empty: Arc = Arc::new(EmptyExec::new(Arc::clone(&schema))); + assert!(empty.runtime_partition_extrema(0)?.is_none()); + Ok(()) + } } From 9ccb4b1f075b5ef85bbf1975f88a806ef4d36139 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Mon, 22 Jun 2026 08:43:02 -0600 Subject: [PATCH 2/3] PartitionExtrema docs: route readers through ExtremaKind, drop "intended" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `runtime_partition_extrema`: replace the "observed-vs-intended" pointer with a one-sentence handoff to `ExtremaKind::Observed` / `ExtremaKind::Expanded`. The discriminant is now the source of truth for which interpretation applies. - `ExtremaKind::Expanded`: rename "intended primary range" → "primary range". The earlier doc carried "intended" as a leftover from the observed-vs-intended framing the enum replaces; with the variant named `Expanded`, "intended" only adds an extra synonym to map. --- .../physical-plan/src/execution_plan.rs | 36 ++++++++----------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 4500e3a0775aa..4230a96cb945a 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -541,9 +541,12 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { /// /// Operators that are pipeline-breaking along their output ordering /// (e.g. `SortExec`) may override to expose the lex-min / lex-max tuple - /// once the partition's input has been fully consumed. See the - /// [`PartitionExtrema`] type-level doc for the dual *observed*-vs-*intended* - /// semantics, the caller's progress contract, and passthrough guidance. + /// once the partition's input has been fully consumed. The returned + /// [`PartitionExtrema::kind`] tells the caller which interpretation + /// applies — [`ExtremaKind::Observed`] when no row falls outside the + /// range, [`ExtremaKind::Expanded`] when the partition deliberately + /// carries rows outside the range as halo. See [`PartitionExtrema`] + /// for the caller's progress contract and passthrough guidance. /// /// Callers must drive `execute(partition)` to the point where the /// implementing operator has filled in its slot before relying on the @@ -800,15 +803,14 @@ pub enum ExtremaKind { /// once it has read all input rows and sorted them, it knows the /// endpoints exactly. Observed, - /// The reported `min` / `max` describe the partition's *intended - /// primary range*. The partition's data has been **expanded** with - /// extra rows outside that range — typically a "halo" routed in by an - /// upstream range-shuffle so a downstream window or filter pass can - /// see the full neighborhood at each seam. A downstream operator is + /// The reported `min` / `max` describe the partition's primary range. + /// The partition's data has been **expanded** with extra rows outside + /// that range — typically a "halo" routed in by an upstream + /// range-shuffle so a downstream window or filter pass can see the + /// full neighborhood at each seam. A downstream operator is /// contracted to read the primary range from /// [`ExecutionPlan::runtime_partition_extrema`] and discard the halo - /// rows lying outside it. No operator in this PR returns `Expanded`; - /// the planned `RangeRepartitionExec` will. + /// rows lying outside it. Expanded, } @@ -816,19 +818,11 @@ pub enum ExtremaKind { /// declared output ordering (the `PhysicalSortExpr` list returned by /// `equivalence_properties().output_ordering()`). /// -/// "Extrema" is the plural of *extremum* — the lex-min and lex-max rows of +/// The lex-min and lex-max rows of /// the partition along its output ordering. `min` and `max` are tuples of /// values, one entry per sort key, taken from those two rows. For the /// leading sort key these are exactly the natural min/max of that key -/// (after honoring `ASC`/`DESC` and `NULLS FIRST/LAST`). For trailing sort -/// keys the entries hold the value of that key at the lex-extreme row, not -/// that key's own natural extremum. -/// -/// `kind` distinguishes whether the reported `min` / `max` bound the data -/// the partition actually carries ([`ExtremaKind::Observed`]) or describe a -/// narrower primary range that the partition's data extends beyond -/// ([`ExtremaKind::Expanded`]). See [`ExtremaKind`] for the contract each -/// implementer signs. +/// (after honoring `ASC`/`DESC` and `NULLS FIRST/LAST`). /// /// # Caller contract /// @@ -838,7 +832,7 @@ pub enum ExtremaKind { /// least one `execute(partition)` poll must have reached the point where /// the input is fully consumed and the first sorted chunk is produced. /// Until then `Ok(None)` is returned. Reading too early silently -/// degrades to "no extrema" rather than panicking. +/// degrades to "no extrema". /// - **`Ok(None)` is the streaming-operator answer.** Operators that emit /// batches as soon as input arrives cannot know their extrema ahead of /// time and should return `Ok(None)` — which is the default trait From 2cc5bd1451067ad1ab9bedd859a8e6bbb6a75833 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Mon, 22 Jun 2026 08:59:27 -0600 Subject: [PATCH 3/3] BoundedWindowAggExec: passthrough runtime_partition_extrema MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BWAG processes its input in order and appends new window-result columns on the right of the existing schema. Its equivalence properties are built by extending the input's, leaving the input's leading sort expressions in place along the same column indices in the output. So along the output's declared ordering, the partition range observed (or expanded) by the upstream is the same range we emit — a clean passthrough. Skipped: CoalesceBatchesExec. The operator is deprecated since 52.0.0 ("we now use BatchCoalescer from arrow-rs instead of a dedicated operator"); coalescing now happens inside other operators' streams, so there's no dedicated plan node to override. --- .../src/windows/bounded_window_agg_exec.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index b0a0330441e94..43fd710a086d9 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -36,8 +36,9 @@ use crate::windows::{ }; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, + ExecutionPlanProperties, InputOrderMode, PartitionExtrema, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, + check_if_same_properties, }; use arrow::compute::take_record_batch; @@ -385,6 +386,18 @@ impl ExecutionPlan for BoundedWindowAggExec { Ok(Arc::new(self.statistics_helper(input_stat)?)) } + /// Passthrough: a bounded window aggregate processes the partition in + /// input order, preserves it, and appends new window-result columns on + /// the right. The leading sort expressions are the same physical + /// expressions in the input and the output, so the input's extrema + /// describe our output along the same ordering. + fn runtime_partition_extrema( + &self, + partition: usize, + ) -> Result> { + self.input.runtime_partition_extrema(partition) + } + fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal }