diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index b1b22323..d3a97303 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -104,6 +104,7 @@ func analyzeIndex( if err := b.Unpack(readBlock()); err != nil { logger.Fatal("error unpacking block info", zap.Error(err)) } + ver := b.Info.BinaryDataVer docsCount := int(b.Info.DocsTotal) @@ -162,7 +163,7 @@ func analyzeIndex( } block := &lids.Block{} - if err := block.Unpack(data, &lids.UnpackBuffer{}); err != nil { + if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil { logger.Fatal("error unpacking lids block", zap.Error(err)) } diff --git a/config/frac_version.go b/config/frac_version.go index d3ff1b14..197ecfa1 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -9,6 +9,8 @@ const ( BinaryDataV1 // BinaryDataV2 - MIDs stored in nanoseconds BinaryDataV2 + // BinaryDataV3 - delta bitpack encoded MIDs and LIDs + BinaryDataV3 ) -const CurrentFracVersion = BinaryDataV2 +const CurrentFracVersion = BinaryDataV3 diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 161e1270..641c0043 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1636,8 +1636,8 @@ func (s *FractionTestSuite) TestFractionInfo() { "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1500), - "index on disk doesn't match. actual value: %d", info.MetaOnDisk) + s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1550), + "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") } diff --git a/frac/remote.go b/frac/remote.go index c2088caa..fdfb276d 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -147,7 +147,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e docsReader: &f.docsReader, blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, - lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), + lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), diff --git a/frac/sealed.go b/frac/sealed.go index c4c033d8..d9bf5b9a 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -323,7 +323,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { docsReader: &f.docsReader, blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, - lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), + lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), diff --git a/frac/sealed/lids/block.go b/frac/sealed/lids/block.go index 08884c5c..aff47483 100644 --- a/frac/sealed/lids/block.go +++ b/frac/sealed/lids/block.go @@ -1,10 +1,10 @@ package lids import ( - "encoding/binary" "math" "unsafe" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/packer" ) @@ -23,21 +23,16 @@ func (b *Block) getLIDs(i int) []uint32 { return b.LIDs[b.Offsets[i]:b.Offsets[i+1]] } -func (b *Block) Pack(dst []byte) []byte { - lastLID := int64(0) - last := b.getCount() - 1 - for i := 0; i <= last; i++ { - for _, lid := range b.getLIDs(i) { - dst = binary.AppendVarint(dst, int64(lid)-lastLID) - lastLID = int64(lid) - } - - if i < last || b.IsLastLID { - // when we add this value to prev we must get -1 (or math.MaxUint32 for uint32) - // it is the end-marker; see `Block.Unpack()` - dst = binary.AppendVarint(dst, -1-lastLID) - } +func (b *Block) Pack(dst []byte, buf []uint32) []byte { + if b.IsLastLID { + dst = append(dst, 1) + } else { + dst = append(dst, 0) } + + dst = packer.CompressDeltaBitpackUint32(dst, b.Offsets, buf) + dst = packer.CompressDeltaBitpackUint32(dst, b.LIDs, buf) + return dst } @@ -49,13 +44,46 @@ func (b *Block) GetSizeBytes() int { return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets) } -func (b *Block) Unpack(data []byte, buf *UnpackBuffer) error { +func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, buf *UnpackBuffer) error { + buf.Reset() + + if fracVer >= config.BinaryDataV3 { + return b.unpackBitpack(data, buf) + } + + return b.unpackVarint(data, buf) +} + +func (b *Block) unpackBitpack(data []byte, buf *UnpackBuffer) error { + if data[0] == 1 { + b.IsLastLID = true + } else { + b.IsLastLID = false + } + data = data[1:] + + var err error + var values []uint32 + + data, values, err = packer.DecompressDeltaBitpackUint32(data, buf.compressed, buf.decompressed) + if err != nil { + return err + } + b.Offsets = append([]uint32{}, values...) + + data, values, err = packer.DecompressDeltaBitpackUint32(data, buf.compressed, buf.decompressed) + if err != nil { + return err + } + b.LIDs = append([]uint32{}, values...) + return nil +} + +func (b *Block) unpackVarint(data []byte, buf *UnpackBuffer) error { var lid, offset uint32 b.IsLastLID = true - buf.lids = buf.lids[:0] - buf.offsets = buf.offsets[:0] buf.offsets = append(buf.offsets, 0) // first offset is always zero unpacker := packer.NewBytesUnpacker(data) diff --git a/frac/sealed/lids/block_test.go b/frac/sealed/lids/block_test.go new file mode 100644 index 00000000..5985eca6 --- /dev/null +++ b/frac/sealed/lids/block_test.go @@ -0,0 +1,263 @@ +package lids + +import ( + "math" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/config" +) + +func TestBlockPack(t *testing.T) { + testCases := []struct { + name string + lids []uint32 + offsets []uint32 + isLastLID bool + generator func() ([]uint32, []uint32) + }{ + { + name: "small_single_token", + lids: generate(4), + offsets: []uint32{0, 4}, + isLastLID: true, + }, + { + name: "small_a_few_token", + lids: generate(6), + offsets: []uint32{0, 3, 6}, + isLastLID: true, + }, + { + name: "small_not_last_lid", + lids: generate(3), + offsets: []uint32{0, 3}, + isLastLID: false, + }, + { + name: "small_single_lid", + lids: []uint32{100}, + offsets: []uint32{0, 1}, + isLastLID: true, + }, + { + name: "small_big_lids", + lids: []uint32{math.MaxUint32 - 100, math.MaxUint32 - 50, math.MaxUint32 - 10}, + offsets: []uint32{0, 3}, + isLastLID: true, + }, + { + name: "small_few_tokens", + lids: generate(8), + offsets: []uint32{0, 3, 6, 8}, + isLastLID: false, + }, + { + name: "medium_many_tokens", + generator: func() ([]uint32, []uint32) { + lids := make([]uint32, 0) + offsets := []uint32{0} + startLID := uint32(100) + for i := 0; i < 10; i++ { + for j := 0; j < 3; j++ { + lids = append(lids, startLID+uint32(i*10+j)) + } + offsets = append(offsets, uint32(len(lids))) + startLID += 30 + } + return lids, offsets + }, + isLastLID: true, + }, + { + name: "large_is_last_lid_false", + generator: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 200) + startLID := uint32(1000) + for i := 0; i < 200; i++ { + lids = append(lids, startLID+uint32(i*10)) + } + return lids, []uint32{0, uint32(len(lids))} + }, + isLastLID: false, + }, + { + name: "large_many_tokens", + generator: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 150) + offsets := []uint32{0} + groupSize := 30 + for group := 0; group < 5; group++ { + for i := 0; i < groupSize; i++ { + lids = append(lids, 1+uint32(group*groupSize*10+i*10)) + } + offsets = append(offsets, uint32(len(lids))) + } + return lids, offsets + }, + isLastLID: true, + }, + { + name: "medium_128_lids", + lids: generate(128), + offsets: []uint32{0, 128}, + isLastLID: true, + }, + { + name: "medium_127_lids", + lids: generate(127), + offsets: []uint32{0, 127}, + isLastLID: true, + }, + { + name: "medium_129_lids", + lids: generate(129), + offsets: []uint32{0, 129}, + isLastLID: true, + }, + { + name: "medium_4k_lids", + lids: generate(4096), + offsets: []uint32{0, 4096}, + isLastLID: true, + }, + { + name: "medium_4k_minus_one_lids", + lids: generate(4095), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 4095}, + isLastLID: true, + }, + { + name: "medium_4k_plus_one_lids", + lids: generate(4097), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 4097}, + isLastLID: true, + }, + { + name: "medium_64k_lids", + lids: generate(65536), + offsets: []uint32{0, 65536}, + isLastLID: false, + }, + { + name: "medium_64k_minus_one_lids", + lids: generate(65535), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 65535}, + isLastLID: true, + }, + { + name: "medium_64k_plus_one_lids", + lids: generate(65537), + offsets: []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, 65537}, + isLastLID: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var lids []uint32 + var offsets []uint32 + + if tc.generator != nil { + lids, offsets = tc.generator() + } else { + lids = tc.lids + offsets = tc.offsets + } + + block := &Block{ + LIDs: lids, + Offsets: offsets, + IsLastLID: tc.isLastLID, + } + + packed := block.Pack(nil, nil) + require.NotEmpty(t, packed) + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, config.CurrentFracVersion, buf) + + require.NoError(t, err) + assert.EqualExportedValues(t, block, unpacked) + }) + } +} + +func generate(n int) []uint32 { + v := make([]uint32, n) + last := uint32(100) + for i := range v { + v[i] = last + last += uint32(1 + rand.Intn(5)) + } + return v +} + +func TestBlockPack_ReuseBuffer(t *testing.T) { + // Test that UnpackBuffer can be reused + block1 := &Block{ + LIDs: []uint32{100, 150, 200}, + Offsets: []uint32{0, 3}, + IsLastLID: true, + } + + block2 := &Block{ + LIDs: []uint32{300, 350, 400, 450}, + Offsets: []uint32{0, 4}, + IsLastLID: true, + } + + packed1 := block1.Pack(nil, nil) + packed2 := block2.Pack(nil, nil) + + buf := &UnpackBuffer{} + + unpacked1 := &Block{} + err := unpacked1.Unpack(packed1, config.CurrentFracVersion, buf) + require.NoError(t, err) + assert.Equal(t, block1.LIDs, unpacked1.LIDs) + + unpacked2 := &Block{} + err = unpacked2.Unpack(packed2, config.CurrentFracVersion, buf) + require.NoError(t, err) + assert.Equal(t, block2.LIDs, unpacked2.LIDs) +} + +func BenchmarkBlock_Pack(b *testing.B) { + lids := generate(64 * 1024) + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 64 * 1024}, + IsLastLID: true, + } + tmp := make([]uint32, 0, 64*1024/4) + + for b.Loop() { + block.Pack(nil, tmp) + } +} + +func BenchmarkBlock_Unpack(b *testing.B) { + lids := generate(64 * 1024) + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 64 * 1024}, + IsLastLID: true, + } + packed := block.Pack(nil, nil) + + buf := &UnpackBuffer{} + unpacked := &Block{} + + b.ResetTimer() + for b.Loop() { + err := unpacked.Unpack(packed, config.CurrentFracVersion, buf) + assert.NoError(b, err) + } +} diff --git a/frac/sealed/lids/loader.go b/frac/sealed/lids/loader.go index e5ee8db1..818c9422 100644 --- a/frac/sealed/lids/loader.go +++ b/frac/sealed/lids/loader.go @@ -2,12 +2,39 @@ package lids import ( "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/storage" ) type UnpackBuffer struct { - lids []uint32 - offsets []uint32 + lids []uint32 + offsets []uint32 + decompressed []uint32 + compressed []uint32 +} + +func (b *UnpackBuffer) Reset() { + if b.lids == nil { + b.lids = make([]uint32, 0, 128) + } else { + b.lids = b.lids[:0] + } + if b.offsets == nil { + b.offsets = make([]uint32, 0, 8) + } else { + b.offsets = b.offsets[:0] + } + if b.decompressed == nil { + b.decompressed = make([]uint32, 0, consts.LIDBlockCap) + } else { + b.decompressed = b.decompressed[:0] + } + if b.compressed == nil { + b.compressed = make([]uint32, 0, consts.LIDBlockCap/2) + } else { + b.compressed = b.compressed[:0] + } } // Loader is responsible for reading from disk, unpacking and caching LID. @@ -18,13 +45,15 @@ type Loader struct { reader *storage.IndexReader unpackBuf *UnpackBuffer blockBuf []byte + fracVer config.BinaryDataVersion } -func NewLoader(r *storage.IndexReader, c *cache.Cache[*Block]) *Loader { +func NewLoader(fracVer config.BinaryDataVersion, r *storage.IndexReader, c *cache.Cache[*Block]) *Loader { return &Loader{ cache: c, reader: r, unpackBuf: &UnpackBuffer{}, + fracVer: fracVer, } } @@ -47,7 +76,7 @@ func (l *Loader) readLIDsBlock(blockIndex uint32) (*Block, error) { } block := &Block{} - err = block.Unpack(l.blockBuf, l.unpackBuf) + err = block.Unpack(l.blockBuf, l.fracVer, l.unpackBuf) if err != nil { return nil, err } diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 491c7233..ad698fcf 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -32,6 +32,8 @@ type IndexSealer struct { lastErr error // Last error encountered during processing buf1 []byte // Reusable buffer for packing raw data before compression buf2 []byte // Reusable buffer for compressed data + buf32 []uint32 // Reusable buffer for compressed arrays of uint32 + buf64 []uint64 // Reusable buffer for compressed arrays of uint64 params common.SealParams // Configuration parameters for sealing process // PreloadedData structures built during sealing for fast initialization of sealed fraction @@ -46,6 +48,8 @@ func NewIndexSealer(params common.SealParams) *IndexSealer { params: params, buf1: make([]byte, 0, consts.RegularBlockSize), buf2: make([]byte, 0, consts.RegularBlockSize), + buf32: make([]uint32, 0, consts.LIDBlockCap), + buf64: make([]uint64, 0, consts.RegularBlockSize), } } @@ -388,8 +392,8 @@ func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { } s.idsTable.MinBlockIDs = append(s.idsTable.MinBlockIDs, minID) // Store for PreloadedData - // Packing block - s.buf1 = block.mids.Pack(s.buf1[:0]) + // Packing block (use reusable uint64 buffer for mid compression) + s.buf1 = block.mids.Pack(s.buf1[:0], s.buf64[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) // Store min MID and RID in extended metadata b.ext1 = uint64(minID.MID) @@ -426,7 +430,7 @@ func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) // Packing block - s.buf1 = block.payload.Pack(s.buf1[:0]) + s.buf1 = block.payload.Pack(s.buf1[:0], s.buf32[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) b.ext1 = ext1 // Legacy continuation flag b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range diff --git a/frac/sealed/seqids/blocks.go b/frac/sealed/seqids/blocks.go index f17f1be5..33967cd0 100644 --- a/frac/sealed/seqids/blocks.go +++ b/frac/sealed/seqids/blocks.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/packer" "github.com/ozontech/seq-db/seq" ) @@ -12,17 +13,22 @@ type BlockMIDs struct { Values []uint64 } -func (b BlockMIDs) Pack(dst []byte) []byte { - var prev uint64 - for _, mid := range b.Values { - dst = binary.AppendVarint(dst, int64(mid-prev)) - prev = mid - } - return dst +func (b BlockMIDs) Pack(dst []byte, buf []uint64) []byte { + return packer.CompressDeltaBitpackUint64(dst, b.Values, buf) } -func (b *BlockMIDs) Unpack(data []byte, fracVersion config.BinaryDataVersion) error { - values, err := unpackRawMIDsVarint(data, b.Values, fracVersion) +func (b *BlockMIDs) Unpack(data []byte, fracVer config.BinaryDataVersion, cache *unpackCache) error { + if fracVer >= config.BinaryDataV3 { + // TODO use b.Values? + _, values, err := packer.DecompressDeltaBitpackUint64(data, cache.compressed, cache.values) + if err != nil { + return err + } + b.Values = append(b.Values[:0], values...) + return nil + } + + values, err := unpackRawMIDsVarint(data, b.Values, fracVer) if err != nil { return err } @@ -79,7 +85,7 @@ func (b *BlockParams) Unpack(data []byte) error { // unpackRawMIDsVarint is a dedicated method for unpacking delta encoded MIDs. The reason a dedicated method exists // is that we want to unpack values and potentially convert legacy frac version in one pass. -func unpackRawMIDsVarint(src []byte, dst []uint64, fracVersion config.BinaryDataVersion) ([]uint64, error) { +func unpackRawMIDsVarint(src []byte, dst []uint64, fracVer config.BinaryDataVersion) ([]uint64, error) { dst = dst[:0] id := uint64(0) for len(src) != 0 { @@ -94,7 +100,7 @@ func unpackRawMIDsVarint(src []byte, dst []uint64, fracVersion config.BinaryData } id += uint64(delta) - if fracVersion >= config.BinaryDataV2 { + if fracVer >= config.BinaryDataV2 { dst = append(dst, id) } else { // Legacy format - scale millis to nanos diff --git a/frac/sealed/seqids/blocks_test.go b/frac/sealed/seqids/blocks_test.go new file mode 100644 index 00000000..93fbaec7 --- /dev/null +++ b/frac/sealed/seqids/blocks_test.go @@ -0,0 +1,59 @@ +package seqids + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" +) + +func TestBlockMIDs_Pack(t *testing.T) { + tests := []struct { + name string + values []uint64 + }{ + { + name: "small_single", + values: []uint64{12345678901234}, + }, + { + name: "small_few", + values: []uint64{100, 200, 300, 400, 500}, + }, + { + name: "small_4k", + values: generate(1000000000000, 1000000), + }, + { + name: "small_4k_large_values", + values: generate(0xFFFFFFFF00000000, 1), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + original := BlockMIDs{Values: tt.values} + + packed := original.Pack(nil, nil) + + cache := NewCache() + defer cache.Release() + + unpacked := BlockMIDs{} + err := unpacked.Unpack(packed, config.CurrentFracVersion, cache) + + require.NoError(t, err) + require.EqualExportedValues(t, unpacked, original) + }) + } +} + +func generate(base, increment uint64) []uint64 { + values := make([]uint64, consts.IDsPerBlock) + for i := range values { + values[i] = base + uint64(i)*increment + } + return values +} diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index a4c9ecdb..defa865f 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -35,7 +35,7 @@ type Loader struct { fracVersion config.BinaryDataVersion } -func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) { +func (l *Loader) GetMIDsBlock(index uint32, unpackCache *unpackCache) (BlockMIDs, error) { // load binary from index data, err := l.cacheMIDs.GetWithError(index, func() ([]byte, int, error) { data, _, err := l.reader.ReadIndexBlock(l.midBlockIndex(index), nil) @@ -49,8 +49,8 @@ func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) { return BlockMIDs{}, err } // unpack - block := BlockMIDs{Values: buf} - if err := block.Unpack(data, l.fracVersion); err != nil { + block := BlockMIDs{Values: unpackCache.values[:0]} + if err := block.Unpack(data, l.fracVersion, unpackCache); err != nil { return BlockMIDs{}, err } return block, nil diff --git a/frac/sealed/seqids/provider.go b/frac/sealed/seqids/provider.go index 18a1c57c..b1f63161 100644 --- a/frac/sealed/seqids/provider.go +++ b/frac/sealed/seqids/provider.go @@ -55,7 +55,7 @@ func (p *Provider) MID(lid seq.LID) (seq.MID, error) { func (p *Provider) fillMIDs(blockIndex uint32, dst *unpackCache) error { if dst.blockIndex != int(blockIndex) { - block, err := p.loader.GetMIDsBlock(blockIndex, dst.values[:0]) + block, err := p.loader.GetMIDsBlock(blockIndex, dst) if err != nil { return err } diff --git a/frac/sealed/seqids/unpack_cache.go b/frac/sealed/seqids/unpack_cache.go index f621dd57..e900bd8b 100644 --- a/frac/sealed/seqids/unpack_cache.go +++ b/frac/sealed/seqids/unpack_cache.go @@ -10,6 +10,7 @@ type unpackCache struct { blockIndex int startLID uint32 values []uint64 + compressed []uint64 // buffer for decompressing MID blocks } var cachePool = sync.Pool{} @@ -26,6 +27,7 @@ func NewCache() *unpackCache { blockIndex: -1, startLID: 0, values: make([]uint64, 0, defaultValsCapacity), + compressed: make([]uint64, 0, defaultValsCapacity/2), } } @@ -33,6 +35,7 @@ func (c *unpackCache) reset() *unpackCache { c.blockIndex = -1 c.startLID = 0 c.values = c.values[:0] + c.compressed = c.compressed[:0] return c } diff --git a/go.mod b/go.mod index 4e9a15e6..df45cfd2 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/pkg/profile v1.7.0 github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 github.com/prometheus/client_golang v1.23.2 + github.com/ronanh/intcomp v1.1.1 github.com/stretchr/testify v1.11.1 github.com/valyala/fastrand v1.1.0 github.com/valyala/gozstd v1.24.0 diff --git a/go.sum b/go.sum index 92b59e95..b30da514 100644 --- a/go.sum +++ b/go.sum @@ -231,6 +231,8 @@ github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlT github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/ronanh/intcomp v1.1.1 h1:+1bGV/wEBiHI0FvzS7RHgzqOpfbBJzLIxkqMJ9e6yxY= +github.com/ronanh/intcomp v1.1.1/go.mod h1:7FOLy3P3Zj3er/kVrU/pl+Ql7JFZj7bwliMGketo0IU= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= github.com/spf13/afero v1.2.1 h1:qgMbHoJbPbw579P+1zVY+6n4nIFuIchaIjzZ/I/Yq8M= diff --git a/packer/bytes_unpacker.go b/packer/bytes_unpacker.go index a5e3fd59..1f728001 100644 --- a/packer/bytes_unpacker.go +++ b/packer/bytes_unpacker.go @@ -33,6 +33,15 @@ func (u *BytesUnpacker) GetUint32() uint32 { return val } +func (u *BytesUnpacker) SkipUints32(skipCount int) { + skipBytes := 4 * skipCount + u.buf = u.buf[skipBytes:] +} + +func (u *BytesUnpacker) GetBuffer() []byte { + return u.buf +} + func (u *BytesUnpacker) GetBinary() []byte { l := u.GetUint32() val := u.buf[:l] diff --git a/packer/delta_bitpacker.go b/packer/delta_bitpacker.go new file mode 100644 index 00000000..3e90f0cf --- /dev/null +++ b/packer/delta_bitpacker.go @@ -0,0 +1,185 @@ +package packer + +import ( + "encoding/binary" + "errors" + "unsafe" + + "github.com/ronanh/intcomp" +) + +// CompressDeltaBitpackUint32 works on top of intcomp library. intcomp can only compress slices which are multiple of 128, but +// this function supports slices of any length. Residual part is always less than 128 numbers and is not delta encoded, +// since we know the number of blocks with length non-multiple of 128 is very low. +func CompressDeltaBitpackUint32(dst []byte, values, buf []uint32) []byte { + buf = buf[:0] + buf = append(buf, uint32(len(values))) + + var residual []uint32 + residual, buf = intcomp.CompressDeltaBinPackUint32(values, buf) + + if len(residual) > 0 { + // append residual as is. always less than 128 values + buf = append(buf, residual...) + } + + { + // TODO use memcpy + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(buf))) + for _, v := range buf { + dst = binary.LittleEndian.AppendUint32(dst, v) + } + } + return dst +} + +// TODO simplify +func DecompressDeltaBitpackUint32(data []byte, compressed, decompressed []uint32) ([]byte, []uint32, error) { + if len(data) < 4 { + return nil, nil, errors.New("lids block decode error: truncated sequence length header") + } + + wordCount := binary.LittleEndian.Uint32(data[:4]) + data = data[4:] + + byteLen := int(wordCount) * 4 + if len(data) < byteLen { + return nil, nil, errors.New("lids block decode error: truncated sequence payload") + } + + // TODO if little endian => reinterpret cast data []byte as []uint32 + // otherwise, allocate slice + // remove compressed param + { + u32Count := int(wordCount) + if cap(compressed) < u32Count { + compressed = make([]uint32, u32Count) + } else { + compressed = compressed[:u32Count] + } + // TODO manual copy + for i := 0; i < u32Count; i++ { + compressed[i] = binary.LittleEndian.Uint32(data[i*4 : i*4+4]) + } + data = data[byteLen:] + + if len(compressed) == 0 { + return data, nil, nil + } + } + + count := int(compressed[0]) + if count == 0 { + return data, nil, nil + } + + compressed = compressed[1:] + + decompressed = decompressed[:0] + switch { + case count < intcomp.BitPackingBlockSize32: + if len(compressed) < count { + return nil, nil, errors.New("lids block decode error: residual payload truncated") + } + decompressed = append(decompressed, compressed[:count]...) + default: + remaining, out := intcomp.UncompressDeltaBinPackUint32(compressed, decompressed) + decompressed = out + if len(remaining) > 0 { + decompressed = append(decompressed, remaining...) + } + if len(decompressed) < count { + return nil, nil, errors.New("lids block decode error: decompressed length mismatch") + } + decompressed = decompressed[:count] + } + + return data, decompressed[:count], nil +} + +// CompressDeltaBitpackUint64 works on top of intcomp library. intcomp can only compress uint64 slices which are multiple of 256, but +// this function supports slices of any length. Residual part is always less than 256 uint64 numbers and is not delta encoded, +// since we know the number of blocks with length non-multiple of 256 is very low. +func CompressDeltaBitpackUint64(dst []byte, values, buf []uint64) []byte { + buf = buf[:0] + total := len(values) + buf = append(buf, uint64(total)) + + var residual []uint64 + residual, buf = intcomp.CompressDeltaBinPackUint64(values, buf) + if len(residual) > 0 { + // append residual as is. always less than 256 values + buf = append(buf, residual...) + } + + { + // TODO use memcpy + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(buf))) + for _, v := range buf { + dst = binary.LittleEndian.AppendUint64(dst, v) + } + } + return dst +} + +// TODO simplify +func DecompressDeltaBitpackUint64(data []byte, compressed, values []uint64) ([]byte, []uint64, error) { + if len(data) < 4 { + return nil, nil, errors.New("mids block decode error: truncated sequence length header") + } + + wordCount := binary.LittleEndian.Uint32(data[:4]) + data = data[4:] + + byteLen := int(wordCount) * 8 + if len(data) < byteLen { + return nil, nil, errors.New("mids block decode error: truncated sequence payload") + } + + // TODO if little endian => reinterpret cast data []byte as []uint64 + // otherwise, allocate slice + // remove compressed param + { + u64Count := int(wordCount) + if cap(compressed) < u64Count { + compressed = make([]uint64, u64Count) + } else { + compressed = compressed[:u64Count] + } + src := unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(data[:byteLen]))), u64Count) + copy(compressed, src) + data = data[byteLen:] + + if len(compressed) == 0 { + return data, nil, nil + } + } + + count := int(compressed[0]) + if count == 0 { + return data, nil, nil + } + + compressed = compressed[1:] + var decompressed []uint64 + switch { + case count < intcomp.BitPackingBlockSize64: + if len(compressed) < count { + return nil, nil, errors.New("mids block decode error: residual payload truncated") + } + decompressed = append([]uint64{}, compressed[:count]...) + default: + values = values[:0] + remaining, out := intcomp.UncompressDeltaBinPackUint64(compressed, values[:0]) + decompressed = out + if len(remaining) > 0 { + decompressed = append(decompressed, remaining...) + } + if len(decompressed) < count { + return nil, nil, errors.New("mids block decode error: decompressed length mismatch") + } + decompressed = decompressed[:count] + } + + return data, decompressed[:count], nil +} diff --git a/packer/delta_bitpacker_test.go b/packer/delta_bitpacker_test.go new file mode 100644 index 00000000..df1118fd --- /dev/null +++ b/packer/delta_bitpacker_test.go @@ -0,0 +1,150 @@ +package packer + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCompressDeltaBitpackUint32(t *testing.T) { + testCases := []struct { + name string + values []uint32 + }{ + { + name: "empty", + values: []uint32{}, + }, + { + name: "small_single_value", + values: []uint32{1}, + }, + { + name: "small_few_values", + values: []uint32{1, 4, 7, 8, 10}, + }, + { + name: "small_127_values", + values: generateUint32(127), + }, + { + name: "small_128", + values: generateUint32(128), + }, + { + name: "small_129", + values: generateUint32(129), + }, + { + name: "midium_4k", + values: generateUint32(4096), + }, + { + name: "midium_4k_more", + values: generateUint32(4105), + }, + { + name: "midium_64k", + values: generateUint32(64 * 1024), + }, + { + name: "midium_64k_more", + values: generateUint32(64*1024 + 34), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + compressed := CompressDeltaBitpackUint32([]byte{}, tc.values, []uint32{}) + _, decompressed, err := DecompressDeltaBitpackUint32(compressed, []uint32{}, []uint32{}) + require.NoError(t, err) + if len(tc.values) > 0 { + require.Equal(t, tc.values, decompressed) + } else { + require.Equal(t, 0, len(decompressed)) + } + }) + } +} + +func TestCompressDeltaBitpackUint64(t *testing.T) { + testCases := []struct { + name string + values []uint64 + }{ + { + name: "empty", + values: []uint64{}, + }, + { + name: "small_single_value", + values: []uint64{1}, + }, + { + name: "small_few_values", + values: []uint64{1, 4, 7, 8, 10}, + }, + { + name: "small_127_values", + values: generateUint64(127), + }, + { + name: "small_128", + values: generateUint64(128), + }, + { + name: "small_129", + values: generateUint64(129), + }, + { + name: "midium_4k", + values: generateUint64(4096), + }, + { + name: "midium_4k_more", + values: generateUint64(4105), + }, + { + name: "midium_64k", + values: generateUint64(64 * 1024), + }, + { + name: "midium_64k_more", + values: generateUint64(64*1024 + 34), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + compressed := CompressDeltaBitpackUint64([]byte{}, tc.values, []uint64{}) + _, decompressed, err := DecompressDeltaBitpackUint64(compressed, []uint64{}, []uint64{}) + require.NoError(t, err) + if len(tc.values) > 0 { + require.Equal(t, tc.values, decompressed) + } else { + require.Equal(t, 0, len(decompressed)) + } + }) + } +} + +func generateUint32(n int) []uint32 { + v := make([]uint32, n) + last := uint32(100) + for i := range v { + v[i] = last + last += uint32(1 + rand.Intn(5)) + } + return v +} + +func generateUint64(n int) []uint64 { + v := make([]uint64, n) + last := uint64(100) + for i := range v { + v[i] = last + last += uint64(1 + rand.Intn(5)) + } + return v +}