Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions benchmarks/queries/h2o/window.sql
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,34 @@ SELECT id2, largest2_v2 FROM (
ROW_NUMBER() OVER (PARTITION BY id2 ORDER BY v2 DESC) AS order_v2
FROM large WHERE v2 IS NOT NULL
) sub_query WHERE order_v2 <= 2;

-- Window Top-N partition cardinality sweep (id3 % N gives N distinct partitions).
-- These exercise PartitionedTopKExec across cardinalities to validate it stays
-- competitive with the SortExec+Filter baseline as partition count grows.
-- Window Top-N: 100 partitions
SELECT pk, largest2_v2 FROM (
SELECT id3 % 100 AS pk, v2 AS largest2_v2,
ROW_NUMBER() OVER (PARTITION BY id3 % 100 ORDER BY v2 DESC) AS order_v2
FROM large WHERE v2 IS NOT NULL
) sub_query WHERE order_v2 <= 2;

-- Window Top-N: 1,000 partitions
SELECT pk, largest2_v2 FROM (
SELECT id3 % 1000 AS pk, v2 AS largest2_v2,
ROW_NUMBER() OVER (PARTITION BY id3 % 1000 ORDER BY v2 DESC) AS order_v2
FROM large WHERE v2 IS NOT NULL
) sub_query WHERE order_v2 <= 2;

-- Window Top-N: 10,000 partitions
SELECT pk, largest2_v2 FROM (
SELECT id3 % 10000 AS pk, v2 AS largest2_v2,
ROW_NUMBER() OVER (PARTITION BY id3 % 10000 ORDER BY v2 DESC) AS order_v2
FROM large WHERE v2 IS NOT NULL
) sub_query WHERE order_v2 <= 2;

-- Window Top-N: 100,000 partitions
SELECT pk, largest2_v2 FROM (
SELECT id3 % 100000 AS pk, v2 AS largest2_v2,
ROW_NUMBER() OVER (PARTITION BY id3 % 100000 ORDER BY v2 DESC) AS order_v2
FROM large WHERE v2 IS NOT NULL
) sub_query WHERE order_v2 <= 2;
150 changes: 32 additions & 118 deletions datafusion/physical-plan/src/sorts/partitioned_topk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,19 @@
use std::fmt::{self, Formatter};
use std::sync::Arc;

use arrow::array::{RecordBatch, UInt32Array};
use arrow::compute::{BatchCoalescer, take_record_batch};
use arrow::datatypes::SchemaRef;
use arrow::row::{OwnedRow, RowConverter};
use datafusion_common::{HashMap, Result};
use arrow::row::SortField;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use futures::StreamExt;
use futures::TryStreamExt;
use parking_lot::RwLock;

use crate::execution_plan::{Boundedness, EmissionType};
use crate::metrics::ExecutionPlanMetricsSet;
use crate::topk::{TopK, TopKDynamicFilters, build_sort_fields};
use crate::topk::{PartitionedTopK, build_sort_fields};
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
PlanProperties, SendableRecordBatchStream, stream::RecordBatchStreamAdapter,
Expand Down Expand Up @@ -342,8 +339,6 @@ impl ExecutionPlan for PartitionedTopKExec {
let partition_sort_fields =
build_sort_fields(&self.expr[..self.partition_prefix_len], &schema)?;

let partition_converter = RowConverter::new(partition_sort_fields)?;

let partition_exprs: Vec<Arc<dyn PhysicalExpr>> = self.expr
[..self.partition_prefix_len]
.iter()
Expand All @@ -359,10 +354,11 @@ impl ExecutionPlan for PartitionedTopKExec {

let stream = futures::stream::once(async move {
do_partitioned_topk(
partition,
input,
schema,
partition_converter,
partition_exprs,
partition_sort_fields,
order_expr,
fetch,
batch_size,
Expand All @@ -380,136 +376,54 @@ impl ExecutionPlan for PartitionedTopKExec {
}
}

/// Create a no-op [`TopKDynamicFilters`] for a per-partition [`TopK`].
///
/// In normal `SortExec` top-K mode, dynamic filters push predicates down to
/// the data source (e.g., telling Parquet to skip rows worse than the current
/// K-th best). For per-partition heaps the data is already in memory and split
/// by partition key, so there is no data source to push filters to. We pass
/// `lit(true)` (accept everything) so the filter never rejects any row.
fn create_noop_dynamic_filter() -> Arc<RwLock<TopKDynamicFilters>> {
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
))))
}

/// Read all input, split batches by partition key, feed each sub-batch
/// to a per-partition [`TopK`], then emit results in partition-key order.
/// Read all input, feed each batch into a [`PartitionedTopK`] (which
/// maintains one heap per distinct partition key), then emit results
/// ordered by `(partition_keys, order_keys)`.
///
/// # Phases
///
/// 1. **Accumulation** — For each input batch:
/// - Evaluate partition key expressions to get partition column arrays
/// - Convert partition columns to binary [`arrow::row::Row`] format
/// - Group row indices by partition key
/// - Extract sub-batches via [`take_record_batch`] and insert into
/// the partition's [`TopK`] heap
/// 1. **Accumulation** — forward each input `RecordBatch` to
/// [`PartitionedTopK::insert_batch`], which demultiplexes rows by
/// partition key and dispatches them into the per-key heap. The
/// `RowConverter` and `MemoryReservation` are shared across all
/// partitions for this operator instance.
///
/// 2. **Emission** — After all input is consumed:
/// - Sort partition keys so output is ordered by partition key
/// - For each partition in sorted order, call [`TopK::emit`] to get
/// rows sorted by order-by key
/// - Return all batches as a single stream
/// 2. **Emission** — [`PartitionedTopK::emit`] drains all heaps in
/// sorted partition-key order, returning a coalesced batch stream.
///
/// # Cost
///
/// - Time: O(N log K) where N = total rows, K = fetch
/// - Memory: O(K × P × row_size) where P = number of distinct partitions
#[expect(clippy::too_many_arguments)]
async fn do_partitioned_topk(
partition_id: usize,
mut input: SendableRecordBatchStream,
schema: SchemaRef,
partition_converter: RowConverter,
partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
partition_sort_fields: Vec<SortField>,
order_expr: LexOrdering,
fetch: usize,
batch_size: usize,
runtime: Arc<datafusion_execution::runtime_env::RuntimeEnv>,
runtime: Arc<RuntimeEnv>,
metrics_set: ExecutionPlanMetricsSet,
) -> Result<SendableRecordBatchStream> {
let mut partitions: HashMap<OwnedRow, TopK> = HashMap::new();
let mut partition_counter: usize = 0;

// Macro-like helper: create a new TopK for a partition
macro_rules! new_topk {
() => {{
let id = partition_counter;
partition_counter += 1;
TopK::try_new(
id,
Arc::clone(&schema),
vec![],
order_expr.clone(),
fetch,
batch_size,
Arc::clone(&runtime),
&metrics_set,
create_noop_dynamic_filter(),
)
}};
}
let mut state = PartitionedTopK::try_new(
partition_id,
schema,
partition_exprs,
partition_sort_fields,
order_expr,
fetch,
batch_size,
&runtime,
&metrics_set,
)?;

// ---------- Accumulation phase ----------
while let Some(batch) = input.next().await {
let batch = batch?;
let num_rows = batch.num_rows();
if num_rows == 0 {
continue;
}

// Evaluate partition key columns
let pk_arrays: Vec<_> = partition_exprs
.iter()
.map(|e| e.evaluate(&batch).and_then(|v| v.into_array(num_rows)))
.collect::<Result<Vec<_>>>()?;

let pk_rows = partition_converter.convert_columns(&pk_arrays)?;

// Group row indices by partition key
let mut groups: HashMap<OwnedRow, Vec<u32>> = HashMap::new();
for row_idx in 0..num_rows {
let pk = pk_rows.row(row_idx).owned();
groups.entry(pk).or_default().push(row_idx as u32);
}

// For each partition group, create a sub-batch and feed to TopK
for (pk, indices) in groups {
if !partitions.contains_key(&pk) {
partitions.insert(pk.clone(), new_topk!()?);
}
let topk = partitions.get_mut(&pk).unwrap();
let indices_array = UInt32Array::from(indices);
let sub_batch = take_record_batch(&batch, &indices_array)?;
topk.insert_batch(sub_batch)?;
}
state.insert_batch(&batch?)?;
}
// Release the input pipeline now that accumulation is complete.
drop(input);

// ---------- Emit phase ----------
// Sort partition keys so output is ordered by (partition_keys, order_keys).
let mut sorted_pks: Vec<OwnedRow> = partitions.keys().cloned().collect();
sorted_pks.sort();

let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), batch_size);

for pk in sorted_pks {
if let Some(topk) = partitions.remove(&pk) {
// TopK::emit() returns a stream of sorted batches
let mut stream = topk.emit()?;
while let Some(batch) = stream.next().await {
coalescer.push_batch(batch?)?;
}
}
}
coalescer.finish_buffered_batch()?;
let mut output_batches: Vec<RecordBatch> = Vec::new();
while let Some(batch) = coalescer.next_completed_batch() {
output_batches.push(batch);
}

Ok(Box::pin(RecordBatchStreamAdapter::new(
schema,
futures::stream::iter(output_batches.into_iter().map(Ok)),
)))
state.emit()
}
Loading
Loading