Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions docsfilter/docs_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (

"go.uber.org/zap"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/util"
Expand All @@ -30,6 +32,8 @@ const (

const (
defaultMaintenanceInterval = 30 * time.Second
defaultCacheCleanInterval = 10 * time.Millisecond
defaultCacheGCDelay = 1 * time.Second
)

type MappingProvider interface {
Expand Down Expand Up @@ -57,6 +61,11 @@ type DocsFilter struct {
createDirOnce *sync.Once

maintenanceInterval time.Duration
cacheCleanInterval time.Duration
cacheGCDelay time.Duration

headersCache *cache.Cache[[]lidsBlockHeader]
headersCacheCleaner *cache.Cleaner
}

func New(
Expand All @@ -77,6 +86,8 @@ func New(
filtersMap[string(f.Hash())] = f
}

cacheCleaner := cache.NewCleaner(cfg.CacheSizeLimit, nil)

return &DocsFilter{
ctx: ctx,
config: cfg,
Expand All @@ -87,6 +98,10 @@ func New(
rateLimit: make(chan struct{}, workers),
createDirOnce: &sync.Once{},
maintenanceInterval: defaultMaintenanceInterval,
cacheCleanInterval: defaultCacheCleanInterval,
cacheGCDelay: defaultCacheGCDelay,
headersCache: cache.NewCache[[]lidsBlockHeader](cacheCleaner, nil),
headersCacheCleaner: cacheCleaner,
}
}

Expand All @@ -104,6 +119,7 @@ func (df *DocsFilter) Start(fracs fracmanager.List) {
}

go df.maintenance()
go df.cacheCleanLoop()

mapping := df.mp.GetMapping()

Expand All @@ -118,6 +134,32 @@ func (df *DocsFilter) Start(fracs fracmanager.List) {
}
}

// GetTombstonesIteratorByFrac returns a merged iterator over the tombstone
// LIDs of all filter files belonging to fracName, restricted to the
// inclusive [minLID, maxLID] range. When reverse is true the LIDs are
// yielded via IteratorAsc (descending LID order), otherwise via
// IteratorDesc. An unknown fraction yields an empty iterator, not an error.
func (df *DocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) {
	df.fracsMu.RLock()
	defer df.fracsMu.RUnlock()

	fracFiles, has := df.fracs[fracName]
	if !has {
		return &EmptyIterator{}, nil
	}

	// Track opened loaders separately from iterators so they can be released
	// on a mid-loop failure: a loader is otherwise only released by its
	// iterator upon exhaustion, and discarded iterators are never consumed.
	loaders := make([]*loader, 0, len(fracFiles))
	iterators := make([]node.Node, 0, len(fracFiles))
	for _, f := range fracFiles {
		ldr, err := newLoader(f, df.headersCache)
		if err != nil {
			logger.Error("can't open filtered lids file", zap.String("path", f), zap.Error(err))
			// Close everything opened so far before bailing out.
			for _, opened := range loaders {
				if relErr := opened.release(); relErr != nil {
					logger.Error("error releasing loader", zap.Error(relErr))
				}
			}
			return nil, err
		}
		loaders = append(loaders, ldr)

		if reverse {
			iterators = append(iterators, (*IteratorAsc)(NewIterator(ldr, minLID, maxLID)))
		} else {
			iterators = append(iterators, (*IteratorDesc)(NewIterator(ldr, minLID, maxLID)))
		}
	}

	return NewNMergedIterators(iterators, reverse), nil
}

// RefreshFrac replaces frac's tombstone files with newly found results. Used after active frac is sealed.
func (df *DocsFilter) RefreshFrac(fraction frac.Fraction) {
df.fracsMu.RLock()
Expand Down Expand Up @@ -353,6 +395,25 @@ func (df *DocsFilter) maintenance() {
}
}

// cacheCleanLoop periodically rotates and cleans the headers cache. Every
// cacheGCDelay worth of passes it additionally performs a deeper pass that
// drops empty generations and releases unused buckets. The loop exits when
// df.ctx is cancelled, so the goroutine does not outlive the DocsFilter.
func (df *DocsFilter) cacheCleanLoop() {
	runs := 0
	// Number of regular clean passes between deep GC passes. If the configured
	// delay is shorter than the clean interval this is 0 and the deep pass
	// simply runs on every iteration.
	gcRunsCount := int(df.cacheGCDelay / df.cacheCleanInterval)

	ticker := time.NewTicker(df.cacheCleanInterval)
	defer ticker.Stop()

	for {
		runs++
		df.headersCacheCleaner.Cleanup(&cache.CleanStat{})
		df.headersCacheCleaner.Rotate()

		if runs >= gcRunsCount {
			runs = 0
			df.headersCacheCleaner.CleanEmptyGenerations()
			df.headersCacheCleaner.ReleaseBuckets()
		}

		select {
		case <-df.ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

func (df *DocsFilter) checkDiskUsage() {
du := int64(0)

Expand Down
61 changes: 61 additions & 0 deletions docsfilter/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package docsfilter

import "sort"

// Iterator holds the shared state for walking the LID blocks of a single
// tombstone filter file. It is consumed through the IteratorAsc /
// IteratorDesc wrappers, which decide the traversal direction.
type Iterator struct {
	loader *loader // lazily supplies block headers and LID blocks from the filter file

	// Inclusive LID window; LIDs outside [minLID, maxLID] are skipped.
	minLID uint32
	maxLID uint32

	blockIndex   int  // index into loader.headers of the next block to load
	tryNextBlock bool // false once the direction-specific walk has run out of blocks

	lids []uint32 // remaining LIDs of the currently loaded block, narrowed to the window
}

// NewIterator builds an Iterator over the given loader, restricted to the
// inclusive [minLID, maxLID] window. Headers and blocks are loaded lazily by
// the first Next call of the direction-specific wrapper.
func NewIterator(loader *loader, minLID uint32, maxLID uint32) *Iterator {
	it := &Iterator{
		loader:       loader,
		minLID:       minLID,
		maxLID:       maxLID,
		tryNextBlock: true,
	}
	return it
}

// hasLIDsInRange reports whether the current block's [MinLID, MaxLID] span
// overlaps the iterator's requested [minLID, maxLID] window.
func (it *Iterator) hasLIDsInRange() bool {
	header := it.loader.headers[it.blockIndex]
	return header.MinLID <= it.maxLID && header.MaxLID >= it.minLID
}

// narrowLIDsRange drops LIDs outside the iterator's inclusive
// [minLID, maxLID] window and returns the remaining sub-slice.
// NOTE(review): assumes lids is sorted ascending — confirm against the writer.
func (it *Iterator) narrowLIDsRange(lids []uint32) []uint32 {
	if len(lids) == 0 {
		return lids
	}

	// Capture the original edges before trimming: the left trim may empty the
	// slice, and the right-trim condition must be judged on the original last.
	first, last := lids[0], lids[len(lids)-1]

	if first < it.minLID {
		from := sort.Search(len(lids), func(i int) bool { return lids[i] >= it.minLID })
		lids = lids[from:]
	}

	if last >= it.maxLID {
		to := sort.Search(len(lids), func(i int) bool { return lids[i] > it.maxLID })
		lids = lids[:to]
	}

	return lids
}
62 changes: 62 additions & 0 deletions docsfilter/iterator_asc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package docsfilter

import (
"go.uber.org/zap"

"github.com/ozontech/seq-db/logger"
)

type IteratorAsc Iterator

// String identifies this iterator kind, e.g. in logs and traces.
func (it *IteratorAsc) String() string {
	const name = "TOMBSTONES_ITERATOR_ASC"
	return name
}

// Next returns the next tombstone LID within the requested window, or
// (0, false) once exhausted. On exhaustion the underlying loader is
// released, so a finished iterator must not be reused.
//
// Despite the "Asc" name, LIDs come out in descending order: blocks are
// visited from the last header backwards and each block is drained from its
// tail. NOTE(review): this relies on blocks/LIDs being stored sorted
// ascending — confirm against the writer side.
func (it *IteratorAsc) Next() (uint32, bool) {
	// Lazy first-call init: load headers and start the walk at the last block.
	if it.loader.headers == nil {
		headers, err := it.loader.getHeaders()
		if err != nil {
			logger.Panic("can't load tombstones headers", zap.Error(err))
		}
		it.loader.headers = headers
		it.blockIndex = len(it.loader.headers) - 1
	}

	// Keep loading blocks until one yields LIDs inside the window; blocks
	// entirely outside the window produce an empty it.lids and are skipped.
	for len(it.lids) == 0 {
		if !it.tryNextBlock {
			// No blocks left: release file resources and signal exhaustion.
			if err := it.loader.release(); err != nil {
				logger.Panic("error closing loader", zap.Error(err))
			}
			return 0, false
		}

		it.loadNextLIDsBlock()
		it.lids = (*Iterator)(it).narrowLIDsRange(it.lids)
	}

	// Pop from the tail of the block.
	i := len(it.lids) - 1
	lid := it.lids[i]
	it.lids = it.lids[:i]
	return lid, true
}

// loadNextLIDsBlock pulls the current block's LIDs into it.lids when that
// block overlaps the requested window, then steps the walk to the previous
// block. Blocks outside the window are skipped without touching disk.
func (it *IteratorAsc) loadNextLIDsBlock() {
	if (*Iterator)(it).hasLIDsInRange() {
		block, err := it.loader.loadBlock(it.blockIndex)
		if err != nil {
			logger.Panic("error loading LIDs block", zap.Error(err))
		}
		it.lids = block
	}
	it.needTryNextBlock()
}

// needTryNextBlock records whether an earlier block remains and steps the
// walk one block towards the beginning of the file.
func (it *IteratorAsc) needTryNextBlock() {
	hasPrev := it.blockIndex > 0
	it.blockIndex--
	it.tryNextBlock = hasPrev
}
72 changes: 72 additions & 0 deletions docsfilter/iterator_asc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package docsfilter

import (
"math"
"os"
"path/filepath"
"slices"
"testing"

"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/seq"
)

// TestIteratorAsc checks that IteratorAsc yields the stored LIDs in reverse
// (descending) order, honoring the [minLID, maxLID] window, across a filter
// file that spans multiple LID blocks.
func TestIteratorAsc(t *testing.T) {
	// Ascending LIDs 0..N-1 spanning three full blocks plus a partial tail.
	multipleBlocksSize := maxLIDsBlockLen*3 + 15
	multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize)
	for i := range multipleBlocksSize {
		multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i))
	}

	// Expected output of the iterator over the full range: the input reversed,
	// since IteratorAsc emits LIDs in descending order.
	reversed := make([]uint32, len(multipleBlocksLIDs))
	copy(reversed, lidsToUint32s(multipleBlocksLIDs))
	slices.Reverse(reversed)

	type testCase struct {
		title          string
		minLID, maxLID uint32
		expected       []uint32
	}

	tests := []testCase{
		{
			title:    "ok_without_borders",
			minLID:   0,
			maxLID:   math.MaxUint32,
			expected: reversed,
		},
		{
			// Window [B+10, len-(B+10)] where B = maxLIDsBlockLen. In the
			// reversed slice, value v sits at index len-1-v, so maxLID maps to
			// index B+9 (start) and minLID to index len-B-11 (last kept), i.e.
			// slice end len-(B+10).
			title:    "ok_with_borders",
			minLID:   maxLIDsBlockLen + 10,
			maxLID:   uint32(len(multipleBlocksLIDs) - (maxLIDsBlockLen + 10)),
			expected: reversed[maxLIDsBlockLen+9 : len(multipleBlocksLIDs)-(maxLIDsBlockLen+10)],
		},
		{
			// Window entirely above the stored LIDs: nothing is yielded.
			title:    "ok_out_of_borders",
			minLID:   uint32(len(multipleBlocksLIDs) + 100),
			maxLID:   uint32(len(multipleBlocksLIDs) + 200),
			expected: []uint32{},
		},
	}

	for _, tc := range tests {
		t.Run(tc.title, func(t *testing.T) {
			// Serialize the LIDs into an on-disk filter file for the loader.
			rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs})
			filePath := filepath.Join(t.TempDir(), "some.filter")
			err := os.WriteFile(filePath, rawDocsFilter, 0o644)
			require.NoError(t, err)

			loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))
			require.NoError(t, err)

			// Drain the iterator and compare against the expected window.
			iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID))
			resLIDs := make([]uint32, 0, len(tc.expected))
			for lid, has := iterator.Next(); has; lid, has = iterator.Next() {
				resLIDs = append(resLIDs, lid)
			}
			require.Equal(t, tc.expected, resLIDs)
		})
	}
}
60 changes: 60 additions & 0 deletions docsfilter/iterator_desc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package docsfilter

import (
"go.uber.org/zap"

"github.com/ozontech/seq-db/logger"
)

type IteratorDesc Iterator

// String identifies this iterator kind, e.g. in logs and traces.
func (it *IteratorDesc) String() string {
	const name = "TOMBSTONES_ITERATOR_DESC"
	return name
}

// Next returns the next tombstone LID within the requested window, or
// (0, false) once exhausted. On exhaustion the underlying loader is
// released, so a finished iterator must not be reused.
//
// Blocks are visited from the first header forward and each block is drained
// from its head, so LIDs come out in ascending order (given ascending
// on-disk order). NOTE(review): see the naming note on IteratorAsc.
func (it *IteratorDesc) Next() (uint32, bool) {
	// Lazy first-call init: load headers. blockIndex's zero value (0) is
	// already the correct starting block for a forward walk, so unlike
	// IteratorAsc no explicit index initialization is needed here.
	if it.loader.headers == nil {
		headers, err := it.loader.getHeaders()
		if err != nil {
			logger.Panic("can't load tombstones headers", zap.Error(err))
		}
		it.loader.headers = headers
	}

	// Keep loading blocks until one yields LIDs inside the window; blocks
	// entirely outside the window produce an empty it.lids and are skipped.
	for len(it.lids) == 0 {
		if !it.tryNextBlock {
			// No blocks left: release file resources and signal exhaustion.
			if err := it.loader.release(); err != nil {
				logger.Panic("error closing loader", zap.Error(err))
			}
			return 0, false
		}

		it.loadNextLIDsBlock()
		it.lids = (*Iterator)(it).narrowLIDsRange(it.lids)
	}

	// Pop from the head of the block.
	lid := it.lids[0]
	it.lids = it.lids[1:]
	return lid, true
}

// loadNextLIDsBlock pulls the current block's LIDs into it.lids when that
// block overlaps the requested window, then steps the walk to the following
// block. Blocks outside the window are skipped without touching disk.
func (it *IteratorDesc) loadNextLIDsBlock() {
	if (*Iterator)(it).hasLIDsInRange() {
		block, err := it.loader.loadBlock(it.blockIndex)
		if err != nil {
			logger.Panic("error loading LIDs block", zap.Error(err))
		}
		it.lids = block
	}
	it.needTryNextBlock()
}

// needTryNextBlock records whether a later block remains and steps the walk
// one block towards the end of the file.
func (it *IteratorDesc) needTryNextBlock() {
	hasNext := it.blockIndex < len(it.loader.headers)-1
	it.blockIndex++
	it.tryNextBlock = hasNext
}
Loading