From b7a992b9d6ce8424904cfe6a614b30e7fae9722a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D1=8C=D0=B1=D0=B5=D1=80=D1=82=20=D0=A1=D0=BA?= =?UTF-8?q?=D0=B0=D0=BB=D1=8C=D1=82?= Date: Wed, 13 Aug 2025 17:13:23 +0300 Subject: [PATCH] push down filter: extend the projection if some pushed filters become unsupported Consider the following scenario: 1. `supports_filters_pushdown` returns `Exact` for some filter, e.g. "a = 1", where column "a" is not required by the query projection. 2. "a" is removed from the table provider projection by the "optimize projection" rule. 3. `supports_filters_pushdown` changes its decision and returns `Inexact` for this filter the next time it is called. For example, the input filters were changed and it prefers to use a new one. 4. "a" is not returned to the table provider projection, which leads to a filter that references a column that is not part of the schema. This patch fixes this issue by introducing the following logic in the filter push-down rule: 1. Collect the columns that are not used in the current table provider projection but are required by the filter expressions. Call this set `additional_projection`. 2. If `additional_projection` is empty, leave everything as it was before. 3. Otherwise, extend the table provider projection and wrap the plan with an additional projection node to preserve the schema used prior to this rule. 
--- datafusion/optimizer/src/push_down_filter.rs | 144 ++++++++++++++++--- datafusion/sql/src/parser.rs | 9 +- 2 files changed, 129 insertions(+), 24 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 3387bceda7642..608e671f81a67 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -28,15 +28,13 @@ use datafusion_common::{ JoinConstraint, Result, }; use datafusion_expr::expr_rewriter::replace_col; -use datafusion_expr::logical_plan::{ - CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union, -}; +use datafusion_expr::logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, Union}; use datafusion_expr::utils::{ conjunction, expr_to_columns, split_conjunction, split_conjunction_owned, }; use datafusion_expr::{ and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, - Projection, TableProviderFilterPushDown, + Projection, TableProviderFilterPushDown, TableScan, }; use crate::optimizer::ApplyOrder; @@ -897,23 +895,103 @@ impl OptimizerRule for PushDownFilter { .map(|(pred, _)| pred); let new_scan_filters: Vec = new_scan_filters.unique().cloned().collect(); + + let source_schema = scan.source.schema(); + let mut additional_projection = HashSet::new(); let new_predicate: Vec = zip - .filter(|(_, res)| res != &TableProviderFilterPushDown::Exact) + .filter(|(expr, res)| { + if *res == TableProviderFilterPushDown::Exact { + return false; + } + expr.apply(|expr| { + if let Expr::Column(column) = expr { + if let Ok(idx) = source_schema.index_of(column.name()) { + if scan + .projection + .as_ref() + .is_some_and(|p| !p.contains(&idx)) + { + additional_projection.insert(idx); + } + } + } + Ok(TreeNodeRecursion::Continue) + }) + .unwrap(); + true + }) .map(|(pred, _)| pred.clone()) .collect(); - let new_scan = LogicalPlan::TableScan(TableScan { - filters: new_scan_filters, - ..scan - }); - - 
Transformed::yes(new_scan).transform_data(|new_scan| { - if let Some(predicate) = conjunction(new_predicate) { - make_filter(predicate, Arc::new(new_scan)).map(Transformed::yes) + // Wraps with a filter if some filters are not supported exactly. + let filtered = move |plan| { + if let Some(new_predicate) = conjunction(new_predicate) { + Filter::try_new(new_predicate, Arc::new(plan)) + .map(LogicalPlan::Filter) } else { - Ok(Transformed::no(new_scan)) + Ok(plan) } - }) + }; + + if additional_projection.is_empty() { + // No additional projection is required. + let new_scan = LogicalPlan::TableScan(TableScan { + filters: new_scan_filters, + ..scan + }); + return filtered(new_scan).map(Transformed::yes); + } + + let scan_table_name = &scan.table_name; + let new_scan = filtered( + LogicalPlanBuilder::scan_with_filters_fetch( + scan_table_name.clone(), + Arc::clone(&scan.source), + scan.projection.clone().map(|mut projection| { + // Extend a projection. + projection.extend(additional_projection); + projection + }), + new_scan_filters, + scan.fetch, + )? + .build()?, + )?; + + // Project fields required by the initial projection. + let new_plan = LogicalPlan::Projection(Projection::try_new_with_schema( + scan.projection + .as_ref() + .map(|projection| { + projection + .into_iter() + .cloned() + .map(|idx| { + Expr::Column(Column::new( + Some(scan_table_name.clone()), + source_schema.field(idx).name(), + )) + }) + .collect() + }) + .unwrap_or_else(|| { + source_schema + .fields() + .iter() + .map(|field| { + Expr::Column(Column::new( + Some(scan_table_name.clone()), + field.name(), + )) + }) + .collect() + }), + Arc::new(new_scan), + // Preserve a projected schema. 
+ scan.projected_schema, + )?); + + Ok(Transformed::yes(new_plan)) } LogicalPlan::Extension(extension_plan) => { let prevent_cols = @@ -1206,8 +1284,8 @@ mod tests { use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ col, in_list, in_subquery, lit, ColumnarValue, Extension, ScalarUDF, - ScalarUDFImpl, Signature, TableSource, TableType, UserDefinedLogicalNodeCore, - Volatility, + ScalarUDFImpl, Signature, TableScan, TableSource, TableType, + UserDefinedLogicalNodeCore, Volatility, }; use crate::optimizer::Optimizer; @@ -2452,6 +2530,34 @@ mod tests { .build() } + #[test] + fn projection_is_updated_when_filter_becomes_unsupported() -> Result<()> { + let test_provider = PushDownProvider { + filter_support: TableProviderFilterPushDown::Unsupported, + }; + + let projeted_schema = test_provider.schema().project(&[0])?; + let table_scan = LogicalPlan::TableScan(TableScan { + table_name: "test".into(), + // Emulate that there were pushed filters but now + // provider cannot support it. + filters: vec![col("b").eq(lit(1i64))], + projected_schema: Arc::new(DFSchema::try_from(projeted_schema)?), + projection: Some(vec![0]), + source: Arc::new(test_provider), + fetch: None, + }); + + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("a").eq(lit(1i64)))? 
+ .build()?; + + let expected = "Projection: test.a\ + \n Filter: a = Int64(1) AND b = Int64(1)\ + \n TableScan: test projection=[a, b]"; + assert_optimized_plan_eq(plan, expected) + } + #[test] fn filter_with_table_provider_exact() -> Result<()> { let plan = table_scan_with_pushdown_provider(TableProviderFilterPushDown::Exact)?; @@ -2514,7 +2620,7 @@ mod tests { projected_schema: Arc::new(DFSchema::try_from( (*test_provider.schema()).clone(), )?), - projection: Some(vec![0]), + projection: Some(vec![0, 1]), source: Arc::new(test_provider), fetch: None, }); @@ -2526,7 +2632,7 @@ mod tests { let expected = "Projection: a, b\ \n Filter: a = Int64(10) AND b > Int64(11)\ - \n TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]"; + \n TableScan: test projection=[a, b], partial_filters=[a = Int64(10), b > Int64(11)]"; assert_optimized_plan_eq(plan, expected) } diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index 97e719111a5bc..ec45b2d9486f1 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -279,15 +279,14 @@ impl<'a> DFParser<'a> { sql: &str, dialect: &'a dyn Dialect, ) -> Result { - let tokens = Tokenizer::new(dialect, sql).into_tokens().collect::>()?; + let tokens = Tokenizer::new(dialect, sql) + .into_tokens() + .collect::>()?; Ok(Self::from_dialect_and_tokens(dialect, tokens)) } /// Create a new parser from specified dialect and tokens. - pub fn from_dialect_and_tokens( - dialect: &'a dyn Dialect, - tokens: Vec, - ) -> Self { + pub fn from_dialect_and_tokens(dialect: &'a dyn Dialect, tokens: Vec) -> Self { let parser = Parser::new(dialect).with_tokens(tokens); DFParser { parser } }