Skip to content
53 changes: 47 additions & 6 deletions cpp/core/shuffle/Payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,17 @@ arrow::Result<int64_t> compressBuffer(
// Same wire format as compressBuffer:
// kTypeAwareBuffer (int64) | uncompressedLength (int64) | compressedLength (int64) | compressed data
// If compressed size >= uncompressed size, falls back to kUncompressedBuffer (same as standard codec).
//
// For TAC type kStringDict, the offsetsBuffer + numRows arguments must be
// supplied so the codec can build the dictionary. They are ignored for other
// TAC types.
arrow::Result<int64_t> compressTypeAwareBuffer(
const std::shared_ptr<arrow::Buffer>& buffer,
uint8_t* output,
int64_t outputLength,
int8_t typeKind) {
int8_t typeKind,
const uint8_t* offsetsBuffer = nullptr,
int32_t numRows = 0) {
auto outputPtr = &output;
if (!buffer) {
write<int64_t>(outputPtr, kNullBuffer);
Expand All @@ -116,7 +122,8 @@ arrow::Result<int64_t> compressTypeAwareBuffer(

ARROW_ASSIGN_OR_RAISE(
auto compressedSize,
TypeAwareCompressCodec::compress(buffer->data(), buffer->size(), dataOutput, availableOutput, typeKind));
TypeAwareCompressCodec::compress(
buffer->data(), buffer->size(), dataOutput, availableOutput, typeKind, offsetsBuffer, numRows));

if (compressedSize >= buffer->size()) {
// Compression didn't help. Fall back to uncompressed, same as compressBuffer.
Expand Down Expand Up @@ -272,12 +279,46 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
auto typeKind = (bufferTypes != nullptr && i < bufferTypes->size()) ? (*bufferTypes)[i] : tac::kUnsupported;

int64_t compressedSize = 0;
if (TypeAwareCompressCodec::support(typeKind)) {
// Use type-aware compression for supported types.
// For string-dict compression the codec needs per-row offsets into
// the string-data buffer. Production string columns reach us in two
// shapes:
//
// (a) Arrow standard layout: validity, offsets[numRows+1], data.
// offsets[0] == 0, offsets[numRows] == data buffer size.
// (b) VeloxHashShuffleWriter layout: validity, lengths[numRows],
// data — i.e. a per-row length, not cumulative offsets.
//
// The codec reads `offsets[numRows]` as the data-buffer end sentinel,
// which is in-bounds for shape-a but ONE PAST THE END for shape-b.
// To stay correct, only route shape-a inputs through compressStringDict;
// shape-b inputs fall through to the standard LZ4/ZSTD codec below.
// The codec's internal `sliced` early-exit was already producing LZ4
// output for the common shape-b case (where `offsets[0] != 0`), so
// observable compressed size is unchanged at the user-data level.
//
// Use the authoritative `numRows` parameter (NOT `bufferSize/4 - 1`)
// so single-row batches don't underflow to `numRows == 0`.
bool routeToTacStringDict = false;
const uint8_t* offsetsBuf = nullptr;
int32_t offsetsNumRows = 0;
if (typeKind == tac::kStringDict && numRows > 0 && i >= 1 && buffers[i - 1] != nullptr) {
const auto prevSize = buffers[i - 1]->size();
const auto expectedOffsetsBytes = static_cast<int64_t>(numRows + 1) * sizeof(int32_t);
if (prevSize == expectedOffsetsBytes) {
offsetsBuf = buffers[i - 1]->data();
offsetsNumRows = static_cast<int32_t>(numRows);
routeToTacStringDict = true;
}
}

if (TypeAwareCompressCodec::support(typeKind) && (typeKind != tac::kStringDict || routeToTacStringDict)) {
ARROW_ASSIGN_OR_RAISE(
compressedSize, compressTypeAwareBuffer(std::move(buffers[i]), output, availableLength, typeKind));
compressedSize,
compressTypeAwareBuffer(
std::move(buffers[i]), output, availableLength, typeKind, offsetsBuf, offsetsNumRows));
} else {
// Use standard codec (LZ4/ZSTD) for unsupported types.
// Use standard codec (LZ4/ZSTD) for unsupported types and for
// kStringDict on shape-b lengths buffers (or any unexpected shape).
ARROW_ASSIGN_OR_RAISE(compressedSize, compressBuffer(std::move(buffers[i]), output, availableLength, codec));
}
output += compressedSize;
Expand Down
Loading
Loading