
Avoid 2x memory amplification from concat_batches in joins #23076

Description

@2010YOUY01

Is your feature request related to a problem or challenge?

https://docs.rs/arrow-select/latest/arrow_select/concat/fn.concat_batches.html

concat_batches currently uses 2x the memory of its input.

For example, if the input has 10 batches of 100MB each (1GB in total), peak memory usage (RSS) during concatenation is 2GB, twice the ideal. The cause is in the implementation: the output buffers and the input buffers coexist until concatenation finishes.
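A minimal std-only Rust sketch of why the two copies coexist, assuming a batch can be modeled as a plain `Vec<i64>` (Arrow's `RecordBatch` and its buffers are not used here). `concat_borrowed` and `payload_bytes` are hypothetical helpers; like concat_batches, `concat_borrowed` takes its inputs by reference, so the caller still owns them while the full output is being built:

```rust
use std::mem::size_of;

/// Hypothetical stand-in for `concat_batches`: inputs are borrowed, so the
/// caller cannot free them until this function has built the whole output.
fn concat_borrowed(inputs: &[Vec<i64>]) -> Vec<i64> {
    let total: usize = inputs.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    for batch in inputs {
        out.extend_from_slice(batch);
    }
    out
}

/// Payload bytes of a set of i64 buffers (ignores Vec headers and spare capacity).
fn payload_bytes(buffers: &[Vec<i64>]) -> usize {
    buffers.iter().map(|b| b.len() * size_of::<i64>()).sum()
}

fn main() {
    // 10 batches of 1,000 values each: a scaled-down version of 10 x 100MB.
    let inputs: Vec<Vec<i64>> = (0..10).map(|i| vec![i; 1_000]).collect();
    let output = concat_borrowed(&inputs);

    let input_bytes = payload_bytes(&inputs);
    let output_bytes = output.len() * size_of::<i64>();
    // Both `inputs` and `output` are still alive at this point.
    println!(
        "inputs: {input_bytes} B, output: {output_bytes} B, live: {} B",
        input_bytes + output_bytes
    );
}
```

This prints `inputs: 80000 B, output: 80000 B, live: 160000 B`. The sketch only shows the accounting; the real concat_batches works column by column on Arrow arrays.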

This causes significant memory inefficiency in joins (hash join, nested loop join): the join implementations typically buffer all input batches, concatenate them into one large batch, and then perform the join steps. As a result, their peak memory usage becomes 2x the ideal.
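A simplified std-only sketch of that buffer, concat, then join pattern, assuming a single Int64 join key and using a `std::collections::HashMap` from key to row indices. DataFusion's hash join uses its own hash table structures (the issue mentions ArrayMap), and `build` and `probe` are hypothetical names for illustration only:

```rust
use std::collections::HashMap;

/// Build side: buffer every batch, concatenate, then index the result.
fn build(batches: Vec<Vec<i64>>) -> (Vec<i64>, HashMap<i64, Vec<usize>>) {
    // `concat` copies all batches into a new buffer while `batches` is still
    // alive, so the build-side data briefly exists twice.
    let keys: Vec<i64> = batches.concat();
    drop(batches); // only now can the original batches be freed
    let mut index: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, &key) in keys.iter().enumerate() {
        index.entry(key).or_default().push(row);
    }
    (keys, index)
}

/// Probe side, streamed one batch at a time: returns (build_row, key) pairs.
fn probe(index: &HashMap<i64, Vec<usize>>, probe_batch: &[i64]) -> Vec<(usize, i64)> {
    let mut matches = Vec::new();
    for &key in probe_batch {
        if let Some(rows) = index.get(&key) {
            matches.extend(rows.iter().map(|&row| (row, key)));
        }
    }
    matches
}

fn main() {
    let (keys, index) = build(vec![vec![1, 2], vec![3, 2]]);
    println!("build keys: {keys:?}");
    println!("matches: {:?}", probe(&index, &[2, 5]));
}
```

This prints `build keys: [1, 2, 3, 2]` and `matches: [(1, 2), (3, 2)]`. The concatenated `keys` buffer must stay alive for the whole probe phase, which is why the transient 2x during `concat` sets the peak.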

Reproducer in datafusion-cli

/usr/bin/time -l datafusion-cli -f /Users/yongting/Scripts/hj_mem.sql

100000001 row(s) fetched. (First 40 displayed. Use --maxrows to adjust)
Elapsed 0.512 seconds.

        0.54 real         0.43 user         0.11 sys
          2032107520  maximum resident set size                  <--- 2GB

-- hj_mem.sql
set datafusion.execution.target_partitions=1;

select *
from generate_series(100000000) as t1(v1)
join generate_series(100000000) as t2(v1)
on t1.v1=t2.v1;

This query buffers the entire build side first and then probes in a streaming fashion. The build side is Int64, so the ideal memory consumption is about 100M * 8B = 800MB. The inefficient concat_batches explains 2x of that, and the remaining 0.5x is the ArrayMap overhead in the hash join; together they account for the ~2GB peak above.
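Spelled out with the issue's numbers (decimal units; the 2x + 0.5x split is the attribution given above, not an independent measurement):

```math
\begin{aligned}
\text{ideal} &= 10^{8} \times 8\,\text{B} = 8 \times 10^{8}\,\text{B} \approx 800\,\text{MB} \\
\text{expected peak} &\approx (2 + 0.5) \times 800\,\text{MB} = 2000\,\text{MB} \\
\text{observed peak} &= 2{,}032{,}107{,}520\,\text{B} \approx 2.54 \times \text{ideal}
\end{aligned}
```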

A related issue, #23031, tries to solve this problem by avoiding concat_batches in joins.

Describe the solution you'd like

Implement a similar util, concat_batches_owned, that concatenates the batches incrementally:

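// `init_inprogress_batch` and `push` are proposed helpers, not existing APIs.
let mut inprogress_batch = init_inprogress_batch(input_batches[0].schema());
for batch in input_batches {
    // `push` takes `batch` by value, so the batch can be dropped (releasing
    // its memory) right after its rows are appended, not at the very end.
    inprogress_batch.push(batch);
}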

The new util has to consume (take ownership of) the input batches instead of taking references, but this should be fine for the join use cases, since the joins only use the concatenated batch afterwards.
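A std-only sketch of what an owned concatenation could look like. `Batch` is a hypothetical stand-in for `RecordBatch` (one `Vec<i64>` per column), and this `concat_batches_owned` only illustrates the proposal; it is not an existing arrow API. Because the function owns its inputs, it can size the output up front and drop each input as soon as its rows have been copied:

```rust
/// Hypothetical stand-in for a `RecordBatch`: one Int64 column per Vec,
/// all columns of equal length.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    columns: Vec<Vec<i64>>,
}

/// Sketch of the proposed owned concatenation. Takes the batches by value
/// and frees each one as soon as its rows have been appended.
/// Returns `None` for an empty input; panics on a column-count mismatch.
fn concat_batches_owned(batches: Vec<Batch>) -> Option<Batch> {
    let num_columns = batches.first()?.columns.len();
    // Owning the inputs lets us size each output column once, so the
    // appends below never reallocate (given equal-length columns).
    let total_rows: usize = batches
        .iter()
        .map(|b| b.columns.first().map_or(0, Vec::len))
        .sum();
    let mut columns: Vec<Vec<i64>> = (0..num_columns)
        .map(|_| Vec::with_capacity(total_rows))
        .collect();

    for batch in batches {
        assert_eq!(batch.columns.len(), num_columns, "schema mismatch");
        for (dst, src) in columns.iter_mut().zip(&batch.columns) {
            dst.extend_from_slice(src);
        }
        // `batch` is dropped at the end of each iteration, freeing its
        // buffers before the next input is copied.
    }
    Some(Batch { columns })
}

fn main() {
    let batches = vec![
        Batch { columns: vec![vec![1, 2], vec![10, 20]] },
        Batch { columns: vec![vec![3], vec![30]] },
    ];
    let out = concat_batches_owned(batches).expect("non-empty input");
    // `batches` was moved into the call and can no longer be used here.
    println!("{:?}", out.columns);
}
```

This prints `[[1, 2, 3], [10, 20, 30]]`. Real Arrow buffers are immutable and reference-counted, so dropping a `RecordBatch` only frees memory when no other reference to its buffers remains, and a real implementation would have to handle every Arrow data type rather than just Int64.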

The implementation should be similar to

Ideally, this approach keeps peak memory usage well below the current 2x.
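A back-of-the-envelope model of how much better the owned approach could be, under strong simplifying assumptions: the output is preallocated but only counts once its pages are written (as RSS does for lazily touched memory), freed inputs are returned to the OS immediately, and hash table and allocator overhead are ignored. `peak_borrowed` and `peak_owned` are hypothetical helpers for this model only:

```rust
/// Borrowed inputs: every input and the complete output are alive at the end.
fn peak_borrowed(batch_bytes: &[u64]) -> u64 {
    2 * batch_bytes.iter().sum::<u64>()
}

/// Owned inputs consumed one at a time. While batch i is being copied, the
/// output holds batches 0..=i and inputs i..n are still alive, so live
/// memory is total + batch_bytes[i].
fn peak_owned(batch_bytes: &[u64]) -> u64 {
    let mut written = 0; // output bytes written so far
    let mut remaining: u64 = batch_bytes.iter().sum(); // input bytes still alive
    let mut peak = remaining;
    for &b in batch_bytes {
        written += b; // batch copied into the output, not yet dropped
        peak = peak.max(written + remaining);
        remaining -= b; // batch dropped
    }
    peak
}

fn main() {
    const MB: u64 = 1_000_000;
    let batches = [100 * MB; 10]; // the 10 x 100MB example from above
    println!("borrowed: {} MB", peak_borrowed(&batches) / MB);
    println!("owned:    {} MB", peak_owned(&batches) / MB);
}
```

In this model the owned approach peaks at the total input size plus the largest single batch (1100 MB versus 2000 MB for the example). Growing the output by reallocation instead of preallocating would add transient copies and raise that peak.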


Labels: enhancement (New feature or request)