58 changes: 58 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1095,6 +1095,7 @@ impl HashJoinExec {
self.join_type(),
))
.with_partition_mode(partition_mode)
.with_dynamic_filter(None)
.build()?;
// In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
@@ -6610,6 +6611,63 @@ mod tests {
Ok(())
}

#[test]
fn test_swap_inputs_clears_dynamic_filter() -> Result<()> {

Contributor


Could we add an end-to-end test that explains why this behavior is desired, i.e. why swap_inputs() should drop the dynamic filter? It's a bit hard to infer the test goal from this unit test alone.

Ideally it would be a SQL query that used to return incorrect results and that this fix makes correct.

@haohuaijin haohuaijin Jun 23, 2026

Contributor Author


I tried, but it is not easy to reproduce with SQL. The bug is triggered when serializing and deserializing the physical plan; see the test case in fc93043 for more detail. I will keep trying to reproduce it with SQL.

@2010YOUY01 2010YOUY01 Jun 23, 2026

Contributor


I'm wondering: are you using swap_inputs() directly downstream? 🤔

I think the root cause is that swap_inputs() has not been clearly specified. It implicitly assumes a bunch of preconditions, so it is only mostly safe to use internally when we strictly follow the optimizer rule order.

However, it is now a public API, so downstream usages may trigger subtle bugs if users are unaware of those preconditions.

See the optimizer rule order below. Internally, swap_inputs() is only called inside that optimizer rule, so we can ensure it is called only when dynamic_filter is None:

Arc::new(JoinSelection::new()),

I tried adding assert!(self.dynamic_filter.is_none()) at the beginning of HashJoinExec::swap_inputs(), and the internal tests still pass.

So I suggest we discuss separately what extra contract you need for downstream usage, and then extend swap_inputs() accordingly. We should also add the assertion above to prevent similar issues.
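
The contract discussed in this thread can be illustrated with a minimal sketch. It assumes a hypothetical `ToyJoin` type with a hypothetical `swap_inputs_checked` method; neither is part of DataFusion, and the real `HashJoinExec::swap_inputs()` has a different signature. The sketch makes the implicit precondition explicit by returning an error instead of silently carrying a stale filter across the swap.

```rust
/// Toy stand-in for a hash join. This is NOT DataFusion's `HashJoinExec`;
/// all names here are hypothetical and exist only for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyJoin {
    /// Column names of the build (left) input.
    pub left: Vec<String>,
    /// Column names of the probe (right) input.
    pub right: Vec<String>,
    /// Index into `right` targeted by a dynamic filter, if one is attached.
    pub dynamic_filter: Option<usize>,
}

impl ToyJoin {
    /// Swap build and probe sides. A join that still carries a dynamic
    /// filter is rejected, because that filter was built for the old
    /// probe side and would point at the wrong input after the swap.
    pub fn swap_inputs_checked(&self) -> Result<ToyJoin, String> {
        if let Some(idx) = self.dynamic_filter {
            return Err(format!(
                "swap_inputs called with a dynamic filter on probe column {idx}"
            ));
        }
        Ok(ToyJoin {
            left: self.right.clone(),
            right: self.left.clone(),
            dynamic_filter: None,
        })
    }
}
```

Whether a downstream-facing API should fail like this, or clear the filter as the patch does, is the open design question in this thread; the sketch only shows the stricter of the two options.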

    let left = build_table(
        ("l_key", &vec![1]),
        ("l_payload", &vec![10]),
        ("l_other", &vec![100]),
    );
    let right = build_table(
        ("r_payload", &vec![20]),
        ("r_key", &vec![1]),
        ("r_other", &vec![200]),
    );
    let on = vec![(
        Arc::new(Column::new_with_schema("l_key", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("r_key", &right.schema())?) as _,
    )];

    let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
    let join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::LeftSemi,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        false,
    )?
    .with_dynamic_filter_expr(dynamic_filter)?;

    let original_dynamic_filter = join
        .dynamic_filter_expr()
        .expect("join should start with a dynamic filter");
    let original_expression_id = original_dynamic_filter
        .expression_id()
        .expect("DynamicFilterPhysicalExpr always has an expression_id");

    let swapped = join.swap_inputs(PartitionMode::CollectLeft)?;
    let swapped_join = swapped
        .downcast_ref::<HashJoinExec>()
        .expect("swapped join should be a HashJoinExec");

    // HashJoinExecBuilder preserves dynamic_filter by default. swap_inputs
    // must explicitly clear it, otherwise this would still be
    // `Some(original_expression_id)` after the probe side has changed.
    assert_ne!(
        swapped_join.dynamic_filter_expr().map(|df| {
            df.expression_id()
                .expect("DynamicFilterPhysicalExpr always has an expression_id")
        }),
        Some(original_expression_id)
    );
    assert!(swapped_join.dynamic_filter_expr().is_none());
    Ok(())
}
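
The comment in the test above says that the builder preserves the dynamic filter by default, so the swap has to clear it explicitly. A minimal sketch of that pitfall follows, assuming a hypothetical `SketchJoin` and `SketchJoinBuilder` that only mimic the copy-then-override shape; they are not DataFusion's `HashJoinExecBuilder`, and the filter is reduced to a bare id.

```rust
/// Hypothetical join type; the dynamic filter is reduced to a bare id.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchJoin {
    pub left: String,
    pub right: String,
    pub dynamic_filter: Option<u64>,
}

/// Hypothetical builder that starts from an existing join and copies
/// every field, including the dynamic filter.
pub struct SketchJoinBuilder {
    left: String,
    right: String,
    dynamic_filter: Option<u64>,
}

impl SketchJoinBuilder {
    pub fn from_join(join: &SketchJoin) -> Self {
        Self {
            left: join.left.clone(),
            right: join.right.clone(),
            dynamic_filter: join.dynamic_filter,
        }
    }

    pub fn with_inputs(mut self, left: String, right: String) -> Self {
        self.left = left;
        self.right = right;
        self
    }

    pub fn with_dynamic_filter(mut self, filter: Option<u64>) -> Self {
        self.dynamic_filter = filter;
        self
    }

    pub fn build(self) -> SketchJoin {
        SketchJoin {
            left: self.left,
            right: self.right,
            dynamic_filter: self.dynamic_filter,
        }
    }
}

/// Swap that forgets to reset the filter: the stale id survives.
pub fn swap_keeping_filter(join: &SketchJoin) -> SketchJoin {
    SketchJoinBuilder::from_join(join)
        .with_inputs(join.right.clone(), join.left.clone())
        .build()
}

/// Swap that clears the filter, mirroring the one-line change in this patch.
pub fn swap_clearing_filter(join: &SketchJoin) -> SketchJoin {
    SketchJoinBuilder::from_join(join)
        .with_inputs(join.right.clone(), join.left.clone())
        .with_dynamic_filter(None)
        .build()
}
```

In this sketch the two swap functions differ only by the `.with_dynamic_filter(None)` call, which is the same shape as the change to `swap_inputs` in the first hunk.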

#[test]
fn test_dynamic_filter_pushdown_rejects_null_equal_join() -> Result<()> {
let (_, _, on) = build_schema_and_on()?;
47 changes: 47 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -3666,6 +3666,53 @@ fn test_hash_join_with_dynamic_filter_roundtrip() -> Result<()> {
Ok(())
}

#[test]
fn test_hash_join_swap_clears_dynamic_filter_before_roundtrip() -> Result<()> {
    let build_schema = Arc::new(Schema::new(vec![Field::new(
        "build_key",
        DataType::Int64,
        false,
    )]));
    let probe_schema = Arc::new(Schema::new(vec![
        Field::new("probe_payload_0", DataType::Int64, false),
        Field::new("probe_payload_1", DataType::Int64, false),
        Field::new("probe_key", DataType::Int64, false),
    ]));

    let build_child = Arc::new(EmptyExec::new(Arc::clone(&build_schema)));
    let probe_child = Arc::new(EmptyExec::new(Arc::clone(&probe_schema)));
    let on: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = vec![(
        Arc::new(Column::new("build_key", 0)),
        Arc::new(Column::new("probe_key", 2)),
    )];

    let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("probe_key", 2))],
        lit(true),
    ));
    let join = HashJoinExec::try_new(
        build_child,
        probe_child,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        false,
    )?
    .with_dynamic_filter_expr(dynamic_filter)?;

    let swapped = join.swap_inputs(PartitionMode::CollectLeft)?;

    // If HashJoinExec::swap_inputs keeps the old dynamic filter, the swapped
    // join still serializes a filter whose child is `probe_key@2`. During
    // deserialization, HashJoinExec validates that filter against the new
    // right/probe schema (`build_key` only), and fails with an out-of-bounds
    // column reference.
    roundtrip_test(swapped)
}
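
The failure mode described in the comment above can be reproduced in miniature. The sketch below assumes a hypothetical `MiniJoin` type whose `validate` method stands in for the schema check that runs during deserialization; it is not the DataFusion proto code path, only an illustration of why `probe_key@2` no longer resolves once the one-column build side becomes the probe side.

```rust
/// Hypothetical miniature of a join with a dynamic filter on the probe side.
#[derive(Debug, Clone)]
pub struct MiniJoin {
    pub build_schema: Vec<&'static str>,
    pub probe_schema: Vec<&'static str>,
    /// (column name, column index) that the filter reads from the probe side.
    pub filter_column: Option<(&'static str, usize)>,
}

impl MiniJoin {
    /// Stand-in for the validation done when a plan is rebuilt: the filter's
    /// column index must exist in the current probe schema.
    pub fn validate(&self) -> Result<(), String> {
        match self.filter_column {
            Some((name, index)) if index >= self.probe_schema.len() => Err(format!(
                "column {name}@{index} is out of bounds for a probe schema with {} columns",
                self.probe_schema.len()
            )),
            _ => Ok(()),
        }
    }

    /// Swap build and probe sides, optionally keeping the old filter.
    pub fn swap(&self, keep_filter: bool) -> MiniJoin {
        MiniJoin {
            build_schema: self.probe_schema.clone(),
            probe_schema: self.build_schema.clone(),
            filter_column: if keep_filter { self.filter_column } else { None },
        }
    }
}
```

With the schemas from the test (`build_key` on one side, three probe columns on the other), keeping the filter across the swap makes `validate` fail, while clearing it lets the swapped join pass.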

/// returns a SessionContext with an empty `netflow` table registered
fn netflow_context() -> Result<SessionContext> {
let ctx = SessionContext::new();