
API: Partitioning::DynamicRange for runtime-discovered split points #23093

Description

@avantgardnerio

Status: Implementation PR open at #23094. This issue stays open as the design home — discuss the API here, code review there.

What is the problem the feature is trying to solve?

Partitioning::Range(RangePartitioning { ordering, split_points }) landed in #22207 (@gene-bordegaray, Datadog) as the declarative form of range partitioning — split points are known at plan time, either declared by a TableProvider or computed by a planner from statistics.

What is missing is the symmetric runtime-discovered form: a sibling variant where the boundary set is only known once an upstream operator has observed its actual data range. Two concrete needs:

  1. Parallelizing single-partition window functions (RANGE frames, no PARTITION BY). A planner can declare "I want N output partitions of this sorted stream, split by sort-key range" without knowing the actual range yet. The implementing operator discovers the range at execute time (typically from its input's runtime extrema; see API: runtime partition extrema for range-aware operators #23089) and computes interior split points before routing rows. This is the motivating use case from the spike in Parallel bounded RANGE-frame window functions without PARTITION BY (draft) #23026.
  2. Future range-aware operators where boundaries are data-dependent. Anything that wants to bucket a single stream into N value-ranges without forcing statistics-based estimates to be precise. The existing declarative form is the right answer when split points are known a priori; the dynamic form is the right answer when they're not.
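Point 1 says the implementing operator turns an observed range into interior split points. The following is a minimal sketch under three assumptions: the leading key is Int64, the observed range is the closed interval [min, max], and boundaries are chosen by an equal-width strategy. The follow-up execution slot may pick boundaries differently, for example from quantiles. interior_split_points is a hypothetical name.

```rust
/// Hypothetical helper (not part of the proposal): compute `partition_count - 1`
/// interior split points dividing the observed closed range [min, max] into
/// roughly equal-width, half-open buckets.
/// Assumptions: Int64 leading sort key, equal-width boundary strategy.
pub fn interior_split_points(min: i64, max: i64, partition_count: usize) -> Vec<i64> {
    assert!(partition_count >= 1, "partition_count must be at least 1");
    assert!(min <= max, "min must not exceed max");
    // Widen to i128 so the span cannot overflow for extreme i64 bounds.
    let span = max as i128 - min as i128 + 1; // number of distinct values in [min, max]
    let n = partition_count as i128;
    // Split point i sits at min + floor(span * i / n), which is always <= max.
    // If the range holds fewer distinct values than partitions, some split
    // points repeat and the corresponding buckets are simply empty.
    (1..n)
        .map(|i| (min as i128 + span * i / n) as i64)
        .collect()
}
```

For example, an observed range of [0, 99] with four partitions yields split points 25, 50 and 75, giving the buckets [min, 25), [25, 50), [50, 75) and [75, max].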

These two cases share the partitioning semantics (lexicographic ordering, half-open intervals) but differ on when the boundaries are known. Keeping them as two variants of Partitioning lets downstream distribution requirements have a stable answer at plan time (partition count is fixed) while leaving the boundary discovery to the implementing operator.
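The shared semantics (lexicographic ordering, half-open intervals) can be sketched as a binary search over the split points. This sketch assumes every sort key is ascending and non-null Int64. DESC ordering and NULL placement, which a real LexOrdering carries, are not modelled. route is a hypothetical helper.

```rust
/// Hypothetical router: return the bucket index for a row's sort key, given
/// ascending, lexicographically ordered split points. Bucket `i` is the
/// half-open interval [split_points[i-1], split_points[i]). The first bucket
/// is unbounded below and the last bucket is unbounded above.
/// Assumptions: all sort keys ascending, non-null Int64.
pub fn route(split_points: &[Vec<i64>], key: &[i64]) -> usize {
    // Slice comparison in Rust is lexicographic, matching the ordering semantics.
    // Under half-open intervals, the number of split points <= key is exactly
    // the bucket index, and partition_point finds it by binary search.
    split_points.partition_point(|sp| sp.as_slice() <= key)
}
```

A key equal to a split point lands in the bucket that split point opens, which keeps the buckets disjoint.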

Describe the solution you'd like

A small addition to Partitioning:

pub enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    Range(RangePartitioning),
    DynamicRange(DynamicRangePartitioning),  // <- new
    UnknownPartitioning(usize),
}

pub struct HaloSpec {
    preceding: ScalarValue,
    following: ScalarValue,
}

pub struct DynamicRangePartitioning {
    ordering: LexOrdering,
    partition_count: usize,
    halo: Option<HaloSpec>,
}

partition_count is fixed at plan time so downstream distribution requirements have a stable answer. Only the split point values are runtime-discovered.
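The next sketch shows why a fixed partition_count keeps plan-time answers stable. It is a simplified, hypothetical mirror of the enum, not DataFusion's actual definition. Strings stand in for PhysicalExpr and LexOrdering, and i64 stands in for split-point values. The Range arm also assumes that k split points define k + 1 buckets.

```rust
/// Simplified, hypothetical stand-in for the proposed enum.
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
    Range { ordering: Vec<String>, split_points: Vec<i64> },
    DynamicRange { ordering: Vec<String>, partition_count: usize },
    UnknownPartitioning(usize),
}

impl Partitioning {
    /// Partition count as seen by plan-time distribution logic.
    pub fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobinBatch(n)
            | Partitioning::Hash(_, n)
            | Partitioning::UnknownPartitioning(n) => *n,
            // Declared form (assumption): k split points define k + 1 buckets.
            Partitioning::Range { split_points, .. } => split_points.len() + 1,
            // Dynamic form: the count is pinned at plan time even though
            // no split point value exists yet.
            Partitioning::DynamicRange { partition_count, .. } => *partition_count,
        }
    }
}
```

With this shape, a distribution requirement can ask DynamicRange for its count before any split point has been discovered.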

halo is the plan-time contract for "this bucket carries extra rows beyond its primary [min, max] range": preceding extends the range before min and following extends it past max, in sort order. Distances are in the leading sort key's domain. For example, for RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING over an Int64 key, the halo is preceding=5, following=3. With halo unset, the partitioning produces disjoint buckets and a downstream consumer sees ExtremaKind::Observed extrema (per #23089). With halo set, the routing operator publishes ExtremaKind::Expanded extrema, and a downstream halo-strip filter is contracted to read the primary range and discard the extra rows.
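The halo contract can be sketched as routing into expanded intervals, paired with a strip predicate. This sketch assumes an ascending Int64 leading key and uses Int64 distances in place of ScalarValue. buckets_for and in_primary are hypothetical names, and extrema publication is not modelled.

```rust
/// Simplified, hypothetical HaloSpec with Int64 distances.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct HaloSpec {
    pub preceding: i64,
    pub following: i64,
}

/// Buckets that must receive a row with `key`. Bucket i's primary range is
/// [lo, hi), and its expanded range is [lo - preceding, hi + following).
/// With no halo, exactly one bucket matches.
pub fn buckets_for(split_points: &[i64], halo: Option<HaloSpec>, key: i64) -> Vec<usize> {
    let h = halo.unwrap_or(HaloSpec { preceding: 0, following: 0 });
    (0..=split_points.len())
        .filter(|&i| {
            // None means the bucket is unbounded on that side.
            let lo = if i == 0 { None } else { Some(split_points[i - 1]) };
            let hi = split_points.get(i).copied();
            let above_lo = lo.map_or(true, |lo| key >= lo.saturating_sub(h.preceding));
            let below_hi = hi.map_or(true, |hi| key < hi.saturating_add(h.following));
            above_lo && below_hi
        })
        .collect()
}

/// Halo-strip predicate: keep a row only if it lies in the bucket's primary range.
pub fn in_primary(split_points: &[i64], bucket: usize, key: i64) -> bool {
    let above_lo = bucket == 0 || key >= split_points[bucket - 1];
    let below_hi = bucket == split_points.len() || key < split_points[bucket];
    above_lo && below_hi
}
```

With a halo set, a row near a boundary is sent to more than one bucket. The row still feeds window frames in each of those buckets, but only the bucket whose primary range contains it keeps the row after the strip.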

ROWS-frame halo (a count of neighbor rows rather than a domain distance) is intentionally not represented; a separate variant can be added later if motivated.

Variant introduction only; there is no execution slot in this PR. RepartitionExec returns not_impl_err! for DynamicRange at every site where it does so for Range. Proto serialization returns not_impl_err! (proto plumbing follows the same incremental cadence as #22207 and #22787). FFI bridges DynamicRange to UnknownPartitioning(n), the same path Range takes per #22394.
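The incremental cadence above amounts to two kinds of match site. The following is a hypothetical sketch with simplified variants. A String error stands in for DataFusion's not_impl_err!, and bridge_for_ffi stands in for the FFI conversion. The Range arm assumes that k split points define k + 1 partitions.

```rust
/// Simplified, hypothetical stand-in for the proposed enum.
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
    Range { split_points: Vec<i64> },
    DynamicRange { partition_count: usize },
    UnknownPartitioning(usize),
}

/// Mirrors "no execution slot yet": both range variants are rejected.
/// Every variant is listed (no wildcard), so adding a variant forces this site to be revisited.
pub fn check_executable(p: &Partitioning) -> Result<(), String> {
    match p {
        Partitioning::Range { .. } | Partitioning::DynamicRange { .. } => {
            Err(format!("not implemented: repartitioning by {p:?}"))
        }
        Partitioning::RoundRobinBatch(_)
        | Partitioning::Hash(..)
        | Partitioning::UnknownPartitioning(_) => Ok(()),
    }
}

/// Mirrors the FFI fallback: range variants degrade to UnknownPartitioning(n),
/// keeping the partition count but dropping range semantics.
pub fn bridge_for_ffi(p: Partitioning) -> Partitioning {
    match p {
        // Assumption: k split points define k + 1 partitions.
        Partitioning::Range { split_points } => {
            Partitioning::UnknownPartitioning(split_points.len() + 1)
        }
        Partitioning::DynamicRange { partition_count } => {
            Partitioning::UnknownPartitioning(partition_count)
        }
        other => other,
    }
}
```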

Implementation in PR #23094 (+470 / -14 LoC, 6 unit tests covering construction, compatible_with, project, halo metadata + display, halo affecting compatibility, and halo preserved through projection).

Design points worth debating

  1. Is Partitioning::DynamicRange the right shape, or should this be a flag on the existing RangePartitioning? The alternative is an Option<Vec<SplitPoint>> field on RangePartitioning (None = "discover at runtime"). Two variants are preferred here because the operator contract is materially different. Declared partitioning is a static property the planner can reason about. Dynamic partitioning is a runtime contract whose split points only become defined once an upstream operator publishes them. Keeping them separate lets the type system enforce the distinction at every match site. Open to being talked out of this.
  2. partition_count at plan time vs. dynamic. Pinning the partition count at plan time keeps the rest of the optimizer/planner simple — every distribution decision still has a stable answer. The alternative (dynamic partition count too) would require deeper planner changes and is not motivated by any current use case.
  3. How does the execution slot discover boundaries? That's the follow-up: the implementing operator reads its input's runtime extrema via the API proposed in API: runtime partition extrema for range-aware operators #23089 / implemented in feat: PartitionExtrema + SortExec observer + BoundedWindowAggExec passthrough #23090. This issue scopes only the partitioning variant itself; the execution slot will be filed separately once both this and feat: PartitionExtrema + SortExec observer + BoundedWindowAggExec passthrough #23090 settle.
  4. HaloSpec on the partitioning vs. on the operator. Halo could alternatively live as a config field on the routing operator (RepartitionExec's new dynamic-range mode) rather than on the partitioning type. Putting it on the partitioning means a downstream halo-strip filter can read the halo distance directly from the upstream's declared output partitioning, with no operator-internal handshake needed. Open to pushback.
  5. Naming. DynamicRange is straightforward but other options exist (DiscoveredRange, RuntimeRange). Open to the reviewer's preference.
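Design point 4 can be sketched as a plain match on the upstream's declared output partitioning. The types here are simplified and hypothetical: distances are Int64 instead of ScalarValue, and the ordering is omitted. upstream_halo is an illustrative name.

```rust
/// Simplified, hypothetical HaloSpec with Int64 distances.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct HaloSpec {
    pub preceding: i64,
    pub following: i64,
}

/// Simplified, hypothetical DynamicRangePartitioning (ordering omitted).
#[derive(Debug, Clone, PartialEq)]
pub struct DynamicRangePartitioning {
    pub partition_count: usize,
    pub halo: Option<HaloSpec>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    Range(Vec<i64>), // declared split points (simplified)
    DynamicRange(DynamicRangePartitioning),
    UnknownPartitioning(usize),
}

/// What a downstream halo-strip filter could do at plan time: read the halo
/// straight from the upstream's declared output partitioning.
pub fn upstream_halo(output_partitioning: &Partitioning) -> Option<HaloSpec> {
    match output_partitioning {
        Partitioning::DynamicRange(d) => d.halo,
        // Declared ranges and unknown partitioning carry no halo contract.
        Partitioning::Range(_) | Partitioning::UnknownPartitioning(_) => None,
    }
}
```

Because the halo travels with the partitioning, the filter needs no reference to the routing operator itself. The trade-off is that every Partitioning consumer sees a field that only range-aware operators use.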

Describe alternatives you've considered

  • A flag on RangePartitioning. Discussed above (point 1).
  • A new dedicated operator (not a Partitioning variant) that routes by runtime-discovered boundaries. Rejected because that would duplicate RepartitionExec's mechanics and bypass the unified EnforceDistribution story the existing Partitioning enum gives.
  • Inferring "dynamic" from absence of split points in RangePartitioning. Same problem as point 1: it collapses two contracts into one type, at the cost of every consumer having to guard against the absent case.

Coexistence with existing work

Additional context

This issue is the discussion home for the variant. The implementation is PR #23094 — pure addition, behavior mirrors Range at every match site (every not_impl_err! site for Range gets the same treatment for DynamicRange). Execution-slot, proto, logical representation, and substrait round-trip land as separate follow-ups on top, mirroring the cadence Gene used for Range.
