perf: stream the target on the probe side of the merge_insert join#7382
perf: stream the target on the probe side of the merge_insert join#7382sezruby wants to merge 3 commits into
Conversation
`create_plan` (the v2 merge_insert fast path used by both single-node `execute` and uncommitted/distributed merges) built the join as `target.join(source)`. When the target exceeds DataFusion's hash-join collect threshold the join is planned as `mode=Partitioned`, whose build (left) side is hashed and held in memory per partition. With the target on the left this materialized the entire target per partition, which can exhaust executor memory on large tables (observed: an 8x16GB Spark cluster OOM-killed merging a 12-51M row source into a 64M row target, peak RSS hitting the 16GB pod limit with only ~0.4GB on the JVM heap — the rest was native Arrow hash memory). Build the join as `source.join(target)` instead, so the (typically small) source is the hash build side and the (potentially huge) target is streamed as the probe side. Neither input carries comparable row statistics (the source is a one-shot stream), so DataFusion's `should_swap_join_order` leaves the operands as written rather than swapping the target back onto the build side. The join output is semantically identical — every column is referenced downstream by qualified name, not position — so this is purely a memory/scheduling change. Measured on the OOM repro (real 64M-row target, same 16GB pods): the merge now completes where it previously OOM-killed, peak executor RSS dropped from 16.3GB to 6.7GB (20% source) and 11.0GB (80% source), and it ran ~2x faster than the position-delta path. Add `test_plan_keeps_target_on_probe_side_at_scale`, which exercises the production-representative `Partitioned` plan (>128K-row target) and asserts the target scan is the probe side. The existing toy-sized snapshot tests cannot catch a regression here: at small scale the join is `CollectLeft` and the optimizer freely swaps sides, so they pass with the operands reversed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
| .expect("HashJoinExec"); | ||
|
|
||
| // At this scale the join must be Partitioned, not CollectLeft. | ||
| assert_eq!( |
There was a problem hiding this comment.
This test assumes the default planner will always produce a partitioned hash join. On hosts where DataFusion's default target partition count is 1, the same plan uses CollectLeft and the regression test fails even though the production join orientation is still correct.
There was a problem hiding this comment.
Good catch — fixed in 06ea89f. The assertion now checks only the join orientation (target LanceRead on the probe/right side), not the partition mode.
I verified by forcing target_partitions(1) in create_plan: the plan becomes mode=CollectLeft, join_type=Left with the source still on the build (left) side and the target on the probe (right) side — i.e. the fix holds on single-core hosts too, only the mode differs. The previous mode == Partitioned assertion was the only environment-dependent part; it's gone. Test passes in both modes.
Addresses review feedback: the regression test asserted `partition_mode() == Partitioned`, which depends on DataFusion's `target_partitions` (host core count). On a single-core host the same plan is `CollectLeft`, so the assertion failed even though the join orientation — the thing the fix guarantees — was still correct (source on build, target on probe). Drop the partition-mode assertion and keep only the orientation check (target `LanceRead` is the probe/right input), which holds in both `CollectLeft` and `Partitioned`. Verified by forcing `target_partitions(1)`: the target stays on the probe side and the test passes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The fix changes the operand order for every merge shape, but the regression test only exercised the upsert (Left join). Parametrize it across all four shapes — Inner (update-only), Right (delete unmatched target), Left (upsert), Full (upsert + delete) — and assert both the resulting join type and that the target (`LanceRead`) stays on the probe side. Verified the orientation holds for all four under both `Partitioned` (multi-core default) and `CollectLeft` (forced `target_partitions=1`), so the fix is confirmed correct on single-core hosts too, not just for the upsert. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
This seems fine for now. FYI I have a PR that solves this by defining the necessary statistics to allow the optimizer to do the join order swap. It’s in draft here: #7368 I think that will be the better long term solution. |
Relationship to #7368 (TableProvider write inputs / source statistics)@wjones127
So they partition cleanly: statless stream → explicit source-build (this PR); materialized → stats-driven. The only thing to reconcile is the textual merge in Minor suggestion for #7368: have |
FWIW, the source might not always be smaller than the target. In upsert workloads, we only need the |
Problem
create_plan— themerge_insertfast path used byMergeInsertJob::execute— builds the join astarget.join(source).When the target exceeds DataFusion's hash-join collect threshold (
hash_join_single_partition_threshold_rows, 128K rows), the join is planned asmode=Partitioned, whose build (left) side is hashed and held in memory (per partition). With the target on the left, the entire target is materialized in memory during the merge. On a large target this dominates memory use and scales with target size, even when only a handful of rows are being upserted.This is the wrong side to build: the source of an upsert is typically far smaller than the target.
Fix
Build the join as
source.join(target)so the (typically small) source is the hash build side and the (potentially huge) target is streamed as the probe side.Neither input carries comparable row statistics (the source is a one-shot stream), so DataFusion's
should_swap_join_orderleaves the operands as written rather than swapping the target back onto the build side. The join-type mapping is mirrored accordingly (Left↔Right;Inner/Fullunchanged). Every column is referenced downstream by qualified name, not position, so the join output is semantically identical — this is purely a memory/scheduling change.Measured (single-node, pure
MergeInsertJob::execute)Upserting a 50K-row source into a wide-row target, peak process RSS (
getrusage):After the fix, peak memory is bounded by the source size and stays flat as the target grows (5M→20M: ~0.35 GB); before, it scales with the target. ~4.7× lower peak at 20M, and the gap widens with target size.
How the memory numbers were produced (single-node, no external deps)
I measured this with a throwaway
#[ignore]d test inmerge_insert.rs(not included in this PR — it allocates multiple GB and isn't a unit test). It writes a wide-row target, upserts a small source throughMergeInsertJob::execute, and prints the process peak RSS viagetrusage(RUSAGE_SELF). Reviewers can paste it in to reproduce:Run (each orientation in its own process so peak RSS is isolated):
To see the "before" number, temporarily flip the orientation back to
scan_aliased.join(source_df_aliased, ...)(with the join type mirrored) and rerun.Test
Adds
test_plan_keeps_target_on_probe_side_at_scale, which exercises the production-representativePartitionedplan (>128K-row target) and asserts the target scan (LanceRead) is the probe (right) side of theHashJoinExec.The existing toy-sized snapshot tests cannot catch a regression here: at small scale the join is
CollectLeftand the optimizer freely swaps sides, so they would still pass with the operands reversed. The four snapshot tests are updated for the new (semantically identical) projection column order.cargo test -p lance --lib merge_insert→ 157 passed.cargo fmt --all+cargo clippy -p lance --lib --testsclean.🤖 Generated with Claude Code