Status: Implementation PR open at #23090. This issue stays open as the design home — discuss the API here, code review there.
What is the problem the feature is trying to solve?
Several downstream consumers want to know, at runtime, the lex-min / lex-max of a partition's output along its declared output ordering. Today there is no way to ask an ExecutionPlan for this. The need shows up in at least three places:
- Parallelizing single-partition window functions (RANGE frames, no PARTITION BY). To fan a single sorted stream into N parallel output partitions without a partition key, a planner needs the global value range so it can compute interior split points. Statistics-based estimates from the catalog are too loose; we need what the sort actually produced. This is the use case that motivated the work — see the existing draft PR #23026 for the full picture.
- Range-elimination rules. A future optimizer could prune downstream filters or skip irrelevant data when it can prove a partition's range doesn't intersect a predicate's range. Needs observed extrema from the upstream.
- Dynamic-range repartitioning, complementing Partitioning::Range. #22207 landed Partitioning::Range { ordering, split_points } for the declarative case: split points known at plan time, e.g. from a TableProvider declaring its output partitioning. A natural follow-up is a dynamic variant where split points are discovered at execute time from an upstream's actual data range. Runtime partition extrema are the missing primitive: the dynamic variant routes rows by a runtime-computed boundary set, and downstream operators learn the per-bucket boundaries through the same trait method.
Today the only way to get this is to drain the operator into a separate aggregate pass — which defeats the point.
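To make the window-parallelization case concrete, here is a minimal sketch of what a planner could do once the global range is known. It makes several assumptions:

- The leading sort key is a single ascending `i64`.
- Buckets have equal width.
- `interior_split_points` and `route` are hypothetical helpers, not DataFusion APIs.
- The half-open bucket convention is illustrative only; `Partitioning::Range` may define its boundaries differently.

```rust
/// Hypothetical helper: given the observed global range [min, max] of a
/// single ascending i64 sort key, return `n - 1` interior split points that
/// divide the range into `n` roughly equal-width buckets.
fn interior_split_points(min: i64, max: i64, n: usize) -> Vec<i64> {
    assert!(n > 0 && min <= max, "need at least one bucket and min <= max");
    // Widen to i128 so `max - min` cannot overflow for extreme i64 inputs.
    let lo = min as i128;
    let width = max as i128 - min as i128;
    (1..n as i128)
        .map(|i| (lo + width * i / n as i128) as i64)
        .collect()
}

/// Hypothetical router: bucket index for `value`, where bucket `i` holds
/// values in `[splits[i - 1], splits[i])` (an assumed boundary convention).
fn route(value: i64, splits: &[i64]) -> usize {
    // Number of split points <= value; `splits` must be sorted.
    splits.partition_point(|&s| s <= value)
}
```

With a range of `[0, 100]` and four buckets, the split points are `25, 50, 75`. If the range is narrower than the bucket count, split points repeat and some buckets stay empty.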
Describe the solution you'd like
A small addition to ExecutionPlan:
```rust
use datafusion_common::{Result, ScalarValue};

pub enum ExtremaKind {
    /// `min` / `max` literally bound the partition's data.
    /// `SortExec` is the canonical implementer.
    Observed,
    /// `min` / `max` describe the partition's primary range.
    /// The partition deliberately carries rows outside that range
    /// (a "halo"); a downstream operator is contracted to filter
    /// back to the range. No operator in this branch returns
    /// `Expanded`; a future dynamic-range repartition (e.g. a
    /// `Partitioning::DynamicRange` routed through `RepartitionExec`)
    /// would.
    Expanded,
}

pub struct PartitionExtrema {
    pub kind: ExtremaKind,
    /// Lex-min endpoint: one value per sort expression of the output ordering.
    pub min: Vec<ScalarValue>,
    /// Lex-max endpoint, laid out like `min`.
    pub max: Vec<ScalarValue>,
    pub row_count: usize,
}

// New trait method on ExecutionPlan (shown outside its trait), default Ok(None):
fn runtime_partition_extrema(
    &self,
    partition: usize,
) -> Result<Option<PartitionExtrema>> {
    Ok(None)
}
```
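Here is a minimal sketch of how a downstream consumer would treat each kind differently. It makes several assumptions:

- A single `i64` key stands in for `Vec<ScalarValue>`.
- `Extrema` and `restrict_to_range` are hypothetical stand-ins, not the branch's types.
- The primary range is inclusive on both ends; the proposal does not specify this.

```rust
/// Stand-in for the proposed `ExtremaKind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtremaKind {
    Observed,
    Expanded,
}

/// Simplified stand-in for `PartitionExtrema`: one i64 key per endpoint.
#[derive(Debug, Clone, PartialEq)]
struct Extrema {
    kind: ExtremaKind,
    min: i64,
    max: i64,
}

/// Hypothetical downstream step honoring the contract. `Observed` rows
/// already lie inside [min, max]. `Expanded` rows may include a halo that
/// must be filtered back to the primary range (bounds assumed inclusive).
fn restrict_to_range(extrema: &Extrema, rows: Vec<i64>) -> Vec<i64> {
    match extrema.kind {
        ExtremaKind::Observed => {
            // Nothing to drop; only check the invariant in debug builds.
            debug_assert!(rows.iter().all(|&v| extrema.min <= v && v <= extrema.max));
            rows
        }
        ExtremaKind::Expanded => rows
            .into_iter()
            .filter(|&v| extrema.min <= v && v <= extrema.max)
            .collect(),
    }
}
```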
Initial overrides:
- SortExec populates a per-partition slot inside the sort code path; the override returns the slot's contents once execution has folded any sorted chunk into it. Reading before the upstream has been driven far enough returns Ok(None) rather than panicking.
- BoundedWindowAggExec passes through to its input (BWAG extends the input's equivalence properties, preserving the leading sort exprs along the same column indices in its output).
Implementation in PR #23090.
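The SortExec "per-partition slot" can be modeled roughly as follows. This is a hypothetical sketch, not the code in #23090, and it simplifies in three ways:

- `ExtremaSlots`, `fold_sorted_chunk`, and `read` are illustrative names.
- A single ascending `i64` key replaces the lex-ordered `Vec<ScalarValue>`.
- `read` returns `Option` rather than `Result<Option<_>>`.

Each folded chunk is already sorted, so its first and last rows are its extrema. That makes a fold O(1) per chunk.

```rust
use std::sync::Mutex;

/// Simplified extrema for one partition with one ascending i64 sort key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Extrema {
    min: i64,
    max: i64,
    row_count: usize,
}

/// Hypothetical per-partition slots: each starts empty and is filled as
/// sorted chunks are folded in.
struct ExtremaSlots {
    slots: Vec<Mutex<Option<Extrema>>>,
}

impl ExtremaSlots {
    fn new(partitions: usize) -> Self {
        Self { slots: (0..partitions).map(|_| Mutex::new(None)).collect() }
    }

    /// Fold one already-sorted chunk into the partition's slot.
    fn fold_sorted_chunk(&self, partition: usize, chunk: &[i64]) {
        let (Some(&first), Some(&last)) = (chunk.first(), chunk.last()) else {
            return; // empty chunk: nothing to record
        };
        let mut slot = self.slots[partition].lock().unwrap();
        let next = match *slot {
            None => Extrema { min: first, max: last, row_count: chunk.len() },
            Some(e) => Extrema {
                min: e.min.min(first),
                max: e.max.max(last),
                row_count: e.row_count + chunk.len(),
            },
        };
        *slot = Some(next);
    }

    /// Read side: `None` until something has been folded, never a panic.
    fn read(&self, partition: usize) -> Option<Extrema> {
        *self.slots[partition].lock().unwrap()
    }
}
```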
Design points worth debating
- One trait method, two semantics, encoded by ExtremaKind. The downstream consumer of "Expanded" extrema and the upstream consumer of "Observed" extrema want exactly the same shape: Vec<ScalarValue> per endpoint, matched against the output ordering, populated once the partition is ready to be read. Splitting into two trait methods would force every passthrough operator to implement both with identical bodies. The contract is documented at each implementer's call site. Alternative: a pure enum with arm-specific data, rejected because passthroughs that don't care about the variant would still need to match. See the type-level rustdoc in the branch for the full rationale.
- Caller contract: silent Ok(None) if read too early, not a panic. Reading before the operator's slot is populated returns Ok(None). The progress contract is that callers drive execute(partition) past the point where the implementing operator can know. Tradeoff: easier to misuse silently vs. panicking. The branch's rustdoc spells this out as a doc-invariant rather than a runtime gate.
- Passthrough policy. Operators whose output ordering matches the input's leading-key ordering MAY forward via self.input.runtime_partition_extrema(...), preserving kind unchanged. The trait does not enforce this; forwarding is a per-operator decision. BoundedWindowAggExec is wired in this branch; ProjectionExec (conditional on the leading sort column surviving the projection) and SortPreservingMergeExec (an N→1 reducer, not a passthrough) are follow-ups.
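A minimal model of the passthrough policy follows. It uses a cut-down stand-in for `ExecutionPlan`; `Plan`, `FixedSource`, `Opaque`, and `Passthrough` are all hypothetical. The sketch shows the two properties the design relies on:

- Operators that don't override inherit `Ok(None)`.
- A forwarding operator returns its input's extrema with `kind` untouched, without ever matching on it.

```rust
/// Stand-in for the proposed `ExtremaKind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtremaKind {
    Observed,
    Expanded,
}

/// Simplified `PartitionExtrema` with one i64 sort key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Extrema {
    kind: ExtremaKind,
    min: i64,
    max: i64,
}

/// Cut-down stand-in for `ExecutionPlan`: only the proposed method, with
/// the proposal's default of `Ok(None)`.
trait Plan {
    fn runtime_partition_extrema(&self, _partition: usize) -> Result<Option<Extrema>, String> {
        Ok(None)
    }
}

/// Leaf with fixed extrema, standing in for a populated `SortExec`.
struct FixedSource(Option<Extrema>);

impl Plan for FixedSource {
    fn runtime_partition_extrema(&self, _partition: usize) -> Result<Option<Extrema>, String> {
        Ok(self.0)
    }
}

/// Operator without an override: inherits the `Ok(None)` default.
struct Opaque;
impl Plan for Opaque {}

/// Order-preserving operator (think `BoundedWindowAggExec`): forwards to its
/// input, so `kind` passes through unchanged and is never inspected.
struct Passthrough {
    input: Box<dyn Plan>,
}

impl Plan for Passthrough {
    fn runtime_partition_extrema(&self, partition: usize) -> Result<Option<Extrema>, String> {
        self.input.runtime_partition_extrema(partition)
    }
}
```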
Describe alternatives you've considered
- Statistics-shaped runtime stats. The first version of the spike used Statistics/ColumnStatistics. That doesn't fit: sort keys are arbitrary expressions, not just columns, and the lex-extreme row's trailing-key values aren't natural extrema of those keys.
- Async signaling on the trait method. Overengineered for our case: every implementer that has an answer has it at a known point in execution (post-buffer for SortExec). The caller contract handles "not yet" via Ok(None).
- Side-channel between operators (instead of a uniform trait). Threading boundary state through specific operator pairs is fragile and doesn't generalize to other range-aware optimizations (point 2 above).
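A two-key example makes the ColumnStatistics mismatch concrete. Plain Rust tuples stand in for sort-key rows ordered by `(a ASC, b ASC)`. This is an illustrative assumption: real sort keys are arbitrary expressions. Both helper functions are hypothetical.

```rust
/// Lex-min and lex-max rows of a non-empty slice ordered by (a ASC, b ASC).
/// Rust tuples compare lexicographically, matching that ordering.
fn lex_extrema(rows: &[(i64, i64)]) -> ((i64, i64), (i64, i64)) {
    let min = *rows.iter().min().expect("non-empty");
    let max = *rows.iter().max().expect("non-empty");
    (min, max)
}

/// Per-column min and max, i.e. what column statistics would report.
fn per_column_bounds(rows: &[(i64, i64)]) -> ((i64, i64), (i64, i64)) {
    let a = || rows.iter().map(|r| r.0);
    let b = || rows.iter().map(|r| r.1);
    (
        (a().min().expect("non-empty"), b().min().expect("non-empty")),
        (a().max().expect("non-empty"), b().max().expect("non-empty")),
    )
}
```

For rows `(1, 3), (1, 9), (2, 0)`, the lex extrema are `(1, 3)` and `(2, 0)`. Per-column bounds give `(1, 0)` and `(2, 9)`, and neither of those is a row in the data.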
Coexistence with existing work
Partitioning::Range / RangePartitioning { ordering, split_points } from #22207, #22607, #22777 (@gene-bordegaray, Datadog) covers the declarative case: split points known at plan time. PartitionExtrema doesn't change that path. What it enables is a future dynamic sibling — where the boundary set is discovered at runtime — without inventing a parallel runtime-stats facility. @stuhood's comment on #22395 about overlapping output partitioning is the design point a dynamic-range variant could explore.
Additional context
This issue is the discussion home for the API. The implementation is PR #23090: a pure addition with zero behavior change, since the default is Ok(None) and nothing currently in tree calls the method. Window-function parallelization and a dynamic-range repartitioning variant land as separate follow-ups on top.