Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,82 @@ impl LogicalPlan {
}
}

/// Returns the skip (offset) of this plan node, if it has one.
///
/// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
/// return `Ok(None)`. A literal skip of zero is also reported as `Ok(None)`.
///
/// NOTE(review): a non-literal skip expression (`SkipType::UnsupportedExpr`)
/// also maps to `None`, so callers cannot distinguish "no skip" from
/// "skip we could not evaluate" — confirm call sites treat that safely.
pub fn skip(&self) -> Result<Option<usize>> {
    match self {
        LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
            // Zero skip and an unevaluable skip expression are both
            // reported as "no skip".
            SkipType::Literal(0) | SkipType::UnsupportedExpr => Ok(None),
            SkipType::Literal(n) => Ok(Some(n)),
        },
        // Every other variant is spelled out (no `_` arm) so that adding
        // a new `LogicalPlan` variant forces this method to be revisited.
        LogicalPlan::Sort(_)
        | LogicalPlan::TableScan(_)
        | LogicalPlan::Projection(_)
        | LogicalPlan::Filter(_)
        | LogicalPlan::Window(_)
        | LogicalPlan::Aggregate(_)
        | LogicalPlan::Join(_)
        | LogicalPlan::Repartition(_)
        | LogicalPlan::Union(_)
        | LogicalPlan::EmptyRelation(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::SubqueryAlias(_)
        | LogicalPlan::Statement(_)
        | LogicalPlan::Values(_)
        | LogicalPlan::Explain(_)
        | LogicalPlan::Analyze(_)
        | LogicalPlan::Extension(_)
        | LogicalPlan::Distinct(_)
        | LogicalPlan::Dml(_)
        | LogicalPlan::Ddl(_)
        | LogicalPlan::Copy(_)
        | LogicalPlan::DescribeTable(_)
        | LogicalPlan::Unnest(_)
        | LogicalPlan::RecursiveQuery(_) => Ok(None),
    }
}

/// Returns the fetch (limit) of this plan node, if it has one.
///
/// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
/// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
/// return `Ok(None)`.
///
/// NOTE(review): a non-literal fetch expression (`FetchType::UnsupportedExpr`)
/// maps to `None`, same caveat as [`LogicalPlan::skip`] — confirm call sites
/// treat that safely.
pub fn fetch(&self) -> Result<Option<usize>> {
    match self {
        LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
        LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
        LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
            // `FetchType::Literal` already carries an `Option<usize>`.
            FetchType::Literal(fetch) => Ok(fetch),
            FetchType::UnsupportedExpr => Ok(None),
        },
        // Every other variant is spelled out (no `_` arm) so that adding
        // a new `LogicalPlan` variant forces this method to be revisited.
        LogicalPlan::Projection(_)
        | LogicalPlan::Filter(_)
        | LogicalPlan::Window(_)
        | LogicalPlan::Aggregate(_)
        | LogicalPlan::Join(_)
        | LogicalPlan::Repartition(_)
        | LogicalPlan::Union(_)
        | LogicalPlan::EmptyRelation(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::SubqueryAlias(_)
        | LogicalPlan::Statement(_)
        | LogicalPlan::Values(_)
        | LogicalPlan::Explain(_)
        | LogicalPlan::Analyze(_)
        | LogicalPlan::Extension(_)
        | LogicalPlan::Distinct(_)
        | LogicalPlan::Dml(_)
        | LogicalPlan::Ddl(_)
        | LogicalPlan::Copy(_)
        | LogicalPlan::DescribeTable(_)
        | LogicalPlan::Unnest(_)
        | LogicalPlan::RecursiveQuery(_) => Ok(None),
    }
}

/// If this node's expressions contains any references to an outer subquery
pub fn contains_outer_reference(&self) -> bool {
let mut contains = false;
Expand Down
66 changes: 66 additions & 0 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,13 @@ impl OptimizerRule for PushDownFilter {
filter.predicate = new_predicate;
}

// If the child has a fetch (limit) or skip (offset), pushing a filter
// below it would change semantics: the limit/offset should apply before
// the filter, not after.
if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

match Arc::unwrap_or_clone(filter.input) {
LogicalPlan::Filter(child_filter) => {
let parents_predicates = split_conjunction_owned(filter.predicate);
Expand Down Expand Up @@ -4222,4 +4229,63 @@ mod tests {
"
)
}

#[test]
fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
    // Build a scan that already carries a fetch (limit) of 10 rows.
    let LogicalPlan::TableScan(base) = test_table_scan()? else {
        unreachable!()
    };
    let scan_with_fetch = LogicalPlan::TableScan(TableScan {
        fetch: Some(10),
        ..base
    });
    let plan = LogicalPlanBuilder::from(scan_with_fetch)
        .filter(col("a").gt(lit(10i64)))?
        .build()?;
    // Filter must NOT be pushed into the table scan when it has a fetch
    // (limit): the limit applies before the filter, not after.
    assert_optimized_plan_equal!(
        plan,
        @r"
        Filter: test.a > Int64(10)
          TableScan: test, fetch=10
        "
    )
}

#[test]
fn filter_push_down_through_sort_without_fetch() -> Result<()> {
    // A plain sort (no fetch) commutes with filtering, so the predicate
    // should travel below the sort and into the scan.
    let source = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(source)
        .sort(vec![col("a").sort(true, true)])?
        .filter(col("a").gt(lit(10i64)))?
        .build()?;
    assert_optimized_plan_equal!(
        plan,
        @r"
        Sort: test.a ASC NULLS FIRST
          TableScan: test, full_filters=[test.a > Int64(10)]
        "
    )
}

#[test]
fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
    // A sort with a fetch (TopK) must keep the filter above it, because
    // the limit selects rows before the filter runs.
    let source = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(source)
        .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
        .filter(col("a").gt(lit(10i64)))?
        .build()?;
    assert_optimized_plan_equal!(
        plan,
        @r"
        Filter: test.a > Int64(10)
          Sort: test.a ASC NULLS FIRST, fetch=5
            TableScan: test
        "
    )
}
}
22 changes: 13 additions & 9 deletions datafusion/physical-plan/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use std::sync::Arc;
use datafusion_common::Result;
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use itertools::Itertools;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
Expand Down Expand Up @@ -359,6 +358,17 @@ impl ChildFilterDescription {
})
}

/// Mark all parent filters as unsupported for this child.
Copy link
Contributor Author

@hareshkh hareshkh Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This utility is present on branch-53/main but not here — it is referenced by the new logic, so it is added here as well.

/// Mark every parent filter as unsupported for this child, with no
/// self filters.
pub fn all_unsupported(parent_filters: &[Arc<dyn PhysicalExpr>]) -> Self {
    // Pre-size the vec: one rejected predicate per parent filter.
    let mut rejected = Vec::with_capacity(parent_filters.len());
    for filter in parent_filters {
        rejected.push(PushedDownPredicate::unsupported(Arc::clone(filter)));
    }
    Self {
        parent_filters: rejected,
        self_filters: Vec::new(),
    }
}

/// Add a self filter (from the current node) to be pushed down to this child.
pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
self.self_filters.push(filter);
Expand Down Expand Up @@ -434,15 +444,9 @@ impl FilterDescription {
children: &[&Arc<dyn crate::ExecutionPlan>],
) -> Self {
let mut desc = Self::new();
let child_filters = parent_filters
.iter()
.map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
.collect_vec();
for _ in 0..children.len() {
desc = desc.with_child(ChildFilterDescription {
parent_filters: child_filters.clone(),
self_filters: vec![],
});
desc =
desc.with_child(ChildFilterDescription::all_unsupported(parent_filters));
}
desc
}
Expand Down
84 changes: 81 additions & 3 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1404,12 +1404,23 @@ impl ExecutionPlan for SortExec {
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
config: &datafusion_common::config::ConfigOptions,
) -> Result<FilterDescription> {
if !matches!(phase, FilterPushdownPhase::Post) {
if phase != FilterPushdownPhase::Post {
if self.fetch.is_some() {
return Ok(FilterDescription::all_unsupported(
&parent_filters,
&self.children(),
));
}
return FilterDescription::from_children(parent_filters, &self.children());
}

let mut child =
ChildFilterDescription::from_child(&parent_filters, self.input())?;
// In Post phase: block parent filters when fetch is set,
// but still push the TopK dynamic filter (self-filter).
let mut child = if self.fetch.is_some() {
ChildFilterDescription::all_unsupported(&parent_filters)
} else {
ChildFilterDescription::from_child(&parent_filters, self.input())?
};

if let Some(filter) = &self.filter
&& config.optimizer.enable_topk_dynamic_filter_pushdown
Expand All @@ -1430,8 +1441,10 @@ mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::collect;
use crate::empty::EmptyExec;
use crate::execution_plan::Boundedness;
use crate::expressions::col;
use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
use crate::test;
use crate::test::TestMemoryExec;
use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
Expand All @@ -1441,6 +1454,7 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::RecordBatchStream;
Expand Down Expand Up @@ -2705,4 +2719,68 @@ mod tests {

Ok(())
}

/// Build a `SortExec` over an empty single-column (`a: Int32`) input,
/// optionally limited to `fetch` rows (a TopK when `Some`).
fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
    SortExec::new([sort_expr].into(), Arc::new(EmptyExec::new(schema))).with_fetch(fetch)
}

#[test]
fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
    // A sort with a fetch (TopK) must reject parent filters in the Pre
    // phase: filtering below the limit would change which rows survive.
    let topk = make_sort_exec_with_fetch(Some(10));
    let desc = topk.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &ConfigOptions::new(),
    )?;
    let verdict = desc.parent_filters()[0][0].discriminant;
    assert!(matches!(verdict, PushedDown::No));
    Ok(())
}

#[test]
fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
    // A plain sort (no fetch) commutes with filters, so pushdown is allowed.
    let plain_sort = make_sort_exec_with_fetch(None);
    let desc = plain_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &ConfigOptions::new(),
    )?;
    let verdict = desc.parent_filters()[0][0].discriminant;
    assert!(matches!(verdict, PushedDown::Yes));
    Ok(())
}

#[test]
fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
    let topk = make_sort_exec_with_fetch(Some(10));
    assert!(topk.filter.is_some(), "TopK filter should be created");

    let mut config = ConfigOptions::new();
    config.optimizer.enable_topk_dynamic_filter_pushdown = true;
    let desc = topk.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![Arc::new(Column::new("a", 0))],
        &config,
    )?;
    // The fetch still blocks parent filters in the Post phase...
    let verdict = desc.parent_filters()[0][0].discriminant;
    assert!(matches!(verdict, PushedDown::No));
    // ...but the sort's own TopK dynamic filter is pushed to the child.
    assert_eq!(desc.self_filters()[0].len(), 1);
    Ok(())
}
}
39 changes: 39 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,45 @@ limit 1000;
statement ok
DROP TABLE test_limit_with_partitions;

# Tests for filter pushdown behavior with Sort + LIMIT (fetch).

statement ok
CREATE TABLE t(id INT, value INT) AS VALUES
(1, 100),
(2, 200),
(3, 300),
(4, 400),
(5, 500);

# Take the 3 smallest values (100, 200, 300), then filter value > 200.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
3 300

# Take the 3 largest values (500, 400, 300), then filter value < 400.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
----
3 300

# The filter stays above the sort+fetch in the plan.
query TT
EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
logical_plan
01)SubqueryAlias: sub
02)--Filter: t.value > Int32(200)
03)----Sort: t.value ASC NULLS LAST, fetch=3
04)------TableScan: t projection=[id, value]
physical_plan
01)FilterExec: value@1 > 200
02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
DROP TABLE t;

# Tear down src_table table:
statement ok
DROP TABLE src_table;
15 changes: 8 additions & 7 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
----
logical_plan
01)Sort: rn1 ASC NULLS LAST
02)--Sort: rn1 ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
02)--Filter: rn1 < UInt64(50)
03)----Sort: rn1 ASC NULLS LAST, fetch=5
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
physical_plan
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
01)FilterExec: rn1@5 < 50
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
03)----GlobalLimitExec: skip=0, fetch=5
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]

# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
Expand Down
Loading