From fed07d511af60352cef32beb1c7ca81c5d440c7c Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 20 Mar 2026 19:55:37 +0100 Subject: [PATCH 1/2] Add ExpressionAnalyzer for pluggable expression-level statistics Introduce ExpressionAnalyzer, a chain-of-responsibility framework for expression-level statistics estimation (NDV, selectivity, min/max). Framework: - ExpressionAnalyzer trait with registry parameter for chain delegation - ExpressionAnalyzerRegistry to chain analyzers (first Computed wins) - DefaultExpressionAnalyzer: Selinger-style estimation for columns, literals, binary expressions, NOT, boolean predicates Integration: - ExpressionAnalyzerRegistry stored in SessionState, initialized once - ProjectionExprs stores optional registry (non-breaking, no signature changes to project_statistics) - ProjectionExec sets registry via Projector, injected by planner - FilterExec uses registry for selectivity when interval analysis cannot handle the predicate - Custom nodes get builtin analyzer as fallback when registry is absent --- datafusion/common/src/config.rs | 7 + .../core/src/execution/session_state.rs | 38 ++ datafusion/core/src/physical_planner.rs | 57 ++- .../src/expression_analyzer/default.rs | 269 +++++++++++++++ .../src/expression_analyzer/mod.rs | 274 +++++++++++++++ .../src/expression_analyzer/tests.rs | 326 ++++++++++++++++++ datafusion/physical-expr/src/lib.rs | 1 + datafusion/physical-expr/src/projection.rs | 117 ++++++- datafusion/physical-plan/src/filter.rs | 43 ++- datafusion/physical-plan/src/projection.rs | 12 + 10 files changed, 1131 insertions(+), 13 deletions(-) create mode 100644 datafusion/physical-expr/src/expression_analyzer/default.rs create mode 100644 datafusion/physical-expr/src/expression_analyzer/mod.rs create mode 100644 datafusion/physical-expr/src/expression_analyzer/tests.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d5fbfc50a21..b3014e9dc6ab3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -963,6 +963,13 @@ config_namespace! { /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the physical planner will use the ExpressionAnalyzer + /// framework for expression-level statistics estimation (NDV, selectivity, + /// min/max, null fraction) in projections and filters. When false, projections + /// return unknown statistics for non-column expressions and filters use the + /// default selectivity heuristic. + pub enable_expression_analyzer: bool, default = false + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side. This /// filter can add additional overhead when the file format does not fully support diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 347c8eb3d25a4..d7e9997a9fae5 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -64,6 +64,7 @@ use datafusion_optimizer::{ Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule, }; use datafusion_physical_expr::create_physical_expr; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::optimizer::PhysicalOptimizer; @@ -184,6 +185,8 @@ pub struct SessionState { /// /// It will be invoked on `CREATE FUNCTION` statements. /// thus, changing dialect o PostgreSql is required + /// Registry for expression-level statistics analyzers (NDV, selectivity, etc.) + expression_analyzer_registry: Arc, function_factory: Option>, cache_factory: Option>, /// Cache logical plans of prepared statements for later execution. @@ -202,6 +205,10 @@ impl Debug for SessionState { .field("runtime_env", &self.runtime_env) .field("catalog_list", &self.catalog_list) .field("serializer_registry", &self.serializer_registry) + .field( + "expression_analyzer_registry", + &self.expression_analyzer_registry, + ) .field("file_formats", &self.file_formats) .field("execution_props", &self.execution_props) .field("table_options", &self.table_options) @@ -909,6 +916,11 @@ impl SessionState { &self.serializer_registry } + /// Return the [`ExpressionAnalyzerRegistry`] for expression-level statistics + pub fn expression_analyzer_registry(&self) -> &Arc { + &self.expression_analyzer_registry + } + /// Return version of the cargo package that produced this query pub fn version(&self) -> &str { env!("CARGO_PKG_VERSION") @@ -988,6 +1000,7 @@ pub struct SessionStateBuilder { aggregate_functions: Option>>, window_functions: Option>>, serializer_registry: Option>, + expression_analyzer_registry: Option>, file_formats: Option>>, config: Option, table_options: Option, @@ -1028,6 +1041,7 @@ impl SessionStateBuilder { aggregate_functions: None, window_functions: None, serializer_registry: None, + expression_analyzer_registry: None, file_formats: None, table_options: None, config: None, @@ -1083,6 +1097,7 @@ impl SessionStateBuilder { ), window_functions: Some(existing.window_functions.into_values().collect_vec()), serializer_registry: Some(existing.serializer_registry), + expression_analyzer_registry: Some(existing.expression_analyzer_registry), file_formats: Some(existing.file_formats.into_values().collect_vec()), config: Some(new_config), table_options: Some(existing.table_options), @@ -1326,6 +1341,15 @@ impl SessionStateBuilder { self } + /// Set the [`ExpressionAnalyzerRegistry`] for expression-level statistics + pub fn with_expression_analyzer_registry( + mut self, + expression_analyzer_registry: Arc, + ) -> Self { + self.expression_analyzer_registry = Some(expression_analyzer_registry); + self + } + /// Set the map of [`FileFormatFactory`]s pub fn with_file_formats( mut self, @@ -1456,6 +1480,7 @@ impl SessionStateBuilder { aggregate_functions, window_functions, serializer_registry, + expression_analyzer_registry, file_formats, table_options, config, @@ -1493,6 +1518,8 @@ impl SessionStateBuilder { window_functions: HashMap::new(), serializer_registry: serializer_registry .unwrap_or_else(|| Arc::new(EmptySerializerRegistry)), + expression_analyzer_registry: expression_analyzer_registry + .unwrap_or_else(|| Arc::new(ExpressionAnalyzerRegistry::new())), file_formats: HashMap::new(), table_options: table_options.unwrap_or_else(|| { TableOptions::default_from_session_config(config.options()) @@ -1675,6 +1702,13 @@ impl SessionStateBuilder { &mut self.serializer_registry } + /// Returns the current expression_analyzer_registry value + pub fn expression_analyzer_registry( + &mut self, + ) -> &mut Option> { + &mut self.expression_analyzer_registry + } + /// Returns the current file_formats value pub fn file_formats(&mut self) -> &mut Option>> { &mut self.file_formats @@ -1750,6 +1784,10 @@ impl Debug for SessionStateBuilder { .field("runtime_env", &self.runtime_env) .field("catalog_list", &self.catalog_list) .field("serializer_registry", &self.serializer_registry) + .field( + "expression_analyzer_registry", + &self.expression_analyzer_registry, + ) .field("file_formats", &self.file_formats) .field("execution_props", &self.execution_props) .field("table_options", &self.table_options) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b4fb44f670e8d..2b7596714b12f 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1106,12 +1106,23 @@ impl DefaultPhysicalPlanner { input_schema.as_arrow(), )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { - FilterExecBuilder::new( + let builder = FilterExecBuilder::new( Arc::clone(&runtime_expr[0]), physical_input, ) - .with_batch_size(session_state.config().batch_size()) - .build()? + .with_batch_size(session_state.config().batch_size()); + let builder = if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + builder.with_expression_analyzer_registry(Arc::clone( + session_state.expression_analyzer_registry(), + )) + } else { + builder + }; + builder.build()? } PlanAsyncExpr::Async( async_map, @@ -1121,7 +1132,7 @@ impl DefaultPhysicalPlanner { async_map.async_exprs, physical_input, )?; - FilterExecBuilder::new( + let builder = FilterExecBuilder::new( Arc::clone(&runtime_expr[0]), Arc::new(async_exec), ) @@ -1130,8 +1141,19 @@ impl DefaultPhysicalPlanner { .apply_projection(Some( (0..input.schema().fields().len()).collect::>(), ))? - .with_batch_size(session_state.config().batch_size()) - .build()? + .with_batch_size(session_state.config().batch_size()); + let builder = if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + builder.with_expression_analyzer_registry(Arc::clone( + session_state.expression_analyzer_registry(), + )) + } else { + builder + }; + builder.build()? } _ => { return internal_err!( @@ -2892,7 +2914,17 @@ impl DefaultPhysicalPlanner { .into_iter() .map(|(expr, alias)| ProjectionExpr { expr, alias }) .collect(); - Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?)) + let mut proj_exec = ProjectionExec::try_new(proj_exprs, input_exec)?; + if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + proj_exec = proj_exec.with_expression_analyzer_registry(Arc::clone( + session_state.expression_analyzer_registry(), + )); + } + Ok(Arc::new(proj_exec)) } PlanAsyncExpr::Async( async_map, @@ -2904,8 +2936,17 @@ impl DefaultPhysicalPlanner { .into_iter() .map(|(expr, alias)| ProjectionExpr { expr, alias }) .collect(); - let new_proj_exec = + let mut new_proj_exec = ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?; + if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + new_proj_exec = new_proj_exec.with_expression_analyzer_registry( + Arc::clone(session_state.expression_analyzer_registry()), + ); + } Ok(Arc::new(new_proj_exec)) } _ => internal_err!("Unexpected PlanAsyncExpressions variant"), diff --git a/datafusion/physical-expr/src/expression_analyzer/default.rs b/datafusion/physical-expr/src/expression_analyzer/default.rs new file mode 100644 index 0000000000000..29b651a493560 --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/default.rs @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Default expression analyzer with Selinger-style estimation. + +use std::sync::Arc; + +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion_expr::Operator; + +use crate::PhysicalExpr; +use crate::expressions::{BinaryExpr, Column, Literal, NotExpr}; + +use super::{AnalysisResult, ExpressionAnalyzer, ExpressionAnalyzerRegistry}; + +/// Default expression analyzer with Selinger-style estimation. +/// +/// Handles common expression types: +/// - Column references (uses column statistics) +/// - Binary expressions (AND, OR, comparison operators) +/// - Literals (constant selectivity/NDV) +/// - NOT expressions (1 - child selectivity) +#[derive(Debug, Default, Clone)] +pub struct DefaultExpressionAnalyzer; + +impl DefaultExpressionAnalyzer { + /// Get column index from a Column expression + fn get_column_index(expr: &Arc) -> Option { + expr.as_any().downcast_ref::().map(|c| c.index()) + } + + /// Get column statistics for an expression if it's a column reference + fn get_column_stats<'a>( + expr: &Arc, + input_stats: &'a Statistics, + ) -> Option<&'a ColumnStatistics> { + Self::get_column_index(expr) + .and_then(|idx| input_stats.column_statistics.get(idx)) + } + + /// Recursive selectivity estimation through the registry chain + fn estimate_selectivity_recursive( + &self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> f64 { + registry.get_selectivity(expr, input_stats).unwrap_or(0.5) + } +} + +impl ExpressionAnalyzer for DefaultExpressionAnalyzer { + fn get_selectivity( + &self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Binary expressions: AND, OR, comparisons + if let Some(binary) = expr.as_any().downcast_ref::() { + let left_sel = + self.estimate_selectivity_recursive(binary.left(), input_stats, registry); + let right_sel = self.estimate_selectivity_recursive( + binary.right(), + input_stats, + registry, + ); + + let sel = match binary.op() { + // Logical operators + Operator::And => left_sel * right_sel, + Operator::Or => left_sel + right_sel - (left_sel * right_sel), + + // Equality: selectivity = 1/NDV + Operator::Eq => { + let ndv = Self::get_column_stats(binary.left(), input_stats) + .or_else(|| Self::get_column_stats(binary.right(), input_stats)) + .and_then(|s| s.distinct_count.get_value()) + .filter(|&&ndv| ndv > 0); + if let Some(ndv) = ndv { + return AnalysisResult::Computed(1.0 / (*ndv as f64)); + } + 0.1 // Default equality selectivity + } + + // Inequality: selectivity = 1 - 1/NDV + Operator::NotEq => { + let ndv = Self::get_column_stats(binary.left(), input_stats) + .or_else(|| Self::get_column_stats(binary.right(), input_stats)) + .and_then(|s| s.distinct_count.get_value()) + .filter(|&&ndv| ndv > 0); + if let Some(ndv) = ndv { + return AnalysisResult::Computed(1.0 - (1.0 / (*ndv as f64))); + } + 0.9 + } + + // Range predicates: classic 1/3 estimate + Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => 0.33, + + // LIKE: depends on pattern, use conservative estimate + Operator::LikeMatch | Operator::ILikeMatch => 0.25, + Operator::NotLikeMatch | Operator::NotILikeMatch => 0.75, + + // Other operators: default + _ => 0.5, + }; + + return AnalysisResult::Computed(sel); + } + + // NOT expression: 1 - child selectivity + if let Some(not_expr) = expr.as_any().downcast_ref::() { + let child_sel = self.estimate_selectivity_recursive( + not_expr.arg(), + input_stats, + registry, + ); + return AnalysisResult::Computed(1.0 - child_sel); + } + + // Literal boolean: 0.0 or 1.0 + if let Some(b) = expr + .as_any() + .downcast_ref::() + .and_then(|lit| match lit.value() { + ScalarValue::Boolean(Some(b)) => Some(*b), + _ => None, + }) + { + return AnalysisResult::Computed(if b { 1.0 } else { 0.0 }); + } + + // Column reference as predicate (boolean column) + if expr.as_any().downcast_ref::().is_some() { + return AnalysisResult::Computed(0.5); + } + + AnalysisResult::Delegate + } + + fn get_distinct_count( + &self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Column reference: use column NDV + if let Some(ndv) = Self::get_column_stats(expr, input_stats) + .and_then(|col_stats| col_stats.distinct_count.get_value().copied()) + { + return AnalysisResult::Computed(ndv); + } + + // Literal: NDV = 1 + if expr.as_any().downcast_ref::().is_some() { + return AnalysisResult::Computed(1); + } + + // BinaryExpr: for arithmetic with a literal operand, treat as injective + // (preserves NDV). This is an approximation: col * 0 or col % 1 are + // technically not injective, but the common case (col + 1, col * 2, etc.) is + if let Some(binary) = expr.as_any().downcast_ref::() { + let is_arithmetic = matches!( + binary.op(), + Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + ); + + if is_arithmetic { + // If one side is a literal, the operation is injective on the other side + let left_is_literal = binary.left().as_any().is::(); + let right_is_literal = binary.right().as_any().is::(); + + if left_is_literal + && let Some(ndv) = + registry.get_distinct_count(binary.right(), input_stats) + { + return AnalysisResult::Computed(ndv); + } else if right_is_literal + && let Some(ndv) = + registry.get_distinct_count(binary.left(), input_stats) + { + return AnalysisResult::Computed(ndv); + } + // Both sides are non-literals: could combine, but delegate for now + } + } + + AnalysisResult::Delegate + } + + fn get_min_max( + &self, + expr: &Arc, + input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult<(ScalarValue, ScalarValue)> { + // Column reference: use column min/max + if let Some((min, max)) = + Self::get_column_stats(expr, input_stats).and_then(|col_stats| { + match ( + col_stats.min_value.get_value(), + col_stats.max_value.get_value(), + ) { + (Some(min), Some(max)) => Some((min.clone(), max.clone())), + _ => None, + } + }) + { + return AnalysisResult::Computed((min, max)); + } + + // Literal: min = max = value + if let Some(lit_expr) = expr.as_any().downcast_ref::() { + let val = lit_expr.value().clone(); + return AnalysisResult::Computed((val.clone(), val)); + } + + AnalysisResult::Delegate + } + + fn get_null_fraction( + &self, + expr: &Arc, + input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Column reference: null_count / num_rows + if let Some(fraction) = + Self::get_column_stats(expr, input_stats).and_then(|col_stats| { + let null_count = col_stats.null_count.get_value().copied()?; + let num_rows = input_stats.num_rows.get_value().copied()?; + if num_rows > 0 { + Some(null_count as f64 / num_rows as f64) + } else { + None + } + }) + { + return AnalysisResult::Computed(fraction); + } + + // Literal: null fraction depends on whether it's null + if let Some(lit_expr) = expr.as_any().downcast_ref::() { + let is_null = lit_expr.value().is_null(); + return AnalysisResult::Computed(if is_null { 1.0 } else { 0.0 }); + } + + AnalysisResult::Delegate + } +} diff --git a/datafusion/physical-expr/src/expression_analyzer/mod.rs b/datafusion/physical-expr/src/expression_analyzer/mod.rs new file mode 100644 index 0000000000000..0103342134c4d --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/mod.rs @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable expression-level statistics analysis. +//! +//! This module provides an extensible mechanism for computing expression-level +//! statistics metadata (selectivity, NDV, min/max bounds) following the chain +//! of responsibility pattern. +//! +//! # Overview +//! +//! Different expressions have different statistical properties: +//! +//! - **Injective functions** (UPPER, LOWER, ABS on non-negative): preserve NDV +//! - **Non-injective functions** (FLOOR, YEAR, SUBSTRING): reduce NDV +//! - **Monotonic functions**: allow min/max bound propagation +//! - **Constants**: NDV = 1, selectivity depends on value +//! +//! The default implementation uses classic Selinger-style estimation. Users can +//! register custom [`ExpressionAnalyzer`] implementations to: +//! +//! 1. Provide statistics for custom UDFs +//! 2. Override default estimation with domain-specific knowledge +//! 3. Plug in advanced approaches (e.g., histogram-based estimation) +//! +//! # Example +//! +//! ```ignore +//! use datafusion_physical_plan::expression_analyzer::*; +//! +//! // Create registry with default analyzer +//! let mut registry = ExpressionAnalyzerRegistry::new(); +//! +//! // Register custom analyzer (higher priority) +//! registry.register(Arc::new(MyCustomAnalyzer)); +//! +//! // Query expression statistics +//! let selectivity = registry.get_selectivity(&predicate, &input_stats); +//! ``` + +mod default; + +#[cfg(test)] +mod tests; + +pub use default::DefaultExpressionAnalyzer; + +use std::fmt::Debug; +use std::sync::Arc; + +use datafusion_common::{ScalarValue, Statistics}; + +use crate::PhysicalExpr; + +/// Result of expression analysis - either computed or delegate to next analyzer. +#[derive(Debug, Clone)] +pub enum AnalysisResult { + /// Analysis was performed, here's the result + Computed(T), + /// This analyzer doesn't handle this expression; delegate to next + Delegate, +} + +/// Expression-level metadata analysis. +/// +/// Implementations can handle specific expression types or provide domain +/// knowledge for custom UDFs. The chain of analyzers is traversed until one +/// returns [`AnalysisResult::Computed`]. +/// +/// The `registry` parameter allows analyzers to delegate sub-expression +/// analysis back through the full chain, rather than hard-coding a specific +/// analyzer. For example, a function analyzer can ask the registry for the +/// NDV of its input argument, which will traverse the full chain (including +/// any custom analyzers the user registered). +/// +/// # Implementing a Custom Analyzer +/// +/// ```ignore +/// #[derive(Debug)] +/// struct MyUdfAnalyzer; +/// +/// impl ExpressionAnalyzer for MyUdfAnalyzer { +/// fn get_selectivity( +/// &self, +/// expr: &Arc, +/// input_stats: &Statistics, +/// registry: &ExpressionAnalyzerRegistry, +/// ) -> AnalysisResult { +/// // Recognize my custom is_valid_email() UDF +/// if is_my_email_validator(expr) { +/// return AnalysisResult::Computed(0.8); // ~80% valid +/// } +/// AnalysisResult::Delegate +/// } +/// } +/// ``` +pub trait ExpressionAnalyzer: Debug + Send + Sync { + /// Estimate selectivity when this expression is used as a predicate. + /// + /// Returns a value in [0.0, 1.0] representing the fraction of rows + /// that satisfy the predicate. + fn get_selectivity( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } + + /// Estimate the number of distinct values in the expression's output. + /// + /// Properties: + /// - Injective functions preserve input NDV + /// - Non-injective functions reduce NDV (e.g., FLOOR, YEAR) + /// - Constants have NDV = 1 + fn get_distinct_count( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } + + /// Estimate min/max bounds of the expression's output. + /// + /// Monotonic functions can transform input bounds: + /// - Increasing: (f(min), f(max)) + /// - Decreasing: (f(max), f(min)) + /// - Non-monotonic: may need wider bounds or return Delegate + fn get_min_max( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult<(ScalarValue, ScalarValue)> { + AnalysisResult::Delegate + } + + /// Estimate the fraction of null values in the expression's output. + /// + /// Returns a value in [0.0, 1.0]. + fn get_null_fraction( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } +} + +/// Registry that chains [`ExpressionAnalyzer`] implementations. +/// +/// Analyzers are tried in order; the first to return [`AnalysisResult::Computed`] +/// wins. Register domain-specific analyzers before the default for override. +#[derive(Debug, Clone)] +pub struct ExpressionAnalyzerRegistry { + analyzers: Vec>, +} + +impl Default for ExpressionAnalyzerRegistry { + fn default() -> Self { + Self::new() + } +} + +impl ExpressionAnalyzerRegistry { + /// Create a new registry with the [`DefaultExpressionAnalyzer`]. + pub fn new() -> Self { + Self { + analyzers: vec![Arc::new(DefaultExpressionAnalyzer)], + } + } + + /// Create a registry with only the given analyzers (no builtins). + pub fn with_analyzers(analyzers: Vec>) -> Self { + Self { analyzers } + } + + /// Create a registry with custom analyzers followed by the + /// [`DefaultExpressionAnalyzer`] as fallback. + pub fn with_analyzers_and_default( + analyzers: impl IntoIterator>, + ) -> Self { + let mut all: Vec> = analyzers.into_iter().collect(); + all.push(Arc::new(DefaultExpressionAnalyzer)); + Self { analyzers: all } + } + + /// Register an analyzer at the front of the chain (higher priority). + pub fn register(&mut self, analyzer: Arc) { + self.analyzers.insert(0, analyzer); + } + + /// Get selectivity through the analyzer chain. + pub fn get_selectivity( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(sel) = + analyzer.get_selectivity(expr, input_stats, self) + { + return Some(sel); + } + } + None + } + + /// Get distinct count through the analyzer chain. + pub fn get_distinct_count( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(ndv) = + analyzer.get_distinct_count(expr, input_stats, self) + { + return Some(ndv); + } + } + None + } + + /// Get min/max bounds through the analyzer chain. + pub fn get_min_max( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option<(ScalarValue, ScalarValue)> { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(bounds) = + analyzer.get_min_max(expr, input_stats, self) + { + return Some(bounds); + } + } + None + } + + /// Get null fraction through the analyzer chain. + pub fn get_null_fraction( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(frac) = + analyzer.get_null_fraction(expr, input_stats, self) + { + return Some(frac); + } + } + None + } +} diff --git a/datafusion/physical-expr/src/expression_analyzer/tests.rs b/datafusion/physical-expr/src/expression_analyzer/tests.rs new file mode 100644 index 0000000000000..703953a211bf1 --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/tests.rs @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::*; +use crate::PhysicalExpr; +use crate::expressions::{BinaryExpr, Column, Literal, NotExpr}; +use datafusion_common::stats::Precision; +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion_expr::Operator; +use std::sync::Arc; + +fn make_stats_with_ndv(num_rows: usize, ndv: usize) -> Statistics { + Statistics { + num_rows: Precision::Exact(num_rows), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Exact(ndv), + byte_size: Precision::Absent, + }], + } +} + +// NDV tests + +#[test] +fn test_column_ndv() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + assert_eq!(registry.get_distinct_count(&col, &stats), Some(100)); +} + +#[test] +fn test_literal_ndv() { + let stats = make_stats_with_ndv(1000, 100); + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + assert_eq!(registry.get_distinct_count(&lit, &stats), Some(1)); +} + +#[test] +fn test_arithmetic_ndv() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + // col + 1: injective, preserves NDV + let plus = Arc::new(BinaryExpr::new( + Arc::clone(&col), + Operator::Plus, + Arc::clone(&lit), + )) as Arc; + assert_eq!(registry.get_distinct_count(&plus, &stats), Some(100)); + + // 1 + col: also injective (literal on left) + let plus_rev = + Arc::new(BinaryExpr::new(lit, Operator::Plus, col)) as Arc; + assert_eq!(registry.get_distinct_count(&plus_rev, &stats), Some(100)); +} + +// Selectivity tests + +#[test] +fn test_equality_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.01).abs() < 0.001); // 1/NDV = 1/100 +} + +#[test] +fn test_equality_selectivity_column_on_right() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(lit, Operator::Eq, col)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.01).abs() < 0.001); +} + +#[test] +fn test_and_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))) as Arc; + + let eq = Arc::new(BinaryExpr::new(Arc::clone(&col), Operator::Eq, lit1)) + as Arc; + let gt = Arc::new(BinaryExpr::new(col, Operator::Gt, lit2)) as Arc; + let and_expr = + Arc::new(BinaryExpr::new(eq, Operator::And, gt)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&and_expr, &stats).unwrap(); + assert!((sel - 0.0033).abs() < 0.001); // 0.01 * 0.33 +} + +#[test] +fn test_or_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))) as Arc; + + let eq = Arc::new(BinaryExpr::new(Arc::clone(&col), Operator::Eq, lit1)) + as Arc; + let gt = Arc::new(BinaryExpr::new(col, Operator::Gt, lit2)) as Arc; + let or_expr = + Arc::new(BinaryExpr::new(eq, Operator::Or, gt)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&or_expr, &stats).unwrap(); + assert!((sel - 0.3367).abs() < 0.001); // 0.01 + 0.33 - 0.01*0.33 +} + +#[test] +fn test_not_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + let not_expr = Arc::new(NotExpr::new(eq)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(¬_expr, &stats).unwrap(); + assert!((sel - 0.99).abs() < 0.001); // 1 - 0.01 +} + +// Min/max tests + +#[test] +fn test_column_min_max() { + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Absent, + null_count: Precision::Exact(0), + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + assert_eq!( + registry.get_min_max(&col, &stats), + Some((ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(100)))) + ); +} + +#[test] +fn test_literal_min_max() { + let stats = make_stats_with_ndv(100, 10); + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + assert_eq!( + registry.get_min_max(&lit, &stats), + Some((ScalarValue::Int32(Some(42)), ScalarValue::Int32(Some(42)))) + ); +} + +// Null fraction tests + +#[test] +fn test_column_null_fraction() { + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(250), + min_value: Precision::Absent, + max_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + let frac = registry.get_null_fraction(&col, &stats).unwrap(); + assert!((frac - 0.25).abs() < 0.001); +} + +#[test] +fn test_literal_null_fraction() { + let stats = make_stats_with_ndv(100, 10); + let registry = ExpressionAnalyzerRegistry::new(); + + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + assert_eq!(registry.get_null_fraction(&lit, &stats), Some(0.0)); + + let null_lit = + Arc::new(Literal::new(ScalarValue::Int32(None))) as Arc; + assert_eq!(registry.get_null_fraction(&null_lit, &stats), Some(1.0)); +} + +// Custom analyzer tests + +#[derive(Debug)] +struct FixedSelectivityAnalyzer(f64); + +impl ExpressionAnalyzer for FixedSelectivityAnalyzer { + fn get_selectivity( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Computed(self.0) + } +} + +#[test] +fn test_custom_analyzer_overrides_default() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + + let mut registry = ExpressionAnalyzerRegistry::new(); + registry.register(Arc::new(FixedSelectivityAnalyzer(0.42))); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.42).abs() < 0.001); +} + +#[derive(Debug)] +struct ColumnAOnlyAnalyzer; + +impl ExpressionAnalyzer for ColumnAOnlyAnalyzer { + fn get_selectivity( + &self, + expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + if let Some(binary) = expr.as_any().downcast_ref::() + && let Some(col) = binary.left().as_any().downcast_ref::() + && col.name() == "a" + && matches!(binary.op(), Operator::Eq) + { + return AnalysisResult::Computed(0.99); + } + AnalysisResult::Delegate + } +} + +#[test] +fn test_custom_analyzer_delegates_to_default() { + let stats = make_stats_with_ndv(1000, 100); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + + let eq_a = Arc::new(BinaryExpr::new(col_a, Operator::Eq, Arc::clone(&lit))) + as Arc; + let eq_b = + Arc::new(BinaryExpr::new(col_b, Operator::Eq, lit)) as Arc; + + let mut registry = ExpressionAnalyzerRegistry::new(); + registry.register(Arc::new(ColumnAOnlyAnalyzer)); + + let sel_a = registry.get_selectivity(&eq_a, &stats).unwrap(); + assert!((sel_a - 0.99).abs() < 0.001); + + let sel_b = registry.get_selectivity(&eq_b, &stats).unwrap(); + assert!((sel_b - 0.01).abs() < 0.001); +} + +#[test] +fn test_with_analyzers_and_default() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + + let registry = + ExpressionAnalyzerRegistry::with_analyzers_and_default(vec![Arc::new( + ColumnAOnlyAnalyzer, + ) + as Arc]); + + assert_eq!(registry.get_distinct_count(&col, &stats), Some(100)); +} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index bedd348dab92f..0e5f3945bf571 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -33,6 +33,7 @@ pub mod binary_map { } pub mod async_scalar_function; pub mod equivalence; +pub mod expression_analyzer; pub mod expressions; pub mod intervals; mod partitioning; diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index b9f98c03da195..17739d2c87678 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -21,6 +21,7 @@ use std::ops::Deref; use std::sync::Arc; use crate::PhysicalExpr; +use crate::expression_analyzer::ExpressionAnalyzerRegistry; use crate::expressions::{Column, Literal}; use crate::utils::collect_columns; @@ -124,12 +125,22 @@ impl From for (Arc, String) { /// /// See [`ProjectionExprs::from_indices`] to select a subset of columns by /// indices. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct ProjectionExprs { /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. exprs: Arc<[ProjectionExpr]>, + /// Optional expression analyzer registry for statistics estimation + expression_analyzer_registry: Option>, } +impl PartialEq for ProjectionExprs { + fn eq(&self, other: &Self) -> bool { + self.exprs == other.exprs + } +} + +impl Eq for ProjectionExprs {} + impl std::fmt::Display for ProjectionExprs { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let exprs: Vec = self.exprs.iter().map(|e| e.to_string()).collect(); @@ -141,6 +152,7 @@ impl From> for ProjectionExprs { fn from(value: Vec) -> Self { Self { exprs: value.into(), + expression_analyzer_registry: None, } } } @@ -149,6 +161,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { exprs: value.iter().cloned().collect(), + expression_analyzer_registry: None, } } } @@ -157,6 +170,7 @@ impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { exprs: exprs.into_iter().collect(), + expression_analyzer_registry: None, } } } @@ -172,6 +186,7 @@ impl ProjectionExprs { pub fn new(exprs: impl IntoIterator) -> Self { Self { exprs: exprs.into_iter().collect(), + expression_analyzer_registry: None, } } @@ -179,9 +194,26 @@ impl ProjectionExprs { pub fn from_expressions(exprs: impl Into>) -> Self { Self { exprs: exprs.into(), + expression_analyzer_registry: None, } } + /// Set the expression analyzer registry for statistics estimation. + /// + /// The physical planner injects the registry from [`SessionState`] when + /// creating projections. Projections created later by optimizer rules + /// do not receive the registry and fall back to + /// [`DefaultExpressionAnalyzer`]. Propagating the registry to all + /// operator construction sites requires an operator-level statistics + /// registry, which is orthogonal to this work. + pub fn with_expression_analyzer_registry( + mut self, + registry: Arc, + ) -> Self { + self.expression_analyzer_registry = Some(registry); + self + } + /// Creates a [`ProjectionExpr`] from a list of column indices. /// /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column @@ -710,9 +742,35 @@ impl ProjectionExprs { byte_size, } } + } else if let Some(registry) = &self.expression_analyzer_registry { + // Use ExpressionAnalyzer to estimate statistics for arbitrary expressions + let distinct_count = registry + .get_distinct_count(expr, &stats) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent); + let (min_value, max_value) = registry + .get_min_max(expr, &stats) + .map(|(min, max)| (Precision::Inexact(min), Precision::Inexact(max))) + .unwrap_or((Precision::Absent, Precision::Absent)); + let null_count = registry + .get_null_fraction(expr, &stats) + .and_then(|frac| { + stats + .num_rows + .get_value() + .map(|&rows| (rows as f64 * frac).ceil() as usize) + }) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent); + + ColumnStatistics { + distinct_count, + min_value, + max_value, + null_count, + ..ColumnStatistics::new_unknown() + } } else { - // TODO stats: estimate more statistics from expressions - // (expressions should compute their statistics themselves) ColumnStatistics::new_unknown() }; column_statistics.push(col_stats); @@ -803,6 +861,14 @@ impl Projector { pub fn projection(&self) -> &ProjectionExprs { &self.projection } + + /// Set the expression analyzer registry on the underlying projection + pub fn set_expression_analyzer_registry( + &mut self, + registry: Arc, + ) { + self.projection.expression_analyzer_registry = Some(registry); + } } /// Describes an immutable reference counted projection. @@ -2716,7 +2782,7 @@ pub(crate) mod tests { // Should have 2 column statistics assert_eq!(output_stats.column_statistics.len(), 2); - // First column (expression) should have unknown statistics + // First column (col0 + 1): no registry set, so statistics are unknown assert_eq!( output_stats.column_statistics[0].distinct_count, Precision::Absent @@ -2735,6 +2801,49 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_project_statistics_with_expression_analyzer() -> Result<()> { + let input_stats = get_stats(); + let input_schema = get_schema(); + + // Same projection as test_project_statistics_with_expressions, + // but with the analyzer registry enabled + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("col0", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), + )), + alias: "incremented".to_string(), + }, + ProjectionExpr { + expr: Arc::new(Column::new("col1", 1)), + alias: "text".to_string(), + }, + ]) + .with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new())); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + // With analyzer: col0 + 1 is injective, NDV preserved from col0 (= 5) + assert_eq!( + output_stats.column_statistics[0].distinct_count, + Precision::Inexact(5) + ); + + // Second column (col1) still preserves exact statistics + assert_eq!( + output_stats.column_statistics[1].distinct_count, + Precision::Exact(1) + ); + + Ok(()) + } + #[test] fn test_project_statistics_primitive_width_only() -> Result<()> { let input_stats = get_stats(); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 802261430c47a..e80fe469cf465 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -94,6 +94,10 @@ pub struct FilterExec { batch_size: usize, /// Number of rows to fetch fetch: Option, + /// Optional expression analyzer registry for selectivity estimation + expression_analyzer_registry: Option< + Arc, + >, } /// Builder for [`FilterExec`] to set optional parameters @@ -104,6 +108,9 @@ pub struct FilterExecBuilder { default_selectivity: u8, batch_size: usize, fetch: Option, + expression_analyzer_registry: Option< + Arc, + >, } impl FilterExecBuilder { @@ -116,6 +123,7 @@ impl FilterExecBuilder { default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY, batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, fetch: None, + expression_analyzer_registry: None, } } @@ -174,6 +182,25 @@ impl FilterExecBuilder { self } + /// Set the expression analyzer registry for selectivity estimation. + /// + /// Same limitation as [`ProjectionExprs::with_expression_analyzer_registry`]: + /// the planner injects this from [`SessionState`], but filters created + /// by optimizer rules (e.g., filter pushdown into unions) fall back to + /// the default selectivity. An operator-level statistics registry is + /// needed for full coverage. + /// + /// [`ProjectionExprs::with_expression_analyzer_registry`]: datafusion_physical_expr::projection::ProjectionExprs::with_expression_analyzer_registry + pub fn with_expression_analyzer_registry( + mut self, + registry: Arc< + datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry, + >, + ) -> Self { + self.expression_analyzer_registry = Some(registry); + self + } + /// Build the FilterExec, computing properties once with all configured parameters pub fn build(self) -> Result { // Validate predicate type @@ -213,6 +240,7 @@ impl FilterExecBuilder { projection: self.projection, batch_size: self.batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry, }) } } @@ -226,6 +254,7 @@ impl From<&FilterExec> for FilterExecBuilder { default_selectivity: exec.default_selectivity, batch_size: exec.batch_size, fetch: exec.fetch, + expression_analyzer_registry: exec.expression_analyzer_registry.clone(), // We could cache / copy over PlanProperties // here but that would require invalidating them in FilterExecBuilder::apply_projection, etc. // and currently every call to this method ends up invalidating them anyway. @@ -286,6 +315,7 @@ impl FilterExec { projection: self.projection.clone(), batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), }) } @@ -315,9 +345,16 @@ impl FilterExec { input_stats: Statistics, predicate: &Arc, default_selectivity: u8, + expression_analyzer_registry: Option< + &datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry, + >, ) -> Result { if !check_support(predicate, schema) { - let selectivity = default_selectivity as f64 / 100.0; + // Use ExpressionAnalyzer for better selectivity when available, + // fall back to the configured default_selectivity + let selectivity = expression_analyzer_registry + .and_then(|r| r.get_selectivity(predicate, &input_stats)) + .unwrap_or(default_selectivity as f64 / 100.0); let mut stats = input_stats.to_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); stats.total_byte_size = stats @@ -414,6 +451,7 @@ impl FilterExec { Arc::unwrap_or_clone(input.partition_statistics(None)?), predicate, default_selectivity, + None, )?; let mut eq_properties = input.equivalence_properties().clone(); let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate); @@ -593,6 +631,7 @@ impl ExecutionPlan for FilterExec { input_stats, self.predicate(), self.default_selectivity, + self.expression_analyzer_registry.as_deref(), )?; Ok(Arc::new(stats.project(self.projection.as_ref()))) } @@ -753,6 +792,7 @@ impl ExecutionPlan for FilterExec { projection: self.projection.clone(), batch_size: self.batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), }; Some(Arc::new(new) as _) }; @@ -777,6 +817,7 @@ impl ExecutionPlan for FilterExec { projection: self.projection.clone(), batch_size: self.batch_size, fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), })) } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index cd80277156fcb..0ab63105fab05 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -179,6 +179,18 @@ impl ProjectionExec { &self.input } + /// Set the expression analyzer registry for statistics estimation. + /// The registry is stored on the underlying [`ProjectionExprs`]. + pub fn with_expression_analyzer_registry( + mut self, + registry: Arc< + datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry, + >, + ) -> Self { + self.projector.set_expression_analyzer_registry(registry); + self + } + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( input: &Arc, From f101c51e7c5896f2b46d1875aadc1a4c2711cb90 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Tue, 24 Mar 2026 09:52:18 +0100 Subject: [PATCH 2/2] Fix CI: update config docs, sqllogictest, and doc links - Regenerate configs.md for new enable_expression_analyzer option - Add enable_expression_analyzer to information_schema.slt expected output - Fix unresolved doc links to SessionState and DefaultExpressionAnalyzer (cross-crate references use backticks instead of doc links) - Simplify config description --- datafusion/common/src/config.rs | 5 ++--- datafusion/physical-expr/src/projection.rs | 4 ++-- datafusion/physical-plan/src/filter.rs | 2 +- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 1 + 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b3014e9dc6ab3..4e82e53ff9421 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -965,9 +965,8 @@ config_namespace! { /// When set to true, the physical planner will use the ExpressionAnalyzer /// framework for expression-level statistics estimation (NDV, selectivity, - /// min/max, null fraction) in projections and filters. When false, projections - /// return unknown statistics for non-column expressions and filters use the - /// default selectivity heuristic. + /// min/max, null fraction). When false, existing behavior without + /// expression-level statistics support is used. pub enable_expression_analyzer: bool, default = false /// When set to true, the optimizer will insert filters before a join between diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 17739d2c87678..e856e85af4fb5 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -200,10 +200,10 @@ impl ProjectionExprs { /// Set the expression analyzer registry for statistics estimation. /// - /// The physical planner injects the registry from [`SessionState`] when + /// The physical planner injects the registry from `SessionState` when /// creating projections. Projections created later by optimizer rules /// do not receive the registry and fall back to - /// [`DefaultExpressionAnalyzer`]. Propagating the registry to all + /// `DefaultExpressionAnalyzer`. Propagating the registry to all /// operator construction sites requires an operator-level statistics /// registry, which is orthogonal to this work. pub fn with_expression_analyzer_registry( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index e80fe469cf465..56a01708d0b7c 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -185,7 +185,7 @@ impl FilterExecBuilder { /// Set the expression analyzer registry for selectivity estimation. /// /// Same limitation as [`ProjectionExprs::with_expression_analyzer_registry`]: - /// the planner injects this from [`SessionState`], but filters created + /// the planner injects this from `SessionState`, but filters created /// by optimizer rules (e.g., filter pushdown into unions) fall back to /// the default selectivity. An operator-level statistics registry is /// needed for full coverage. diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 2da823421dd76..75f63f55d8d48 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -297,6 +297,7 @@ datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true +datafusion.optimizer.enable_expression_analyzer false datafusion.optimizer.enable_join_dynamic_filter_pushdown true datafusion.optimizer.enable_leaf_expression_pushdown true datafusion.optimizer.enable_piecewise_merge_join false @@ -438,6 +439,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. +datafusion.optimizer.enable_expression_analyzer false When set to true, the physical planner will use the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction). When false, existing behavior without expression-level statistics support is used. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 56ab4d1539f92..3037fd8779fd3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -143,6 +143,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | | datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. | | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | +| datafusion.optimizer.enable_expression_analyzer | false | When set to true, the physical planner will use the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction). When false, existing behavior without expression-level statistics support is used. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |