From 5cb9698d840336ad64790f4813d56f8f4f1b29a0 Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Fri, 6 Mar 2026 15:45:29 +0500 Subject: [PATCH] feat: add DocsFilter read path --- docsfilter/docs_filter.go | 61 +++++++++++ docsfilter/iterator.go | 61 +++++++++++ docsfilter/iterator_asc.go | 62 +++++++++++ docsfilter/iterator_asc_test.go | 72 +++++++++++++ docsfilter/iterator_desc.go | 60 +++++++++++ docsfilter/iterator_desc_test.go | 67 ++++++++++++ docsfilter/loader.go | 160 +++++++++++++++++++++++++++++ docsfilter/loader_test.go | 39 +++++++ docsfilter/merged_iterator.go | 108 +++++++++++++++++++ docsfilter/merged_iterator_test.go | 67 ++++++++++++ 10 files changed, 757 insertions(+) create mode 100644 docsfilter/iterator.go create mode 100644 docsfilter/iterator_asc.go create mode 100644 docsfilter/iterator_asc_test.go create mode 100644 docsfilter/iterator_desc.go create mode 100644 docsfilter/iterator_desc_test.go create mode 100644 docsfilter/loader.go create mode 100644 docsfilter/loader_test.go create mode 100644 docsfilter/merged_iterator.go create mode 100644 docsfilter/merged_iterator_test.go diff --git a/docsfilter/docs_filter.go b/docsfilter/docs_filter.go index 2a092f7e..2872f40f 100644 --- a/docsfilter/docs_filter.go +++ b/docsfilter/docs_filter.go @@ -13,10 +13,12 @@ import ( "go.uber.org/zap" + "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/util" @@ -30,6 +32,8 @@ const ( const ( defaultMaintenanceInterval = 30 * time.Second + defaultCacheCleanInterval = 10 * time.Millisecond + defaultCacheGCDelay = 1 * time.Second ) type MappingProvider interface { @@ -57,6 +61,11 @@ type DocsFilter struct { createDirOnce *sync.Once maintenanceInterval time.Duration + cacheCleanInterval time.Duration + cacheGCDelay time.Duration + + headersCache *cache.Cache[[]lidsBlockHeader] + headersCacheCleaner *cache.Cleaner } func New( @@ -77,6 +86,8 @@ func New( filtersMap[string(f.Hash())] = f } + cacheCleaner := cache.NewCleaner(cfg.CacheSizeLimit, nil) + return &DocsFilter{ ctx: ctx, config: cfg, @@ -87,6 +98,10 @@ func New( rateLimit: make(chan struct{}, workers), createDirOnce: &sync.Once{}, maintenanceInterval: defaultMaintenanceInterval, + cacheCleanInterval: defaultCacheCleanInterval, + cacheGCDelay: defaultCacheGCDelay, + headersCache: cache.NewCache[[]lidsBlockHeader](cacheCleaner, nil), + headersCacheCleaner: cacheCleaner, } } @@ -104,6 +119,7 @@ func (df *DocsFilter) Start(fracs fracmanager.List) { } go df.maintenance() + go df.cacheCleanLoop() mapping := df.mp.GetMapping() @@ -118,6 +134,32 @@ func (df *DocsFilter) Start(fracs fracmanager.List) { } } +func (df *DocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) { + df.fracsMu.RLock() + defer df.fracsMu.RUnlock() + + fracFiles, has := df.fracs[fracName] + if !has { + return &EmptyIterator{}, nil + } + + iterators := make([]node.Node, 0, len(fracFiles)) + for _, f := range fracFiles { + loader, err := newLoader(f, df.headersCache) + if err != nil { + logger.Error("can't open filtered lids file", zap.String("path", f), zap.Error(err)) + return nil, err + } + if reverse { + iterators = append(iterators, (*IteratorAsc)(NewIterator(loader, minLID, maxLID))) + } else { + iterators = append(iterators, (*IteratorDesc)(NewIterator(loader, minLID, maxLID))) + } + } + + return NewNMergedIterators(iterators, reverse), nil +} + // RefreshFrac replaces frac's tombstone files with newly found results. Used after active frac is sealed. func (df *DocsFilter) RefreshFrac(fraction frac.Fraction) { df.fracsMu.RLock() @@ -353,6 +395,25 @@ func (df *DocsFilter) maintenance() { } } +func (df *DocsFilter) cacheCleanLoop() { + runs := 0 + gcRunsCount := int(df.cacheGCDelay / df.cacheCleanInterval) + + for { + runs++ + df.headersCacheCleaner.Cleanup(&cache.CleanStat{}) + df.headersCacheCleaner.Rotate() + + if runs >= gcRunsCount { + runs = 0 + df.headersCacheCleaner.CleanEmptyGenerations() + df.headersCacheCleaner.ReleaseBuckets() + } + + time.Sleep(df.cacheCleanInterval) + } +} + func (df *DocsFilter) checkDiskUsage() { du := int64(0) diff --git a/docsfilter/iterator.go b/docsfilter/iterator.go new file mode 100644 index 00000000..7d0b9392 --- /dev/null +++ b/docsfilter/iterator.go @@ -0,0 +1,61 @@ +package docsfilter + +import "sort" + +type Iterator struct { + loader *loader + + minLID uint32 + maxLID uint32 + + blockIndex int + tryNextBlock bool + + lids []uint32 +} + +func NewIterator( + loader *loader, + minLID uint32, + maxLID uint32, +) *Iterator { + return &Iterator{ + loader: loader, + minLID: minLID, + maxLID: maxLID, + tryNextBlock: true, + } +} + +func (it *Iterator) hasLIDsInRange() bool { + if it.loader.headers[it.blockIndex].MinLID > it.maxLID { + return false + } + if it.loader.headers[it.blockIndex].MaxLID < it.minLID { + return false + } + + return true +} + +// narrowLIDsRange cuts LIDs between from and to. Returns new lids +func (it *Iterator) narrowLIDsRange(lids []uint32) []uint32 { + if len(lids) == 0 { + return lids + } + + first := lids[0] + last := lids[len(lids)-1] + + if it.minLID > first { + left := sort.Search(len(lids), func(i int) bool { return lids[i] >= it.minLID }) + lids = lids[left:] + } + + if it.maxLID <= last { + right := sort.Search(len(lids), func(i int) bool { return lids[i] > it.maxLID }) + lids = lids[:right] + } + + return lids +} diff --git a/docsfilter/iterator_asc.go b/docsfilter/iterator_asc.go new file mode 100644 index 00000000..da930c76 --- /dev/null +++ b/docsfilter/iterator_asc.go @@ -0,0 +1,62 @@ +package docsfilter + +import ( + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" +) + +type IteratorAsc Iterator + +func (it *IteratorAsc) String() string { + return "TOMBSTONES_ITERATOR_ASC" +} + +func (it *IteratorAsc) Next() (uint32, bool) { + if it.loader.headers == nil { + headers, err := it.loader.getHeaders() + if err != nil { + logger.Panic("can't load tombstones headers", zap.Error(err)) + } + it.loader.headers = headers + it.blockIndex = len(it.loader.headers) - 1 + } + + for len(it.lids) == 0 { + if !it.tryNextBlock { + if err := it.loader.release(); err != nil { + logger.Panic("error closing loader", zap.Error(err)) + } + return 0, false + } + + it.loadNextLIDsBlock() + it.lids = (*Iterator)(it).narrowLIDsRange(it.lids) + } + + i := len(it.lids) - 1 + lid := it.lids[i] + it.lids = it.lids[:i] + return lid, true +} + +func (it *IteratorAsc) loadNextLIDsBlock() { + hasLIDsInRange := (*Iterator)(it).hasLIDsInRange() + if !hasLIDsInRange { + it.needTryNextBlock() + return + } + + block, err := it.loader.loadBlock(it.blockIndex) + if err != nil { + logger.Panic("error loading LIDs block", zap.Error(err)) + } + + it.lids = block + it.needTryNextBlock() +} + +func (it *IteratorAsc) needTryNextBlock() { + it.tryNextBlock = it.blockIndex > 0 + it.blockIndex-- +} diff --git a/docsfilter/iterator_asc_test.go b/docsfilter/iterator_asc_test.go new file mode 100644 index 00000000..c48c5402 --- /dev/null +++ b/docsfilter/iterator_asc_test.go @@ -0,0 +1,72 @@ +package docsfilter + +import ( + "math" + "os" + "path/filepath" + "slices" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/seq" +) + +func TestIteratorAsc(t *testing.T) { + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + + reversed := make([]uint32, len(multipleBlocksLIDs)) + copy(reversed, lidsToUint32s(multipleBlocksLIDs)) + slices.Reverse(reversed) + + type testCase struct { + title string + minLID, maxLID uint32 + expected []uint32 + } + + tests := []testCase{ + { + title: "ok_without_borders", + minLID: 0, + maxLID: math.MaxUint32, + expected: reversed, + }, + { + title: "ok_with_borders", + minLID: maxLIDsBlockLen + 10, + maxLID: uint32(len(multipleBlocksLIDs) - (maxLIDsBlockLen + 10)), + expected: reversed[maxLIDsBlockLen+9 : len(multipleBlocksLIDs)-(maxLIDsBlockLen+10)], + }, + { + title: "ok_out_of_borders", + minLID: uint32(len(multipleBlocksLIDs) + 100), + maxLID: uint32(len(multipleBlocksLIDs) + 200), + expected: []uint32{}, + }, + } + + for _, tc := range tests { + t.Run(tc.title, func(t *testing.T) { + rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.filter") + err := os.WriteFile(filePath, rawDocsFilter, 0o644) + require.NoError(t, err) + + loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) + require.NoError(t, err) + + iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID)) + resLIDs := make([]uint32, 0, len(tc.expected)) + for lid, has := iterator.Next(); has; lid, has = iterator.Next() { + resLIDs = append(resLIDs, lid) + } + require.Equal(t, tc.expected, resLIDs) + }) + } +} diff --git a/docsfilter/iterator_desc.go b/docsfilter/iterator_desc.go new file mode 100644 index 00000000..6c1f19e8 --- /dev/null +++ b/docsfilter/iterator_desc.go @@ -0,0 +1,60 @@ +package docsfilter + +import ( + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" +) + +type IteratorDesc Iterator + +func (it *IteratorDesc) String() string { + return "TOMBSTONES_ITERATOR_DESC" +} + +func (it *IteratorDesc) Next() (uint32, bool) { + if it.loader.headers == nil { + headers, err := it.loader.getHeaders() + if err != nil { + logger.Panic("can't load tombstones headers", zap.Error(err)) + } + it.loader.headers = headers + } + + for len(it.lids) == 0 { + if !it.tryNextBlock { + if err := it.loader.release(); err != nil { + logger.Panic("error closing loader", zap.Error(err)) + } + return 0, false + } + + it.loadNextLIDsBlock() + it.lids = (*Iterator)(it).narrowLIDsRange(it.lids) + } + + lid := it.lids[0] + it.lids = it.lids[1:] + return lid, true +} + +func (it *IteratorDesc) loadNextLIDsBlock() { + hasLIDsInRange := (*Iterator)(it).hasLIDsInRange() + if !hasLIDsInRange { + it.needTryNextBlock() + return + } + + block, err := it.loader.loadBlock(it.blockIndex) + if err != nil { + logger.Panic("error loading LIDs block", zap.Error(err)) + } + + it.lids = block + it.needTryNextBlock() +} + +func (it *IteratorDesc) needTryNextBlock() { + it.tryNextBlock = it.blockIndex < len(it.loader.headers)-1 + it.blockIndex++ +} diff --git a/docsfilter/iterator_desc_test.go b/docsfilter/iterator_desc_test.go new file mode 100644 index 00000000..c722c606 --- /dev/null +++ b/docsfilter/iterator_desc_test.go @@ -0,0 +1,67 @@ +package docsfilter + +import ( + "math" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/seq" +) + +func TestIteratorDesc(t *testing.T) { + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + + type testCase struct { + title string + minLID, maxLID uint32 + expected []uint32 + } + + tests := []testCase{ + { + title: "ok_without_borders", + minLID: 0, + maxLID: math.MaxUint32, + expected: lidsToUint32s(multipleBlocksLIDs), + }, + { + title: "ok_with_borders", + minLID: maxLIDsBlockLen + 10, + maxLID: uint32(len(multipleBlocksLIDs) - (maxLIDsBlockLen + 10)), + expected: lidsToUint32s(multipleBlocksLIDs[maxLIDsBlockLen+10 : len(multipleBlocksLIDs)-(maxLIDsBlockLen+9)]), + }, + { + title: "ok_out_of_borders", + minLID: uint32(len(multipleBlocksLIDs) + 100), + maxLID: uint32(len(multipleBlocksLIDs) + 200), + expected: []uint32{}, + }, + } + + for _, tc := range tests { + t.Run(tc.title, func(t *testing.T) { + rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.filter") + err := os.WriteFile(filePath, rawDocsFilter, 0o644) + require.NoError(t, err) + + loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) + require.NoError(t, err) + + iterator := (*IteratorDesc)(NewIterator(loader, tc.minLID, tc.maxLID)) + resLIDs := make([]uint32, 0, len(tc.expected)) + for lid, has := iterator.Next(); has; lid, has = iterator.Next() { + resLIDs = append(resLIDs, lid) + } + require.Equal(t, tc.expected, resLIDs) + }) + } +} diff --git a/docsfilter/loader.go b/docsfilter/loader.go new file mode 100644 index 00000000..ac963c01 --- /dev/null +++ b/docsfilter/loader.go @@ -0,0 +1,160 @@ +package docsfilter + +import ( + "encoding/binary" + "fmt" + "hash/fnv" + "io" + "os" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/logger" +) + +type loader struct { + filePath string + headers []lidsBlockHeader + file *os.File + headersCache *cache.Cache[[]lidsBlockHeader] + cashKey uint32 +} + +func newLoader(filePath string, headersCache *cache.Cache[[]lidsBlockHeader]) (*loader, error) { + hash := fnv.New32a() + hash.Write([]byte(filterNameFromTombstonesPath(filePath) + fracNameFromFilePath(filePath))) + + return &loader{ + filePath: filePath, + headersCache: headersCache, + cashKey: hash.Sum32(), + }, nil +} + +func (l *loader) getFile() (*os.File, error) { + if l.file == nil { + f, err := os.Open(l.filePath) + if err != nil { + return nil, err + } + l.file = f + } + return l.file, nil +} + +func (l *loader) getHeaders() ([]lidsBlockHeader, error) { + return l.headersCache.GetWithError(l.cashKey, func() ([]lidsBlockHeader, int, error) { + headers, err := l.loadHeaders() + if err != nil { + return headers, 0, err + } + size := len(headers) * int(lidsBlockHeaderSizeBytes) + return headers, size, nil + }) +} + +func (l *loader) loadHeaders() ([]lidsBlockHeader, error) { + file, err := l.getFile() + if err != nil { + return nil, err + } + + numBuf := make([]byte, 1+4) // block version 1 byte + number of blocks 4 bytes + n, err := file.ReadAt(numBuf, 0) + if err != nil { + return nil, fmt.Errorf("can't read headers from disk: %s", err.Error()) + } + if n == 0 { + return nil, fmt.Errorf("can't read headers from disk: n=0") + } + + version := docsFilterBinVersion(numBuf[0]) + if _, ok := availableVersions[version]; !ok { + return nil, fmt.Errorf("invalid LIDs binary version: %d", version) + } + + headersPos := n + numberOfBlocks := binary.BigEndian.Uint32(numBuf[1:]) + headersBuf := make([]byte, numberOfBlocks*uint32(lidsBlockHeaderSizeBytes)) + + n, err = file.ReadAt(headersBuf, int64(headersPos)) + if err != nil && err != io.EOF { + return nil, fmt.Errorf("can't read headers, %s", err.Error()) + } + if n != len(headersBuf) { + return nil, fmt.Errorf("can't read headers, read=%d, requested=%d", n, len(headersBuf)) + } + if len(headersBuf)%int(lidsBlockHeaderSizeBytes) != 0 { + return nil, fmt.Errorf("wrong headers format") + } + + headers := make([]lidsBlockHeader, 0, numberOfBlocks) + for range numberOfBlocks { + header := lidsBlockHeader{} + headersBuf, err = header.unmarshal(headersBuf) + if err != nil { + return nil, fmt.Errorf("can't unmarshal lids header: %s", err) + } + headers = append(headers, header) + } + + if len(headersBuf) > 0 { + return nil, fmt.Errorf("unexpected tail when unmarshaling LIDs headers") + } + + return headers, nil +} + +func (l *loader) loadBlock(index int) ([]uint32, error) { + if l.headers == nil { + headers, err := l.getHeaders() + if err != nil { + return nil, err + } + l.headers = headers + } + + if len(l.headers) < index+1 { + return nil, fmt.Errorf("can't load block: headers len=%d, index=%d", len(l.headers), index) + } + + file, err := l.getFile() + if err != nil { + return nil, err + } + + header := l.headers[index] + + blockBuf := make([]byte, header.Size) + n, err := file.ReadAt(blockBuf, int64(header.Offset)) + if err != nil { + return nil, err + } + if n != len(blockBuf) { + return nil, fmt.Errorf("can't read lids block, read=%d, requested=%d", n, len(blockBuf)) + } + + lids := make([]uint32, 0, header.Length) + lids, blockBuf, err = unmarshalLIDsBlock(lids, blockBuf, header) + if err != nil { + return nil, err + } + + if len(blockBuf) > 0 { + return nil, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return lids, nil +} + +func (l *loader) release() error { + if l.file != nil { + if err := l.file.Close(); err != nil { + logger.Error("can't close tombstones file", zap.Error(err)) + return err + } + l.file = nil + } + return nil +} diff --git a/docsfilter/loader_test.go b/docsfilter/loader_test.go new file mode 100644 index 00000000..4ef8687a --- /dev/null +++ b/docsfilter/loader_test.go @@ -0,0 +1,39 @@ +package docsfilter + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/seq" +) + +func TestLoader(t *testing.T) { + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + + rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.filter") + err := os.WriteFile(filePath, rawDocsFilter, 0o644) + require.NoError(t, err) + + loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) + require.NoError(t, err) + + resLIDs := make([]uint32, 0, len(multipleBlocksLIDs)) + const numberOfBlocks = 4 + for i := range numberOfBlocks { + block, err := loader.loadBlock(i) + require.NoError(t, err) + resLIDs = append(resLIDs, block...) + } + require.Equal(t, lidsToUint32s(multipleBlocksLIDs), resLIDs) + + require.NoError(t, loader.release()) +} diff --git a/docsfilter/merged_iterator.go b/docsfilter/merged_iterator.go new file mode 100644 index 00000000..742e80c6 --- /dev/null +++ b/docsfilter/merged_iterator.go @@ -0,0 +1,108 @@ +package docsfilter + +import "github.com/ozontech/seq-db/node" + +func NewNMergedIterators(iterators []node.Node, reverse bool) node.Node { + if len(iterators) == 0 { + return &EmptyIterator{} + } + + if len(iterators) == 1 { + return iterators[0] + } + + merged := NewMergedIterator(iterators[0], iterators[1], reverse) + for _, s := range iterators[2:] { + merged = NewMergedIterator(merged, s, reverse) + } + return merged +} + +type MergedIterator struct { + a, b node.Node + curA, curB uint32 + init bool + lessFn func(a, b uint32) bool +} + +func NewMergedIterator( + a, b node.Node, + reverse bool, +) node.Node { + lessFn := func(a, b uint32) bool { return a < b } + if reverse { + lessFn = func(a, b uint32) bool { return a > b } + } + + return &MergedIterator{ + a: a, + b: b, + init: false, + lessFn: lessFn, + } +} + +func (it *MergedIterator) String() string { + return "MERGED_TOMBSTONES_ITERATOR" +} + +func (it *MergedIterator) Next() (uint32, bool) { + if !it.init { + it.readA() + it.readB() + it.init = true + } + + if it.a == nil && it.b == nil { + return 0, false + } + if it.a == nil { + return it.readB(), true + } + if it.b == nil { + return it.readA(), true + } + + if it.curA == it.curB { + it.readA() // skip duplicate + if it.a == nil { + return it.readB(), true + } + } + if it.lessFn(it.curB, it.curA) { + return it.readB(), true + } + return it.readA(), true +} + +func (it *MergedIterator) readA() uint32 { + var has bool + current := it.curA + + if it.curA, has = it.a.Next(); !has { + it.a = nil // stop reading a + } + + return current +} + +func (it *MergedIterator) readB() uint32 { + var has bool + current := it.curB + + if it.curB, has = it.b.Next(); !has { + it.b = nil // stop reading b + } + + return current +} + +type EmptyIterator struct{} + +func (it *EmptyIterator) String() string { + return "EMPTY_TOMBSTONES_ITERATOR" +} + +func (it *EmptyIterator) Next() (uint32, bool) { + return 0, false +} diff --git a/docsfilter/merged_iterator_test.go b/docsfilter/merged_iterator_test.go new file mode 100644 index 00000000..1a5b23c3 --- /dev/null +++ b/docsfilter/merged_iterator_test.go @@ -0,0 +1,67 @@ +package docsfilter + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/node" +) + +func TestMergedIterator(t *testing.T) { + iterators := []node.Node{ + &testIterator{lids: []uint32{1, 2, 5, 22, 45}}, + &testIterator{lids: []uint32{2, 3, 9, 15, 33, 45}}, + &testIterator{lids: []uint32{1, 7, 8, 45}}, + } + + mergedIterator := NewNMergedIterators(iterators, false) + resLIDs := make([]uint32, 0) + for { + lid, has := mergedIterator.Next() + if !has { + break + } + resLIDs = append(resLIDs, lid) + + } + require.Equal(t, []uint32{1, 2, 3, 5, 7, 8, 9, 15, 22, 33, 45}, resLIDs) +} + +func TestMergedIteratorReverse(t *testing.T) { + iterators := []node.Node{ + &testIterator{lids: []uint32{45, 22, 5, 2, 1}}, + &testIterator{lids: []uint32{45, 33, 15, 9, 3, 2}}, + &testIterator{lids: []uint32{45, 8, 7, 1}}, + } + + mergedIterator := NewNMergedIterators(iterators, true) + resLIDs := make([]uint32, 0) + for { + lid, has := mergedIterator.Next() + if !has { + break + } + resLIDs = append(resLIDs, lid) + + } + require.Equal(t, []uint32{45, 33, 22, 15, 9, 8, 7, 5, 3, 2, 1}, resLIDs) +} + +type testIterator struct { + lids []uint32 +} + +func (it *testIterator) String() string { + return "TEST_TOMBSTONES_ITERATOR" +} + +func (it *testIterator) Next() (uint32, bool) { + if len(it.lids) == 0 { + return 0, false + } + + lid := it.lids[0] + it.lids = it.lids[1:] + return lid, true +}