diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index 5d02a844833..b78052c4a35 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -92,11 +92,17 @@ arrow::Result compressBuffer( // Same wire format as compressBuffer: // kTypeAwareBuffer (int64) | uncompressedLength (int64) | compressedLength (int64) | compressed data // If compressed size >= uncompressed size, falls back to kUncompressedBuffer (same as standard codec). +// +// For TAC type kStringDict, the offsetsBuffer + numRows arguments must be +// supplied so the codec can build the dictionary. They are ignored for other +// TAC types. arrow::Result compressTypeAwareBuffer( const std::shared_ptr& buffer, uint8_t* output, int64_t outputLength, - int8_t typeKind) { + int8_t typeKind, + const uint8_t* offsetsBuffer = nullptr, + int32_t numRows = 0) { auto outputPtr = &output; if (!buffer) { write(outputPtr, kNullBuffer); @@ -116,7 +122,8 @@ arrow::Result compressTypeAwareBuffer( ARROW_ASSIGN_OR_RAISE( auto compressedSize, - TypeAwareCompressCodec::compress(buffer->data(), buffer->size(), dataOutput, availableOutput, typeKind)); + TypeAwareCompressCodec::compress( + buffer->data(), buffer->size(), dataOutput, availableOutput, typeKind, offsetsBuffer, numRows)); if (compressedSize >= buffer->size()) { // Compression didn't help. Fall back to uncompressed, same as compressBuffer. @@ -272,12 +279,46 @@ arrow::Result> BlockPayload::fromBuffers( auto typeKind = (bufferTypes != nullptr && i < bufferTypes->size()) ? (*bufferTypes)[i] : tac::kUnsupported; int64_t compressedSize = 0; - if (TypeAwareCompressCodec::support(typeKind)) { - // Use type-aware compression for supported types. + // For string-dict compression the codec needs per-row offsets into + // the string-data buffer. Production string columns reach us in two + // shapes: + // + // (a) Arrow standard layout: validity, offsets[numRows+1], data. + // offsets[0] == 0, offsets[numRows] == data buffer size. + // (b) VeloxHashShuffleWriter layout: validity, lengths[numRows], + // data — i.e. a per-row length, not cumulative offsets. + // + // The codec reads `offsets[numRows]` as the data-buffer end sentinel, + // which is in-bounds for shape-a but ONE PAST THE END for shape-b. + // To stay correct, only route shape-a inputs through compressStringDict; + // shape-b inputs fall through to the standard LZ4/ZSTD codec below. + // The codec's internal `sliced` early-exit was already producing LZ4 + // output for the common shape-b case (where `offsets[0] != 0`), so + // observable compressed size is unchanged at the user-data level. + // + // Use the authoritative `numRows` parameter (NOT `bufferSize/4 - 1`) + // so single-row batches don't underflow to `numRows == 0`. + bool routeToTacStringDict = false; + const uint8_t* offsetsBuf = nullptr; + int32_t offsetsNumRows = 0; + if (typeKind == tac::kStringDict && numRows > 0 && i >= 1 && buffers[i - 1] != nullptr) { + const auto prevSize = buffers[i - 1]->size(); + const auto expectedOffsetsBytes = static_cast(numRows + 1) * sizeof(int32_t); + if (prevSize == expectedOffsetsBytes) { + offsetsBuf = buffers[i - 1]->data(); + offsetsNumRows = static_cast(numRows); + routeToTacStringDict = true; + } + } + + if (TypeAwareCompressCodec::support(typeKind) && (typeKind != tac::kStringDict || routeToTacStringDict)) { ARROW_ASSIGN_OR_RAISE( - compressedSize, compressTypeAwareBuffer(std::move(buffers[i]), output, availableLength, typeKind)); + compressedSize, + compressTypeAwareBuffer( + std::move(buffers[i]), output, availableLength, typeKind, offsetsBuf, offsetsNumRows)); } else { - // Use standard codec (LZ4/ZSTD) for unsupported types. + // Use standard codec (LZ4/ZSTD) for unsupported types and for + // kStringDict on shape-b lengths buffers (or any unexpected shape). ARROW_ASSIGN_OR_RAISE(compressedSize, compressBuffer(std::move(buffers[i]), output, availableLength, codec)); } output += compressedSize; diff --git a/cpp/core/tests/FForCodecTest.cc b/cpp/core/tests/FForCodecTest.cc index a7233222112..919873e63f9 100644 --- a/cpp/core/tests/FForCodecTest.cc +++ b/cpp/core/tests/FForCodecTest.cc @@ -19,8 +19,10 @@ #include "utils/tac/TypeAwareCompressCodec.h" #include "utils/tac/ffor.hpp" +#include #include #include +#include #include #include @@ -85,7 +87,7 @@ void compressRoundtrip(const uint64_t* data, size_t num) { size_t written = compress64(data, num, buf.data()); std::vector decoded(num); - size_t nDecoded = decompress64(buf.data(), written, decoded.data()); + size_t nDecoded = decompress64(buf.data(), written, decoded.data(), num); ASSERT_EQ(nDecoded, num); for (size_t i = 0; i < num; ++i) { @@ -402,7 +404,7 @@ TEST(FForTest, Compress64MisalignedOutput) { size_t written = compress64(data.data(), data.size(), out); std::vector decoded(256); - size_t n = decompress64(out, written, decoded.data()); + size_t n = decompress64(out, written, decoded.data(), decoded.size()); ASSERT_EQ(n, size_t(256)); for (size_t i = 0; i < 256; ++i) { ASSERT_EQ(decoded[i], data[i]) << "offset=" << offset << " i=" << i; @@ -422,7 +424,7 @@ TEST(FForTest, Compress64MisalignedInput) { size_t written = compress64(misalignedInput, 256, comp.data()); std::vector decoded(256); - size_t n = decompress64(comp.data(), written, decoded.data()); + size_t n = decompress64(comp.data(), written, decoded.data(), decoded.size()); ASSERT_EQ(n, size_t(256)); for (size_t i = 0; i < 256; ++i) { ASSERT_EQ(decoded[i], raw[i]) << "offset=" << offset << " i=" << i; @@ -438,7 +440,7 @@ TEST(FForTest, Decompress64MisalignedOutput) { std::vector outBuf(256 * sizeof(uint64_t) + 16); for (size_t offset = 0; offset < 8; ++offset) { auto* misalignedOutput = reinterpret_cast(outBuf.data() + offset); - size_t n = decompress64(comp.data(), written, misalignedOutput); + size_t n = decompress64(comp.data(), written, misalignedOutput, 256); ASSERT_EQ(n, size_t(256)); for (size_t i = 0; i < 256; ++i) { uint64_t val; @@ -463,7 +465,7 @@ TEST(FForTest, Compress64AllMisaligned) { size_t written = compress64(inPtr, 256, compBuf.data() + compOff); auto* outPtr = reinterpret_cast(outBuf.data() + outOff); - size_t n = decompress64(compBuf.data() + compOff, written, outPtr); + size_t n = decompress64(compBuf.data() + compOff, written, outPtr, 256); ASSERT_EQ(n, size_t(256)); for (size_t i = 0; i < 256; ++i) { uint64_t val; @@ -545,6 +547,8 @@ TEST(FForCodecTest, FullRangeDataRoundtrip) { TEST(TypeAwareCompressCodecTest, SupportedTypes) { // Supported TAC types. ASSERT_TRUE(TypeAwareCompressCodec::support(tac::kUInt64)); + ASSERT_TRUE(TypeAwareCompressCodec::support(tac::kUInt128)); + ASSERT_TRUE(TypeAwareCompressCodec::support(tac::kUInt32)); // Not supported. ASSERT_FALSE(TypeAwareCompressCodec::support(tac::kUnsupported)); @@ -643,3 +647,1631 @@ TEST(TypeAwareCompressCodecTest, UnsupportedType) { auto result = TypeAwareCompressCodec::compress(dummy, 8, dummy, 100, kSomeUnsupportedType); ASSERT_FALSE(result.ok()); } + +// --------------------------------------------------------------------------- +// TypeAwareCompressCodec / kUInt128 — split-lane FFOR(uint64) for int128 data. +// --------------------------------------------------------------------------- + +namespace { + +// Build an int128 buffer from low / high 64-bit halves. +std::vector<__int128_t> buildI128(const std::vector& lo, const std::vector& hi) { + EXPECT_EQ(lo.size(), hi.size()); + std::vector<__int128_t> out(lo.size()); + for (size_t i = 0; i < lo.size(); ++i) { + out[i] = (static_cast<__uint128_t>(hi[i]) << 64) | lo[i]; + } + return out; +} + +void roundtripUInt128(const std::vector<__int128_t>& data, bool expectShrink) { + int64_t inputSize = static_cast(data.size() * sizeof(__int128_t)); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt128); + ASSERT_GE(maxLen, inputSize) << "maxCompressedLen must be at least input size"; + std::vector compressed(maxLen + 64, 0xCC); // 64-byte sentinel tail + size_t kSentinelStart = static_cast(maxLen); + + auto compResult = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt128); + ASSERT_TRUE(compResult.ok()) << compResult.status().ToString(); + auto compressedSize = *compResult; + ASSERT_GT(compressedSize, 0); + ASSERT_LE(compressedSize, maxLen); + + // Sentinel tail must be untouched (no out-of-bounds write). + for (size_t i = kSentinelStart; i < compressed.size(); ++i) { + ASSERT_EQ(compressed[i], 0xCC) << "Sentinel byte trampled at offset " << i; + } + + if (expectShrink) { + ASSERT_LT(compressedSize, inputSize) << "Expected narrow-range data to compress below raw size"; + } + + std::vector<__int128_t> decoded(data.size(), 0); + auto decResult = TypeAwareCompressCodec::decompress( + compressed.data(), compressedSize, reinterpret_cast(decoded.data()), inputSize); + ASSERT_TRUE(decResult.ok()) << decResult.status().ToString(); + + for (size_t i = 0; i < data.size(); ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i; + } +} + +} // namespace + +TEST(TypeAwareCompressCodecTest, UInt128AllZero) { + std::vector<__int128_t> data(256, 0); + roundtripUInt128(data, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt128SingleValue) { + // Exercise tail handling: 4 values = one FFOR lane group, no full block. + // Don't expect shrink: 4 values × 16 B = 64 B input; FFOR's per-lane block + // header (16 B) + tail (16 B) + split body header (8 B) + TAC header (2 B) + // exceeds the raw payload at this size. + std::vector<__int128_t> data(4, (__int128_t)1234567890LL); + roundtripUInt128(data, /*expectShrink=*/false); +} + +TEST(TypeAwareCompressCodecTest, UInt128NarrowPositive) { + // Typical SUM(decimal(7,2)) shuffle pattern: small range, all fit in lower-64, + // upper-64 is identically zero -> both lanes compress to near-zero bits. + auto lo = genData(1024, 5000, 255); + std::vector hi(1024, 0); + auto data = buildI128(lo, hi); + roundtripUInt128(data, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt128MediumPositive) { + // Values up to ~2^40, fits in lower-64, upper-64 is zero. + auto lo = genData(1024, 0, (uint64_t(1) << 40) - 1, /*seed=*/7); + std::vector hi(1024, 0); + auto data = buildI128(lo, hi); + roundtripUInt128(data, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt128MixedSign) { + // Mix positive and negative int128. Negative values stored two's-complement + // have lower-64 equal to 2^64 - |value|. With alternating signs, the lower + // lane's unsigned representation spans both "small positive" and + // "near-max-uint64", which makes FFOR's frame-of-reference compress poorly + // (bw=64 on the lower lane). The upper lane is well-compressed: only two + // distinct values (0 for positives, ~0 for negatives). + // + // This test does NOT assert shrink — pathological worst case for split-lane + // FFOR — but does verify the round-trip is exact, the codec falls through + // to FForCodec's bw=64 path correctly, and there is no buffer overflow. + std::vector<__int128_t> data; + for (int i = 0; i < 1024; ++i) { + int64_t v = (i % 2 == 0) ? int64_t(i + 100) : -int64_t(i + 100); + data.push_back(static_cast<__int128_t>(v)); + } + roundtripUInt128(data, /*expectShrink=*/false); +} + +TEST(TypeAwareCompressCodecTest, UInt128MixedSignSmallNegative) { + // Realistic "mostly positive with a few negatives" pattern, e.g. signed + // sum aggregates. Lower lane stays in a narrow range because the negative + // values are sparse. Verify shrink still happens. + auto pos = genData(1000, 100000, 50000, /*seed=*/37); + std::vector hi(1024, 0); + std::vector lo; + lo.reserve(1024); + for (uint64_t v : pos) { + lo.push_back(v); + } + // 24 trailing negatives (-1..-24) to round out to 1024. + for (int i = 1; i <= 24; ++i) { + int64_t v = -i; + lo.push_back(static_cast(v)); + hi[lo.size() - 1] = ~uint64_t(0); + } + auto data = buildI128(lo, hi); + // With only 24 negatives in 1024 values, the lower lane spans + // [min(100000), max(near-max-uint64)] which defeats FoR. So don't + // assert shrink here either — but verify correctness. + roundtripUInt128(data, /*expectShrink=*/false); +} + +TEST(TypeAwareCompressCodecTest, UInt128UsesUpperLane) { + // Values that actually use the upper 64 bits — typical of decimal(38,*) with + // very large precomputed sums. Both lanes carry real entropy. + auto lo = genData(512, 0, (uint64_t(1) << 50) - 1, /*seed=*/11); + auto hi = genData(512, 0, (uint64_t(1) << 10) - 1, /*seed=*/13); + auto data = buildI128(lo, hi); + roundtripUInt128(data, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt128NearMaxValue) { + // Stress the FFOR with values near INT128_MAX. Forces bw=64 on the upper lane + // (which is full-range uint64 in this construction); roundtrip must still be + // exact even when compression doesn't shrink. + auto lo = genData(256, 0, UINT64_MAX, /*seed=*/17); + auto hi = genData(256, 0, UINT64_MAX, /*seed=*/19); + auto data = buildI128(lo, hi); + // Don't assert shrink — full-range high lane defeats FFOR. + roundtripUInt128(data, /*expectShrink=*/false); +} + +TEST(TypeAwareCompressCodecTest, UInt128MonotoneSequence) { + // Sequential decimal values — perfect frame-of-reference for the lo lane. + std::vector<__int128_t> data(2048); + for (size_t i = 0; i < data.size(); ++i) { + data[i] = static_cast<__int128_t>(1000000LL + static_cast(i)); + } + roundtripUInt128(data, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt128MaxCompressedLenBoundary) { + // maxCompressedLen must be tight enough to refuse undersized output buffers + // but loose enough to always accommodate worst-case (high-entropy) input. + // Pick a high-entropy input and verify the compress call respects the budget. + auto lo = genData(256, 0, UINT64_MAX, /*seed=*/23); + auto hi = genData(256, 0, UINT64_MAX, /*seed=*/29); + auto data = buildI128(lo, hi); + + int64_t inputSize = static_cast(data.size() * sizeof(__int128_t)); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt128); + + // maxLen budget must succeed: + std::vector ok(maxLen); + auto okResult = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, ok.data(), maxLen, tac::kUInt128); + ASSERT_TRUE(okResult.ok()) << okResult.status().ToString(); + + // A budget of just the TAC header must NOT succeed: + std::vector tiny(2); + auto tinyResult = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, tiny.data(), tiny.size(), tac::kUInt128); + ASSERT_FALSE(tinyResult.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt128InvalidInputSize) { + // Input size not a multiple of 16 must be rejected. + std::vector bad(15, 0); + std::vector out(128); + auto result = TypeAwareCompressCodec::compress(bad.data(), bad.size(), out.data(), out.size(), tac::kUInt128); + ASSERT_FALSE(result.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt128EmptyInput) { + auto result = TypeAwareCompressCodec::compress(nullptr, 0, nullptr, 0, tac::kUInt128); + ASSERT_TRUE(result.ok()); + ASSERT_EQ(*result, 0); +} + +TEST(TypeAwareCompressCodecTest, UInt128InvalidOutputSize) { + // Decompressing into an output buffer whose size is not a multiple of 16 must error. + std::vector<__int128_t> data(4, __int128_t(42)); + int64_t inputSize = data.size() * sizeof(__int128_t); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt128); + std::vector compressed(maxLen); + auto cr = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt128); + ASSERT_TRUE(cr.ok()); + + std::vector badOut(15); + auto dr = TypeAwareCompressCodec::decompress(compressed.data(), *cr, badOut.data(), badOut.size()); + ASSERT_FALSE(dr.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt128CorruptedHeaderRejected) { + // Forge a wire payload with codec=kFForSplit128 + native strategy byte but + // a corrupted loCompLen larger than the body. Decompress must reject cleanly + // (not crash). + // Header bytes (private, but we know the wire format): + // byte 0: CodecId::kFForSplit128 = 2 + // byte 1: tac::kUInt128 = 1 + // byte 2: int-codec strategy: kIntStrategyNative = 0 + // bytes 3..10: int64 loCompLen + std::vector bogus(2 + 1 + 8 + 16, 0); + bogus[0] = 2; // kFForSplit128 + bogus[1] = 1; // kUInt128 + bogus[2] = 0; // kIntStrategyNative + int64_t huge = (int64_t(1) << 40); // far exceeds body length + std::memcpy(bogus.data() + 3, &huge, sizeof(int64_t)); + std::vector out(16, 0); + auto dr = TypeAwareCompressCodec::decompress(bogus.data(), bogus.size(), out.data(), out.size()); + ASSERT_FALSE(dr.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt128NegativeLoCompLenRejected) { + std::vector bogus(2 + 1 + 8 + 16, 0); + bogus[0] = 2; // kFForSplit128 + bogus[1] = 1; // kUInt128 + bogus[2] = 0; // kIntStrategyNative + int64_t neg = int64_t(-1); + std::memcpy(bogus.data() + 3, &neg, sizeof(int64_t)); + std::vector out(16, 0); + auto dr = TypeAwareCompressCodec::decompress(bogus.data(), bogus.size(), out.data(), out.size()); + ASSERT_FALSE(dr.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt128WireFormatIndependent) { + // Two payloads compressed independently must each decompress to their own + // input — proves the wire format is self-describing (no shared state). + std::vector<__int128_t> a(64, (__int128_t)1000); + std::vector<__int128_t> b(64, (__int128_t)2000); + + int64_t sz = a.size() * sizeof(__int128_t); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(sz, tac::kUInt128); + + std::vector cA(maxLen), cB(maxLen); + auto rA = TypeAwareCompressCodec::compress( + reinterpret_cast(a.data()), sz, cA.data(), maxLen, tac::kUInt128); + auto rB = TypeAwareCompressCodec::compress( + reinterpret_cast(b.data()), sz, cB.data(), maxLen, tac::kUInt128); + ASSERT_TRUE(rA.ok()); + ASSERT_TRUE(rB.ok()); + + std::vector<__int128_t> dA(64, 0), dB(64, 0); + ASSERT_TRUE(TypeAwareCompressCodec::decompress(cA.data(), *rA, reinterpret_cast(dA.data()), sz).ok()); + ASSERT_TRUE(TypeAwareCompressCodec::decompress(cB.data(), *rB, reinterpret_cast(dB.data()), sz).ok()); + + for (size_t i = 0; i < 64; ++i) { + ASSERT_EQ(dA[i], a[i]); + ASSERT_EQ(dB[i], b[i]); + } +} + +TEST(TypeAwareCompressCodecTest, UInt128CompressesBetterThanUncompressed) { + // Sanity: realistic decimal(38,2) shuffle pattern (small positive prices) + // must compress to substantially less than the raw 16-bytes-per-value. + auto lo = genData(4096, 100000, 50000, /*seed=*/31); // prices ~$1000-$1500 in scale-2 + std::vector hi(4096, 0); + auto data = buildI128(lo, hi); + + int64_t inputSize = data.size() * sizeof(__int128_t); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt128); + std::vector compressed(maxLen); + + auto cr = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt128); + ASSERT_TRUE(cr.ok()); + + // 4096 values × 16 B = 65536 bytes raw. Tight FoR on the lo lane (~17 bits) + // plus an all-zero hi lane should easily land under 25% of raw. + ASSERT_LT(*cr, inputSize / 4) << "compressed=" << *cr << " raw=" << inputSize; +} + +// --------------------------------------------------------------------------- +// TypeAwareCompressCodec / kUInt32 — FFOR(uint64) over a zero-extended view +// of the uint32 stream. Used for INT32, DATE32, and string-offsets buffers. +// --------------------------------------------------------------------------- + +namespace { + +void roundtripUInt32(const std::vector& data, bool expectShrink) { + int64_t inputSize = static_cast(data.size() * sizeof(uint32_t)); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt32); + ASSERT_GE(maxLen, inputSize) << "maxCompressedLen must be at least input size"; + std::vector compressed(maxLen + 64, 0xCC); + size_t kSentinelStart = static_cast(maxLen); + + auto compResult = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt32); + ASSERT_TRUE(compResult.ok()) << compResult.status().ToString(); + auto compressedSize = *compResult; + ASSERT_GT(compressedSize, 0); + ASSERT_LE(compressedSize, maxLen); + + // Sentinel tail must be untouched (no out-of-bounds write). + for (size_t i = kSentinelStart; i < compressed.size(); ++i) { + ASSERT_EQ(compressed[i], 0xCC) << "Sentinel byte trampled at offset " << i; + } + + if (expectShrink) { + ASSERT_LT(compressedSize, inputSize) << "Expected narrow-range data to compress below raw size"; + } + + std::vector decoded(data.size(), 0xDEADBEEF); + auto decResult = TypeAwareCompressCodec::decompress( + compressed.data(), compressedSize, reinterpret_cast(decoded.data()), inputSize); + ASSERT_TRUE(decResult.ok()) << decResult.status().ToString(); + + for (size_t i = 0; i < data.size(); ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i; + } +} + +} // namespace + +TEST(TypeAwareCompressCodecTest, UInt32AllZero) { + std::vector data(256, 0); + roundtripUInt32(data, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt32SingleValueTail) { + // 4 values — tail handling. + std::vector data(4, 42); + // No shrink at this size — per-block header overhead exceeds payload. + roundtripUInt32(data, /*expectShrink=*/false); +} + +TEST(TypeAwareCompressCodecTest, UInt32NarrowPositive) { + // Typical INT32 column shuffle pattern: small values clustered together. + std::mt19937 rng(57); + std::uniform_int_distribution dist(0, 255); + std::vector data(1024); + for (auto& v : data) + v = dist(rng); + roundtripUInt32(data, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt32StringOffsetsMonotone) { + // Realistic Arrow string-offsets buffer: monotonically increasing int32 + // starting at 0, deltas in [1, 30] (typical short-string lengths). + std::mt19937 rng(101); + std::uniform_int_distribution lengthDist(1, 30); + std::vector offsets; + offsets.reserve(2048); + uint32_t cur = 0; + offsets.push_back(cur); + for (size_t i = 1; i < 2048; ++i) { + cur += lengthDist(rng); + offsets.push_back(cur); + } + // Plain FoR will pick base=0 and bw covering max-min ~ 30*2047 ~ 16 bits. + // ~2x compression vs raw — still a real saving. (Delta-FFOR would do + // much better; see future work in the writeup.) + roundtripUInt32(offsets, /*expectShrink=*/true); +} + +TEST(TypeAwareCompressCodecTest, UInt32MaxValue) { + // Full uint32 range; FFOR can't compress below raw + headers. + std::mt19937 rng(83); + std::uniform_int_distribution dist(0, UINT32_MAX); + std::vector data(256); + for (auto& v : data) + v = dist(rng); + roundtripUInt32(data, /*expectShrink=*/false); +} + +TEST(TypeAwareCompressCodecTest, UInt32MaxCompressedLenBoundary) { + std::mt19937 rng(89); + std::uniform_int_distribution dist(0, UINT32_MAX); + std::vector data(256); + for (auto& v : data) + v = dist(rng); + + int64_t inputSize = static_cast(data.size() * sizeof(uint32_t)); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt32); + + // The maxLen budget always succeeds: + std::vector ok(maxLen); + auto okResult = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, ok.data(), maxLen, tac::kUInt32); + ASSERT_TRUE(okResult.ok()) << okResult.status().ToString(); + + // Just the TAC header (2 bytes) is not enough: + std::vector tiny(2); + auto tinyResult = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, tiny.data(), tiny.size(), tac::kUInt32); + ASSERT_FALSE(tinyResult.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt32InvalidInputSize) { + // Size not a multiple of 4 must be rejected. + std::vector bad(7, 0); + std::vector out(128); + auto r = TypeAwareCompressCodec::compress(bad.data(), bad.size(), out.data(), out.size(), tac::kUInt32); + ASSERT_FALSE(r.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt32EmptyInput) { + auto r = TypeAwareCompressCodec::compress(nullptr, 0, nullptr, 0, tac::kUInt32); + ASSERT_TRUE(r.ok()); + ASSERT_EQ(*r, 0); +} + +TEST(TypeAwareCompressCodecTest, UInt32InvalidOutputSize) { + // Output size not a multiple of 4 on decompress must error cleanly. + std::vector data(4, 1234567); + int64_t inputSize = data.size() * sizeof(uint32_t); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt32); + std::vector compressed(maxLen); + auto cr = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt32); + ASSERT_TRUE(cr.ok()); + + std::vector badOut(7); + auto dr = TypeAwareCompressCodec::decompress(compressed.data(), *cr, badOut.data(), badOut.size()); + ASSERT_FALSE(dr.ok()); +} + +TEST(TypeAwareCompressCodecTest, UInt32CompressesBetterThanUncompressed) { + // INT32 columns in real shuffle: dense, narrow range (e.g. d_year values). + std::mt19937 rng(127); + std::uniform_int_distribution dist(1990, 2020); + std::vector data(4096); + for (auto& v : data) + v = dist(rng); + + int64_t inputSize = data.size() * sizeof(uint32_t); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt32); + std::vector compressed(maxLen); + + auto cr = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt32); + ASSERT_TRUE(cr.ok()); + + // 4096 × 4 B = 16384 bytes raw. Range = ~30 ⇒ bw=5 ⇒ ~3 KB packed. + // Should land under 30% of raw. + ASSERT_LT(*cr, inputSize * 0.3) << "compressed=" << *cr << " raw=" << inputSize; +} + +TEST(TypeAwareCompressCodecTest, UInt32CrossCodecIndependence) { + // Verify a kUInt32 payload and a kUInt64 payload decompress independently + // to their own data, even when one immediately follows the other in memory. + std::vector a32(64); + for (size_t i = 0; i < a32.size(); ++i) + a32[i] = static_cast(1000 + i); + std::vector a64(64); + for (size_t i = 0; i < a64.size(); ++i) + a64[i] = static_cast(2000000 + i); + + int64_t s32 = a32.size() * sizeof(uint32_t); + int64_t s64 = a64.size() * sizeof(uint64_t); + auto m32 = TypeAwareCompressCodec::maxCompressedLen(s32, tac::kUInt32); + auto m64 = TypeAwareCompressCodec::maxCompressedLen(s64, tac::kUInt64); + + std::vector c32(m32), c64(m64); + auto r32 = TypeAwareCompressCodec::compress( + reinterpret_cast(a32.data()), s32, c32.data(), m32, tac::kUInt32); + auto r64 = TypeAwareCompressCodec::compress( + reinterpret_cast(a64.data()), s64, c64.data(), m64, tac::kUInt64); + ASSERT_TRUE(r32.ok()); + ASSERT_TRUE(r64.ok()); + + std::vector d32(64, 0); + std::vector d64(64, 0); + ASSERT_TRUE(TypeAwareCompressCodec::decompress(c32.data(), *r32, reinterpret_cast(d32.data()), s32).ok()); + ASSERT_TRUE(TypeAwareCompressCodec::decompress(c64.data(), *r64, reinterpret_cast(d64.data()), s64).ok()); + for (size_t i = 0; i < 64; ++i) { + ASSERT_EQ(d32[i], a32[i]); + ASSERT_EQ(d64[i], a64[i]); + } +} + +// ============================================================================= +// TIMESTAMP-shaped data tests +// +// Velox Timestamp is a 16-byte struct { int64_t seconds_; uint64_t nanos_; }. +// The MS-fork TAC dispatch routes TypeKind::TIMESTAMP to tac::kUInt128, which +// applies split-lane FFOR independently on the two uint64 lanes. These tests +// exercise realistic timestamp data shapes through that path and confirm +// roundtrip correctness + reasonable compression. +// ============================================================================= + +namespace { + +// Pack (seconds, nanos) pairs into a contiguous uint8 buffer with the same +// layout as Velox FlatVector: 16 bytes per row, low 8 = seconds +// (int64), high 8 = nanos (uint64). +std::vector packTimestamps(const std::vector& seconds, const std::vector& nanos) { + EXPECT_EQ(seconds.size(), nanos.size()); + std::vector bytes(seconds.size() * 16); + for (size_t i = 0; i < seconds.size(); ++i) { + std::memcpy(bytes.data() + i * 16, &seconds[i], 8); + std::memcpy(bytes.data() + i * 16 + 8, &nanos[i], 8); + } + return bytes; +} + +void unpackTimestamps( + const uint8_t* bytes, + size_t rows, + std::vector& secondsOut, + std::vector& nanosOut) { + secondsOut.assign(rows, 0); + nanosOut.assign(rows, 0); + for (size_t i = 0; i < rows; ++i) { + std::memcpy(&secondsOut[i], bytes + i * 16, 8); + std::memcpy(&nanosOut[i], bytes + i * 16 + 8, 8); + } +} + +void roundtripTimestampViaUInt128(const std::vector& secondsIn, const std::vector& nanosIn) { + auto packed = packTimestamps(secondsIn, nanosIn); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(packed.size(), tac::kUInt128); + std::vector compressed(maxLen); + + auto r = TypeAwareCompressCodec::compress(packed.data(), packed.size(), compressed.data(), maxLen, tac::kUInt128); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + + std::vector decompressed(packed.size()); + ASSERT_TRUE(TypeAwareCompressCodec::decompress(compressed.data(), *r, decompressed.data(), packed.size()).ok()); + + std::vector secondsOut; + std::vector nanosOut; + unpackTimestamps(decompressed.data(), secondsIn.size(), secondsOut, nanosOut); + + ASSERT_EQ(secondsOut, secondsIn); + ASSERT_EQ(nanosOut, nanosIn); +} + +} // namespace + +TEST(TypeAwareCompressCodecTest, TimestampSecondAlignedRoundtrip) { + // Realistic shape #1: a column of second-aligned timestamps from a single + // day, nanos == 0 everywhere. Seconds lane has very low FFOR width (range + // ~86400). Nanos lane is constant zero. + const int64_t baseSec = 1716220800; // 2024-05-20 16:00:00 UTC + std::vector seconds; + std::vector nanos; + for (int i = 0; i < 512; ++i) { + seconds.push_back(baseSec + i * 7); // every 7 seconds + nanos.push_back(0); + } + roundtripTimestampViaUInt128(seconds, nanos); +} + +TEST(TypeAwareCompressCodecTest, TimestampSparseNanosRoundtrip) { + // Realistic shape #2: timestamps clustered in a day, with nanos sometimes + // non-zero (e.g., sub-second precision events). Seconds lane low-width, + // nanos lane medium-width but mostly small. + const int64_t baseSec = 1716220800; + std::vector seconds; + std::vector nanos; + for (int i = 0; i < 1024; ++i) { + seconds.push_back(baseSec + i / 4); // four readings per second + nanos.push_back((i % 4) * 250'000'000ULL); // 0, 250M, 500M, 750M ns + } + roundtripTimestampViaUInt128(seconds, nanos); +} + +TEST(TypeAwareCompressCodecTest, TimestampMultiDayRoundtrip) { + // Realistic shape #3: timestamps spread across multiple days. Seconds lane + // wider than the single-day case (still narrow vs full int64 range). Tests + // that the split-lane FFOR handles realistic fact-table date ranges. + const int64_t baseSec = 1716220800; + std::vector seconds; + std::vector nanos; + for (int i = 0; i < 2048; ++i) { + seconds.push_back(baseSec + i * 3600); // hourly readings, ~85 days span + nanos.push_back(0); + } + roundtripTimestampViaUInt128(seconds, nanos); +} + +TEST(TypeAwareCompressCodecTest, TimestampAllSameRoundtrip) { + // Degenerate shape: constant timestamp column (e.g., a partition key derived + // from a fixed instant). Both lanes have bw=0; codec should still roundtrip. + std::vector seconds(256, 1716220800); + std::vector nanos(256, 12345); + roundtripTimestampViaUInt128(seconds, nanos); +} + +TEST(TypeAwareCompressCodecTest, TimestampCompressesBetterThanUncompressed) { + // Validate that for a realistic timestamp shape (dense seconds, sparse + // nanos), the kUInt128 codec actually saves bytes compared to plain memcpy. + const int64_t baseSec = 1716220800; + std::vector seconds; + std::vector nanos; + for (int i = 0; i < 4096; ++i) { + seconds.push_back(baseSec + i * 60); // minute-aligned for one minute span + nanos.push_back(0); + } + auto packed = packTimestamps(seconds, nanos); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(packed.size(), tac::kUInt128); + std::vector compressed(maxLen); + auto r = TypeAwareCompressCodec::compress(packed.data(), packed.size(), compressed.data(), maxLen, tac::kUInt128); + ASSERT_TRUE(r.ok()); + // Expect at least 4x compression for this shape (constant nanos + dense + // narrow-bw seconds). 4096 rows * 16 bytes = 65536 raw; target < 16384. + EXPECT_LT(*r, packed.size() / 4) << "compressed=" << *r << " raw=" << packed.size(); +} + +// ============================================================================= +// kStringDict adaptive codec tests +// +// String DATA buffer compression: codec tries dictionary encoding AND LZ4 +// fallback, emits whichever is smaller with a strategy byte. Reader dispatches +// on the strategy byte. +// +// These tests cover: +// - Round-trip correctness for both strategies and degenerate inputs. +// - Strategy selection: low-card with long runs => LZ4 wins; medium-card +// scattered => dict wins. +// - Invalid input rejection (truncated headers, bad indices, etc.). +// ============================================================================= + +namespace { + +// Pack a vector of strings into a contiguous data buffer + Arrow int32 offsets +// buffer (length numRows+1, offsets[0]=0, offsets[i+1]=offsets[i]+len(s_i)). +struct PackedStrings { + std::vector data; + std::vector offsets; +}; + +PackedStrings packStrings(const std::vector& strings) { + PackedStrings p; + p.offsets.reserve(strings.size() + 1); + p.offsets.push_back(0); + int32_t cur = 0; + for (const auto& s : strings) { + cur += static_cast(s.size()); + p.offsets.push_back(cur); + } + p.data.reserve(static_cast(cur)); + for (const auto& s : strings) { + p.data.insert(p.data.end(), s.begin(), s.end()); + } + return p; +} + +void roundtripStringDict(const std::vector& strings) { + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + + // For all-empty inputs (inputLen == 0), the codec returns 0 bytes (nothing + // to compress) and decompress isn't called by the wrapper — the higher-level + // wrapper short-circuits via kZeroLengthBuffer. Mirror that here. + if (inputLen == 0) { + EXPECT_EQ(*r, 0); + return; + } + + std::vector decompressed(static_cast(inputLen)); + ASSERT_TRUE(TypeAwareCompressCodec::decompress(compressed.data(), *r, decompressed.data(), inputLen).ok()); + + ASSERT_EQ(decompressed.size(), packed.data.size()); + ASSERT_EQ(std::memcmp(decompressed.data(), packed.data.data(), packed.data.size()), 0); +} + +// Inspect the strategy byte emitted (after the 2-byte codec header). +uint8_t strategyByte(const uint8_t* compressed) { + // [codec_id (1B)] [tac_type (1B)] [strategy (1B)] ... + return compressed[2]; +} + +} // namespace + +TEST(TypeAwareCompressCodecTest, StringDictLowCardRoundtrip) { + // Very low cardinality, like d_day_name in TPC-DS: 7 unique values, many rows. + std::vector names = {"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"}; + std::vector strings; + for (int i = 0; i < 1024; ++i) { + strings.push_back(names[i % names.size()]); + } + roundtripStringDict(strings); +} + +TEST(TypeAwareCompressCodecTest, StringDictMediumCardScatteredRoundtrip) { + // Medium cardinality, scattered. Realistic shape for an item-brand column + // in shuffle (hundreds of unique brands distributed across rows). + std::vector strings; + strings.reserve(2048); + for (int i = 0; i < 2048; ++i) { + strings.push_back("brand_" + std::to_string((i * 7919) % 300)); + } + roundtripStringDict(strings); +} + +TEST(TypeAwareCompressCodecTest, StringDictHighCardRoundtrip) { + // High cardinality (one unique per row, names-like). + std::vector strings; + strings.reserve(1024); + for (int i = 0; i < 1024; ++i) { + strings.push_back("user_" + std::to_string(i)); + } + roundtripStringDict(strings); +} + +TEST(TypeAwareCompressCodecTest, StringDictAllSameRoundtrip) { + std::vector strings(256, "same"); + roundtripStringDict(strings); +} + +TEST(TypeAwareCompressCodecTest, StringDictEmptyStringsRoundtrip) { + std::vector strings(128, ""); + roundtripStringDict(strings); +} + +TEST(TypeAwareCompressCodecTest, StringDictMixedLengthsRoundtrip) { + std::vector strings = {"", "a", "ab", "abcdef", std::string(1024, 'x'), "tail", "", "z"}; + // Repeat the pattern to exercise the index path properly. + std::vector repeated; + for (int i = 0; i < 32; ++i) { + for (const auto& s : strings) { + repeated.push_back(s); + } + } + roundtripStringDict(repeated); +} + +TEST(TypeAwareCompressCodecTest, StringDictLargeDictionaryRoundtrip) { + // 1000 unique strings, each appearing twice. Forces 2-byte indices. + std::vector strings; + strings.reserve(2000); + for (int i = 0; i < 1000; ++i) { + strings.push_back("entry_" + std::to_string(i) + "_padding"); + } + for (int i = 0; i < 1000; ++i) { + strings.push_back("entry_" + std::to_string(i) + "_padding"); + } + roundtripStringDict(strings); +} + +TEST(TypeAwareCompressCodecTest, StringDictPicksLz4ForLongRuns) { + // 7 unique names, but presented as long consecutive runs (the case where + // LZ4 beats dict). Verify codec actually picks LZ4 strategy. + std::vector names = {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}; + std::vector strings; + for (const auto& n : names) { + for (int i = 0; i < 2000; ++i) { + strings.push_back(n); + } + } + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_EQ(strategyByte(compressed.data()), 1) << "expected LZ4 strategy (1) for long-run low-card input"; +} + +TEST(TypeAwareCompressCodecTest, StringDictPicksDictForScattered) { + // High-cardinality random-content strings (e.g., the c_first_name or + // *_address shape in TPC-DS/TPC-H): LZ4 cannot find repeating substrings, + // so dict encoding cleanly wins. Validates the adaptive selection picks + // dict here. + std::mt19937 rng(42); + std::vector uniq; + for (int i = 0; i < 3000; ++i) { + std::string s; + int len = 4 + (rng() % 8); + for (int j = 0; j < len; ++j) { + s += ('a' + (rng() % 26)); + } + uniq.push_back(s); + } + std::vector strings; + strings.reserve(50000); + for (int i = 0; i < 50000; ++i) { + strings.push_back(uniq[rng() % uniq.size()]); + } + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_EQ(strategyByte(compressed.data()), 0) << "expected Dict strategy (0) for high-card random-content input"; +} + +TEST(TypeAwareCompressCodecTest, StringDictRejectsMissingOffsets) { + std::vector data = {'a', 'b', 'c'}; + std::vector out(64); + auto r = TypeAwareCompressCodec::compress(data.data(), 3, out.data(), out.size(), tac::kStringDict, nullptr, 0); + EXPECT_FALSE(r.ok()); +} + +TEST(TypeAwareCompressCodecTest, StringDictAcceptsOffsetsFirstNotZeroViaLz4) { + // Historically the codec rejected offsets[0] != 0 outright. After the OCP + // Run C regression we know Velox passes sliced buffers where offsets[0] > 0; + // the codec now silently falls back to LZ4 in that case and produces a + // bit-exact round-trip of the full input buffer (including the leading + // bytes outside the offset range). + std::vector offsets = {1, 4}; + std::vector data = {0xFF /* leading byte at offset 0 */, 'a', 'b', 'c'}; + std::vector out(64); + auto r = TypeAwareCompressCodec::compress( + data.data(), + static_cast(data.size()), + out.data(), + static_cast(out.size()), + tac::kStringDict, + reinterpret_cast(offsets.data()), + 1); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + // Confirm LZ4 strategy was picked. + ASSERT_GT(*r, 2); + EXPECT_EQ(out[2], 1u) << "non-zero offsets[0] must pick LZ4 strategy"; + // Confirm bit-exact round-trip. + std::vector decoded(data.size(), 0); + auto dr = TypeAwareCompressCodec::decompress(out.data(), *r, decoded.data(), static_cast(data.size())); + ASSERT_TRUE(dr.ok()) << dr.status().ToString(); + EXPECT_EQ(0, std::memcmp(data.data(), decoded.data(), data.size())); +} + +TEST(TypeAwareCompressCodecTest, StringDictRejectsTruncatedDecompressInput) { + std::vector tooSmall = {4 /* codec_id=kStringDict */, 3 /* tac_type */}; + std::vector out(16); + auto r = TypeAwareCompressCodec::decompress(tooSmall.data(), tooSmall.size(), out.data(), out.size()); + EXPECT_FALSE(r.ok()); +} + +TEST(TypeAwareCompressCodecTest, StringDictRejectsUnknownStrategy) { + // codec_id=kStringDict, tac_type=kStringDict, strategy=99 (invalid) + std::vector bad = {4, 3, 99}; + std::vector out(16); + auto r = TypeAwareCompressCodec::decompress(bad.data(), bad.size(), out.data(), out.size()); + EXPECT_FALSE(r.ok()); +} + +TEST(TypeAwareCompressCodecTest, StringDictCompressesBetterThanRaw) { + // For a realistic medium-card scattered input, dict must beat raw size. + std::vector strings; + for (int i = 0; i < 4096; ++i) { + strings.push_back("brand_" + std::to_string((i * 7919) % 256)); + } + auto packed = packStrings(strings); + int64_t inputLen = static_cast(packed.data.size()); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + static_cast(strings.size())); + ASSERT_TRUE(r.ok()); + EXPECT_LT(*r, inputLen / 2) << "compressed=" << *r << " raw=" << inputLen; +} +// Regression test for PR Assistant review #2 (ffor.hpp:263): encodeRt/decodeRt +// must not deref kEncodeTable/kDecodeTable with bw > 64. +TEST(FForCodecTest, RtDispatchOutOfBoundsBitWidthIsSafe) { + uint64_t in[16] = {0}; + uint64_t out[16] = {0}; + // bw = 200 is well past the 0..64 valid range. The guards in encodeRt / + // decodeRt return early without touching the dispatch tables. + ffor::encodeRt(in, out, /*base=*/0, /*n=*/16, /*bw=*/200); + ffor::decodeRt(in, out, /*base=*/0, /*n=*/16, /*bw=*/65); + // No assertion needed — the test passes if these calls don't crash. + SUCCEED(); +} + +// Regression test for PR Assistant review #3 (ffor.hpp:177): decode with +// nValues == 0 must not preload from `in`. +TEST(FForCodecTest, DecodeZeroLengthBlockIsSafe) { + // Build a compressed payload that decodes to a 0-length output: + // compress with input size 0 → output is just the kBwTailMarker header + // with count=0. Round-tripping through decompress with outputLen=0 must + // not deref `in` past the header. + std::vector compressed(64); + auto cr = FForCodec::compress(nullptr, 0, compressed.data(), compressed.size()); + ASSERT_TRUE(cr.ok()); + EXPECT_EQ(*cr, 0); + // Decompress with output buffer of size 0; must not crash. + std::vector out(8); + auto dr = FForCodec::decompress(compressed.data(), 0, out.data(), 0); + ASSERT_TRUE(dr.ok()) << dr.status().message(); +} + +// Regression test for PR Assistant reviews #4 + #5 (ffor.hpp:437): forged +// payload with a tail-marker `count` larger than remaining input bytes must +// not read past the input buffer; non-tail `count > kMaxValuesPerBlock/kLanes` +// must be rejected. +TEST(FForCodecTest, ForgedTailMarkerCountIsRejected) { + // Build a tail-marker header that claims to memcpy 1000*8 bytes but the + // payload buffer only has 8 bytes after the header. Defensive guard must + // break out of the decode loop without reading OOB. + // Header layout (kHeaderSize = 16): bw(1) + count(1) + reserved(6) + base(8). + // We need bw=kBwTailMarker=255 and count=10 (claims 80 bytes of tail). + std::vector bogus(16 + 8, 0); // header + only 8 bytes after + bogus[0] = 255; // kBwTailMarker + bogus[1] = 10; // claim 10 values = 80 bytes, but only 8 available + // No need to write base. + std::vector out(80, 0); + // Should not crash; will silently decode only what fits (or stop). + auto r = FForCodec::decompress(bogus.data(), bogus.size(), out.data(), 80); + // Either ok with truncated output or an Invalid status — both acceptable; + // the key correctness property is "no UB / OOB read". + ASSERT_TRUE(r.ok() || r.status().IsInvalid()) << r.status().message(); +} + +// Regression test for PR Assistant review (test-coverage): BW=0 path (all +// values constant). The FFOR encoder picks BW=0 when min == max in the input +// block; the constexpr decode<0> specialization just writes base for each +// output. Verify round-trip. +TEST(FForCodecTest, ConstantValuesRoundTripBW0) { + std::vector input(64, 0xDEADBEEFCAFEBABEULL); + int64_t inputSize = static_cast(input.size() * sizeof(uint64_t)); + int64_t maxLen = FForCodec::maxCompressedLength(inputSize); + std::vector compressed(maxLen); + auto cr = FForCodec::compress(reinterpret_cast(input.data()), inputSize, compressed.data(), maxLen); + ASSERT_TRUE(cr.ok()); + // Constants compress to near-zero bytes (header + tiny payload). + EXPECT_LT(*cr, inputSize); + + std::vector output(input.size(), 0); + auto dr = FForCodec::decompress(compressed.data(), *cr, reinterpret_cast(output.data()), inputSize); + ASSERT_TRUE(dr.ok()); + EXPECT_EQ(input, output); +} + +// Regression test for PR Assistant review (test-coverage): BW=64 path (full +// 64-bit range, FFOR cannot compress). The decoder takes the BW==64 fast +// path which just adds base to each input value. +TEST(FForCodecTest, FullRangeRoundTripBW64) { + // Make input span [0, UINT64_MAX] so FFOR must use BW=64. + std::vector input; + input.reserve(64); + std::mt19937_64 rng(42); + for (int i = 0; i < 64; ++i) { + input.push_back(rng()); + } + input[0] = 0; + input[1] = UINT64_MAX; + + int64_t inputSize = static_cast(input.size() * sizeof(uint64_t)); + int64_t maxLen = FForCodec::maxCompressedLength(inputSize); + std::vector compressed(maxLen); + auto cr = FForCodec::compress(reinterpret_cast(input.data()), inputSize, compressed.data(), maxLen); + ASSERT_TRUE(cr.ok()); + + std::vector output(input.size(), 0); + auto dr = FForCodec::decompress(compressed.data(), *cr, reinterpret_cast(output.data()), inputSize); + ASSERT_TRUE(dr.ok()); + EXPECT_EQ(input, output); +} + +// Regression test for PR Assistant review (test-coverage): N not divisible by +// kLanes=4. The compressor handles the tail via the kBwTailMarker path which +// stores leftover values uncompressed; the decompressor must round-trip them +// exactly. +TEST(FForCodecTest, TailValuesNotMultipleOfLanesRoundTrip) { + // 67 values: 64 in the main block + 3-value tail (67 % 4 == 3). + std::vector input(67); + std::iota(input.begin(), input.end(), 5000ULL); + int64_t inputSize = static_cast(input.size() * sizeof(uint64_t)); + int64_t maxLen = FForCodec::maxCompressedLength(inputSize); + std::vector compressed(maxLen); + auto cr = FForCodec::compress(reinterpret_cast(input.data()), inputSize, compressed.data(), maxLen); + ASSERT_TRUE(cr.ok()); + + std::vector output(input.size(), 0); + auto dr = FForCodec::decompress(compressed.data(), *cr, reinterpret_cast(output.data()), inputSize); + ASSERT_TRUE(dr.ok()); + EXPECT_EQ(input, output); +} + +// Regression test for PR Assistant review (test-coverage): N > kMaxValuesPerBlock +// (=256). Forces the compressor / decompressor to emit / consume multiple +// blocks. Round-trip must remain exact across block boundaries. +TEST(FForCodecTest, MultiBlockRoundTrip) { + // 1027 values forces 5 blocks (4 of 256 + 1 tail of 3). + std::vector input(1027); + std::iota(input.begin(), input.end(), 10ULL); + int64_t inputSize = static_cast(input.size() * sizeof(uint64_t)); + int64_t maxLen = FForCodec::maxCompressedLength(inputSize); + std::vector compressed(maxLen); + auto cr = FForCodec::compress(reinterpret_cast(input.data()), inputSize, compressed.data(), maxLen); + ASSERT_TRUE(cr.ok()); + + std::vector output(input.size(), 0); + auto dr = FForCodec::decompress(compressed.data(), *cr, reinterpret_cast(output.data()), inputSize); + ASSERT_TRUE(dr.ok()); + EXPECT_EQ(input, output); +} + +// --------------------------------------------------------------------------- +// Adaptive int-codec tests: each numeric codec (kUInt64, kUInt128, kUInt32) +// must pick the smaller of native-FFor vs LZ4-fallback. The body strategy +// byte (kIntStrategyNative=0 / kIntStrategyLz4=1) selects the decode path. +// --------------------------------------------------------------------------- + +namespace { + +// Round-trip helper that returns the strategy byte chosen by compress() for +// the given tacType. Validates correctness on the round-trip. For kUInt64 +// and kUInt32 the strategy byte sits at offset (kPayloadHeaderSize) = +// byte 2 of the compressed payload. Same for kUInt128. +uint8_t roundtripAndGetStrategy(const uint8_t* input, int64_t inputLen, int8_t tacType) { + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tacType); + std::vector compressed(static_cast(maxLen)); + auto cr = TypeAwareCompressCodec::compress(input, inputLen, compressed.data(), maxLen, tacType); + EXPECT_TRUE(cr.ok()) << cr.status().ToString(); + EXPECT_GE(*cr, 3) << "expected at least outer header (2) + strategy byte (1)"; + uint8_t strategy = compressed[2]; + + std::vector decoded(static_cast(inputLen), 0xCC); + auto dr = TypeAwareCompressCodec::decompress(compressed.data(), *cr, decoded.data(), inputLen); + EXPECT_TRUE(dr.ok()) << dr.status().ToString(); + EXPECT_EQ(0, std::memcmp(input, decoded.data(), static_cast(inputLen))); + return strategy; +} + +} // namespace + +TEST(TypeAwareCompressCodecTest, UInt64AdaptivePicksNativeForNarrowRange) { + // Narrow-range data: FFor with bw small wins easily vs LZ4 (which has + // per-block overhead). + auto data = genData(4096, 100000, 1000); + uint8_t strat = roundtripAndGetStrategy( + reinterpret_cast(data.data()), + static_cast(data.size() * sizeof(uint64_t)), + tac::kUInt64); + EXPECT_EQ(strat, 0u) << "narrow-range int64 should choose native FFor"; +} + +TEST(TypeAwareCompressCodecTest, UInt64AdaptivePicksLz4ForLongRuns) { + // All-zeros (or all-same value) is the canonical case where LZ4 should + // dominate FFor: bw=0 in FFor still has a small per-block header, but + // LZ4 compresses runs of identical bytes to near-zero. + std::vector data(8192, 42); + uint8_t strat = roundtripAndGetStrategy( + reinterpret_cast(data.data()), + static_cast(data.size() * sizeof(uint64_t)), + tac::kUInt64); + EXPECT_EQ(strat, 1u) << "long-run int64 should fall back to LZ4"; +} + +TEST(TypeAwareCompressCodecTest, UInt64AdaptiveNoByteRegressionVsLz4) { + // For any input, the adaptive codec must produce output no larger than + // LZ4-alone (modulo the 2-byte outer header + 1-byte strategy + 4-byte + // LZ4 length prefix). This is the whole point of the adaptive design. + std::vector data(4096); + std::mt19937_64 rng(0xC0DE); + for (auto& v : data) { + v = rng() & 0xFFFFFF; // 24-bit values + } + int64_t inputLen = static_cast(data.size() * sizeof(uint64_t)); + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kUInt64); + std::vector compressed(static_cast(maxLen)); + auto cr = TypeAwareCompressCodec::compress( + reinterpret_cast(data.data()), inputLen, compressed.data(), maxLen, tac::kUInt64); + ASSERT_TRUE(cr.ok()); + + // Compute LZ4-alone reference. + auto lz4 = arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME).ValueOrDie(); + int64_t lz4Max = lz4->MaxCompressedLen(inputLen, nullptr); + std::vector lz4Buf(static_cast(lz4Max)); + auto lz4Result = lz4->Compress(inputLen, reinterpret_cast(data.data()), lz4Max, lz4Buf.data()); + ASSERT_TRUE(lz4Result.ok()); + int64_t lz4Len = *lz4Result; + + // Adaptive output is at most: 2 (TAC hdr) + 1 (strategy) + 4 (lz4Len prefix) + lz4Len. + // It can be strictly smaller if native FFor beats LZ4. + int64_t adaptiveBudget = 2 + 1 + 4 + lz4Len; + EXPECT_LE(*cr, adaptiveBudget) << "Adaptive must never exceed LZ4-alone + framing"; +} + +TEST(TypeAwareCompressCodecTest, UInt32AdaptivePicksNativeForNarrowRange) { + // Narrow-range int32: dates (e.g. 18000..19000 from epoch) — FFor wins. + std::vector data(4096); + std::mt19937 rng(31); + std::uniform_int_distribution dist(18000, 19000); + for (auto& v : data) { + v = dist(rng); + } + uint8_t strat = roundtripAndGetStrategy( + reinterpret_cast(data.data()), + static_cast(data.size() * sizeof(uint32_t)), + tac::kUInt32); + EXPECT_EQ(strat, 0u) << "narrow-range int32 should choose native FFor"; +} + +TEST(TypeAwareCompressCodecTest, UInt32AdaptivePicksLz4ForLongRuns) { + // Long runs of identical int32 values — LZ4 should dominate. + std::vector data(8192, 7); + uint8_t strat = roundtripAndGetStrategy( + reinterpret_cast(data.data()), + static_cast(data.size() * sizeof(uint32_t)), + tac::kUInt32); + EXPECT_EQ(strat, 1u) << "long-run int32 should fall back to LZ4"; +} + +TEST(TypeAwareCompressCodecTest, UInt128AdaptivePicksNativeForNarrowRange) { + // Narrow-range int128 (decimal(p,s)) with hi lane all-zero — split-lane + // FFor wins easily. + auto lo = genData(2048, 50000, 200); + std::vector hi(lo.size(), 0); + std::vector<__int128_t> data(lo.size()); + for (size_t i = 0; i < lo.size(); ++i) { + data[i] = (static_cast<__uint128_t>(hi[i]) << 64) | lo[i]; + } + uint8_t strat = roundtripAndGetStrategy( + reinterpret_cast(data.data()), + static_cast(data.size() * sizeof(__int128_t)), + tac::kUInt128); + EXPECT_EQ(strat, 0u) << "narrow-range int128 should choose native split-lane FFor"; +} + +TEST(TypeAwareCompressCodecTest, UInt128AdaptivePicksLz4ForLongRuns) { + // All-same int128 values — LZ4 dominates. + std::vector<__int128_t> data(4096, static_cast<__int128_t>(123456789)); + uint8_t strat = roundtripAndGetStrategy( + reinterpret_cast(data.data()), + static_cast(data.size() * sizeof(__int128_t)), + tac::kUInt128); + EXPECT_EQ(strat, 1u) << "long-run int128 should fall back to LZ4"; +} + +TEST(TypeAwareCompressCodecTest, IntCodecRejectsUnknownStrategy) { + // Forge a kFFor payload with strategy byte = 99. Must reject cleanly. + std::vector bogus = {/*codec=kFFor*/ 1, /*tac=kUInt64*/ 0, /*strategy=*/99, 0, 0, 0, 0}; + std::vector out(32); + auto r = TypeAwareCompressCodec::decompress(bogus.data(), bogus.size(), out.data(), out.size()); + EXPECT_FALSE(r.ok()); +} + +TEST(TypeAwareCompressCodecTest, IntCodecRejectsLz4StrategyWithBadLen) { + // Forge a kFFor payload with strategy=LZ4 but negative length. + std::vector bogus(2 + 1 + 4 + 4, 0); + bogus[0] = 1; // kFFor + bogus[1] = 0; // kUInt64 + bogus[2] = 1; // kIntStrategyLz4 + int32_t neg = -1; + std::memcpy(bogus.data() + 3, &neg, sizeof(int32_t)); + std::vector out(8); + auto r = TypeAwareCompressCodec::decompress(bogus.data(), bogus.size(), out.data(), out.size()); + EXPECT_FALSE(r.ok()); +} + +TEST(TypeAwareCompressCodecTest, IntCodecRejectsLz4StrategyMissingLenPrefix) { + // kFFor + kIntStrategyLz4 but body lacks the int32 length prefix. + std::vector bogus = {1, 0, 1}; + std::vector out(8); + auto r = TypeAwareCompressCodec::decompress(bogus.data(), bogus.size(), out.data(), out.size()); + EXPECT_FALSE(r.ok()); +} + +TEST(TypeAwareCompressCodecTest, IntCodecMissingStrategyByteRejected) { + // Outer header present but body is empty — strategy byte missing. + std::vector bogus = {1, 0}; + std::vector out(8); + auto r = TypeAwareCompressCodec::decompress(bogus.data(), bogus.size(), out.data(), out.size()); + EXPECT_FALSE(r.ok()); +} + +// Regression test for OCP Run C (build 220597501) — Velox shuffle can pass +// a SLICED string data buffer where offsets[0] != 0 and/or +// offsets[numRows] < inputLen. Leading and trailing bytes belong to other +// slices / padding and the shuffle reader expects them preserved byte-for-byte. +// The codec MUST fall back to LZ4 (which trivially preserves bytes) in that +// case rather than rejecting the input or losing the leading/trailing bytes. +TEST(TypeAwareCompressCodecTest, StringDictSlicedBufferFallsBackToLz4) { + // Build a data buffer with 10 leading garbage bytes + 5 strings + 7 trailing + // garbage bytes. The offsets describe rows starting at byte 10. + std::vector rows = {"alpha", "beta", "gamma", "delta", "epsilon"}; + std::vector leading(10, 0xAB); + std::vector trailing(7, 0xCD); + std::vector data; + data.insert(data.end(), leading.begin(), leading.end()); + for (const auto& s : rows) { + data.insert(data.end(), s.begin(), s.end()); + } + data.insert(data.end(), trailing.begin(), trailing.end()); + + std::vector offsets; + offsets.reserve(rows.size() + 1); + int32_t cur = static_cast(leading.size()); + offsets.push_back(cur); + for (const auto& s : rows) { + cur += static_cast(s.size()); + offsets.push_back(cur); + } + + int32_t numRows = static_cast(rows.size()); + int64_t inputLen = static_cast(data.size()); + ASSERT_NE(offsets[0], 0); + ASSERT_LT(offsets[numRows], static_cast(inputLen)); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto cr = TypeAwareCompressCodec::compress( + data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(offsets.data()), + numRows); + ASSERT_TRUE(cr.ok()) << cr.status().ToString(); + + // Confirm we chose the LZ4 strategy (byte 2 of the body — after the + // 2-byte outer codec header — must be StringDictStrategy::kStrategyLz4 == 1). + ASSERT_GT(*cr, 2); + EXPECT_EQ(compressed[2], 1u) << "sliced input must pick LZ4 strategy"; + + // Round-trip must reproduce ALL inputLen bytes byte-for-byte, including the + // leading 0xAB and trailing 0xCD padding. + std::vector decoded(static_cast(inputLen), 0); + auto dr = TypeAwareCompressCodec::decompress(compressed.data(), *cr, decoded.data(), inputLen); + ASSERT_TRUE(dr.ok()) << dr.status().ToString(); + EXPECT_EQ(0, std::memcmp(data.data(), decoded.data(), static_cast(inputLen))); +} + +// Sliced input with non-zero offsets[0] only (offsets[numRows]==inputLen). +TEST(TypeAwareCompressCodecTest, StringDictLeadingOnlySliceRoundtrip) { + std::vector rows = {"foo", "bar"}; + std::vector data(3, 0xAA); // 3 garbage bytes at the start + for (const auto& s : rows) { + data.insert(data.end(), s.begin(), s.end()); + } + std::vector offsets = {3, 6, 9}; + ASSERT_EQ(static_cast(data.size()), offsets.back()); + + int32_t numRows = static_cast(rows.size()); + int64_t inputLen = static_cast(data.size()); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto cr = TypeAwareCompressCodec::compress( + data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(offsets.data()), + numRows); + ASSERT_TRUE(cr.ok()) << cr.status().ToString(); + ASSERT_GT(*cr, 2); + EXPECT_EQ(compressed[2], 1u) << "non-zero offsets[0] must pick LZ4"; + + std::vector decoded(static_cast(inputLen), 0); + auto dr = TypeAwareCompressCodec::decompress(compressed.data(), *cr, decoded.data(), inputLen); + ASSERT_TRUE(dr.ok()) << dr.status().ToString(); + EXPECT_EQ(0, std::memcmp(data.data(), decoded.data(), static_cast(inputLen))); +} + +// Sliced input with trailing padding only (offsets[0]==0, offsets[numRows] rows = {"foo", "bar"}; + std::vector data; + for (const auto& s : rows) { + data.insert(data.end(), s.begin(), s.end()); + } + data.insert(data.end(), 4, 0xBB); // 4 garbage bytes at the end + std::vector offsets = {0, 3, 6}; + + int32_t numRows = static_cast(rows.size()); + int64_t inputLen = static_cast(data.size()); + ASSERT_LT(offsets.back(), static_cast(inputLen)); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto cr = TypeAwareCompressCodec::compress( + data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(offsets.data()), + numRows); + ASSERT_TRUE(cr.ok()) << cr.status().ToString(); + EXPECT_EQ(compressed[2], 1u) << "trailing padding must pick LZ4"; + + std::vector decoded(static_cast(inputLen), 0); + auto dr = TypeAwareCompressCodec::decompress(compressed.data(), *cr, decoded.data(), inputLen); + ASSERT_TRUE(dr.ok()) << dr.status().ToString(); + EXPECT_EQ(0, std::memcmp(data.data(), decoded.data(), static_cast(inputLen))); +} + +// --------------------------------------------------------------------------- +// kStringDict v3 regression-guard tests. +// Background: the dict-build path costs O(N) hashing + heap allocations per +// column. Two guards short-circuit that on workloads where dict can never +// pay off: +// Guard 1: tiny buffer (inputLen < kDictMinInputBytes = 4096) — skip the +// dict build entirely; LZ4 dominates at that scale. +// Guard 2: single deterministic probe at clamp(numRows/8, 256, 2048) — if +// no duplicate has been seen by then, the column is essentially +// all-unique; dict can never recoup its overhead. +// v1 (75 % unique after 64 rows) regressed str_high_card8k. +// v2 (no-dup in 256 rows) regressed str_long_comments via birthday paradox. +// v3 single deterministic probe at clamped position is the production logic. +// --------------------------------------------------------------------------- + +namespace { + +// A small set of guard-test helpers. All produce inputs sized so each guard +// is hit deterministically — no flakiness from RNG. +std::vector repeat(const std::string& s, int32_t n) { + std::vector out; + out.reserve(static_cast(n)); + for (int32_t i = 0; i < n; ++i) { + out.push_back(s); + } + return out; +} + +// Append the LZ4 strategy roundtrip-byte-equality check (mirrors the rest +// of the file's pattern). +void assertRoundtripByteEqual( + const std::vector& compressed, + int64_t compressedLen, + const std::vector& original) { + std::vector decoded(original.size(), 0); + auto dr = TypeAwareCompressCodec::decompress( + compressed.data(), compressedLen, decoded.data(), static_cast(original.size())); + ASSERT_TRUE(dr.ok()) << dr.status().ToString(); + ASSERT_EQ(0, std::memcmp(original.data(), decoded.data(), original.size())); +} + +} // namespace + +TEST(TypeAwareCompressCodecTest, StringDictTinyBufferBailsToLz4) { + // 200 short strings × ~10 bytes each ≈ 2 KB — well below kDictMinInputBytes + // (4096). Guard 1 must fire and pick LZ4 even though the column is + // dictionary-friendly (low cardinality). + std::vector strings = repeat("monday", 50); + for (int i = 0; i < 50; ++i) { + strings.push_back("tuesday"); + } + for (int i = 0; i < 50; ++i) { + strings.push_back("wednesday"); + } + for (int i = 0; i < 50; ++i) { + strings.push_back("thursday"); + } + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + ASSERT_LT(inputLen, 4096) << "test premise: input must be below kDictMinInputBytes"; + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_EQ(strategyByte(compressed.data()), 1u) + << "tiny input must pick LZ4 (kStrategyLz4=1); dict-build is uneconomical"; + assertRoundtripByteEqual(compressed, *r, packed.data); +} + +TEST(TypeAwareCompressCodecTest, StringDictAllUniqueRowsBailsToLz4) { + // 100K rows, every row distinct. Guard 2 must fire at row clamp(100000/8, + // 256, 2048) = 2048, breaking out of the dict-build loop early and picking + // LZ4. Without the guard we would scan all 100K rows for no compression win. + std::vector strings; + strings.reserve(100000); + for (int i = 0; i < 100000; ++i) { + // 24-char unique key to make inputLen comfortably above kDictMinInputBytes. + char buf[32]; + std::snprintf(buf, sizeof(buf), "uniqkey_%016d", i); + strings.emplace_back(buf); + } + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + ASSERT_GE(inputLen, 4096) << "test premise: input must clear Guard 1"; + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_EQ(strategyByte(compressed.data()), 1u) << "all-unique column must bail to LZ4 (kStrategyLz4=1) via Guard 2"; + assertRoundtripByteEqual(compressed, *r, packed.data); +} + +TEST(TypeAwareCompressCodecTest, StringDictConstantColumnRoundtrips) { + // Every row is the same long-ish string. Tons of duplicates seen by row 1, + // so Guard 2 never trips. Guard 1 doesn't trip either because numRows×len + // is large. Both strategies (dict and LZ4) compress this shape well; the + // codec is contractually allowed to pick either as long as it picks the + // smaller body. In practice LZ4's RLE-like behaviour beats dict on truly + // constant data (one match reference spans the whole input), so the codec + // typically emits LZ4 here — that is the *correct* choice. This test only + // validates the input/output contract: compress succeeds, the result is a + // dramatic shrink (regardless of strategy), and decompress is byte-exact. + std::vector strings = repeat("the_same_string_value_repeated", 10000); + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + // Either strategy must shrink constant data by >10× (LZ4 typically gets + // 100×+; the dict path would get ~30× with 1 entry + 10K 1-byte indices). + EXPECT_LT(*r * 10, inputLen); + assertRoundtripByteEqual(compressed, *r, packed.data); +} + +TEST(TypeAwareCompressCodecTest, StringDictGuardKeepsDictForBoundedHighCardinality) { + // v2 false-positive shape (ported from OSS best-tac): + // bounded pool of ~8K distinct strings sampled at 64K rows. By row 256 it + // is plausible (probability ~3 %) to have seen no duplicate purely by + // chance — v2's 256-row probe would have wrongly bailed. v3's 2048-row + // probe makes P(no duplicate from 8K pool) ≈ exp(-2048²/16000) ≈ 10⁻¹¹⁴ + // — guard never fires on this shape, dict wins handily. + std::mt19937_64 rng(0x12345abc); + std::uniform_int_distribution pickPool(0, 7999); + std::vector strings; + strings.reserve(65536); + for (int i = 0; i < 65536; ++i) { + strings.push_back("hkey_" + std::to_string(pickPool(rng))); + } + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_EQ(strategyByte(compressed.data()), 0u) << "bounded-cardinality column must keep Dict (kStrategyDict=0); " + << "v2 regression would have selected LZ4 here"; + EXPECT_LT(*r * 2, inputLen) << "dict should at least halve input on this shape"; + assertRoundtripByteEqual(compressed, *r, packed.data); +} + +TEST(TypeAwareCompressCodecTest, StringDictLongCommentsRoundtrips) { + // Mid-card with long strings (comment-like): 10K rows drawn from a pool of + // ~3K distinct ~80-char comments that share a long template prefix. + // v2 (256-row probe) regressed this shape when a particular RNG seed + // happened to yield 256 distinct draws (within birthday-paradox tolerance). + // v3 probes at 2048 rows; by then we have seen many duplicates and the + // guard does not trigger, so dict is built and offered. The codec is then + // free to pick whichever strategy compresses better; on this particular + // shape LZ4 tends to win because the shared 66-char prefix is highly + // compressible by LZ77. This test only validates the input/output + // contract: compress succeeds, the output is at least 2× smaller than the + // input (both strategies satisfy this), and decompress is byte-exact. + std::mt19937_64 rng(0xc0ffee01); + std::uniform_int_distribution pickPool(0, 2999); + std::vector strings; + strings.reserve(10000); + for (int i = 0; i < 10000; ++i) { + int idx = pickPool(rng); + std::string s = "comment_template_with_padding_to_eighty_chars_field_value_index_"; + s += std::to_string(idx); + while (s.size() < 80) { + s.push_back('x'); + } + strings.push_back(s); + } + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_LT(*r * 2, inputLen) << "either strategy should at least halve input on this shape"; + assertRoundtripByteEqual(compressed, *r, packed.data); +} + +TEST(TypeAwareCompressCodecTest, StringDictConstantColumnPicksLz4) { + // Counterpart to StringDictConstantColumnRoundtrips: pin the deterministic + // strategy choice on a constant-value column. The dict body costs + // kDictBodyFixedHeader + (4 + 30) for the single entry + 10 000 x 1B for + // the indices ~= 10 KB. LZ4 on 300 KB of constant bytes is dominated by + // a single 64 KB-windowed match reference and produces only hundreds of + // bytes. LZ4 wins by 50-100x, so the codec must pick kStrategyLz4 (1). + // This test guards against any future change that would silently make + // the codec prefer dict on shapes where LZ4 is much smaller (which would + // bloat the wire form for the most-common low-cardinality patterns). + std::vector strings = repeat("the_same_string_value_repeated", 10000); + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_EQ(strategyByte(compressed.data()), 1u) + << "constant column: LZ4 must beat dict by 50-100x and be the chosen strategy"; + // Sanity: LZ4 on constant data should compress to well under 1% of input. + EXPECT_LT(*r * 100, inputLen); + assertRoundtripByteEqual(compressed, *r, packed.data); +} + +TEST(TypeAwareCompressCodecTest, StringDictLongCommentsPicksLz4) { + // Counterpart to StringDictLongCommentsRoundtrips: pin the deterministic + // strategy choice on a mid-cardinality column whose values share a long + // template prefix. The dict body costs ~3 000 x (4 + 80) bytes for the + // entries + 10 000 x 2 bytes for the (16-bit) indices ~= 272 KB. LZ4 + // captures the 66-byte shared prefix as a single repeated match plus + // small per-row tail differences, getting down to roughly 80-100 KB on + // 800 KB of input. LZ4 therefore wins on size and the codec must emit + // kStrategyLz4 (1). The test exists so a future change to dict-header + // accounting (or to the strategy decision) cannot silently regress to + // emitting a larger dict body on a shape LZ4 compresses better. + std::mt19937_64 rng(0xc0ffee01); + std::uniform_int_distribution pickPool(0, 2999); + std::vector strings; + strings.reserve(10000); + for (int i = 0; i < 10000; ++i) { + int idx = pickPool(rng); + std::string s = "comment_template_with_padding_to_eighty_chars_field_value_index_"; + s += std::to_string(idx); + while (s.size() < 80) { + s.push_back('x'); + } + strings.push_back(s); + } + auto packed = packStrings(strings); + int32_t numRows = static_cast(strings.size()); + int64_t inputLen = static_cast(packed.data.size()); + + auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputLen, tac::kStringDict); + std::vector compressed(static_cast(maxLen)); + auto r = TypeAwareCompressCodec::compress( + packed.data.data(), + inputLen, + compressed.data(), + maxLen, + tac::kStringDict, + reinterpret_cast(packed.offsets.data()), + numRows); + ASSERT_TRUE(r.ok()) << r.status().ToString(); + EXPECT_EQ(strategyByte(compressed.data()), 1u) + << "long-comments shape: LZ4 must beat the ~272 KB dict body and be the chosen strategy"; + // Sanity: LZ4 on long shared-prefix data should compress at least 5x. + EXPECT_LT(*r * 5, inputLen); + assertRoundtripByteEqual(compressed, *r, packed.data); +} diff --git a/cpp/core/utils/tac/FForCodec.cc b/cpp/core/utils/tac/FForCodec.cc index ec079662f92..ecf47d2f3de 100644 --- a/cpp/core/utils/tac/FForCodec.cc +++ b/cpp/core/utils/tac/FForCodec.cc @@ -53,7 +53,8 @@ FForCodec::decompress(const uint8_t* input, int64_t inputSize, uint8_t* output, return arrow::Status::Invalid("FForCodec: output size ", outputSize, " is not a multiple of 8."); } - auto nDecoded = ffor::decompress64(input, inputSize, reinterpret_cast(output)); + size_t outputMaxValues = static_cast(outputSize) / sizeof(uint64_t); + auto nDecoded = ffor::decompress64(input, inputSize, reinterpret_cast(output), outputMaxValues); return static_cast(nDecoded); } diff --git a/cpp/core/utils/tac/TypeAwareCompressCodec.cc b/cpp/core/utils/tac/TypeAwareCompressCodec.cc index 2362f999b57..c09e3b3b4a0 100644 --- a/cpp/core/utils/tac/TypeAwareCompressCodec.cc +++ b/cpp/core/utils/tac/TypeAwareCompressCodec.cc @@ -18,17 +18,117 @@ #include "utils/tac/TypeAwareCompressCodec.h" #include "utils/tac/FForCodec.h" +#include +#include +#include +#include +#include +#include + namespace gluten { +namespace { + +// Light wrapper over arrow's LZ4_FRAME codec, lazily created and shared per +// process. Used both by kStringDict and by the adaptive LZ4 fallback added +// to the numeric codecs. +arrow::util::Codec* lz4CodecShared() { + static auto codec = arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME).ValueOrDie(); + return codec.get(); +} + +} // namespace + bool TypeAwareCompressCodec::support(int8_t tacType) { - return tacType == tac::kUInt64; + return tacType == tac::kUInt64 || tacType == tac::kUInt128 || tacType == tac::kUInt32 || tacType == tac::kStringDict; } int64_t TypeAwareCompressCodec::maxCompressedLen(int64_t inputLen, int8_t tacType) { if (!support(tacType)) { return 0; } - return kPayloadHeaderSize + FForCodec::maxCompressedLength(inputLen); + // Numeric codecs are adaptive: pick max(nativeUpperBound, lz4UpperBound) + // then add the body strategy byte. The outer TAC header (2 B) is always + // added at the very end. + auto* lz4 = lz4CodecShared(); + int64_t lz4Max = lz4->MaxCompressedLen(inputLen, nullptr) + kIntLz4BodyHeaderSize; + if (tacType == tac::kUInt128) { + // Two uint64 lanes of (inputLen / 2) bytes each + 8-byte body header. + int64_t bytesPerLane = inputLen / 2; + int64_t maxLane = FForCodec::maxCompressedLength(bytesPerLane); + int64_t nativeMax = kIntStrategyHeaderSize + kSplit128BodyHeaderSize + 2 * maxLane; + return kPayloadHeaderSize + std::max(nativeMax, lz4Max); + } + if (tacType == tac::kUInt32) { + // Widen each uint32 -> uint64 then FFor. Worst case is 2x the input + // bytes through the uint64 codec. + int64_t widenedBytes = 2 * inputLen; + int64_t nativeMax = kIntStrategyHeaderSize + FForCodec::maxCompressedLength(widenedBytes); + return kPayloadHeaderSize + std::max(nativeMax, lz4Max); + } + if (tacType == tac::kStringDict) { + // Dictionary payload worst case (all unique values) is essentially the + // original data plus per-row offsets (4B) plus per-entry length prefix + // (4B) plus indices (worst case 4B per row). LZ4 worst case has its + // own MaxCompressedLen. Be generous: use the larger of the two upper + // bounds + a small fixed-overhead allowance (codec hdr + strategy + + // payload hdr + reserved counts). 120 bytes covers all headers. + int64_t lz4Bound = lz4->MaxCompressedLen(inputLen, nullptr); + int64_t dictMax = inputLen + 4 * 8 /* widest indices */ + 120; + return kPayloadHeaderSize + 120 + std::max(lz4Bound, dictMax); + } + // kUInt64 + int64_t nativeMax = kIntStrategyHeaderSize + FForCodec::maxCompressedLength(inputLen); + return kPayloadHeaderSize + std::max(nativeMax, lz4Max); +} + +arrow::Result +TypeAwareCompressCodec::runLz4Fallback(const uint8_t* input, int64_t inputLen, std::vector& out) { + auto* codec = lz4CodecShared(); + int64_t lz4Max = codec->MaxCompressedLen(inputLen, input); + out.resize(static_cast(lz4Max)); + ARROW_ASSIGN_OR_RAISE(int64_t lz4Len, codec->Compress(inputLen, input, lz4Max, out.data())); + return lz4Len; +} + +arrow::Result +TypeAwareCompressCodec::writeLz4Body(const uint8_t* lz4Bytes, int64_t lz4Len, uint8_t* output, int64_t outputLen) { + if (lz4Len > std::numeric_limits::max()) { + return arrow::Status::Invalid( + "Int codec LZ4 body: compressed length ", lz4Len, " exceeds INT32_MAX wire-header limit"); + } + int64_t bodySize = kIntLz4BodyHeaderSize + lz4Len; + if (outputLen < bodySize) { + return arrow::Status::Invalid("Int codec LZ4 body: output too small (", outputLen, " < ", bodySize, ")"); + } + uint8_t* out = output; + *out++ = static_cast(kIntStrategyLz4); + int32_t lz4Len32 = static_cast(lz4Len); + std::memcpy(out, &lz4Len32, sizeof(int32_t)); + out += sizeof(int32_t); + std::memcpy(out, lz4Bytes, static_cast(lz4Len)); + out += lz4Len; + return out - output; +} + +arrow::Result +TypeAwareCompressCodec::decompressLz4Body(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen) { + if (inputLen < static_cast(sizeof(int32_t))) { + return arrow::Status::Invalid("Int codec LZ4 body: missing length prefix."); + } + int32_t lz4Len = 0; + std::memcpy(&lz4Len, input, sizeof(int32_t)); + const uint8_t* lz4Bytes = input + sizeof(int32_t); + int64_t remaining = inputLen - sizeof(int32_t); + if (lz4Len < 0 || lz4Len > remaining) { + return arrow::Status::Invalid("Int codec LZ4 body: invalid length ", lz4Len); + } + auto* codec = lz4CodecShared(); + ARROW_ASSIGN_OR_RAISE(int64_t nDecoded, codec->Decompress(lz4Len, lz4Bytes, outputLen, output)); + if (nDecoded != outputLen) { + return arrow::Status::Invalid("Int codec LZ4 body: decompress produced ", nDecoded, " bytes, expected ", outputLen); + } + return outputLen; } arrow::Result TypeAwareCompressCodec::compress( @@ -36,7 +136,9 @@ arrow::Result TypeAwareCompressCodec::compress( int64_t inputLen, uint8_t* output, int64_t outputLen, - int8_t tacType) { + int8_t tacType, + const uint8_t* offsetsBuffer, + int32_t numRows) { if (!support(tacType)) { return arrow::Status::Invalid("Type-aware compression not supported for tac type: ", static_cast(tacType)); } @@ -48,13 +150,87 @@ arrow::Result TypeAwareCompressCodec::compress( } auto* out = output; - *out++ = static_cast(CodecId::kFFor); + + // kStringDict has its own internal adaptive logic (dict vs LZ4) — pass through. + if (tacType == tac::kStringDict) { + if (offsetsBuffer == nullptr || numRows <= 0) { + return arrow::Status::Invalid( + "Type-aware compression (string dict): offsets buffer and positive numRows are required."); + } + *out++ = static_cast(CodecId::kStringDict); + *out++ = static_cast(tacType); + ARROW_ASSIGN_OR_RAISE( + auto bodyLen, compressStringDict(input, inputLen, offsetsBuffer, numRows, out, outputLen - kPayloadHeaderSize)); + return kPayloadHeaderSize + bodyLen; + } + + // Numeric codecs: build native AND LZ4 outputs, pick the smaller. + CodecId nativeCodecId; + switch (tacType) { + case tac::kUInt64: + nativeCodecId = CodecId::kFFor; + break; + case tac::kUInt128: + nativeCodecId = CodecId::kFForSplit128; + break; + case tac::kUInt32: + nativeCodecId = CodecId::kFForWidened32; + break; + default: + // Unreachable: support() would have rejected. + return arrow::Status::Invalid("Unhandled tac type: ", static_cast(tacType)); + } + + // Produce the native body into a scratch buffer so we can compare its size + // against LZ4 before committing to one. + int64_t nativeUpperBound = 0; + if (nativeCodecId == CodecId::kFFor) { + nativeUpperBound = FForCodec::maxCompressedLength(inputLen); + } else if (nativeCodecId == CodecId::kFForSplit128) { + int64_t bytesPerLane = inputLen / 2; + nativeUpperBound = kSplit128BodyHeaderSize + 2 * FForCodec::maxCompressedLength(bytesPerLane); + } else { + // kFForWidened32 + nativeUpperBound = FForCodec::maxCompressedLength(2 * inputLen); + } + std::vector nativeBuf(static_cast(nativeUpperBound)); + int64_t nativeLen = 0; + if (nativeCodecId == CodecId::kFFor) { + ARROW_ASSIGN_OR_RAISE(nativeLen, FForCodec::compress(input, inputLen, nativeBuf.data(), nativeUpperBound)); + } else if (nativeCodecId == CodecId::kFForSplit128) { + ARROW_ASSIGN_OR_RAISE(nativeLen, compressSplit128(input, inputLen, nativeBuf.data(), nativeUpperBound)); + } else { + ARROW_ASSIGN_OR_RAISE(nativeLen, compressWidened32(input, inputLen, nativeBuf.data(), nativeUpperBound)); + } + + // Produce LZ4 body into a scratch buffer. + std::vector lz4Buf; + ARROW_ASSIGN_OR_RAISE(int64_t lz4Len, runLz4Fallback(input, inputLen, lz4Buf)); + + int64_t nativeBodySize = kIntStrategyHeaderSize + nativeLen; + int64_t lz4BodySize = kIntLz4BodyHeaderSize + lz4Len; + + // Tie goes to native (decompression is cheaper and avoids LZ4 lookup overhead). + bool useNative = nativeBodySize <= lz4BodySize; + int64_t bodySize = useNative ? nativeBodySize : lz4BodySize; + if (outputLen - kPayloadHeaderSize < bodySize) { + return arrow::Status::Invalid( + "Adaptive int codec: output buffer too small (", outputLen - kPayloadHeaderSize, " < ", bodySize, ")"); + } + + *out++ = static_cast(nativeCodecId); *out++ = static_cast(tacType); - auto availableOutput = outputLen - kPayloadHeaderSize; - ARROW_ASSIGN_OR_RAISE(auto compressedLen, FForCodec::compress(input, inputLen, out, availableOutput)); + if (useNative) { + *out++ = static_cast(kIntStrategyNative); + std::memcpy(out, nativeBuf.data(), static_cast(nativeLen)); + out += nativeLen; + } else { + ARROW_ASSIGN_OR_RAISE(auto written, writeLz4Body(lz4Buf.data(), lz4Len, out, outputLen - kPayloadHeaderSize)); + out += written; + } - return kPayloadHeaderSize + compressedLen; + return out - output; } arrow::Result @@ -68,15 +244,576 @@ TypeAwareCompressCodec::decompress(const uint8_t* input, int64_t inputLen, uint8 [[maybe_unused]] auto tacType = *in++; auto dataLen = inputLen - kPayloadHeaderSize; + // String-dict codec keeps its own internal strategy byte; pass body through. + if (codecId == CodecId::kStringDict) { + ARROW_RETURN_NOT_OK(decompressStringDict(in, dataLen, output, outputLen)); + return inputLen; + } + + // All numeric codecs share the body strategy byte (native vs LZ4 fallback). + if (codecId != CodecId::kFFor && codecId != CodecId::kFForSplit128 && codecId != CodecId::kFForWidened32) { + return arrow::Status::Invalid("Unknown type-aware codec ID: ", static_cast(codecId)); + } + if (dataLen < kIntStrategyHeaderSize) { + return arrow::Status::Invalid("Int codec body: missing strategy byte."); + } + auto strategy = static_cast(*in++); + dataLen -= kIntStrategyHeaderSize; + + if (strategy == kIntStrategyLz4) { + ARROW_RETURN_NOT_OK(decompressLz4Body(in, dataLen, output, outputLen)); + return inputLen; + } + if (strategy != kIntStrategyNative) { + return arrow::Status::Invalid("Int codec body: unknown strategy ", static_cast(strategy)); + } + + // Native FFor path — dispatch by codec id. switch (codecId) { case CodecId::kFFor: { ARROW_ASSIGN_OR_RAISE(auto nDecoded, FForCodec::decompress(in, dataLen, output, outputLen)); - (void)nDecoded; + // FForCodec::decompress returns the number of uint64 *values* it produced + // (not bytes). The output buffer is sized for `outputLen / sizeof(uint64_t)` + // values, so any mismatch means the input stream was truncated or corrupt. + // Sibling decompressSplit128 / decompressWidened32 paths perform the same + // check; do the same here so a bad stream fails loudly instead of returning + // partially-decoded data to the caller. + const int64_t expectedValues = outputLen / static_cast(sizeof(uint64_t)); + if (nDecoded != expectedValues) { + return arrow::Status::Invalid( + "FFor decompress: produced ", nDecoded, " values, expected ", expectedValues, " (input may be truncated)"); + } + return inputLen; + } + case CodecId::kFForSplit128: { + ARROW_RETURN_NOT_OK(decompressSplit128(in, dataLen, output, outputLen)); + return inputLen; + } + case CodecId::kFForWidened32: { + ARROW_RETURN_NOT_OK(decompressWidened32(in, dataLen, output, outputLen)); return inputLen; } default: - return arrow::Status::Invalid("Unknown type-aware codec ID: ", static_cast(codecId)); + return arrow::Status::Invalid("Unreachable: codecId already validated."); + } +} + +arrow::Result +TypeAwareCompressCodec::compressSplit128(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen) { + if (inputLen % 16 != 0) { + return arrow::Status::Invalid( + "Type-aware compression (split128): input size ", inputLen, " is not a multiple of 16."); + } + if (outputLen < kSplit128BodyHeaderSize) { + return arrow::Status::Invalid("Output buffer too small for split128 body header."); + } + + const size_t nValues = static_cast(inputLen / 16); + + // Split the int128 stream into two parallel uint64 streams. + // On little-endian (x86-64 / aarch64), __int128 layout is [lo_8B][hi_8B] + // — matches Velox HugeInt::lower/upper. Use memcpy to stay alignment-safe. + std::vector lo(nValues); + std::vector hi(nValues); + const uint8_t* src = input; + for (size_t i = 0; i < nValues; ++i) { + std::memcpy(&lo[i], src, sizeof(uint64_t)); + std::memcpy(&hi[i], src + sizeof(uint64_t), sizeof(uint64_t)); + src += 16; + } + + // Body layout: | loCompLen (int64) | loPayload | hiPayload | + uint8_t* loDst = output + kSplit128BodyHeaderSize; + int64_t loBudget = outputLen - kSplit128BodyHeaderSize; + if (loBudget < 0) { + return arrow::Status::Invalid("Output buffer too small for split128 lo payload."); + } + ARROW_ASSIGN_OR_RAISE( + int64_t loLen, + FForCodec::compress( + reinterpret_cast(lo.data()), + static_cast(nValues * sizeof(uint64_t)), + loDst, + loBudget)); + + uint8_t* hiDst = loDst + loLen; + int64_t hiBudget = loBudget - loLen; + if (hiBudget < 0) { + return arrow::Status::Invalid("Output buffer too small for split128 hi payload."); + } + ARROW_ASSIGN_OR_RAISE( + int64_t hiLen, + FForCodec::compress( + reinterpret_cast(hi.data()), + static_cast(nValues * sizeof(uint64_t)), + hiDst, + hiBudget)); + + std::memcpy(output, &loLen, sizeof(int64_t)); + + return kSplit128BodyHeaderSize + loLen + hiLen; +} + +arrow::Result +TypeAwareCompressCodec::decompressSplit128(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen) { + if (inputLen < kSplit128BodyHeaderSize) { + return arrow::Status::Invalid("Input too small for split128 body header."); + } + if (outputLen % 16 != 0) { + return arrow::Status::Invalid( + "Type-aware decompression (split128): output size ", outputLen, " is not a multiple of 16."); + } + + int64_t loLen; + std::memcpy(&loLen, input, sizeof(int64_t)); + if (loLen < 0 || loLen > inputLen - kSplit128BodyHeaderSize) { + return arrow::Status::Invalid("Type-aware decompression (split128): invalid loCompLen ", loLen); + } + int64_t hiLen = inputLen - kSplit128BodyHeaderSize - loLen; + + const size_t nValues = static_cast(outputLen / 16); + const int64_t laneBytes = static_cast(nValues * sizeof(uint64_t)); + + std::vector lo(nValues); + std::vector hi(nValues); + + ARROW_ASSIGN_OR_RAISE( + auto nLo, + FForCodec::decompress(input + kSplit128BodyHeaderSize, loLen, reinterpret_cast(lo.data()), laneBytes)); + ARROW_ASSIGN_OR_RAISE( + auto nHi, + FForCodec::decompress( + input + kSplit128BodyHeaderSize + loLen, hiLen, reinterpret_cast(hi.data()), laneBytes)); + // Defense against truncated/corrupted streams: FForCodec::decompress reports + // the number of uint64 *values* it produced (not bytes). Both lanes must + // produce exactly `nValues` values; anything else means the stream was + // mis-framed and we would be reading from a partially-uninitialized lane + // vector below. + const auto expectedValues = static_cast(nValues); + if (nLo != expectedValues || nHi != expectedValues) { + return arrow::Status::Invalid( + "Split128 decompress: lane size mismatch (lo=", nLo, ", hi=", nHi, ", expected=", expectedValues, ")"); + } + + uint8_t* dst = output; + for (size_t i = 0; i < nValues; ++i) { + std::memcpy(dst, &lo[i], sizeof(uint64_t)); + std::memcpy(dst + sizeof(uint64_t), &hi[i], sizeof(uint64_t)); + dst += 16; + } + + return outputLen; +} + +arrow::Result +TypeAwareCompressCodec::compressWidened32(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen) { + if (inputLen % 4 != 0) { + return arrow::Status::Invalid( + "Type-aware compression (widened32): input size ", inputLen, " is not a multiple of 4."); + } + + const size_t nValues = static_cast(inputLen / 4); + + // Widen uint32 -> uint64 so the existing FFor(uint64) machinery applies. + // memcpy keeps it alignment-safe and matches little-endian on x86-64 / aarch64. + std::vector widened(nValues); + const uint8_t* src = input; + for (size_t i = 0; i < nValues; ++i) { + uint32_t v; + std::memcpy(&v, src, sizeof(uint32_t)); + widened[i] = static_cast(v); + src += sizeof(uint32_t); + } + + ARROW_ASSIGN_OR_RAISE( + int64_t writtenLen, + FForCodec::compress( + reinterpret_cast(widened.data()), + static_cast(nValues * sizeof(uint64_t)), + output, + outputLen)); + return writtenLen; +} + +arrow::Result TypeAwareCompressCodec::decompressWidened32( + const uint8_t* input, + int64_t inputLen, + uint8_t* output, + int64_t outputLen) { + if (outputLen % 4 != 0) { + return arrow::Status::Invalid( + "Type-aware decompression (widened32): output size ", outputLen, " is not a multiple of 4."); + } + + const size_t nValues = static_cast(outputLen / 4); + std::vector widened(nValues); + + const int64_t widenedBytes = static_cast(nValues * sizeof(uint64_t)); + ARROW_ASSIGN_OR_RAISE( + auto nDecoded, FForCodec::decompress(input, inputLen, reinterpret_cast(widened.data()), widenedBytes)); + // Defense against truncated/corrupted streams: FForCodec::decompress reports + // the number of uint64 *values* it produced (not bytes). The widened lane + // must be fully populated before we truncate it back down to 4-byte values, + // else we would be writing uninitialized memory into the caller's output. + const auto expectedValues = static_cast(nValues); + if (nDecoded != expectedValues) { + return arrow::Status::Invalid( + "Widened32 decompress: widened lane produced ", nDecoded, " values, expected ", expectedValues); + } + + // Truncate uint64 -> uint32 back into the output buffer. + uint8_t* dst = output; + for (size_t i = 0; i < nValues; ++i) { + uint32_t v = static_cast(widened[i]); + std::memcpy(dst, &v, sizeof(uint32_t)); + dst += sizeof(uint32_t); + } + + return outputLen; +} + +// String dictionary codec body format (after CodecId+tacType header): +// strategy (uint8): 0 = dict, 1 = LZ4 fallback +// +// Dict strategy body: +// indexWidth (uint8): 1, 2, or 4 bytes per index +// numRows (int32) +// dictCount (int32) +// dictBytes (int32) // total bytes of the dict section +// dict section: dictCount entries of (lengthDelta varint? no — fixed: int32 len, bytes) +// index section: numRows * indexWidth bytes +// +// LZ4 strategy body: +// compressedLen (int32) +// lz4 compressed bytes +// +// Both strategies are recoverable from the wire alone — decompressStringDict +// dispatches on the strategy byte. +namespace { + +constexpr int64_t kDictBodyFixedHeader = sizeof(uint8_t) + // strategy + sizeof(uint8_t) + // indexWidth + sizeof(int32_t) + // numRows + sizeof(int32_t) + // dictCount + sizeof(int32_t); // dictBytes + +constexpr int64_t kLz4BodyFixedHeader = sizeof(uint8_t) + // strategy + sizeof(int32_t); // compressedLen + +// Regression guards for the dict-build path. See compressStringDict for the +// detailed CPU/ratio rationale and the v1/v2 false-positive history. +// +// v1 (heuristic: bail after 64 rows if >75% unique) regressed str_high_card8k. +// v2 (single check at row 256) regressed str_long_comments via the birthday +// paradox on a sampled 10K-element string pool. +// v3 raises the probe row to clamp(numRows/8, 256, 2048): at the cap (2048) +// the probability of seeing no duplicate is still vanishingly small for any +// reasonable cardinality C (~exp(-2048²/(2C)), ~10⁻¹⁴ at C=64K, ~10⁻¹ at +// C=1M) — only essentially-all-unique columns ever bail. The cap was +// lowered from 4096 → 2048 after a sweep showed ~14% CPU savings on the +// str_unique shape (where bailing IS the right call) with zero observable +// regression on shapes that pick dict (str_high_card8k / str_long_comments). +constexpr int64_t kDictMinInputBytes = 4096; +constexpr int32_t kDictBailoutMinCheckRows = 256; +constexpr int32_t kDictBailoutMaxCheckRows = 2048; +constexpr int32_t kDictBailoutCheckDivisor = 8; + +uint8_t indexWidthFor(int32_t dictCount) { + // 0/1 entries → width 1 (still need a valid value). 2..256 → 1B. 257..65536 → 2B. else 4B. + if (dictCount <= 256) { + return 1; + } + if (dictCount <= 65536) { + return 2; + } + return 4; +} + +} // namespace + +arrow::Result TypeAwareCompressCodec::compressStringDict( + const uint8_t* input, + int64_t inputLen, + const uint8_t* offsetsBuffer, + int32_t numRows, + uint8_t* output, + int64_t outputLen) { + const int32_t* offsets = reinterpret_cast(offsetsBuffer); + + // Velox shuffle can hand us a *sliced* string-data buffer: the buffer + // contains numRows worth of variable-length payload, but the row content + // does not necessarily start at byte 0 of the buffer (leading bytes can + // belong to a previous slice or padding), and the row content may not + // extend to byte (inputLen-1) (trailing bytes likewise). + // + // The shuffle reader reconstructs the data buffer byte-for-byte from the + // compressed payload, so any leading/trailing bytes must be preserved + // exactly. The dictionary path cannot do that without extra bookkeeping + // (and the win from dict encoding is small on a sliced buffer anyway — + // sliced inputs are typically small fragments). The safe and simple fix + // is to fall back to LZ4-only when the input is sliced; LZ4 over the raw + // buffer trivially preserves bytes. + const bool sliced = (numRows >= 1) && (offsets[0] != 0 || offsets[numRows] != static_cast(inputLen)); + + // Try LZ4 over the raw input buffer regardless of slicing — LZ4 always + // round-trips inputLen bytes exactly. + auto* codec = lz4CodecShared(); + int64_t lz4Max = codec->MaxCompressedLen(inputLen, input); + std::vector lz4Buf(static_cast(lz4Max)); + ARROW_ASSIGN_OR_RAISE(int64_t lz4Len, codec->Compress(inputLen, input, lz4Max, lz4Buf.data())); + int64_t lz4BodySize = kLz4BodyFixedHeader + lz4Len; + + // Compute the dict body size only when the input is not sliced AND the + // input is large enough that dict-build CPU can pay off (Guard 1: tiny + // buffer skip). We still need to materialize the dict + indices even if + // we end up choosing it, so do the scan first; if it turns out larger + // than LZ4 (or the input is sliced / too small), we emit LZ4 instead. + // + // For production diagnosability: any decision to skip the dict path is + // observable via the wire-level strategy byte (kStrategyLz4 = 1) — no + // separate logline needed at this hot path. + int64_t dictBodySize = std::numeric_limits::max(); + std::unordered_map dict; + std::vector indices; + std::vector dictEntries; + int32_t dictCount = 0; + uint8_t indexWidth = 1; + int64_t dictBytes = 0; + bool dictBailoutFired = false; + + const bool buildDict = !sliced && inputLen >= kDictMinInputBytes; + if (buildDict) { + dict.reserve(static_cast(numRows)); + indices.assign(static_cast(numRows), 0); + dictEntries.reserve(static_cast(numRows)); + + // Guard 2: single deterministic "no-duplicates-after-N-rows" probe. + // Clamp to [kDictBailoutMinCheckRows, kDictBailoutMaxCheckRows] so we + // (a) always scan enough rows that birthday-paradox false positives are + // negligible for any reasonable cardinality, and (b) cap wasted CPU on + // true-unique columns. See namespace-level comment for full rationale. + const int32_t bailoutCheckAt = + std::clamp(numRows / kDictBailoutCheckDivisor, kDictBailoutMinCheckRows, kDictBailoutMaxCheckRows); + bool seenAnyDuplicate = false; + + for (int32_t i = 0; i < numRows; ++i) { + int32_t start = offsets[i]; + int32_t len = offsets[i + 1] - start; + if (len < 0 || start + len > inputLen) { + return arrow::Status::Invalid("String dict codec: invalid offsets at row ", i); + } + std::string_view sv(reinterpret_cast(input + start), static_cast(len)); + auto [it, inserted] = dict.try_emplace(sv, static_cast(dictEntries.size())); + if (inserted) { + dictEntries.push_back(sv); + dictBytes += sizeof(int32_t) + len; + } else { + seenAnyDuplicate = true; + } + indices[i] = it->second; + + // Single probe: if we have scanned bailoutCheckAt rows without + // observing a single duplicate, the column is essentially all-unique + // and the dict can never recoup its overhead. Bail to LZ4. + if (!seenAnyDuplicate && (i + 1) == bailoutCheckAt) { + dictBailoutFired = true; + break; + } + } + + if (!dictBailoutFired) { + dictCount = static_cast(dictEntries.size()); + indexWidth = indexWidthFor(dictCount); + int64_t indicesBytes = static_cast(numRows) * indexWidth; + dictBodySize = kDictBodyFixedHeader + dictBytes + indicesBytes; + } + } + + // Pick the smaller strategy. Tie goes to dict (cheaper to decompress). + // sliced inputs always pick LZ4 because dictBodySize is INT64_MAX. + bool useDict = dictBodySize <= lz4BodySize; + int64_t bodySize = useDict ? dictBodySize : lz4BodySize; + if (outputLen < bodySize) { + return arrow::Status::Invalid("String dict codec: output buffer too small (", outputLen, " < ", bodySize, ")"); + } + + uint8_t* out = output; + if (useDict) { + if (dictBytes > std::numeric_limits::max()) { + return arrow::Status::Invalid( + "String dict codec: dictionary total byte size ", dictBytes, " exceeds INT32_MAX wire-header limit"); + } + *out++ = static_cast(kStrategyDict); + *out++ = indexWidth; + std::memcpy(out, &numRows, sizeof(int32_t)); + out += sizeof(int32_t); + std::memcpy(out, &dictCount, sizeof(int32_t)); + out += sizeof(int32_t); + int32_t dictBytes32 = static_cast(dictBytes); + std::memcpy(out, &dictBytes32, sizeof(int32_t)); + out += sizeof(int32_t); + + for (const auto& sv : dictEntries) { + if (sv.size() > static_cast(std::numeric_limits::max())) { + return arrow::Status::Invalid( + "String dict codec: dictionary entry length ", sv.size(), " exceeds INT32_MAX wire-header limit"); + } + int32_t len = static_cast(sv.size()); + std::memcpy(out, &len, sizeof(int32_t)); + out += sizeof(int32_t); + std::memcpy(out, sv.data(), sv.size()); + out += sv.size(); + } + + for (int32_t i = 0; i < numRows; ++i) { + int32_t idx = indices[i]; + if (indexWidth == 1) { + *out++ = static_cast(idx); + } else if (indexWidth == 2) { + uint16_t v = static_cast(idx); + std::memcpy(out, &v, sizeof(uint16_t)); + out += sizeof(uint16_t); + } else { + std::memcpy(out, &idx, sizeof(int32_t)); + out += sizeof(int32_t); + } + } + } else { + if (lz4Len > std::numeric_limits::max()) { + return arrow::Status::Invalid( + "String dict codec LZ4 body: compressed length ", lz4Len, " exceeds INT32_MAX wire-header limit"); + } + *out++ = static_cast(kStrategyLz4); + int32_t lz4Len32 = static_cast(lz4Len); + std::memcpy(out, &lz4Len32, sizeof(int32_t)); + out += sizeof(int32_t); + std::memcpy(out, lz4Buf.data(), static_cast(lz4Len)); + out += lz4Len; + } + + return out - output; +} + +arrow::Result TypeAwareCompressCodec::decompressStringDict( + const uint8_t* input, + int64_t inputLen, + uint8_t* output, + int64_t outputLen) { + if (inputLen < 1) { + return arrow::Status::Invalid("String dict codec: input too small for strategy byte."); + } + StringDictStrategy strategy = static_cast(input[0]); + const uint8_t* in = input + 1; + int64_t remaining = inputLen - 1; + + if (strategy == kStrategyLz4) { + if (remaining < static_cast(sizeof(int32_t))) { + return arrow::Status::Invalid("String dict codec: LZ4 strategy missing length header."); + } + int32_t lz4Len = 0; + std::memcpy(&lz4Len, in, sizeof(int32_t)); + in += sizeof(int32_t); + remaining -= sizeof(int32_t); + if (lz4Len < 0 || lz4Len > remaining) { + return arrow::Status::Invalid("String dict codec: invalid LZ4 length ", lz4Len); + } + auto* codec = lz4CodecShared(); + ARROW_ASSIGN_OR_RAISE(int64_t nDecoded, codec->Decompress(lz4Len, in, outputLen, output)); + if (nDecoded != outputLen) { + return arrow::Status::Invalid( + "String dict codec: LZ4 decode produced ", nDecoded, " bytes, expected ", outputLen); + } + return outputLen; + } + + if (strategy != kStrategyDict) { + return arrow::Status::Invalid("String dict codec: unknown strategy ", static_cast(strategy)); + } + + if (remaining < static_cast(sizeof(uint8_t) + 3 * sizeof(int32_t))) { + return arrow::Status::Invalid("String dict codec: dict strategy header truncated."); + } + uint8_t indexWidth = *in++; + int32_t numRows = 0; + std::memcpy(&numRows, in, sizeof(int32_t)); + in += sizeof(int32_t); + int32_t dictCount = 0; + std::memcpy(&dictCount, in, sizeof(int32_t)); + in += sizeof(int32_t); + int32_t dictBytes = 0; + std::memcpy(&dictBytes, in, sizeof(int32_t)); + in += sizeof(int32_t); + remaining -= sizeof(uint8_t) + 3 * sizeof(int32_t); + + if (indexWidth != 1 && indexWidth != 2 && indexWidth != 4) { + return arrow::Status::Invalid("String dict codec: invalid indexWidth ", static_cast(indexWidth)); + } + if (numRows < 0 || dictCount < 0 || dictBytes < 0) { + return arrow::Status::Invalid("String dict codec: negative counts in header."); + } + + // Materialize dict entries as (ptr, len) into the input buffer (no copy). + std::vector> dictEntries; + dictEntries.reserve(static_cast(dictCount)); + const uint8_t* dictEnd = in + dictBytes; + if (dictEnd > input + inputLen) { + return arrow::Status::Invalid("String dict codec: dict section overruns input."); + } + for (int32_t i = 0; i < dictCount; ++i) { + if (in + sizeof(int32_t) > dictEnd) { + return arrow::Status::Invalid("String dict codec: dict entry header truncated at index ", i); + } + int32_t len = 0; + std::memcpy(&len, in, sizeof(int32_t)); + in += sizeof(int32_t); + if (len < 0 || in + len > dictEnd) { + return arrow::Status::Invalid("String dict codec: dict entry ", i, " has invalid length ", len); + } + dictEntries.emplace_back(in, len); + in += len; + } + + int64_t indicesBytes = static_cast(numRows) * indexWidth; + if (in + indicesBytes > input + inputLen) { + return arrow::Status::Invalid("String dict codec: indices section overruns input."); + } + + uint8_t* dst = output; + uint8_t* dstEnd = output + outputLen; + for (int32_t i = 0; i < numRows; ++i) { + int32_t idx = 0; + if (indexWidth == 1) { + idx = *in; + in += 1; + } else if (indexWidth == 2) { + uint16_t v = 0; + std::memcpy(&v, in, sizeof(uint16_t)); + in += sizeof(uint16_t); + idx = v; + } else { + std::memcpy(&idx, in, sizeof(int32_t)); + in += sizeof(int32_t); + } + if (idx < 0 || idx >= dictCount) { + return arrow::Status::Invalid("String dict codec: index ", idx, " out of range [0, ", dictCount, ")"); + } + const auto& entry = dictEntries[static_cast(idx)]; + if (dst + entry.second > dstEnd) { + return arrow::Status::Invalid("String dict codec: output overrun at row ", i); + } + std::memcpy(dst, entry.first, static_cast(entry.second)); + dst += entry.second; + } + + // The caller's `decompress` wrapper ignores the byte-count return value and + // assumes the whole `outputLen` window was written. Reject mismatches here + // so that any stream describing fewer (or — by way of an earlier overrun + // check — more) bytes than expected fails loudly instead of leaving the + // tail of the caller's buffer uninitialised. + if (dst != dstEnd) { + return arrow::Status::Invalid("String dict codec: decompressed ", dst - output, " bytes, expected ", outputLen); } + return outputLen; } } // namespace gluten diff --git a/cpp/core/utils/tac/TypeAwareCompressCodec.h b/cpp/core/utils/tac/TypeAwareCompressCodec.h index 6955525d2e4..106a73c20a8 100644 --- a/cpp/core/utils/tac/TypeAwareCompressCodec.h +++ b/cpp/core/utils/tac/TypeAwareCompressCodec.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace gluten { namespace tac { @@ -30,6 +31,10 @@ namespace tac { enum TacDataType : int8_t { kUnsupported = -1, // Not compressible by TAC. kUInt64 = 0, // 8-byte unsigned integer (also used for int64, double, date64). + kUInt128 = 1, // 16-byte unsigned integer (used for int128 / HugeInt / decimal(p > 18, s)). + kUInt32 = 2, // 4-byte unsigned integer (also used for int32, date32, string-offsets buffer). + kStringDict = + 3, // Variable-length string DATA buffer. Adaptive: emits dictionary or LZ4 payload based on which is smaller. }; } // namespace tac @@ -37,8 +42,32 @@ enum TacDataType : int8_t { /// TypeAwareCompressCodec provides type-aware compression that selects the best /// compression algorithm based on the data type of the buffer. /// +/// All numeric codecs (kUInt64, kUInt128, kUInt32) are adaptive: each computes +/// both its native specialized encoding AND an LZ4 baseline over the raw input, +/// emitting whichever is smaller along with a 1-byte body strategy header. +/// This guarantees we never produce more compressed bytes than LZ4 alone would. +/// /// Currently supported: -/// kUInt64 -> FFor (Frame-of-Reference + Bit-Packing) for uint64_t streams. +/// kUInt64 -> FFor (Frame-of-Reference + Bit-Packing) for uint64_t streams. +/// LZ4 fallback wins for low-cardinality data with long runs. +/// kUInt128 -> FFor split-lane: split each int128 into two uint64 lanes +/// (low / high), FFor-encode each lane independently. Reuses +/// the existing FFor(uint64) machinery. LZ4 fallback wins for +/// long runs or columns where neither lane has narrow range. +/// kUInt32 -> FFor over a zero-extended uint64 view of the uint32 stream. +/// Reuses the FFor(uint64) machinery; decompress truncates +/// back to uint32. Suited for INT32, DATE32, and string-offsets +/// buffers (where the dense int32 range and locality both fit +/// FFor's frame-of-reference well). LZ4 fallback handles the +/// low-cardinality or highly-repetitive edge cases. +/// kStringDict-> Variable-length string DATA buffer. Requires the column's +/// OFFSETS buffer (passed via compress() overload) to delimit +/// individual strings. Computes both a dictionary encoding +/// and an LZ4-compressed payload; emits whichever is smaller +/// along with a strategy byte for the reader to dispatch on. +/// LZ4 wins for low-cardinality data with long consecutive +/// runs (LZ4 finds the long-distance repetitions); dictionary +/// wins for medium-cardinality scattered data. /// /// The compressed wire format is self-describing: decompress() does not need /// a type hint because codec ID and element width are embedded in the header. @@ -51,8 +80,19 @@ class TypeAwareCompressCodec { static int64_t maxCompressedLen(int64_t inputLen, int8_t tacType); /// Compress a buffer with a type hint. Returns bytes written to output. - static arrow::Result - compress(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen, int8_t tacType); + /// + /// For kStringDict, the offsetsBuffer + numRows arguments are required and + /// must describe the buffer's row boundaries (Arrow int32 offsets array of + /// length numRows+1). For all other TAC types, these arguments are ignored + /// and should be left at their defaults. + static arrow::Result compress( + const uint8_t* input, + int64_t inputLen, + uint8_t* output, + int64_t outputLen, + int8_t tacType, + const uint8_t* offsetsBuffer = nullptr, + int32_t numRows = 0); /// Decompress without a type hint. Self-describing from the payload header. static arrow::Result decompress(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen); @@ -60,9 +100,66 @@ class TypeAwareCompressCodec { private: enum CodecId : uint8_t { kFFor = 1, + kFForSplit128 = 2, + kFForWidened32 = 3, + kStringDict = 4, + }; + + /// Strategy byte for kStringDict payloads. Tells the reader whether the + /// payload was dictionary-encoded or LZ4-fallback-compressed. + enum StringDictStrategy : uint8_t { + kStrategyDict = 0, + kStrategyLz4 = 1, + }; + + /// Strategy byte for kFFor / kFForSplit128 / kFForWidened32 bodies. Each of + /// those codecs adaptively chooses between its native FFor-based encoding + /// and an LZ4 fallback over the raw input, depending on which produces a + /// smaller payload. The first byte of the body identifies which path was + /// taken so decompress() can dispatch accordingly. + enum IntCodecStrategy : uint8_t { + kIntStrategyNative = 0, + kIntStrategyLz4 = 1, }; static constexpr int64_t kPayloadHeaderSize = sizeof(uint8_t) + sizeof(uint8_t); + static constexpr int64_t kIntStrategyHeaderSize = sizeof(uint8_t); + static constexpr int64_t kIntLz4BodyHeaderSize = sizeof(uint8_t) + sizeof(int32_t); + static constexpr int64_t kSplit128BodyHeaderSize = sizeof(int64_t); + + static arrow::Result + compressSplit128(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen); + + static arrow::Result + decompressSplit128(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen); + + static arrow::Result + compressWidened32(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen); + + static arrow::Result + decompressWidened32(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen); + + static arrow::Result compressStringDict( + const uint8_t* input, + int64_t inputLen, + const uint8_t* offsetsBuffer, + int32_t numRows, + uint8_t* output, + int64_t outputLen); + + static arrow::Result + decompressStringDict(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen); + + /// Compress `input` with LZ4 into `out`. Returns the raw lz4 byte count. + static arrow::Result runLz4Fallback(const uint8_t* input, int64_t inputLen, std::vector& out); + + /// Emit an int-codec LZ4 fallback body: strategy byte + int32 length + lz4 bytes. + static arrow::Result + writeLz4Body(const uint8_t* lz4Bytes, int64_t lz4Len, uint8_t* output, int64_t outputLen); + + /// Decode an int-codec LZ4 fallback body (strategy byte already consumed). + static arrow::Result + decompressLz4Body(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen); }; } // namespace gluten diff --git a/cpp/core/utils/tac/ffor.hpp b/cpp/core/utils/tac/ffor.hpp index 0d632efff5a..e04ddadf99f 100644 --- a/cpp/core/utils/tac/ffor.hpp +++ b/cpp/core/utils/tac/ffor.hpp @@ -176,6 +176,11 @@ void decode(const uint64_t* __restrict in, uint64_t* __restrict out, uint64_t ba return; } else { constexpr uint64_t kMask = bitmask(); + // Avoid the unconditional `cur[lane] = in[...]` preload below when there + // are no values; otherwise we'd read past `in`. + if (nValues == 0) { + return; + } const size_t nGroups = nValues / kLanes; uint64_t cur[kLanes]; @@ -264,11 +269,23 @@ inline const auto kDecodeTable = makeDecodeTable(std::make_index_sequence<65>{}) // Runtime-dispatched encode (when BW is not known at compile time). inline void encodeRt(const uint64_t* in, uint64_t* out, uint64_t base, size_t n, unsigned bw) { + // Defensive bounds check: kEncodeTable has 65 entries (BW = 0..64). Any larger + // value would deref OOB function pointer and trigger UB. The encoder side + // never produces bw > 64, but a corrupted block header during decode could + // route here via encodeRt's twin path; guard explicitly. + if (bw > 64) { + return; + } detail::kEncodeTable[bw](in, out, base, n); } // Runtime-dispatched decode. inline void decodeRt(const uint64_t* in, uint64_t* out, uint64_t base, size_t n, unsigned bw) { + // See encodeRt above: kDecodeTable is bounded at 65 entries; reject any + // invalid bit-width (e.g., from a corrupted stream) before dispatch. + if (bw > 64) { + return; + } detail::kDecodeTable[bw](in, out, base, n); } @@ -419,7 +436,7 @@ inline size_t compress64(const uint64_t* input, size_t num, uint8_t* output) { // Template-based decompress with alignment dispatch. template -inline size_t decompress64Impl(const uint8_t* input, size_t inputSize, uint64_t* output) { +inline size_t decompress64Impl(const uint8_t* input, size_t inputSize, uint64_t* output, size_t outputMaxValues) { alignas(64) uint64_t tmpIn[kMaxValuesPerBlock]; alignas(64) uint64_t tmpOut[kMaxValuesPerBlock]; @@ -436,14 +453,43 @@ inline size_t decompress64Impl(const uint8_t* input, size_t inputSize, uint64_t* if (bw == kBwTailMarker) { if (count > 0) { + const size_t tailBytes = static_cast(count) * sizeof(uint64_t); + // Defensive bounds check: a corrupted block header could overstate + // `count`; refuse the memcpy if it would read past the input buffer. + if (inPtr + tailBytes > inEnd) { + break; + } + // Output bounds check: refuse to write past the caller-allocated buffer. + if (nDecoded + static_cast(count) > outputMaxValues) { + break; + } // memcpy handles any alignment, no special case needed. - std::memcpy(reinterpret_cast(output) + nDecoded * sizeof(uint64_t), inPtr, count * sizeof(uint64_t)); + std::memcpy(reinterpret_cast(output) + nDecoded * sizeof(uint64_t), inPtr, tailBytes); nDecoded += count; } break; } + // Non-tail block: validate `count` against the legal range before + // computing `blockVals`. Without this, an attacker-controlled (or + // corrupted) header with `count > kMaxValuesPerBlock / kLanes` could + // produce `blockVals > kMaxValuesPerBlock`, overflowing `tmpOut` in the + // `OutAligned == false` path. Encoder never emits such a header + // (see compress64Impl: blockVals is capped at kMaxValuesPerBlock before + // count is derived), so any larger value indicates corruption. + if (count == 0 || static_cast(count) > (kMaxValuesPerBlock / kLanes)) { + break; + } + size_t blockVals = static_cast(count) * kLanes; + + // Output bounds check: refuse to decode a block that would overflow the + // caller-allocated output buffer. This fires before any write, preventing + // heap corruption on corrupted compressed streams. + if (nDecoded + blockVals > outputMaxValues) { + break; + } + size_t compBytes = compressedWords(blockVals, bw) * sizeof(uint64_t); if (inPtr + compBytes > inEnd) { @@ -481,19 +527,19 @@ inline size_t decompress64Impl(const uint8_t* input, size_t inputSize, uint64_t* } // Runtime dispatch. -inline size_t decompress64(const uint8_t* input, size_t inputSize, uint64_t* output) { +inline size_t decompress64(const uint8_t* input, size_t inputSize, uint64_t* output, size_t outputMaxValues) { bool inOk = (reinterpret_cast(input) % alignof(uint64_t) == 0); bool outOk = (reinterpret_cast(output) % alignof(uint64_t) == 0); if (inOk && outOk) { - return decompress64Impl(input, inputSize, output); + return decompress64Impl(input, inputSize, output, outputMaxValues); } if (inOk && !outOk) { - return decompress64Impl(input, inputSize, output); + return decompress64Impl(input, inputSize, output, outputMaxValues); } if (!inOk && outOk) { - return decompress64Impl(input, inputSize, output); + return decompress64Impl(input, inputSize, output, outputMaxValues); } - return decompress64Impl(input, inputSize, output); + return decompress64Impl(input, inputSize, output, outputMaxValues); } } // namespace ffor diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index dfb799806af..6d8a37d0908 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -802,7 +802,14 @@ arrow::Status VeloxHashShuffleWriter::initColumnTypes(const facebook::velox::Row isValidityBuffer_.push_back(true); isValidityBuffer_.push_back(false); isValidityBuffer_.push_back(false); - tacBufferTypes_.insert(tacBufferTypes_.end(), 3, tac::kUnsupported); + // Offsets buffer: dense, monotonically increasing int32 stream, ideal + // for FFOR(uint32). Data buffer: variable-length string payload -- + // route through the string-dict codec which adaptively picks dictionary + // encoding or LZ4 fallback per buffer, ensuring no regression versus + // the prior LZ4-only path while winning on medium-cardinality columns. + tacBufferTypes_.push_back(tac::kUnsupported); + tacBufferTypes_.push_back(tac::kUInt32); + tacBufferTypes_.push_back(tac::kStringDict); } break; case arrow::StructType::type_id: case arrow::MapType::type_id: diff --git a/cpp/velox/shuffle/VeloxTypeAwareCompress.h b/cpp/velox/shuffle/VeloxTypeAwareCompress.h index fbbabc5c06d..0d4c1cd4e51 100644 --- a/cpp/velox/shuffle/VeloxTypeAwareCompress.h +++ b/cpp/velox/shuffle/VeloxTypeAwareCompress.h @@ -27,7 +27,25 @@ namespace gluten { inline int8_t veloxTypeToTacType(facebook::velox::TypeKind kind) { switch (kind) { case facebook::velox::TypeKind::BIGINT: + // BIGINT covers signed/unsigned int64, double (reinterpreted), date64 and + // ShortDecimal(p<=18) since Velox stores ShortDecimal as DecimalType. return tac::kUInt64; + case facebook::velox::TypeKind::HUGEINT: + // HUGEINT (int128_t) covers LongDecimal(p>18) — Velox stores it as + // DecimalType. Compressed via split-lane FFOR(uint64) on the + // low / high 64 bits independently. + return tac::kUInt128; + case facebook::velox::TypeKind::TIMESTAMP: + // TIMESTAMP is a 16-byte struct { int64_t seconds_; uint64_t nanos_; }. + // Layout matches kUInt128: seconds lane has dense int64 values with + // strong locality (low FFOR bit-width); nanos lane is typically 0 or a + // small set of recurring values (tiny FFOR bit-width). + return tac::kUInt128; + case facebook::velox::TypeKind::INTEGER: + // INTEGER covers signed/unsigned int32. Velox's DateType also derives + // from IntegerType (TypeKind::INTEGER), so date32 columns flow through + // the same path. Compressed via FFOR(uint64) over a zero-extended view. + return tac::kUInt32; default: return tac::kUnsupported; }