From 9eadbb5f996b80ef33481c55ae993d88efb47eb0 Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Thu, 5 Mar 2026 17:39:28 +0500 Subject: [PATCH] feat: use DocsFilter --- cmd/seq-db/seq-db.go | 20 ++++++- frac/active.go | 7 +++ frac/active_index.go | 78 ++++++++++++++++++++++++++- frac/active_indexer_test.go | 1 + frac/fraction_concurrency_test.go | 2 + frac/fraction_test.go | 25 +++++++-- frac/processor/eval_tree.go | 5 ++ frac/processor/fetch.go | 7 ++- frac/processor/search.go | 12 +++++ frac/remote.go | 8 +++ frac/sealed.go | 12 +++++ frac/sealed_index.go | 58 +++++++++++++++++++- fracmanager/fracmanager.go | 4 +- fracmanager/fracmanager_test.go | 14 ++++- fracmanager/fraction_provider.go | 18 ++++++- fracmanager/fraction_provider_test.go | 4 +- storeapi/grpc_v1_test.go | 11 ++-- storeapi/store.go | 19 ++++++- tests/setup/env.go | 3 +- 19 files changed, 285 insertions(+), 23 deletions(-) diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index 3d7bd370..c740e09d 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -21,6 +21,7 @@ import ( "github.com/ozontech/seq-db/buildinfo" "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/fracmanager" @@ -312,10 +313,15 @@ func startStore( From: cfg.Filtering.From, }, }, + Filters: docsfilter.Config{ + DataDir: cfg.DocsFilter.DataDir, + Workers: cfg.DocsFilter.Concurrency, + CacheSizeLimit: uint64(cfg.DocsFilter.CacheSize), + }, } s3cli := initS3Client(cfg) - store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp) + store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp, docFilterParamsFromCfg(cfg.DocsFilter.Filters)) if err != nil { logger.Fatal("initializing store", zap.Error(err)) } @@ -358,3 +364,15 @@ func initS3Client(cfg config.Config) *s3.Client { func enableIndexingForAllFields(mappingPath string) bool { return mappingPath == "auto" } + +func docFilterParamsFromCfg(in []config.Filter) []docsfilter.Params { + out := make([]docsfilter.Params, 0, len(in)) + for _, f := range in { + out = append(out, docsfilter.Params{ + Query: f.Query, + From: f.From.UnixNano(), + To: f.To.UnixNano(), + }) + } + return out +} diff --git a/frac/active.go b/frac/active.go index c4e22196..a080c945 100644 --- a/frac/active.go +++ b/frac/active.go @@ -59,6 +59,8 @@ type Active struct { writer *ActiveWriter indexer *ActiveIndexer + + docsFilter DocsFilter } const ( @@ -78,6 +80,7 @@ func NewActive( docsCache *cache.Cache[[]byte], sortCache *cache.Cache[[]byte], cfg *Config, + docsFilter DocsFilter, ) *Active { docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync) metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync) @@ -105,6 +108,8 @@ func NewActive( BaseFileName: baseFileName, info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), Config: cfg, + + docsFilter: docsFilter, } // use of 0 as keys in maps is prohibited – it's system key, so add first element @@ -315,6 +320,8 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { docsPositions: f.DocsPositions, idsToLids: f.IDsToLIDs, docsReader: &f.docsReader, + + docsFilter: f.docsFilter, } } diff --git a/frac/active_index.go b/frac/active_index.go index 71831c26..a131c49b 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -2,6 +2,8 @@ package frac import ( "context" + "math" + "slices" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" @@ -29,6 +31,8 @@ type activeDataProvider struct { docsReader *storage.DocsReader idsIndex *activeIDsIndex + + docsFilter DocsFilter } func (dp *activeDataProvider) release() { @@ -77,7 +81,10 @@ func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) { indexes := []activeFetchIndex{{ blocksOffsets: dp.blocksOffsets, docsPositions: dp.docsPositions, + idsToLids: dp.idsToLids, docsReader: dp.docsReader, + docsFilter: dp.docsFilter, + fracName: dp.info.Name(), }} for _, fi := range indexes { @@ -117,6 +124,8 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e indexes := []activeSearchIndex{{ activeIDsIndex: dp.getIDsIndex(), activeTokenIndex: dp.getTokenIndex(), + docsFilter: dp.docsFilter, + fracName: dp.info.Name(), }} m.Stop() @@ -178,6 +187,35 @@ func (p *activeIDsIndex) LessOrEqual(lid seq.LID, id seq.ID) bool { type activeSearchIndex struct { *activeIDsIndex *activeTokenIndex + docsFilter DocsFilter + fracName string +} + +func (si *activeSearchIndex) GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error) { + // active fraction doesn't meet min and max lid + minLID, maxLID = uint32(0), uint32(math.MaxUint32) + + iterator, err := si.docsFilter.GetTombstonesIteratorByFrac(si.fracName, minLID, maxLID, reverse) + if err != nil { + return nil, err + } + + res := make([]uint32, 0) + for { + // traverse iterator to inverse and sort lids + lid, has := iterator.Next() + if !has { + break + } + if inversed, ok := si.activeIDsIndex.inverser.Inverse(lid); ok { + res = append(res, uint32(inversed)) + } + } + + // we need to sort inversed values since they may be out of order after replay of active fraction + slices.Sort(res) + + return node.NewStatic(res, reverse), nil } type activeTokenIndex struct { @@ -223,19 +261,55 @@ func inverseLIDs(unmapped []uint32, inv *inverser, minLID, maxLID uint32) []uint type activeFetchIndex struct { blocksOffsets []uint64 docsPositions *DocsPositions + idsToLids *ActiveLIDs docsReader *storage.DocsReader + docsFilter DocsFilter + fracName string } func (di *activeFetchIndex) GetBlocksOffsets(num uint32) uint64 { return di.blocksOffsets[num] } -func (di *activeFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos { +func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { + allLids := make([]uint32, len(ids)) + for i, id := range ids { + if lid, ok := di.idsToLids.Get(id); ok { + allLids[i] = uint32(lid) + } + } + + minLID, maxLID := uint32(0), uint32(math.MaxUint32) + tombstonesIterator, err := di.docsFilter.GetTombstonesIteratorByFrac(di.fracName, minLID, maxLID, false) + if err != nil { + return nil, err + } + + filteredLIDs := make(map[uint32]struct{}) + for { + lid, has := tombstonesIterator.Next() + if !has { + break + } + filteredLIDs[lid] = struct{}{} + } + docsPos := make([]seq.DocPos, len(ids)) for i, id := range ids { docsPos[i] = di.docsPositions.GetSync(id) } - return docsPos + + if len(filteredLIDs) == 0 { + return docsPos, nil + } + + for i, lid := range allLids { + if _, ok := filteredLIDs[lid]; ok { + docsPos[i] = seq.DocPosNotFound + } + } + + return docsPos, nil } func (di *activeFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) { diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index 17fb5230..9a986d26 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -90,6 +90,7 @@ func BenchmarkIndexer(b *testing.B) { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), &Config{}, + testDocsFilter{}, ) processor := getTestProcessor() diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 38f02634..82fdd48e 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -52,6 +52,7 @@ func TestConcurrentAppendAndQuery(t *testing.T) { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), &Config{}, + testDocsFilter{}, ) mapping := seq.Mapping{ @@ -356,6 +357,7 @@ func seal(active *Active) (*Sealed, error) { indexCache, cache.NewCache[[]byte](nil, nil), &Config{}, + testDocsFilter{}, ) active.Release() return sealed, nil diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8d7a782f..1cdb4c1f 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -27,6 +27,7 @@ import ( "github.com/ozontech/seq-db/frac/sealed/seqids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" @@ -34,6 +35,16 @@ import ( "github.com/ozontech/seq-db/tokenizer" ) +type testDocsFilter struct{} + +func (testDocsFilter) GetFilteredLIDsByFrac(_ string) ([]uint32, error) { + return []uint32{}, nil +} +func (testDocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) { + return node.NewStatic([]uint32{}, false), nil +} +func (testDocsFilter) RemoveFrac(_ string) {} + type FractionTestSuite struct { suite.Suite tmpDir string @@ -1842,6 +1853,7 @@ func (s *FractionTestSuite) newActive(bulks ...[]string) *Active { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), s.config, + testDocsFilter{}, ) var wg sync.WaitGroup @@ -1904,6 +1916,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { indexCache, cache.NewCache[[]byte](nil, nil), s.config, + testDocsFilter{}, ) active.Release() return sealed @@ -1980,7 +1993,9 @@ func (s *ActiveReplayedFractionTestSuite) Replay(frac *Active) Fraction { storage.NewReadLimiter(1, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}) + &Config{}, + testDocsFilter{}, + ) err := replayedFrac.Replay(context.Background()) s.Require().NoError(err, "replay failed") return replayedFrac @@ -2091,7 +2106,9 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal indexCache, cache.NewCache[[]byte](nil, nil), nil, - s.config) + s.config, + testDocsFilter{}, + ) s.fraction = sealed return sealed } @@ -2160,7 +2177,9 @@ func (s *RemoteFractionTestSuite) SetupTest() { cache.NewCache[[]byte](nil, nil), sealed.info, s.config, - s3cli) + s3cli, + testDocsFilter{}, + ) s.fraction = remoteFrac } } diff --git a/frac/processor/eval_tree.go b/frac/processor/eval_tree.go index 7ba058e9..82dcce38 100644 --- a/frac/processor/eval_tree.go +++ b/frac/processor/eval_tree.go @@ -79,6 +79,11 @@ func evalLeaf( return node.BuildORTree(lidsTids, order.IsReverse()), nil } +func evalTombstones(root, tombstonesIterator node.Node, reverse bool, stats *searchStats) node.Node { + stats.NodesTotal++ + return node.NewNAnd(tombstonesIterator, root, reverse) +} + type Aggregator interface { // Next iterates to count the next lid. Next(lid uint32) error diff --git a/frac/processor/fetch.go b/frac/processor/fetch.go index 41d46152..e2267780 100644 --- a/frac/processor/fetch.go +++ b/frac/processor/fetch.go @@ -7,13 +7,16 @@ import ( type fetchIndex interface { GetBlocksOffsets(uint32) uint64 - GetDocPos([]seq.ID) []seq.DocPos + GetDocPos([]seq.ID) ([]seq.DocPos, error) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) } func IndexFetch(ids []seq.ID, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error { m := sw.Start("get_docs_pos") - docsPos := fetchIndex.GetDocPos(ids) + docsPos, err := fetchIndex.GetDocPos(ids) + if err != nil { + return err + } blocks, offsets, index := seq.GroupDocsOffsets(docsPos) m.Stop() diff --git a/frac/processor/search.go b/frac/processor/search.go index e7d8885b..4e9ebbaf 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -38,6 +38,7 @@ type tokenIndex interface { type searchIndex interface { tokenIndex idsIndex + GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error) } func IndexSearch( @@ -93,6 +94,17 @@ func IndexSearch( } } + m = sw.Start("get_tombstones") + tombstones, err := index.GetTombstones(minLID, maxLID, params.Order.IsReverse()) + m.Stop() + if err != nil { + return nil, err + } + + m = sw.Start("eval_tombstones") + evalTree = evalTombstones(evalTree, tombstones, params.Order.IsReverse(), stats) + m.Stop() + m = sw.Start("iterate_eval_tree") total, ids, histogram, aggs, err := iterateEvalTree(ctx, params, index, evalTree, aggSupplier, sw) m.Stop() diff --git a/frac/remote.go b/frac/remote.go index e15aa73f..6f9cdb62 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -55,6 +55,8 @@ type Remote struct { s3cli *s3.Client readLimiter *storage.ReadLimiter + + docsFilter DocsFilter } func NewRemote( @@ -66,6 +68,7 @@ func NewRemote( info *common.Info, config *Config, s3cli *s3.Client, + docsFilter DocsFilter, ) *Remote { f := &Remote{ ctx: ctx, @@ -81,6 +84,8 @@ func NewRemote( Config: config, s3cli: s3cli, + + docsFilter: docsFilter, } // Fast path if fraction-info cache exists AND it has valid index size. @@ -170,6 +175,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e &f.blocksData.IDsTable, f.info.BinaryDataVer, ), + docsFilter: f.docsFilter, }, nil } @@ -201,6 +207,8 @@ func (f *Remote) Suicide() { zap.Error(err), ) } + + go f.docsFilter.RemoveFrac(f.info.Name()) } func (f *Remote) String() string { diff --git a/frac/sealed.go b/frac/sealed.go index ddae8ccf..8161e061 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -51,6 +51,8 @@ type Sealed struct { // shit for testing PartialSuicideMode PSD + + docsFilter DocsFilter } type PSD int // emulates hard shutdown on different stages of fraction deletion, used for tests @@ -68,6 +70,7 @@ func NewSealed( docsCache *cache.Cache[[]byte], info *common.Info, config *Config, + docsFilter DocsFilter, ) *Sealed { f := &Sealed{ loadMu: &sync.RWMutex{}, @@ -81,6 +84,8 @@ func NewSealed( Config: config, PartialSuicideMode: Off, + + docsFilter: docsFilter, } // fast path if fraction-info cache exists AND it has valid index size @@ -130,6 +135,7 @@ func NewSealedPreloaded( indexCache *IndexCache, docsCache *cache.Cache[[]byte], config *Config, + docsFilter DocsFilter, ) *Sealed { f := &Sealed{ blocksData: preloaded.BlocksData, @@ -144,6 +150,8 @@ func NewSealedPreloaded( info: preloaded.Info, BaseFileName: baseFile, Config: config, + + docsFilter: docsFilter, } // put the token table built during sealing into the cache of the sealed fraction @@ -292,6 +300,8 @@ func (f *Sealed) Suicide() { zap.Error(err), ) } + + go f.docsFilter.RemoveFrac(f.info.Name()) } func (f *Sealed) String() string { @@ -343,6 +353,8 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { &f.blocksData.IDsTable, f.info.BinaryDataVer, ), + + docsFilter: f.docsFilter, } } diff --git a/frac/sealed_index.go b/frac/sealed_index.go index 3899124a..53b8e7f7 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -22,6 +22,11 @@ import ( "github.com/ozontech/seq-db/util" ) +type DocsFilter interface { + GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) + RemoveFrac(fracName string) +} + type sealedDataProvider struct { ctx context.Context info *common.Info @@ -42,6 +47,8 @@ type sealedDataProvider struct { // fractionTypeLabel can be either 'sealed' or 'remote'. // This value is used in metrics to distinguish between operations over local and remote fractions. fractionTypeLabel string + + docsFilter DocsFilter } func (dp *sealedDataProvider) getIDsIndex() *sealedIDsIndex { @@ -54,9 +61,11 @@ func (dp *sealedDataProvider) getIDsIndex() *sealedIDsIndex { func (dp *sealedDataProvider) getFetchIndex() *sealedFetchIndex { return &sealedFetchIndex{ + fracName: dp.info.Name(), idsIndex: dp.getIDsIndex(), docsReader: dp.docsReader, blocksOffsets: dp.blocksOffsets, + docsFilter: dp.docsFilter, } } @@ -74,6 +83,7 @@ func (dp *sealedDataProvider) getSearchIndex() *sealedSearchIndex { return &sealedSearchIndex{ sealedIDsIndex: dp.getIDsIndex(), sealedTokenIndex: dp.getTokenIndex(), + docsFilter: dp.docsFilter, } } @@ -259,17 +269,56 @@ func (ti *sealedTokenIndex) GetLIDsFromTIDs(tids []uint32, stats lids.Counter, m } type sealedFetchIndex struct { + fracName string idsIndex *sealedIDsIndex docsReader *storage.DocsReader blocksOffsets []uint64 + docsFilter DocsFilter } func (fi *sealedFetchIndex) GetBlocksOffsets(num uint32) uint64 { return fi.blocksOffsets[num] } -func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos { - return fi.getDocPosByLIDs(fi.findLIDs(ids)) +func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { + allLids := fi.findLIDs(ids) + + minLID, maxLID := uint32(0), uint32(math.MaxUint32) + if len(allLids) > 0 { + // allLids can be not sorted + minVal, maxVal := allLids[0], allLids[0] + for i := 1; i < len(allLids); i++ { + minVal = min(minVal, allLids[i]) + maxVal = max(maxVal, allLids[i]) + } + minLID, maxLID = uint32(minVal), uint32(maxVal) + } + + tombstonesIterator, err := fi.docsFilter.GetTombstonesIteratorByFrac(fi.fracName, minLID, maxLID, false) + if err != nil { + return nil, err + } + + filteredLIDs := make(map[uint32]struct{}) + for { + lid, has := tombstonesIterator.Next() + if !has { + break + } + filteredLIDs[lid] = struct{}{} + } + + if len(filteredLIDs) == 0 { + return fi.getDocPosByLIDs(allLids), nil + } + + for i, lid := range allLids { + if _, ok := filteredLIDs[uint32(lid)]; ok { + allLids[i] = 0 + } + } + + return fi.getDocPosByLIDs(allLids), nil } func (fi *sealedFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) { @@ -324,4 +373,9 @@ func (fi *sealedFetchIndex) getDocPosByLIDs(localIDs []seq.LID) []seq.DocPos { type sealedSearchIndex struct { *sealedIDsIndex *sealedTokenIndex + docsFilter DocsFilter +} + +func (si *sealedSearchIndex) GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error) { + return si.docsFilter.GetTombstonesIteratorByFrac(si.fracName, minLID, maxLID, reverse) } diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 50d7e1f3..46d4da1e 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -35,13 +35,13 @@ var defaultStorageState = StorageState{ // - stats updating // // Returns the manager instance and a stop function to gracefully shutdown -func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, func(), error) { +func New(ctx context.Context, cfg *Config, s3cli *s3.Client, docsFilter DocsFilter) (*FracManager, func(), error) { FillConfigWithDefault(cfg) readLimiter := storage.NewReadLimiter(config.ReaderWorkers, storeBytesRead) idx, stopIdx := frac.NewActiveIndexer(config.IndexWorkers, config.IndexWorkers) cache := NewCacheMaintainer(cfg.CacheSize, cfg.SortCacheSize, newDefaultCacheMetrics()) - provider := newFractionProvider(cfg, s3cli, cache, readLimiter, idx) + provider := newFractionProvider(cfg, s3cli, cache, readLimiter, idx, docsFilter) infoCache := NewFracInfoCache(filepath.Join(cfg.DataDir, consts.FracCacheFileSuffix)) // Load existing fractions into registry diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 89437904..a2c2b8ca 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -8,9 +8,21 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/seq" ) +type testDocsFilter struct{} + +func (testDocsFilter) GetFilteredLIDsByFrac(_ string) ([]uint32, error) { + return nil, nil +} +func (testDocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) { + return node.NewStatic([]uint32{}, reverse), nil +} +func (testDocsFilter) RefreshFrac(_ frac.Fraction) {} +func (testDocsFilter) RemoveFrac(_ string) {} + func setupDataDir(t testing.TB, cfg *Config) *Config { if cfg == nil { cfg = &Config{ @@ -25,7 +37,7 @@ func setupDataDir(t testing.TB, cfg *Config) *Config { func setupFracManager(t testing.TB, cfg *Config) (*Config, *FracManager, func()) { cfg = setupDataDir(t, cfg) - fm, stop, err := New(t.Context(), cfg, nil) + fm, stop, err := New(t.Context(), cfg, nil, testDocsFilter{}) assert.NoError(t, err) return cfg, fm, stop } diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index e2915598..f970912f 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -13,12 +13,19 @@ import ( "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" ) const fileBasePattern = "seq-db-" +type DocsFilter interface { + GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) + RefreshFrac(frac frac.Fraction) + RemoveFrac(fracName string) +} + // fractionProvider is a factory for creating different types of fractions // Contains all necessary dependencies for creating and managing fractions type fractionProvider struct { @@ -28,11 +35,13 @@ type fractionProvider struct { activeIndexer *frac.ActiveIndexer // Indexer for active fractions readLimiter *storage.ReadLimiter // Read rate limiter ulidEntropy io.Reader // Entropy source for ULID generation + docsFilter DocsFilter } func newFractionProvider( cfg *Config, s3cli *s3.Client, cp *CacheMaintainer, readLimiter *storage.ReadLimiter, indexer *frac.ActiveIndexer, + docsFilter DocsFilter, ) *fractionProvider { return &fractionProvider{ s3cli: s3cli, @@ -41,6 +50,7 @@ func newFractionProvider( activeIndexer: indexer, readLimiter: readLimiter, ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0), + docsFilter: docsFilter, } } @@ -52,6 +62,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active { fp.cacheProvider.CreateDocBlockCache(), fp.cacheProvider.CreateSortDocsCache(), &fp.config.Fraction, + fp.docsFilter, ) } @@ -63,6 +74,7 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *fra fp.cacheProvider.CreateDocBlockCache(), cachedInfo, // Preloaded meta information &fp.config.Fraction, + fp.docsFilter, ) } @@ -74,6 +86,7 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *seale fp.cacheProvider.CreateIndexCache(), fp.cacheProvider.CreateDocBlockCache(), &fp.config.Fraction, + fp.docsFilter, ) } @@ -87,6 +100,7 @@ func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedIn cachedInfo, &fp.config.Fraction, fp.s3cli, + fp.docsFilter, ) } @@ -117,7 +131,9 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) { return nil, err } - return fp.NewSealedPreloaded(active.BaseFileName, preloaded), nil + sealedFrac := fp.NewSealedPreloaded(active.BaseFileName, preloaded) + fp.docsFilter.RefreshFrac(sealedFrac) + return sealedFrac, nil } // Offload uploads fraction to S3 storage and returns a remote fraction diff --git a/fracmanager/fraction_provider_test.go b/fracmanager/fraction_provider_test.go index f315b615..4bbc1951 100644 --- a/fracmanager/fraction_provider_test.go +++ b/fracmanager/fraction_provider_test.go @@ -38,7 +38,7 @@ func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func() s3cli, stopS3 := setupS3Client(t) idx, stopIdx := frac.NewActiveIndexer(1, 1) cache := NewCacheMaintainer(uint64(units.MB), uint64(units.MB), nil) - provider := newFractionProvider(cfg, s3cli, cache, rl, idx) + provider := newFractionProvider(cfg, s3cli, cache, rl, idx, testDocsFilter{}) return provider, func() { stopIdx() stopS3() @@ -46,7 +46,7 @@ func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func() } func TestFractionID(t *testing.T) { - fp := newFractionProvider(nil, nil, nil, nil, nil) + fp := newFractionProvider(nil, nil, nil, nil, nil, nil) ulid1 := fp.nextFractionID() ulid2 := fp.nextFractionID() assert.NotEqual(t, ulid1, ulid2, "ULIDs should be different") diff --git a/storeapi/grpc_v1_test.go b/storeapi/grpc_v1_test.go index ced60053..9b4a5453 100644 --- a/storeapi/grpc_v1_test.go +++ b/storeapi/grpc_v1_test.go @@ -12,6 +12,7 @@ import ( "github.com/ozontech/seq-db/asyncsearcher" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/mappingprovider" @@ -67,11 +68,16 @@ func getTestGrpc(t *testing.T) (*GrpcV1, func(), func()) { dataDir := common.GetTestTmpDir(t) common.RecreateDir(dataDir) + mappingProvider, err := mappingprovider.New("", mappingprovider.WithMapping(seq.TestMapping)) + assert.NoError(t, err) + + df := docsfilter.New(t.Context(), docsfilter.Config{}, nil, mappingProvider) + fm, stop, err := fracmanager.New(t.Context(), &fracmanager.Config{ FracSize: 500, TotalSize: 5000, DataDir: dataDir, - }, nil) + }, nil, df) assert.NoError(t, err) config := APIConfig{ @@ -92,9 +98,6 @@ func getTestGrpc(t *testing.T) (*GrpcV1, func(), func()) { }, } - mappingProvider, err := mappingprovider.New("", mappingprovider.WithMapping(seq.TestMapping)) - assert.NoError(t, err) - g := NewGrpcV1(config, fm, mappingProvider) release := func() { diff --git a/storeapi/store.go b/storeapi/store.go index 857be4e2..c41cdf21 100644 --- a/storeapi/store.go +++ b/storeapi/store.go @@ -9,6 +9,7 @@ import ( "go.uber.org/atomic" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" @@ -35,6 +36,7 @@ type Store struct { type StoreConfig struct { API APIConfig FracManager fracmanager.Config + Filters docsfilter.Config } func (c *StoreConfig) setDefaults() error { @@ -44,19 +46,32 @@ func (c *StoreConfig) setDefaults() error { if c.API.Search.Async.DataDir == "" { c.API.Search.Async.DataDir = path.Join(c.FracManager.DataDir, "async_searches") } + if c.Filters.DataDir == "" { + c.Filters.DataDir = path.Join(c.FracManager.DataDir, "filters") + } return nil } -func NewStore(ctx context.Context, c StoreConfig, s3cli *s3.Client, mappingProvider MappingProvider) (*Store, error) { +func NewStore( + ctx context.Context, + c StoreConfig, + s3cli *s3.Client, + mappingProvider MappingProvider, + docFilterParams []docsfilter.Params, +) (*Store, error) { if err := c.setDefaults(); err != nil { return nil, err } - fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli) + df := docsfilter.New(ctx, c.Filters, docFilterParams, mappingProvider) + + fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli, df) if err != nil { return nil, fmt.Errorf("loading fractions error: %w", err) } + df.Start(fracManager.Fractions()) + return &Store{ Config: c, // We will set grpcAddr later in Start() diff --git a/tests/setup/env.go b/tests/setup/env.go index cafbbc02..15072152 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -22,6 +22,7 @@ import ( "github.com/ozontech/seq-db/buildinfo" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" @@ -275,7 +276,7 @@ func (cfg *TestingEnvConfig) MakeStores( logger.Fatal("can't create mapping", zap.Error(err)) } - store, err := storeapi.NewStore(context.Background(), confs[i], s3cli, mappingProvider) + store, err := storeapi.NewStore(context.Background(), confs[i], s3cli, mappingProvider, []docsfilter.Params{}) if err != nil { panic(err) }