100 changes: 99 additions & 1 deletion datafusion/physical-plan/src/execution_plan.rs
@@ -29,7 +29,7 @@ use arrow_schema::Schema;
pub use datafusion_common::hash_utils;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
pub use datafusion_common::utils::project_schema;
-pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
+pub use datafusion_common::{ColumnStatistics, ScalarValue, Statistics, internal_err};
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
pub use datafusion_expr::{Accumulator, ColumnarValue};
pub use datafusion_physical_expr::window::WindowExpr;
@@ -531,6 +531,35 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
self.partition_statistics(args.partition())
}

/// Runtime-derived endpoints of `partition`'s output along this
/// operator's declared output ordering (i.e. each `PhysicalSortExpr` in
/// `equivalence_properties().output_ordering()`).
///
/// Default implementation returns `Ok(None)` — appropriate for any
/// streaming operator that cannot know its own min/max before emitting
/// its first batch.
///
/// Operators that are pipeline-breaking along their output ordering
/// (e.g. `SortExec`) may override to expose the lex-min / lex-max tuple
/// once the partition's input has been fully consumed. The returned
/// [`PartitionExtrema::kind`] tells the caller which interpretation
/// applies — [`ExtremaKind::Observed`] when no row falls outside the
/// range, [`ExtremaKind::Expanded`] when the partition deliberately
/// carries rows outside the range as halo. See [`PartitionExtrema`]
/// for the caller's progress contract and passthrough guidance.
///
/// Callers must poll the stream returned by `execute(partition)` far
/// enough for the implementing operator to have recorded its extrema
/// before relying on the result; until then `Ok(None)` is returned.
/// Reading too early silently degrades to "no extrema" rather than
/// panicking.
fn runtime_partition_extrema(
    &self,
    partition: usize,
) -> Result<Option<PartitionExtrema>> {
    let _ = partition;
    Ok(None)
}

/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
///
@@ -759,6 +788,75 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
}
}

/// Whether [`PartitionExtrema::min`] / [`PartitionExtrema::max`] describe
/// the data the partition actually carries, or a (typically narrower)
/// primary range that excludes deliberately-routed extra rows.
///
/// Implementing operators must document which variant they return so
/// consumers can pick the right interpretation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtremaKind {
    /// The reported `min` / `max` are the literal lex-min and lex-max of
    /// the rows this partition has produced (or will produce, by the time
    /// the upstream is fully consumed). No row in the partition lies
    /// outside `[min, max]`. `SortExec` is the canonical implementer:
    /// once it has read all input rows and sorted them, it knows the
    /// endpoints exactly.
    Observed,
    /// The reported `min` / `max` describe the partition's primary range.
    /// The partition's data has been **expanded** with extra rows outside
    /// that range, typically a "halo" routed in by an upstream
    /// range-shuffle so a downstream window or filter pass can see the
    /// full neighborhood at each seam. A downstream operator is
    /// contracted to read the primary range from
    /// [`ExecutionPlan::runtime_partition_extrema`] and discard the halo
    /// rows lying outside it.
    Expanded,
}

/// Endpoints of a single partition's output expressed in the operator's
/// declared output ordering (the `PhysicalSortExpr` list returned by
/// `equivalence_properties().output_ordering()`).
///
/// `min` and `max` are taken from the lex-min and lex-max rows of the
/// partition along its output ordering: each is a tuple of values, one
/// entry per sort key. For the leading sort key these are exactly the
/// natural min/max of that key (after honoring `ASC`/`DESC` and
/// `NULLS FIRST/LAST`).
///
/// # Caller contract
///
/// - **The upstream must have made enough progress** before
///   `runtime_partition_extrema` returns a meaningful value. For
///   pipeline-breaking implementations like `SortExec`, this means the
///   stream returned by `execute(partition)` must have been polled to the
///   point where the input is fully consumed and the first sorted chunk
///   is produced. Until then `Ok(None)` is returned. Reading too early
///   silently degrades to "no extrema".
/// - **`Ok(None)` is the streaming-operator answer.** Operators that emit
///   batches as soon as input arrives cannot know their extrema ahead of
///   time and should return `Ok(None)`. That is the default trait
///   implementation, so they need do nothing.
/// - **Passthrough**: an operator whose output ordering is the same as its
///   input's leading-key ordering may forward
///   `self.input.runtime_partition_extrema(partition)` after its own first
///   batch is emitted. Whether it does so is the operator's choice; the
///   trait does not enforce it. A passthrough preserves the upstream
///   `kind` unchanged.
#[derive(Debug, Clone)]
pub struct PartitionExtrema {
    /// Whether the reported `min` / `max` are observed from the data
    /// (no row lies outside the range) or an expanded interpretation
    /// (rows outside the range exist as halo). See [`ExtremaKind`].
    pub kind: ExtremaKind,
    /// Sort-key values at the lex-smallest row across the partition.
    pub min: Vec<ScalarValue>,
    /// Sort-key values at the lex-largest row across the partition.
    pub max: Vec<ScalarValue>,
    /// Total non-empty rows that contributed to `min`/`max`.
    pub row_count: usize,
}

impl dyn ExecutionPlan {
/// Returns `true` if the plan is of type `T`.
///
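
Illustration of the caller contract documented above. This is a minimal, standard-library-only sketch and not code from this change: `MiniPlan`, `MiniSort`, and `StreamingOp` are hypothetical stand-ins, `i64` replaces `ScalarValue`, the `Result` wrapper is dropped, and a synchronous `execute` call stands in for polling the real stream. It only aims to show the two behaviours the docs describe: the default answer is "no extrema", and a pipeline-breaking operator reports `Observed` endpoints only after it has consumed its input.

```rust
use std::sync::Mutex;

/// Simplified stand-in for the `ExtremaKind` enum in this diff.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtremaKind {
    Observed,
    Expanded,
}

/// Simplified stand-in for `PartitionExtrema`; `i64` replaces `ScalarValue`.
#[derive(Debug, Clone, PartialEq)]
struct PartitionExtrema {
    kind: ExtremaKind,
    min: Vec<i64>,
    max: Vec<i64>,
    row_count: usize,
}

/// Hypothetical miniature of the trait: only the new method is modeled.
trait MiniPlan {
    /// Default: a streaming operator cannot know its extrema up front.
    fn runtime_partition_extrema(&self, partition: usize) -> Option<PartitionExtrema> {
        let _ = partition;
        None
    }
}

/// Streaming operator that simply relies on the default.
struct StreamingOp;
impl MiniPlan for StreamingOp {}

/// Pipeline-breaking operator with one extrema slot per partition.
struct MiniSort {
    slots: Vec<Mutex<Option<PartitionExtrema>>>,
}

impl MiniSort {
    fn new(partitions: usize) -> Self {
        Self {
            slots: (0..partitions).map(|_| Mutex::new(None)).collect(),
        }
    }

    /// Stand-in for driving the partition to completion: consume every
    /// input row, sort, then record the endpoints before returning output.
    fn execute(&self, partition: usize, mut rows: Vec<Vec<i64>>) -> Vec<Vec<i64>> {
        // `Vec<i64>` orders lexicographically, i.e. all keys ascending here.
        rows.sort();
        if let (Some(first), Some(last)) = (rows.first(), rows.last()) {
            *self.slots[partition].lock().unwrap() = Some(PartitionExtrema {
                kind: ExtremaKind::Observed,
                min: first.clone(),
                max: last.clone(),
                row_count: rows.len(),
            });
        }
        rows
    }
}

impl MiniPlan for MiniSort {
    fn runtime_partition_extrema(&self, partition: usize) -> Option<PartitionExtrema> {
        // Returns `None` until `execute` has filled this partition's slot.
        self.slots[partition].lock().unwrap().clone()
    }
}
```

In this sketch, asking `MiniSort` for partition 0 before `execute(0, ..)` yields `None`, and the same call afterwards yields the first and last sorted rows. Other partitions stay at `None` until they are driven as well.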
7 changes: 4 additions & 3 deletions datafusion/physical-plan/src/lib.rs
@@ -42,9 +42,10 @@ pub use datafusion_physical_expr::{

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::execution_plan::{
-    ExecutionPlan, ExecutionPlanProperties, PlanProperties, collect, collect_partitioned,
-    displayable, execute_input_stream, execute_stream, execute_stream_partitioned,
-    get_plan_string, with_new_children_if_necessary,
+    ExecutionPlan, ExecutionPlanProperties, ExtremaKind, PartitionExtrema,
+    PlanProperties, collect, collect_partitioned, displayable, execute_input_stream,
+    execute_stream, execute_stream_partitioned, get_plan_string,
+    with_new_children_if_necessary,
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
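
With `ExtremaKind` and `PartitionExtrema` re-exported, a downstream consumer could branch on `kind` as the `Expanded` docs describe. The following is only a sketch under stated assumptions: `trim_halo` is a hypothetical helper that does not exist in this change, the types are trimmed-down local stand-ins (no `row_count`, `i64` instead of `ScalarValue`), every row consists solely of its sort keys, and all keys sort ascending, so plain lexicographic comparison of `Vec<i64>` is used.

```rust
/// Simplified stand-in for the re-exported `ExtremaKind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtremaKind {
    Observed,
    Expanded,
}

/// Trimmed-down stand-in for `PartitionExtrema` (assumption: all keys ASC).
#[derive(Debug, Clone)]
struct PartitionExtrema {
    kind: ExtremaKind,
    min: Vec<i64>,
    max: Vec<i64>,
}

/// Hypothetical consumer-side helper: drop halo rows when the range is
/// `Expanded`, pass everything through when it is `Observed`.
fn trim_halo(rows: Vec<Vec<i64>>, extrema: &PartitionExtrema) -> Vec<Vec<i64>> {
    match extrema.kind {
        // No row lies outside `[min, max]`, so there is nothing to discard.
        ExtremaKind::Observed => rows,
        // Keep only rows inside the primary range; the rest is halo.
        ExtremaKind::Expanded => rows
            .into_iter()
            .filter(|row| row >= &extrema.min && row <= &extrema.max)
            .collect(),
    }
}
```

A real consumer would compare `ScalarValue` tuples under each key's `ASC`/`DESC` and null-ordering options rather than relying on `Vec` ordering.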