From d73e08a67285bbf2cf575df8e8f207bc8aada6e8 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Mon, 26 Jan 2026 09:41:32 +0400 Subject: [PATCH 1/9] bitpack encoding for LID and MID blocks --- config/frac_version.go | 4 +- consts/consts.go | 2 +- frac/sealed/lids/block.go | 123 +++++--- frac/sealed/lids/block2.go | 90 ++++++ frac/sealed/lids/block_test.go | 480 +++++++++++++++++++++++++++++ frac/sealed/lids/loader.go | 25 +- frac/sealed/seqids/blocks.go | 45 ++- frac/sealed/seqids/blocks_test.go | 78 +++++ frac/sealed/seqids/loader.go | 6 +- frac/sealed/seqids/provider.go | 2 +- frac/sealed/seqids/unpack_cache.go | 3 + go.mod | 1 + go.sum | 2 + packer/bitpack_packer.go | 89 ++++++ packer/bitpack_unpacker.go | 185 +++++++++++ packer/bytes_unpacker.go | 9 + 16 files changed, 1090 insertions(+), 54 deletions(-) create mode 100644 frac/sealed/lids/block2.go create mode 100644 frac/sealed/lids/block_test.go create mode 100644 frac/sealed/seqids/blocks_test.go create mode 100644 packer/bitpack_packer.go create mode 100644 packer/bitpack_unpacker.go diff --git a/config/frac_version.go b/config/frac_version.go index d3ff1b14..012eb304 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -9,6 +9,8 @@ const ( BinaryDataV1 // BinaryDataV2 - MIDs stored in nanoseconds BinaryDataV2 + // BinaryDataV3 - MIDs and LIDs encoded in bitpack, variable LID block size + BinaryDataV3 ) -const CurrentFracVersion = BinaryDataV2 +const CurrentFracVersion = BinaryDataV3 diff --git a/consts/consts.go b/consts/consts.go index 46a81d55..7e6111e8 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -12,7 +12,7 @@ const ( DummyMID = 0 IDsPerBlock = int(4 * units.KiB) - LIDBlockCap = int(64 * units.KiB) + LIDBlockCap = int(4 * units.KiB) RegularBlockSize = int(16 * units.KiB) DefaultMaintenanceDelay = time.Second diff --git a/frac/sealed/lids/block.go b/frac/sealed/lids/block.go index 08884c5c..fe50d031 100644 --- a/frac/sealed/lids/block.go +++ b/frac/sealed/lids/block.go @@ -2,9 +2,9 @@ package lids import ( "encoding/binary" - "math" "unsafe" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/packer" ) @@ -24,19 +24,38 @@ func (b *Block) getLIDs(i int) []uint32 { } func (b *Block) Pack(dst []byte) []byte { - lastLID := int64(0) - last := b.getCount() - 1 - for i := 0; i <= last; i++ { - for _, lid := range b.getLIDs(i) { - dst = binary.AppendVarint(dst, int64(lid)-lastLID) - lastLID = int64(lid) - } + // TODO store next flags into a single byte + // write b.IsLastLID as a dedicated uint32 in the header of block + switch b.IsLastLID { + case true: + dst = binary.LittleEndian.AppendUint32(dst, 1) + case false: + dst = binary.LittleEndian.AppendUint32(dst, 0) + } + + fullBlock := len(b.LIDs) == consts.LIDBlockCap + switch fullBlock { + case true: + dst = binary.LittleEndian.AppendUint32(dst, 1) + case false: + dst = binary.LittleEndian.AppendUint32(dst, 0) + } - if i < last || b.IsLastLID { - // when we add this value to prev we must get -1 (or math.MaxUint32 for uint32) - // it is the end-marker; see `Block.Unpack()` - dst = binary.AppendVarint(dst, -1-lastLID) + if len(b.LIDs) == consts.LIDBlockCap { + offsetPacker := packer.NewBitpacker(dst, 128) + offsetPacker.Append(b.Offsets) + dst = offsetPacker.Close() + lidPacker := packer.NewBitpacker(dst, 128) + dst = lidPacker.Append4kBlock(b.LIDs) + } else { + lidPacker := packer.NewBitpacker(dst, 128) + sep := []uint32{0} + last := b.getCount() - 1 + for i := 0; i <= last; i++ { + lidPacker.Append(b.getLIDs(i)) + lidPacker.Append(sep) } + dst = lidPacker.Close() } return dst } @@ -49,41 +68,63 @@ func (b *Block) GetSizeBytes() int { return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets) } +// TODO add support of the previous versions func (b *Block) Unpack(data []byte, buf *UnpackBuffer) error { - var lid, offset uint32 - - b.IsLastLID = true - - buf.lids = buf.lids[:0] - buf.offsets = buf.offsets[:0] - buf.offsets = append(buf.offsets, 0) // first offset is always zero - unpacker := packer.NewBytesUnpacker(data) - for unpacker.Len() > 0 { - delta, err := unpacker.GetVarint() - if err != nil { - return err - } - lid += uint32(delta) + buf.Reset() + + // read IsLastLID from a dedicated uint32 + isLastLIDValue := unpacker.GetUint32() + switch isLastLIDValue { + case 1: + b.IsLastLID = true + case 0: + b.IsLastLID = false + } - if lid == math.MaxUint32 { // end of LIDs of current TID, see `Block.Pack()` method - offset = uint32(len(buf.lids)) - buf.offsets = append(buf.offsets, offset) - lid -= uint32(delta) - continue + fullBlock := unpacker.GetUint32() + switch fullBlock { + case 1: + // block has exactly consts.LIDBlockCap LIDs + decompressedChunk := buf.decompressed + compressedChunk := buf.compressed + offsetUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) + for { + offsetChunk, ok := offsetUnpacker.NextChunk() + if !ok { + break + } + b.Offsets = append(b.Offsets, offsetChunk...) } - buf.lids = append(buf.lids, lid) - } + lidUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) + b.LIDs = lidUnpacker.AllocateAndRead4kChunk() + case 0: + decompressedChunk := buf.decompressed + compressedChunk := buf.compressed + buf.offsets = append(buf.offsets, 0) + + bitpackUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) + pos := 0 + for { + chunk, ok := bitpackUnpacker.NextChunk() + if !ok { + break + } + + for _, lid := range chunk { + if pos > 0 && lid == 0 { + b.LIDs = append(b.LIDs, buf.lids...) + buf.lids = buf.lids[:0] + buf.offsets = append(buf.offsets, uint32(pos)) + } else { + buf.lids = append(buf.lids, lid) + pos++ + } + } + } - if int(offset) < len(buf.lids) { - b.IsLastLID = false - buf.offsets = append(buf.offsets, uint32(len(buf.lids))) + b.Offsets = append([]uint32{}, buf.offsets...) } - - // copy from buffer - b.LIDs = append([]uint32{}, buf.lids...) - b.Offsets = append([]uint32{}, buf.offsets...) - return nil } diff --git a/frac/sealed/lids/block2.go b/frac/sealed/lids/block2.go new file mode 100644 index 00000000..77fb26a9 --- /dev/null +++ b/frac/sealed/lids/block2.go @@ -0,0 +1,90 @@ +package lids + +import ( + "encoding/binary" + "math" + "unsafe" + + "github.com/ozontech/seq-db/packer" +) + +// TODO remove legacy block +type Block2 struct { + LIDs []uint32 + Offsets []uint32 + // todo remove this legacy field + IsLastLID bool +} + +func (b *Block2) getCount() int { + return len(b.Offsets) - 1 +} + +func (b *Block2) getLIDs(i int) []uint32 { + return b.LIDs[b.Offsets[i]:b.Offsets[i+1]] +} + +func (b *Block2) Pack(dst []byte) []byte { + lastLID := int64(0) + last := b.getCount() - 1 + for i := 0; i <= last; i++ { + for _, lid := range b.getLIDs(i) { + dst = binary.AppendVarint(dst, int64(lid)-lastLID) + lastLID = int64(lid) + } + + if i < last || b.IsLastLID { + // when we add this value to prev we must get -1 (or math.MaxUint32 for uint32) + // it is the end-marker; see `Block.Unpack()` + dst = binary.AppendVarint(dst, -1-lastLID) + } + } + return dst +} + +func (b *Block2) GetSizeBytes() int { + const ( + uint32Size = int(unsafe.Sizeof(uint32(0))) + blockSize = int(unsafe.Sizeof(*b)) + ) + return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets) +} + +func (b *Block2) Unpack(data []byte, buf *UnpackBuffer) error { + var lid, offset uint32 + + b.IsLastLID = true + + buf.lids = buf.lids[:0] + buf.offsets = buf.offsets[:0] + buf.offsets = append(buf.offsets, 0) // first offset is always zero + + unpacker := packer.NewBytesUnpacker(data) + for unpacker.Len() > 0 { + delta, err := unpacker.GetVarint() + if err != nil { + return err + } + lid += uint32(delta) + + if lid == math.MaxUint32 { // end of LIDs of current TID, see `Block.Pack()` method + offset = uint32(len(buf.lids)) + buf.offsets = append(buf.offsets, offset) + lid -= uint32(delta) + continue + } + + buf.lids = append(buf.lids, lid) + } + + if int(offset) < len(buf.lids) { + b.IsLastLID = false + buf.offsets = append(buf.offsets, uint32(len(buf.lids))) + } + + // copy from buffer + b.LIDs = append([]uint32{}, buf.lids...) + b.Offsets = append([]uint32{}, buf.offsets...) + + return nil +} diff --git a/frac/sealed/lids/block_test.go b/frac/sealed/lids/block_test.go new file mode 100644 index 00000000..27d49a70 --- /dev/null +++ b/frac/sealed/lids/block_test.go @@ -0,0 +1,480 @@ +package lids + +import ( + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBlock_Pack_Unpack(t *testing.T) { + testCases := []struct { + name string + lids []uint32 + offsets []uint32 + isLastLID bool + setup func() ([]uint32, []uint32) + }{ + { + name: "SingleToken", + lids: []uint32{100, 150, 200, 250}, + offsets: []uint32{0, 4}, + isLastLID: true, + }, + { + name: "MultipleTokens", + lids: []uint32{100, 150, 200, 250, 300, 350}, + offsets: []uint32{0, 3, 6}, + isLastLID: true, + }, + { + name: "NotLastLID", + lids: []uint32{100, 150, 200}, + offsets: []uint32{0, 3}, + isLastLID: false, + }, + { + name: "SingleLID", + lids: []uint32{100}, + offsets: []uint32{0, 1}, + isLastLID: true, + }, + { + name: "ConsecutiveLIDs", + lids: func() []uint32 { + lids := make([]uint32, 50) + for i := range lids { + lids[i] = uint32(1000 + i) + } + return lids + }(), + offsets: []uint32{0, 50}, + isLastLID: true, + }, + { + name: "LargeLIDs", + lids: []uint32{math.MaxUint32 - 100, math.MaxUint32 - 50, math.MaxUint32 - 10}, + offsets: []uint32{0, 3}, + isLastLID: true, + }, + { + name: "MultipleTokens_IsLastLID_False", + lids: []uint32{100, 150, 200, 250, 300, 350, 400, 450}, + offsets: []uint32{0, 3, 6, 8}, + isLastLID: false, + }, + { + name: "ManyTokens", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0) + offsets := []uint32{0} + startLID := uint32(100) + for i := 0; i < 10; i++ { + for j := 0; j < 3; j++ { + lids = append(lids, startLID+uint32(i*10+j)) + } + offsets = append(offsets, uint32(len(lids))) + startLID += 30 + } + return lids, offsets + }, + isLastLID: true, + }, + { + name: "LargeBlock", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 200) + startLID := uint32(1000) + for i := 0; i < 200; i++ { + lids = append(lids, startLID+uint32(i*10)) + } + return lids, []uint32{0, uint32(len(lids))} + }, + isLastLID: true, + }, + { + name: "LargeBlock_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 200) + startLID := uint32(1000) + for i := 0; i < 200; i++ { + lids = append(lids, startLID+uint32(i*10)) + } + return lids, []uint32{0, uint32(len(lids))} + }, + isLastLID: false, + }, + { + name: "LargeBlockWithMultipleTokens", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 150) + offsets := []uint32{0} + startLID := uint32(1000) + groupSize := 30 + for group := 0; group < 5; group++ { + for i := 0; i < groupSize; i++ { + lids = append(lids, startLID+uint32(group*groupSize*10+i*10)) + } + offsets = append(offsets, uint32(len(lids))) + startLID += uint32(groupSize * 10) + } + return lids, offsets + }, + isLastLID: true, + }, + { + name: "LargeBlockWithMultipleTokens_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 150) + offsets := []uint32{0} + startLID := uint32(1000) + groupSize := 30 + for group := 0; group < 5; group++ { + for i := 0; i < groupSize; i++ { + lids = append(lids, startLID+uint32(group*groupSize*10+i*10)) + } + offsets = append(offsets, uint32(len(lids))) + startLID += uint32(groupSize * 10) + } + return lids, offsets + }, + isLastLID: false, + }, + { + name: "Exactly128LIDs", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 128) + startLID := uint32(1000) + for i := 0; i < 128; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 128} + }, + isLastLID: true, + }, + { + name: "Exactly128LIDs_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 128) + startLID := uint32(1000) + for i := 0; i < 128; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 128} + }, + isLastLID: false, + }, + { + name: "127LIDs", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 127) + startLID := uint32(1000) + for i := 0; i < 127; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 127} + }, + isLastLID: true, + }, + { + name: "129LIDs", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 129) + startLID := uint32(1000) + for i := 0; i < 129; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 129} + }, + isLastLID: true, + }, + { + name: "129LIDs_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 129) + startLID := uint32(1000) + for i := 0; i < 129; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 129} + }, + isLastLID: false, + }, + { + name: "64k_65536_LIDs", + setup: func() ([]uint32, []uint32) { + size := 65536 + lids := make([]uint32, size) + startLID := uint32(1000) + for i := 0; i < size; i++ { + lids[i] = startLID + uint32(i) + } + return lids, []uint32{0, uint32(size)} + }, + isLastLID: false, + }, + { + name: "64k_65539_LIDs", + setup: func() ([]uint32, []uint32) { + size := 65539 + lids := make([]uint32, size) + startLID := uint32(1000) + for i := 0; i < size; i++ { + lids[i] = startLID + uint32(i) + } + return lids, []uint32{0, uint32(size)} + }, + isLastLID: false, + }, + { + name: "64k_65533_LIDs", + setup: func() ([]uint32, []uint32) { + size := 65533 + lids := make([]uint32, size) + startLID := uint32(1000) + for i := 0; i < size; i++ { + lids[i] = startLID + uint32(i) + } + return lids, []uint32{0, uint32(size)} + }, + isLastLID: false, + }, + { + name: "IsLastLID_True", + lids: []uint32{100, 150, 200, 250, 300}, + offsets: []uint32{0, 5}, + isLastLID: true, + }, + { + name: "IsLastLID_False", + lids: []uint32{100, 150, 200, 250, 300}, + offsets: []uint32{0, 5}, + isLastLID: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var lids []uint32 + var offsets []uint32 + + if tc.setup != nil { + lids, offsets = tc.setup() + } else { + lids = tc.lids + offsets = tc.offsets + } + + block := &Block{ + LIDs: lids, + Offsets: offsets, + IsLastLID: tc.isLastLID, + } + + packed := block.Pack(nil) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") + }) + } +} + +func TestBlock_Pack_4k(t *testing.T) { + lids := make([]uint32, 4*1024) + startLID := uint32(1000) + for i := 0; i < 4*1024; i++ { + lids[i] = startLID + uint32(i) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 10, 50, 100, 1000, 1500, 2000, 2500, 3000, 4 * 1024}, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_4k_Dense(t *testing.T) { + lids := make([]uint32, 4*1024) + startLID := uint32(1000) + for i := 0; i < 4*1024; i++ { + lids[i] = startLID + uint32(i) + } + offsets := make([]uint32, 2*1024) + for i := 0; i < 2*1024; i++ { + offsets[i] = uint32(i) + } + offsets = append(offsets, 4*1024) + + block := &Block{ + LIDs: lids, + Offsets: offsets, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_64k(t *testing.T) { + lids := make([]uint32, 64*1024) + startLID := uint32(1000) + for i := 0; i < 64*1024; i++ { + lids[i] = startLID + uint32(i) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 10, 50, 100, 1000, 1500, 2000, 2500, 3000, 64 * 1024}, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_64k_Dense(t *testing.T) { + lids := make([]uint32, 64*1024) + startLID := uint32(1000) + for i := 0; i < 64*1024; i++ { + lids[i] = startLID + uint32(i) + } + offsets := make([]uint32, 32*1024) + for i := 0; i < 32*1024; i++ { + offsets[i] = uint32(i) + } + offsets = append(offsets, 64*1024) + + block := &Block{ + LIDs: lids, + Offsets: offsets, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_Unpack_ReuseBuffer(t *testing.T) { + // Test that UnpackBuffer can be reused + block1 := &Block{ + LIDs: []uint32{100, 150, 200}, + Offsets: []uint32{0, 3}, + IsLastLID: true, + } + + block2 := &Block{ + LIDs: []uint32{300, 350, 400, 450}, + Offsets: []uint32{0, 4}, + IsLastLID: true, + } + + packed1 := block1.Pack(nil) + packed2 := block2.Pack(nil) + + buf := &UnpackBuffer{} + + unpacked1 := &Block{} + err := unpacked1.Unpack(packed1, buf) + require.NoError(t, err) + assert.Equal(t, block1.LIDs, unpacked1.LIDs) + + // Reuse the same buffer + unpacked2 := &Block{} + err = unpacked2.Unpack(packed2, buf) + require.NoError(t, err) + assert.Equal(t, block2.LIDs, unpacked2.LIDs) +} + +func BenchmarkBlock_Pack(b *testing.B) { + lids := make([]uint32, 200) + startLID := uint32(1000) + for i := 0; i < 200; i++ { + lids[i] = startLID + uint32(i*10) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, uint32(len(lids))}, + IsLastLID: true, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + block.Pack(nil) + } +} + +func BenchmarkBlock_Unpack(b *testing.B) { + lids := make([]uint32, 4*1024) + startLID := uint32(1000) + for i := 0; i < 4*1024; i++ { + lids[i] = startLID + uint32(i*10) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 100, 500, 600, 800, 1000, 1100, 1250, 1500, 2000, 2500, 3000, 3500, 4 * 1024}, + IsLastLID: true, + } + + packed := block.Pack(nil) + buf := &UnpackBuffer{} + unpacked := &Block{} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + unpacked.Unpack(packed, buf) + } +} diff --git a/frac/sealed/lids/loader.go b/frac/sealed/lids/loader.go index e5ee8db1..435974a3 100644 --- a/frac/sealed/lids/loader.go +++ b/frac/sealed/lids/loader.go @@ -6,8 +6,29 @@ import ( ) type UnpackBuffer struct { - lids []uint32 - offsets []uint32 + lids []uint32 + offsets []uint32 + decompressed []uint32 + compressed []uint32 +} + +func (b *UnpackBuffer) Reset() { + if b.lids == nil { + b.lids = make([]uint32, 0, 128) + } else { + b.lids = b.lids[:0] + } + if b.offsets == nil { + b.offsets = make([]uint32, 0, 8) + } else { + b.offsets = b.offsets[:0] + } + if b.decompressed == nil { + b.decompressed = make([]uint32, 0, 1024) + } + if b.compressed == nil { + b.compressed = make([]uint32, 0, 256) + } } // Loader is responsible for reading from disk, unpacking and caching LID. diff --git a/frac/sealed/seqids/blocks.go b/frac/sealed/seqids/blocks.go index f17f1be5..8a38bbca 100644 --- a/frac/sealed/seqids/blocks.go +++ b/frac/sealed/seqids/blocks.go @@ -3,8 +3,12 @@ package seqids import ( "encoding/binary" "errors" + "unsafe" + + "github.com/ronanh/intcomp" "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/seq" ) @@ -13,15 +17,46 @@ type BlockMIDs struct { } func (b BlockMIDs) Pack(dst []byte) []byte { - var prev uint64 - for _, mid := range b.Values { - dst = binary.AppendVarint(dst, int64(mid-prev)) - prev = mid + if len(b.Values) == consts.IDsPerBlock { + dst = binary.LittleEndian.AppendUint32(dst, 1) + + _, compressed := intcomp.CompressDeltaBinPackUint64(b.Values, nil) + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(compressed))) + for _, val := range compressed { + dst = binary.LittleEndian.AppendUint64(dst, val) + } + } else { + dst = binary.LittleEndian.AppendUint32(dst, 0) + + var prev uint64 + for _, mid := range b.Values { + dst = binary.AppendVarint(dst, int64(mid-prev)) + prev = mid + } } return dst } -func (b *BlockMIDs) Unpack(data []byte, fracVersion config.BinaryDataVersion) error { +func (b *BlockMIDs) Unpack(data []byte, fracVersion config.BinaryDataVersion, cache *unpackCache) error { + if fracVersion >= config.BinaryDataV3 { + fastPath := binary.LittleEndian.Uint32(data) + data = data[4:] + + if fastPath == 1 { + valuesCount := binary.LittleEndian.Uint32(data) + data = data[4:] + + // TODO this is unsafe, we rely that we running on little-endian host + cache.compressed = cache.compressed[:valuesCount] + byteLen := int(valuesCount) * 8 + src := unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(data[:byteLen]))), valuesCount) + copy(cache.compressed, src) + + _, b.Values = intcomp.UncompressDeltaBinPackUint64(cache.compressed, b.Values) + return nil + } + } + values, err := unpackRawMIDsVarint(data, b.Values, fracVersion) if err != nil { return err diff --git a/frac/sealed/seqids/blocks_test.go b/frac/sealed/seqids/blocks_test.go new file mode 100644 index 00000000..98f1a668 --- /dev/null +++ b/frac/sealed/seqids/blocks_test.go @@ -0,0 +1,78 @@ +package seqids + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" +) + +func TestBlockMIDs_Pack_Unpack(t *testing.T) { + tests := []struct { + name string + values []uint64 + fracVersion config.BinaryDataVersion + }{ + { + name: "SlowPath_SmallBlock", + values: []uint64{100, 200, 300, 400, 500}, + fracVersion: config.BinaryDataV3, + }, + { + name: "SlowPath_EmptyBlock", + values: []uint64{}, + fracVersion: config.BinaryDataV3, + }, + { + name: "SlowPath_SingleValue", + values: []uint64{12345678901234}, + fracVersion: config.BinaryDataV3, + }, + { + name: "FastPath_4kBlock", + values: generate4kMIDs(1000000000000, 1000000), + fracVersion: config.BinaryDataV3, + }, + { + name: "FastPath_4kBlock_LargeValues", + values: generate4kMIDs(0xFFFFFFFF00000000, 1), + fracVersion: config.BinaryDataV3, + }, + { + name: "FastPath_4kBlock_Sequential", + values: generate4kMIDs(0, 1), + fracVersion: config.BinaryDataV3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + original := BlockMIDs{Values: tt.values} + + packed := original.Pack(nil) + t.Logf("packed len: %d", len(packed)) + + cache := NewCache() + defer cache.Release() + + unpacked := BlockMIDs{} + err := unpacked.Unpack(packed, tt.fracVersion, cache) + require.NoError(t, err) + + require.Equal(t, len(original.Values), len(unpacked.Values), "length mismatch") + for i := range original.Values { + require.Equal(t, original.Values[i], unpacked.Values[i], "value mismatch at index %d", i) + } + }) + } +} + +func generate4kMIDs(base uint64, increment uint64) []uint64 { + values := make([]uint64, consts.IDsPerBlock) + for i := range values { + values[i] = base + uint64(i)*increment + } + return values +} diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index 1928e182..e6c79b98 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -34,7 +34,7 @@ type Loader struct { fracVersion config.BinaryDataVersion } -func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) { +func (l *Loader) GetMIDsBlock(index uint32, cache *unpackCache) (BlockMIDs, error) { // load binary from index data, err := l.cacheMIDs.GetWithError(index, func() ([]byte, int, error) { data, _, err := l.reader.ReadIndexBlock(l.midBlockIndex(index), nil) @@ -48,8 +48,8 @@ func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) { return BlockMIDs{}, err } // unpack - block := BlockMIDs{Values: buf} - if err := block.Unpack(data, l.fracVersion); err != nil { + block := BlockMIDs{Values: cache.values[:0]} + if err := block.Unpack(data, l.fracVersion, cache); err != nil { return BlockMIDs{}, err } return block, nil diff --git a/frac/sealed/seqids/provider.go b/frac/sealed/seqids/provider.go index dff698c9..8c32760f 100644 --- a/frac/sealed/seqids/provider.go +++ b/frac/sealed/seqids/provider.go @@ -55,7 +55,7 @@ func (p *Provider) MID(lid seq.LID) (seq.MID, error) { func (p *Provider) fillMIDs(blockIndex uint32, dst *unpackCache) error { if dst.blockIndex != int(blockIndex) { - block, err := p.loader.GetMIDsBlock(blockIndex, dst.values[:0]) + block, err := p.loader.GetMIDsBlock(blockIndex, dst) if err != nil { return err } diff --git a/frac/sealed/seqids/unpack_cache.go b/frac/sealed/seqids/unpack_cache.go index f621dd57..443d8886 100644 --- a/frac/sealed/seqids/unpack_cache.go +++ b/frac/sealed/seqids/unpack_cache.go @@ -10,6 +10,7 @@ type unpackCache struct { blockIndex int startLID uint32 values []uint64 + compressed []uint64 // buffer for decompressing MID blocks } var cachePool = sync.Pool{} @@ -26,6 +27,7 @@ func NewCache() *unpackCache { blockIndex: -1, startLID: 0, values: make([]uint64, 0, defaultValsCapacity), + compressed: make([]uint64, 0, defaultValsCapacity), } } @@ -33,6 +35,7 @@ func (c *unpackCache) reset() *unpackCache { c.blockIndex = -1 c.startLID = 0 c.values = c.values[:0] + c.compressed = c.compressed[:0] return c } diff --git a/go.mod b/go.mod index 6ba7a1d8..6c0d07df 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/ronanh/intcomp v1.1.1 // indirect github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d // indirect diff --git a/go.sum b/go.sum index 92b59e95..b30da514 100644 --- a/go.sum +++ b/go.sum @@ -231,6 +231,8 @@ github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlT github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/ronanh/intcomp v1.1.1 h1:+1bGV/wEBiHI0FvzS7RHgzqOpfbBJzLIxkqMJ9e6yxY= +github.com/ronanh/intcomp v1.1.1/go.mod h1:7FOLy3P3Zj3er/kVrU/pl+Ql7JFZj7bwliMGketo0IU= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= github.com/spf13/afero v1.2.1 h1:qgMbHoJbPbw579P+1zVY+6n4nIFuIchaIjzZ/I/Yq8M= diff --git a/packer/bitpack_packer.go b/packer/bitpack_packer.go new file mode 100644 index 00000000..b99e6ed9 --- /dev/null +++ b/packer/bitpack_packer.go @@ -0,0 +1,89 @@ +package packer + +import ( + "encoding/binary" + + "github.com/ronanh/intcomp" +) + +// TODO the whole file looks pretty unreadable + +type Bitpacker struct { + dst []byte // output buffer + // TODO need to remove this chunk, use slice from input slice instead + chunk []uint32 // temporary chunk to feed to intcomp library + compressedChunk []uint32 // temporary chunk to which intcomp will write compressed data + chunkSize int +} + +func NewBitpacker(dst []byte, chunkSize int) *Bitpacker { + if chunkSize <= 0 { + chunkSize = 1024 // default chunk size + } + return &Bitpacker{ + dst: dst, + chunk: make([]uint32, 0, chunkSize), + compressedChunk: make([]uint32, 0, chunkSize/4), + chunkSize: chunkSize, + } +} + +// Append adds values to the current chunk. When the chunk is full, it compresses +// and writes it to dst. +func (b *Bitpacker) Append(values []uint32) { + // TODO this is slow af + // TODO copy values to chunk fully if values slice is large + for _, val := range values { + b.chunk = append(b.chunk, val) + + if len(b.chunk) == b.chunkSize { + b.compressChunk() + } + } +} + +// TODO use intcomp directly for that +func (b *Bitpacker) Append4kBlock(values []uint32) []byte { + _, compressed := intcomp.CompressDeltaBinPackUint32(values, nil) + b.dst = binary.AppendVarint(b.dst, int64(len(compressed))) + + // TODO memcpy (arrow?) + for _, val := range compressed { + b.dst = binary.LittleEndian.AppendUint32(b.dst, val) + } + return b.dst +} + +func (b *Bitpacker) compressChunk() { + if len(b.chunk) == 0 { + return + } + + _, b.compressedChunk = intcomp.CompressDeltaBinPackUint32(b.chunk, b.compressedChunk) + b.dst = binary.AppendVarint(b.dst, int64(len(b.compressedChunk))) + + for _, val := range b.compressedChunk { + b.dst = binary.LittleEndian.AppendUint32(b.dst, val) + } + + b.chunk = b.chunk[:0] + b.compressedChunk = b.compressedChunk[:0] +} + +// Close writes a residual (less than 128 number) as varints +func (b *Bitpacker) Close() []byte { + // append 0 - an indicator of the last chunk with varints + b.dst = binary.AppendVarint(b.dst, 0) + + // append number of varints first + b.dst = binary.AppendVarint(b.dst, int64(len(b.chunk))) + + for _, lid := range b.chunk { + b.dst = binary.AppendVarint(b.dst, int64(lid)) + } + + // TODO remove? + // append the trailer + b.dst = binary.AppendVarint(b.dst, -1) + return b.dst +} diff --git a/packer/bitpack_unpacker.go b/packer/bitpack_unpacker.go new file mode 100644 index 00000000..a2f41175 --- /dev/null +++ b/packer/bitpack_unpacker.go @@ -0,0 +1,185 @@ +package packer + +import ( + "unsafe" + + "github.com/ronanh/intcomp" +) + +// TODO the whole file looks pretty unreadable + +// TODO use golang iter? +// ChunkIterator allows to iterate of chunks of fixed number (usually 128). The last chunk might be less +// than 128 numbers +type ChunkIterator interface { + NextChunk() ([]uint32, bool) +} + +type BitpackUnpacker struct { + unpacker *BytesUnpacker + + // TODO bad design: a caller must provided dst buffer on Next() call, we shall not own it + decompressedBuf []uint32 // descompressed buf, used to return in iterator + compressedBuf []uint32 // temporary buf, intcomp works on top of []uint32 slices + done bool +} + +var _ ChunkIterator = (*BitpackUnpacker)(nil) + +func isLittleEndian() bool { + // TODO check binary.NativeEndian == littleEndian? + return true +} + +func NewBitpackUnpacker(unpacker *BytesUnpacker, decompressedBuf []uint32, compressedBuf []uint32) *BitpackUnpacker { + return &BitpackUnpacker{ + unpacker: unpacker, + decompressedBuf: decompressedBuf, + compressedBuf: compressedBuf, + } +} + +// SkipChunks allows to skip chunks to navigate directly to chunk needed +func (b *BitpackUnpacker) SkipChunks(chunks int) bool { + for i := 0; i < chunks; i++ { + if !b.SkipChunk() { + return false + } + } + return true +} + +func (b *BitpackUnpacker) SkipChunk() bool { + if b.done || b.unpacker.Len() == 0 { + return false + } + + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return false + } + + if blockLen == 0 { + b.readVarintChunk() + // we return false, since varint chunk is always the last, so there is no chunks left + return false + } + + // -1 indicates end of all data + if blockLen < 0 { + return false + } + b.unpacker.SkipUints32(int(blockLen)) + return true +} + +// TODO use directly intcomp for that +func (b *BitpackUnpacker) AllocateAndRead4kChunk() []uint32 { + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return nil + } + + b.compressedBuf = b.compressedBuf[:0] + b.readUint32Block(blockLen) + + _, decompressed := intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, nil) + return decompressed +} + +// NextChunk returns the next decompressed chunk of values. +// Returns nil, false when all chunks have been read. +func (b *BitpackUnpacker) NextChunk() ([]uint32, bool) { + if b.done || b.unpacker.Len() == 0 { + return nil, false + } + + // read the start of the chunk. + // TODO it's a space overhead + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + + if blockLen == 0 { + return b.readVarintChunk() + } + if blockLen < 0 { + return nil, false + } + + b.compressedBuf = b.compressedBuf[:0] + b.readUint32Block(blockLen) + + b.decompressedBuf = b.decompressedBuf[:0] + _, b.decompressedBuf = intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, b.decompressedBuf) + return b.decompressedBuf, true +} + +// readVarintChunk reads the varint chunk (there is at most one at the end) +func (b *BitpackUnpacker) readVarintChunk() ([]uint32, bool) { + if b.done { + return nil, false + } + + varintCount, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + + if varintCount == 0 { + sentinel, err := b.unpacker.GetVarint() + if err != nil || sentinel != -1 { + return nil, false + } + b.done = true + return nil, false + } + + chunk := b.decompressedBuf[0:int(varintCount)] + for i := int64(0); i < varintCount; i++ { + val, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + chunk[i] = uint32(val) + } + + sentinel, err := b.unpacker.GetVarint() + if err != nil || sentinel != -1 { + return nil, false + } + + b.done = true + return chunk, true +} + +// readUint32Block reads the next blockLen bytes of data into compressedBuf - a []uint32 buffer +func (b *BitpackUnpacker) readUint32Block(blockLen int64) { + if isLittleEndian() { + // TODO use apache arrow or some other lib to avoid this shuit? + if cap(b.compressedBuf) < int(blockLen) { + b.compressedBuf = make([]uint32, blockLen) + } else { + b.compressedBuf = b.compressedBuf[:blockLen] + } + + byteCount := int(blockLen) * 4 + buf := b.unpacker.GetBuffer() + if len(buf) < byteCount { + b.compressedBuf = b.compressedBuf[:0] + for i := int64(0); i < blockLen; i++ { + b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) + } + } else { + src := unsafe.Slice((*uint32)(unsafe.Pointer(unsafe.SliceData(buf[:byteCount]))), blockLen) + copy(b.compressedBuf, src) + b.unpacker.SkipUints32(int(blockLen)) + } + } else { + // slow path, unpack with binary.LittleEndian.Uint32 + for i := int64(0); i < blockLen; i++ { + b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) + } + } +} diff --git a/packer/bytes_unpacker.go b/packer/bytes_unpacker.go index a5e3fd59..1f728001 100644 --- a/packer/bytes_unpacker.go +++ b/packer/bytes_unpacker.go @@ -33,6 +33,15 @@ func (u *BytesUnpacker) GetUint32() uint32 { return val } +func (u *BytesUnpacker) SkipUints32(skipCount int) { + skipBytes := 4 * skipCount + u.buf = u.buf[skipBytes:] +} + +func (u *BytesUnpacker) GetBuffer() []byte { + return u.buf +} + func (u *BytesUnpacker) GetBinary() []byte { l := u.GetUint32() val := u.buf[:l] From fcc9d26c07a8df3a717514a0e4ba9fdb38c6d0ec Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:57:59 +0300 Subject: [PATCH 2/9] unify bitpack files --- packer/bitpack_packer.go | 143 ++++++++++++++++++++++++++++ packer/bitpack_unpacker.go | 185 ------------------------------------- 2 files changed, 143 insertions(+), 185 deletions(-) delete mode 100644 packer/bitpack_unpacker.go diff --git a/packer/bitpack_packer.go b/packer/bitpack_packer.go index b99e6ed9..ad611dce 100644 --- a/packer/bitpack_packer.go +++ b/packer/bitpack_packer.go @@ -2,6 +2,7 @@ package packer import ( "encoding/binary" + "unsafe" "github.com/ronanh/intcomp" ) @@ -87,3 +88,145 @@ func (b *Bitpacker) Close() []byte { b.dst = binary.AppendVarint(b.dst, -1) return b.dst } + +// TODO use golang iter? +// ChunkIterator allows to iterate of chunks of fixed number (usually 128). The last chunk might be less +// than 128 numbers +type ChunkIterator interface { + NextChunk() ([]uint32, bool) +} + +type BitpackUnpacker struct { + unpacker *BytesUnpacker + + // TODO bad design: a caller must provided dst buffer on Next() call, we shall not own it + decompressedBuf []uint32 // descompressed buf, used to return in iterator + compressedBuf []uint32 // temporary buf, intcomp works on top of []uint32 slices + done bool +} + +var _ ChunkIterator = (*BitpackUnpacker)(nil) + +func isLittleEndian() bool { + // TODO check binary.NativeEndian == littleEndian? + return true +} + +func NewBitpackUnpacker(unpacker *BytesUnpacker, decompressedBuf []uint32, compressedBuf []uint32) *BitpackUnpacker { + return &BitpackUnpacker{ + unpacker: unpacker, + decompressedBuf: decompressedBuf, + compressedBuf: compressedBuf, + } +} + +// TODO use directly intcomp for that +func (b *BitpackUnpacker) AllocateAndRead4kChunk() []uint32 { + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return nil + } + + b.compressedBuf = b.compressedBuf[:0] + b.readUint32Block(blockLen) + + _, decompressed := intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, nil) + return decompressed +} + +// NextChunk returns the next decompressed chunk of values. +// Returns nil, false when all chunks have been read. +func (b *BitpackUnpacker) NextChunk() ([]uint32, bool) { + if b.done || b.unpacker.Len() == 0 { + return nil, false + } + + // read the start of the chunk. + // TODO it's a space overhead + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + + if blockLen == 0 { + return b.readVarintChunk() + } + if blockLen < 0 { + return nil, false + } + + b.compressedBuf = b.compressedBuf[:0] + b.readUint32Block(blockLen) + + b.decompressedBuf = b.decompressedBuf[:0] + _, b.decompressedBuf = intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, b.decompressedBuf) + return b.decompressedBuf, true +} + +// readVarintChunk reads the varint chunk (there is at most one at the end) +func (b *BitpackUnpacker) readVarintChunk() ([]uint32, bool) { + if b.done { + return nil, false + } + + varintCount, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + + if varintCount == 0 { + sentinel, err := b.unpacker.GetVarint() + if err != nil || sentinel != -1 { + return nil, false + } + b.done = true + return nil, false + } + + chunk := b.decompressedBuf[0:int(varintCount)] + for i := int64(0); i < varintCount; i++ { + val, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + chunk[i] = uint32(val) + } + + sentinel, err := b.unpacker.GetVarint() + if err != nil || sentinel != -1 { + return nil, false + } + + b.done = true + return chunk, true +} + +// readUint32Block reads the next blockLen bytes of data into compressedBuf - a []uint32 buffer +func (b *BitpackUnpacker) readUint32Block(blockLen int64) { + if isLittleEndian() { + // TODO use apache arrow or some other lib to avoid this shuit? + if cap(b.compressedBuf) < int(blockLen) { + b.compressedBuf = make([]uint32, blockLen) + } else { + b.compressedBuf = b.compressedBuf[:blockLen] + } + + byteCount := int(blockLen) * 4 + buf := b.unpacker.GetBuffer() + if len(buf) < byteCount { + b.compressedBuf = b.compressedBuf[:0] + for i := int64(0); i < blockLen; i++ { + b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) + } + } else { + src := unsafe.Slice((*uint32)(unsafe.Pointer(unsafe.SliceData(buf[:byteCount]))), blockLen) + copy(b.compressedBuf, src) + b.unpacker.SkipUints32(int(blockLen)) + } + } else { + // slow path, unpack with binary.LittleEndian.Uint32 + for i := int64(0); i < blockLen; i++ { + b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) + } + } +} diff --git a/packer/bitpack_unpacker.go b/packer/bitpack_unpacker.go deleted file mode 100644 index a2f41175..00000000 --- a/packer/bitpack_unpacker.go +++ /dev/null @@ -1,185 +0,0 @@ -package packer - -import ( - "unsafe" - - "github.com/ronanh/intcomp" -) - -// TODO the whole file looks pretty unreadable - -// TODO use golang iter? -// ChunkIterator allows to iterate of chunks of fixed number (usually 128). The last chunk might be less -// than 128 numbers -type ChunkIterator interface { - NextChunk() ([]uint32, bool) -} - -type BitpackUnpacker struct { - unpacker *BytesUnpacker - - // TODO bad design: a caller must provided dst buffer on Next() call, we shall not own it - decompressedBuf []uint32 // descompressed buf, used to return in iterator - compressedBuf []uint32 // temporary buf, intcomp works on top of []uint32 slices - done bool -} - -var _ ChunkIterator = (*BitpackUnpacker)(nil) - -func isLittleEndian() bool { - // TODO check binary.NativeEndian == littleEndian? - return true -} - -func NewBitpackUnpacker(unpacker *BytesUnpacker, decompressedBuf []uint32, compressedBuf []uint32) *BitpackUnpacker { - return &BitpackUnpacker{ - unpacker: unpacker, - decompressedBuf: decompressedBuf, - compressedBuf: compressedBuf, - } -} - -// SkipChunks allows to skip chunks to navigate directly to chunk needed -func (b *BitpackUnpacker) SkipChunks(chunks int) bool { - for i := 0; i < chunks; i++ { - if !b.SkipChunk() { - return false - } - } - return true -} - -func (b *BitpackUnpacker) SkipChunk() bool { - if b.done || b.unpacker.Len() == 0 { - return false - } - - blockLen, err := b.unpacker.GetVarint() - if err != nil { - return false - } - - if blockLen == 0 { - b.readVarintChunk() - // we return false, since varint chunk is always the last, so there is no chunks left - return false - } - - // -1 indicates end of all data - if blockLen < 0 { - return false - } - b.unpacker.SkipUints32(int(blockLen)) - return true -} - -// TODO use directly intcomp for that -func (b *BitpackUnpacker) AllocateAndRead4kChunk() []uint32 { - blockLen, err := b.unpacker.GetVarint() - if err != nil { - return nil - } - - b.compressedBuf = b.compressedBuf[:0] - b.readUint32Block(blockLen) - - _, decompressed := intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, nil) - return decompressed -} - -// NextChunk returns the next decompressed chunk of values. -// Returns nil, false when all chunks have been read. -func (b *BitpackUnpacker) NextChunk() ([]uint32, bool) { - if b.done || b.unpacker.Len() == 0 { - return nil, false - } - - // read the start of the chunk. - // TODO it's a space overhead - blockLen, err := b.unpacker.GetVarint() - if err != nil { - return nil, false - } - - if blockLen == 0 { - return b.readVarintChunk() - } - if blockLen < 0 { - return nil, false - } - - b.compressedBuf = b.compressedBuf[:0] - b.readUint32Block(blockLen) - - b.decompressedBuf = b.decompressedBuf[:0] - _, b.decompressedBuf = intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, b.decompressedBuf) - return b.decompressedBuf, true -} - -// readVarintChunk reads the varint chunk (there is at most one at the end) -func (b *BitpackUnpacker) readVarintChunk() ([]uint32, bool) { - if b.done { - return nil, false - } - - varintCount, err := b.unpacker.GetVarint() - if err != nil { - return nil, false - } - - if varintCount == 0 { - sentinel, err := b.unpacker.GetVarint() - if err != nil || sentinel != -1 { - return nil, false - } - b.done = true - return nil, false - } - - chunk := b.decompressedBuf[0:int(varintCount)] - for i := int64(0); i < varintCount; i++ { - val, err := b.unpacker.GetVarint() - if err != nil { - return nil, false - } - chunk[i] = uint32(val) - } - - sentinel, err := b.unpacker.GetVarint() - if err != nil || sentinel != -1 { - return nil, false - } - - b.done = true - return chunk, true -} - -// readUint32Block reads the next blockLen bytes of data into compressedBuf - a []uint32 buffer -func (b *BitpackUnpacker) readUint32Block(blockLen int64) { - if isLittleEndian() { - // TODO use apache arrow or some other lib to avoid this shuit? - if cap(b.compressedBuf) < int(blockLen) { - b.compressedBuf = make([]uint32, blockLen) - } else { - b.compressedBuf = b.compressedBuf[:blockLen] - } - - byteCount := int(blockLen) * 4 - buf := b.unpacker.GetBuffer() - if len(buf) < byteCount { - b.compressedBuf = b.compressedBuf[:0] - for i := int64(0); i < blockLen; i++ { - b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) - } - } else { - src := unsafe.Slice((*uint32)(unsafe.Pointer(unsafe.SliceData(buf[:byteCount]))), blockLen) - copy(b.compressedBuf, src) - b.unpacker.SkipUints32(int(blockLen)) - } - } else { - // slow path, unpack with binary.LittleEndian.Uint32 - for i := int64(0); i < blockLen; i++ { - b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) - } - } -} From 53ef5eb92a99589fbc8a5d61b72749d90cdaffb5 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:59:02 +0300 Subject: [PATCH 3/9] simplify intcomp usage, refine tests --- cmd/index_analyzer/main.go | 3 +- config/frac_version.go | 2 +- consts/consts.go | 2 +- frac/fraction_test.go | 4 +- frac/remote.go | 2 +- frac/sealed.go | 2 +- frac/sealed/lids/block.go | 144 ++++++----- frac/sealed/lids/block2.go | 90 ------- frac/sealed/lids/block_test.go | 380 ++++++----------------------- frac/sealed/lids/loader.go | 16 +- frac/sealed/sealing/index.go | 10 +- frac/sealed/seqids/blocks.go | 56 +---- frac/sealed/seqids/blocks_test.go | 51 ++-- frac/sealed/seqids/unpack_cache.go | 2 +- packer/bitpack_packer.go | 232 ------------------ packer/delta_bitpacker.go | 167 +++++++++++++ packer/delta_bitpacker_test.go | 150 ++++++++++++ 17 files changed, 523 insertions(+), 790 deletions(-) delete mode 100644 frac/sealed/lids/block2.go delete mode 100644 packer/bitpack_packer.go create mode 100644 packer/delta_bitpacker.go create mode 100644 packer/delta_bitpacker_test.go diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index b1b22323..d3a97303 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -104,6 +104,7 @@ func analyzeIndex( if err := b.Unpack(readBlock()); err != nil { logger.Fatal("error unpacking block info", zap.Error(err)) } + ver := b.Info.BinaryDataVer docsCount := int(b.Info.DocsTotal) @@ -162,7 +163,7 @@ func analyzeIndex( } block := &lids.Block{} - if err := block.Unpack(data, &lids.UnpackBuffer{}); err != nil { + if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil { logger.Fatal("error unpacking lids block", zap.Error(err)) } diff --git a/config/frac_version.go b/config/frac_version.go index 012eb304..197ecfa1 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -9,7 +9,7 @@ const ( BinaryDataV1 // BinaryDataV2 - MIDs stored in nanoseconds BinaryDataV2 - // BinaryDataV3 - MIDs and LIDs encoded in bitpack, variable LID block size + // BinaryDataV3 - delta bitpack encoded MIDs and LIDs BinaryDataV3 ) diff --git a/consts/consts.go b/consts/consts.go index 8d418853..aea96645 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -12,7 +12,7 @@ const ( DummyMID = 0 IDsPerBlock = int(4 * units.KiB) - LIDBlockCap = int(4 * units.KiB) + LIDBlockCap = int(64 * units.KiB) RegularBlockSize = int(16 * units.KiB) DefaultMaintenanceDelay = time.Second diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 161e1270..641c0043 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1636,8 +1636,8 @@ func (s *FractionTestSuite) TestFractionInfo() { "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1500), - "index on disk doesn't match. actual value: %d", info.MetaOnDisk) + s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1550), + "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") } diff --git a/frac/remote.go b/frac/remote.go index c2088caa..fdfb276d 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -147,7 +147,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e docsReader: &f.docsReader, blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, - lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), + lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), diff --git a/frac/sealed.go b/frac/sealed.go index c4c033d8..d9bf5b9a 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -323,7 +323,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { docsReader: &f.docsReader, blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, - lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), + lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), diff --git a/frac/sealed/lids/block.go b/frac/sealed/lids/block.go index fe50d031..b5f3d2f5 100644 --- a/frac/sealed/lids/block.go +++ b/frac/sealed/lids/block.go @@ -2,9 +2,11 @@ package lids import ( "encoding/binary" + "errors" + "math" "unsafe" - "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/packer" ) @@ -23,40 +25,18 @@ func (b *Block) getLIDs(i int) []uint32 { return b.LIDs[b.Offsets[i]:b.Offsets[i+1]] } -func (b *Block) Pack(dst []byte) []byte { +func (b *Block) Pack(dst []byte, tmp []uint32) []byte { // TODO store next flags into a single byte // write b.IsLastLID as a dedicated uint32 in the header of block - switch b.IsLastLID { - case true: + if b.IsLastLID { dst = binary.LittleEndian.AppendUint32(dst, 1) - case false: + } else { dst = binary.LittleEndian.AppendUint32(dst, 0) } - fullBlock := len(b.LIDs) == consts.LIDBlockCap - switch fullBlock { - case true: - dst = binary.LittleEndian.AppendUint32(dst, 1) - case false: - dst = binary.LittleEndian.AppendUint32(dst, 0) - } + dst = packer.CompressDeltaBitpackUint32(dst, b.Offsets, tmp) + dst = packer.CompressDeltaBitpackUint32(dst, b.LIDs, tmp) - if len(b.LIDs) == consts.LIDBlockCap { - offsetPacker := packer.NewBitpacker(dst, 128) - offsetPacker.Append(b.Offsets) - dst = offsetPacker.Close() - lidPacker := packer.NewBitpacker(dst, 128) - dst = lidPacker.Append4kBlock(b.LIDs) - } else { - lidPacker := packer.NewBitpacker(dst, 128) - sep := []uint32{0} - last := b.getCount() - 1 - for i := 0; i <= last; i++ { - lidPacker.Append(b.getLIDs(i)) - lidPacker.Append(sep) - } - dst = lidPacker.Close() - } return dst } @@ -68,63 +48,75 @@ func (b *Block) GetSizeBytes() int { return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets) } -// TODO add support of the previous versions -func (b *Block) Unpack(data []byte, buf *UnpackBuffer) error { - unpacker := packer.NewBytesUnpacker(data) +func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, buf *UnpackBuffer) error { buf.Reset() + if fracVer >= config.BinaryDataV3 { + return b.unpackBitpack(data, buf) + } + + return b.unpackVarint(data, buf) +} + +func (b *Block) unpackBitpack(data []byte, buf *UnpackBuffer) error { // read IsLastLID from a dedicated uint32 - isLastLIDValue := unpacker.GetUint32() - switch isLastLIDValue { - case 1: - b.IsLastLID = true - case 0: - b.IsLastLID = false + if len(data) < 4 { + return errors.New("lids block decode error: truncated IsLastLID header") } + isLastLIDValue := binary.LittleEndian.Uint32(data[:4]) + b.IsLastLID = isLastLIDValue == 1 + data = data[4:] + + var err error + var values []uint32 - fullBlock := unpacker.GetUint32() - switch fullBlock { - case 1: - // block has exactly consts.LIDBlockCap LIDs - decompressedChunk := buf.decompressed - compressedChunk := buf.compressed - offsetUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) - for { - offsetChunk, ok := offsetUnpacker.NextChunk() - if !ok { - break - } - b.Offsets = append(b.Offsets, offsetChunk...) + data, values, err = packer.DecompressDeltaBitpackUint32(data, buf.compressed, buf.decompressed) + if err != nil { + return err + } + b.Offsets = append([]uint32{}, values...) + + data, values, err = packer.DecompressDeltaBitpackUint32(data, buf.compressed, buf.decompressed) + if err != nil { + return err + } + b.LIDs = append([]uint32{}, values...) + return nil +} + +func (b *Block) unpackVarint(data []byte, buf *UnpackBuffer) error { + var lid, offset uint32 + + b.IsLastLID = true + + buf.offsets = append(buf.offsets, 0) // first offset is always zero + + unpacker := packer.NewBytesUnpacker(data) + for unpacker.Len() > 0 { + delta, err := unpacker.GetVarint() + if err != nil { + return err } + lid += uint32(delta) - lidUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) - b.LIDs = lidUnpacker.AllocateAndRead4kChunk() - case 0: - decompressedChunk := buf.decompressed - compressedChunk := buf.compressed - buf.offsets = append(buf.offsets, 0) - - bitpackUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) - pos := 0 - for { - chunk, ok := bitpackUnpacker.NextChunk() - if !ok { - break - } - - for _, lid := range chunk { - if pos > 0 && lid == 0 { - b.LIDs = append(b.LIDs, buf.lids...) - buf.lids = buf.lids[:0] - buf.offsets = append(buf.offsets, uint32(pos)) - } else { - buf.lids = append(buf.lids, lid) - pos++ - } - } + if lid == math.MaxUint32 { // end of LIDs of current TID, see `Block.Pack()` method + offset = uint32(len(buf.lids)) + buf.offsets = append(buf.offsets, offset) + lid -= uint32(delta) + continue } - b.Offsets = append([]uint32{}, buf.offsets...) + buf.lids = append(buf.lids, lid) } + + if int(offset) < len(buf.lids) { + b.IsLastLID = false + buf.offsets = append(buf.offsets, uint32(len(buf.lids))) + } + + // copy from buffer + b.LIDs = append([]uint32{}, buf.lids...) + b.Offsets = append([]uint32{}, buf.offsets...) + return nil } diff --git a/frac/sealed/lids/block2.go b/frac/sealed/lids/block2.go deleted file mode 100644 index 77fb26a9..00000000 --- a/frac/sealed/lids/block2.go +++ /dev/null @@ -1,90 +0,0 @@ -package lids - -import ( - "encoding/binary" - "math" - "unsafe" - - "github.com/ozontech/seq-db/packer" -) - -// TODO remove legacy block -type Block2 struct { - LIDs []uint32 - Offsets []uint32 - // todo remove this legacy field - IsLastLID bool -} - -func (b *Block2) getCount() int { - return len(b.Offsets) - 1 -} - -func (b *Block2) getLIDs(i int) []uint32 { - return b.LIDs[b.Offsets[i]:b.Offsets[i+1]] -} - -func (b *Block2) Pack(dst []byte) []byte { - lastLID := int64(0) - last := b.getCount() - 1 - for i := 0; i <= last; i++ { - for _, lid := range b.getLIDs(i) { - dst = binary.AppendVarint(dst, int64(lid)-lastLID) - lastLID = int64(lid) - } - - if i < last || b.IsLastLID { - // when we add this value to prev we must get -1 (or math.MaxUint32 for uint32) - // it is the end-marker; see `Block.Unpack()` - dst = binary.AppendVarint(dst, -1-lastLID) - } - } - return dst -} - -func (b *Block2) GetSizeBytes() int { - const ( - uint32Size = int(unsafe.Sizeof(uint32(0))) - blockSize = int(unsafe.Sizeof(*b)) - ) - return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets) -} - -func (b *Block2) Unpack(data []byte, buf *UnpackBuffer) error { - var lid, offset uint32 - - b.IsLastLID = true - - buf.lids = buf.lids[:0] - buf.offsets = buf.offsets[:0] - buf.offsets = append(buf.offsets, 0) // first offset is always zero - - unpacker := packer.NewBytesUnpacker(data) - for unpacker.Len() > 0 { - delta, err := unpacker.GetVarint() - if err != nil { - return err - } - lid += uint32(delta) - - if lid == math.MaxUint32 { // end of LIDs of current TID, see `Block.Pack()` method - offset = uint32(len(buf.lids)) - buf.offsets = append(buf.offsets, offset) - lid -= uint32(delta) - continue - } - - buf.lids = append(buf.lids, lid) - } - - if int(offset) < len(buf.lids) { - b.IsLastLID = false - buf.offsets = append(buf.offsets, uint32(len(buf.lids))) - } - - // copy from buffer - b.LIDs = append([]uint32{}, buf.lids...) - b.Offsets = append([]uint32{}, buf.offsets...) - - return nil -} diff --git a/frac/sealed/lids/block_test.go b/frac/sealed/lids/block_test.go index 27d49a70..8d172acc 100644 --- a/frac/sealed/lids/block_test.go +++ b/frac/sealed/lids/block_test.go @@ -1,73 +1,63 @@ package lids import ( - "fmt" "math" + "math/rand" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/config" ) -func TestBlock_Pack_Unpack(t *testing.T) { +func TestBlockPack(t *testing.T) { testCases := []struct { name string lids []uint32 offsets []uint32 isLastLID bool - setup func() ([]uint32, []uint32) + generator func() ([]uint32, []uint32) }{ { - name: "SingleToken", - lids: []uint32{100, 150, 200, 250}, + name: "small_single_token", + lids: generate(4), offsets: []uint32{0, 4}, isLastLID: true, }, { - name: "MultipleTokens", - lids: []uint32{100, 150, 200, 250, 300, 350}, + name: "small_a_few_token", + lids: generate(6), offsets: []uint32{0, 3, 6}, isLastLID: true, }, { - name: "NotLastLID", - lids: []uint32{100, 150, 200}, + name: "small_not_last_lid", + lids: generate(3), offsets: []uint32{0, 3}, isLastLID: false, }, { - name: "SingleLID", + name: "small_single_lid", lids: []uint32{100}, offsets: []uint32{0, 1}, isLastLID: true, }, { - name: "ConsecutiveLIDs", - lids: func() []uint32 { - lids := make([]uint32, 50) - for i := range lids { - lids[i] = uint32(1000 + i) - } - return lids - }(), - offsets: []uint32{0, 50}, - isLastLID: true, - }, - { - name: "LargeLIDs", + name: "small_big_lids", lids: []uint32{math.MaxUint32 - 100, math.MaxUint32 - 50, math.MaxUint32 - 10}, offsets: []uint32{0, 3}, isLastLID: true, }, { - name: "MultipleTokens_IsLastLID_False", - lids: []uint32{100, 150, 200, 250, 300, 350, 400, 450}, + name: "small_few_tokens", + lids: generate(8), offsets: []uint32{0, 3, 6, 8}, isLastLID: false, }, { - name: "ManyTokens", - setup: func() ([]uint32, []uint32) { + name: "medium_many_tokens", + generator: func() ([]uint32, []uint32) { lids := make([]uint32, 0) offsets := []uint32{0} startLID := uint32(100) @@ -83,20 +73,8 @@ func TestBlock_Pack_Unpack(t *testing.T) { isLastLID: true, }, { - name: "LargeBlock", - setup: func() ([]uint32, []uint32) { - lids := make([]uint32, 0, 200) - startLID := uint32(1000) - for i := 0; i < 200; i++ { - lids = append(lids, startLID+uint32(i*10)) - } - return lids, []uint32{0, uint32(len(lids))} - }, - isLastLID: true, - }, - { - name: "LargeBlock_IsLastLID_False", - setup: func() ([]uint32, []uint32) { + name: "large_is_last_lid_false", + generator: func() ([]uint32, []uint32) { lids := make([]uint32, 0, 200) startLID := uint32(1000) for i := 0; i < 200; i++ { @@ -107,150 +85,73 @@ func TestBlock_Pack_Unpack(t *testing.T) { isLastLID: false, }, { - name: "LargeBlockWithMultipleTokens", - setup: func() ([]uint32, []uint32) { + name: "large_many_tokens", + generator: func() ([]uint32, []uint32) { lids := make([]uint32, 0, 150) offsets := []uint32{0} - startLID := uint32(1000) groupSize := 30 for group := 0; group < 5; group++ { for i := 0; i < groupSize; i++ { - lids = append(lids, startLID+uint32(group*groupSize*10+i*10)) + lids = append(lids, 1+uint32(group*groupSize*10+i*10)) } offsets = append(offsets, uint32(len(lids))) - startLID += uint32(groupSize * 10) } return lids, offsets }, isLastLID: true, }, { - name: "LargeBlockWithMultipleTokens_IsLastLID_False", - setup: func() ([]uint32, []uint32) { - lids := make([]uint32, 0, 150) - offsets := []uint32{0} - startLID := uint32(1000) - groupSize := 30 - for group := 0; group < 5; group++ { - for i := 0; i < groupSize; i++ { - lids = append(lids, startLID+uint32(group*groupSize*10+i*10)) - } - offsets = append(offsets, uint32(len(lids))) - startLID += uint32(groupSize * 10) - } - return lids, offsets - }, - isLastLID: false, - }, - { - name: "Exactly128LIDs", - setup: func() ([]uint32, []uint32) { - lids := make([]uint32, 128) - startLID := uint32(1000) - for i := 0; i < 128; i++ { - lids[i] = startLID + uint32(i*5) - } - return lids, []uint32{0, 128} - }, + name: "medium_128_lids", + lids: generate(128), + offsets: []uint32{0, 128}, isLastLID: true, }, { - name: "Exactly128LIDs_IsLastLID_False", - setup: func() ([]uint32, []uint32) { - lids := make([]uint32, 128) - startLID := uint32(1000) - for i := 0; i < 128; i++ { - lids[i] = startLID + uint32(i*5) - } - return lids, []uint32{0, 128} - }, - isLastLID: false, - }, - { - name: "127LIDs", - setup: func() ([]uint32, []uint32) { - lids := make([]uint32, 127) - startLID := uint32(1000) - for i := 0; i < 127; i++ { - lids[i] = startLID + uint32(i*5) - } - return lids, []uint32{0, 127} - }, + name: "medium_127_lids", + lids: generate(127), + offsets: []uint32{0, 127}, isLastLID: true, }, { - name: "129LIDs", - setup: func() ([]uint32, []uint32) { - lids := make([]uint32, 129) - startLID := uint32(1000) - for i := 0; i < 129; i++ { - lids[i] = startLID + uint32(i*5) - } - return lids, []uint32{0, 129} - }, + name: "medium_129_lids", + lids: generate(129), + offsets: []uint32{0, 129}, isLastLID: true, }, { - name: "129LIDs_IsLastLID_False", - setup: func() ([]uint32, []uint32) { - lids := make([]uint32, 129) - startLID := uint32(1000) - for i := 0; i < 129; i++ { - lids[i] = startLID + uint32(i*5) - } - return lids, []uint32{0, 129} - }, - isLastLID: false, + name: "medium_4k_lids", + lids: generate(4096), + offsets: []uint32{0, 4096}, + isLastLID: true, }, { - name: "64k_65536_LIDs", - setup: func() ([]uint32, []uint32) { - size := 65536 - lids := make([]uint32, size) - startLID := uint32(1000) - for i := 0; i < size; i++ { - lids[i] = startLID + uint32(i) - } - return lids, []uint32{0, uint32(size)} - }, - isLastLID: false, + name: "medium_4k_minus_one_lids", + lids: generate(4095), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 4095}, + isLastLID: true, }, { - name: "64k_65539_LIDs", - setup: func() ([]uint32, []uint32) { - size := 65539 - lids := make([]uint32, size) - startLID := uint32(1000) - for i := 0; i < size; i++ { - lids[i] = startLID + uint32(i) - } - return lids, []uint32{0, uint32(size)} - }, - isLastLID: false, + name: "medium_4k_plus_one_lids", + lids: generate(4097), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 4097}, + isLastLID: true, }, { - name: "64k_65533_LIDs", - setup: func() ([]uint32, []uint32) { - size := 65533 - lids := make([]uint32, size) - startLID := uint32(1000) - for i := 0; i < size; i++ { - lids[i] = startLID + uint32(i) - } - return lids, []uint32{0, uint32(size)} - }, + name: "medium_64k_lids", + lids: generate(65536), + offsets: []uint32{0, 65536}, isLastLID: false, }, { - name: "IsLastLID_True", - lids: []uint32{100, 150, 200, 250, 300}, - offsets: []uint32{0, 5}, + name: "medium_64k_minus_one_lids", + lids: generate(65535), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 65535}, isLastLID: true, }, { - name: "IsLastLID_False", - lids: []uint32{100, 150, 200, 250, 300}, - offsets: []uint32{0, 5}, + name: "medium_64k_plus_one_lids", + lids: generate(65537), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 65537}, isLastLID: false, }, } @@ -260,8 +161,8 @@ func TestBlock_Pack_Unpack(t *testing.T) { var lids []uint32 var offsets []uint32 - if tc.setup != nil { - lids, offsets = tc.setup() + if tc.generator != nil { + lids, offsets = tc.generator() } else { lids = tc.lids offsets = tc.offsets @@ -273,140 +174,30 @@ func TestBlock_Pack_Unpack(t *testing.T) { IsLastLID: tc.isLastLID, } - packed := block.Pack(nil) - require.NotEmpty(t, packed, "packed data should not be empty") + packed := block.Pack(nil, nil) + require.NotEmpty(t, packed) unpacked := &Block{} buf := &UnpackBuffer{} - err := unpacked.Unpack(packed, buf) - require.NoError(t, err, "unpack should succeed") + err := unpacked.Unpack(packed, config.CurrentFracVersion, buf) - assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") - assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") - assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") + require.NoError(t, err) + assert.EqualExportedValues(t, block, unpacked) }) } } -func TestBlock_Pack_4k(t *testing.T) { - lids := make([]uint32, 4*1024) - startLID := uint32(1000) - for i := 0; i < 4*1024; i++ { - lids[i] = startLID + uint32(i) - } - - block := &Block{ - LIDs: lids, - Offsets: []uint32{0, 10, 50, 100, 1000, 1500, 2000, 2500, 3000, 4 * 1024}, - IsLastLID: true, - } - - packed := block.Pack(nil) - fmt.Println("packed len: ", len(packed)) - require.NotEmpty(t, packed, "packed data should not be empty") - - unpacked := &Block{} - buf := &UnpackBuffer{} - err := unpacked.Unpack(packed, buf) - require.NoError(t, err, "unpack should succeed") - - assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") - assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") - assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") -} - -func TestBlock_Pack_4k_Dense(t *testing.T) { - lids := make([]uint32, 4*1024) - startLID := uint32(1000) - for i := 0; i < 4*1024; i++ { - lids[i] = startLID + uint32(i) - } - offsets := make([]uint32, 2*1024) - for i := 0; i < 2*1024; i++ { - offsets[i] = uint32(i) - } - offsets = append(offsets, 4*1024) - - block := &Block{ - LIDs: lids, - Offsets: offsets, - IsLastLID: true, - } - - packed := block.Pack(nil) - fmt.Println("packed len: ", len(packed)) - require.NotEmpty(t, packed, "packed data should not be empty") - - unpacked := &Block{} - buf := &UnpackBuffer{} - err := unpacked.Unpack(packed, buf) - require.NoError(t, err, "unpack should succeed") - - assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") - assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") - assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") -} - -func TestBlock_Pack_64k(t *testing.T) { - lids := make([]uint32, 64*1024) - startLID := uint32(1000) - for i := 0; i < 64*1024; i++ { - lids[i] = startLID + uint32(i) - } - - block := &Block{ - LIDs: lids, - Offsets: []uint32{0, 10, 50, 100, 1000, 1500, 2000, 2500, 3000, 64 * 1024}, - IsLastLID: true, - } - - packed := block.Pack(nil) - fmt.Println("packed len: ", len(packed)) - require.NotEmpty(t, packed, "packed data should not be empty") - - unpacked := &Block{} - buf := &UnpackBuffer{} - err := unpacked.Unpack(packed, buf) - require.NoError(t, err, "unpack should succeed") - - assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") - assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") - assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") -} - -func TestBlock_Pack_64k_Dense(t *testing.T) { - lids := make([]uint32, 64*1024) - startLID := uint32(1000) - for i := 0; i < 64*1024; i++ { - lids[i] = startLID + uint32(i) +func generate(n int) []uint32 { + v := make([]uint32, n) + last := uint32(100) + for i := range v { + v[i] = last + last += uint32(1 + rand.Intn(5)) } - offsets := make([]uint32, 32*1024) - for i := 0; i < 32*1024; i++ { - offsets[i] = uint32(i) - } - offsets = append(offsets, 64*1024) - - block := &Block{ - LIDs: lids, - Offsets: offsets, - IsLastLID: true, - } - - packed := block.Pack(nil) - fmt.Println("packed len: ", len(packed)) - require.NotEmpty(t, packed, "packed data should not be empty") - - unpacked := &Block{} - buf := &UnpackBuffer{} - err := unpacked.Unpack(packed, buf) - require.NoError(t, err, "unpack should succeed") - - assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") - assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") - assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") + return v } -func TestBlock_Pack_Unpack_ReuseBuffer(t *testing.T) { +func TestBlockPack_ReuseBuffer(t *testing.T) { // Test that UnpackBuffer can be reused block1 := &Block{ LIDs: []uint32{100, 150, 200}, @@ -420,61 +211,52 @@ func TestBlock_Pack_Unpack_ReuseBuffer(t *testing.T) { IsLastLID: true, } - packed1 := block1.Pack(nil) - packed2 := block2.Pack(nil) + packed1 := block1.Pack(nil, nil) + packed2 := block2.Pack(nil, nil) buf := &UnpackBuffer{} unpacked1 := &Block{} - err := unpacked1.Unpack(packed1, buf) + err := unpacked1.Unpack(packed1, config.CurrentFracVersion, buf) require.NoError(t, err) assert.Equal(t, block1.LIDs, unpacked1.LIDs) - // Reuse the same buffer unpacked2 := &Block{} - err = unpacked2.Unpack(packed2, buf) + err = unpacked2.Unpack(packed2, config.CurrentFracVersion, buf) require.NoError(t, err) assert.Equal(t, block2.LIDs, unpacked2.LIDs) } func BenchmarkBlock_Pack(b *testing.B) { - lids := make([]uint32, 200) - startLID := uint32(1000) - for i := 0; i < 200; i++ { - lids[i] = startLID + uint32(i*10) - } + lids := generate(64 * 1024) block := &Block{ LIDs: lids, - Offsets: []uint32{0, uint32(len(lids))}, + Offsets: []uint32{0, 64 * 1024}, IsLastLID: true, } + tmp := make([]uint32, 0, 64*1024/4) - b.ResetTimer() - for i := 0; i < b.N; i++ { - block.Pack(nil) + for b.Loop() { + block.Pack(nil, tmp) } } func BenchmarkBlock_Unpack(b *testing.B) { - lids := make([]uint32, 4*1024) - startLID := uint32(1000) - for i := 0; i < 4*1024; i++ { - lids[i] = startLID + uint32(i*10) - } + lids := generate(64 * 1024) block := &Block{ LIDs: lids, - Offsets: []uint32{0, 100, 500, 600, 800, 1000, 1100, 1250, 1500, 2000, 2500, 3000, 3500, 4 * 1024}, + Offsets: []uint32{0, 64 * 1024}, IsLastLID: true, } + packed := block.Pack(nil, nil) - packed := block.Pack(nil) buf := &UnpackBuffer{} unpacked := &Block{} b.ResetTimer() - for i := 0; i < b.N; i++ { - unpacked.Unpack(packed, buf) + for b.Loop() { + unpacked.Unpack(packed, config.CurrentFracVersion, buf) } } diff --git a/frac/sealed/lids/loader.go b/frac/sealed/lids/loader.go index 435974a3..818c9422 100644 --- a/frac/sealed/lids/loader.go +++ b/frac/sealed/lids/loader.go @@ -2,6 +2,8 @@ package lids import ( "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/storage" ) @@ -24,10 +26,14 @@ func (b *UnpackBuffer) Reset() { b.offsets = b.offsets[:0] } if b.decompressed == nil { - b.decompressed = make([]uint32, 0, 1024) + b.decompressed = make([]uint32, 0, consts.LIDBlockCap) + } else { + b.decompressed = b.decompressed[:0] } if b.compressed == nil { - b.compressed = make([]uint32, 0, 256) + b.compressed = make([]uint32, 0, consts.LIDBlockCap/2) + } else { + b.compressed = b.compressed[:0] } } @@ -39,13 +45,15 @@ type Loader struct { reader *storage.IndexReader unpackBuf *UnpackBuffer blockBuf []byte + fracVer config.BinaryDataVersion } -func NewLoader(r *storage.IndexReader, c *cache.Cache[*Block]) *Loader { +func NewLoader(fracVer config.BinaryDataVersion, r *storage.IndexReader, c *cache.Cache[*Block]) *Loader { return &Loader{ cache: c, reader: r, unpackBuf: &UnpackBuffer{}, + fracVer: fracVer, } } @@ -68,7 +76,7 @@ func (l *Loader) readLIDsBlock(blockIndex uint32) (*Block, error) { } block := &Block{} - err = block.Unpack(l.blockBuf, l.unpackBuf) + err = block.Unpack(l.blockBuf, l.fracVer, l.unpackBuf) if err != nil { return nil, err } diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 491c7233..0395a69e 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -32,6 +32,8 @@ type IndexSealer struct { lastErr error // Last error encountered during processing buf1 []byte // Reusable buffer for packing raw data before compression buf2 []byte // Reusable buffer for compressed data + buf32 []uint32 // Reusable buffer for compressed arrays of uint32 + buf64 []uint64 // Reusable buffer for compressed arrays of uint64 params common.SealParams // Configuration parameters for sealing process // PreloadedData structures built during sealing for fast initialization of sealed fraction @@ -46,6 +48,8 @@ func NewIndexSealer(params common.SealParams) *IndexSealer { params: params, buf1: make([]byte, 0, consts.RegularBlockSize), buf2: make([]byte, 0, consts.RegularBlockSize), + buf32: make([]uint32, 0, consts.LIDBlockCap/2), + buf64: make([]uint64, 0, consts.RegularBlockSize/2), } } @@ -388,8 +392,8 @@ func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { } s.idsTable.MinBlockIDs = append(s.idsTable.MinBlockIDs, minID) // Store for PreloadedData - // Packing block - s.buf1 = block.mids.Pack(s.buf1[:0]) + // Packing block (use reusable uint64 buffer for mid compression) + s.buf1 = block.mids.Pack(s.buf1[:0], s.buf64[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) // Store min MID and RID in extended metadata b.ext1 = uint64(minID.MID) @@ -426,7 +430,7 @@ func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) // Packing block - s.buf1 = block.payload.Pack(s.buf1[:0]) + s.buf1 = block.payload.Pack(s.buf1[:0], s.buf32[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) b.ext1 = ext1 // Legacy continuation flag b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range diff --git a/frac/sealed/seqids/blocks.go b/frac/sealed/seqids/blocks.go index 8a38bbca..7853a3a1 100644 --- a/frac/sealed/seqids/blocks.go +++ b/frac/sealed/seqids/blocks.go @@ -3,12 +3,9 @@ package seqids import ( "encoding/binary" "errors" - "unsafe" - - "github.com/ronanh/intcomp" "github.com/ozontech/seq-db/config" - "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/packer" "github.com/ozontech/seq-db/seq" ) @@ -16,48 +13,21 @@ type BlockMIDs struct { Values []uint64 } -func (b BlockMIDs) Pack(dst []byte) []byte { - if len(b.Values) == consts.IDsPerBlock { - dst = binary.LittleEndian.AppendUint32(dst, 1) - - _, compressed := intcomp.CompressDeltaBinPackUint64(b.Values, nil) - dst = binary.LittleEndian.AppendUint32(dst, uint32(len(compressed))) - for _, val := range compressed { - dst = binary.LittleEndian.AppendUint64(dst, val) - } - } else { - dst = binary.LittleEndian.AppendUint32(dst, 0) - - var prev uint64 - for _, mid := range b.Values { - dst = binary.AppendVarint(dst, int64(mid-prev)) - prev = mid - } - } - return dst +func (b BlockMIDs) Pack(dst []byte, buf []uint64) []byte { + return packer.CompressDeltaBitpackUint64(dst, b.Values, buf) } -func (b *BlockMIDs) Unpack(data []byte, fracVersion config.BinaryDataVersion, cache *unpackCache) error { - if fracVersion >= config.BinaryDataV3 { - fastPath := binary.LittleEndian.Uint32(data) - data = data[4:] - - if fastPath == 1 { - valuesCount := binary.LittleEndian.Uint32(data) - data = data[4:] - - // TODO this is unsafe, we rely that we running on little-endian host - cache.compressed = cache.compressed[:valuesCount] - byteLen := int(valuesCount) * 8 - src := unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(data[:byteLen]))), valuesCount) - copy(cache.compressed, src) - - _, b.Values = intcomp.UncompressDeltaBinPackUint64(cache.compressed, b.Values) - return nil +func (b *BlockMIDs) Unpack(data []byte, fracVer config.BinaryDataVersion, cache *unpackCache) error { + if fracVer >= config.BinaryDataV3 { + _, values, err := packer.DecompressDeltaBitpackUint64(data, cache.compressed, cache.values) + if err != nil { + return err } + b.Values = append(b.Values[:0], values...) + return nil } - values, err := unpackRawMIDsVarint(data, b.Values, fracVersion) + values, err := unpackRawMIDsVarint(data, b.Values, fracVer) if err != nil { return err } @@ -114,7 +84,7 @@ func (b *BlockParams) Unpack(data []byte) error { // unpackRawMIDsVarint is a dedicated method for unpacking delta encoded MIDs. The reason a dedicated method exists // is that we want to unpack values and potentially convert legacy frac version in one pass. -func unpackRawMIDsVarint(src []byte, dst []uint64, fracVersion config.BinaryDataVersion) ([]uint64, error) { +func unpackRawMIDsVarint(src []byte, dst []uint64, fracVer config.BinaryDataVersion) ([]uint64, error) { dst = dst[:0] id := uint64(0) for len(src) != 0 { @@ -129,7 +99,7 @@ func unpackRawMIDsVarint(src []byte, dst []uint64, fracVersion config.BinaryData } id += uint64(delta) - if fracVersion >= config.BinaryDataV2 { + if fracVer >= config.BinaryDataV2 { dst = append(dst, id) } else { // Legacy format - scale millis to nanos diff --git a/frac/sealed/seqids/blocks_test.go b/frac/sealed/seqids/blocks_test.go index 98f1a668..6665d01c 100644 --- a/frac/sealed/seqids/blocks_test.go +++ b/frac/sealed/seqids/blocks_test.go @@ -9,41 +9,26 @@ import ( "github.com/ozontech/seq-db/consts" ) -func TestBlockMIDs_Pack_Unpack(t *testing.T) { +func TestBlockMIDs_Pack(t *testing.T) { tests := []struct { - name string - values []uint64 - fracVersion config.BinaryDataVersion + name string + values []uint64 }{ { - name: "SlowPath_SmallBlock", - values: []uint64{100, 200, 300, 400, 500}, - fracVersion: config.BinaryDataV3, + name: "small_single", + values: []uint64{12345678901234}, }, { - name: "SlowPath_EmptyBlock", - values: []uint64{}, - fracVersion: config.BinaryDataV3, + name: "small_few", + values: []uint64{100, 200, 300, 400, 500}, }, { - name: "SlowPath_SingleValue", - values: []uint64{12345678901234}, - fracVersion: config.BinaryDataV3, + name: "small_4k", + values: generate(1000000000000, 1000000), }, { - name: "FastPath_4kBlock", - values: generate4kMIDs(1000000000000, 1000000), - fracVersion: config.BinaryDataV3, - }, - { - name: "FastPath_4kBlock_LargeValues", - values: generate4kMIDs(0xFFFFFFFF00000000, 1), - fracVersion: config.BinaryDataV3, - }, - { - name: "FastPath_4kBlock_Sequential", - values: generate4kMIDs(0, 1), - fracVersion: config.BinaryDataV3, + name: "small_4k_large_values", + values: generate(0xFFFFFFFF00000000, 1), }, } @@ -51,25 +36,21 @@ func TestBlockMIDs_Pack_Unpack(t *testing.T) { t.Run(tt.name, func(t *testing.T) { original := BlockMIDs{Values: tt.values} - packed := original.Pack(nil) - t.Logf("packed len: %d", len(packed)) + packed := original.Pack(nil, nil) cache := NewCache() defer cache.Release() unpacked := BlockMIDs{} - err := unpacked.Unpack(packed, tt.fracVersion, cache) - require.NoError(t, err) + err := unpacked.Unpack(packed, config.CurrentFracVersion, cache) - require.Equal(t, len(original.Values), len(unpacked.Values), "length mismatch") - for i := range original.Values { - require.Equal(t, original.Values[i], unpacked.Values[i], "value mismatch at index %d", i) - } + require.NoError(t, err) + require.EqualExportedValues(t, unpacked, original) }) } } -func generate4kMIDs(base uint64, increment uint64) []uint64 { +func generate(base uint64, increment uint64) []uint64 { values := make([]uint64, consts.IDsPerBlock) for i := range values { values[i] = base + uint64(i)*increment diff --git a/frac/sealed/seqids/unpack_cache.go b/frac/sealed/seqids/unpack_cache.go index 443d8886..e900bd8b 100644 --- a/frac/sealed/seqids/unpack_cache.go +++ b/frac/sealed/seqids/unpack_cache.go @@ -27,7 +27,7 @@ func NewCache() *unpackCache { blockIndex: -1, startLID: 0, values: make([]uint64, 0, defaultValsCapacity), - compressed: make([]uint64, 0, defaultValsCapacity), + compressed: make([]uint64, 0, defaultValsCapacity/2), } } diff --git a/packer/bitpack_packer.go b/packer/bitpack_packer.go deleted file mode 100644 index ad611dce..00000000 --- a/packer/bitpack_packer.go +++ /dev/null @@ -1,232 +0,0 @@ -package packer - -import ( - "encoding/binary" - "unsafe" - - "github.com/ronanh/intcomp" -) - -// TODO the whole file looks pretty unreadable - -type Bitpacker struct { - dst []byte // output buffer - // TODO need to remove this chunk, use slice from input slice instead - chunk []uint32 // temporary chunk to feed to intcomp library - compressedChunk []uint32 // temporary chunk to which intcomp will write compressed data - chunkSize int -} - -func NewBitpacker(dst []byte, chunkSize int) *Bitpacker { - if chunkSize <= 0 { - chunkSize = 1024 // default chunk size - } - return &Bitpacker{ - dst: dst, - chunk: make([]uint32, 0, chunkSize), - compressedChunk: make([]uint32, 0, chunkSize/4), - chunkSize: chunkSize, - } -} - -// Append adds values to the current chunk. When the chunk is full, it compresses -// and writes it to dst. -func (b *Bitpacker) Append(values []uint32) { - // TODO this is slow af - // TODO copy values to chunk fully if values slice is large - for _, val := range values { - b.chunk = append(b.chunk, val) - - if len(b.chunk) == b.chunkSize { - b.compressChunk() - } - } -} - -// TODO use intcomp directly for that -func (b *Bitpacker) Append4kBlock(values []uint32) []byte { - _, compressed := intcomp.CompressDeltaBinPackUint32(values, nil) - b.dst = binary.AppendVarint(b.dst, int64(len(compressed))) - - // TODO memcpy (arrow?) - for _, val := range compressed { - b.dst = binary.LittleEndian.AppendUint32(b.dst, val) - } - return b.dst -} - -func (b *Bitpacker) compressChunk() { - if len(b.chunk) == 0 { - return - } - - _, b.compressedChunk = intcomp.CompressDeltaBinPackUint32(b.chunk, b.compressedChunk) - b.dst = binary.AppendVarint(b.dst, int64(len(b.compressedChunk))) - - for _, val := range b.compressedChunk { - b.dst = binary.LittleEndian.AppendUint32(b.dst, val) - } - - b.chunk = b.chunk[:0] - b.compressedChunk = b.compressedChunk[:0] -} - -// Close writes a residual (less than 128 number) as varints -func (b *Bitpacker) Close() []byte { - // append 0 - an indicator of the last chunk with varints - b.dst = binary.AppendVarint(b.dst, 0) - - // append number of varints first - b.dst = binary.AppendVarint(b.dst, int64(len(b.chunk))) - - for _, lid := range b.chunk { - b.dst = binary.AppendVarint(b.dst, int64(lid)) - } - - // TODO remove? - // append the trailer - b.dst = binary.AppendVarint(b.dst, -1) - return b.dst -} - -// TODO use golang iter? -// ChunkIterator allows to iterate of chunks of fixed number (usually 128). The last chunk might be less -// than 128 numbers -type ChunkIterator interface { - NextChunk() ([]uint32, bool) -} - -type BitpackUnpacker struct { - unpacker *BytesUnpacker - - // TODO bad design: a caller must provided dst buffer on Next() call, we shall not own it - decompressedBuf []uint32 // descompressed buf, used to return in iterator - compressedBuf []uint32 // temporary buf, intcomp works on top of []uint32 slices - done bool -} - -var _ ChunkIterator = (*BitpackUnpacker)(nil) - -func isLittleEndian() bool { - // TODO check binary.NativeEndian == littleEndian? - return true -} - -func NewBitpackUnpacker(unpacker *BytesUnpacker, decompressedBuf []uint32, compressedBuf []uint32) *BitpackUnpacker { - return &BitpackUnpacker{ - unpacker: unpacker, - decompressedBuf: decompressedBuf, - compressedBuf: compressedBuf, - } -} - -// TODO use directly intcomp for that -func (b *BitpackUnpacker) AllocateAndRead4kChunk() []uint32 { - blockLen, err := b.unpacker.GetVarint() - if err != nil { - return nil - } - - b.compressedBuf = b.compressedBuf[:0] - b.readUint32Block(blockLen) - - _, decompressed := intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, nil) - return decompressed -} - -// NextChunk returns the next decompressed chunk of values. -// Returns nil, false when all chunks have been read. -func (b *BitpackUnpacker) NextChunk() ([]uint32, bool) { - if b.done || b.unpacker.Len() == 0 { - return nil, false - } - - // read the start of the chunk. - // TODO it's a space overhead - blockLen, err := b.unpacker.GetVarint() - if err != nil { - return nil, false - } - - if blockLen == 0 { - return b.readVarintChunk() - } - if blockLen < 0 { - return nil, false - } - - b.compressedBuf = b.compressedBuf[:0] - b.readUint32Block(blockLen) - - b.decompressedBuf = b.decompressedBuf[:0] - _, b.decompressedBuf = intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, b.decompressedBuf) - return b.decompressedBuf, true -} - -// readVarintChunk reads the varint chunk (there is at most one at the end) -func (b *BitpackUnpacker) readVarintChunk() ([]uint32, bool) { - if b.done { - return nil, false - } - - varintCount, err := b.unpacker.GetVarint() - if err != nil { - return nil, false - } - - if varintCount == 0 { - sentinel, err := b.unpacker.GetVarint() - if err != nil || sentinel != -1 { - return nil, false - } - b.done = true - return nil, false - } - - chunk := b.decompressedBuf[0:int(varintCount)] - for i := int64(0); i < varintCount; i++ { - val, err := b.unpacker.GetVarint() - if err != nil { - return nil, false - } - chunk[i] = uint32(val) - } - - sentinel, err := b.unpacker.GetVarint() - if err != nil || sentinel != -1 { - return nil, false - } - - b.done = true - return chunk, true -} - -// readUint32Block reads the next blockLen bytes of data into compressedBuf - a []uint32 buffer -func (b *BitpackUnpacker) readUint32Block(blockLen int64) { - if isLittleEndian() { - // TODO use apache arrow or some other lib to avoid this shuit? - if cap(b.compressedBuf) < int(blockLen) { - b.compressedBuf = make([]uint32, blockLen) - } else { - b.compressedBuf = b.compressedBuf[:blockLen] - } - - byteCount := int(blockLen) * 4 - buf := b.unpacker.GetBuffer() - if len(buf) < byteCount { - b.compressedBuf = b.compressedBuf[:0] - for i := int64(0); i < blockLen; i++ { - b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) - } - } else { - src := unsafe.Slice((*uint32)(unsafe.Pointer(unsafe.SliceData(buf[:byteCount]))), blockLen) - copy(b.compressedBuf, src) - b.unpacker.SkipUints32(int(blockLen)) - } - } else { - // slow path, unpack with binary.LittleEndian.Uint32 - for i := int64(0); i < blockLen; i++ { - b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) - } - } -} diff --git a/packer/delta_bitpacker.go b/packer/delta_bitpacker.go new file mode 100644 index 00000000..9ab4cbb9 --- /dev/null +++ b/packer/delta_bitpacker.go @@ -0,0 +1,167 @@ +package packer + +import ( + "encoding/binary" + "errors" + "unsafe" + + "github.com/ronanh/intcomp" +) + +func CompressDeltaBitpackUint32(dst []byte, values []uint32, buf []uint32) []byte { + buf = buf[:0] + buf = append(buf, uint32(len(values))) + + var residual []uint32 + residual, buf = intcomp.CompressDeltaBinPackUint32(values, buf) + + // TODO comment here what residual is + if len(residual) > 0 { + buf = append(buf, residual...) + } + + wordCount := uint32(len(buf)) + // TODO use memcpy + dst = binary.LittleEndian.AppendUint32(dst, wordCount) + for _, v := range buf { + dst = binary.LittleEndian.AppendUint32(dst, v) + } + return dst +} + +// TODO simplify +func DecompressDeltaBitpackUint32(data []byte, compressed, decompressed []uint32) ([]byte, []uint32, error) { + if len(data) < 4 { + return nil, nil, errors.New("lids block decode error: truncated sequence length header") + } + + wordCount := binary.LittleEndian.Uint32(data[:4]) + data = data[4:] + + byteLen := int(wordCount) * 4 + if len(data) < byteLen { + return nil, nil, errors.New("lids block decode error: truncated sequence payload") + } + + u32Count := int(wordCount) + if cap(compressed) < u32Count { + compressed = make([]uint32, u32Count) + } else { + compressed = compressed[:u32Count] + } + // TODO manual copy + for i := 0; i < u32Count; i++ { + compressed[i] = binary.LittleEndian.Uint32(data[i*4 : i*4+4]) + } + data = data[byteLen:] + + if len(compressed) == 0 { + return data, nil, nil + } + total := int(compressed[0]) + if total == 0 { + return data, nil, nil + } + + payload := compressed[1:] + + decompressed = decompressed[:0] + switch { + case total < intcomp.BitPackingBlockSize32: + if len(payload) < total { + return nil, nil, errors.New("lids block decode error: residual payload truncated") + } + decompressed = append(decompressed, payload[:total]...) + default: + remaining, out := intcomp.UncompressDeltaBinPackUint32(payload, decompressed) + decompressed = out + if len(remaining) > 0 { + decompressed = append(decompressed, remaining...) + } + if len(decompressed) < total { + return nil, nil, errors.New("lids block decode error: decompressed length mismatch") + } + decompressed = decompressed[:total] + } + + return data, decompressed[:total], nil +} + +func CompressDeltaBitpackUint64(dst []byte, values []uint64, buf []uint64) []byte { + buf = buf[:0] + total := len(values) + buf = append(buf, uint64(total)) + + var residual []uint64 + residual, buf = intcomp.CompressDeltaBinPackUint64(values, buf) + if len(residual) > 0 { + buf = append(buf, residual...) + } + + wordCount := uint32(len(buf)) + // TODO use memcpy + dst = binary.LittleEndian.AppendUint32(dst, wordCount) + for _, v := range buf { + dst = binary.LittleEndian.AppendUint64(dst, v) + } + return dst +} + +// TODO simplify +func DecompressDeltaBitpackUint64(data []byte, compressed, values []uint64) ([]byte, []uint64, error) { + if len(data) < 4 { + return nil, nil, errors.New("mids block decode error: truncated sequence length header") + } + + wordCount := binary.LittleEndian.Uint32(data[:4]) + data = data[4:] + + byteLen := int(wordCount) * 8 + if len(data) < byteLen { + return nil, nil, errors.New("mids block decode error: truncated sequence payload") + } + + u64Count := int(wordCount) + if cap(compressed) < u64Count { + compressed = make([]uint64, u64Count) + } else { + compressed = compressed[:u64Count] + } + // TODO we reinterpret []byte as []uint64 and them copy to compressed + // maybe we could get rid of compressed? + src := unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(data[:byteLen]))), u64Count) + copy(compressed, src) + data = data[byteLen:] + + if len(compressed) == 0 { + return data, nil, nil + } + total := int(compressed[0]) + if total == 0 { + return data, nil, nil + } + + payload := compressed[1:] + + values = values[:0] + var decompressed []uint64 + switch { + case total < intcomp.BitPackingBlockSize64: + if len(payload) < total { + return nil, nil, errors.New("mids block decode error: residual payload truncated") + } + decompressed = append(values[:0], payload[:total]...) + default: + remaining, out := intcomp.UncompressDeltaBinPackUint64(payload, values[:0]) + decompressed = out + if len(remaining) > 0 { + decompressed = append(decompressed, remaining...) + } + if len(decompressed) < total { + return nil, nil, errors.New("mids block decode error: decompressed length mismatch") + } + decompressed = decompressed[:total] + } + + return data, decompressed[:total], nil +} diff --git a/packer/delta_bitpacker_test.go b/packer/delta_bitpacker_test.go new file mode 100644 index 00000000..df1118fd --- /dev/null +++ b/packer/delta_bitpacker_test.go @@ -0,0 +1,150 @@ +package packer + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCompressDeltaBitpackUint32(t *testing.T) { + testCases := []struct { + name string + values []uint32 + }{ + { + name: "empty", + values: []uint32{}, + }, + { + name: "small_single_value", + values: []uint32{1}, + }, + { + name: "small_few_values", + values: []uint32{1, 4, 7, 8, 10}, + }, + { + name: "small_127_values", + values: generateUint32(127), + }, + { + name: "small_128", + values: generateUint32(128), + }, + { + name: "small_129", + values: generateUint32(129), + }, + { + name: "midium_4k", + values: generateUint32(4096), + }, + { + name: "midium_4k_more", + values: generateUint32(4105), + }, + { + name: "midium_64k", + values: generateUint32(64 * 1024), + }, + { + name: "midium_64k_more", + values: generateUint32(64*1024 + 34), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + compressed := CompressDeltaBitpackUint32([]byte{}, tc.values, []uint32{}) + _, decompressed, err := DecompressDeltaBitpackUint32(compressed, []uint32{}, []uint32{}) + require.NoError(t, err) + if len(tc.values) > 0 { + require.Equal(t, tc.values, decompressed) + } else { + require.Equal(t, 0, len(decompressed)) + } + }) + } +} + +func TestCompressDeltaBitpackUint64(t *testing.T) { + testCases := []struct { + name string + values []uint64 + }{ + { + name: "empty", + values: []uint64{}, + }, + { + name: "small_single_value", + values: []uint64{1}, + }, + { + name: "small_few_values", + values: []uint64{1, 4, 7, 8, 10}, + }, + { + name: "small_127_values", + values: generateUint64(127), + }, + { + name: "small_128", + values: generateUint64(128), + }, + { + name: "small_129", + values: generateUint64(129), + }, + { + name: "midium_4k", + values: generateUint64(4096), + }, + { + name: "midium_4k_more", + values: generateUint64(4105), + }, + { + name: "midium_64k", + values: generateUint64(64 * 1024), + }, + { + name: "midium_64k_more", + values: generateUint64(64*1024 + 34), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + compressed := CompressDeltaBitpackUint64([]byte{}, tc.values, []uint64{}) + _, decompressed, err := DecompressDeltaBitpackUint64(compressed, []uint64{}, []uint64{}) + require.NoError(t, err) + if len(tc.values) > 0 { + require.Equal(t, tc.values, decompressed) + } else { + require.Equal(t, 0, len(decompressed)) + } + }) + } +} + +func generateUint32(n int) []uint32 { + v := make([]uint32, n) + last := uint32(100) + for i := range v { + v[i] = last + last += uint32(1 + rand.Intn(5)) + } + return v +} + +func generateUint64(n int) []uint64 { + v := make([]uint64, n) + last := uint64(100) + for i := range v { + v[i] = last + last += uint64(1 + rand.Intn(5)) + } + return v +} From f2f5e39cc3748094756c173e7d9522ac6478b346 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:47:00 +0300 Subject: [PATCH 4/9] fixes --- frac/sealed/lids/block.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/frac/sealed/lids/block.go b/frac/sealed/lids/block.go index b5f3d2f5..8a29731d 100644 --- a/frac/sealed/lids/block.go +++ b/frac/sealed/lids/block.go @@ -1,8 +1,6 @@ package lids import ( - "encoding/binary" - "errors" "math" "unsafe" @@ -26,12 +24,10 @@ func (b *Block) getLIDs(i int) []uint32 { } func (b *Block) Pack(dst []byte, tmp []uint32) []byte { - // TODO store next flags into a single byte - // write b.IsLastLID as a dedicated uint32 in the header of block if b.IsLastLID { - dst = binary.LittleEndian.AppendUint32(dst, 1) + dst = append(dst, 1) } else { - dst = binary.LittleEndian.AppendUint32(dst, 0) + dst = append(dst, 0) } dst = packer.CompressDeltaBitpackUint32(dst, b.Offsets, tmp) @@ -59,13 +55,12 @@ func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, buf *Unpac } func (b *Block) unpackBitpack(data []byte, buf *UnpackBuffer) error { - // read IsLastLID from a dedicated uint32 - if len(data) < 4 { - return errors.New("lids block decode error: truncated IsLastLID header") + if data[0] == 1 { + b.IsLastLID = true + } else { + b.IsLastLID = false } - isLastLIDValue := binary.LittleEndian.Uint32(data[:4]) - b.IsLastLID = isLastLIDValue == 1 - data = data[4:] + data = data[1:] var err error var values []uint32 From ae2bd9fca8122c50fcbd37509172cc37ccd5d645 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:14:23 +0300 Subject: [PATCH 5/9] naming --- frac/sealed/lids/block.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/frac/sealed/lids/block.go b/frac/sealed/lids/block.go index 8a29731d..aff47483 100644 --- a/frac/sealed/lids/block.go +++ b/frac/sealed/lids/block.go @@ -23,15 +23,15 @@ func (b *Block) getLIDs(i int) []uint32 { return b.LIDs[b.Offsets[i]:b.Offsets[i+1]] } -func (b *Block) Pack(dst []byte, tmp []uint32) []byte { +func (b *Block) Pack(dst []byte, buf []uint32) []byte { if b.IsLastLID { dst = append(dst, 1) } else { dst = append(dst, 0) } - dst = packer.CompressDeltaBitpackUint32(dst, b.Offsets, tmp) - dst = packer.CompressDeltaBitpackUint32(dst, b.LIDs, tmp) + dst = packer.CompressDeltaBitpackUint32(dst, b.Offsets, buf) + dst = packer.CompressDeltaBitpackUint32(dst, b.LIDs, buf) return dst } From 6d734b5320e59336b53e293c1864385b5d4284db Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:14:48 +0300 Subject: [PATCH 6/9] larger preallocated buf --- frac/sealed/sealing/index.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 0395a69e..ad698fcf 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -48,8 +48,8 @@ func NewIndexSealer(params common.SealParams) *IndexSealer { params: params, buf1: make([]byte, 0, consts.RegularBlockSize), buf2: make([]byte, 0, consts.RegularBlockSize), - buf32: make([]uint32, 0, consts.LIDBlockCap/2), - buf64: make([]uint64, 0, consts.RegularBlockSize/2), + buf32: make([]uint32, 0, consts.LIDBlockCap), + buf64: make([]uint64, 0, consts.RegularBlockSize), } } From 29153ed9c89a6b3926531a6f05cfd2f1f9b34aa8 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:56:07 +0300 Subject: [PATCH 7/9] linter fixes, minor fixes --- frac/sealed/lids/block_test.go | 3 +- frac/sealed/seqids/blocks.go | 1 + frac/sealed/seqids/blocks_test.go | 2 +- frac/sealed/seqids/loader.go | 6 +- packer/delta_bitpacker.go | 138 +++++++++++++++++------------- 5 files changed, 85 insertions(+), 65 deletions(-) diff --git a/frac/sealed/lids/block_test.go b/frac/sealed/lids/block_test.go index 8d172acc..5985eca6 100644 --- a/frac/sealed/lids/block_test.go +++ b/frac/sealed/lids/block_test.go @@ -257,6 +257,7 @@ func BenchmarkBlock_Unpack(b *testing.B) { b.ResetTimer() for b.Loop() { - unpacked.Unpack(packed, config.CurrentFracVersion, buf) + err := unpacked.Unpack(packed, config.CurrentFracVersion, buf) + assert.NoError(b, err) } } diff --git a/frac/sealed/seqids/blocks.go b/frac/sealed/seqids/blocks.go index 7853a3a1..33967cd0 100644 --- a/frac/sealed/seqids/blocks.go +++ b/frac/sealed/seqids/blocks.go @@ -19,6 +19,7 @@ func (b BlockMIDs) Pack(dst []byte, buf []uint64) []byte { func (b *BlockMIDs) Unpack(data []byte, fracVer config.BinaryDataVersion, cache *unpackCache) error { if fracVer >= config.BinaryDataV3 { + // TODO use b.Values? _, values, err := packer.DecompressDeltaBitpackUint64(data, cache.compressed, cache.values) if err != nil { return err diff --git a/frac/sealed/seqids/blocks_test.go b/frac/sealed/seqids/blocks_test.go index 6665d01c..93fbaec7 100644 --- a/frac/sealed/seqids/blocks_test.go +++ b/frac/sealed/seqids/blocks_test.go @@ -50,7 +50,7 @@ func TestBlockMIDs_Pack(t *testing.T) { } } -func generate(base uint64, increment uint64) []uint64 { +func generate(base, increment uint64) []uint64 { values := make([]uint64, consts.IDsPerBlock) for i := range values { values[i] = base + uint64(i)*increment diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index 13283a91..defa865f 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -35,7 +35,7 @@ type Loader struct { fracVersion config.BinaryDataVersion } -func (l *Loader) GetMIDsBlock(index uint32, cache *unpackCache) (BlockMIDs, error) { +func (l *Loader) GetMIDsBlock(index uint32, unpackCache *unpackCache) (BlockMIDs, error) { // load binary from index data, err := l.cacheMIDs.GetWithError(index, func() ([]byte, int, error) { data, _, err := l.reader.ReadIndexBlock(l.midBlockIndex(index), nil) @@ -49,8 +49,8 @@ func (l *Loader) GetMIDsBlock(index uint32, cache *unpackCache) (BlockMIDs, erro return BlockMIDs{}, err } // unpack - block := BlockMIDs{Values: cache.values[:0]} - if err := block.Unpack(data, l.fracVersion, cache); err != nil { + block := BlockMIDs{Values: unpackCache.values[:0]} + if err := block.Unpack(data, l.fracVersion, unpackCache); err != nil { return BlockMIDs{}, err } return block, nil diff --git a/packer/delta_bitpacker.go b/packer/delta_bitpacker.go index 9ab4cbb9..71079750 100644 --- a/packer/delta_bitpacker.go +++ b/packer/delta_bitpacker.go @@ -8,6 +8,9 @@ import ( "github.com/ronanh/intcomp" ) +// CompressDeltaBitpackUint32 works on top of intcomp library. intcomp can only compress slices which are multiple of 128, but +// this function supports slices of any length. Residual part is always less than 128 numbers and is not delta encoded, +// since we know the number of blocks with length non-multiple of 128 is very low. func CompressDeltaBitpackUint32(dst []byte, values []uint32, buf []uint32) []byte { buf = buf[:0] buf = append(buf, uint32(len(values))) @@ -15,16 +18,17 @@ func CompressDeltaBitpackUint32(dst []byte, values []uint32, buf []uint32) []byt var residual []uint32 residual, buf = intcomp.CompressDeltaBinPackUint32(values, buf) - // TODO comment here what residual is if len(residual) > 0 { + // append residual as is. always less than 128 values buf = append(buf, residual...) } - wordCount := uint32(len(buf)) - // TODO use memcpy - dst = binary.LittleEndian.AppendUint32(dst, wordCount) - for _, v := range buf { - dst = binary.LittleEndian.AppendUint32(dst, v) + { + // TODO use memcpy + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(buf))) + for _, v := range buf { + dst = binary.LittleEndian.AppendUint32(dst, v) + } } return dst } @@ -43,51 +47,60 @@ func DecompressDeltaBitpackUint32(data []byte, compressed, decompressed []uint32 return nil, nil, errors.New("lids block decode error: truncated sequence payload") } - u32Count := int(wordCount) - if cap(compressed) < u32Count { - compressed = make([]uint32, u32Count) - } else { - compressed = compressed[:u32Count] - } - // TODO manual copy - for i := 0; i < u32Count; i++ { - compressed[i] = binary.LittleEndian.Uint32(data[i*4 : i*4+4]) - } - data = data[byteLen:] + // TODO if little endian => reinterpret cast data []byte as []uint32 + // otherwise, allocate slice + // remove compressed param + { + u32Count := int(wordCount) + if cap(compressed) < u32Count { + compressed = make([]uint32, u32Count) + } else { + compressed = compressed[:u32Count] + } + // TODO manual copy + for i := 0; i < u32Count; i++ { + compressed[i] = binary.LittleEndian.Uint32(data[i*4 : i*4+4]) + } + data = data[byteLen:] - if len(compressed) == 0 { - return data, nil, nil + if len(compressed) == 0 { + return data, nil, nil + } } - total := int(compressed[0]) - if total == 0 { + + count := int(compressed[0]) + if count == 0 { return data, nil, nil } - payload := compressed[1:] + compressed = compressed[1:] decompressed = decompressed[:0] switch { - case total < intcomp.BitPackingBlockSize32: - if len(payload) < total { + case count < intcomp.BitPackingBlockSize32: + if len(compressed) < count { return nil, nil, errors.New("lids block decode error: residual payload truncated") } - decompressed = append(decompressed, payload[:total]...) + decompressed = append(decompressed, compressed[:count]...) default: - remaining, out := intcomp.UncompressDeltaBinPackUint32(payload, decompressed) + remaining, out := intcomp.UncompressDeltaBinPackUint32(compressed, decompressed) decompressed = out if len(remaining) > 0 { decompressed = append(decompressed, remaining...) } - if len(decompressed) < total { + if len(decompressed) < count { return nil, nil, errors.New("lids block decode error: decompressed length mismatch") } - decompressed = decompressed[:total] + decompressed = decompressed[:count] } - return data, decompressed[:total], nil + return data, decompressed[:count], nil } -func CompressDeltaBitpackUint64(dst []byte, values []uint64, buf []uint64) []byte { +// CompressDeltaBitpackUint64 works on top of intcomp library. intcomp can only compress uint64 slices which are multiple of 256, but +// this function supports slices of any length. Residual part is always less than 256 uint64 numbers and is not delta encoded, +// since we know the number of blocks with length non-multiple of 256 is very low. +func CompressDeltaBitpackUint64(dst []byte, values, buf []uint64) []byte { buf = buf[:0] total := len(values) buf = append(buf, uint64(total)) @@ -95,14 +108,16 @@ func CompressDeltaBitpackUint64(dst []byte, values []uint64, buf []uint64) []byt var residual []uint64 residual, buf = intcomp.CompressDeltaBinPackUint64(values, buf) if len(residual) > 0 { + // append residual as is. always less than 256 values buf = append(buf, residual...) } - wordCount := uint32(len(buf)) - // TODO use memcpy - dst = binary.LittleEndian.AppendUint32(dst, wordCount) - for _, v := range buf { - dst = binary.LittleEndian.AppendUint64(dst, v) + { + // TODO use memcpy + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(buf))) + for _, v := range buf { + dst = binary.LittleEndian.AppendUint64(dst, v) + } } return dst } @@ -121,47 +136,50 @@ func DecompressDeltaBitpackUint64(data []byte, compressed, values []uint64) ([]b return nil, nil, errors.New("mids block decode error: truncated sequence payload") } - u64Count := int(wordCount) - if cap(compressed) < u64Count { - compressed = make([]uint64, u64Count) - } else { - compressed = compressed[:u64Count] - } - // TODO we reinterpret []byte as []uint64 and them copy to compressed - // maybe we could get rid of compressed? - src := unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(data[:byteLen]))), u64Count) - copy(compressed, src) - data = data[byteLen:] + // TODO if little endian => reinterpret cast data []byte as []uint64 + // otherwise, allocate slice + // remove compressed param + { + u64Count := int(wordCount) + if cap(compressed) < u64Count { + compressed = make([]uint64, u64Count) + } else { + compressed = compressed[:u64Count] + } + src := unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(data[:byteLen]))), u64Count) + copy(compressed, src) + data = data[byteLen:] - if len(compressed) == 0 { - return data, nil, nil + if len(compressed) == 0 { + return data, nil, nil + } } - total := int(compressed[0]) - if total == 0 { + + count := int(compressed[0]) + if count == 0 { return data, nil, nil } - payload := compressed[1:] - - values = values[:0] + compressed = compressed[1:] var decompressed []uint64 switch { - case total < intcomp.BitPackingBlockSize64: - if len(payload) < total { + case count < intcomp.BitPackingBlockSize64: + if len(compressed) < count { return nil, nil, errors.New("mids block decode error: residual payload truncated") } - decompressed = append(values[:0], payload[:total]...) + decompressed = append([]uint64{}, compressed[:count]...) default: - remaining, out := intcomp.UncompressDeltaBinPackUint64(payload, values[:0]) + values = values[:0] + remaining, out := intcomp.UncompressDeltaBinPackUint64(compressed, values[:0]) decompressed = out if len(remaining) > 0 { decompressed = append(decompressed, remaining...) } - if len(decompressed) < total { + if len(decompressed) < count { return nil, nil, errors.New("mids block decode error: decompressed length mismatch") } - decompressed = decompressed[:total] + decompressed = decompressed[:count] } - return data, decompressed[:total], nil + return data, decompressed[:count], nil } From 0ce64a042d22f51f554266bd31db9175b3b5ce9e Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:57:08 +0300 Subject: [PATCH 8/9] tidy mods --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 0a45d8e0..df45cfd2 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/pkg/profile v1.7.0 github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 github.com/prometheus/client_golang v1.23.2 + github.com/ronanh/intcomp v1.1.1 github.com/stretchr/testify v1.11.1 github.com/valyala/fastrand v1.1.0 github.com/valyala/gozstd v1.24.0 @@ -74,7 +75,6 @@ require ( github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect - github.com/ronanh/intcomp v1.1.1 // indirect github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d // indirect From 4f795f8f85e9ea2c9409f5a0cb5187668934c3c4 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:59:37 +0300 Subject: [PATCH 9/9] tidy mods --- packer/delta_bitpacker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packer/delta_bitpacker.go b/packer/delta_bitpacker.go index 71079750..3e90f0cf 100644 --- a/packer/delta_bitpacker.go +++ b/packer/delta_bitpacker.go @@ -11,7 +11,7 @@ import ( // CompressDeltaBitpackUint32 works on top of intcomp library. intcomp can only compress slices which are multiple of 128, but // this function supports slices of any length. Residual part is always less than 128 numbers and is not delta encoded, // since we know the number of blocks with length non-multiple of 128 is very low. -func CompressDeltaBitpackUint32(dst []byte, values []uint32, buf []uint32) []byte { +func CompressDeltaBitpackUint32(dst []byte, values, buf []uint32) []byte { buf = buf[:0] buf = append(buf, uint32(len(values)))