diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index ca8dfa431b4f5..73950b961e8ba 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -828,6 +828,27 @@ impl Unparser<'_> {
                     Some(plan_alias.alias.clone()),
                     select.already_projected(),
                 )?;
+
+                // If the SubqueryAlias directly wraps a plan that builds its
+                // own SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds
+                // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it,
+                // we must emit a derived subquery: (SELECT ...) AS alias.
+                // Without this, the recursive handler would merge those clauses
+                // into the outer SELECT, losing the subquery structure entirely.
+                if unparsed_table_scan.is_none()
+                    && Self::requires_derived_subquery(plan_alias.input.as_ref())
+                {
+                    return self.derive(
+                        &plan_alias.input,
+                        relation,
+                        Some(self.new_table_alias(
+                            plan_alias.alias.table().to_string(),
+                            columns,
+                        )),
+                        false,
+                    );
+                }
+
                 // if the child plan is a TableScan with pushdown operations, we don't need to
                 // create an additional subquery for it
                 if !select.already_projected() && unparsed_table_scan.is_none() {
@@ -1060,6 +1081,22 @@ impl Unparser<'_> {
         scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
     }
 
+    /// Returns true if `plan`, when used as the direct child of a SubqueryAlias,
+    /// must be emitted as a derived subquery `(SELECT ...) AS alias`.
+    ///
+    /// Matches Aggregate, Window, Sort, Limit and Union: plans that build their
+    /// own query clauses (e.g. GROUP BY, window functions).
+    fn requires_derived_subquery(plan: &LogicalPlan) -> bool {
+        matches!(
+            plan,
+            LogicalPlan::Aggregate(_)
+                | LogicalPlan::Window(_)
+                | LogicalPlan::Sort(_)
+                | LogicalPlan::Limit(_)
+                | LogicalPlan::Union(_)
+        )
+    }
+
     /// Try to unparse a table scan with pushdown operations into a new subquery plan.
     /// If the table scan is without any pushdown operations, return None.
     fn unparse_table_scan_pushdown(
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs
index aefb404ba4106..24f9226636455 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -23,7 +23,7 @@ use datafusion_common::{
 };
 use datafusion_expr::expr::{WindowFunction, WindowFunctionParams};
 use datafusion_expr::test::function_stub::{
-    count_udaf, max_udaf, min_udaf, sum, sum_udaf,
+    count_udaf, max, max_udaf, min_udaf, sum, sum_udaf,
 };
 use datafusion_expr::{
     EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union,
@@ -2893,3 +2893,55 @@ fn test_json_access_3() {
         @r#"SELECT (j1.j1_string : 'field.inner1[''inner2'']') FROM j1"#
     );
 }
+
+/// Test that unparsing a manually constructed join whose right side is an
+/// aliased aggregate keeps the MAX aggregate function call in the SQL text.
+///
+/// Builds the equivalent of:
+///   SELECT j1.j1_string FROM j1
+///   JOIN (SELECT max(j2_id) AS max_id FROM j2) AS b
+///   ON j1.j1_id = b.max_id
+#[test]
+fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> {
+    let context = MockContextProvider {
+        state: MockSessionState::default(),
+    };
+    let j1_schema = context
+        .get_table_source(TableReference::bare("j1"))?
+        .schema();
+    let j2_schema = context
+        .get_table_source(TableReference::bare("j2"))?
+        .schema();
+
+    // Build the right side: SELECT max(j2_id) AS max_id FROM j2 (no GROUP BY keys)
+    let right_scan = table_scan(Some("j2"), &j2_schema, None)?.build()?;
+    let right_agg = LogicalPlanBuilder::from(right_scan)
+        .aggregate(vec![] as Vec<Expr>, vec![max(col("j2.j2_id")).alias("max_id")])?
+        .build()?;
+    let right_subquery = subquery_alias(right_agg, "b")?;
+
+    // Build the full plan: SELECT j1.j1_string FROM j1 JOIN (...) AS b ON j1.j1_id = b.max_id
+    let left_scan = table_scan(Some("j1"), &j1_schema, None)?.build()?;
+    let plan = LogicalPlanBuilder::from(left_scan)
+        .join(
+            right_subquery,
+            datafusion_expr::JoinType::Inner,
+            (
+                vec![Column::from_qualified_name("j1.j1_id")],
+                vec![Column::from_qualified_name("b.max_id")],
+            ),
+            None,
+        )?
+        .project(vec![col("j1.j1_string")])?
+        .build()?;
+
+    let unparser = Unparser::default();
+    let sql = unparser.plan_to_sql(&plan)?.to_string();
+    let sql_upper = sql.to_uppercase();
+    assert!(
+        sql_upper.contains("MAX("),
+        "Unparsed SQL should preserve the MAX aggregate function call, got: {sql}"
+    );
+
+    Ok(())
+}