From 5285cd320ba6de10ef6a49b20addb8e3a6e3083a Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Sat, 6 Dec 2025 21:31:29 +0200
Subject: [PATCH 01/13] feat: Add constant column extraction and rewriting for
 projections in ParquetOpener

---
 datafusion/datasource-parquet/src/opener.rs | 210 +++++++++++++++++++-
 1 file changed, 205 insertions(+), 5 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 30573ff6e11d..7327a172a63a 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -24,6 +24,7 @@ use crate::{
     ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::datatypes::DataType;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
@@ -35,9 +36,13 @@ use std::task::{Context, Poll};
 
 use arrow::datatypes::{SchemaRef, TimeUnit};
 use datafusion_common::encryption::FileDecryptionProperties;
-
-use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::stats::Precision;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{
+    exec_err, ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
+};
 use datafusion_datasource::{PartitionedFile, TableSchema};
+use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
@@ -61,6 +66,7 @@ use parquet::arrow::arrow_reader::{
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
+use std::collections::HashMap;
 
 /// Implements [`FileOpener`] for a parquet file
 pub(super) struct ParquetOpener {
@@ -214,6 +220,11 @@ impl FileOpener for ParquetOpener {
                 .get_file_decryption_properties(&file_location)
                 .await?;
 
+            let constant_columns = constant_columns_from_stats(
+                partitioned_file.statistics.as_deref(),
+                &logical_file_schema,
+            );
+
             // Prune this file using the file level statistics and partition values.
             // Since dynamic filters may have been updated since planning it is possible that we are able
             // to prune files now that we couldn't prune at planning time.
             // We'll also check this after every record batch we read,
             // and if at some point we are able to prove we can prune the file using just the file level statistics
             // we can end the stream early.
+            if !constant_columns.is_empty() {
+                predicate = predicate
+                    .map(|expr| {
+                        rewrite_physical_expr_with_constants(expr, &constant_columns)
+                    })
+                    .transpose()?;
+                projection =
+                    rewrite_projection_with_constants(projection, &constant_columns)?;
+            }
             let mut file_pruner = predicate
                 .as_ref()
                 .filter(|p| {
@@ -580,6 +600,100 @@ fn copy_arrow_reader_metrics(
     }
 }
 
+type ConstantColumns = HashMap<usize, ScalarValue>;
+
+/// Extract constant column values from statistics, keyed by column index in the logical file schema.
+fn constant_columns_from_stats(
+    statistics: Option<&Statistics>,
+    file_schema: &SchemaRef,
+) -> ConstantColumns {
+    let mut constants = HashMap::new();
+    let Some(statistics) = statistics else {
+        return constants;
+    };
+
+    let num_rows = match statistics.num_rows {
+        Precision::Exact(num_rows) => Some(num_rows),
+        _ => None,
+    };
+
+    for (idx, column_stats) in statistics
+        .column_statistics
+        .iter()
+        .take(file_schema.fields().len())
+        .enumerate()
+    {
+        if let Some(value) = constant_value_from_stats(
+            column_stats,
+            num_rows,
+            file_schema.field(idx).data_type(),
+        ) {
+            constants.insert(idx, value);
+        }
+    }
+
+    constants
+}
+
+fn constant_value_from_stats(
+    column_stats: &ColumnStatistics,
+    num_rows: Option<usize>,
+    data_type: &DataType,
+) -> Option<ScalarValue> {
+    if let (Precision::Exact(min), Precision::Exact(max)) =
+        (&column_stats.min_value, &column_stats.max_value)
+    {
+        if min == max
+            && !min.is_null()
+            && matches!(column_stats.null_count, Precision::Exact(0))
+        {
+            return Some(min.clone());
+        }
+    }
+
+    if let (Some(num_rows), Precision::Exact(nulls)) =
+        (num_rows, &column_stats.null_count)
+    {
+        if *nulls == num_rows {
+            return ScalarValue::try_new_null(data_type).ok();
+        }
+    }
+
+    None
+}
+
+fn rewrite_projection_with_constants(
+    projection: ProjectionExprs,
+    constants: &ConstantColumns,
+) -> Result<ProjectionExprs> {
+    if constants.is_empty() {
+        return Ok(projection);
+    }
+
+    projection.try_map_exprs(|expr| rewrite_physical_expr_with_constants(expr, constants))
+}
+
+fn rewrite_physical_expr_with_constants(
+    expr: Arc<dyn PhysicalExpr>,
+    constants: &ConstantColumns,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    if constants.is_empty() {
+        return Ok(expr);
+    }
+
+    expr.transform(|current| {
+        if let Some(column) = current.as_any().downcast_ref::<Column>() {
+            if let Some(value) = constants.get(&column.index()) {
+                let literal = Arc::new(Literal::new(value.clone()));
+                return Ok(Transformed::yes(literal));
+            }
+        }
+        Ok(Transformed::no(current))
+    })
+    .data()
+    .map_err(DataFusionError::from)
+}
+
 /// Wraps an inner RecordBatchStream and a [`FilePruner`]
 ///
 /// This can terminate the scan early when some dynamic filters is updated after
@@ -840,7 +954,11 @@ fn should_enable_page_index(
 mod test {
     use std::sync::Arc;
 
-    use arrow::datatypes::{DataType, Field, Schema};
+    use super::{
+        constant_columns_from_stats, rewrite_physical_expr_with_constants,
+        rewrite_projection_with_constants, ConstantColumns,
+    };
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::{
         record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue,
     };
     use datafusion_datasource::{file_stream::FileOpener, PartitionedFile, TableSchema};
     use datafusion_expr::{col, lit};
     use datafusion_physical_expr::{
-        expressions::DynamicFilterPhysicalExpr, planner::logical2physical,
-        projection::ProjectionExprs, PhysicalExpr,
+        expressions::{Column, DynamicFilterPhysicalExpr, Literal},
+        planner::logical2physical,
+        projection::ProjectionExprs,
+        PhysicalExpr,
     };
     use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
     use futures::{Stream, StreamExt};
     use object_store::{memory::InMemory, path::Path, ObjectStore};
@@ -860,6 +980,86 @@ mod test {
     use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory};
 
+    fn constant_int_stats() -> (Statistics, SchemaRef) {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+        let statistics = Statistics {
+            num_rows: Precision::Exact(3),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::from(5i32)),
+                    min_value: Precision::Exact(ScalarValue::from(5i32)),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics::new_unknown(),
+            ],
+        };
+        (statistics, schema)
+    }
+
+    #[test]
+    fn extract_constant_columns_non_null() {
+        let (statistics, schema) = constant_int_stats();
+        let constants = constant_columns_from_stats(Some(&statistics), &schema);
+        assert_eq!(constants.len(), 1);
+        assert_eq!(constants.get(&0), Some(&ScalarValue::from(5i32)));
+        assert!(constants.get(&1).is_none());
+    }
+
+    #[test]
+    fn extract_constant_columns_all_null() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
+        let statistics = Statistics {
+            num_rows: Precision::Exact(2),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Exact(2),
+                max_value: Precision::Absent,
+                min_value: Precision::Absent,
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+            }],
+        };
+
+        let constants = constant_columns_from_stats(Some(&statistics), &schema);
+        assert_eq!(
+            constants.get(&0),
+            Some(&ScalarValue::Utf8(None)),
+            "all-null column should be treated as constant null"
+        );
+    }
+
+    #[test]
+    fn rewrite_projection_to_literals() {
+        let (statistics, schema) = constant_int_stats();
+        let constants = constant_columns_from_stats(Some(&statistics), &schema);
+        let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
+
+        let rewritten =
+            rewrite_projection_with_constants(projection, &constants).unwrap();
+        let exprs = rewritten.as_ref();
+        assert!(exprs[0].expr.as_any().downcast_ref::<Literal>().is_some());
+        assert!(exprs[1].expr.as_any().downcast_ref::<Column>().is_some());
+
+        // Only column `b` should remain in the projection mask
+        assert_eq!(rewritten.column_indices(), vec![1]);
+    }
+
+    #[test]
+    fn rewrite_physical_expr_literal() {
+        let mut constants = ConstantColumns::new();
+        constants.insert(0, ScalarValue::from(7i32));
+        let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+
+        let rewritten = rewrite_physical_expr_with_constants(expr, &constants).unwrap();
+        assert!(rewritten.as_any().downcast_ref::<Literal>().is_some());
+    }
+
     async fn count_batches_and_rows(
         mut stream: std::pin::Pin<
             Box<
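A self-contained sketch of the rewrite PATCH 01 introduces, for review context (the helper name `inline_constants` is illustrative — the patch's own helpers are private to the opener module). When file statistics prove a column constant (exact min == max with zero nulls, or all rows null), every reference to that column can be replaced with a literal, so a predicate like `a = 5` over a file whose statistics prove `a == 5` becomes `5 = 5`, which the existing `PhysicalExprSimplifier` can then fold away:

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_physical_expr::expressions::{Column, Literal};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

    /// Replace any `Column` whose index has a proven constant value with a
    /// `Literal` holding that value; all other nodes are left untouched.
    fn inline_constants(
        expr: Arc<dyn PhysicalExpr>,
        constants: &HashMap<usize, ScalarValue>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        expr.transform(|e| {
            if let Some(column) = e.as_any().downcast_ref::<Column>() {
                if let Some(value) = constants.get(&column.index()) {
                    return Ok(Transformed::yes(Arc::new(Literal::new(value.clone()))));
                }
            }
            Ok(Transformed::no(e))
        })
        .data()
    }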
From 6fc12d852b9986cba0b80d9e28c2156015ab634c Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Sat, 6 Dec 2025 23:04:09 +0200
Subject: [PATCH 02/13] fix

---
 datafusion/datasource-parquet/src/opener.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 7327a172a63a..f295f0c2a84b 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -236,7 +236,11 @@ impl FileOpener for ParquetOpener {
             if !constant_columns.is_empty() {
                 predicate = predicate
                     .map(|expr| {
-                        rewrite_physical_expr_with_constants(expr, &constant_columns)
+                        if is_dynamic_physical_expr(&expr) {
+                            Ok(expr)
+                        } else {
+                            rewrite_physical_expr_with_constants(expr, &constant_columns)
+                        }
                     })
                     .transpose()?;
                 projection =

From feb9a6068d50ca4ce85cb3c3d74284a39e817017 Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Sat, 6 Dec 2025 23:19:41 +0200
Subject: [PATCH 03/13] fix clippy

---
 datafusion/datasource-parquet/src/opener.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index f295f0c2a84b..14b1bcab60de 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -1012,7 +1012,7 @@ mod test {
         let constants = constant_columns_from_stats(Some(&statistics), &schema);
         assert_eq!(constants.len(), 1);
         assert_eq!(constants.get(&0), Some(&ScalarValue::from(5i32)));
-        assert!(constants.get(&1).is_none());
+        assert!(!constants.contains_key(&1));
     }

From 3dfe4991eb74571ef3bac5527ecb5dda671b2162 Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Sun, 7 Dec 2025 00:01:28 +0200
Subject: [PATCH 04/13] fix clippy

---
 datafusion/datasource-parquet/src/opener.rs | 89 ++++++-------------
 .../src/schema_rewriter.rs                  | 16 +++-
 2 files changed, 37 insertions(+), 68 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 14b1bcab60de..ba3a4b2a40f8 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -37,12 +37,10 @@ use std::task::{Context, Poll};
 use arrow::datatypes::{SchemaRef, TimeUnit};
 use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::stats::Precision;
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{
     exec_err, ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
 };
 use datafusion_datasource::{PartitionedFile, TableSchema};
-use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
@@ -66,7 +64,6 @@ use parquet::arrow::arrow_reader::{
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
-use std::collections::HashMap;
 
 /// Implements [`FileOpener`] for a parquet file
 pub(super) struct ParquetOpener {
@@ -235,16 +232,11 @@ impl FileOpener for ParquetOpener {
             // we can end the stream early.
             if !constant_columns.is_empty() {
                 predicate = predicate
-                    .map(|expr| {
-                        if is_dynamic_physical_expr(&expr) {
-                            Ok(expr)
-                        } else {
-                            rewrite_physical_expr_with_constants(expr, &constant_columns)
-                        }
-                    })
+                    .map(|expr| replace_columns_with_literals(expr, &constant_columns))
                     .transpose()?;
-                projection =
-                    rewrite_projection_with_constants(projection, &constant_columns)?;
+                projection = projection.try_map_exprs(|expr| {
+                    replace_columns_with_literals(expr, &constant_columns)
+                })?;
             }
             let mut file_pruner = predicate
                 .as_ref()
@@ -604,9 +596,9 @@ fn copy_arrow_reader_metrics(
     }
 }
 
-type ConstantColumns = HashMap<usize, ScalarValue>;
+type ConstantColumns = HashMap<String, ScalarValue>;
 
-/// Extract constant column values from statistics, keyed by column index in the logical file schema.
+/// Extract constant column values from statistics, keyed by column name in the logical file schema.
 fn constant_columns_from_stats(
     statistics: Option<&Statistics>,
     file_schema: &SchemaRef,
 ) -> ConstantColumns {
     let mut constants = HashMap::new();
     let Some(statistics) = statistics else {
         return constants;
     };
@@ -627,12 +619,11 @@ fn constant_columns_from_stats(
         .take(file_schema.fields().len())
         .enumerate()
     {
-        if let Some(value) = constant_value_from_stats(
-            column_stats,
-            num_rows,
-            file_schema.field(idx).data_type(),
-        ) {
-            constants.insert(idx, value);
+        let field = file_schema.field(idx);
+        if let Some(value) =
+            constant_value_from_stats(column_stats, num_rows, field.data_type())
+        {
+            constants.insert(field.name().clone(), value);
         }
     }
 
@@ -666,38 +657,6 @@ fn constant_value_from_stats(
     None
 }
 
-fn rewrite_projection_with_constants(
-    projection: ProjectionExprs,
-    constants: &ConstantColumns,
-) -> Result<ProjectionExprs> {
-    if constants.is_empty() {
-        return Ok(projection);
-    }
-
-    projection.try_map_exprs(|expr| rewrite_physical_expr_with_constants(expr, constants))
-}
-
-fn rewrite_physical_expr_with_constants(
-    expr: Arc<dyn PhysicalExpr>,
-    constants: &ConstantColumns,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    if constants.is_empty() {
-        return Ok(expr);
-    }
-
-    expr.transform(|current| {
-        if let Some(column) = current.as_any().downcast_ref::<Column>() {
-            if let Some(value) = constants.get(&column.index()) {
-                let literal = Arc::new(Literal::new(value.clone()));
-                return Ok(Transformed::yes(literal));
-            }
-        }
-        Ok(Transformed::no(current))
-    })
-    .data()
-    .map_err(DataFusionError::from)
-}
-
 /// Wraps an inner RecordBatchStream and a [`FilePruner`]
 ///
 /// This can terminate the scan early when some dynamic filters is updated after
@@ -958,10 +917,7 @@ fn should_enable_page_index(
 mod test {
     use std::sync::Arc;
 
-    use super::{
-        constant_columns_from_stats, rewrite_physical_expr_with_constants,
-        rewrite_projection_with_constants, ConstantColumns,
-    };
+    use super::{constant_columns_from_stats, ConstantColumns};
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::{
         record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue,
     };
     use datafusion_datasource::{file_stream::FileOpener, PartitionedFile, TableSchema};
     use datafusion_expr::{col, lit};
     use datafusion_physical_expr::{
         expressions::{Column, DynamicFilterPhysicalExpr, Literal},
         planner::logical2physical,
         projection::ProjectionExprs,
         PhysicalExpr,
     };
-    use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
+    use datafusion_physical_expr_adapter::{
+        replace_columns_with_literals, DefaultPhysicalExprAdapterFactory,
+    };
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
     use futures::{Stream, StreamExt};
     use object_store::{memory::InMemory, path::Path, ObjectStore};
                     min_value: Precision::Exact(ScalarValue::from(5i32)),
                     sum_value: Precision::Absent,
                     distinct_count: Precision::Absent,
+                    byte_size: Precision::Absent,
                 },
                 ColumnStatistics::new_unknown(),
             ],
         };
         (statistics, schema)
     }
 
     #[test]
     fn extract_constant_columns_non_null() {
         let (statistics, schema) = constant_int_stats();
         let constants = constant_columns_from_stats(Some(&statistics), &schema);
         assert_eq!(constants.len(), 1);
-        assert_eq!(constants.get(&0), Some(&ScalarValue::from(5i32)));
-        assert!(!constants.contains_key(&1));
+        assert_eq!(constants.get("a"), Some(&ScalarValue::from(5i32)));
+        assert!(!constants.contains_key("b"));
     }
 
             column_statistics: vec![ColumnStatistics {
                 null_count: Precision::Exact(2),
                 max_value: Precision::Absent,
                 min_value: Precision::Absent,
                 sum_value: Precision::Absent,
                 distinct_count: Precision::Absent,
+                byte_size: Precision::Absent,
             }],
         };
 
         let constants = constant_columns_from_stats(Some(&statistics), &schema);
         assert_eq!(
-            constants.get(&0),
+            constants.get("a"),
             Some(&ScalarValue::Utf8(None)),
             "all-null column should be treated as constant null"
         );
     }
 
     #[test]
     fn rewrite_projection_to_literals() {
         let (statistics, schema) = constant_int_stats();
         let constants = constant_columns_from_stats(Some(&statistics), &schema);
         let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
 
-        let rewritten =
-            rewrite_projection_with_constants(projection, &constants).unwrap();
+        let rewritten = projection
+            .try_map_exprs(|expr| replace_columns_with_literals(expr, &constants))
+            .unwrap();
         let exprs = rewritten.as_ref();
         assert!(exprs[0].expr.as_any().downcast_ref::<Literal>().is_some());
         assert!(exprs[1].expr.as_any().downcast_ref::<Column>().is_some());
 
         // Only column `b` should remain in the projection mask
         assert_eq!(rewritten.column_indices(), vec![1]);
     }
 
     #[test]
     fn rewrite_physical_expr_literal() {
         let mut constants = ConstantColumns::new();
-        constants.insert(0, ScalarValue::from(7i32));
+        constants.insert("a".to_string(), ScalarValue::from(7i32));
         let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
 
-        let rewritten = rewrite_physical_expr_with_constants(expr, &constants).unwrap();
+        let rewritten = replace_columns_with_literals(expr, &constants).unwrap();
         assert!(rewritten.as_any().downcast_ref::<Literal>().is_some());
     }

diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index 6f040c1a051b..b5ebfe0be31b 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -19,7 +19,9 @@
 //! [`PhysicalExprAdapterFactory`], default implementations,
 //! and [`replace_columns_with_literals`].
 
+use std::borrow::Borrow;
 use std::collections::HashMap;
+use std::hash::Hash;
 use std::sync::Arc;
 
 use arrow::compute::can_cast_types;
@@ -50,19 +52,25 @@
 /// # Arguments
 /// - `expr`: The physical expression in which to replace column references.
 /// - `replacements`: A mapping from column names to their corresponding literal `ScalarValue`s.
+///   Accepts various HashMap types including `HashMap<&str, &ScalarValue>`,
+///   `HashMap<String, ScalarValue>`, `HashMap<&str, ScalarValue>`, etc.
 ///
 /// # Returns
 /// - `Result<Arc<dyn PhysicalExpr>>`: The rewritten physical expression with columns replaced by literals.
-pub fn replace_columns_with_literals(
+pub fn replace_columns_with_literals<K, V>(
     expr: Arc<dyn PhysicalExpr>,
-    replacements: &HashMap<&str, &ScalarValue>,
-) -> Result<Arc<dyn PhysicalExpr>> {
+    replacements: &HashMap<K, V>,
+) -> Result<Arc<dyn PhysicalExpr>>
+where
+    K: Borrow<str> + Eq + Hash,
+    V: Borrow<ScalarValue>,
+{
     expr.transform(|expr| {
         if let Some(column) = expr.as_any().downcast_ref::<Column>()
             && let Some(replacement_value) = replacements.get(column.name())
         {
             return Ok(Transformed::yes(expressions::lit(
-                (*replacement_value).clone(),
+                replacement_value.borrow().clone(),
             )));
         }
         Ok(Transformed::no(expr))
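A usage sketch of the loosened signature in PATCH 04 (the function and variable names below are illustrative, not part of the patch). The `Borrow`-based bounds let callers pass the owned `HashMap<String, ScalarValue>` the opener now builds as well as the borrowed `HashMap<&str, &ScalarValue>` form earlier callers used:

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion_common::{Result, ScalarValue};
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr_adapter::replace_columns_with_literals;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

    fn demo() -> Result<()> {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("region", 0));

        // Owned keys and values, as the ParquetOpener now produces.
        let mut owned: HashMap<String, ScalarValue> = HashMap::new();
        owned.insert("region".to_string(), ScalarValue::from("us-west-2"));
        let _rewritten = replace_columns_with_literals(Arc::clone(&expr), &owned)?;

        // Borrowed keys and values, matching the pre-patch signature.
        let value = ScalarValue::from("us-west-2");
        let mut borrowed: HashMap<&str, &ScalarValue> = HashMap::new();
        borrowed.insert("region", &value);
        let _rewritten = replace_columns_with_literals(expr, &borrowed)?;
        Ok(())
    }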
From bfb108f82d52c1203e1d207941717ff19052189b Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 12 Dec 2025 14:51:20 -0600
Subject: [PATCH 05/13] re-use new function

---
 datafusion/datasource-parquet/src/opener.rs | 26 +++++++++++-----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index ba3a4b2a40f8..470dfda0ed99 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -211,17 +211,25 @@ impl FileOpener for ParquetOpener {
         let encryption_context = self.get_encryption_context();
         let max_predicate_cache_size = self.max_predicate_cache_size;
 
+        let constant_columns = constant_columns_from_stats(
+            partitioned_file.statistics.as_deref(),
+            &logical_file_schema,
+        );
+        if !constant_columns.is_empty() {
+            predicate = predicate
+                .map(|expr| replace_columns_with_literals(expr, &constant_columns))
+                .transpose()?;
+            projection = projection.try_map_exprs(|expr| {
+                replace_columns_with_literals(expr, &constant_columns)
+            })?;
+        }
+
         Ok(Box::pin(async move {
             #[cfg(feature = "parquet_encryption")]
             let file_decryption_properties = encryption_context
                 .get_file_decryption_properties(&file_location)
                 .await?;
 
-            let constant_columns = constant_columns_from_stats(
-                partitioned_file.statistics.as_deref(),
-                &logical_file_schema,
-            );
-
             // Prune this file using the file level statistics and partition values.
             // Since dynamic filters may have been updated since planning it is possible that we are able
             // to prune files now that we couldn't prune at planning time.
             // We'll also check this after every record batch we read,
             // and if at some point we are able to prove we can prune the file using just the file level statistics
             // we can end the stream early.
-            if !constant_columns.is_empty() {
-                predicate = predicate
-                    .map(|expr| replace_columns_with_literals(expr, &constant_columns))
-                    .transpose()?;
-                projection = projection.try_map_exprs(|expr| {
-                    replace_columns_with_literals(expr, &constant_columns)
-                })?;
-            }
             let mut file_pruner = predicate
                 .as_ref()
                 .filter(|p| {

From ce322761dae207a907cc9f31acacf60dd36e1010 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 12 Dec 2025 14:59:23 -0600
Subject: [PATCH 06/13] merge literal replacements

---
 datafusion/datasource-parquet/src/opener.rs | 91 +++++++++------------
 1 file changed, 39 insertions(+), 52 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 470dfda0ed99..c041d1798d4e 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -140,59 +140,59 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
-        // Build partition values map for replacing partition column references
-        // with their literal values from this file's partition values.
-        //
-        // For example, given
-        // 1. `region` is a partition column,
-        // 2. predicate `host IN ('us-east-1', 'eu-central-1')`:
-        // 3. The file path is `/data/region=us-west-2/...`
-        //    (that is the partition column value is `us-west-2`)
+        // Calculate the output schema from the original projection (before literal replacement)
+        // so we get correct field names from column references
+        let logical_file_schema = Arc::clone(self.table_schema.file_schema());
+        let output_schema = Arc::new(
+            self.projection
+                .project_schema(self.table_schema.table_schema())?,
+        );
+
+        // Build a combined map for replacing column references with literal values.
+        // This includes:
+        // 1. Partition column values from the file path (e.g., region=us-west-2)
+        // 2. Constant columns detected from file statistics (where min == max)
+        //
+        // Although partition columns *are* constant columns, we don't want to rely on
+        // statistics for them being populated if we can use the partition values
+        // (which are guaranteed to be present).
         //
-        // The predicate would be rewritten to
-        // ```sql
-        // 'us-west-2` IN ('us-east-1', 'eu-central-1')
-        // ```
-        // which can be further simplified to `FALSE`, meaning
-        // the file can be skipped entirely.
+        // For example, given a partition column `region` and predicate
+        // `region IN ('us-east-1', 'eu-central-1')` with file path
+        // `/data/region=us-west-2/...`, the predicate is rewritten to
+        // `'us-west-2' IN ('us-east-1', 'eu-central-1')` which simplifies to FALSE.
         //
-        // While this particular optimization is done during logical planning,
-        // there are other cases where partition columns may appear in more
-        // complex predicates that cannot be simplified until we are about to
-        // open the file (such as dynamic predicates)
-        let partition_values: HashMap<&str, &ScalarValue> = self
+        // While partition column optimization is done during logical planning,
+        // there are cases where partition columns may appear in more complex
+        // predicates that cannot be simplified until we open the file (such as
+        // dynamic predicates).
+        let mut literal_columns: HashMap<String, ScalarValue> = self
             .table_schema
             .table_partition_cols()
             .iter()
             .zip(partitioned_file.partition_values.iter())
-            .map(|(field, value)| (field.name().as_str(), value))
+            .map(|(field, value)| (field.name().clone(), value.clone()))
             .collect();
-
-        // Calculate the output schema from the original projection (before literal replacement)
-        // so we get correct field names from column references
-        let logical_file_schema = Arc::clone(self.table_schema.file_schema());
-        let output_schema = Arc::new(
-            self.projection
-                .project_schema(self.table_schema.table_schema())?,
-        );
+        // Add constant columns from file statistics (partition columns and file
+        // columns are disjoint, so no overlap is possible)
+        literal_columns.extend(constant_columns_from_stats(
+            partitioned_file.statistics.as_deref(),
+            &logical_file_schema,
+        ));
 
-        // Apply partition column replacement to projection expressions
+        // Apply literal replacements to projection and predicate
         let mut projection = self.projection.clone();
-        if !partition_values.is_empty() {
+        let mut predicate = self.predicate.clone();
+        if !literal_columns.is_empty() {
             projection = projection.try_map_exprs(|expr| {
-                replace_columns_with_literals(Arc::clone(&expr), &partition_values)
+                replace_columns_with_literals(Arc::clone(&expr), &literal_columns)
             })?;
+            predicate = predicate
+                .map(|p| replace_columns_with_literals(p, &literal_columns))
+                .transpose()?;
         }
 
-        // Apply partition column replacement to predicate
-        let mut predicate = if partition_values.is_empty() {
-            self.predicate.clone()
-        } else {
-            self.predicate
-                .clone()
-                .map(|p| replace_columns_with_literals(p, &partition_values))
-                .transpose()?
-        };
+
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
         let force_filter_selections = self.force_filter_selections;
@@ -211,19 +211,6 @@ impl FileOpener for ParquetOpener {
         let encryption_context = self.get_encryption_context();
         let max_predicate_cache_size = self.max_predicate_cache_size;
 
-        let constant_columns = constant_columns_from_stats(
-            partitioned_file.statistics.as_deref(),
-            &logical_file_schema,
-        );
-        if !constant_columns.is_empty() {
-            predicate = predicate
-                .map(|expr| replace_columns_with_literals(expr, &constant_columns))
-                .transpose()?;
-            projection = projection.try_map_exprs(|expr| {
-                replace_columns_with_literals(expr, &constant_columns)
-            })?;
-        }
-
         Ok(Box::pin(async move {
             #[cfg(feature = "parquet_encryption")]
             let file_decryption_properties = encryption_context

From 5c65a866c17f28fc688f22500733474a83e20f1a Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Sat, 13 Dec 2025 11:37:12 +0200
Subject: [PATCH 07/13] fix fmt

---
 datafusion/datasource-parquet/src/opener.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index c041d1798d4e..1d30202541a9 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -152,7 +152,7 @@ impl FileOpener for ParquetOpener {
         // This includes:
         // 1. Partition column values from the file path (e.g., region=us-west-2)
         // 2. Constant columns detected from file statistics (where min == max)
-        // 
+        //
         // Although partition columns *are* constant columns, we don't want to rely on
         // statistics for them being populated if we can use the partition values
From 7126f5918a185ada499c32f3fbdd9559fb86a6dd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 13 Dec 2025 09:00:13 -0600 Subject: [PATCH 08/13] don't rewrite dynamic filters --- .../physical-expr-adapter/src/schema_rewriter.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index b5ebfe0be31b..1dec39c23359 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Schema, SchemaRef}; +use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, ScalarValue, exec_err, nested_struct::validate_struct_compatibility, @@ -65,7 +66,16 @@ where K: Borrow + Eq + Hash, V: Borrow, { - expr.transform(|expr| { + expr.transform_down(|expr| { + // TODO: if we find ourselves doing this check a lot, we should consider adding a method to PhysicalExpr directly + // to avoid sprinkling this somewhat magic check with an arbitrary integer all over the place. + if expr.snapshot_generation() != 0 { + // This is a dynamic filter expression, don't rewrite it. + // Dynamic filters have shared state, rewriting it would lead to incorrect results. + let mut transformed = Transformed::no(expr); + transformed.tnr = TreeNodeRecursion::Stop; + return Ok(transformed); + } if let Some(column) = expr.as_any().downcast_ref::() && let Some(replacement_value) = replacements.get(column.name()) { From d3101613b69d44820b95c2b473807923eb53bdb7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 13 Dec 2025 09:11:30 -0600 Subject: [PATCH 09/13] remove limitation --- .../src/schema_rewriter.rs | 9 -- .../src/expressions/dynamic_filters.rs | 86 +++++++++++++++++++ 2 files changed, 86 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index 1dec39c23359..a25871c4b010 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -67,15 +67,6 @@ where V: Borrow, { expr.transform_down(|expr| { - // TODO: if we find ourselves doing this check a lot, we should consider adding a method to PhysicalExpr directly - // to avoid sprinkling this somewhat magic check with an arbitrary integer all over the place. - if expr.snapshot_generation() != 0 { - // This is a dynamic filter expression, don't rewrite it. - // Dynamic filters have shared state, rewriting it would lead to incorrect results. 
- let mut transformed = Transformed::no(expr); - transformed.tnr = TreeNodeRecursion::Stop; - return Ok(transformed); - } if let Some(column) = expr.as_any().downcast_ref::() && let Some(replacement_value) = replacements.get(column.name()) { diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 60b00117b7b1..432563135b24 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -607,4 +607,90 @@ mod test { // wait_complete should return immediately dynamic_filter.wait_complete().await; } + + #[test] + fn test_with_new_children_independence() { + // Create a schema with columns a, b, c, d + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + Field::new("d", DataType::Int32, false), + ])); + + // Create expression col(a) + col(b) + let col_a = col("a", &schema).unwrap(); + let col_b = col("b", &schema).unwrap(); + let col_c = col("c", &schema).unwrap(); + let col_d = col("d", &schema).unwrap(); + + let expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::Plus, + Arc::clone(&col_b), + )); + + // Create DynamicFilterPhysicalExpr with children [col_a, col_b] + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + expr as Arc, + )); + + // Clone the Arc (two references to the same DynamicFilterPhysicalExpr) + let clone_1 = Arc::clone(&dynamic_filter); + let clone_2 = Arc::clone(&dynamic_filter); + + // Call with_new_children with different children on each clone + // clone_1: replace [a, b] with [b, c] -> expression becomes b + c + let remapped_1 = clone_1 + .with_new_children(vec![Arc::clone(&col_b), Arc::clone(&col_c)]) + .unwrap(); + + // clone_2: replace [a, b] with [b, d] -> expression becomes b + d + let remapped_2 = clone_2 + .with_new_children(vec![Arc::clone(&col_b), Arc::clone(&col_d)]) + .unwrap(); + + // Create a RecordBatch with columns a=1,2,3 b=10,20,30 c=100,200,300 d=1000,2000,3000 + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), // a + Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])), // b + Arc::new(arrow::array::Int32Array::from(vec![100, 200, 300])), // c + Arc::new(arrow::array::Int32Array::from(vec![1000, 2000, 3000])), // d + ], + ) + .unwrap(); + + // Evaluate both remapped expressions + let result_1 = remapped_1.evaluate(&batch).unwrap(); + let result_2 = remapped_2.evaluate(&batch).unwrap(); + + // Extract arrays from results + let ColumnarValue::Array(arr_1) = result_1 else { + panic!("Expected ColumnarValue::Array for result_1"); + }; + let ColumnarValue::Array(arr_2) = result_2 else { + panic!("Expected ColumnarValue::Array for result_2"); + }; + + // Verify result_1 = b + c = [110, 220, 330] + let expected_1: Arc = + Arc::new(arrow::array::Int32Array::from(vec![110, 220, 330])); + assert!( + arr_1.eq(&expected_1), + "Expected b + c = [110, 220, 330], got {:?}", + arr_1 + ); + + // Verify result_2 = b + d = [1010, 2020, 3030] + let expected_2: Arc = + Arc::new(arrow::array::Int32Array::from(vec![1010, 2020, 3030])); + assert!( + arr_2.eq(&expected_2), + "Expected b + d = [1010, 2020, 3030], got {:?}", + arr_2 + ); + } } From 4bc912f0d1f742a5461e19f5c1f42782e7a2219d Mon Sep 17 00:00:00 2001 From: Adrian Garcia 
From 4bc912f0d1f742a5461e19f5c1f42782e7a2219d Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sat, 13 Dec 2025 09:11:58 -0600
Subject: [PATCH 10/13] lint

---
 datafusion/physical-expr-adapter/src/schema_rewriter.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index a25871c4b010..e9b8ff5c37db 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -26,7 +26,6 @@ use std::sync::Arc;
 
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion_common::tree_node::TreeNodeRecursion;
 use datafusion_common::{
     Result, ScalarValue, exec_err,
     nested_struct::validate_struct_compatibility,

From da1f9427a6f0198c516f83289f9eb0a1b6b00de5 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sat, 13 Dec 2025 09:14:05 -0600
Subject: [PATCH 11/13] fmt

---
 datafusion/physical-expr/src/expressions/dynamic_filters.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 432563135b24..795784c1c0a4 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -655,8 +655,8 @@ mod test {
         let batch = RecordBatch::try_new(
             Arc::clone(&schema),
             vec![
-                Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), // a
-                Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])), // b
+                Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),   // a
+                Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])), // b
                 Arc::new(arrow::array::Int32Array::from(vec![100, 200, 300])), // c
                 Arc::new(arrow::array::Int32Array::from(vec![1000, 2000, 3000])), // d
             ],

From cbc9fbfc6e3f2567a62bf5fc64e6eb4a61c93b4a Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sat, 13 Dec 2025 10:56:32 -0600
Subject: [PATCH 12/13] lint

---
 datafusion/physical-expr/src/expressions/dynamic_filters.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 795784c1c0a4..615d9cbbf61a 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -680,8 +680,7 @@ mod test {
             Arc::new(arrow::array::Int32Array::from(vec![110, 220, 330]));
         assert!(
             arr_1.eq(&expected_1),
-            "Expected b + c = [110, 220, 330], got {:?}",
-            arr_1
+            "Expected b + c = [110, 220, 330], got {arr_1:?}",
         );
 
         // Verify result_2 = b + d = [1010, 2020, 3030]
         let expected_2: Arc<dyn arrow::array::Array> =
             Arc::new(arrow::array::Int32Array::from(vec![1010, 2020, 3030]));
         assert!(
             arr_2.eq(&expected_2),
-            "Expected b + d = [1010, 2020, 3030], got {:?}",
-            arr_2
+            "Expected b + d = [1010, 2020, 3030], got {arr_2:?}",
         );
     }
 }

From c0b8c3f526475fe022c43803789a6a3a1c0b1f13 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sat, 13 Dec 2025 11:01:32 -0600
Subject: [PATCH 13/13] update comment

---
 datafusion/datasource-parquet/src/opener.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 1d30202541a9..547ce5091da4 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -173,9 +173,10 @@ impl FileOpener for ParquetOpener {
             .zip(partitioned_file.partition_values.iter())
             .map(|(field, value)| (field.name().clone(), value.clone()))
             .collect();
-
-        // Add constant columns from file statistics (partition columns and file
-        // columns are disjoint, so no overlap is possible)
+        // Add constant columns from file statistics.
+        // Note that if there are statistics for partition columns there will be overlap,
+        // but since we use a HashMap, we'll just overwrite the partition values with the
+        // constant values from statistics (which should be the same).
         literal_columns.extend(constant_columns_from_stats(
             partitioned_file.statistics.as_deref(),
             &logical_file_schema,