diff --git a/datafusion/ffi/src/physical_expr/partitioning.rs b/datafusion/ffi/src/physical_expr/partitioning.rs index eec437639e156..44fecd14e2e1b 100644 --- a/datafusion/ffi/src/physical_expr/partitioning.rs +++ b/datafusion/ffi/src/physical_expr/partitioning.rs @@ -50,6 +50,9 @@ impl From<&Partitioning> for FFI_Partitioning { Partitioning::Range(range) => { Self::UnknownPartitioning(range.partition_count()) } + Partitioning::DynamicRange(range) => { + Self::UnknownPartitioning(range.partition_count()) + } Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size), } } diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 2e0aaaf3fb4b7..fc0e446790c79 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -22,7 +22,7 @@ use crate::{ expressions::UnKnownColumn, physical_exprs_equal, }; pub use datafusion_common::SplitPoint; -use datafusion_common::{Result, validate_range_split_points}; +use datafusion_common::{Result, ScalarValue, validate_range_split_points}; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use std::fmt; @@ -122,6 +122,10 @@ pub enum Partitioning { Hash(Vec<Arc<dyn PhysicalExpr>>, usize), /// Partition rows by source-declared ranges Range(RangePartitioning), + /// Partition rows by ranges whose split points are discovered at execution + /// time from upstream data, not declared at plan time. See + /// [`DynamicRangePartitioning`] for the contract. 
+ DynamicRange(DynamicRangePartitioning), /// Unknown partitioning scheme with a known number of partitions UnknownPartitioning(usize), } @@ -139,6 +143,7 @@ impl Display for Partitioning { write!(f, "Hash([{phy_exprs_str}], {size})") } Partitioning::Range(range) => write!(f, "{range}"), + Partitioning::DynamicRange(range) => write!(f, "{range}"), Partitioning::UnknownPartitioning(size) => { write!(f, "UnknownPartitioning({size})") } @@ -344,6 +349,252 @@ fn format_range_split_points(split_points: &[SplitPoint]) -> String { .join(", ") } +/// Per-side halo distances for a [`DynamicRangePartitioning`]. +/// +/// Halo rows are extra rows deliberately routed beyond a bucket's primary +/// `[min, max]` range so a downstream operator (typically a windowing or +/// filter pass) can see the full neighborhood at each seam. Each side's +/// distance is measured in the **leading sort key's domain** — the same +/// `DataType` as the first expression in +/// [`DynamicRangePartitioning::ordering`]. +/// +/// For example, for a `RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING` window +/// frame over an `Int64` sort key, the halo is `preceding = 5, following +/// = 3` — bucket `i`'s output contains its own primary range plus 5 +/// units of overlap to the left and 3 units to the right. +/// +/// Halo here is RANGE-frame style: a distance in the sort key's domain. +/// ROWS-frame halo (a count of neighboring rows) is not represented; +/// that interpretation will need a separate variant if and when it is +/// motivated. +#[derive(Debug, Clone, PartialEq)] +pub struct HaloSpec { + /// Distance the bucket extends below its primary `min` (toward + /// lex-smaller values along the leading sort key). Must share its + /// `DataType` with the leading sort key expression. + preceding: ScalarValue, + /// Distance the bucket extends above its primary `max` (toward + /// lex-larger values along the leading sort key). Must share its + /// `DataType` with the leading sort key expression. 
+ following: ScalarValue, +} + +impl HaloSpec { + /// Creates a halo spec. `preceding` and `following` must share their + /// `DataType` with the leading sort key expression of the partitioning + /// they will be attached to; this is not validated at construction. + pub fn new(preceding: ScalarValue, following: ScalarValue) -> Self { + Self { + preceding, + following, + } + } + + /// Distance the bucket extends below its primary `min`. + pub fn preceding(&self) -> &ScalarValue { + &self.preceding + } + + /// Distance the bucket extends above its primary `max`. + pub fn following(&self) -> &ScalarValue { + &self.following + } +} + +impl Display for HaloSpec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "halo(preceding={}, following={})", + self.preceding, self.following + ) + } +} + +/// Physical range partitioning where split points are discovered at execution +/// time from upstream data, not declared at plan time. +/// +/// Where [`RangePartitioning`] takes its split points as a plan-time +/// constant (declared by a `TableProvider` or computed by a planner from +/// statistics), `DynamicRangePartitioning` describes the same model except +/// the boundary set is only known once an upstream operator has observed +/// its actual data range. The implementing operator is expected to +/// discover the range at `execute()` time — typically by reading runtime +/// extrema from its input — and to compute interior split points before +/// it routes the first row. +/// +/// The number of output partitions is fixed at plan time so downstream +/// distribution requirements have a stable answer. Only the split point +/// values are runtime-discovered. 
+/// +/// Once the operator has computed split points, the partition contract is +/// the same as [`RangePartitioning`]: lexicographic ordering, half-open +/// intervals, each row in exactly one partition — unless [`Self::halo`] is set, +/// in which case the operator deliberately routes extra rows beyond each +/// bucket's primary range so a downstream pass can see the full +/// neighborhood at each seam. See [`HaloSpec`]. +/// +/// NOTE: Optimizer and execution behavior for this partitioning is +/// intentionally not implemented and will be introduced incrementally. +/// See . +#[derive(Debug, Clone, PartialEq)] +pub struct DynamicRangePartitioning { + /// Ordered partitioning key. + ordering: LexOrdering, + /// Number of output partitions. Fixed at plan time; split points + /// between them are discovered at execute time. + partition_count: usize, + /// Optional per-side halo distance. When set, the implementing + /// operator routes extra rows beyond each bucket's primary range; a + /// downstream operator is expected to strip them back to the primary + /// range by reading [`ExtremaKind::Expanded`] extrema. When unset, the + /// partitioning produces disjoint buckets and a downstream consumer + /// sees [`ExtremaKind::Observed`] extrema. + /// + /// [`ExtremaKind::Expanded`]: https://docs.rs/datafusion/latest/datafusion_physical_plan/enum.ExtremaKind.html#variant.Expanded + /// [`ExtremaKind::Observed`]: https://docs.rs/datafusion/latest/datafusion_physical_plan/enum.ExtremaKind.html#variant.Observed + halo: Option<HaloSpec>, +} + +impl DynamicRangePartitioning { + /// Creates dynamic range partitioning metadata with no halo. + /// + /// `partition_count` must be at least 1 (not validated here). + pub fn new(ordering: LexOrdering, partition_count: usize) -> Self { + Self { + ordering, + partition_count, + halo: None, + } + } + + /// Attaches a [`HaloSpec`] to this partitioning, declaring that the + /// implementing operator routes extra rows beyond each bucket's + /// primary range. 
Builder-style. + pub fn with_halo(mut self, halo: HaloSpec) -> Self { + self.halo = Some(halo); + self + } + + /// Returns the ordering that defines the range key. + pub fn ordering(&self) -> &LexOrdering { + &self.ordering + } + + /// Returns the number of output partitions. + pub fn partition_count(&self) -> usize { + self.partition_count + } + + /// Returns the halo spec, if set. + pub fn halo(&self) -> Option<&HaloSpec> { + self.halo.as_ref() + } + + /// Returns true when `self` and `other` describe the same dynamic range + /// partition map. + /// + /// Single-partition dynamic range partitionings are always compatible. + /// Otherwise the two partitionings must have the same partition count, + /// matching halo (or both `None`), and equivalent ordering expressions + /// with the same sort options. Split points are not compared because + /// neither side has them yet. + pub fn compatible_with( + &self, + other: &Self, + eq_properties: &EquivalenceProperties, + ) -> bool { + if self.partition_count == 1 && other.partition_count == 1 { + return true; + } + + if self.partition_count != other.partition_count + || self.ordering.len() != other.ordering.len() + || self.halo != other.halo + { + return false; + } + + if !self + .ordering + .iter() + .zip(other.ordering.iter()) + .all(|(left, right)| left.options == right.options) + { + return false; + } + + let left_exprs = self + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::<Vec<_>>(); + let right_exprs = other + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::<Vec<_>>(); + + equivalent_exprs(&left_exprs, &right_exprs, eq_properties) + } + + /// Calculates the dynamic range partitioning after applying the given + /// projection. + /// + /// Returns `None` if any range key cannot be projected or if projection + /// collapses distinct range keys into duplicate output expressions. 
+ /// Halo (if any) is preserved unchanged — halo is measured in the + /// leading sort key's domain, which the projection must keep stable + /// for the result to be valid. + fn project( + &self, + mapping: &ProjectionMapping, + input_eq_properties: &EquivalenceProperties, + ) -> Option<Self> { + let exprs = self + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::<Vec<_>>(); + let projected_exprs = input_eq_properties + .project_expressions(&exprs, mapping) + .collect::<Option<Vec<_>>>()?; + let sort_exprs = self + .ordering + .iter() + .zip(projected_exprs) + .map(|(sort_expr, expr)| PhysicalSortExpr::new(expr, sort_expr.options)) + .collect::<Vec<_>>(); + let ordering = LexOrdering::new(sort_exprs)?; + if ordering.len() != self.ordering.len() { + return None; + } + + Some(Self { + ordering, + partition_count: self.partition_count, + halo: self.halo.clone(), + }) + } +} + +impl Display for DynamicRangePartitioning { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self.halo { + Some(halo) => write!( + f, + "DynamicRange([{}], {}, {})", + self.ordering, self.partition_count, halo + ), + None => write!( + f, + "DynamicRange([{}], {})", + self.ordering, self.partition_count + ), + } + } +} + fn equivalent_exprs( left: &[Arc<dyn PhysicalExpr>], right: &[Arc<dyn PhysicalExpr>], @@ -403,6 +654,7 @@ impl Partitioning { match self { RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n, Range(range) => range.partition_count(), + DynamicRange(range) => range.partition_count(), } } @@ -438,6 +690,9 @@ impl Partitioning { (Partitioning::Range(left), Partitioning::Range(right)) => { left.compatible_with(right, eq_properties) } + (Partitioning::DynamicRange(left), Partitioning::DynamicRange(right)) => { + left.compatible_with(right, eq_properties) + } _ => false, } } @@ -526,6 +781,7 @@ impl Partitioning { } Partitioning::RoundRobinBatch(_) | Partitioning::Range(_) + | Partitioning::DynamicRange(_) | Partitioning::UnknownPartitioning(_) 
=> { PartitioningSatisfaction::NotSatisfied } @@ 
-560,6 +816,13 @@ impl Partitioning { Partitioning::UnknownPartitioning(range.partition_count()) } } + Partitioning::DynamicRange(range) => { + if let Some(projected) = range.project(mapping, input_eq_properties) { + Partitioning::DynamicRange(projected) + } else { + Partitioning::UnknownPartitioning(range.partition_count()) + } + } Partitioning::RoundRobinBatch(_) | Partitioning::UnknownPartitioning(_) => { self.clone() } @@ -580,6 +843,9 @@ impl PartialEq for Partitioning { true } (Partitioning::Range(left), Partitioning::Range(right)) => left == right, + (Partitioning::DynamicRange(left), Partitioning::DynamicRange(right)) => { + left == right + } _ => false, } } @@ -743,6 +1009,22 @@ mod tests { .expect("test range partitioning should be valid"), ) } + + fn dynamic_range( + &self, + indices: impl IntoIterator<Item = usize>, + partition_count: usize, + ) -> DynamicRangePartitioning { + DynamicRangePartitioning::new(self.range_ordering(indices), partition_count) + } + + fn dynamic_range_partitioning( + &self, + indices: impl IntoIterator<Item = usize>, + partition_count: usize, + ) -> Partitioning { + Partitioning::DynamicRange(self.dynamic_range(indices, partition_count)) + } } #[test] @@ -1310,6 +1592,168 @@ mod tests { Ok(()) } + #[test] + fn test_dynamic_range_partitioning_metadata() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let dynamic_range = fixture.dynamic_range([0], 4); + assert_eq!(dynamic_range.ordering()[0].to_string(), "a@0 ASC"); + assert_eq!(dynamic_range.partition_count(), 4); + + let partitioning = Partitioning::DynamicRange(dynamic_range); + assert_eq!(partitioning.partition_count(), 4); + assert_eq!(partitioning.to_string(), "DynamicRange([a@0 ASC], 4)"); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_compatible_with() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let mut eq_properties = fixture.eq_properties.clone(); + 
eq_properties.add_equal_conditions(fixture.col(0), 
fixture.col(1))?; + + let range_a = fixture.dynamic_range([0], 4); + let range_a_same = fixture.dynamic_range([0], 4); + let range_b_equivalent = fixture.dynamic_range([1], 4); + let range_b_different_count = fixture.dynamic_range([1], 8); + let range_a_desc = DynamicRangePartitioning::new( + [fixture.range_sort_expr(0, SortOptions::new(true, false))].into(), + 4, + ); + let single_partition_range_a = fixture.dynamic_range([0], 1); + let single_partition_range_b = fixture.dynamic_range([1], 1); + + assert!(range_a.compatible_with(&range_a_same, &fixture.eq_properties)); + assert!(range_a.compatible_with(&range_b_equivalent, &eq_properties)); + assert!(!range_a.compatible_with(&range_b_equivalent, &fixture.eq_properties)); + assert!(!range_a.compatible_with(&range_b_different_count, &eq_properties)); + assert!(!range_a.compatible_with(&range_a_desc, &eq_properties)); + assert!( + single_partition_range_a + .compatible_with(&single_partition_range_b, &fixture.eq_properties) + ); + + // Through the Partitioning enum, with cross-variant mismatch: + assert!(fixture.dynamic_range_partitioning([0], 4).compatible_with( + &fixture.dynamic_range_partitioning([0], 4), + &fixture.eq_properties + )); + assert!( + !fixture.dynamic_range_partitioning([0], 4).compatible_with( + &fixture.hash_partitioning([0], 4), + &fixture.eq_properties + ) + ); + // DynamicRange vs declared Range are never compatible — they + // describe different operator contracts. 
+ assert!(!fixture.dynamic_range_partitioning([0], 3).compatible_with( + &fixture.range_partitioning( + [0], + vec![int_split_point([10]), int_split_point([20])] + ), + &fixture.eq_properties + )); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_project_preserves_or_degrades() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let dynamic_range = Partitioning::DynamicRange(DynamicRangePartitioning::new( + [fixture.range_sort_expr(1, SortOptions::new(true, false))].into(), + 4, + )); + + let keep_b_mapping = ProjectionMapping::from_indices(&[1], &fixture.schema)?; + let projected = dynamic_range.project(&keep_b_mapping, &fixture.eq_properties); + assert_eq!( + projected.to_string(), + "DynamicRange([b@0 DESC NULLS LAST], 4)" + ); + + let drop_b_mapping = ProjectionMapping::from_indices(&[0], &fixture.schema)?; + let projected = dynamic_range.project(&drop_b_mapping, &fixture.eq_properties); + let Partitioning::UnknownPartitioning(partition_count) = projected else { + panic!("expected UnknownPartitioning, got {projected:?}"); + }; + assert_eq!(partition_count, 4); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_halo_metadata() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let halo = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(3))); + let with_halo = fixture.dynamic_range([0], 4).with_halo(halo.clone()); + + assert_eq!(with_halo.halo(), Some(&halo)); + assert_eq!( + with_halo.to_string(), + "DynamicRange([a@0 ASC], 4, halo(preceding=5, following=3))" + ); + + // No halo round-trips unchanged. 
+ let no_halo = fixture.dynamic_range([0], 4); + assert_eq!(no_halo.halo(), None); + assert_eq!(no_halo.to_string(), "DynamicRange([a@0 ASC], 4)"); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_halo_affects_compatible_with() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let halo_a = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(3))); + let halo_b = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(7))); + + let plain = fixture.dynamic_range([0], 4); + let with_a = fixture.dynamic_range([0], 4).with_halo(halo_a.clone()); + let with_a_same = fixture.dynamic_range([0], 4).with_halo(halo_a.clone()); + let with_b = fixture.dynamic_range([0], 4).with_halo(halo_b); + + // Identical halos → compatible. + assert!(with_a.compatible_with(&with_a_same, &fixture.eq_properties)); + // Mismatched halos → not compatible. + assert!(!with_a.compatible_with(&with_b, &fixture.eq_properties)); + // No-halo vs halo → not compatible. 
+ assert!(!plain.compatible_with(&with_a, &fixture.eq_properties)); + assert!(!with_a.compatible_with(&plain, &fixture.eq_properties)); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_project_preserves_halo() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let halo = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(3))); + let dynamic_range = Partitioning::DynamicRange( + DynamicRangePartitioning::new( + [fixture.range_sort_expr(1, SortOptions::new(true, false))].into(), + 4, + ) + .with_halo(halo.clone()), + ); + + let keep_b_mapping = ProjectionMapping::from_indices(&[1], &fixture.schema)?; + let projected = dynamic_range.project(&keep_b_mapping, &fixture.eq_properties); + let Partitioning::DynamicRange(projected_inner) = &projected else { + panic!("expected DynamicRange, got {projected:?}"); + }; + assert_eq!(projected_inner.halo(), Some(&halo)); + assert_eq!( + projected.to_string(), + "DynamicRange([b@0 DESC NULLS LAST], 4, halo(preceding=5, following=3))" + ); + + Ok(()) + } + #[test] fn test_hash_partitioning_compatible_with() -> Result<()> { let fixture = PartitioningTestFixture::int64(&["a", "b"])?; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2298183485f55..6405eddd2b2d3 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1439,7 +1439,7 @@ impl ExecutionPlan for RepartitionExec { } Partitioning::Hash(new_partitions, *size) } - Partitioning::Range(_) => { + Partitioning::Range(_) | Partitioning::DynamicRange(_) => { // Range partitioning optimizer propagation is tracked in // https://github.com/apache/datafusion/issues/22395 return not_impl_err!( @@ -1483,7 +1483,7 @@ impl ExecutionPlan for RepartitionExec { return Ok(SortOrderPushdownResult::Unsupported); } match self.partitioning() { - Partitioning::Range(_) => { + Partitioning::Range(_) | 
Partitioning::DynamicRange(_) => { // Range partitioning optimizer propagation is tracked in // https://github.com/apache/datafusion/issues/22395 return not_impl_err!( @@ -1517,7 +1517,7 @@ impl ExecutionPlan for RepartitionExec { RoundRobinBatch(_) => RoundRobinBatch(target_partitions), Hash(hash, _) => Hash(hash, target_partitions), UnknownPartitioning(_) => UnknownPartitioning(target_partitions), - Range(_) => { + Range(_) | DynamicRange(_) => { // Range repartition execution is tracked in // https://github.com/apache/datafusion/issues/22397 return not_impl_err!( diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7310c0928eee4..a498758c52a2f 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -380,6 +380,15 @@ pub fn serialize_partitioning( serialize_range_partitioning(range, codec, proto_converter)?, )), }, + Partitioning::DynamicRange(_) => { + // Proto plumbing for DynamicRange is intentionally not + // implemented in the variant-introduction PR and will be + // added incrementally. See + // . + return not_impl_err!( + "Serialization of DynamicRange partitioning is not implemented" + ); + } Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning { partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown( *partition_count as u64,