From bb960687938b8e49386774372f5086cec444b780 Mon Sep 17 00:00:00 2001 From: Shiv Bhatia Date: Tue, 24 Mar 2026 20:21:18 +0000 Subject: [PATCH] Fix push_down_filter for children with non-empty fetch fields (#21057) - Closes https://github.com/apache/datafusion/issues/21063 Currently if we see a filter with a limit underneath, we don't push the filter past the limit. However, sort nodes and table scan nodes can have fetch fields which do essentially the same thing, and we don't stop filters being pushed past them. This is a correctness bug that can lead to undefined behaviour. I added checks for exactly this condition so we don't push the filter down. I think the prior expectation was that there would be a limit node between any of these nodes, but this is also not true. In `push_down_limit.rs`, there's code that does this optimisation when a limit has a sort under it: ``` LogicalPlan::Sort(mut sort) => { let new_fetch = { let sort_fetch = skip + fetch; Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch)) }; if new_fetch == sort.fetch { if skip > 0 { original_limit(skip, fetch, LogicalPlan::Sort(sort)) } else { Ok(Transformed::yes(LogicalPlan::Sort(sort))) } } else { sort.fetch = new_fetch; limit.input = Arc::new(LogicalPlan::Sort(sort)); Ok(Transformed::yes(LogicalPlan::Limit(limit))) } } ``` The first time this runs, it sets the internal fetch of the sort to new_fetch, and on the second optimisation pass it hits the branch where we just get rid of the limit node altogether, leaving the sort node exposed to potential filters which can now push down into it. There is also a related fix in `gather_filters_for_pushdown` in `SortExec`, which does the same thing for physical plan nodes. If we see that a given execution plan has non-empty fetch, it should not allow any parent filters to be pushed down. Added checks in the optimisation rule to avoid pushing filters past children with built-in limits. 
Tests: - Unit tests in `push_down_filter.rs` - Fixed an existing test in `window.slt` - Unit tests for the physical plan change in `sort.rs` - New slt tests in `limit.slt` for this exact behaviour. No user-facing API changes.
{ + SkipType::Literal(0) => Ok(None), + SkipType::Literal(n) => Ok(Some(n)), + SkipType::UnsupportedExpr => Ok(None), + }, + LogicalPlan::Sort(_) => Ok(None), + LogicalPlan::TableScan(_) => Ok(None), + LogicalPlan::Projection(_) => Ok(None), + LogicalPlan::Filter(_) => Ok(None), + LogicalPlan::Window(_) => Ok(None), + LogicalPlan::Aggregate(_) => Ok(None), + LogicalPlan::Join(_) => Ok(None), + LogicalPlan::Repartition(_) => Ok(None), + LogicalPlan::Union(_) => Ok(None), + LogicalPlan::EmptyRelation(_) => Ok(None), + LogicalPlan::Subquery(_) => Ok(None), + LogicalPlan::SubqueryAlias(_) => Ok(None), + LogicalPlan::Statement(_) => Ok(None), + LogicalPlan::Values(_) => Ok(None), + LogicalPlan::Explain(_) => Ok(None), + LogicalPlan::Analyze(_) => Ok(None), + LogicalPlan::Extension(_) => Ok(None), + LogicalPlan::Distinct(_) => Ok(None), + LogicalPlan::Dml(_) => Ok(None), + LogicalPlan::Ddl(_) => Ok(None), + LogicalPlan::Copy(_) => Ok(None), + LogicalPlan::DescribeTable(_) => Ok(None), + LogicalPlan::Unnest(_) => Ok(None), + LogicalPlan::RecursiveQuery(_) => Ok(None), + } + } + + /// Returns the fetch (limit) of this plan node, if it has one. + /// + /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and + /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants + /// return `Ok(None)`. + pub fn fetch(&self) -> Result> { + match self { + LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch), + LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch), + LogicalPlan::Limit(limit) => match limit.get_fetch_type()? 
{ + FetchType::Literal(s) => Ok(s), + FetchType::UnsupportedExpr => Ok(None), + }, + LogicalPlan::Projection(_) => Ok(None), + LogicalPlan::Filter(_) => Ok(None), + LogicalPlan::Window(_) => Ok(None), + LogicalPlan::Aggregate(_) => Ok(None), + LogicalPlan::Join(_) => Ok(None), + LogicalPlan::Repartition(_) => Ok(None), + LogicalPlan::Union(_) => Ok(None), + LogicalPlan::EmptyRelation(_) => Ok(None), + LogicalPlan::Subquery(_) => Ok(None), + LogicalPlan::SubqueryAlias(_) => Ok(None), + LogicalPlan::Statement(_) => Ok(None), + LogicalPlan::Values(_) => Ok(None), + LogicalPlan::Explain(_) => Ok(None), + LogicalPlan::Analyze(_) => Ok(None), + LogicalPlan::Extension(_) => Ok(None), + LogicalPlan::Distinct(_) => Ok(None), + LogicalPlan::Dml(_) => Ok(None), + LogicalPlan::Ddl(_) => Ok(None), + LogicalPlan::Copy(_) => Ok(None), + LogicalPlan::DescribeTable(_) => Ok(None), + LogicalPlan::Unnest(_) => Ok(None), + LogicalPlan::RecursiveQuery(_) => Ok(None), + } + } + /// If this node's expressions contains any references to an outer subquery pub fn contains_outer_reference(&self) -> bool { let mut contains = false; diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 755ffdbafc869..da4d457913ed2 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -791,6 +791,13 @@ impl OptimizerRule for PushDownFilter { filter.predicate = new_predicate; } + // If the child has a fetch (limit) or skip (offset), pushing a filter + // below it would change semantics: the limit/offset should apply before + // the filter, not after. 
+ if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() { + return Ok(Transformed::no(LogicalPlan::Filter(filter))); + } + match Arc::unwrap_or_clone(filter.input) { LogicalPlan::Filter(child_filter) => { let parents_predicates = split_conjunction_owned(filter.predicate); @@ -4222,4 +4229,63 @@ mod tests { " ) } + + #[test] + fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> { + let scan = test_table_scan()?; + let scan_with_fetch = match scan { + LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan { + fetch: Some(10), + ..scan + }), + _ => unreachable!(), + }; + let plan = LogicalPlanBuilder::from(scan_with_fetch) + .filter(col("a").gt(lit(10i64)))? + .build()?; + // Filter must NOT be pushed into the table scan when it has a fetch (limit) + assert_optimized_plan_equal!( + plan, + @r" + Filter: test.a > Int64(10) + TableScan: test, fetch=10 + " + ) + } + + #[test] + fn filter_push_down_through_sort_without_fetch() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, true)])? + .filter(col("a").gt(lit(10i64)))? + .build()?; + // Filter should be pushed below the sort + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS FIRST + TableScan: test, full_filters=[test.a > Int64(10)] + " + ) + } + + #[test] + fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort_with_limit(vec![col("a").sort(true, true)], Some(5))? + .filter(col("a").gt(lit(10i64)))? + .build()?; + // Filter must NOT be pushed below the sort when it has a fetch (limit), + // because the limit should apply before the filter. 
+ assert_optimized_plan_equal!( + plan, + @r" + Filter: test.a > Int64(10) + Sort: test.a ASC NULLS FIRST, fetch=5 + TableScan: test + " + ) + } } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 1274e954eaeb3..ea0069d3b75be 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -40,7 +40,6 @@ use std::sync::Arc; use datafusion_common::Result; use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use itertools::Itertools; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FilterPushdownPhase { @@ -359,6 +358,17 @@ impl ChildFilterDescription { }) } + /// Mark all parent filters as unsupported for this child. + pub fn all_unsupported(parent_filters: &[Arc]) -> Self { + Self { + parent_filters: parent_filters + .iter() + .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) + .collect(), + self_filters: vec![], + } + } + /// Add a self filter (from the current node) to be pushed down to this child. 
pub fn with_self_filter(mut self, filter: Arc) -> Self { self.self_filters.push(filter); @@ -434,15 +444,9 @@ impl FilterDescription { children: &[&Arc], ) -> Self { let mut desc = Self::new(); - let child_filters = parent_filters - .iter() - .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) - .collect_vec(); for _ in 0..children.len() { - desc = desc.with_child(ChildFilterDescription { - parent_filters: child_filters.clone(), - self_filters: vec![], - }); + desc = + desc.with_child(ChildFilterDescription::all_unsupported(parent_filters)); } desc } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 475738cca3f05..30a0e301823f7 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1404,12 +1404,23 @@ impl ExecutionPlan for SortExec { parent_filters: Vec>, config: &datafusion_common::config::ConfigOptions, ) -> Result { - if !matches!(phase, FilterPushdownPhase::Post) { + if phase != FilterPushdownPhase::Post { + if self.fetch.is_some() { + return Ok(FilterDescription::all_unsupported( + &parent_filters, + &self.children(), + )); + } return FilterDescription::from_children(parent_filters, &self.children()); } - let mut child = - ChildFilterDescription::from_child(&parent_filters, self.input())?; + // In Post phase: block parent filters when fetch is set, + // but still push the TopK dynamic filter (self-filter). + let mut child = if self.fetch.is_some() { + ChildFilterDescription::all_unsupported(&parent_filters) + } else { + ChildFilterDescription::from_child(&parent_filters, self.input())? 
+ }; if let Some(filter) = &self.filter && config.optimizer.enable_topk_dynamic_filter_pushdown @@ -1430,8 +1441,10 @@ mod tests { use super::*; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::collect; + use crate::empty::EmptyExec; use crate::execution_plan::Boundedness; use crate::expressions::col; + use crate::filter_pushdown::{FilterPushdownPhase, PushedDown}; use crate::test; use crate::test::TestMemoryExec; use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero}; @@ -1441,6 +1454,7 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; + use datafusion_common::config::ConfigOptions; use datafusion_common::test_util::batches_to_string; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_execution::RecordBatchStream; @@ -2705,4 +2719,68 @@ mod tests { Ok(()) } + + fn make_sort_exec_with_fetch(fetch: Option) -> SortExec { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let input = Arc::new(EmptyExec::new(schema)); + SortExec::new( + [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(), + input, + ) + .with_fetch(fetch) + } + + #[test] + fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> { + let sort = make_sort_exec_with_fetch(Some(10)); + let desc = sort.gather_filters_for_pushdown( + FilterPushdownPhase::Pre, + vec![Arc::new(Column::new("a", 0))], + &ConfigOptions::new(), + )?; + // Sort with fetch (TopK) must not allow filters to be pushed below it. 
+ assert!(matches!( + desc.parent_filters()[0][0].discriminant, + PushedDown::No + )); + Ok(()) + } + + #[test] + fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> { + let sort = make_sort_exec_with_fetch(None); + let desc = sort.gather_filters_for_pushdown( + FilterPushdownPhase::Pre, + vec![Arc::new(Column::new("a", 0))], + &ConfigOptions::new(), + )?; + // Plain sort (no fetch) is filter-commutative. + assert!(matches!( + desc.parent_filters()[0][0].discriminant, + PushedDown::Yes + )); + Ok(()) + } + + #[test] + fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> { + let sort = make_sort_exec_with_fetch(Some(10)); + assert!(sort.filter.is_some(), "TopK filter should be created"); + + let mut config = ConfigOptions::new(); + config.optimizer.enable_topk_dynamic_filter_pushdown = true; + let desc = sort.gather_filters_for_pushdown( + FilterPushdownPhase::Post, + vec![Arc::new(Column::new("a", 0))], + &config, + )?; + // Parent filters are still blocked in the Post phase. + assert!(matches!( + desc.parent_filters()[0][0].discriminant, + PushedDown::No + )); + // But the TopK self-filter should be pushed down. + assert_eq!(desc.self_filters()[0].len(), 1); + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 96471411e0f95..8fb0edde79e25 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -869,6 +869,45 @@ limit 1000; statement ok DROP TABLE test_limit_with_partitions; +# Tests for filter pushdown behavior with Sort + LIMIT (fetch). + +statement ok +CREATE TABLE t(id INT, value INT) AS VALUES +(1, 100), +(2, 200), +(3, 300), +(4, 400), +(5, 500); + +# Take the 3 smallest values (100, 200, 300), then filter value > 200. +query II +SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200; +---- +3 300 + +# Take the 3 largest values (500, 400, 300), then filter value < 400. 
+query II +SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400; +---- +3 300 + +# The filter stays above the sort+fetch in the plan. +query TT +EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200; +---- +logical_plan +01)SubqueryAlias: sub +02)--Filter: t.value > Int32(200) +03)----Sort: t.value ASC NULLS LAST, fetch=3 +04)------TableScan: t projection=[id, value] +physical_plan +01)FilterExec: value@1 > 200 +02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE t; + # Tear down src_table table: statement ok DROP TABLE src_table; \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 8ac8724683a8a..7feeb210f1608 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1 ---- logical_plan 01)Sort: rn1 ASC NULLS LAST -02)--Sort: rn1 ASC NULLS LAST, fetch=5 -03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1 -04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50) +02)--Filter: rn1 < UInt64(50) +03)----Sort: rn1 ASC NULLS LAST, fetch=5 +04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1 
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d] physical_plan -01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1] -02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5 -03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] +01)FilterExec: rn1@5 < 50 +02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1] +03)----GlobalLimitExec: skip=0, fetch=5 +04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] # Top level sort is pushed down through 
BoundedWindowAggExec as its SUM result does already satisfy the required # global order. The existing sort is for the second-term lexicographical ordering requirement, which is being