Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7af2663
feat(config): allow multiple filtering requests
forshev Dec 2, 2025
7435fd7
feat(config): allow multiple filtering requests
forshev Dec 2, 2025
de8620d
feat(config): allow multiple filtering requests
forshev Dec 2, 2025
4eb9132
feat: remove imitation PoC - write path
forshev Dec 12, 2025
88530f4
feat(docsfilter): store lids in binary format
forshev Dec 16, 2025
22aba3f
feat(docsfilter): store lids in binary format
forshev Dec 16, 2025
895953a
feat(docsfilter): store lids in binary format
forshev Dec 16, 2025
cc77bfe
feat(docsfilter): method to read filtered lids
forshev Dec 19, 2025
cc5e912
feat(docsfilter): filter ids on search
forshev Dec 22, 2025
8f9e34d
feat(docsfilter): filter ids on fetch for sealed fracs
forshev Dec 26, 2025
85e3015
feat(docsfilter): support filtering on active fraction
forshev Jan 16, 2026
e0c4862
feat(docsfilter): refresh frac tombstones after sealing
forshev Jan 20, 2026
7825068
fix: fix rebase
forshev Jan 20, 2026
d867189
fix: fix tests
forshev Jan 20, 2026
f430eaa
chore(docsfilter): organize tombstone files in blocks
forshev Jan 23, 2026
d22b3e8
chore(docsfilter): compress lids blocks in tombstone files
forshev Jan 23, 2026
a7e9c88
chore(docsfilter): handle errors
forshev Jan 23, 2026
5af8cc7
chore(docsfilter): add iterator to lazy load tombstones
forshev Feb 4, 2026
096b667
chore(docsfilter): merge multiple iterators into one
forshev Feb 5, 2026
370432d
chore(docsfilter): use iterators in search and fetch
forshev Feb 5, 2026
eb1098f
chore(docsfilter): remove tombstones when frac is evicted
forshev Feb 6, 2026
1dc6f96
refactor(docsfilter): move common parts from iterators
forshev Feb 6, 2026
1ea773e
chore(docsfilter): write metrics
forshev Feb 6, 2026
051b212
chore(docsfilter): cache tombstones files' headers
forshev Feb 10, 2026
a06c933
fix(docs filter): skip missing fracs
forshev Feb 18, 2026
4341466
fix(docs filter): open tombstone files only when necessary
forshev Feb 20, 2026
001a952
fix(docs filter): remove frac under lock
forshev Feb 20, 2026
91c657e
chore(docsfilter): tombstone headers cache cleanup
forshev Feb 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 9 additions & 71 deletions asyncsearcher/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (as *AsyncSearcher) storeSearchInfoLocked(id string, info asyncSearchInfo)
panic(err)
}
fpath := path.Join(as.config.DataDir, id+asyncSearchExtInfo)
mustWriteFileAtomic(fpath, b)
util.MustWriteFileAtomic(fpath, b, asyncSearchTmpFile)
info.infoSize.Store(int64(len(b)))
as.requests[id] = info
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err

name := getQPRFilename(info.Request.ID, f.Info().Name())
fpath := path.Join(as.config.DataDir, name)
mustWriteFileAtomic(fpath, rawQPR)
util.MustWriteFileAtomic(fpath, rawQPR, asyncSearchTmpFile)

info.qprsSize.Add(int64(len(rawQPR)))
return nil
Expand Down Expand Up @@ -465,7 +465,7 @@ func (as *AsyncSearcher) findQPRs(id string) ([]string, error) {
files = append(files, path.Join(as.config.DataDir, name))
return nil
}
if err := visitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil {
if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil {
return nil, err
}
return files, nil
Expand All @@ -490,7 +490,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
areQPRsMerged[requestID] = true
return nil
}
if err := visitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil {
if err := util.VisitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil {
return nil, err
}

Expand All @@ -510,7 +510,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
anyRemove = true
return nil
}
if err := visitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil {
if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil {
return nil, err
}

Expand All @@ -522,11 +522,11 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
anyRemove = true
return nil
}
if err := visitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil {
if err := util.VisitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil {
return nil, err
}
if anyRemove {
mustFsyncFile(dataDir)
util.MustFsyncFile(dataDir)
}

qprsDuByID := make(map[string]int)
Expand Down Expand Up @@ -592,7 +592,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
requests[requestID] = info
return nil
}
if err := visitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil {
if err := util.VisitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil {
return nil, err
}
return requests, nil
Expand Down Expand Up @@ -807,7 +807,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) {
storeMQPR := func(compressed []byte) error {
sizeAfter = len(compressed)
mqprPath := path.Join(as.config.DataDir, job.ID+asyncSearchExtMergedQPR)
mustWriteFileAtomic(mqprPath, compressed)
util.MustWriteFileAtomic(mqprPath, compressed, asyncSearchTmpFile)
return nil
}
if err := compressQPR(&qpr, storeMQPR); err != nil {
Expand Down Expand Up @@ -1026,65 +1026,3 @@ func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []*

return items
}

// mustWriteFileAtomic durably writes data to fpath, terminating the process
// on any failure.
//
// Atomicity is achieved by writing to a temporary sibling file
// (fpath + asyncSearchTmpFile), fsyncing and closing it, and only then
// renaming it over fpath; the containing directory is fsynced afterwards so
// the rename itself survives a crash. Readers observe either the old or the
// new contents, never a partial write.
func mustWriteFileAtomic(fpath string, data []byte) {
	fpathTmp := fpath + asyncSearchTmpFile

	f, err := os.Create(fpathTmp)
	if err != nil {
		logger.Fatal("can't create file", zap.Error(err))
	}

	if _, err := f.Write(data); err != nil {
		logger.Fatal("can't write to file", zap.Error(err))
	}

	if err := f.Sync(); err != nil {
		logger.Fatal("can't sync file", zap.Error(err))
	}

	// Close BEFORE renaming: renaming a still-open file is not portable
	// (it fails on Windows), and the data is already synced at this point.
	// The previous version closed via defer, i.e. after the rename.
	if err := f.Close(); err != nil {
		logger.Fatal("can't close file", zap.Error(err))
	}

	if err := os.Rename(fpathTmp, fpath); err != nil {
		logger.Fatal("can't rename file", zap.Error(err))
	}

	absFpath, err := filepath.Abs(fpath)
	if err != nil {
		logger.Fatal("can't get absolute path", zap.String("path", fpath), zap.Error(err))
	}
	// filepath.Dir (not path.Dir): absFpath is an OS path and may contain
	// backslash separators on Windows.
	dir := filepath.Dir(absFpath)
	mustFsyncFile(dir)
}

// mustFsyncFile opens the file or directory at fpath, flushes it to stable
// storage with fsync, and closes it. Any failure terminates the process.
// It is used to persist directory entries after atomic renames.
func mustFsyncFile(fpath string) {
	handle, err := os.Open(fpath)
	if err != nil {
		logger.Fatal("can't open dir", zap.Error(err))
	}
	if syncErr := handle.Sync(); syncErr != nil {
		logger.Fatal("can't sync dir", zap.Error(syncErr))
	}
	if closeErr := handle.Close(); closeErr != nil {
		logger.Fatal("can't close dir", zap.Error(closeErr))
	}
}

func visitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error {
for _, de := range des {
if de.IsDir() {
continue
}
name := de.Name()
if path.Ext(name) != ext {
continue
}
if err := cb(name); err != nil {
return err
}
}
return nil
}
20 changes: 19 additions & 1 deletion cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ozontech/seq-db/buildinfo"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/docsfilter"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/fracmanager"
Expand Down Expand Up @@ -312,10 +313,15 @@ func startStore(
From: cfg.Filtering.From,
},
},
Filters: docsfilter.Config{
DataDir: cfg.DocsFilter.DataDir,
Workers: cfg.DocsFilter.Concurrency,
CacheSizeLimit: uint64(cfg.DocsFilter.CacheSize),
},
}

s3cli := initS3Client(cfg)
store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp)
store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp, docFilterParamsFromCfg(cfg.DocsFilter.Filters))
if err != nil {
logger.Fatal("initializing store", zap.Error(err))
}
Expand Down Expand Up @@ -358,3 +364,15 @@ func initS3Client(cfg config.Config) *s3.Client {
func enableIndexingForAllFields(mappingPath string) bool {
return mappingPath == "auto"
}

// docFilterParamsFromCfg converts configured document filters into the
// parameter form expected by the docsfilter package, translating the time
// bounds into Unix nanoseconds.
func docFilterParamsFromCfg(in []config.Filter) []docsfilter.Params {
	out := make([]docsfilter.Params, len(in))
	for i, filter := range in {
		out[i] = docsfilter.Params{
			Query: filter.Query,
			From:  filter.From.UnixNano(),
			To:    filter.To.UnixNano(),
		}
	}
	return out
}
20 changes: 13 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,13 @@ type Config struct {
} `config:"tracing"`

// Additional filtering options
Filtering struct {
// If a search query time range overlaps with the [from; to] range
// the search query will be `AND`-ed with an additional predicate with the provided query expression
Query string `config:"query"`
From time.Time `config:"from"`
To time.Time `config:"to"`
} `config:"filtering"`
Filtering Filter `config:"filtering"`
DocsFilter struct {
DataDir string `config:"data_dir"`
Concurrency int `config:"concurrency"`
Filters []Filter `config:"filters"`
CacheSize Bytes `config:"cache_size" default:"100MiB"`
} `config:"docs_filter"`

// Experimental provides flags
// For configuring experimental features.
Expand All @@ -294,3 +294,9 @@ func (b *Bytes) UnmarshalString(s string) error {
*b = Bytes(bytes)
return nil
}

// Filter describes one additional filtering rule.
//
// If a search query time range overlaps with the [From; To] range,
// the search query will be `AND`-ed with an additional predicate with
// the provided Query expression.
type Filter struct {
	// Query is the filter query expression appended to matching searches.
	Query string `config:"query"`
	// From is the inclusive lower bound of the time range the filter applies to.
	From time.Time `config:"from"`
	// To is the inclusive upper bound of the time range the filter applies to.
	To time.Time `config:"to"`
}
Loading
Loading