diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 9efcd25fcb412..a209be331ff41 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1498,16 +1498,15 @@ pub trait PhysicalPlanNodeExt: Sized { protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned, protobuf::PartitionMode::Auto => PartitionMode::Auto, }; - let projection = if !hashjoin.projection.is_empty() { - Some( - hashjoin - .projection - .iter() - .map(|i| *i as usize) - .collect::>(), - ) - } else { - None + // Proto3 `repeated` cannot distinguish `None` from `Some(vec![])`. The latter + // is reachable via `try_embed_projection` for `SELECT count(1) … JOIN …` and + // changes the join's output schema, so the encoder reserves the single-element + // sentinel `[u32::MAX]` (never a valid column index) to mean "explicitly empty"; + // every other state is sent as-is. See `try_from_hash_join_exec`. + let projection = match hashjoin.projection.as_slice() { + [] => None, + [u32::MAX] => Some(Vec::new()), + indices => Some(indices.iter().map(|i| *i as usize).collect()), }; let mut hash_join = HashJoinExec::try_new( left, @@ -1925,15 +1924,13 @@ pub trait PhysicalPlanNodeExt: Sized { }) .map_or(Ok(None), |v: Result| v.map(Some))?; - let projection = if !join.projection.is_empty() { - Some( - join.projection - .iter() - .map(|i| *i as usize) - .collect::>(), - ) - } else { - None + // See `try_into_hash_join_physical_plan` for the rationale behind the + // `[u32::MAX]` sentinel; `NestedLoopJoinExec` has the same `Option<Vec<usize>>` + // projection field and shares the proto3 `repeated` ambiguity. 
+ let projection = match join.projection.as_slice() { + [] => None, + [u32::MAX] => Some(Vec::new()), + indices => Some(indices.iter().map(|i| *i as usize).collect()), }; Ok(Arc::new(NestedLoopJoinExec::try_new( @@ -2658,9 +2655,14 @@ pub trait PhysicalPlanNodeExt: Sized { partition_mode: partition_mode.into(), null_equality: null_equality.into(), filter, - projection: exec.projection.as_ref().map_or_else(Vec::new, |v| { - v.iter().map(|x| *x as u32).collect::>() - }), + // Send `Some(vec![])` as `[u32::MAX]` (never a valid index) so the + // wire format can distinguish it from `None` (which stays empty). + // See `try_into_hash_join_physical_plan` for the matching decoder. + projection: match exec.projection.as_ref() { + None => Vec::new(), + Some(v) if v.is_empty() => vec![u32::MAX], + Some(v) => v.iter().map(|x| *x as u32).collect(), + }, null_aware: exec.null_aware, dynamic_filter, }, @@ -3449,9 +3451,13 @@ pub trait PhysicalPlanNodeExt: Sized { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { - v.iter().map(|x| *x as u32).collect::>() - }), + // `[u32::MAX]` sentinel distinguishes `Some(vec![])` from `None`; + // see `try_from_hash_join_exec`. + projection: match exec.projection().as_ref() { + None => Vec::new(), + Some(v) if v.is_empty() => vec![u32::MAX], + Some(v) => v.iter().map(|x| *x as u32).collect(), + }, }, ))), }) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8e80467788598..0e8ad638e9e85 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -348,6 +348,59 @@ fn roundtrip_nested_loop_join() -> Result<()> { Ok(()) } +/// Regression: proto3 `repeated` fields cannot distinguish "absent" from "empty", +/// so a naive encoding collapses `Some(vec![])` and `None` into the same wire +/// representation. 
`try_embed_projection` (DataFusion 53+) produces +/// `HashJoinExec.projection = Some(vec![])` for `SELECT count(1) … JOIN …`, +/// which previously round-tripped to `None` and caused downstream consumers (e.g. +/// distributed Flight executors) to receive a different number of output +/// columns than the planner declared. Verify that all three states are preserved. +#[test] +fn roundtrip_hash_join_projection_states() -> Result<()> { + let field_a = Field::new("col", DataType::Int64, false); + let schema_left = Arc::new(Schema::new(vec![field_a.clone()])); + let schema_right = Arc::new(Schema::new(vec![field_a])); + let on = vec![( + Arc::new(Column::new("col", schema_left.index_of("col")?)) as _, + Arc::new(Column::new("col", schema_right.index_of("col")?)) as _, + )]; + + for projection in [None, Some(vec![]), Some(vec![0]), Some(vec![1])] { + roundtrip_test(Arc::new(HashJoinExec::try_new( + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), + on.clone(), + None, + &JoinType::Inner, + projection, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + false, + )?))?; + } + Ok(()) +} + +/// Same regression coverage for `NestedLoopJoinExec`, which shares the +/// `repeated uint32 projection` proto field shape with `HashJoinExec`. +#[test] +fn roundtrip_nested_loop_join_projection_states() -> Result<()> { + let field_a = Field::new("col", DataType::Int64, false); + let schema_left = Arc::new(Schema::new(vec![field_a.clone()])); + let schema_right = Arc::new(Schema::new(vec![field_a])); + + for projection in [None, Some(vec![]), Some(vec![0]), Some(vec![1])] { + roundtrip_test(Arc::new(NestedLoopJoinExec::try_new( + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), + None, + &JoinType::Inner, + projection, + )?))?; + } + Ok(()) +} + #[test] fn roundtrip_udwf() -> Result<()> { let field_a = Field::new("a", DataType::Int64, false);