Skip to content

perf: stream the target on the probe side of the merge_insert join#7382

Open
sezruby wants to merge 3 commits into
lance-format:mainfrom
sezruby:fix/merge-insert-join-build-side
Open

perf: stream the target on the probe side of the merge_insert join#7382
sezruby wants to merge 3 commits into
lance-format:mainfrom
sezruby:fix/merge-insert-join-build-side

Conversation

@sezruby

@sezruby sezruby commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Problem

create_plan — the merge_insert fast path used by MergeInsertJob::execute — builds the join as target.join(source).

When the target exceeds DataFusion's hash-join collect threshold (hash_join_single_partition_threshold_rows, 128K rows), the join is planned as mode=Partitioned, whose build (left) side is hashed and held in memory (per partition). With the target on the left, the entire target is materialized in memory during the merge. On a large target this dominates memory use and scales with target size, even when only a handful of rows are being upserted.

This is the wrong side to build: the source of an upsert is typically far smaller than the target.

Fix

Build the join as source.join(target) so the (typically small) source is the hash build side and the (potentially huge) target is streamed as the probe side.

Neither input carries comparable row statistics (the source is a one-shot stream), so DataFusion's should_swap_join_order leaves the operands as written rather than swapping the target back onto the build side. The join-type mapping is mirrored accordingly (LeftRight; Inner/Full unchanged). Every column is referenced downstream by qualified name, not position, so the join output is semantically identical — this is purely a memory/scheduling change.

Measured (single-node, pure MergeInsertJob::execute)

Upserting a 50K-row source into a wide-row target, peak process RSS (getrusage):

target rows before (target on build) after (target on probe)
5M 0.34 GB
20M 1.82 GB 0.39 GB

After the fix, peak memory is bounded by the source size and stays flat as the target grows (5M→20M: ~0.35 GB); before, it scales with the target. ~4.7× lower peak at 20M, and the gap widens with target size.

How the memory numbers were produced (single-node, no external deps)

I measured this with a throwaway #[ignore]d test in merge_insert.rs (not included in this PR — it allocates multiple GB and isn't a unit test). It writes a wide-row target, upserts a small source through MergeInsertJob::execute, and prints the process peak RSS via getrusage(RUSAGE_SELF). Reviewers can paste it in to reproduce:

#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn merge_repro_peak_rss() {
    use arrow_array::types::{Float64Type, UInt64Type};
    use lance_datagen::ByteCount;

    fn peak_rss_bytes() -> i64 {
        // ru_maxrss is KB on Linux, bytes on macOS.
        let mut usage: libc::rusage = unsafe { std::mem::zeroed() };
        unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut usage) };
        let maxrss = usage.ru_maxrss as i64;
        if cfg!(target_os = "macos") { maxrss } else { maxrss * 1024 }
    }

    let target_rows = std::env::var("REPRO_TARGET_ROWS")
        .ok().and_then(|s| s.parse::<usize>().ok()).unwrap_or(5_000_000);
    let source_rows = std::env::var("REPRO_SOURCE_ROWS")
        .ok().and_then(|s| s.parse::<usize>().ok()).unwrap_or(50_000);

    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    // Wide rows so the target is genuinely large in memory: a u64 key plus
    // two 48-byte strings and a float.
    let batch_rows = 100_000usize;
    let target = lance_datagen::gen_batch()
        .with_seed(Seed::from(1))
        .col("key", array::step::<UInt64Type>())
        .col("v0", array::rand_utf8(ByteCount::from(48), false))
        .col("v1", array::rand_utf8(ByteCount::from(48), false))
        .col("v2", array::rand::<Float64Type>())
        .into_reader_rows(
            RowCount::from(batch_rows as u64),
            BatchCount::from((target_rows / batch_rows) as u32),
        );
    let ds = Arc::new(Dataset::write(target, test_uri, None).await.unwrap());

    // Source: a small set of existing keys (pure updates).
    let source = lance_datagen::gen_batch()
        .with_seed(Seed::from(2))
        .col("key", array::step::<UInt64Type>())
        .col("v0", array::rand_utf8(ByteCount::from(48), false))
        .col("v1", array::rand_utf8(ByteCount::from(48), false))
        .col("v2", array::rand::<Float64Type>())
        .into_reader_rows(RowCount::from(source_rows as u64), BatchCount::from(1));
    let source_stream = reader_to_stream(Box::new(source));

    let job = MergeInsertBuilder::try_new(ds.clone(), vec!["key".to_string()])
        .unwrap()
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll)
        .try_build()
        .unwrap();

    let before = peak_rss_bytes();
    let (_ds, stats) = job.execute(source_stream).await.unwrap();
    let after = peak_rss_bytes();

    eprintln!(
        "target_rows={target_rows} source_rows={source_rows} updated={} \
         peak_rss_before={:.2}GB peak_rss_after={:.2}GB",
        stats.num_updated_rows, before as f64 / 1e9, after as f64 / 1e9,
    );
}

Run (each orientation in its own process so peak RSS is isolated):

REPRO_TARGET_ROWS=20000000 cargo test -p lance --lib --release \
    merge_repro_peak_rss -- --ignored --nocapture

To see the "before" number, temporarily flip the orientation back to scan_aliased.join(source_df_aliased, ...) (with the join type mirrored) and rerun.

Test

Adds test_plan_keeps_target_on_probe_side_at_scale, which exercises the production-representative Partitioned plan (>128K-row target) and asserts the target scan (LanceRead) is the probe (right) side of the HashJoinExec.

The existing toy-sized snapshot tests cannot catch a regression here: at small scale the join is CollectLeft and the optimizer freely swaps sides, so they would still pass with the operands reversed. The four snapshot tests are updated for the new (semantically identical) projection column order.

cargo test -p lance --lib merge_insert → 157 passed. cargo fmt --all + cargo clippy -p lance --lib --tests clean.

🤖 Generated with Claude Code

`create_plan` (the v2 merge_insert fast path used by both single-node
`execute` and uncommitted/distributed merges) built the join as
`target.join(source)`. When the target exceeds DataFusion's hash-join
collect threshold the join is planned as `mode=Partitioned`, whose build
(left) side is hashed and held in memory per partition. With the target on
the left this materialized the entire target per partition, which can
exhaust executor memory on large tables (observed: an 8x16GB Spark cluster
OOM-killed merging a 12-51M row source into a 64M row target, peak RSS
hitting the 16GB pod limit with only ~0.4GB on the JVM heap — the rest was
native Arrow hash memory).

Build the join as `source.join(target)` instead, so the (typically small)
source is the hash build side and the (potentially huge) target is streamed
as the probe side. Neither input carries comparable row statistics (the
source is a one-shot stream), so DataFusion's `should_swap_join_order`
leaves the operands as written rather than swapping the target back onto the
build side. The join output is semantically identical — every column is
referenced downstream by qualified name, not position — so this is purely a
memory/scheduling change.

Measured on the OOM repro (real 64M-row target, same 16GB pods): the merge
now completes where it previously OOM-killed, peak executor RSS dropped from
16.3GB to 6.7GB (20% source) and 11.0GB (80% source), and it ran ~2x faster
than the position-delta path.

Add `test_plan_keeps_target_on_probe_side_at_scale`, which exercises the
production-representative `Partitioned` plan (>128K-row target) and asserts
the target scan is the probe side. The existing toy-sized snapshot tests
cannot catch a regression here: at small scale the join is `CollectLeft` and
the optimizer freely swaps sides, so they pass with the operands reversed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sezruby sezruby marked this pull request as draft June 20, 2026 01:37
@codecov

codecov Bot commented Jun 20, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 77.77778% with 4 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
rust/lance/src/dataset/write/merge_insert.rs 77.77% 3 Missing and 1 partial ⚠️

📢 Thoughts on this report? Let us know!

@sezruby sezruby marked this pull request as ready for review June 20, 2026 02:28
.expect("HashJoinExec");

// At this scale the join must be Partitioned, not CollectLeft.
assert_eq!(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test assumes the default planner will always produce a partitioned hash join. On hosts where DataFusion's default target partition count is 1, the same plan uses CollectLeft and the regression test fails even though the production join orientation is still correct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 06ea89f. The assertion now checks only the join orientation (target LanceRead on the probe/right side), not the partition mode.

I verified by forcing target_partitions(1) in create_plan: the plan becomes mode=CollectLeft, join_type=Left with the source still on the build (left) side and the target on the probe (right) side — i.e. the fix holds on single-core hosts too, only the mode differs. The previous mode == Partitioned assertion was the only environment-dependent part; it's gone. Test passes in both modes.

sezruby and others added 2 commits June 21, 2026 08:52
Addresses review feedback: the regression test asserted
`partition_mode() == Partitioned`, which depends on DataFusion's
`target_partitions` (host core count). On a single-core host the same plan is
`CollectLeft`, so the assertion failed even though the join orientation — the
thing the fix guarantees — was still correct (source on build, target on
probe).

Drop the partition-mode assertion and keep only the orientation check (target
`LanceRead` is the probe/right input), which holds in both `CollectLeft` and
`Partitioned`. Verified by forcing `target_partitions(1)`: the target stays on
the probe side and the test passes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The fix changes the operand order for every merge shape, but the regression
test only exercised the upsert (Left join). Parametrize it across all four
shapes — Inner (update-only), Right (delete unmatched target), Left (upsert),
Full (upsert + delete) — and assert both the resulting join type and that the
target (`LanceRead`) stays on the probe side.

Verified the orientation holds for all four under both `Partitioned`
(multi-core default) and `CollectLeft` (forced `target_partitions=1`), so the
fix is confirmed correct on single-core hosts too, not just for the upsert.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@wjones127

Copy link
Copy Markdown
Contributor

This seems fine for now. FYI I have a PR that solves this by defining the necessary statistics to allow the optimizer to do the join order swap. It’s in draft here: #7368

I think that will be the better long term solution.

@sezruby

sezruby commented Jun 22, 2026

Copy link
Copy Markdown
Contributor Author

Relationship to #7368 (TableProvider write inputs / source statistics)

@wjones127
#7368 also rewrites create_plan's source handling, so the two overlap textually — but they cover different cases and don't conflict semantically:

  • This PR (perf: stream the target on the probe side of the merge_insert join #7382) governs statless stream sources: it builds source.join(target) so the small source is the hash build side and the large target streams as the probe (preventing the per-partition full-target materialization that OOM'd workers). That orientation holds precisely because a one-shot stream carries no statistics for should_swap_join_order to act on.
  • feat: accept TableProvider write inputs for merge_insert and insert #7368 keeps stream/spill sources statless (one_shot_provider / spilling_table_provider report no stats) and only adds exact num_rows/total_byte_size for materialized sources (MemTable/file), where the optimizer then picks the build side by cost.

So they partition cleanly: statless stream → explicit source-build (this PR); materialized → stats-driven. The only thing to reconcile is the textual merge in create_plan for whichever lands second.

Minor suggestion for #7368: have test_merge_insert_source_statistics_in_plan assert the resulting build side (not just that stats reach the plan), so the materialized path's join orientation is pinned alongside this PR's stream-path orientation.

@wjones127

Copy link
Copy Markdown
Contributor

so the small source is the hash build side and the large target streams as the probe

FWIW, the source might not always be smaller than the target. In upsert workloads, we only need the on columns from the target, but we need all columns from the source. If the on is just an integer id, then even on a large table it's not that large.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants