Skip to content

Commit e335907

Browse files
committed
fix: PushDownFilter for GROUP BY on uppercase col names
1 parent 183ff66 commit e335907

File tree

2 files changed

+109
-1
lines changed

2 files changed

+109
-1
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,11 @@ impl OptimizerRule for PushDownFilter {
978978
let group_expr_columns = agg
979979
.group_expr
980980
.iter()
981-
.map(|e| Ok(Column::from_qualified_name(e.schema_name().to_string())))
981+
.map(|e| {
982+
Ok(Column::from_qualified_name_ignore_case(
983+
e.schema_name().to_string(),
984+
))
985+
})
982986
.collect::<Result<HashSet<_>>>()?;
983987

984988
let predicates = split_conjunction_owned(filter.predicate);
@@ -4160,4 +4164,55 @@ mod tests {
41604164
"
41614165
)
41624166
}
4167+
4168+
/// Create a test table scan with uppercase column names for case sensitivity testing
4169+
fn test_table_scan_with_uppercase_columns() -> Result<LogicalPlan> {
4170+
let schema = Schema::new(vec![
4171+
Field::new("a", DataType::UInt32, false),
4172+
Field::new("A", DataType::UInt32, false),
4173+
Field::new("B", DataType::UInt32, false),
4174+
Field::new("C", DataType::UInt32, false),
4175+
]);
4176+
table_scan(Some("test"), &schema, None)?.build()
4177+
}
4178+
4179+
#[test]
4180+
fn filter_agg_case_insensitive() -> Result<()> {
4181+
let table_scan = test_table_scan_with_uppercase_columns()?;
4182+
let plan = LogicalPlanBuilder::from(table_scan)
4183+
.aggregate(
4184+
vec![col(r#""A""#)],
4185+
vec![sum(col(r#""B""#)).alias("total_salary")],
4186+
)?
4187+
.filter(col(r#""A""#).gt(lit(10i64)))?
4188+
.build()?;
4189+
4190+
assert_optimized_plan_equal!(
4191+
plan,
4192+
@r"
4193+
Aggregate: groupBy=[[test.A]], aggr=[[sum(test.B) AS total_salary]]
4194+
TableScan: test, full_filters=[test.A > Int64(10)]
4195+
"
4196+
)
4197+
}
4198+
4199+
#[test]
4200+
fn filter_agg_mix_case_insensitive() -> Result<()> {
4201+
let table_scan = test_table_scan_with_uppercase_columns()?;
4202+
let plan = LogicalPlanBuilder::from(table_scan)
4203+
.aggregate(
4204+
vec![col("a")],
4205+
vec![sum(col(r#""B""#)).alias("total_salary")],
4206+
)?
4207+
.filter(col("a").gt(lit(10i64)))?
4208+
.build()?;
4209+
4210+
assert_optimized_plan_equal!(
4211+
plan,
4212+
@r"
4213+
Aggregate: groupBy=[[test.a]], aggr=[[sum(test.B) AS total_salary]]
4214+
TableScan: test, full_filters=[test.a > Int64(10)]
4215+
"
4216+
)
4217+
}
41634218
}

datafusion/sqllogictest/test_files/push_down_filter.slt

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,3 +288,56 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/
288288

289289
statement ok
290290
drop table t;
291+
292+
statement ok
293+
create table test_uppercase_cols (a int, "A" int, "B" int, "C" int);
294+
295+
# statement ok
296+
# set datafusion.explain.physical_plan_only = false;
297+
298+
# Turn off the optimizer to make the logical plan closer to the initial one
299+
# statement ok
300+
# set datafusion.optimizer.max_passes = 0;
301+
302+
# test push down through aggregate for uppercase column name
303+
query TT
304+
explain
305+
select "A", total_salary
306+
from (
307+
select "A", sum("B") as total_salary from test_uppercase_cols group by "A"
308+
)
309+
where "A" > 10;
310+
----
311+
physical_plan
312+
01)ProjectionExec: expr=[A@0 as A, sum(test_uppercase_cols.B)@1 as total_salary]
313+
02)--AggregateExec: mode=FinalPartitioned, gby=[A@0 as A], aggr=[sum(test_uppercase_cols.B)]
314+
03)----CoalesceBatchesExec: target_batch_size=8192
315+
04)------RepartitionExec: partitioning=Hash([A@0], 4), input_partitions=4
316+
05)--------AggregateExec: mode=Partial, gby=[A@0 as A], aggr=[sum(test_uppercase_cols.B)]
317+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
318+
07)------------CoalesceBatchesExec: target_batch_size=8192
319+
08)--------------FilterExec: A@0 > 10
320+
09)----------------DataSourceExec: partitions=1, partition_sizes=[0]
321+
322+
# test push down through aggregate for mix of lowercase and uppercase column names
323+
query TT
324+
explain
325+
select a, total_salary
326+
from (
327+
select a, sum("B") as total_salary from test_uppercase_cols group by a
328+
)
329+
where a > 10;
330+
----
331+
physical_plan
332+
01)ProjectionExec: expr=[a@0 as a, sum(test_uppercase_cols.B)@1 as total_salary]
333+
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[sum(test_uppercase_cols.B)]
334+
03)----CoalesceBatchesExec: target_batch_size=8192
335+
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
336+
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[sum(test_uppercase_cols.B)]
337+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
338+
07)------------CoalesceBatchesExec: target_batch_size=8192
339+
08)--------------FilterExec: a@0 > 10
340+
09)----------------DataSourceExec: partitions=1, partition_sizes=[0]
341+
342+
statement ok
343+
drop table test_uppercase_cols;

0 commit comments

Comments
 (0)