
Commit dc78613

adriangb and alamb authored
Move partition handling out of PhysicalExprAdapter (#19128)
This PR does some refactoring of `PhysicalExprAdapter` and `PhysicalExprSimplifier` that I found necessary and/or beneficial while working on #19111.

## Changes made

### Replace `PhysicalExprAdapter::with_partition_values` with `replace_columns_with_literals`

This is a nice improvement because it:

1. Makes the `PhysicalExprAdapter` trait that users might need to implement simpler (less boilerplate for users).
2. Decouples these two transformations, so we can replace partition values and then apply a projection without also doing the schema mapping (which would be from the logical schema to the logical schema: confusing and a waste of compute). I ran into this need in #19111. There may be other ways of doing it (e.g. piping the expected output schema in from ParquetSource), but this felt nicer and I expect other places may also need the decoupled transformation.
3. I think we can use it in the future to implement #19089 (edit: evidently I was right, see the identical function in #19136).
4. It's fewer lines of code 😄

This will require any users calling `PhysicalExprAdapter` directly to change their code; I can add an entry to the upgrade guide. A minimal sketch of the new call pattern follows below.

### Remove partition pruning logic from `FilePruner` and deprecate the now unused `PartitionPruningStatistics` and `CompositePruningStatistics`

Since we replace partition columns with literal values, those values get handled like any other literals and we no longer need these structures. This is a good chunk of code / complexity that we can bin off.

### Use `TableSchema` instead of `SchemaRef` + `Vec<FieldRef>` in `ParquetOpener`

`TableSchema` is basically `SchemaRef` + `Vec<FieldRef>` already, and since `ParquetSource` has a `TableSchema` it's less code and fewer clones to propagate that into `ParquetOpener`.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 7ac40d7 commit dc78613
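Below is a minimal sketch of the new call pattern for code that previously used `with_partition_values`. The signature of `replace_columns_with_literals` is inferred from its call sites in the diffs below, and `bind_partition_values` plus the shape of `partition_values` are hypothetical caller-side names, not part of this PR:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::{Result, ScalarValue};
use datafusion::physical_expr::PhysicalExpr;
use datafusion_physical_expr_adapter::replace_columns_with_literals;

/// Bind a file's partition values into a pushed-down predicate as literals,
/// as a standalone rewrite that runs before (and independently of) any
/// `PhysicalExprAdapter::rewrite` call.
fn bind_partition_values(
    predicate: Arc<dyn PhysicalExpr>,
    partition_values: &[(String, ScalarValue)],
) -> Result<Arc<dyn PhysicalExpr>> {
    let replacements: HashMap<&str, &ScalarValue> = partition_values
        .iter()
        .map(|(name, value)| (name.as_str(), value))
        .collect();
    // Columns whose name matches a partition column become literal values;
    // the rest of the expression tree is left untouched.
    replace_columns_with_literals(predicate, &replacements)
}
```

Previously this binding rode along inside the adapter via `with_partition_values`, which forced a logical-to-logical schema mapping even when only partition substitution was needed.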

File tree

13 files changed: +445 −409 lines

datafusion-examples/examples/custom_data_source/custom_file_casts.rs

Lines changed: 2 additions & 12 deletions
```diff
@@ -20,12 +20,12 @@
 use std::sync::Arc;
 
 use arrow::array::{record_batch, RecordBatch};
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
 use datafusion::assert_batches_eq;
 use datafusion::common::not_impl_err;
 use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion::common::{Result, ScalarValue};
+use datafusion::common::Result;
 use datafusion::datasource::listing::{
     ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl,
 };
@@ -209,14 +209,4 @@ impl PhysicalExprAdapter for CustomCastsPhysicalExprAdapter {
             })
             .data()
     }
-
-    fn with_partition_values(
-        &self,
-        partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(Self {
-            inner: self.inner.with_partition_values(partition_values),
-            ..self.clone()
-        })
-    }
 }
```

datafusion-examples/examples/custom_data_source/default_column_values.rs

Lines changed: 36 additions & 98 deletions
```diff
@@ -22,13 +22,12 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow::array::RecordBatch;
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;
 
 use datafusion::assert_batches_eq;
 use datafusion::catalog::memory::DataSourceExec;
 use datafusion::catalog::{Session, TableProvider};
-use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion::common::DFSchema;
 use datafusion::common::{Result, ScalarValue};
 use datafusion::datasource::listing::PartitionedFile;
@@ -39,12 +38,12 @@ use datafusion::logical_expr::utils::conjunction;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{lit, SessionConfig};
 use datafusion_physical_expr_adapter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+    replace_columns_with_literals, DefaultPhysicalExprAdapterFactory,
+    PhysicalExprAdapter, PhysicalExprAdapterFactory,
 };
 use futures::StreamExt;
 use object_store::memory::InMemory;
@@ -60,16 +59,16 @@ const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
 /// This example demonstrates how to:
 /// 1. Store default values in field metadata using a constant key
 /// 2. Create a custom PhysicalExprAdapter that reads these defaults
-/// 3. Inject default values for missing columns in filter predicates
+/// 3. Inject default values for missing columns in filter predicates using `replace_columns_with_literals`
 /// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
-/// 5. Wrap string default values in cast expressions for proper type conversion
+/// 5. Convert string default values to proper types using `ScalarValue::cast_to()` at planning time
 ///
 /// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
 /// that get pushed down to file scans. For handling missing columns in projections,
 /// other mechanisms in DataFusion are used (like SchemaAdapter).
 ///
 /// The metadata-based approach provides a flexible way to store default values as strings
-/// and cast them to the appropriate types at query time.
+/// and cast them to the appropriate types at planning time, avoiding runtime overhead.
 pub async fn default_column_values() -> Result<()> {
     println!("=== Creating example data with missing columns and default values ===");
 
@@ -138,8 +137,8 @@
     println!("This example demonstrates how PhysicalExprAdapter works:");
     println!("1. Physical schema only has 'id' and 'name' columns");
     println!("2. Logical schema has 'id', 'name', 'status', and 'priority' columns with defaults");
-    println!("3. Our custom adapter intercepts filter expressions on missing columns");
-    println!("4. Default values from metadata are injected as cast expressions");
+    println!("3. Our custom adapter uses replace_columns_with_literals to inject default values");
+    println!("4. Default values from metadata are cast to proper types at planning time");
     println!("5. The DefaultPhysicalExprAdapter handles other schema adaptations");
     println!("\nNote: PhysicalExprAdapter is specifically for filter predicates.");
     println!("For projection columns, different mechanisms handle missing columns.");
@@ -206,7 +205,7 @@ impl TableProvider for DefaultValueTableProvider {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        Arc::clone(&self.schema)
     }
 
     fn table_type(&self) -> TableType {
@@ -227,7 +226,7 @@
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let schema = self.schema.clone();
+        let schema = Arc::clone(&self.schema);
         let df_schema = DFSchema::try_from(schema.clone())?;
         let filter = state.create_physical_expr(
             conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
@@ -280,14 +279,15 @@ impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
         physical_file_schema: SchemaRef,
     ) -> Arc<dyn PhysicalExprAdapter> {
         let default_factory = DefaultPhysicalExprAdapterFactory;
-        let default_adapter = default_factory
-            .create(logical_file_schema.clone(), physical_file_schema.clone());
+        let default_adapter = default_factory.create(
+            Arc::clone(&logical_file_schema),
+            Arc::clone(&physical_file_schema),
+        );
 
         Arc::new(DefaultValuePhysicalExprAdapter {
             logical_file_schema,
             physical_file_schema,
             default_adapter,
-            partition_values: Vec::new(),
         })
     }
 }
@@ -299,98 +299,36 @@ struct DefaultValuePhysicalExprAdapter {
     logical_file_schema: SchemaRef,
     physical_file_schema: SchemaRef,
     default_adapter: Arc<dyn PhysicalExprAdapter>,
-    partition_values: Vec<(FieldRef, ScalarValue)>,
 }
 
 impl PhysicalExprAdapter for DefaultValuePhysicalExprAdapter {
     fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
-        // First try our custom default value injection for missing columns
-        let rewritten = expr
-            .transform(|expr| {
-                self.inject_default_values(
-                    expr,
-                    &self.logical_file_schema,
-                    &self.physical_file_schema,
-                )
-            })
-            .data()?;
-
-        // Then apply the default adapter as a fallback to handle standard schema differences
-        // like type casting, partition column handling, etc.
-        let default_adapter = if !self.partition_values.is_empty() {
-            self.default_adapter
-                .with_partition_values(self.partition_values.clone())
-        } else {
-            self.default_adapter.clone()
-        };
-
-        default_adapter.rewrite(rewritten)
-    }
-
-    fn with_partition_values(
-        &self,
-        partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(DefaultValuePhysicalExprAdapter {
-            logical_file_schema: self.logical_file_schema.clone(),
-            physical_file_schema: self.physical_file_schema.clone(),
-            default_adapter: self.default_adapter.clone(),
-            partition_values,
-        })
-    }
-}
-
-impl DefaultValuePhysicalExprAdapter {
-    fn inject_default_values(
-        &self,
-        expr: Arc<dyn PhysicalExpr>,
-        logical_file_schema: &Schema,
-        physical_file_schema: &Schema,
-    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
-        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
-            let column_name = column.name();
-
-            // Check if this column exists in the physical schema
-            if physical_file_schema.index_of(column_name).is_err() {
-                // Column is missing from physical schema, check if logical schema has a default
-                if let Ok(logical_field) =
-                    logical_file_schema.field_with_name(column_name)
-                {
-                    if let Some(default_value_str) =
-                        logical_field.metadata().get(DEFAULT_VALUE_METADATA_KEY)
-                    {
-                        // Create a string literal and wrap it in a cast expression
-                        let default_literal = self.create_default_value_expr(
-                            default_value_str,
-                            logical_field.data_type(),
-                        )?;
-                        return Ok(Transformed::yes(default_literal));
-                    }
-                }
-            }
-        }
-
-        // No transformation needed
-        Ok(Transformed::no(expr))
-    }
-
-    fn create_default_value_expr(
-        &self,
-        value_str: &str,
-        data_type: &DataType,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        // Create a string literal with the default value
-        let string_literal =
-            Arc::new(Literal::new(ScalarValue::Utf8(Some(value_str.to_string()))));
-
-        // If the target type is already Utf8, return the string literal directly
-        if matches!(data_type, DataType::Utf8) {
-            return Ok(string_literal);
-        }
-
-        // Otherwise, wrap the string literal in a cast expression
-        let cast_expr = Arc::new(CastExpr::new(string_literal, data_type.clone(), None));
-
-        Ok(cast_expr)
+        // Pre-compute replacements for missing columns with default values
+        let mut replacements = HashMap::new();
+        for field in self.logical_file_schema.fields() {
+            // Skip columns that exist in physical schema
+            if self.physical_file_schema.index_of(field.name()).is_ok() {
+                continue;
+            }
+
+            // Check if this missing column has a default value in metadata
+            if let Some(default_str) = field.metadata().get(DEFAULT_VALUE_METADATA_KEY) {
+                // Create a Utf8 ScalarValue from the string and cast it to the target type
+                let string_value = ScalarValue::Utf8(Some(default_str.to_string()));
+                let typed_value = string_value.cast_to(field.data_type())?;
+                replacements.insert(field.name().as_str(), typed_value);
+            }
+        }
+
+        // Replace columns with their default literals if any
+        let rewritten = if !replacements.is_empty() {
+            let refs: HashMap<_, _> = replacements.iter().map(|(k, v)| (*k, v)).collect();
+            replace_columns_with_literals(expr, &refs)?
+        } else {
+            expr
+        };
+
+        // Apply the default adapter as a fallback for other schema adaptations
+        self.default_adapter.rewrite(rewritten)
    }
 }
```
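One detail worth calling out in the rewrite above: the string default is now converted to a typed value once at planning time via `ScalarValue::cast_to`, instead of being wrapped in a `CastExpr` that is evaluated per batch. A standalone sketch of just that conversion (not part of the diff, with a hypothetical helper name):

```rust
use arrow::datatypes::DataType;
use datafusion::common::{Result, ScalarValue};

/// Convert a string default (e.g. read from field metadata) into a typed
/// ScalarValue once, during planning. `typed_default` is a made-up name.
fn typed_default(default_str: &str, target: &DataType) -> Result<ScalarValue> {
    ScalarValue::Utf8(Some(default_str.to_string())).cast_to(target)
}

// typed_default("42", &DataType::Int32) yields ScalarValue::Int32(Some(42)).
```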

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 7 additions & 28 deletions
```diff
@@ -21,7 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use arrow::array::{RecordBatch, StringArray};
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
 use datafusion::assert_batches_eq;
 use datafusion::common::tree_node::{
@@ -277,14 +277,14 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
         physical_file_schema: SchemaRef,
     ) -> Arc<dyn PhysicalExprAdapter> {
         let default_factory = DefaultPhysicalExprAdapterFactory;
-        let default_adapter = default_factory
-            .create(logical_file_schema.clone(), physical_file_schema.clone());
+        let default_adapter = default_factory.create(
+            Arc::clone(&logical_file_schema),
+            Arc::clone(&physical_file_schema),
+        );
 
         Arc::new(ShreddedJsonRewriter {
-            logical_file_schema,
             physical_file_schema,
             default_adapter,
-            partition_values: Vec::new(),
         })
     }
 }
@@ -293,10 +293,8 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
 /// and wraps DefaultPhysicalExprAdapter for standard schema adaptation
 #[derive(Debug)]
 struct ShreddedJsonRewriter {
-    logical_file_schema: SchemaRef,
     physical_file_schema: SchemaRef,
     default_adapter: Arc<dyn PhysicalExprAdapter>,
-    partition_values: Vec<(FieldRef, ScalarValue)>,
 }
 
 impl PhysicalExprAdapter for ShreddedJsonRewriter {
@@ -307,27 +305,8 @@ impl PhysicalExprAdapter for ShreddedJsonRewriter {
             .data()?;
 
         // Then apply the default adapter as a fallback to handle standard schema differences
-        // like type casting, missing columns, and partition column handling
-        let default_adapter = if !self.partition_values.is_empty() {
-            self.default_adapter
-                .with_partition_values(self.partition_values.clone())
-        } else {
-            self.default_adapter.clone()
-        };
-
-        default_adapter.rewrite(rewritten)
-    }
-
-    fn with_partition_values(
-        &self,
-        partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(ShreddedJsonRewriter {
-            logical_file_schema: self.logical_file_schema.clone(),
-            physical_file_schema: self.physical_file_schema.clone(),
-            default_adapter: self.default_adapter.clone(),
-            partition_values,
-        })
+        // like type casting and missing columns
+        self.default_adapter.rewrite(rewritten)
     }
 }
```

datafusion/common/src/pruning.rs

Lines changed: 13 additions & 0 deletions
```diff
@@ -135,6 +135,10 @@
 /// This feeds into [`CompositePruningStatistics`] to allow pruning
 /// with filters that depend both on partition columns and data columns
 /// (e.g. `WHERE partition_col = data_col`).
+#[deprecated(
+    since = "52.0.0",
+    note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
+)]
 #[derive(Clone)]
 pub struct PartitionPruningStatistics {
     /// Values for each column for each container.
@@ -156,6 +160,7 @@
     partition_schema: SchemaRef,
 }
 
+#[expect(deprecated)]
 impl PartitionPruningStatistics {
     /// Create a new instance of [`PartitionPruningStatistics`].
     ///
@@ -232,6 +237,7 @@
     }
 }
 
+#[expect(deprecated)]
 impl PruningStatistics for PartitionPruningStatistics {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
@@ -439,10 +445,15 @@ impl PruningStatistics for PrunableStatistics {
 /// the first one is returned without any regard for completeness or accuracy.
 /// That is: if the first statistics has information for a column, even if it is incomplete,
 /// that is returned even if a later statistics has more complete information.
+#[deprecated(
+    since = "52.0.0",
+    note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
+)]
 pub struct CompositePruningStatistics {
     pub statistics: Vec<Box<dyn PruningStatistics>>,
 }
 
+#[expect(deprecated)]
 impl CompositePruningStatistics {
     /// Create a new instance of [`CompositePruningStatistics`] from
     /// a vector of [`PruningStatistics`].
@@ -457,6 +468,7 @@
     }
 }
 
+#[expect(deprecated)]
 impl PruningStatistics for CompositePruningStatistics {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         for stats in &self.statistics {
@@ -513,6 +525,7 @@
 }
 
 #[cfg(test)]
+#[expect(deprecated)]
 mod tests {
     use crate::{
         ColumnStatistics,
```

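To see why these structures become redundant, consider a predicate that mixes partition and data columns. Once the partition column is bound to a literal (sketch below; the signature of `replace_columns_with_literals` is inferred from the example diffs above, and the column name and value are made up for illustration), ordinary file-statistics pruning covers everything that remains:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::{Result, ScalarValue};
use datafusion::physical_expr::PhysicalExpr;
use datafusion_physical_expr_adapter::replace_columns_with_literals;

/// For a file in partition `year=2024`, rewrite e.g. `year = 2024 AND value > 10`
/// so that the `year` column becomes `lit(2024)`. The `2024 = 2024` conjunct then
/// folds to `true` during simplification, leaving only data-column comparisons
/// for the pruning machinery, with no partition-aware statistics required.
fn bind_partition_literal(
    predicate: Arc<dyn PhysicalExpr>,
) -> Result<Arc<dyn PhysicalExpr>> {
    let year = ScalarValue::Int32(Some(2024));
    let replacements: HashMap<&str, &ScalarValue> = HashMap::from([("year", &year)]);
    replace_columns_with_literals(predicate, &replacements)
}
```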