Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,16 +1498,15 @@ pub trait PhysicalPlanNodeExt: Sized {
protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
protobuf::PartitionMode::Auto => PartitionMode::Auto,
};
let projection = if !hashjoin.projection.is_empty() {
Some(
hashjoin
.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>(),
)
} else {
None
// Proto3 `repeated` cannot distinguish `None` from `Some(vec![])`. The latter
// is reachable via `try_embed_projection` for `SELECT count(1) … JOIN …` and
// changes the join's output schema, so the encoder reserves the single-element
// sentinel `[u32::MAX]` (never a valid column index) to mean "explicitly empty";
// every other state is sent as-is. See `try_from_hash_join_exec`.
let projection = match hashjoin.projection.as_slice() {
[] => None,
[u32::MAX] => Some(Vec::new()),
indices => Some(indices.iter().map(|i| *i as usize).collect()),
};
let mut hash_join = HashJoinExec::try_new(
left,
Expand Down Expand Up @@ -1925,15 +1924,13 @@ pub trait PhysicalPlanNodeExt: Sized {
})
.map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;

let projection = if !join.projection.is_empty() {
Some(
join.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>(),
)
} else {
None
// See `try_into_hash_join_physical_plan` for the rationale behind the
// `[u32::MAX]` sentinel; `NestedLoopJoinExec` has the same `Option<Vec<usize>>`
// projection field and shares the proto3 `repeated` ambiguity.
let projection = match join.projection.as_slice() {
[] => None,
[u32::MAX] => Some(Vec::new()),
indices => Some(indices.iter().map(|i| *i as usize).collect()),
};

Ok(Arc::new(NestedLoopJoinExec::try_new(
Expand Down Expand Up @@ -2658,9 +2655,14 @@ pub trait PhysicalPlanNodeExt: Sized {
partition_mode: partition_mode.into(),
null_equality: null_equality.into(),
filter,
projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
// Send `Some(vec![])` as `[u32::MAX]` (never a valid index) so the
// wire format can distinguish it from `None` (which stays empty).
// See `try_into_hash_join_physical_plan` for the matching decoder.
projection: match exec.projection.as_ref() {
None => Vec::new(),
Some(v) if v.is_empty() => vec![u32::MAX],
Some(v) => v.iter().map(|x| *x as u32).collect(),
},
null_aware: exec.null_aware,
dynamic_filter,
},
Expand Down Expand Up @@ -3449,9 +3451,13 @@ pub trait PhysicalPlanNodeExt: Sized {
right: Some(Box::new(right)),
join_type: join_type.into(),
filter,
projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
// `[u32::MAX]` sentinel distinguishes `Some(vec![])` from `None`;
// see `try_from_hash_join_exec`.
projection: match exec.projection().as_ref() {
None => Vec::new(),
Some(v) if v.is_empty() => vec![u32::MAX],
Some(v) => v.iter().map(|x| *x as u32).collect(),
},
},
))),
})
Expand Down
53 changes: 53 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,59 @@ fn roundtrip_nested_loop_join() -> Result<()> {
Ok(())
}

/// Regression: proto3 `repeated` fields cannot distinguish "absent" from "empty",
/// so a naive encoding collapses `Some(vec![])` and `None` into the same wire
/// representation. `try_embed_projection` (DataFusion 53+) produces
/// `HashJoinExec.projection = Some(vec![])` for `SELECT count(1) … JOIN …`,
/// which previously round-tripped to `None` and caused downstream consumers (e.g.
/// distributed Flight executors) to receive a different number of output
/// columns than the planner declared. Verify all three states preserve.
#[test]
fn roundtrip_hash_join_projection_states() -> Result<()> {
let field_a = Field::new("col", DataType::Int64, false);
let schema_left = Arc::new(Schema::new(vec![field_a.clone()]));
let schema_right = Arc::new(Schema::new(vec![field_a]));
let on = vec![(
Arc::new(Column::new("col", schema_left.index_of("col")?)) as _,
Arc::new(Column::new("col", schema_right.index_of("col")?)) as _,
)];

for projection in [None, Some(vec![]), Some(vec![0]), Some(vec![1])] {
roundtrip_test(Arc::new(HashJoinExec::try_new(
Arc::new(EmptyExec::new(schema_left.clone())),
Arc::new(EmptyExec::new(schema_right.clone())),
on.clone(),
None,
&JoinType::Inner,
projection,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)?))?;
}
Ok(())
}

/// Same regression coverage for `NestedLoopJoinExec`, which shares the
/// `repeated uint32 projection` proto field shape with `HashJoinExec`.
#[test]
fn roundtrip_nested_loop_join_projection_states() -> Result<()> {
let field_a = Field::new("col", DataType::Int64, false);
let schema_left = Arc::new(Schema::new(vec![field_a.clone()]));
let schema_right = Arc::new(Schema::new(vec![field_a]));

for projection in [None, Some(vec![]), Some(vec![0]), Some(vec![1])] {
roundtrip_test(Arc::new(NestedLoopJoinExec::try_new(
Arc::new(EmptyExec::new(schema_left.clone())),
Arc::new(EmptyExec::new(schema_right.clone())),
None,
&JoinType::Inner,
projection,
)?))?;
}
Ok(())
}

#[test]
fn roundtrip_udwf() -> Result<()> {
let field_a = Field::new("a", DataType::Int64, false);
Expand Down
Loading