diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index bb9ebcd4e6191..1f62577b2d427 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1075,6 +1075,12 @@ impl HashJoinExec { &self, partition_mode: PartitionMode, ) -> Result> { + assert_or_internal_err!( + self.dynamic_filter.is_none(), + "Cannot swap HashJoinExec inputs after dynamic filters have been constructed. \ + Optimizer rules that reorder join inputs must run before optimizer rules `FilterPushdown::new_post_optimization()`" + ); + let left = self.left(); let right = self.right(); let new_join = self @@ -6610,6 +6616,45 @@ mod tests { Ok(()) } + #[test] + fn test_swap_inputs_rejects_dynamic_filter() -> Result<()> { + let left = build_table( + ("l_key", &vec![1]), + ("l_payload", &vec![10]), + ("l_other", &vec![100]), + ); + let right = build_table( + ("r_payload", &vec![20]), + ("r_key", &vec![1]), + ("r_other", &vec![200]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("l_key", &left.schema())?) as _, + Arc::new(Column::new_with_schema("r_key", &right.schema())?) as _, + )]; + + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftSemi, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )? + .with_dynamic_filter_expr(dynamic_filter)?; + + let err = join.swap_inputs(PartitionMode::CollectLeft).unwrap_err(); + assert_contains!( + err.to_string(), + "Cannot swap HashJoinExec inputs after dynamic filters have been constructed" + ); + Ok(()) + } + #[test] fn test_dynamic_filter_pushdown_rejects_null_equal_join() -> Result<()> { let (_, _, on) = build_schema_and_on()?;