[GLUTEN-12235][VL] Fix hash shuffle spill ordering when evicting partition buffers#12231

Merged: marin-ma merged 5 commits into apache:main from zml1206:spill_order on Jun 9, 2026

zml1206 (Contributor) commented Jun 4, 2026

What changes are proposed in this pull request?

Hash shuffle may write partition-buffer spill payloads to the same local spill file out of partition-id order.

After #10009, VeloxHashShuffleWriter::evictPartitionBuffersMinSize() sorts partition buffers by size before spilling, so the write order can become pid 4 -> pid 1. However, LocalPartitionWriter::mergeSpills() later merges spill files by scanning pids in ascending order, and Spill::nextPayload(pid) only consumes the queue head. If a lower pid appears after a higher pid in the same spill file, it can remain unmerged and fail during clearResource() with:

Native shuffle write: ShuffleWriter stop failed - Invalid:
Merging from spill N is not exhausted. pid: X
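
The failure mode described above can be reproduced with a toy model. The sketch below is not the Gluten implementation: `ToySpill` and `toyMergeSpill` are hypothetical stand-ins that only mimic the two behaviors named in the description (an ascending pid scan, and a `nextPayload(pid)` that looks at the queue head only).

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for one spill file: a FIFO queue of partition ids,
// in the order their payloads were written.
struct ToySpill {
  std::deque<uint32_t> payloadPids;

  // Consumes the head payload only if it belongs to `pid`. This mirrors the
  // behavior described for Spill::nextPayload(pid), not its real signature.
  bool nextPayload(uint32_t pid) {
    if (payloadPids.empty() || payloadPids.front() != pid) {
      return false;
    }
    payloadPids.pop_front();
    return true;
  }

  bool exhausted() const { return payloadPids.empty(); }
};

// Scans pids in ascending order and drains every head payload that matches.
// Returns true if the spill was fully consumed by the scan.
bool toyMergeSpill(ToySpill spill, uint32_t numPartitions) {
  assert(numPartitions > 0);
  for (uint32_t pid = 0; pid < numPartitions; ++pid) {
    while (spill.nextPayload(pid)) {
      // A real merge would copy the payload into the final output here.
    }
  }
  return spill.exhausted();
}
```

With a write order of pid 1 then pid 4, the scan drains the spill. With pid 4 then pid 1, the scan reaches pid 1 while pid 4 is still at the head, so the pid 1 payload is never consumed and the spill is left non-exhausted, which corresponds to the error above.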

This PR keeps the size-based eviction selection but separates selection order from write order.

This change first selects partition IDs by estimated partition-buffer bytes until enough memory is selected for eviction, then sorts the selected IDs by pid before assembling and writing each payload to the local partition writer. This preserves the existing large-buffer-first eviction behavior while ensuring local spill files are written in the order required by the merge logic.
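
As a rough illustration of separating selection order from write order, the sketch below selects largest-first and then sorts the chosen pids ascending. `selectPidsForEviction` and its parameters are hypothetical names for this example only; the actual logic lives in `VeloxHashShuffleWriter::evictPartitionBuffersMinSize()` and may differ in detail (for example in how the byte threshold is computed). It assumes C++17.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper: picks partitions largest-first until `targetBytes`
// is covered, then returns the chosen pids in ascending order for writing.
std::vector<uint32_t> selectPidsForEviction(
    std::vector<std::pair<uint32_t, int64_t>> pidToSize,  // (pid, estimated bytes)
    int64_t targetBytes) {
  assert(targetBytes >= 0);

  // Selection order: largest estimated partition buffer first.
  std::sort(pidToSize.begin(), pidToSize.end(),
            [](const auto& a, const auto& b) { return a.second > b.second; });

  std::vector<uint32_t> selected;
  int64_t selectedBytes = 0;
  for (const auto& [pid, bytes] : pidToSize) {
    if (selectedBytes >= targetBytes) {
      break;  // Enough memory has been selected for eviction.
    }
    selected.push_back(pid);
    selectedBytes += bytes;
  }

  // Write order: ascending pid, as the merge logic expects.
  std::sort(selected.begin(), selected.end());
  return selected;
}
```

A caller would then assemble and evict one partition at a time while iterating over the returned pids, so the spill file receives payloads in ascending pid order even though a higher pid may have been selected first.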

Materializing and evicting one selected partition at a time also avoids holding all selected InMemoryPayloads simultaneously, reducing peak memory usage during eviction.

The earlier fallback that split spill files in LocalPartitionWriter on pid wraparound is removed, since the hash writer now preserves pid order before writing.

How was this patch tested?

Unit tests, including a regression test for the case where a higher pid has a larger partition buffer than a lower pid.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: codex GPT-5.5

Related issue: #12235

zml1206 marked this pull request as draft on June 4, 2026 08:46
github-actions (bot) added the VELOX label on Jun 4, 2026
zml1206 changed the title from "[VL] Fix hash shuffle spill ordering when evicting partition buffers" to "[GLUTEN-12235][VL] Fix hash shuffle spill ordering when evicting partition buffers" on Jun 4, 2026
zml1206 requested a review from Copilot on June 4, 2026 09:49
zml1206 requested reviews from marin-ma and wForget on June 4, 2026 10:01
zml1206 marked this pull request as ready for review on June 4, 2026 10:01

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

wForget (Member) commented Jun 4, 2026

> However, LocalPartitionWriter::mergeSpills() later merges spill files by scanning pids in ascending order, and Spill::nextPayload(pid) only consumes the queue head.

Could evictPartitionBuffersMinSize be executed multiple times, causing the payloads of spiller to resemble [1, 2, 3, 1, 2, 3]?

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Comment on lines +1521 to +1525
std::vector<PartitionPayload> selectedPayloads;
int64_t selectedBytes = 0;
for (auto& item : pidToSize) {
  auto pid = item.first;
  ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));

zml1206 (Contributor Author) replied:

Good point. I updated the change to select only partition IDs first, based on estimated partition-buffer bytes, then sort the selected IDs by pid and do assembleBuffers() + hashEvict() one partition at a time.

This keeps the large-buffer-first selection behavior, preserves pid-ordered spill writes, and avoids holding all selected InMemoryPayloads at the same time.

zml1206 (Contributor Author) commented Jun 4, 2026

> > However, LocalPartitionWriter::mergeSpills() later merges spill files by scanning pids in ascending order, and Spill::nextPayload(pid) only consumes the queue head.
>
> Could evictPartitionBuffersMinSize be executed multiple times, causing the payloads of spiller to resemble [1, 2, 3, 1, 2, 3]?

Yes, evictPartitionBuffersMinSize() can be called multiple times, so the overall sequence across reclaim rounds may look like [1, 2, 3, 1, 2, 3].
That should be fine because the ordering requirement is for a single active spill batch/file. Within one evictPartitionBuffersMinSize() call, the selected pids are sorted before hashEvict(), so one active spiller sees pids in increasing order. Between reclaim rounds, LocalPartitionWriter::reclaimFixedSize() finishes the current spiller before continuing, so the next [1, 2, 3] sequence belongs to a new spill batch/file rather than the same active spiller.
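
The per-file ordering argument can be illustrated with a small model. This is only a sketch under stated assumptions: `ToySpillLog` and `replayRounds` are hypothetical names, and the model assumes, as described above, that finishing the current spiller causes the next eviction to start a new spill file.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model: each inner vector is one spill file's pid write order.
class ToySpillLog {
 public:
  // Records one payload in the currently open spill, opening one if needed.
  void evict(uint32_t pid) {
    if (!open_) {
      files_.emplace_back();
      open_ = true;
    }
    files_.back().push_back(pid);
  }

  // Closes the active spill; the next evict() starts a new file.
  void finishSpill() { open_ = false; }

  // True if every individual file has non-decreasing pids.
  bool eachFileOrdered() const {
    return std::all_of(files_.begin(), files_.end(), [](const auto& f) {
      return std::is_sorted(f.begin(), f.end());
    });
  }

  std::size_t numFiles() const { return files_.size(); }

 private:
  std::vector<std::vector<uint32_t>> files_;
  bool open_ = false;
};

// Replays `rounds` reclaim rounds, each evicting pids 1, 2, 3 in order.
ToySpillLog replayRounds(int rounds, bool finishBetweenRounds) {
  assert(rounds >= 0);
  ToySpillLog log;
  for (int round = 0; round < rounds; ++round) {
    for (uint32_t pid : {1u, 2u, 3u}) {
      log.evict(pid);
    }
    if (finishBetweenRounds) {
      log.finishSpill();
    }
  }
  return log;
}
```

In this model, `replayRounds(2, true)` yields two files that are each ordered, while `replayRounds(2, false)` yields a single file containing [1, 2, 3, 1, 2, 3], which is the case the question was concerned about.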

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.

zml1206 (Contributor Author) commented Jun 9, 2026

@marin-ma Can you help take a look? Thanks.

marin-ma (Contributor) left a comment

LGTM. Thanks!

marin-ma merged commit a69348d into apache:main on Jun 9, 2026
62 checks passed