diff --git a/benchmarks/queries/h2o/window.sql b/benchmarks/queries/h2o/window.sql index fa16a3de32ca5..a8d34e6140b70 100644 --- a/benchmarks/queries/h2o/window.sql +++ b/benchmarks/queries/h2o/window.sql @@ -117,3 +117,34 @@ SELECT id2, largest2_v2 FROM ( ROW_NUMBER() OVER (PARTITION BY id2 ORDER BY v2 DESC) AS order_v2 FROM large WHERE v2 IS NOT NULL ) sub_query WHERE order_v2 <= 2; + +-- Window Top-N partition cardinality sweep (id3 % N gives N distinct partitions). +-- These exercise PartitionedTopKExec across cardinalities to validate it stays +-- competitive with the SortExec+Filter baseline as partition count grows. +-- Window Top-N: 100 partitions +SELECT pk, largest2_v2 FROM ( + SELECT id3 % 100 AS pk, v2 AS largest2_v2, + ROW_NUMBER() OVER (PARTITION BY id3 % 100 ORDER BY v2 DESC) AS order_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE order_v2 <= 2; + +-- Window Top-N: 1,000 partitions +SELECT pk, largest2_v2 FROM ( + SELECT id3 % 1000 AS pk, v2 AS largest2_v2, + ROW_NUMBER() OVER (PARTITION BY id3 % 1000 ORDER BY v2 DESC) AS order_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE order_v2 <= 2; + +-- Window Top-N: 10,000 partitions +SELECT pk, largest2_v2 FROM ( + SELECT id3 % 10000 AS pk, v2 AS largest2_v2, + ROW_NUMBER() OVER (PARTITION BY id3 % 10000 ORDER BY v2 DESC) AS order_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE order_v2 <= 2; + +-- Window Top-N: 100,000 partitions +SELECT pk, largest2_v2 FROM ( + SELECT id3 % 100000 AS pk, v2 AS largest2_v2, + ROW_NUMBER() OVER (PARTITION BY id3 % 100000 ORDER BY v2 DESC) AS order_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE order_v2 <= 2; \ No newline at end of file diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index fe876eeddf7f2..f312ba527e58c 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -31,22 +31,19 @@ use std::fmt::{self, Formatter}; use std::sync::Arc; -use arrow::array::{RecordBatch, UInt32Array}; -use arrow::compute::{BatchCoalescer, take_record_batch}; use arrow::datatypes::SchemaRef; -use arrow::row::{OwnedRow, RowConverter}; -use datafusion_common::{HashMap, Result}; +use arrow::row::SortField; +use datafusion_common::Result; use datafusion_execution::TaskContext; +use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::StreamExt; use futures::TryStreamExt; -use parking_lot::RwLock; use crate::execution_plan::{Boundedness, EmissionType}; use crate::metrics::ExecutionPlanMetricsSet; -use crate::topk::{TopK, TopKDynamicFilters, build_sort_fields}; +use crate::topk::{PartitionedTopK, build_sort_fields}; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, stream::RecordBatchStreamAdapter, @@ -342,8 +339,6 @@ impl ExecutionPlan for PartitionedTopKExec { let partition_sort_fields = build_sort_fields(&self.expr[..self.partition_prefix_len], &schema)?; - let partition_converter = RowConverter::new(partition_sort_fields)?; - let partition_exprs: Vec> = self.expr [..self.partition_prefix_len] .iter() @@ -359,10 +354,11 @@ impl ExecutionPlan for PartitionedTopKExec { let stream = futures::stream::once(async move { do_partitioned_topk( + partition, input, schema, - partition_converter, partition_exprs, + partition_sort_fields, order_expr, fetch, batch_size, @@ -380,36 +376,20 @@ impl ExecutionPlan for PartitionedTopKExec { } } -/// Create a no-op [`TopKDynamicFilters`] for a per-partition [`TopK`]. -/// -/// In normal `SortExec` top-K mode, dynamic filters push predicates down to -/// the data source (e.g., telling Parquet to skip rows worse than the current -/// K-th best). For per-partition heaps the data is already in memory and split -/// by partition key, so there is no data source to push filters to. We pass -/// `lit(true)` (accept everything) so the filter never rejects any row. -fn create_noop_dynamic_filter() -> Arc> { - Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(vec![], lit(true)), - )))) -} - -/// Read all input, split batches by partition key, feed each sub-batch -/// to a per-partition [`TopK`], then emit results in partition-key order. +/// Read all input, feed each batch into a [`PartitionedTopK`] (which +/// maintains one heap per distinct partition key), then emit results +/// ordered by `(partition_keys, order_keys)`. /// /// # Phases /// -/// 1. **Accumulation** — For each input batch: -/// - Evaluate partition key expressions to get partition column arrays -/// - Convert partition columns to binary [`arrow::row::Row`] format -/// - Group row indices by partition key -/// - Extract sub-batches via [`take_record_batch`] and insert into -/// the partition's [`TopK`] heap +/// 1. **Accumulation** — forward each input `RecordBatch` to +/// [`PartitionedTopK::insert_batch`], which demultiplexes rows by +/// partition key and dispatches them into the per-key heap. The +/// `RowConverter` and `MemoryReservation` are shared across all +/// partitions for this operator instance. /// -/// 2. **Emission** — After all input is consumed: -/// - Sort partition keys so output is ordered by partition key -/// - For each partition in sorted order, call [`TopK::emit`] to get -/// rows sorted by order-by key -/// - Return all batches as a single stream +/// 2. **Emission** — [`PartitionedTopK::emit`] drains all heaps in +/// sorted partition-key order, returning a coalesced batch stream. /// /// # Cost /// @@ -417,99 +397,33 @@ fn create_noop_dynamic_filter() -> Arc> { /// - Memory: O(K × P × row_size) where P = number of distinct partitions #[expect(clippy::too_many_arguments)] async fn do_partitioned_topk( + partition_id: usize, mut input: SendableRecordBatchStream, schema: SchemaRef, - partition_converter: RowConverter, partition_exprs: Vec>, + partition_sort_fields: Vec, order_expr: LexOrdering, fetch: usize, batch_size: usize, - runtime: Arc, + runtime: Arc, metrics_set: ExecutionPlanMetricsSet, ) -> Result { - let mut partitions: HashMap = HashMap::new(); - let mut partition_counter: usize = 0; - - // Macro-like helper: create a new TopK for a partition - macro_rules! new_topk { - () => {{ - let id = partition_counter; - partition_counter += 1; - TopK::try_new( - id, - Arc::clone(&schema), - vec![], - order_expr.clone(), - fetch, - batch_size, - Arc::clone(&runtime), - &metrics_set, - create_noop_dynamic_filter(), - ) - }}; - } + let mut state = PartitionedTopK::try_new( + partition_id, + schema, + partition_exprs, + partition_sort_fields, + order_expr, + fetch, + batch_size, + &runtime, + &metrics_set, + )?; - // ---------- Accumulation phase ---------- while let Some(batch) = input.next().await { - let batch = batch?; - let num_rows = batch.num_rows(); - if num_rows == 0 { - continue; - } - - // Evaluate partition key columns - let pk_arrays: Vec<_> = partition_exprs - .iter() - .map(|e| e.evaluate(&batch).and_then(|v| v.into_array(num_rows))) - .collect::>>()?; - - let pk_rows = partition_converter.convert_columns(&pk_arrays)?; - - // Group row indices by partition key - let mut groups: HashMap> = HashMap::new(); - for row_idx in 0..num_rows { - let pk = pk_rows.row(row_idx).owned(); - groups.entry(pk).or_default().push(row_idx as u32); - } - - // For each partition group, create a sub-batch and feed to TopK - for (pk, indices) in groups { - if !partitions.contains_key(&pk) { - partitions.insert(pk.clone(), new_topk!()?); - } - let topk = partitions.get_mut(&pk).unwrap(); - let indices_array = UInt32Array::from(indices); - let sub_batch = take_record_batch(&batch, &indices_array)?; - topk.insert_batch(sub_batch)?; - } + state.insert_batch(&batch?)?; } - // Release the input pipeline now that accumulation is complete. drop(input); - // ---------- Emit phase ---------- - // Sort partition keys so output is ordered by (partition_keys, order_keys). - let mut sorted_pks: Vec = partitions.keys().cloned().collect(); - sorted_pks.sort(); - - let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), batch_size); - - for pk in sorted_pks { - if let Some(topk) = partitions.remove(&pk) { - // TopK::emit() returns a stream of sorted batches - let mut stream = topk.emit()?; - while let Some(batch) = stream.next().await { - coalescer.push_batch(batch?)?; - } - } - } - coalescer.finish_buffered_batch()?; - let mut output_batches: Vec = Vec::new(); - while let Some(batch) = coalescer.next_completed_batch() { - output_batches.push(batch); - } - - Ok(Box::pin(RecordBatchStreamAdapter::new( - schema, - futures::stream::iter(output_batches.into_iter().map(Ok)), - ))) + state.emit() } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 11cf54c904ac8..1c812e2fcdf2e 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -19,8 +19,11 @@ use arrow::{ array::{Array, AsArray}, - compute::{FilterBuilder, interleave_record_batch, prep_null_mask_filter}, - row::{RowConverter, Rows, SortField}, + compute::{ + BatchCoalescer, FilterBuilder, interleave_record_batch, prep_null_mask_filter, + take_record_batch, + }, + row::{OwnedRow, RowConverter, Rows, SortField}, }; use datafusion_expr::{ColumnarValue, Operator}; use std::mem::size_of; @@ -34,7 +37,7 @@ use super::metrics::{ use crate::spill::get_record_batch_memory_size; use crate::{SendableRecordBatchStream, stream::RecordBatchStreamAdapter}; -use arrow::array::{ArrayRef, RecordBatch}; +use arrow::array::{ArrayRef, RecordBatch, UInt32Array}; use arrow::datatypes::SchemaRef; use datafusion_common::{ HashMap, Result, ScalarValue, internal_datafusion_err, internal_err, @@ -864,7 +867,7 @@ impl TopKHeap { /// Returns the values stored in this heap, from values low to /// high, as a single [`RecordBatch`], and a sorted vec of the /// current heap's contents - pub fn emit_with_state(&mut self) -> Result<(Option, Vec)> { + fn emit_with_state(&mut self) -> Result<(Option, Vec)> { // generate sorted rows let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec(); @@ -1166,6 +1169,218 @@ impl RecordBatchStore { } } +/// Top-K-per-partition operator state. +/// +/// Sibling to [`TopK`]. Where `TopK` maintains a single global heap, +/// `PartitionedTopK` maintains one [`TopKHeap`] per distinct partition +/// key while sharing a single [`RowConverter`], [`MemoryReservation`], +/// scratch [`Rows`] buffer, and [`TopKMetrics`] across all partitions. +/// +/// This sharing is the point of the type: with N distinct partition +/// keys, a naive `HashMap<_, TopK>` pays N × constant overhead for +/// `RowConverter::new`, `MemoryConsumer::register`, and metric +/// counter setup. `PartitionedTopK` pays it once. +pub(crate) struct PartitionedTopK { + schema: SchemaRef, + metrics: TopKMetrics, + reservation: MemoryReservation, + /// ORDER BY expressions (excludes PARTITION BY). + expr: LexOrdering, + /// Encoder for ORDER BY columns. Reused across partitions. + row_converter: RowConverter, + /// Scratch row buffer reused across `insert_batch` calls. + scratch_rows: Rows, + /// PARTITION BY expressions. + partition_exprs: Vec>, + /// Encoder for the partition key. + partition_converter: RowConverter, + /// One heap per distinct partition key seen so far. + heaps: HashMap, + k: usize, + batch_size: usize, +} + +impl PartitionedTopK { + #[expect(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition_id: usize, + schema: SchemaRef, + partition_exprs: Vec>, + partition_sort_fields: Vec, + order_expr: LexOrdering, + k: usize, + batch_size: usize, + runtime: &Arc, + metrics: &ExecutionPlanMetricsSet, + ) -> Result { + assert!(k > 0, "PartitionedTopK requires k > 0"); + let reservation = MemoryConsumer::new(format!("PartitionedTopK[{partition_id}]")) + .register(&runtime.memory_pool); + + let order_sort_fields = build_sort_fields(&order_expr, &schema)?; + let row_converter = RowConverter::new(order_sort_fields)?; + let scratch_rows = + row_converter.empty_rows(batch_size, ESTIMATED_BYTES_PER_ROW * batch_size); + + let partition_converter = RowConverter::new(partition_sort_fields)?; + + Ok(Self { + schema, + metrics: TopKMetrics::new(metrics, partition_id), + reservation, + expr: order_expr, + row_converter, + scratch_rows, + partition_exprs, + partition_converter, + heaps: HashMap::new(), + k, + batch_size, + }) + } + + /// Demultiplex `batch` rows by partition key, encode the ORDER BY + /// columns once for the whole batch, and feed each partition's + /// rows into its dedicated [`TopKHeap`]. + pub(crate) fn insert_batch(&mut self, batch: &RecordBatch) -> Result<()> { + let baseline = self.metrics.baseline.clone(); + let _timer = baseline.elapsed_compute().timer(); + + let num_rows = batch.num_rows(); + if num_rows == 0 { + return Ok(()); + } + + // 1. Evaluate + encode partition columns. + let pk_arrays: Vec = self + .partition_exprs + .iter() + .map(|e| e.evaluate(batch).and_then(|v| v.into_array(num_rows))) + .collect::>()?; + let pk_rows = self.partition_converter.convert_columns(&pk_arrays)?; + + // 2. Demultiplex row indices by partition key (per-batch). + let mut groups: HashMap> = HashMap::new(); + for i in 0..num_rows { + groups + .entry(pk_rows.row(i).owned()) + .or_default() + .push(i as u32); + } + + // 3. Evaluate ORDER BY columns on the full batch and encode ONCE. + let ob_arrays: Vec = self + .expr + .iter() + .map(|e| e.expr.evaluate(batch).and_then(|v| v.into_array(num_rows))) + .collect::>()?; + self.scratch_rows.clear(); + self.row_converter + .append(&mut self.scratch_rows, &ob_arrays)?; + + // 4. Per-partition: take the sub-batch, walk indices, dispatch + // qualifying rows into the partition's heap. + let k = self.k; + let mut replacements: usize = 0; + for (pk, indices) in groups { + let heap = self.heaps.entry(pk).or_insert_with(|| TopKHeap::new(k)); + + // Once a heap is full, most rows at high partition cardinality + // are rejected. Skip the gather + batch registration entirely + // when nothing in this partition group can improve the heap. + let any_qualify = indices.iter().any(|&orig_idx| { + let bytes = self.scratch_rows.row(orig_idx as usize); + match heap.max() { + Some(max_row) => bytes.as_ref() < max_row.row(), + None => true, + } + }); + if !any_qualify { + continue; + } + + let indices_arr = UInt32Array::from(indices); + let sub_batch = take_record_batch(batch, &indices_arr)?; + let mut entry = heap.register_batch(sub_batch); + + for (sub_idx, &orig_idx) in indices_arr.values().iter().enumerate() { + let row = self.scratch_rows.row(orig_idx as usize); + match heap.max() { + Some(max_row) if row.as_ref() >= max_row.row() => {} + None | Some(_) => { + heap.add(&mut entry, row, sub_idx); + replacements += 1; + } + } + } + + heap.insert_batch_entry(entry); + heap.maybe_compact()?; + } + + if replacements > 0 { + self.metrics.row_replacements.add(replacements); + } + self.reservation.try_resize(self.size())?; + Ok(()) + } + + /// Drain all heaps in partition-key order and return the rows as + /// a stream of coalesced `RecordBatch`es ordered by + /// `(partition_keys, order_keys)`. + pub(crate) fn emit(self) -> Result { + let Self { + schema, + metrics, + reservation: _, + expr: _, + row_converter: _, + scratch_rows: _, + partition_exprs: _, + partition_converter: _, + mut heaps, + k: _, + batch_size, + } = self; + let _timer = metrics.baseline.elapsed_compute().timer(); + + let mut sorted_pks: Vec = heaps.keys().cloned().collect(); + sorted_pks.sort(); + + let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), batch_size); + + for pk in sorted_pks { + let mut heap = heaps.remove(&pk).expect("key from heaps.keys()"); + if let Some(batch) = heap.emit()? { + (&batch).record_output(&metrics.baseline); + coalescer.push_batch(batch)?; + } + } + coalescer.finish_buffered_batch()?; + + let mut out: Vec> = Vec::new(); + while let Some(b) = coalescer.next_completed_batch() { + out.push(Ok(b)); + } + + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::iter(out), + ))) + } + + /// Total memory currently held by this operator, including all + /// per-partition heaps. + fn size(&self) -> usize { + size_of::() + + self.row_converter.size() + + self.partition_converter.size() + + self.scratch_rows.size() + + self.heaps.values().map(|h| h.size()).sum::() + + self.heaps.capacity() * (size_of::() + size_of::()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -1812,4 +2027,157 @@ mod tests { Ok(()) } + + /// Builds a `(pk Int32, val Int32)` schema and a `PartitionedTopK` + /// partitioned by `pk` with order `val ASC`. Helper for the + /// `PartitionedTopK` tests below. + fn build_partitioned_topk(k: usize) -> Result<(Arc, PartitionedTopK)> { + let schema = Arc::new(Schema::new(vec![ + Field::new("pk", DataType::Int32, false), + Field::new("val", DataType::Int32, false), + ])); + + let pk_expr: Arc = col("pk", schema.as_ref())?; + let pk_sort_expr = PhysicalSortExpr { + expr: Arc::clone(&pk_expr), + options: SortOptions::default(), + }; + let val_sort_expr = PhysicalSortExpr { + expr: col("val", schema.as_ref())?, + options: SortOptions::default(), + }; + + let partition_sort_fields = build_sort_fields(&[pk_sort_expr], &schema)?; + let order_expr = LexOrdering::from([val_sort_expr]); + + let state = PartitionedTopK::try_new( + 0, + Arc::clone(&schema), + vec![pk_expr], + partition_sort_fields, + order_expr, + k, + 8, // batch_size + &Arc::new(RuntimeEnv::default()), + &ExecutionPlanMetricsSet::new(), + )?; + Ok((schema, state)) + } + + fn pk_val_batch( + schema: &Arc, + pks: Vec, + vals: Vec, + ) -> Result { + Ok(RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(Int32Array::from(pks)), + Arc::new(Int32Array::from(vals)), + ], + )?) + } + + /// Multiple distinct partition keys interleaved within a single + /// input batch — the per-batch demux, per-partition heap eviction, + /// and partition-key-ordered emit must all behave correctly. + #[tokio::test] + async fn test_partitioned_topk_multi_partition_within_batch() -> Result<()> { + let (schema, mut state) = build_partitioned_topk(2)?; + + // pk=1 vals: 10, 5, 8 → top-2 ASC = [5, 8] + // pk=2 vals: 20, 15 → top-2 ASC = [15, 20] + // pk=3 vals: 7 → top-2 ASC = [7] + let batch = + pk_val_batch(&schema, vec![1, 2, 1, 2, 1, 3], vec![10, 20, 5, 15, 8, 7])?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 5 |", + "| 1 | 8 |", + "| 2 | 15 |", + "| 2 | 20 |", + "| 3 | 7 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// State must accumulate across `insert_batch` calls: a partition + /// key seen in batch 1 should still own its heap when batch 2 + /// arrives, and a row in batch 2 that beats the existing K-th + /// best should evict the loser. + #[tokio::test] + async fn test_partitioned_topk_cross_batch_eviction() -> Result<()> { + let (schema, mut state) = build_partitioned_topk(2)?; + + // Batch 1: pk=1 fills the heap with [50, 40]. + state.insert_batch(&pk_val_batch(&schema, vec![1, 1], vec![50, 40])?)?; + + // Batch 2: pk=1 sees a smaller value (10) — it must evict 50. + // pk=2 appears for the first time mid-stream. + state.insert_batch(&pk_val_batch( + &schema, + vec![1, 2, 1], + vec![10, 99, 60], // 60 > 40 stays on top, gets discarded + )?)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 10 |", + "| 1 | 40 |", + "| 2 | 99 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// Empty input must produce an empty output stream, not panic. + #[tokio::test] + async fn test_partitioned_topk_empty_input() -> Result<()> { + let (_schema, state) = build_partitioned_topk(3)?; + let results: Vec<_> = state.emit()?.try_collect().await?; + assert!(results.is_empty(), "empty input → empty output"); + Ok(()) + } + + /// `fetch = 1` is a common case (rn = 1 filter). The heap should + /// hold exactly one row per partition: the partition's minimum. + #[tokio::test] + async fn test_partitioned_topk_fetch_one() -> Result<()> { + let (schema, mut state) = build_partitioned_topk(1)?; + state.insert_batch(&pk_val_batch( + &schema, + vec![1, 1, 2, 2, 3], + vec![3, 1, 9, 4, 7], + )?)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 1 |", + "| 2 | 4 |", + "| 3 | 7 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } }