Skip to content

Commit 215cdff

Browse files
Coalesce batches inside hash join, reuse indices buffer (#18972)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18781 ## Rationale for this change Improves the plan / plan readability for queries with joins. Seems to improve perf as well a bit for more challenging joins: (TPC-H SF=10, in-memory) ``` │ QQuery 21 │ 7948.54 ms │ 6340.74 ms │ +1.25x faster │ │ QQuery 22 │ 58.30 ms │ 49.86 ms │ +1.17x faster │ ``` <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Marko Milenković <milenkovicm@users.noreply.github.com>
1 parent 998f534 commit 215cdff

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1390
-1483
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 85 additions & 89 deletions
Large diffs are not rendered by default.

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,10 @@ impl JoinFuzzTestCase {
974974
if join_tests.contains(&NljHj) {
975975
let err_msg_rowcnt = format!("NestedLoopJoinExec and HashJoinExec produced different row counts, batch_size: {batch_size}");
976976
assert_eq!(nlj_rows, hj_rows, "{}", err_msg_rowcnt.as_str());
977+
if nlj_rows == 0 && hj_rows == 0 {
978+
// both joins returned no rows, skip content comparison
979+
continue;
980+
}
977981

978982
let err_msg_contents = format!("NestedLoopJoinExec and HashJoinExec produced different results, batch_size: {batch_size}");
979983
// row level compare if any of joins returns the result

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -812,14 +812,13 @@ async fn test_physical_plan_display_indent_multi_children() {
812812
assert_snapshot!(
813813
actual,
814814
@r"
815-
CoalesceBatchesExec: target_batch_size=4096
816-
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
817-
CoalesceBatchesExec: target_batch_size=4096
818-
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
819-
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
820-
CoalesceBatchesExec: target_batch_size=4096
821-
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
822-
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
815+
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
816+
CoalesceBatchesExec: target_batch_size=4096
817+
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
818+
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
819+
CoalesceBatchesExec: target_batch_size=4096
820+
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
821+
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
823822
"
824823
);
825824
}

datafusion/physical-optimizer/src/coalesce_batches.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use datafusion_common::error::Result;
2727
use datafusion_physical_expr::Partitioning;
2828
use datafusion_physical_plan::{
2929
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
30-
joins::HashJoinExec, repartition::RepartitionExec, ExecutionPlan,
30+
repartition::RepartitionExec, ExecutionPlan,
3131
};
3232

3333
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -57,17 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
5757
let target_batch_size = config.execution.batch_size;
5858
plan.transform_up(|plan| {
5959
let plan_any = plan.as_any();
60-
let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()
60+
let wrap_in_coalesce = plan_any
6161
// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
62-
|| plan_any
63-
.downcast_ref::<RepartitionExec>()
64-
.map(|repart_exec| {
65-
!matches!(
66-
repart_exec.partitioning().clone(),
67-
Partitioning::RoundRobinBatch(_)
68-
)
69-
})
70-
.unwrap_or(false);
62+
.downcast_ref::<RepartitionExec>()
63+
.map(|repart_exec| {
64+
!matches!(
65+
repart_exec.partitioning().clone(),
66+
Partitioning::RoundRobinBatch(_)
67+
)
68+
})
69+
.unwrap_or(false);
7170

7271
if wrap_in_coalesce {
7372
Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1936,7 +1936,12 @@ mod tests {
19361936
div_ceil(9, batch_size)
19371937
};
19381938

1939-
assert_eq!(batches.len(), expected_batch_count);
1939+
// With batch coalescing, we may have fewer batches than expected
1940+
assert!(
1941+
batches.len() <= expected_batch_count,
1942+
"expected at most {expected_batch_count} batches, got {}",
1943+
batches.len()
1944+
);
19401945

19411946
// Inner join output is expected to preserve both inputs order
19421947
allow_duplicates! {
@@ -2016,7 +2021,12 @@ mod tests {
20162021
div_ceil(9, batch_size)
20172022
};
20182023

2019-
assert_eq!(batches.len(), expected_batch_count);
2024+
// With batch coalescing, we may have fewer batches than expected
2025+
assert!(
2026+
batches.len() <= expected_batch_count,
2027+
"expected at most {expected_batch_count} batches, got {}",
2028+
batches.len()
2029+
);
20202030

20212031
// Inner join output is expected to preserve both inputs order
20222032
allow_duplicates! {
@@ -2152,7 +2162,12 @@ mod tests {
21522162
// and filtered later.
21532163
div_ceil(6, batch_size)
21542164
};
2155-
assert_eq!(batches.len(), expected_batch_count);
2165+
// With batch coalescing, we may have fewer batches than expected
2166+
assert!(
2167+
batches.len() <= expected_batch_count,
2168+
"expected at most {expected_batch_count} batches, got {}",
2169+
batches.len()
2170+
);
21562171

21572172
// Inner join output is expected to preserve both inputs order
21582173
allow_duplicates! {
@@ -2177,7 +2192,12 @@ mod tests {
21772192
// and filtered later.
21782193
div_ceil(3, batch_size)
21792194
};
2180-
assert_eq!(batches.len(), expected_batch_count);
2195+
// With batch coalescing, we may have fewer batches than expected
2196+
assert!(
2197+
batches.len() <= expected_batch_count,
2198+
"expected at most {expected_batch_count} batches, got {}",
2199+
batches.len()
2200+
);
21812201

21822202
// Inner join output is expected to preserve both inputs order
21832203
allow_duplicates! {
@@ -3467,6 +3487,8 @@ mod tests {
34673487
let mut hashes_buffer = vec![0; right.num_rows()];
34683488
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
34693489

3490+
let mut probe_indices_buffer = Vec::new();
3491+
let mut build_indices_buffer = Vec::new();
34703492
let (l, r, _) = lookup_join_hashmap(
34713493
&join_hash_map,
34723494
&[left_keys_values],
@@ -3475,6 +3497,8 @@ mod tests {
34753497
&hashes_buffer,
34763498
8192,
34773499
(0, None),
3500+
&mut probe_indices_buffer,
3501+
&mut build_indices_buffer,
34783502
)?;
34793503

34803504
let left_ids: UInt64Array = vec![0, 1].into();
@@ -3524,6 +3548,8 @@ mod tests {
35243548
let mut hashes_buffer = vec![0; right.num_rows()];
35253549
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;
35263550

3551+
let mut probe_indices_buffer = Vec::new();
3552+
let mut build_indices_buffer = Vec::new();
35273553
let (l, r, _) = lookup_join_hashmap(
35283554
&join_hash_map,
35293555
&[left_keys_values],
@@ -3532,6 +3558,8 @@ mod tests {
35323558
&hashes_buffer,
35333559
8192,
35343560
(0, None),
3561+
&mut probe_indices_buffer,
3562+
&mut build_indices_buffer,
35353563
)?;
35363564

35373565
// We still expect to match rows 0 and 1 on both sides
@@ -4197,10 +4225,11 @@ mod tests {
41974225
}
41984226
_ => div_ceil(expected_resultset_records, batch_size) + 1,
41994227
};
4200-
assert_eq!(
4201-
batches.len(),
4202-
expected_batch_count,
4203-
"expected {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}"
4228+
// With batch coalescing, we may have fewer batches than expected
4229+
assert!(
4230+
batches.len() <= expected_batch_count,
4231+
"expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
4232+
batches.len()
42044233
);
42054234

42064235
let expected = match join_type {
@@ -4210,7 +4239,17 @@ mod tests {
42104239
JoinType::LeftAnti => left_empty.to_vec(),
42114240
_ => common_result.to_vec(),
42124241
};
4213-
assert_batches_eq!(expected, &batches);
4242+
// For anti joins with empty results, we may get zero batches
4243+
// (with coalescing) instead of one empty batch with schema
4244+
if batches.is_empty() {
4245+
// Verify this is an expected empty result case
4246+
assert!(
4247+
matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
4248+
"Unexpected empty result for {join_type} join"
4249+
);
4250+
} else {
4251+
assert_batches_eq!(expected, &batches);
4252+
}
42144253
}
42154254
}
42164255
}
@@ -4473,9 +4512,15 @@ mod tests {
44734512

44744513
assert_join_metrics!(metrics, 0);
44754514

4476-
let expected_null_neq =
4477-
["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
4478-
assert_batches_eq!(expected_null_neq, &batches_null_neq);
4515+
// With batch coalescing, empty results may not emit any batches
4516+
// Check that either we have no batches, or an empty batch with proper schema
4517+
if batches_null_neq.is_empty() {
4518+
// This is fine - no output rows
4519+
} else {
4520+
let expected_null_neq =
4521+
["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
4522+
assert_batches_eq!(expected_null_neq, &batches_null_neq);
4523+
}
44794524

44804525
Ok(())
44814526
}

0 commit comments

Comments
 (0)