From 74416296509bb45c548ea78dc6b96bbaeab53df3 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Mon, 22 Jun 2026 16:37:53 +0800 Subject: [PATCH 1/3] fix: drop dynamic filter in hash-join when swap inputs --- .../physical-plan/src/joins/hash_join/exec.rs | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index bb9ebcd4e6191..3098365efaa6f 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1095,6 +1095,7 @@ impl HashJoinExec { self.join_type(), )) .with_partition_mode(partition_mode) + .with_dynamic_filter(None) .build()?; // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again if matches!( @@ -6610,6 +6611,63 @@ mod tests { Ok(()) } + #[test] + fn test_swap_inputs_clears_dynamic_filter() -> Result<()> { + let left = build_table( + ("l_key", &vec![1]), + ("l_payload", &vec![10]), + ("l_other", &vec![100]), + ); + let right = build_table( + ("r_payload", &vec![20]), + ("r_key", &vec![1]), + ("r_other", &vec![200]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("l_key", &left.schema())?) as _, + Arc::new(Column::new_with_schema("r_key", &right.schema())?) as _, + )]; + + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftSemi, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )? + .with_dynamic_filter_expr(dynamic_filter)?; + + let original_dynamic_filter = join + .dynamic_filter_expr() + .expect("join should start with a dynamic filter"); + let original_expression_id = original_dynamic_filter + .expression_id() + .expect("DynamicFilterPhysicalExpr always has an expression_id"); + + let swapped = join.swap_inputs(PartitionMode::CollectLeft)?; + let swapped_join = swapped + .downcast_ref::() + .expect("swapped join should be a HashJoinExec"); + + // HashJoinExecBuilder preserves dynamic_filter by default. swap_inputs + // must explicitly clear it, otherwise this would still be + // `Some(original_expression_id)` after the probe side has changed. + assert_ne!( + swapped_join.dynamic_filter_expr().map(|df| { + df.expression_id() + .expect("DynamicFilterPhysicalExpr always has an expression_id") + }), + Some(original_expression_id) + ); + assert!(swapped_join.dynamic_filter_expr().is_none()); + Ok(()) + } + #[test] fn test_dynamic_filter_pushdown_rejects_null_equal_join() -> Result<()> { let (_, _, on) = build_schema_and_on()?; From fc93043a4740aea76b0016f7a6cc019ba3cbd6da Mon Sep 17 00:00:00 2001 From: Huaijin Date: Mon, 22 Jun 2026 16:43:56 +0800 Subject: [PATCH 2/3] add test case --- .../tests/cases/roundtrip_physical_plan.rs | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8e80467788598..8620393baffe5 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3666,6 +3666,53 @@ fn test_hash_join_with_dynamic_filter_roundtrip() -> Result<()> { Ok(()) } +#[test] +fn test_hash_join_swap_clears_dynamic_filter_before_roundtrip() -> Result<()> { + let build_schema = Arc::new(Schema::new(vec![Field::new( + "build_key", + DataType::Int64, + false, + )])); + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("probe_payload_0", DataType::Int64, false), + Field::new("probe_payload_1", DataType::Int64, false), + Field::new("probe_key", DataType::Int64, false), + ])); + + let build_child = Arc::new(EmptyExec::new(Arc::clone(&build_schema))); + let probe_child = Arc::new(EmptyExec::new(Arc::clone(&probe_schema))); + let on: Vec<(Arc, Arc)> = vec![( + Arc::new(Column::new("build_key", 0)), + Arc::new(Column::new("probe_key", 2)), + )]; + + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("probe_key", 2))], + lit(true), + )); + let join = HashJoinExec::try_new( + build_child, + probe_child, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )? + .with_dynamic_filter_expr(dynamic_filter)?; + + let swapped = join.swap_inputs(PartitionMode::CollectLeft)?; + + // If HashJoinExec::swap_inputs keeps the old dynamic filter, the swapped + // join still serializes a filter whose child is `probe_key@2`. During + // deserialization, HashJoinExec validates that filter against the new + // right/probe schema (`build_key` only), and fails with an out-of-bounds + // column reference. + roundtrip_test(swapped) +} + /// returns a SessionContext with an empty `netflow` table registered fn netflow_context() -> Result { let ctx = SessionContext::new(); From 1d85fc792dbe4e16141c4922554f3de55d6c7f66 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 23 Jun 2026 16:21:20 +0800 Subject: [PATCH 3/3] update --- .../physical-plan/src/joins/hash_join/exec.rs | 35 +++++--------- .../tests/cases/roundtrip_physical_plan.rs | 47 ------------------- 2 files changed, 11 insertions(+), 71 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 3098365efaa6f..1f62577b2d427 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1075,6 +1075,12 @@ impl HashJoinExec { &self, partition_mode: PartitionMode, ) -> Result> { + assert_or_internal_err!( + self.dynamic_filter.is_none(), + "Cannot swap HashJoinExec inputs after dynamic filters have been constructed. \ + Optimizer rules that reorder join inputs must run before optimizer rules `FilterPushdown::new_post_optimization()`" + ); + let left = self.left(); let right = self.right(); let new_join = self @@ -1095,7 +1101,6 @@ impl HashJoinExec { self.join_type(), )) .with_partition_mode(partition_mode) - .with_dynamic_filter(None) .build()?; // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again if matches!( @@ -6612,7 +6617,7 @@ mod tests { } #[test] - fn test_swap_inputs_clears_dynamic_filter() -> Result<()> { + fn test_swap_inputs_rejects_dynamic_filter() -> Result<()> { let left = build_table( ("l_key", &vec![1]), ("l_payload", &vec![10]), @@ -6642,29 +6647,11 @@ mod tests { )? .with_dynamic_filter_expr(dynamic_filter)?; - let original_dynamic_filter = join - .dynamic_filter_expr() - .expect("join should start with a dynamic filter"); - let original_expression_id = original_dynamic_filter - .expression_id() - .expect("DynamicFilterPhysicalExpr always has an expression_id"); - - let swapped = join.swap_inputs(PartitionMode::CollectLeft)?; - let swapped_join = swapped - .downcast_ref::() - .expect("swapped join should be a HashJoinExec"); - - // HashJoinExecBuilder preserves dynamic_filter by default. swap_inputs - // must explicitly clear it, otherwise this would still be - // `Some(original_expression_id)` after the probe side has changed. - assert_ne!( - swapped_join.dynamic_filter_expr().map(|df| { - df.expression_id() - .expect("DynamicFilterPhysicalExpr always has an expression_id") - }), - Some(original_expression_id) + let err = join.swap_inputs(PartitionMode::CollectLeft).unwrap_err(); + assert_contains!( + err.to_string(), + "Cannot swap HashJoinExec inputs after dynamic filters have been constructed" ); - assert!(swapped_join.dynamic_filter_expr().is_none()); Ok(()) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8620393baffe5..8e80467788598 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3666,53 +3666,6 @@ fn test_hash_join_with_dynamic_filter_roundtrip() -> Result<()> { Ok(()) } -#[test] -fn test_hash_join_swap_clears_dynamic_filter_before_roundtrip() -> Result<()> { - let build_schema = Arc::new(Schema::new(vec![Field::new( - "build_key", - DataType::Int64, - false, - )])); - let probe_schema = Arc::new(Schema::new(vec![ - Field::new("probe_payload_0", DataType::Int64, false), - Field::new("probe_payload_1", DataType::Int64, false), - Field::new("probe_key", DataType::Int64, false), - ])); - - let build_child = Arc::new(EmptyExec::new(Arc::clone(&build_schema))); - let probe_child = Arc::new(EmptyExec::new(Arc::clone(&probe_schema))); - let on: Vec<(Arc, Arc)> = vec![( - Arc::new(Column::new("build_key", 0)), - Arc::new(Column::new("probe_key", 2)), - )]; - - let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( - vec![Arc::new(Column::new("probe_key", 2))], - lit(true), - )); - let join = HashJoinExec::try_new( - build_child, - probe_child, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )? - .with_dynamic_filter_expr(dynamic_filter)?; - - let swapped = join.swap_inputs(PartitionMode::CollectLeft)?; - - // If HashJoinExec::swap_inputs keeps the old dynamic filter, the swapped - // join still serializes a filter whose child is `probe_key@2`. During - // deserialization, HashJoinExec validates that filter against the new - // right/probe schema (`build_key` only), and fails with an out-of-bounds - // column reference. - roundtrip_test(swapped) -} - /// returns a SessionContext with an empty `netflow` table registered fn netflow_context() -> Result { let ctx = SessionContext::new();