Skip to content

Commit cb2f3d2

Browse files
authored
Add constant expression evaluator to physical expression simplifier (#19130)
This improves handling of constant expressions during pruning by trying to evaluate them in the simplifier and the pruning machinery. This is somewhat redundant with #19129 in the simple case of our Parquet implementation but since there may be edge cases where one is hit and not the other, or where users are using them independently I thought it best to implement both approaches.
1 parent 944f7f2 commit cb2f3d2

File tree

4 files changed

+258
-18
lines changed

4 files changed

+258
-18
lines changed

datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -564,29 +564,36 @@ async fn test_parquet_missing_column() -> Result<()> {
564564
.push_down_filters(false)
565565
.execute()
566566
.await?;
567-
// There will be data: the filter is (null) is not null or a = 24.
568-
// Statistics pruning doesn't handle `null is not null` so it resolves to `true or a = 24` -> `true` so no row groups are pruned
567+
// There should be zero batches
568+
assert_eq!(batches.len(), 0);
569+
// Check another filter: `b = 'foo' and a = 24` should also prune data with only statistics-based pushdown
570+
let filter = col("b").eq(lit("foo")).and(col("a").eq(lit(24)));
571+
let batches = test_case
572+
.clone()
573+
.with_predicate(filter)
574+
.push_down_filters(false)
575+
.execute()
576+
.await?;
577+
// There should be zero batches
578+
assert_eq!(batches.len(), 0);
579+
// On the other hand `b is null and a = 2` should prune only the second row group with stats only pruning
580+
let filter = col("b").is_null().and(col("a").eq(lit(2)));
581+
let batches = test_case
582+
.clone()
583+
.with_predicate(filter)
584+
.push_down_filters(false)
585+
.execute()
586+
.await?;
569587
#[rustfmt::skip]
570588
let expected = [
571589
"+---+---+-----+",
572590
"| a | b | c |",
573591
"+---+---+-----+",
574592
"| 1 | | 1.1 |",
575593
"| 2 | | 2.2 |",
576-
"| 3 | | 3.3 |",
577594
"+---+---+-----+",
578595
];
579596
assert_batches_eq!(expected, &batches);
580-
// On the other hand the filter `b = 'foo' and a = 24` should prune all data even with only statistics-based pushdown
581-
let filter = col("b").eq(lit("foo")).and(col("a").eq(lit(24)));
582-
let batches = test_case
583-
.clone()
584-
.with_predicate(filter)
585-
.push_down_filters(false)
586-
.execute()
587-
.await?;
588-
// There should be zero batches
589-
assert_eq!(batches.len(), 0);
590597

591598
Ok(())
592599
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1235,7 +1235,7 @@ mod test {
12351235
force_filter_selections: false,
12361236
enable_page_index: false,
12371237
enable_bloom_filter: false,
1238-
enable_row_group_stats_pruning: true,
1238+
enable_row_group_stats_pruning: false, // note that this is false!
12391239
coerce_int96: None,
12401240
#[cfg(feature = "parquet_encryption")]
12411241
file_decryption_properties: None,
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Constant expression evaluation for the physical expression simplifier
19+
20+
use std::sync::Arc;
21+
22+
use arrow::array::new_null_array;
23+
use arrow::datatypes::{DataType, Field, Schema};
24+
use arrow::record_batch::RecordBatch;
25+
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
26+
use datafusion_common::{Result, ScalarValue};
27+
use datafusion_expr_common::columnar_value::ColumnarValue;
28+
use datafusion_physical_expr_common::physical_expr::is_volatile;
29+
30+
use crate::expressions::{Column, Literal};
31+
use crate::PhysicalExpr;
32+
33+
/// Simplify expressions that consist only of literals by evaluating them.
34+
///
35+
/// This function checks if all children of the given expression are literals.
36+
/// If so, it evaluates the expression against a dummy RecordBatch and returns
37+
/// the result as a new Literal.
38+
///
39+
/// # Example transformations
40+
/// - `1 + 2` -> `3`
41+
/// - `(1 + 2) * 3` -> `9` (with bottom-up traversal)
42+
/// - `'hello' || ' world'` -> `'hello world'`
43+
pub fn simplify_const_expr(
44+
expr: &Arc<dyn PhysicalExpr>,
45+
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
46+
if is_volatile(expr) || has_column_references(expr) {
47+
return Ok(Transformed::no(Arc::clone(expr)));
48+
}
49+
50+
// Create a 1-row dummy batch for evaluation
51+
let batch = create_dummy_batch()?;
52+
53+
// Evaluate the expression
54+
match expr.evaluate(&batch) {
55+
Ok(ColumnarValue::Scalar(scalar)) => {
56+
Ok(Transformed::yes(Arc::new(Literal::new(scalar))))
57+
}
58+
Ok(ColumnarValue::Array(arr)) if arr.len() == 1 => {
59+
// Some operations return an array even for scalar inputs
60+
let scalar = ScalarValue::try_from_array(&arr, 0)?;
61+
Ok(Transformed::yes(Arc::new(Literal::new(scalar))))
62+
}
63+
Ok(_) => {
64+
// Unexpected result - keep original expression
65+
Ok(Transformed::no(Arc::clone(expr)))
66+
}
67+
Err(_) => {
68+
// On error, keep original expression
69+
// The expression might succeed at runtime due to short-circuit evaluation
70+
// or other runtime conditions
71+
Ok(Transformed::no(Arc::clone(expr)))
72+
}
73+
}
74+
}
75+
76+
/// Create a 1-row dummy RecordBatch for evaluating constant expressions.
77+
///
78+
/// The batch is never actually accessed for data - it's just needed because
79+
/// the PhysicalExpr::evaluate API requires a RecordBatch. For expressions
80+
/// that only contain literals, the batch content is irrelevant.
81+
///
82+
/// This is the same approach used in the logical expression `ConstEvaluator`.
83+
fn create_dummy_batch() -> Result<RecordBatch> {
84+
// RecordBatch requires at least one column
85+
let dummy_schema = Arc::new(Schema::new(vec![Field::new("_", DataType::Null, true)]));
86+
let col = new_null_array(&DataType::Null, 1);
87+
Ok(RecordBatch::try_new(dummy_schema, vec![col])?)
88+
}
89+
90+
/// Check if this expression has any column references.
91+
pub fn has_column_references(expr: &Arc<dyn PhysicalExpr>) -> bool {
92+
let mut has_columns = false;
93+
expr.apply(|expr| {
94+
if expr.as_any().downcast_ref::<Column>().is_some() {
95+
has_columns = true;
96+
Ok(TreeNodeRecursion::Stop)
97+
} else {
98+
Ok(TreeNodeRecursion::Continue)
99+
}
100+
})
101+
.expect("apply should not fail");
102+
has_columns
103+
}

datafusion/physical-expr/src/simplifier/mod.rs

Lines changed: 134 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::sync::Arc;
2626

2727
use crate::{simplifier::not::simplify_not_expr, PhysicalExpr};
2828

29+
pub mod const_evaluator;
2930
pub mod not;
3031
pub mod unwrap_cast;
3132

@@ -72,11 +73,13 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
7273
#[cfg(test)]
7374
let original_type = node.data_type(self.schema).unwrap();
7475

75-
// Apply NOT expression simplification first, then unwrap cast optimization
76-
let rewritten =
77-
simplify_not_expr(&node, self.schema)?.transform_data(|node| {
76+
// Apply NOT expression simplification first, then unwrap cast optimization,
77+
// then constant expression evaluation
78+
let rewritten = simplify_not_expr(&node, self.schema)?
79+
.transform_data(|node| {
7880
unwrap_cast::unwrap_cast_in_comparison(node, self.schema)
79-
})?;
81+
})?
82+
.transform_data(|node| const_evaluator::simplify_const_expr(&node))?;
8083

8184
#[cfg(test)]
8285
assert_eq!(
@@ -491,4 +494,131 @@ mod tests {
491494

492495
Ok(())
493496
}
497+
498+
#[test]
499+
fn test_simplify_literal_binary_expr() {
500+
let schema = Schema::empty();
501+
let mut simplifier = PhysicalExprSimplifier::new(&schema);
502+
503+
// 1 + 2 -> 3
504+
let expr: Arc<dyn PhysicalExpr> =
505+
Arc::new(BinaryExpr::new(lit(1i32), Operator::Plus, lit(2i32)));
506+
let result = simplifier.simplify(expr).unwrap();
507+
let literal = as_literal(&result);
508+
assert_eq!(literal.value(), &ScalarValue::Int32(Some(3)));
509+
}
510+
511+
#[test]
512+
fn test_simplify_literal_comparison() {
513+
let schema = Schema::empty();
514+
let mut simplifier = PhysicalExprSimplifier::new(&schema);
515+
516+
// 5 > 3 -> true
517+
let expr: Arc<dyn PhysicalExpr> =
518+
Arc::new(BinaryExpr::new(lit(5i32), Operator::Gt, lit(3i32)));
519+
let result = simplifier.simplify(expr).unwrap();
520+
let literal = as_literal(&result);
521+
assert_eq!(literal.value(), &ScalarValue::Boolean(Some(true)));
522+
523+
// 2 > 3 -> false
524+
let expr: Arc<dyn PhysicalExpr> =
525+
Arc::new(BinaryExpr::new(lit(2i32), Operator::Gt, lit(3i32)));
526+
let result = simplifier.simplify(expr).unwrap();
527+
let literal = as_literal(&result);
528+
assert_eq!(literal.value(), &ScalarValue::Boolean(Some(false)));
529+
}
530+
531+
#[test]
532+
fn test_simplify_nested_literal_expr() {
533+
let schema = Schema::empty();
534+
let mut simplifier = PhysicalExprSimplifier::new(&schema);
535+
536+
// (1 + 2) * 3 -> 9
537+
let inner: Arc<dyn PhysicalExpr> =
538+
Arc::new(BinaryExpr::new(lit(1i32), Operator::Plus, lit(2i32)));
539+
let expr: Arc<dyn PhysicalExpr> =
540+
Arc::new(BinaryExpr::new(inner, Operator::Multiply, lit(3i32)));
541+
let result = simplifier.simplify(expr).unwrap();
542+
let literal = as_literal(&result);
543+
assert_eq!(literal.value(), &ScalarValue::Int32(Some(9)));
544+
}
545+
546+
#[test]
547+
fn test_simplify_deeply_nested_literals() {
548+
let schema = Schema::empty();
549+
let mut simplifier = PhysicalExprSimplifier::new(&schema);
550+
551+
// ((1 + 2) * 3) + ((4 - 1) * 2) -> 9 + 6 -> 15
552+
let left: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
553+
Arc::new(BinaryExpr::new(lit(1i32), Operator::Plus, lit(2i32))),
554+
Operator::Multiply,
555+
lit(3i32),
556+
));
557+
let right: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
558+
Arc::new(BinaryExpr::new(lit(4i32), Operator::Minus, lit(1i32))),
559+
Operator::Multiply,
560+
lit(2i32),
561+
));
562+
let expr: Arc<dyn PhysicalExpr> =
563+
Arc::new(BinaryExpr::new(left, Operator::Plus, right));
564+
let result = simplifier.simplify(expr).unwrap();
565+
let literal = as_literal(&result);
566+
assert_eq!(literal.value(), &ScalarValue::Int32(Some(15)));
567+
}
568+
569+
#[test]
570+
fn test_no_simplify_with_column() {
571+
let schema = test_schema();
572+
let mut simplifier = PhysicalExprSimplifier::new(&schema);
573+
574+
// c1 + 2 should NOT be simplified (has column reference)
575+
let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
576+
col("c1", &schema).unwrap(),
577+
Operator::Plus,
578+
lit(2i32),
579+
));
580+
let result = simplifier.simplify(expr).unwrap();
581+
// Should remain a BinaryExpr, not become a Literal
582+
assert!(result.as_any().downcast_ref::<BinaryExpr>().is_some());
583+
}
584+
585+
#[test]
586+
fn test_partial_simplify_with_column() {
587+
let schema = test_schema();
588+
let mut simplifier = PhysicalExprSimplifier::new(&schema);
589+
590+
// (1 + 2) + c1 should simplify the literal part: 3 + c1
591+
let literal_part: Arc<dyn PhysicalExpr> =
592+
Arc::new(BinaryExpr::new(lit(1i32), Operator::Plus, lit(2i32)));
593+
let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
594+
literal_part,
595+
Operator::Plus,
596+
col("c1", &schema).unwrap(),
597+
));
598+
let result = simplifier.simplify(expr).unwrap();
599+
600+
// Should be a BinaryExpr with a Literal(3) on the left
601+
let binary = as_binary(&result);
602+
let left_literal = as_literal(binary.left());
603+
assert_eq!(left_literal.value(), &ScalarValue::Int32(Some(3)));
604+
}
605+
606+
#[test]
607+
fn test_simplify_literal_string_concat() {
608+
let schema = Schema::empty();
609+
let mut simplifier = PhysicalExprSimplifier::new(&schema);
610+
611+
// 'hello' || ' world' -> 'hello world'
612+
let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
613+
lit("hello"),
614+
Operator::StringConcat,
615+
lit(" world"),
616+
));
617+
let result = simplifier.simplify(expr).unwrap();
618+
let literal = as_literal(&result);
619+
assert_eq!(
620+
literal.value(),
621+
&ScalarValue::Utf8(Some("hello world".to_string()))
622+
);
623+
}
494624
}

0 commit comments

Comments
 (0)