Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func analyzeIndex(
if err := b.Unpack(readBlock()); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}
ver := b.Info.BinaryDataVer

docsCount := int(b.Info.DocsTotal)

Expand Down Expand Up @@ -162,7 +163,7 @@ func analyzeIndex(
}

block := &lids.Block{}
if err := block.Unpack(data, &lids.UnpackBuffer{}); err != nil {
if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil {
logger.Fatal("error unpacking lids block", zap.Error(err))
}

Expand Down
4 changes: 3 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const (
BinaryDataV1
// BinaryDataV2 - MIDs stored in nanoseconds
BinaryDataV2
// BinaryDataV3 - delta bitpack encoded MIDs and LIDs
BinaryDataV3
)

const CurrentFracVersion = BinaryDataV2
const CurrentFracVersion = BinaryDataV3
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,8 +1636,8 @@ func (s *FractionTestSuite) TestFractionInfo() {
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1500),
"index on disk doesn't match. actual value: %d", info.MetaOnDisk)
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1550),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
}
Expand Down
2 changes: 1 addition & 1 deletion frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
docsReader: &f.docsReader,
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs),
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable),

Expand Down
2 changes: 1 addition & 1 deletion frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider {
docsReader: &f.docsReader,
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs),
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable),

Expand Down
64 changes: 46 additions & 18 deletions frac/sealed/lids/block.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package lids

import (
	"encoding/binary"
	"errors"
	"math"
	"unsafe"

	"github.com/ozontech/seq-db/config"
	"github.com/ozontech/seq-db/packer"
)

Expand All @@ -23,21 +23,16 @@ func (b *Block) getLIDs(i int) []uint32 {
return b.LIDs[b.Offsets[i]:b.Offsets[i+1]]
}

func (b *Block) Pack(dst []byte) []byte {
lastLID := int64(0)
last := b.getCount() - 1
for i := 0; i <= last; i++ {
for _, lid := range b.getLIDs(i) {
dst = binary.AppendVarint(dst, int64(lid)-lastLID)
lastLID = int64(lid)
}

if i < last || b.IsLastLID {
// when we add this value to prev we must get -1 (or math.MaxUint32 for uint32)
// it is the end-marker; see `Block.Unpack()`
dst = binary.AppendVarint(dst, -1-lastLID)
}
// Pack appends the serialized block to dst and returns the extended slice.
//
// The layout is a single flag byte (1 when b.IsLastLID is set, 0 otherwise),
// followed by b.Offsets and then b.LIDs, each written by
// packer.CompressDeltaBitpackUint32. buf is handed to the packer unchanged;
// presumably it is scratch space for the encoder (confirm in package packer).
func (b *Block) Pack(dst []byte, buf []uint32) []byte {
	var lastFlag byte
	if b.IsLastLID {
		lastFlag = 1
	}
	dst = append(dst, lastFlag)

	// Offsets go first so the decoder can read them before the LIDs.
	dst = packer.CompressDeltaBitpackUint32(dst, b.Offsets, buf)
	return packer.CompressDeltaBitpackUint32(dst, b.LIDs, buf)
}

Expand All @@ -49,13 +44,46 @@ func (b *Block) GetSizeBytes() int {
return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets)
}

func (b *Block) Unpack(data []byte, buf *UnpackBuffer) error {
// Unpack decodes data into b, choosing the decoder from the fraction's binary
// data version: versions before config.BinaryDataV3 go through unpackVarint,
// BinaryDataV3 and later go through unpackBitpack.
//
// buf is reset before decoding and then passed to the selected decoder.
func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, buf *UnpackBuffer) error {
	buf.Reset()

	if fracVer < config.BinaryDataV3 {
		return b.unpackVarint(data, buf)
	}
	return b.unpackBitpack(data, buf)
}

// unpackBitpack decodes a block stored in the delta-bitpack layout: one flag
// byte (1 means IsLastLID) followed by the Offsets and LIDs sequences, each
// read with packer.DecompressDeltaBitpackUint32.
//
// It returns an error when data is empty or when either sequence fails to
// decompress. On error the block may be left partially updated.
func (b *Block) unpackBitpack(data []byte, buf *UnpackBuffer) error {
	// Empty or truncated input must surface as an error rather than as an
	// index-out-of-range panic on the flag byte.
	if len(data) == 0 {
		return errors.New("lids block: empty data")
	}

	b.IsLastLID = data[0] == 1
	data = data[1:]

	var (
		values []uint32
		err    error
	)

	data, values, err = packer.DecompressDeltaBitpackUint32(data, buf.compressed, buf.decompressed)
	if err != nil {
		return err
	}
	// Copy before the next call: values presumably aliases buf's scratch
	// storage, which the second decompression reuses (confirm in packer).
	b.Offsets = append([]uint32{}, values...)

	// The remaining bytes after the LIDs sequence are not needed.
	_, values, err = packer.DecompressDeltaBitpackUint32(data, buf.compressed, buf.decompressed)
	if err != nil {
		return err
	}
	b.LIDs = append([]uint32{}, values...)

	return nil
}

func (b *Block) unpackVarint(data []byte, buf *UnpackBuffer) error {
var lid, offset uint32

b.IsLastLID = true

buf.lids = buf.lids[:0]
buf.offsets = buf.offsets[:0]
buf.offsets = append(buf.offsets, 0) // first offset is always zero

unpacker := packer.NewBytesUnpacker(data)
Expand Down
Loading
Loading