[GLUTEN-12235][VL] Fix hash shuffle spill ordering when evicting partition buffers#12231
Conversation
Could |
| std::vector<PartitionPayload> selectedPayloads; | ||
| int64_t selectedBytes = 0; | ||
| for (auto& item : pidToSize) { | ||
| auto pid = item.first; | ||
| ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); |
There was a problem hiding this comment.
Good point. I updated the change to select only partition IDs first, based on estimated partition-buffer bytes, then sort the selected IDs by pid and do assembleBuffers() + hashEvict() one partition at a time.
This keeps the large-buffer-first selection behavior, preserves pid-ordered spill writes, and avoids holding all selected InMemoryPayloads at the same time.
Yes, evictPartitionBuffersMinSize() can be called multiple times, so the overall sequence across reclaim rounds may look like [1, 2, 3, 1, 2, 3]. |
|
@marin-ma Can you help take a look? Thanks. |
What changes are proposed in this pull request?
Hash shuffle may write partition-buffer spill payloads to the same local spill file out of partition-id order.
After #10009, VeloxHashShuffleWriter::evictPartitionBuffersMinSize() sorts partition buffers by size before spilling, so the write order can become pid 4 -> pid 1. However, LocalPartitionWriter::mergeSpills() later merges spill files by scanning pids in ascending order, and Spill::nextPayload(pid) only consumes the queue head. If a lower pid appears after a higher pid in the same spill file, it can remain unmerged and fail during clearResource() with:
This pr keep the size-based eviction selection, but separate selection order from write order.
This change first selects partition IDs by estimated partition-buffer bytes until enough memory is selected for eviction, then sorts the selected IDs by pid before assembling and writing each payload to the local partition writer. This preserves the existing large-buffer-first eviction behavior while ensuring local spill files are written in the order required by the merge logic.
Materializing and evicting one selected partition at a time also avoids holding all selected InMemoryPayloads simultaneously, reducing peak memory usage during eviction.
The earlier fallback that split spill files in LocalPartitionWriter on pid wraparound is removed, since the hash writer now preserves pid order before writing.
How was this patch tested?
UT. A regression test covers the case where a higher pid has a larger partition buffer than a lower pid.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: codex GPT-5.5
Related issue: #12235