Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ozontech/seq-db/buildinfo"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/docsfilter"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/fracmanager"
Expand Down Expand Up @@ -312,10 +313,15 @@ func startStore(
From: cfg.Filtering.From,
},
},
Filters: docsfilter.Config{
DataDir: cfg.DocsFilter.DataDir,
Workers: cfg.DocsFilter.Concurrency,
CacheSizeLimit: uint64(cfg.DocsFilter.CacheSize),
},
}

s3cli := initS3Client(cfg)
store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp)
store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp, docFilterParamsFromCfg(cfg.DocsFilter.Filters))
if err != nil {
logger.Fatal("initializing store", zap.Error(err))
}
Expand Down Expand Up @@ -358,3 +364,15 @@ func initS3Client(cfg config.Config) *s3.Client {
// enableIndexingForAllFields reports whether the given mapping path is the
// special value "auto", which requests indexing of every document field.
func enableIndexingForAllFields(mappingPath string) bool {
	switch mappingPath {
	case "auto":
		return true
	default:
		return false
	}
}

// docFilterParamsFromCfg converts document-filter entries from the
// application config into docsfilter query parameters, translating the
// From/To time bounds into Unix nanoseconds.
func docFilterParamsFromCfg(in []config.Filter) []docsfilter.Params {
	params := make([]docsfilter.Params, len(in))
	for i, filter := range in {
		params[i] = docsfilter.Params{
			Query: filter.Query,
			From:  filter.From.UnixNano(),
			To:    filter.To.UnixNano(),
		}
	}
	return params
}
7 changes: 7 additions & 0 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Active struct {

writer *ActiveWriter
indexer *ActiveIndexer

docsFilter DocsFilter
}

const (
Expand All @@ -78,6 +80,7 @@ func NewActive(
docsCache *cache.Cache[[]byte],
sortCache *cache.Cache[[]byte],
cfg *Config,
docsFilter DocsFilter,
) *Active {
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync)
metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync)
Expand Down Expand Up @@ -105,6 +108,8 @@ func NewActive(
BaseFileName: baseFileName,
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
Config: cfg,

docsFilter: docsFilter,
}

// use of 0 as keys in maps is prohibited – it's system key, so add first element
Expand Down Expand Up @@ -315,6 +320,8 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
docsPositions: f.DocsPositions,
idsToLids: f.IDsToLIDs,
docsReader: &f.docsReader,

docsFilter: f.docsFilter,
}
}

Expand Down
78 changes: 76 additions & 2 deletions frac/active_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package frac

import (
"context"
"math"
"slices"

"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
Expand Down Expand Up @@ -29,6 +31,8 @@ type activeDataProvider struct {
docsReader *storage.DocsReader

idsIndex *activeIDsIndex

docsFilter DocsFilter
}

func (dp *activeDataProvider) release() {
Expand Down Expand Up @@ -77,7 +81,10 @@ func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
indexes := []activeFetchIndex{{
blocksOffsets: dp.blocksOffsets,
docsPositions: dp.docsPositions,
idsToLids: dp.idsToLids,
docsReader: dp.docsReader,
docsFilter: dp.docsFilter,
fracName: dp.info.Name(),
}}

for _, fi := range indexes {
Expand Down Expand Up @@ -117,6 +124,8 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e
indexes := []activeSearchIndex{{
activeIDsIndex: dp.getIDsIndex(),
activeTokenIndex: dp.getTokenIndex(),
docsFilter: dp.docsFilter,
fracName: dp.info.Name(),
}}
m.Stop()

Expand Down Expand Up @@ -178,6 +187,35 @@ func (p *activeIDsIndex) LessOrEqual(lid seq.LID, id seq.ID) bool {
// activeSearchIndex bundles the ID index and token index of an active
// fraction together with the tombstone source used to exclude
// filtered-out documents from search results (see GetTombstones).
type activeSearchIndex struct {
	*activeIDsIndex
	*activeTokenIndex
	docsFilter DocsFilter // supplies tombstone LIDs for this fraction
	fracName   string     // fraction name, used as the docsFilter lookup key
}

// GetTombstones returns a static node over the (inversed) LIDs of documents
// that were filtered out of this active fraction.
//
// The incoming minLID/maxLID are deliberately ignored: an active fraction
// does not meet those bounds, so the full LID range is always requested.
func (si *activeSearchIndex) GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error) {
	iterator, err := si.docsFilter.GetTombstonesIteratorByFrac(si.fracName, uint32(0), uint32(math.MaxUint32), reverse)
	if err != nil {
		return nil, err
	}

	// Drain the iterator, mapping each stored LID back through the inverser.
	inversed := make([]uint32, 0)
	for lid, has := iterator.Next(); has; lid, has = iterator.Next() {
		if mapped, ok := si.activeIDsIndex.inverser.Inverse(lid); ok {
			inversed = append(inversed, uint32(mapped))
		}
	}

	// Inversed values may be out of order after a replay of the active
	// fraction, so sort before building the static node.
	slices.Sort(inversed)

	return node.NewStatic(inversed, reverse), nil
}

type activeTokenIndex struct {
Expand Down Expand Up @@ -223,19 +261,55 @@ func inverseLIDs(unmapped []uint32, inv *inverser, minLID, maxLID uint32) []uint
// activeFetchIndex provides the document-fetch view of an active fraction:
// block offsets, ID-to-position resolution, raw document reads, and the
// tombstone source used to mask filtered-out documents.
type activeFetchIndex struct {
	blocksOffsets []uint64            // offset of each docs block, indexed by block number
	docsPositions *DocsPositions      // ID -> document position lookup
	idsToLids     *ActiveLIDs         // ID -> LID lookup, used to match IDs against tombstoned LIDs
	docsReader    *storage.DocsReader // reader for the underlying docs storage
	docsFilter    DocsFilter          // supplies tombstone LIDs for this fraction
	fracName      string              // fraction name, used as the docsFilter lookup key
}

// GetBlocksOffsets returns the stored offset of block number num.
func (di *activeFetchIndex) GetBlocksOffsets(num uint32) uint64 {
	offset := di.blocksOffsets[num]
	return offset
}

func (di *activeFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos {
func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
allLids := make([]uint32, len(ids))
for i, id := range ids {
if lid, ok := di.idsToLids.Get(id); ok {
allLids[i] = uint32(lid)
}
}

minLID, maxLID := uint32(0), uint32(math.MaxUint32)
tombstonesIterator, err := di.docsFilter.GetTombstonesIteratorByFrac(di.fracName, minLID, maxLID, false)
if err != nil {
return nil, err
}

filteredLIDs := make(map[uint32]struct{})
for {
lid, has := tombstonesIterator.Next()
if !has {
break
}
filteredLIDs[lid] = struct{}{}
}

docsPos := make([]seq.DocPos, len(ids))
for i, id := range ids {
docsPos[i] = di.docsPositions.GetSync(id)
}
return docsPos

if len(filteredLIDs) == 0 {
return docsPos, nil
}

for i, lid := range allLids {
if _, ok := filteredLIDs[lid]; ok {
docsPos[i] = seq.DocPosNotFound
}
}

return docsPos, nil
}

func (di *activeFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) {
Expand Down
1 change: 1 addition & 0 deletions frac/active_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func BenchmarkIndexer(b *testing.B) {
cache.NewCache[[]byte](nil, nil),
cache.NewCache[[]byte](nil, nil),
&Config{},
testDocsFilter{},
)

processor := getTestProcessor()
Expand Down
2 changes: 2 additions & 0 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestConcurrentAppendAndQuery(t *testing.T) {
cache.NewCache[[]byte](nil, nil),
cache.NewCache[[]byte](nil, nil),
&Config{},
testDocsFilter{},
)

mapping := seq.Mapping{
Expand Down Expand Up @@ -356,6 +357,7 @@ func seal(active *Active) (*Sealed, error) {
indexCache,
cache.NewCache[[]byte](nil, nil),
&Config{},
testDocsFilter{},
)
active.Release()
return sealed, nil
Expand Down
25 changes: 22 additions & 3 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,24 @@ import (
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/storage/s3"
"github.com/ozontech/seq-db/tokenizer"
)

// testDocsFilter is a no-op DocsFilter stub for tests: it reports no
// filtered documents and yields an empty tombstone iterator, so filtering
// never affects test fractions.
type testDocsFilter struct{}

// GetFilteredLIDsByFrac always reports that no LIDs are filtered.
func (testDocsFilter) GetFilteredLIDsByFrac(_ string) ([]uint32, error) {
	return []uint32{}, nil
}

// GetTombstonesIteratorByFrac returns an empty static node regardless of
// the requested fraction, LID range, or direction.
func (testDocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) {
	return node.NewStatic([]uint32{}, false), nil
}

// RemoveFrac is a no-op.
func (testDocsFilter) RemoveFrac(_ string) {}

type FractionTestSuite struct {
suite.Suite
tmpDir string
Expand Down Expand Up @@ -1842,6 +1853,7 @@ func (s *FractionTestSuite) newActive(bulks ...[]string) *Active {
cache.NewCache[[]byte](nil, nil),
cache.NewCache[[]byte](nil, nil),
s.config,
testDocsFilter{},
)

var wg sync.WaitGroup
Expand Down Expand Up @@ -1904,6 +1916,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed {
indexCache,
cache.NewCache[[]byte](nil, nil),
s.config,
testDocsFilter{},
)
active.Release()
return sealed
Expand Down Expand Up @@ -1980,7 +1993,9 @@ func (s *ActiveReplayedFractionTestSuite) Replay(frac *Active) Fraction {
storage.NewReadLimiter(1, nil),
cache.NewCache[[]byte](nil, nil),
cache.NewCache[[]byte](nil, nil),
&Config{})
&Config{},
testDocsFilter{},
)
err := replayedFrac.Replay(context.Background())
s.Require().NoError(err, "replay failed")
return replayedFrac
Expand Down Expand Up @@ -2091,7 +2106,9 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal
indexCache,
cache.NewCache[[]byte](nil, nil),
nil,
s.config)
s.config,
testDocsFilter{},
)
s.fraction = sealed
return sealed
}
Expand Down Expand Up @@ -2160,7 +2177,9 @@ func (s *RemoteFractionTestSuite) SetupTest() {
cache.NewCache[[]byte](nil, nil),
sealed.info,
s.config,
s3cli)
s3cli,
testDocsFilter{},
)
s.fraction = remoteFrac
}
}
Expand Down
5 changes: 5 additions & 0 deletions frac/processor/eval_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func evalLeaf(
return node.BuildORTree(lidsTids, order.IsReverse()), nil
}

// evalTombstones wraps the eval tree root together with the tombstones
// iterator in a NAnd node and accounts for the added node in stats.
// NOTE(review): NewNAnd presumably excludes LIDs present in
// tombstonesIterator from root — confirm against the node package.
func evalTombstones(root, tombstonesIterator node.Node, reverse bool, stats *searchStats) node.Node {
	stats.NodesTotal++
	wrapped := node.NewNAnd(tombstonesIterator, root, reverse)
	return wrapped
}

type Aggregator interface {
// Next iterates to count the next lid.
Next(lid uint32) error
Expand Down
7 changes: 5 additions & 2 deletions frac/processor/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (

// fetchIndex is the per-fraction access IndexFetch needs to resolve
// document positions and read raw documents.
//
// Note: the visible span contained both the pre-change and post-change
// signatures of GetDocPos (a stripped diff remnant); a Go interface cannot
// declare the same method twice, so only the current signature is kept.
type fetchIndex interface {
	GetBlocksOffsets(uint32) uint64
	// GetDocPos maps IDs to document positions; filtered-out documents
	// are reported as not found.
	GetDocPos([]seq.ID) ([]seq.DocPos, error)
	ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error)
}

func IndexFetch(ids []seq.ID, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error {
m := sw.Start("get_docs_pos")
docsPos := fetchIndex.GetDocPos(ids)
docsPos, err := fetchIndex.GetDocPos(ids)
if err != nil {
return err
}
blocks, offsets, index := seq.GroupDocsOffsets(docsPos)
m.Stop()

Expand Down
12 changes: 12 additions & 0 deletions frac/processor/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type tokenIndex interface {
type searchIndex interface {
tokenIndex
idsIndex
GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error)
}

func IndexSearch(
Expand Down Expand Up @@ -93,6 +94,17 @@ func IndexSearch(
}
}

m = sw.Start("get_tombstones")
tombstones, err := index.GetTombstones(minLID, maxLID, params.Order.IsReverse())
m.Stop()
if err != nil {
return nil, err
}

m = sw.Start("eval_tombstones")
evalTree = evalTombstones(evalTree, tombstones, params.Order.IsReverse(), stats)
m.Stop()

m = sw.Start("iterate_eval_tree")
total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTree, aggSupplier, sw)
m.Stop()
Expand Down
Loading