feat: PartitionExtrema + SortExec observer + BoundedWindowAggExec passthrough#23090

Open
avantgardnerio wants to merge 3 commits into apache:main from coralogix:brent/partition-extrema

@avantgardnerio avantgardnerio commented Jun 22, 2026

Which issue does this PR close?

Implements the proposal in #23089.

Rationale for this change

See #23089 for the full design rationale, including the dual-semantics motivation, alternatives considered, and coexistence with Partitioning::Range. Short version: range-aware operators (parallel window functions, future dynamic-range repartitioning, range-elimination optimizations) want to ask an ExecutionPlan for the lex-min / lex-max of a partition's output along its declared ordering. Today there's no way to ask.

What changes are included in this PR?

Pure addition; zero behavior change in any path that doesn't call the new method.

  • ExecutionPlan::runtime_partition_extrema(&self, partition) -> Result<Option<PartitionExtrema>> — default Ok(None).
  • PartitionExtrema { kind, min, max, row_count } and enum ExtremaKind { Observed, Expanded }. Observed (the only kind any operator in this PR returns) means the reported range literally bounds the partition's data. Expanded is reserved for future operators that deliberately route rows outside the reported range as a "halo" for a downstream filter to strip. The dual semantics live on the enum so passthroughs that don't care don't have to match.
  • SortExec override: a per-partition slot is populated inside the sort code path (each sort_batch_chunked call folds first/last sorted rows into the slot, zero-copy via RecordBatch::slice). Once execution has consumed the input, the slot holds the lex-min / lex-max along the declared ordering.
  • BoundedWindowAggExec override: passthrough. BWAG extends its input's equivalence properties and appends new window-result columns on the right of the schema, so the leading sort exprs remain stable in the output along the same column indices.
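The shape of the new API can be sketched in isolation. This is a simplified, dependency-free model rather than the PR's actual code: the `Plan` trait stands in for `ExecutionPlan`, the `Result` alias, the `Vec<i64>` sort-key representation, the `usize` row count, and the `NoExtrema` / `FixedExtrema` operators are all assumptions made for illustration.

```rust
// Hypothetical stand-in for DataFusion's Result type.
type Result<T> = std::result::Result<T, String>;

/// Which interpretation applies to the reported range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtremaKind {
    /// The range literally bounds the partition's data.
    Observed,
    /// Reserved: rows may fall outside the range as a "halo".
    Expanded,
}

/// Assumed layout: sort-key values modeled as Vec<i64> for simplicity.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionExtrema {
    pub kind: ExtremaKind,
    pub min: Vec<i64>,
    pub max: Vec<i64>,
    pub row_count: usize,
}

/// Stand-in for ExecutionPlan; only the new method is modeled.
pub trait Plan {
    /// Default: the operator has nothing to report.
    fn runtime_partition_extrema(
        &self,
        _partition: usize,
    ) -> Result<Option<PartitionExtrema>> {
        Ok(None)
    }
}

/// Hypothetical operator that does not override the method.
pub struct NoExtrema;
impl Plan for NoExtrema {}

/// Hypothetical operator that reports one optional slot per partition.
pub struct FixedExtrema(pub Vec<Option<PartitionExtrema>>);
impl Plan for FixedExtrema {
    fn runtime_partition_extrema(
        &self,
        partition: usize,
    ) -> Result<Option<PartitionExtrema>> {
        self.0
            .get(partition)
            .cloned()
            .ok_or_else(|| format!("no partition {partition}"))
    }
}
```

Because the default returns `Ok(None)`, an operator such as `NoExtrema` compiles unchanged, which is the property the "pure addition" claim relies on.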

Are these changes tested?

7 unit tests in datafusion/physical-plan/src/sorts/sort.rs::tests:

  • test_runtime_partition_extrema_before_execute_is_none — caller contract: reading without a poll returns Ok(None).
  • test_runtime_partition_extrema_after_full_sort — two batches, in-memory merge path; extrema match expected lex-min / lex-max with kind = Observed.
  • test_runtime_partition_extrema_descending_swaps_min_max — DESC sort: min is the largest value, max is the smallest.
  • test_runtime_partition_extrema_per_partition — two input partitions with preserve_partitioning=true: each output partition's extrema track its own range.
  • test_runtime_partition_extrema_default_is_none — default trait impl returns Ok(None) on a non-overriding operator (EmptyExec).
  • Plus two more under earlier commits, covering the chunk-fold path.
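The chunk-fold behavior these tests exercise, including the DESC case, can be illustrated with a small model. This is a sketch under assumptions, not the SortExec implementation: `Slot` and `fold_sorted_chunk` are hypothetical names, and a single `i64` key stands in for the first and last rows that the PR slices out of each sorted batch.

```rust
use std::cmp::Ordering;

/// Hypothetical per-partition slot; min/max are positions along the
/// declared ordering, not numeric smallest/largest.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Slot {
    pub min: Option<i64>,
    pub max: Option<i64>,
    pub row_count: usize,
}

impl Slot {
    /// Fold one chunk that is already sorted along the declared ordering.
    /// Only the chunk's first and last rows are inspected.
    pub fn fold_sorted_chunk(&mut self, chunk: &[i64], descending: bool) {
        let (Some(&first), Some(&last)) = (chunk.first(), chunk.last()) else {
            return; // empty chunk: nothing to fold
        };
        // Compare along the declared ordering (reversed for DESC).
        let cmp = |a: &i64, b: &i64| if descending { b.cmp(a) } else { a.cmp(b) };
        self.min = Some(match self.min {
            Some(m) if cmp(&m, &first) != Ordering::Greater => m,
            _ => first,
        });
        self.max = Some(match self.max {
            Some(m) if cmp(&m, &last) != Ordering::Less => m,
            _ => last,
        });
        self.row_count += chunk.len();
    }
}
```

Folding the descending chunks `[8, 5, 3]` and `[4, 1]` leaves `min = Some(8)` and `max = Some(1)`, matching the "min is the largest value" wording of the DESC test.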

Are there any user-facing changes?

  • New public types in datafusion::physical_plan: PartitionExtrema, ExtremaKind.
  • New trait method ExecutionPlan::runtime_partition_extrema with a default Ok(None). Existing custom ExecutionPlan implementations are not required to change.
  • No SQL surface changes.

…ded"

- `runtime_partition_extrema`: replace the "observed-vs-intended" pointer
  with a one-sentence handoff to `ExtremaKind::Observed` /
  `ExtremaKind::Expanded`. The discriminant is now the source of truth
  for which interpretation applies.
- `ExtremaKind::Expanded`: rename "intended primary range" → "primary
  range". The earlier doc carried "intended" as a leftover from the
  observed-vs-intended framing the enum replaces; with the variant
  named `Expanded`, "intended" only adds an extra synonym to map.

BWAG processes its input in order and appends new window-result
columns on the right of the existing schema. Its equivalence
properties are built by extending the input's, leaving the input's
leading sort expressions in place along the same column indices in
the output. So along the output's declared ordering, the partition
range observed (or expanded) by the upstream is the same range we
emit — a clean passthrough.
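
A passthrough of this kind amounts to delegating to the input. The following is a minimal sketch with hypothetical types (`Plan`, `Extrema`, `SortedSource`, `AppendColumns`), assuming the wrapping operator only appends columns to the right and leaves the leading sort expressions untouched:

```rust
use std::sync::Arc;

// Hypothetical stand-in for DataFusion's Result type.
type Result<T> = std::result::Result<T, String>;

/// Simplified extrema: a single i64 sort key per bound.
#[derive(Debug, Clone, PartialEq)]
pub struct Extrema {
    pub min: i64,
    pub max: i64,
}

/// Stand-in for ExecutionPlan; only the new method is modeled.
pub trait Plan {
    fn runtime_partition_extrema(&self, _partition: usize) -> Result<Option<Extrema>> {
        Ok(None)
    }
}

/// Hypothetical upstream that already knows its range (e.g. a sort).
pub struct SortedSource(pub Option<Extrema>);
impl Plan for SortedSource {
    fn runtime_partition_extrema(&self, _partition: usize) -> Result<Option<Extrema>> {
        Ok(self.0.clone())
    }
}

/// Hypothetical operator that appends columns without reordering rows
/// or moving the sort-key columns, so the input's range still holds.
pub struct AppendColumns {
    pub input: Arc<dyn Plan>,
}
impl Plan for AppendColumns {
    fn runtime_partition_extrema(&self, partition: usize) -> Result<Option<Extrema>> {
        // Pure delegation: same partition index, same answer.
        self.input.runtime_partition_extrema(partition)
    }
}
```

Delegation also forwards `Ok(None)` unchanged, so a passthrough never claims a range its input has not reported.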

Skipped: CoalesceBatchesExec. The operator is deprecated since 52.0.0
("we now use BatchCoalescer from arrow-rs instead of a dedicated
operator"); coalescing now happens inside other operators' streams,
so there's no dedicated plan node to override.

alamb commented Jun 22, 2026

I left comments in the linked tickets like #23093 / #23094

Labels

physical-plan Changes to the physical-plan crate
