Is your feature request related to a problem or challenge?
https://docs.rs/arrow-select/latest/arrow_select/concat/fn.concat_batches.html
concat_batches currently needs about 2x as much memory as its input. For example, if the input has 10 batches of 100MB each, peak memory usage (RSS) during concatenation is 2GB, twice the ideal. The reason lies in the implementation: because concat_batches only borrows its inputs, the output buffers and the input buffers coexist until concatenation finishes.
This causes significant memory inefficiency in joins (hash join, nested loop join): the join implementations typically buffer all input batches, concatenate them into one large batch, and then perform the join. As a result, their peak memory usage is 2x the ideal case.
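To make the mechanism concrete, here is a simplified, std-only Rust model (not arrow-rs code). Each batch is a Vec<i64>, and MemTracker is a hypothetical helper that counts input bytes still owned plus output bytes already written; these are modeled numbers, not measured RSS. Because concat_borrowed only borrows its inputs, as concat_batches does, none of them can be freed before the output is complete.

```rust
/// Hypothetical accounting helper: live = input bytes still owned + output bytes written.
/// This is a simplified model, not a measurement of real RSS.
#[derive(Default)]
struct MemTracker {
    live: usize,
    peak: usize,
}

impl MemTracker {
    fn alloc(&mut self, bytes: usize) {
        self.live += bytes;
        self.peak = self.peak.max(self.live);
    }
    fn free(&mut self, bytes: usize) {
        self.live -= bytes;
    }
}

const I64_BYTES: usize = std::mem::size_of::<i64>();

/// Borrowing concat, analogous to passing `&RecordBatch`es to concat_batches:
/// the caller keeps ownership, so every input stays alive while the output fills.
fn concat_borrowed(batches: &[Vec<i64>], mem: &mut MemTracker) -> Vec<i64> {
    let total: usize = batches.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    for batch in batches {
        out.extend_from_slice(batch);
        mem.alloc(batch.len() * I64_BYTES); // output bytes written for this batch
    }
    out
}

fn main() {
    let (n, rows) = (10, 1_000);
    let batches: Vec<Vec<i64>> = (0..n).map(|i| vec![i as i64; rows]).collect();
    let mut mem = MemTracker::default();
    mem.alloc(n * rows * I64_BYTES); // all inputs are buffered before concatenation

    let out = concat_borrowed(&batches, &mut mem);
    drop(batches); // inputs can only be released after the output is complete
    mem.free(n * rows * I64_BYTES);

    println!("rows = {}, peak bytes = {}", out.len(), mem.peak);
}
```

With 10 batches of 1,000 values (80,000 input bytes), this prints `rows = 10000, peak bytes = 160000`: the modeled peak is exactly twice the input size.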
Reproducer in datafusion-cli:

    /usr/bin/time -l datafusion-cli -f /Users/yongting/Scripts/hj_mem.sql
    100000001 row(s) fetched. (First 40 displayed. Use --maxrows to adjust)
    Elapsed 0.512 seconds.
    0.54 real 0.43 user 0.11 sys
    2032107520 maximum resident set size <--- 2GB

    -- hj_mem.sql
    set datafusion.execution.target_partitions=1;
    select *
    from generate_series(100000000) as t1(v1)
    join generate_series(100000000) as t2(v1)
    on t1.v1=t2.v1;
This query buffers the entire build side first and then probes in a streaming fashion. The build side is Int64, so the ideal memory consumption is 100M * 8B; the inefficient concat_batches explains the 2x. (The remaining 0.5x is the ArrayMap overhead in the hash join.)
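A back-of-the-envelope breakdown of the reproducer's memory reading, using the factors stated in this issue (decimal units, row count rounded to 100M):

```math
\begin{aligned}
\text{ideal build side} &\approx 100\times10^{6}\ \text{rows} \times 8\ \text{B} = 800\ \text{MB} \\
\text{inputs + concatenated output} &\approx 2 \times 800\ \text{MB} = 1.6\ \text{GB} \\
\text{ArrayMap overhead} &\approx 0.5 \times 800\ \text{MB} = 0.4\ \text{GB} \\
\text{expected peak} &\approx 2.5 \times 800\ \text{MB} = 2.0\ \text{GB}
\end{aligned}
```

This roughly matches the 2,032,107,520-byte maximum resident set size reported by /usr/bin/time.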
A related issue tries to address this by avoiding concat_batches in joins: #23031
Describe the solution you'd like
Implement a similar utility, concat_batches_owned, that concatenates the batches incrementally, roughly like this:

    // Proposed API (does not exist yet): init_inprogress_batch / push
    let mut inprogress_batch = init_inprogress_batch(input_batches[0].schema());
    for batch in input_batches {
        // `batch` is moved into `push`, so its memory is released once it has been appended
        inprogress_batch.push(batch);
    }

The new utility has to take ownership of the input batches instead of borrowing them, but this should be fine for the join use cases.
The implementation should be similar to
Ideally, this approach would bring peak memory below the current 2x, because each input batch's memory is released as soon as it has been appended.
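A sketch of the proposed owned variant under a simplified, std-only model (not arrow-rs code; concat_owned and MemTracker are hypothetical names, and the byte counts are modeled rather than measured RSS). Taking the batches by value lets the loop drop each one right after copying it, so total live bytes never exceed the input size plus one batch. The model assumes that output capacity reserved up front is not resident until it is written, which is typical for large allocations but is an assumption here; it also ignores reallocation copies that a growing output buffer could incur.

```rust
/// Hypothetical accounting helper: live = input bytes still owned + output bytes written.
#[derive(Default)]
struct MemTracker {
    live: usize,
    peak: usize,
}

impl MemTracker {
    fn alloc(&mut self, bytes: usize) {
        self.live += bytes;
        self.peak = self.peak.max(self.live);
    }
    fn free(&mut self, bytes: usize) {
        self.live -= bytes;
    }
}

const I64_BYTES: usize = std::mem::size_of::<i64>();

/// Hypothetical owned concat: takes the batches by value and drops each one
/// as soon as its values have been copied into the output.
fn concat_owned(batches: Vec<Vec<i64>>, mem: &mut MemTracker) -> Vec<i64> {
    let total: usize = batches.iter().map(Vec::len).sum();
    // Capacity is reserved up front; the model only counts bytes actually written.
    let mut out = Vec::with_capacity(total);
    for batch in batches {
        out.extend_from_slice(&batch);
        let bytes = batch.len() * I64_BYTES;
        mem.alloc(bytes); // output bytes written for this batch
        drop(batch); // input buffer released before the next batch is copied
        mem.free(bytes);
    }
    out
}

fn main() {
    let (n, rows) = (10, 1_000);
    let batches: Vec<Vec<i64>> = (0..n).map(|i| vec![i as i64; rows]).collect();
    let mut mem = MemTracker::default();
    mem.alloc(n * rows * I64_BYTES); // all inputs are buffered before concatenation

    let out = concat_owned(batches, &mut mem); // ownership moves into the concat
    println!("rows = {}, peak bytes = {}", out.len(), mem.peak);
}
```

With 10 batches of 1,000 values (80,000 input bytes), this prints `rows = 10000, peak bytes = 88000`: the modeled peak is T + s (total input size plus one batch) instead of 2T.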
Describe alternatives you've considered
No response
Additional context
No response