diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 76abf73e0ebbe..4230a96cb945a 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -29,7 +29,7 @@ use arrow_schema::Schema;
 pub use datafusion_common::hash_utils;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 pub use datafusion_common::utils::project_schema;
-pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
+pub use datafusion_common::{ColumnStatistics, ScalarValue, Statistics, internal_err};
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
 pub use datafusion_expr::{Accumulator, ColumnarValue};
 pub use datafusion_physical_expr::window::WindowExpr;
@@ -531,6 +531,35 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
         self.partition_statistics(args.partition())
     }
 
+    /// Runtime-derived endpoints of `partition`'s output along this
+    /// operator's declared output ordering (i.e. each `PhysicalSortExpr` in
+    /// `equivalence_properties().output_ordering()`).
+    ///
+    /// Default implementation returns `Ok(None)` — appropriate for any
+    /// streaming operator that cannot know its own min/max before emitting
+    /// its first batch.
+    ///
+    /// Operators that are pipeline-breaking along their output ordering
+    /// (e.g. `SortExec`) may override to expose the lex-min / lex-max tuple
+    /// once the partition's input has been fully consumed. The returned
+    /// [`PartitionExtrema::kind`] tells the caller which interpretation
+    /// applies — [`ExtremaKind::Observed`] when no row falls outside the
+    /// range, [`ExtremaKind::Expanded`] when the partition deliberately
+    /// carries rows outside the range as halo. See [`PartitionExtrema`]
+    /// for the caller's progress contract and passthrough guidance.
+    ///
+    /// Callers must drive `execute(partition)` to the point where the
+    /// implementing operator has filled in its slot before relying on the
+    /// result; until then `Ok(None)` is returned. Reading too early silently
+    /// degrades to "no extrema" rather than panicking.
+    fn runtime_partition_extrema(
+        &self,
+        partition: usize,
+    ) -> Result<Option<PartitionExtrema>> {
+        let _ = partition;
+        Ok(None)
+    }
+
     /// Returns `true` if a limit can be safely pushed down through this
     /// `ExecutionPlan` node.
     ///
@@ -759,6 +788,75 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
     }
 }
 
+/// Whether [`PartitionExtrema::min`] / [`PartitionExtrema::max`] describe
+/// the data the partition actually carries, or a (typically narrower)
+/// primary range that excludes deliberately-routed extra rows.
+///
+/// Implementing operators must document which variant they return so
+/// consumers can pick the right interpretation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ExtremaKind {
+    /// The reported `min` / `max` are the literal lex-min and lex-max of
+    /// the rows this partition has produced (or will produce, by the time
+    /// the upstream is fully consumed). No row in the partition lies
+    /// outside `[min, max]`. `SortExec` is the canonical implementer:
+    /// once it has read all input rows and sorted them, it knows the
+    /// endpoints exactly.
+    Observed,
+    /// The reported `min` / `max` describe the partition's primary range.
+    /// The partition's data has been **expanded** with extra rows outside
+    /// that range — typically a "halo" routed in by an upstream
+    /// range-shuffle so a downstream window or filter pass can see the
+    /// full neighborhood at each seam. A downstream operator is
+    /// contracted to read the primary range from
+    /// [`ExecutionPlan::runtime_partition_extrema`] and discard the halo
+    /// rows lying outside it.
+    Expanded,
+}
+
+/// Endpoints of a single partition's output expressed in the operator's
+/// declared output ordering (the `PhysicalSortExpr` list returned by
+/// `equivalence_properties().output_ordering()`).
+///
+/// Describes the lex-min and lex-max rows of
+/// the partition along its output ordering. `min` and `max` are tuples of
+/// values, one entry per sort key, taken from those two rows. For the
+/// leading sort key these are exactly the natural min/max of that key
+/// (after honoring `ASC`/`DESC` and `NULLS FIRST/LAST`).
+///
+/// # Caller contract
+///
+/// - **The upstream must have made enough progress** before
+///   `runtime_partition_extrema` returns a meaningful value. For
+///   pipeline-breaking implementations like `SortExec`, this means at
+///   least one `execute(partition)` poll must have reached the point where
+///   the input is fully consumed and the first sorted chunk is produced.
+///   Until then `Ok(None)` is returned. Reading too early silently
+///   degrades to "no extrema".
+/// - **`Ok(None)` is the streaming-operator answer.** Operators that emit
+///   batches as soon as input arrives cannot know their extrema ahead of
+///   time and should return `Ok(None)` — which is the default trait
+///   implementation, so they need do nothing.
+/// - **Passthrough**: an operator whose output ordering is the same as its
+///   input's leading-key ordering may forward
+///   `self.input.runtime_partition_extrema(partition)` after its own first
+///   batch is emitted. Whether it does so is the operator's choice; the
+///   trait does not enforce it. A passthrough preserves the upstream
+///   `kind` unchanged.
+#[derive(Debug, Clone)]
+pub struct PartitionExtrema {
+    /// Whether the reported `min` / `max` are observed from the data
+    /// (no row lies outside the range) or an expanded interpretation
+    /// (rows outside the range exist as halo). See [`ExtremaKind`].
+    pub kind: ExtremaKind,
+    /// Sort-key values at the lex-smallest row across the partition.
+    pub min: Vec<ScalarValue>,
+    /// Sort-key values at the lex-largest row across the partition.
+    pub max: Vec<ScalarValue>,
+    /// Total number of rows that contributed to `min`/`max`.
+    pub row_count: usize,
+}
+
 impl dyn ExecutionPlan {
     /// Returns `true` if the plan is of type `T`.
     ///
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 6cc6e44c32cc3..f628068a3edc1 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -42,9 +42,10 @@ pub use datafusion_physical_expr::{
 
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
 pub use crate::execution_plan::{
-    ExecutionPlan, ExecutionPlanProperties, PlanProperties, collect, collect_partitioned,
-    displayable, execute_input_stream, execute_stream, execute_stream_partitioned,
-    get_plan_string, with_new_children_if_necessary,
+    ExecutionPlan, ExecutionPlanProperties, ExtremaKind, PartitionExtrema,
+    PlanProperties, collect, collect_partitioned, displayable, execute_input_stream,
+    execute_stream, execute_stream_partitioned, get_plan_string,
+    with_new_children_if_necessary,
 };
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index f48bb15a7beae..227b1758aaabd 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -52,13 +52,14 @@ use crate::topk::TopK;
 use crate::topk::TopKDynamicFilters;
 use crate::{
     DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
-    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
-    Statistics,
+    ExecutionPlanProperties, ExtremaKind, PartitionExtrema, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics,
 };
 
-use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
 use arrow::datatypes::SchemaRef;
+use datafusion_common::ScalarValue;
 use datafusion_common::config::SpillCompression;
 use datafusion_common::{
     DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
@@ -73,6 +74,138 @@ use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
 
 use futures::{StreamExt, TryStreamExt};
 use log::{debug, trace};
+use std::sync::Mutex;
+
+/// One slot per SortExec output partition. Populated inside the sort code
+/// path each time `sort_batch_chunked` produces sorted batches, and surfaced
+/// via [`ExecutionPlan::runtime_partition_extrema`]. By the time downstream
+/// sees the first output batch the slot for that partition is already
+/// populated.
+type PartitionExtremaSlot = Arc<Mutex<Option<PartitionExtrema>>>;
+type PartitionExtremaSlots = Arc<Vec<PartitionExtremaSlot>>;
+
+fn make_partition_extrema_slots(n_partitions: usize) -> PartitionExtremaSlots {
+    Arc::new(
+        (0..n_partitions)
+            .map(|_| Arc::new(Mutex::new(None)))
+            .collect(),
+    )
+}
+
+/// Pull row values for each sort expression at `row` of an already-sorted
+/// chunk. Used to read the lex-smallest (row 0) and lex-largest (row n-1)
+/// candidate tuples from each `sort_batch_chunked` invocation.
+///
+/// `RecordBatch::slice` is zero-copy (bumps offsets on the underlying
+/// arrays without copying data), so the cost of an evaluation is O(1) in
+/// the chunk size rather than O(rows-in-chunk).
+fn evaluate_sort_row(
+    expressions: &LexOrdering,
+    batch: &RecordBatch,
+    row: usize,
+) -> Result<Vec<ScalarValue>> {
+    let row_batch = batch.slice(row, 1);
+    expressions
+        .iter()
+        .map(|sort_expr| {
+            let arr: ArrayRef = sort_expr.expr.evaluate(&row_batch)?.into_array(1)?;
+            ScalarValue::try_from_array(&arr, 0)
+        })
+        .collect()
+}
+
+/// Compare two `ScalarValue` tuples key by key under each key's `SortOptions`.
+/// As in arrow, `nulls_first` fixes where nulls go regardless of direction;
+/// `descending` only reverses the comparison of two non-null values.
+fn lex_compare(
+    a: &[ScalarValue],
+    b: &[ScalarValue],
+    expressions: &LexOrdering,
+) -> std::cmp::Ordering {
+    use std::cmp::Ordering;
+    for ((va, vb), sort_expr) in a.iter().zip(b.iter()).zip(expressions.iter()) {
+        let options = &sort_expr.options;
+        let cmp = match (va.is_null(), vb.is_null()) {
+            (true, true) => Ordering::Equal,
+            // Null placement is absolute: a `DESC NULLS FIRST` sort still emits
+            // nulls first, so `descending` must not flip these arms. Flipping
+            // them (as an unconditional `reverse()` would) picks the wrong
+            // endpoint whenever a chunk starts or ends with a null key.
+            (true, false) if options.nulls_first => Ordering::Less,
+            (true, false) => Ordering::Greater,
+            (false, true) if options.nulls_first => Ordering::Greater,
+            (false, true) => Ordering::Less,
+            (false, false) => {
+                // Values that cannot be ordered (e.g. mismatched types) are
+                // treated as equal so the next key decides.
+                let natural = va.partial_cmp(vb).unwrap_or(Ordering::Equal);
+                if options.descending {
+                    natural.reverse()
+                } else {
+                    natural
+                }
+            }
+        };
+        if cmp != Ordering::Equal {
+            return cmp;
+        }
+    }
+    // Every compared key tied (`zip` stops at the shortest input).
+    Ordering::Equal
+}
+
+/// Fold the endpoints of one already-sorted chunk produced by
+/// `sort_batch_chunked` into the running [`PartitionExtrema`] for one
+/// partition. Multi-chunk sorts (sort/spill/merge path) call this once per
+/// chunk; the final slot holds the global lex-min / lex-max.
+fn merge_chunk_into_extrema_slot(
+    slot: &Mutex<Option<PartitionExtrema>>,
+    expressions: &LexOrdering,
+    sorted_chunks: &[RecordBatch],
+) -> Result<()> {
+    // Chunks with no rows contribute no endpoint; nothing to publish then.
+    let Some(first_chunk) = sorted_chunks.iter().find(|b| b.num_rows() > 0) else {
+        return Ok(());
+    };
+    let Some(last_chunk) = sorted_chunks.iter().rev().find(|b| b.num_rows() > 0) else {
+        return Ok(());
+    };
+    let total_rows: usize = sorted_chunks.iter().map(|b| b.num_rows()).sum();
+    let chunk_min = evaluate_sort_row(expressions, first_chunk, 0)?;
+    let chunk_max =
+        evaluate_sort_row(expressions, last_chunk, last_chunk.num_rows() - 1)?;
+
+    // The slot is only ever `None` or a complete value, so a lock poisoned
+    // by a panic elsewhere is still safe to reuse.
+    let mut guard = slot.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
+    *guard = Some(match guard.take() {
+        None => PartitionExtrema {
+            kind: ExtremaKind::Observed,
+            min: chunk_min,
+            max: chunk_max,
+            row_count: total_rows,
+        },
+        Some(prev) => {
+            let min = if lex_compare(&chunk_min, &prev.min, expressions).is_lt() {
+                chunk_min
+            } else {
+                prev.min
+            };
+            let max = if lex_compare(&chunk_max, &prev.max, expressions).is_gt() {
+                chunk_max
+            } else {
+                prev.max
+            };
+            PartitionExtrema {
+                kind: ExtremaKind::Observed,
+                min,
+                max,
+                row_count: prev.row_count + total_rows,
+            }
+        }
+    });
+    Ok(())
+}
 
 struct ExternalSorterMetrics {
     /// metrics
@@ -262,6 +395,12 @@ struct ExternalSorter {
     /// How much memory to reserve for performing in-memory sort/merges
     /// prior to spilling.
     sort_spill_reservation_bytes: usize,
+
+    /// Optional slot that `sort_batch_stream` updates after each
+    /// `sort_batch_chunked` call with the sort-key endpoints of the sorted
+    /// output. `SortExec` provides this so consumers can fetch the observed
+    /// extrema via [`ExecutionPlan::runtime_partition_extrema`].
+    extrema_slot: Option<PartitionExtremaSlot>,
 }
 
 impl ExternalSorter {
@@ -310,9 +449,18 @@ impl ExternalSorter {
             batch_size,
             sort_spill_reservation_bytes,
             sort_in_place_threshold_bytes,
+            extrema_slot: None,
         })
     }
 
+    /// Provide a slot for `sort_batch_stream` to publish runtime endpoints
+    /// into. Used by `SortExec` so its `runtime_partition_extrema` override
+    /// has a value to return.
+    fn with_extrema_slot(mut self, slot: PartitionExtremaSlot) -> Self {
+        self.extrema_slot = Some(slot);
+        self
+    }
+
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
     ///
     /// Updates memory usage metrics, and possibly triggers spilling to disk
@@ -670,6 +818,7 @@ impl ExternalSorter {
         let expressions = self.expr.clone();
         let batch_size = self.batch_size;
         let output_row_metrics = metrics.output_rows().clone();
+        let extrema_slot = self.extrema_slot.clone();
 
         let stream = futures::stream::once(async move {
             let schema = batch.schema();
@@ -677,6 +826,13 @@ impl ExternalSorter {
             // Sort the batch immediately and get all output batches
             let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
 
+            // Publish the sort-key endpoints of this sorted chunk into the
+            // partition's extrema slot. Multi-chunk folds (sort/spill/merge
+            // path) happen inside `merge_chunk_into_extrema_slot`.
+            if let Some(slot) = &extrema_slot {
+                merge_chunk_into_extrema_slot(slot, &expressions, &sorted_batches)?;
+            }
+
             // Resize the reservation to match the actual sorted output size.
             // Using try_resize avoids a release-then-reacquire cycle, which
             // matters for MemoryPool implementations where grow/shrink have
@@ -867,6 +1023,10 @@ pub struct SortExec {
     /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
     /// If `fetch` is `None`, this will be `None`.
     filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
+    /// Per-output-partition slot populated by the sort code path. Surfaced via
+    /// [`ExecutionPlan::runtime_partition_extrema`] so range-shuffle callers
+    /// can derive global boundaries from runtime data.
+    runtime_extrema: PartitionExtremaSlots,
 }
 
 impl SortExec {
@@ -877,6 +1037,8 @@ impl SortExec {
         let (cache, sort_prefix) =
             Self::compute_properties(&input, expr.clone(), preserve_partitioning)
                 .unwrap();
+        let runtime_extrema =
+            make_partition_extrema_slots(cache.partitioning.partition_count());
         Self {
             expr,
             input,
@@ -886,6 +1048,7 @@ impl SortExec {
             common_sort_prefix: sort_prefix,
             cache: Arc::new(cache),
             filter: None,
+            runtime_extrema,
         }
     }
 
@@ -905,6 +1068,8 @@ impl SortExec {
         self.preserve_partitioning = preserve_partitioning;
         Arc::make_mut(&mut self.cache).partitioning =
             Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
+        self.runtime_extrema =
+            make_partition_extrema_slots(self.cache.partitioning.partition_count());
         if self.fetch.is_some() {
             self.rebuild_filter_for_current_partitioning();
         }
@@ -962,6 +1127,7 @@ impl SortExec {
             fetch: self.fetch,
             cache: Arc::clone(&self.cache),
             filter: self.filter.clone(),
+            runtime_extrema: Arc::clone(&self.runtime_extrema),
         }
     }
 
@@ -1211,6 +1377,9 @@ impl ExecutionPlan for SortExec {
             )?;
             new_sort.cache = Arc::new(cache);
             new_sort.common_sort_prefix = sort_prefix;
+            new_sort.runtime_extrema = make_partition_extrema_slots(
+                new_sort.cache.partitioning.partition_count(),
+            );
             if new_sort.fetch.is_some() {
                 new_sort.rebuild_filter_for_current_partitioning();
             }
@@ -1305,6 +1474,9 @@ impl ExecutionPlan for SortExec {
                     &self.metrics_set,
                     context.runtime_env(),
                 )?;
+                if let Some(slot) = self.runtime_extrema.get(partition) {
+                    sorter = sorter.with_extrema_slot(Arc::clone(slot));
+                }
                 Ok(Box::pin(RecordBatchStreamAdapter::new(
                     self.schema(),
                     futures::stream::once(async move {
@@ -1336,6 +1508,20 @@ impl ExecutionPlan for SortExec {
         Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
     }
 
+    /// Returns the lex-min/lex-max observed by this sort for the requested
+    /// output partition, or `None` if execution has not yet folded any
+    /// chunk into the slot.
The caller is expected to have driven enough of
+    /// `execute(partition)` to reach the first `sort_batch_chunked` call.
+    fn runtime_partition_extrema(
+        &self,
+        partition: usize,
+    ) -> Result<Option<PartitionExtrema>> {
+        // Tolerate a poisoned slot: a lookup of published extrema must not panic.
+        Ok(self.runtime_extrema.get(partition).and_then(|slot| {
+            slot.lock().unwrap_or_else(std::sync::PoisonError::into_inner).clone()
+        }))
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         Some(Arc::new(SortExec::with_fetch(self, limit)))
     }
@@ -3144,4 +3330,161 @@ mod tests {
         assert_eq!(desc.self_filters()[0].len(), 1);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_runtime_partition_extrema_before_execute_is_none() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)]));
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef],
+        )?;
+        let input =
+            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
+
+        let sort_exec = SortExec::new(
+            [PhysicalSortExpr {
+                expr: col("v", &schema)?,
+                options: SortOptions::default(),
+            }]
+            .into(),
+            input,
+        );
+
+        // Per the trait contract, reading before any execute() poll returns
+        // `Ok(None)` — the slot has not been populated yet.
+        assert!(sort_exec.runtime_partition_extrema(0)?.is_none());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_runtime_partition_extrema_after_full_sort() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)]));
+        // Two batches with overlapping ranges so the in-memory merge path
+        // actually has work to do; min is -4, max is 9.
+        let batches = vec![
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Int32Array::from(vec![3, 1, 9, -2])) as ArrayRef],
+            )?,
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Int32Array::from(vec![5, -4, 0, 7])) as ArrayRef],
+            )?,
+        ];
+        let input = TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?;
+
+        let sort_exec = Arc::new(SortExec::new(
+            [PhysicalSortExpr {
+                expr: col("v", &schema)?,
+                options: SortOptions::default(),
+            }]
+            .into(),
+            input,
+        ));
+
+        let result = collect(Arc::clone(&sort_exec) as _, task_ctx).await?;
+        assert_eq!(result.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
+
+        let extrema = sort_exec
+            .runtime_partition_extrema(0)?
+            .expect("extrema slot populated after collect()");
+        assert_eq!(extrema.kind, ExtremaKind::Observed);
+        assert_eq!(extrema.row_count, 8);
+        assert_eq!(extrema.min, vec![ScalarValue::Int32(Some(-4))]);
+        assert_eq!(extrema.max, vec![ScalarValue::Int32(Some(9))]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_runtime_partition_extrema_descending_swaps_min_max() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)]));
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![3, 1, 9, -2, 5])) as ArrayRef],
+        )?;
+        let input =
+            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
+
+        // Descending order: the lex-smallest row is the largest value, and
+        // the lex-largest row is the smallest value.
+        let sort_exec = Arc::new(SortExec::new(
+            [PhysicalSortExpr {
+                expr: col("v", &schema)?,
+                options: SortOptions {
+                    descending: true,
+                    nulls_first: false,
+                },
+            }]
+            .into(),
+            input,
+        ));
+
+        collect(Arc::clone(&sort_exec) as _, task_ctx).await?;
+        let extrema = sort_exec.runtime_partition_extrema(0)?.unwrap();
+        assert_eq!(extrema.kind, ExtremaKind::Observed);
+        assert_eq!(extrema.row_count, 5);
+        assert_eq!(extrema.min, vec![ScalarValue::Int32(Some(9))]);
+        assert_eq!(extrema.max, vec![ScalarValue::Int32(Some(-2))]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_runtime_partition_extrema_per_partition() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)]));
+        let p0 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef],
+        )?;
+        let p1 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![-5, -1, -3])) as ArrayRef],
+        )?;
+        // Two input partitions, preserve_partitioning=true → two sorted outputs.
+        let input = TestMemoryExec::try_new_exec(
+            &[vec![p0], vec![p1]],
+            Arc::clone(&schema),
+            None,
+        )?;
+
+        let sort_exec = Arc::new(
+            SortExec::new(
+                [PhysicalSortExpr {
+                    expr: col("v", &schema)?,
+                    options: SortOptions::default(),
+                }]
+                .into(),
+                input,
+            )
+            .with_preserve_partitioning(true),
+        );
+
+        // Drive both partitions to completion, surfacing any execution error.
+        for part in 0..2 {
+            let stream = sort_exec.execute(part, Arc::clone(&task_ctx))?;
+            crate::common::collect(stream).await?;
+        }
+
+        let e0 = sort_exec.runtime_partition_extrema(0)?.unwrap();
+        let e1 = sort_exec.runtime_partition_extrema(1)?.unwrap();
+        assert_eq!(e0.kind, ExtremaKind::Observed);
+        assert_eq!(e1.kind, ExtremaKind::Observed);
+        assert_eq!(e0.min, vec![ScalarValue::Int32(Some(10))]);
+        assert_eq!(e0.max, vec![ScalarValue::Int32(Some(30))]);
+        assert_eq!(e1.min, vec![ScalarValue::Int32(Some(-5))]);
+        assert_eq!(e1.max, vec![ScalarValue::Int32(Some(-1))]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_runtime_partition_extrema_default_is_none() -> Result<()> {
+        // Generic operators should pick up the default `Ok(None)` and never
+        // need to override.
+        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, true)]));
+        let empty: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+        assert!(empty.runtime_partition_extrema(0)?.is_none());
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index b0a0330441e94..43fd710a086d9 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -36,8 +36,9 @@ use crate::windows::{
 };
 use crate::{
     ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
-    ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
-    SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties,
+    ExecutionPlanProperties, InputOrderMode, PartitionExtrema, PlanProperties,
+    RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+    check_if_same_properties,
 };
 
 use arrow::compute::take_record_batch;
@@ -385,6 +386,18 @@ impl ExecutionPlan for BoundedWindowAggExec {
         Ok(Arc::new(self.statistics_helper(input_stat)?))
     }
 
+    /// Passthrough: a bounded window aggregate processes the partition in
+    /// input order, preserves it, and appends new window-result columns on
+    /// the right. The leading sort expressions are the same physical
+    /// expressions in the input and the output, so the input's extrema
+    /// describe our output along the same ordering.
+    fn runtime_partition_extrema(
+        &self,
+        partition: usize,
+    ) -> Result<Option<PartitionExtrema>> {
+        self.input.runtime_partition_extrema(partition)
+    }
+
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Equal
     }