feat: PartitionExtrema + SortExec observer + BoundedWindowAggExec passthrough#23090
Open
avantgardnerio wants to merge 3 commits into
Open
feat: PartitionExtrema + SortExec observer + BoundedWindowAggExec passthrough#23090avantgardnerio wants to merge 3 commits into
avantgardnerio wants to merge 3 commits into
Conversation
…ded" - `runtime_partition_extrema`: replace the "observed-vs-intended" pointer with a one-sentence handoff to `ExtremaKind::Observed` / `ExtremaKind::Expanded`. The discriminant is now the source of truth for which interpretation applies. - `ExtremaKind::Expanded`: rename "intended primary range" → "primary range". The earlier doc carried "intended" as a leftover from the observed-vs-intended framing the enum replaces; with the variant named `Expanded`, "intended" only adds an extra synonym to map.
BWAG processes its input in order and appends new window-result
columns on the right of the existing schema. Its equivalence
properties are built by extending the input's, leaving the input's
leading sort expressions in place along the same column indices in
the output. So along the output's declared ordering, the partition
range observed (or expanded) by the upstream is the same range we
emit — a clean passthrough.
Skipped: CoalesceBatchesExec. The operator is deprecated since 52.0.0
("we now use BatchCoalescer from arrow-rs instead of a dedicated
operator"); coalescing now happens inside other operators' streams,
so there's no dedicated plan node to override.
This was referenced Jun 22, 2026
Contributor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Implements the proposal in #23089.
Rationale for this change
See #23089 for the full design rationale, including the dual-semantics motivation, alternatives considered, and coexistence with
Partitioning::Range. Short version: range-aware operators (parallel window functions, future dynamic-range repartitioning, range-elimination optimizations) want to ask anExecutionPlanfor the lex-min / lex-max of a partition's output along its declared ordering. Today there's no way to ask.What changes are included in this PR?
Pure addition; zero behavior change in any path that doesn't call the new method.
ExecutionPlan::runtime_partition_extrema(&self, partition) -> Result<Option<PartitionExtrema>>— defaultOk(None).PartitionExtrema { kind, min, max, row_count }andenum ExtremaKind { Observed, Expanded }.Observed(the only kind any operator in this PR returns) means the reported range literally bounds the partition's data.Expandedis reserved for future operators that deliberately route rows outside the reported range as a "halo" for a downstream filter to strip. The dual semantics live on the enum so passthroughs that don't care don't have tomatch.SortExecoverride: a per-partition slot is populated inside the sort code path (eachsort_batch_chunkedcall folds first/last sorted rows into the slot, zero-copy viaRecordBatch::slice). Once execution has consumed the input, the slot holds the lex-min / lex-max along the declared ordering.BoundedWindowAggExecoverride: passthrough. BWAG extends its input's equivalence properties and appends new window-result columns on the right of the schema, so the leading sort exprs remain stable in the output along the same column indices.Are these changes tested?
7 unit tests in
datafusion/physical-plan/src/sorts/sort.rs::tests:test_runtime_partition_extrema_before_execute_is_none— caller contract: reading without a poll returnsOk(None).test_runtime_partition_extrema_after_full_sort— two batches, in-memory merge path; extrema match expected lex-min / lex-max withkind = Observed.test_runtime_partition_extrema_descending_swaps_min_max— DESC sort:minis the largest value,maxis the smallest.test_runtime_partition_extrema_per_partition— two input partitions withpreserve_partitioning=true: each output partition's extrema track its own range.test_runtime_partition_extrema_default_is_none— default trait impl returnsOk(None)on a non-overriding operator (EmptyExec).Are there any user-facing changes?
datafusion::physical_plan:PartitionExtrema,ExtremaKind.ExecutionPlan::runtime_partition_extremawith a defaultOk(None). Existing customExecutionPlanimplementations are not required to change.