From 0506ad3b84cf6ef7414879566ce71fdda9a9397b Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Fri, 6 Mar 2026 15:17:34 +0500 Subject: [PATCH] feat: add DocsFilter --- asyncsearcher/async_searcher.go | 80 +------ config/config.go | 20 +- docsfilter/docs_filter.go | 413 ++++++++++++++++++++++++++++++++ docsfilter/encoding.go | 285 ++++++++++++++++++++++ docsfilter/encoding_test.go | 42 ++++ docsfilter/filter.go | 64 +++++ docsfilter/metrics.go | 26 ++ frac/active.go | 14 ++ frac/active_index.go | 11 + frac/active_indexer.go | 4 + frac/active_lids_map.go | 37 +++ frac/fraction.go | 1 + frac/remote.go | 10 + frac/sealed.go | 7 + frac/sealed_index.go | 4 + fracmanager/proxy_frac.go | 8 + fracmanager/searcher_test.go | 4 + util/fs.go | 90 ++++++- 18 files changed, 1034 insertions(+), 86 deletions(-) create mode 100644 docsfilter/docs_filter.go create mode 100644 docsfilter/encoding.go create mode 100644 docsfilter/encoding_test.go create mode 100644 docsfilter/filter.go create mode 100644 docsfilter/metrics.go create mode 100644 frac/active_lids_map.go diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index 5c606a74..1654766f 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -281,7 +281,7 @@ func (as *AsyncSearcher) storeSearchInfoLocked(id string, info asyncSearchInfo) panic(err) } fpath := path.Join(as.config.DataDir, id+asyncSearchExtInfo) - mustWriteFileAtomic(fpath, b) + util.MustWriteFileAtomic(fpath, b, asyncSearchTmpFile) info.infoSize.Store(int64(len(b))) as.requests[id] = info } @@ -420,7 +420,7 @@ func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err name := getQPRFilename(info.Request.ID, f.Info().Name()) fpath := path.Join(as.config.DataDir, name) - mustWriteFileAtomic(fpath, rawQPR) + util.MustWriteFileAtomic(fpath, rawQPR, asyncSearchTmpFile) info.qprsSize.Add(int64(len(rawQPR))) return nil @@ -465,7 +465,7 @@ func (as *AsyncSearcher) findQPRs(id string) 
([]string, error) { files = append(files, path.Join(as.config.DataDir, name)) return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { return nil, err } return files, nil @@ -490,7 +490,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { areQPRsMerged[requestID] = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { return nil, err } @@ -510,7 +510,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { return nil, err } @@ -522,11 +522,11 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { return nil, err } if anyRemove { - mustFsyncFile(dataDir) + util.MustFsyncFile(dataDir) } qprsDuByID := make(map[string]int) @@ -592,7 +592,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { requests[requestID] = info return nil } - if err := visitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { return nil, err } return requests, nil @@ -807,7 +807,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) { storeMQPR := func(compressed []byte) error { sizeAfter = len(compressed) mqprPath := path.Join(as.config.DataDir, job.ID+asyncSearchExtMergedQPR) - mustWriteFileAtomic(mqprPath, 
compressed) + util.MustWriteFileAtomic(mqprPath, compressed, asyncSearchTmpFile) return nil } if err := compressQPR(&qpr, storeMQPR); err != nil { @@ -1026,65 +1026,3 @@ func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []* return items } - -func mustWriteFileAtomic(fpath string, data []byte) { - fpathTmp := fpath + asyncSearchTmpFile - - f, err := os.Create(fpathTmp) - if err != nil { - logger.Fatal("can't create file", zap.Error(err)) - } - defer func() { - if err := f.Close(); err != nil { - logger.Fatal("can't close file", zap.Error(err)) - } - }() - - if _, err := f.Write(data); err != nil { - logger.Fatal("can't write to file", zap.Error(err)) - } - - if err := f.Sync(); err != nil { - logger.Fatal("can't sync file", zap.Error(err)) - } - - if err := os.Rename(fpathTmp, fpath); err != nil { - logger.Fatal("can't rename file", zap.Error(err)) - } - - absFpath, err := filepath.Abs(fpath) - if err != nil { - logger.Fatal("can't get absolute path", zap.String("path", fpath), zap.Error(err)) - } - dir := path.Dir(absFpath) - mustFsyncFile(dir) -} - -func mustFsyncFile(fpath string) { - dirFile, err := os.Open(fpath) - if err != nil { - logger.Fatal("can't open dir", zap.Error(err)) - } - if err := dirFile.Sync(); err != nil { - logger.Fatal("can't sync dir", zap.Error(err)) - } - if err := dirFile.Close(); err != nil { - logger.Fatal("can't close dir", zap.Error(err)) - } -} - -func visitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error { - for _, de := range des { - if de.IsDir() { - continue - } - name := de.Name() - if path.Ext(name) != ext { - continue - } - if err := cb(name); err != nil { - return err - } - } - return nil -} diff --git a/config/config.go b/config/config.go index a63a450d..26d7fe64 100644 --- a/config/config.go +++ b/config/config.go @@ -266,13 +266,13 @@ type Config struct { } `config:"tracing"` // Additional filtering options - Filtering struct { - // If a search query time range 
overlaps with the [from; to] range - // the search query will be `AND`-ed with an additional predicate with the provided query expression - Query string `config:"query"` - From time.Time `config:"from"` - To time.Time `config:"to"` - } `config:"filtering"` + Filtering Filter `config:"filtering"` + DocsFilter struct { + DataDir string `config:"data_dir"` + Concurrency int `config:"concurrency"` + Filters []Filter `config:"filters"` + CacheSize Bytes `config:"cache_size" default:"100MiB"` + } `config:"docs_filter"` // Experimental provides flags // For configuring experimental features. @@ -294,3 +294,9 @@ func (b *Bytes) UnmarshalString(s string) error { *b = Bytes(bytes) return nil } + +type Filter struct { + Query string `config:"query"` + From time.Time `config:"from"` + To time.Time `config:"to"` +} diff --git a/docsfilter/docs_filter.go b/docsfilter/docs_filter.go new file mode 100644 index 00000000..2a092f7e --- /dev/null +++ b/docsfilter/docs_filter.go @@ -0,0 +1,413 @@ +package docsfilter + +import ( + "context" + "fmt" + "math" + "os" + "path" + "runtime" + "strings" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +const ( + fracInQueueExt = ".queue" + fracDoneExt = ".filter" + tmpExt = ".tmp" +) + +const ( + defaultMaintenanceInterval = 30 * time.Second +) + +type MappingProvider interface { + GetMapping() seq.Mapping +} + +type Config struct { + DataDir string + Workers int + CacheSizeLimit uint64 +} + +type DocsFilter struct { + ctx context.Context + + config Config + filters map[string]*Filter + + fracs map[string][]string + fracsMu *sync.RWMutex + + mp MappingProvider + + rateLimit chan struct{} + createDirOnce *sync.Once + + maintenanceInterval time.Duration +} + +func New( + ctx 
context.Context, + cfg Config, + params []Params, + mp MappingProvider, +) *DocsFilter { + workers := cfg.Workers + if workers <= 0 { + workers = runtime.GOMAXPROCS(0) + } + + filtersMap := make(map[string]*Filter, len(params)) + + for _, p := range params { + f := NewFilter(p) + filtersMap[string(f.Hash())] = f + } + + return &DocsFilter{ + ctx: ctx, + config: cfg, + filters: filtersMap, + fracs: make(map[string][]string), + fracsMu: &sync.RWMutex{}, + mp: mp, + rateLimit: make(chan struct{}, workers), + createDirOnce: &sync.Once{}, + maintenanceInterval: defaultMaintenanceInterval, + } +} + +func (df *DocsFilter) Start(fracs fracmanager.List) { + df.createDataDir() + + err := df.loadFilters() + if err != nil { + logger.Fatal("failed to load previous docs filters", zap.Error(err)) + } + + err = df.buildQueue(fracs) + if err != nil { + logger.Fatal("failed to build docs filters queue", zap.Error(err)) + } + + go df.maintenance() + + mapping := df.mp.GetMapping() + + for _, f := range df.filters { + ast, err := parser.ParseSeqQL(f.params.Query, mapping) + if err != nil { + panic(fmt.Errorf("BUG: search query must be valid: %s", err)) + } + f.ast = ast + + df.processFilter(f, fracs.FilterInRange(seq.MID(f.params.From), seq.MID(f.params.To))) + } +} + +// RefreshFrac replaces frac's tombstone files with newly found results. Used after active frac is sealed. 
+func (df *DocsFilter) RefreshFrac(fraction frac.Fraction) { + df.fracsMu.RLock() + fracsFiles, has := df.fracs[fraction.Info().Name()] + df.fracsMu.RUnlock() + + if !has { + return + } + + for _, fileName := range fracsFiles { + filter := df.filters[filterNameFromTombstonesPath(fileName)] + + queueFilePath := path.Join(filter.dirPath, makeFileName(fraction.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt) + + filter.processWg.Add(1) + go func() { + if err := df.processFrac(fraction, filter, false); err != nil { + panic(fmt.Errorf("docs filter refresh frac err: %s", err)) + } + }() + } +} + +// RemoveFrac removes fraction's tombstones. Used after frac is deleted +func (df *DocsFilter) RemoveFrac(fracName string) { + df.fracsMu.RLock() + fracsFiles, has := df.fracs[fracName] + df.fracsMu.RUnlock() + + if !has { + return + } + + df.fracsMu.Lock() + delete(df.fracs, fracName) + df.fracsMu.Unlock() + + for _, fileName := range fracsFiles { + util.RemoveFile(fileName) + } +} + +func filterNameFromTombstonesPath(p string) string { + return path.Base(path.Dir(p)) +} + +func (df *DocsFilter) addDoneFrac(fracName, fracPath string) { + df.fracsMu.Lock() + defer df.fracsMu.Unlock() + + df.fracs[fracName] = append(df.fracs[fracName], fracPath) +} + +// loadFilters loads existing filters +func (df *DocsFilter) loadFilters() error { + des, err := os.ReadDir(df.config.DataDir) + if err != nil { + return err + } + + var anyRemove bool + + for _, de := range des { + if !de.IsDir() { + continue + } + + if _, ok := df.filters[de.Name()]; !ok { + logger.Info("there is filter folder on disk, but not in config. 
need to delete it.") + err := os.RemoveAll(path.Join(df.config.DataDir, de.Name())) + if err != nil && !os.IsNotExist(err) { + return err + } + anyRemove = true + continue + } + + f := df.filters[de.Name()] + f.status = StatusInProgress + f.dirPath = path.Join(df.config.DataDir, de.Name()) + + filterDes, err := os.ReadDir(f.dirPath) + if err != nil { + return fmt.Errorf("reading directory: %s", err) + } + + var hasFracsInQueue bool + + for _, fde := range filterDes { + if fde.IsDir() { + continue + } + name := fde.Name() + + switch path.Ext(name) { + case fracInQueueExt: + hasFracsInQueue = true + case fracDoneExt: + df.addDoneFrac(fracNameFromFilePath(name), path.Join(f.dirPath, name)) + } + } + + if !hasFracsInQueue { + f.status = StatusDone + } + } + + if anyRemove { + util.MustFsyncFile(df.config.DataDir) + } + + return nil +} + +// buildQueue creates a directory for each unprocessed filter and creates .queue files +func (df *DocsFilter) buildQueue(fracs fracmanager.List) error { + for _, filter := range df.filters { + if filter.status != StatusCreated { + continue + } + filter.dirPath = path.Join(df.config.DataDir, filter.Hash()) + util.MustCreateDir(filter.dirPath) + + filterFracs := fracs.FilterInRange(seq.MID(filter.params.From), seq.MID(filter.params.To)) + for _, f := range filterFracs { + queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt) + } + } + + return nil +} + +// processFilter runs processFrac for every queued frac of the filter and marks the filter as done once they finish +func (df *DocsFilter) processFilter(filter *Filter, fracs fracmanager.List) { + if len(fracs) == 0 { + return + } + + fracsByName := make(map[string]frac.Fraction) + for _, f := range fracs { + fracsByName[f.Info().Name()] = f + } + + filterDes, err := os.ReadDir(filter.dirPath) + if err != nil { + panic(fmt.Errorf("BUG: reading directory must be successful: %s", err)) + } + + inProgressFilters.Add(1) + + processFracInQueue := func(name string) 
error { + f, ok := fracsByName[fracNameFromFilePath(name)] + if !ok { // skip missing fracs + return nil + } + filter.processWg.Add(1) + go func() { + if err := df.processFrac(f, filter, false); err != nil { + panic(fmt.Errorf("docs filter process frac err: %s", err)) + } + }() + return nil + } + _ = util.VisitFilesWithExt(filterDes, fracInQueueExt, processFracInQueue) + + go func() { + filter.processWg.Wait() + filter.markAsDone() + inProgressFilters.Add(-1) + }() +} + +func (df *DocsFilter) processFrac(f frac.Fraction, filter *Filter, refresh bool) error { + defer filter.processWg.Done() + + df.rateLimit <- struct{}{} + defer func() { <-df.rateLimit }() + + qpr, err := f.Search(df.ctx, processor.SearchParams{ + AST: filter.ast.Root, + From: seq.MID(filter.params.From), + To: seq.MID(filter.params.To), + Limit: math.MaxInt64, + }) + if err != nil { + return err + } + + queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) + doneFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracDoneExt)) + + if len(qpr.IDs) == 0 { + util.RemoveFile(queueFilePath) + return nil + } + + storeDocsFilter := func(rawDocsFilter []byte) error { + util.MustWriteFileAtomic(doneFilePath, rawDocsFilter, tmpExt) + util.RemoveFile(queueFilePath) + return nil + } + + // TODO: here we are doing part of the work twice: + // first we find LIDs inside f.Search() and then find IDs by these LIDs. + // Then we find LIDs again from the previously found IDs in f.FindLIDs(). + // We did it this way because otherwise f.Search() would need a serious rewrite. + // For now we're OK with some performance penalty. 
+ lids, err := f.FindLIDs(df.ctx, qpr.IDs.IDs()) + if err != nil { + return err + } + + docsFilterBin := DocsFilterBinIn{LIDs: lids} + if err := writeDocsFilter(&docsFilterBin, storeDocsFilter); err != nil { + return err + } + + if !refresh { + df.addDoneFrac(f.Info().Name(), doneFilePath) + } + + return nil +} + +func (df *DocsFilter) maintenance() { + for { + logger.Info("docs filter maintenance iteration") + df.checkDiskUsage() + time.Sleep(df.maintenanceInterval) + } +} + +func (df *DocsFilter) checkDiskUsage() { + du := int64(0) + + for _, f := range df.filters { + des, err := os.ReadDir(f.dirPath) + if err != nil { + logger.Error("docs filter: can't read filter's dir", + zap.String("filter", f.String()), zap.Error(err)) + return + } + + for _, fde := range des { + if fde.IsDir() { + continue + } + info, err := fde.Info() + if err != nil { + logger.Error("docs filter: can't read tombstones file info", + zap.String("filter", f.String()), zap.Error(err)) + return + } + du += info.Size() + } + } + + diskUsage.Set(float64(du)) + storedFilters.Set(float64(len(df.filters))) +} + +func makeFileName(name, ext string) string { + return name + ext +} + +func fracNameFromFilePath(filterFilePath string) string { + return strings.Split(path.Base(filterFilePath), ".")[0] +} + +var marshalBufferPool util.BufferPool + +func writeDocsFilter(df *DocsFilterBinIn, cb func(compressed []byte) error) error { + rawDocsFilter := marshalBufferPool.Get() + defer marshalBufferPool.Put(rawDocsFilter) + + rawDocsFilter.B = marshalDocsFilter(rawDocsFilter.B, df) + if err := cb(rawDocsFilter.B); err != nil { + return err + } + return nil +} + +// createDataDir creates dir data lazily to avoid creating extra folders. 
+func (df *DocsFilter) createDataDir() { + df.createDirOnce.Do(func() { + if err := os.MkdirAll(df.config.DataDir, 0o777); err != nil { + panic(err) + } + }) +} diff --git a/docsfilter/encoding.go b/docsfilter/encoding.go new file mode 100644 index 00000000..dccfc62f --- /dev/null +++ b/docsfilter/encoding.go @@ -0,0 +1,285 @@ +package docsfilter + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "unsafe" + + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" + "github.com/ozontech/seq-db/zstd" +) + +type DocsFilterBinIn struct { + LIDs []seq.LID +} + +type DocsFilterBinOut struct { + LIDs []uint32 +} + +type docsFilterBinVersion uint8 + +const ( + docsFilterBinVersion1 docsFilterBinVersion = iota + 1 +) + +var availableVersions = map[docsFilterBinVersion]struct{}{ + docsFilterBinVersion1: {}, +} + +type lidsCodec byte + +const ( + lidsCodecDelta = 1 + lidsCodecDeltaZstd = 2 +) + +type lidsBlockHeader struct { + Codec lidsCodec + Length uint32 // Number of LIDs in block + MinLID uint32 + MaxLID uint32 + Size uint32 // Size of ids block in bytes. 
+ Offset uint64 // block's offset in file +} + +func (h *lidsBlockHeader) marshal(dst []byte) { + if len(dst) < int(lidsBlockHeaderSizeBytes) { + panic("BUG: marshal lidsBlockHeader: len(dst) is less than header size") + } + + dst[0] = byte(h.Codec) + dst = dst[1:] + binary.BigEndian.PutUint32(dst, h.Length) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.MinLID) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.MaxLID) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.Size) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint64(dst, h.Offset) + dst = dst[sizeOfUint64:] +} + +func (h *lidsBlockHeader) unmarshal(src []byte) ([]byte, error) { + if len(src) < int(lidsBlockHeaderSizeBytes) { + return src, errors.New("too few bytes") + } + + h.Codec = lidsCodec(src[0]) + src = src[1:] + h.Length = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MinLID = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MaxLID = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Size = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Offset = binary.BigEndian.Uint64(src) + src = src[sizeOfUint64:] + + return src, nil +} + +func marshalDocsFilter(dst []byte, in *DocsFilterBinIn) []byte { + dst = append(dst, uint8(docsFilterBinVersion1)) + dst = marshalLIDsBlocks(dst, in.LIDs) + return dst +} + +const ( + sizeOfUint32 = unsafe.Sizeof(uint32(0)) + sizeOfUint64 = unsafe.Sizeof(uint64(0)) +) + +const ( + lidsBlockHeaderSizeBytes = 1 + (4 * sizeOfUint32) + sizeOfUint64 + maxLIDsBlockLen = 1024 +) + +var lidsBlockBufPool util.BufferPool + +func marshalLIDsBlocks(dst []byte, in []seq.LID) []byte { + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + + numberOfBlocks := (len(in) + maxLIDsBlockLen - 1) / maxLIDsBlockLen + dst = binary.BigEndian.AppendUint32(dst, uint32(numberOfBlocks)) + + // reserve space for headers + curHeaderOffset := len(dst) + dst = append(dst, make([]byte, 
numberOfBlocks*int(lidsBlockHeaderSizeBytes))...) + + var start int + for range numberOfBlocks { + end := min(maxLIDsBlockLen, len(in[start:])) + chunk := in[start : start+end] + + var codec lidsCodec + b.B, codec = marshalLIDsBlock(b.B[:0], chunk) + if len(b.B) > math.MaxUint32 { + panic(fmt.Errorf("unexpected block length %d; want up to %d", len(b.B), math.MaxUint32)) + } + + header := lidsBlockHeader{ + Codec: codec, + Length: uint32(len(chunk)), + MinLID: uint32(chunk[0]), + MaxLID: uint32(chunk[len(chunk)-1]), + Size: uint32(len(b.B)), + Offset: uint64(len(dst)), + } + header.marshal(dst[curHeaderOffset:]) + curHeaderOffset += int(lidsBlockHeaderSizeBytes) + + dst = append(dst, b.B...) + start += end + } + + return dst +} + +func marshalLIDsBlock(dst []byte, in []seq.LID) ([]byte, lidsCodec) { + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + + prev := seq.LID(0) + for i := range len(in) { + lid := in[i] + deltaLID := lid - prev + prev = lid + b.B = binary.AppendVarint(b.B, int64(deltaLID)) + } + + orig := dst + dst = zstd.CompressLevel(b.B, dst, getCompressLevel(len(b.B))) + + compressRatio := float64(len(dst)-len(orig)) / float64(len(b.B)) + if compressRatio < 1.05 { + orig = append(orig, b.B...) 
+ return orig, lidsCodecDelta + } + + return dst, lidsCodecDeltaZstd +} + +const minLIDsFIlterBytesLen = 10 // 1 byte lidsBinVersion + 8 byte number of LIDs + N (min 1) bytes varint + delta encoded LIDs. NOTE(review): marshalDocsFilter writes 1 version byte, then a 4-byte block count, then block headers and block payloads, so this breakdown looks stale - confirm. + +func unmarshalDocsFilter(dst *DocsFilterBinOut, src []byte) (_ []byte, err error) { + if len(src) < minLIDsFIlterBytesLen { + return nil, fmt.Errorf("invalid LIDs filter format; want %d bytes, got %d", minLIDsFIlterBytesLen, len(src)) + } + + version := docsFilterBinVersion(src[0]) + src = src[1:] + if _, ok := availableVersions[version]; !ok { + return nil, fmt.Errorf("invalid LIDs binary version: %d", version) + } + + dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src) + if err != nil { + return src, err + } + + return src, nil +} + +func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { + numberOfBlocks := binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + + var err error + + headers := make([]lidsBlockHeader, 0, numberOfBlocks) + for range numberOfBlocks { + header := lidsBlockHeader{} + src, err = header.unmarshal(src) + if err != nil { + return dst, src, fmt.Errorf("can't unmarshal lids header: %s", err) + } + headers = append(headers, header) + } + + for i := range numberOfBlocks { + dst, src, err = unmarshalLIDsBlock(dst, src, headers[i]) + if err != nil { + return dst, src, err + } + } + + if len(src) > 0 { + return dst, src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") + } + + return dst, src, nil +} + +func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) { + if len(src) == 0 { + return dst, src, fmt.Errorf("empty LIDs block") + } + + if header.Size == 0 || int(header.Size) > len(src) { + return nil, src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) + } + + block := src[:header.Size] + src = src[header.Size:] + + var err error + + switch header.Codec { + case lidsCodecDeltaZstd: + b := lidsBlockBufPool.Get() + defer 
lidsBlockBufPool.Put(b) + b.B, err = zstd.Decompress(block, b.B) + if err != nil { + return dst, src, fmt.Errorf("can't decompress ids block: %s", err) + } + dst, err = unmarshalLIDsDelta(dst, b.B, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + case lidsCodecDelta: + dst, err = unmarshalLIDsDelta(dst, block, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + default: + return dst, src, fmt.Errorf("unknown ids codec: %d", header.Codec) + } +} + +func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]uint32, error) { + prevLID := uint32(0) + for range header.Length { + v, n := binary.Varint(block) + block = block[n:] + lid := prevLID + uint32(v) + prevLID = lid + dst = append(dst, lid) + } + + if len(block) > 0 { + return dst, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return dst, nil +} + +func getCompressLevel(size int) int { + level := 3 + if size <= 512 { + level = 1 + } else if size <= 4*1024 { + level = 2 + } + return level +} diff --git a/docsfilter/encoding_test.go b/docsfilter/encoding_test.go new file mode 100644 index 00000000..777285fb --- /dev/null +++ b/docsfilter/encoding_test.go @@ -0,0 +1,42 @@ +package docsfilter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestMarshalUnmarshalLIDsFilter(t *testing.T) { + test := func(df DocsFilterBinIn) { + t.Helper() + + rawDocsFilter := marshalDocsFilter(nil, &df) + var out DocsFilterBinOut + tail, err := unmarshalDocsFilter(&out, rawDocsFilter) + require.NoError(t, err) + require.Equal(t, 0, len(tail)) + assert.Equal(t, lidsToUint32s(df.LIDs), out.LIDs) + } + + test(DocsFilterBinIn{LIDs: []seq.LID{0, 1, 2, 3}}) + test(DocsFilterBinIn{LIDs: []seq.LID{10, 15, 22, 18, 105, 1010}}) + test(DocsFilterBinIn{LIDs: []seq.LID{11}}) + + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := 
make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + test(DocsFilterBinIn{LIDs: multipleBlocksLIDs}) +} + +func lidsToUint32s(in []seq.LID) []uint32 { + out := make([]uint32, 0, len(in)) + for _, i := range in { + out = append(out, uint32(i)) + } + return out +} diff --git a/docsfilter/filter.go b/docsfilter/filter.go new file mode 100644 index 00000000..a5bc7770 --- /dev/null +++ b/docsfilter/filter.go @@ -0,0 +1,64 @@ +package docsfilter + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + + "github.com/ozontech/seq-db/parser" +) + +type FilterStatus byte + +const ( + StatusCreated FilterStatus = iota + StatusInProgress + StatusDone + StatusError +) + +type Params struct { + Query string + From int64 + To int64 +} + +type Filter struct { + params Params + + status FilterStatus + + ast parser.SeqQLQuery + + hash string + dirPath string + + processWg *sync.WaitGroup +} + +func NewFilter(params Params) *Filter { + return &Filter{ + params: params, + status: StatusCreated, + processWg: &sync.WaitGroup{}, + } +} + +func (f *Filter) String() string { + return fmt.Sprintf("%s_%d_%d", f.params.Query, f.params.From, f.params.To) +} + +func (f *Filter) Hash() string { + if f.hash == "" { + h := sha256.New() + h.Write([]byte(f.String())) + bs := h.Sum(nil) + f.hash = hex.EncodeToString(bs) + } + return f.hash +} + +func (f *Filter) markAsDone() { + f.status = StatusDone +} diff --git a/docsfilter/metrics.go b/docsfilter/metrics.go new file mode 100644 index 00000000..de45bc40 --- /dev/null +++ b/docsfilter/metrics.go @@ -0,0 +1,26 @@ +package docsfilter + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + inProgressFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "in_progress", + Help: "Number of doc filters in progress", + }) 
+ diskUsage = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "disk_usage_bytes", + }) + storedFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "stored", + Help: "Number of active doc filters", + }) +) diff --git a/frac/active.go b/frac/active.go index e16b48a1..c4e22196 100644 --- a/frac/active.go +++ b/frac/active.go @@ -46,6 +46,7 @@ type Active struct { TokenList *TokenList DocsPositions *DocsPositions + IDsToLIDs *ActiveLIDs docsFile *os.File docsReader storage.DocsReader @@ -84,6 +85,7 @@ func NewActive( f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), DocsPositions: NewSyncDocsPositions(), + IDsToLIDs: NewActiveLIDs(), MIDs: NewIDs(), RIDs: NewIDs(), DocBlocks: NewIDs(), @@ -288,6 +290,17 @@ func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Active) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + if f.Info().DocsTotal == 0 { // it is empty active fraction state + return nil, nil + } + + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { return &activeDataProvider{ ctx: ctx, @@ -300,6 +313,7 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { blocksOffsets: f.DocBlocks.GetVals(), docsPositions: f.DocsPositions, + idsToLids: f.IDsToLIDs, docsReader: &f.docsReader, } } diff --git a/frac/active_index.go b/frac/active_index.go index 350a8e0d..71831c26 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -25,6 +25,7 @@ type activeDataProvider struct { blocksOffsets []uint64 docsPositions *DocsPositions + idsToLids *ActiveLIDs docsReader *storage.DocsReader idsIndex *activeIDsIndex @@ -136,6 +137,16 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return res, nil } 
+func (dp *activeDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + res := make([]seq.LID, 0, len(ids)) + for _, id := range ids { + if lid, ok := dp.idsToLids.Get(id); ok { + res = append(res, lid) + } + } + return res, nil +} + type activeIDsIndex struct { mids []uint64 rids []uint64 diff --git a/frac/active_indexer.go b/frac/active_indexer.go index 5cda8f9f..3821cf03 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -161,6 +161,10 @@ func (ai *ActiveIndexer) appendWorker(index int) { lids := active.AppendIDs(collector.IDs) m.Stop() + m = sw.Start("active_lids_map_set") + active.IDsToLIDs.SetMultiple(collector.IDs, lids) + m.Stop() + m = sw.Start("token_list_append") tokenLIDsPlaces := collector.PrepareTokenLIDsPlaces() active.TokenList.Append(collector.TokensValues, collector.FieldsLengths, tokenLIDsPlaces) diff --git a/frac/active_lids_map.go b/frac/active_lids_map.go new file mode 100644 index 00000000..bae64854 --- /dev/null +++ b/frac/active_lids_map.go @@ -0,0 +1,37 @@ +package frac + +import ( + "sync" + + "github.com/ozontech/seq-db/seq" +) + +type ActiveLIDs struct { + mu *sync.RWMutex + idToLid map[seq.ID]seq.LID +} + +func NewActiveLIDs() *ActiveLIDs { + al := ActiveLIDs{ + mu: &sync.RWMutex{}, + idToLid: make(map[seq.ID]seq.LID), + } + return &al +} + +func (al *ActiveLIDs) Get(id seq.ID) (seq.LID, bool) { + al.mu.RLock() + defer al.mu.RUnlock() + + val, ok := al.idToLid[id] + return val, ok +} + +func (al *ActiveLIDs) SetMultiple(ids []seq.ID, lids []uint32) { + al.mu.Lock() + defer al.mu.Unlock() + + for i, id := range ids { + al.idToLid[id] = seq.LID(lids[i]) + } +} diff --git a/frac/fraction.go b/frac/fraction.go index 59995639..a3027c97 100644 --- a/frac/fraction.go +++ b/frac/fraction.go @@ -21,6 +21,7 @@ type Fraction interface { Contains(mid seq.MID) bool Fetch(context.Context, []seq.ID) ([][]byte, error) Search(context.Context, processor.SearchParams) (*seq.QPR, error) + FindLIDs(context.Context, []seq.ID) 
([]seq.LID, error) } var ( diff --git a/frac/remote.go b/frac/remote.go index c2088caa..e15aa73f 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -131,6 +131,16 @@ func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Remote) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp, err := f.createDataProvider(ctx) + if err != nil { + return nil, err + } + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, error) { if err := f.load(); err != nil { logger.Error( diff --git a/frac/sealed.go b/frac/sealed.go index c4c033d8..ddae8ccf 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -312,6 +312,13 @@ func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Sealed) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { f.load() return &sealedDataProvider{ diff --git a/frac/sealed_index.go b/frac/sealed_index.go index f97c6e84..3899124a 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -122,6 +122,10 @@ func (dp *sealedDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return qpr, nil } +func (dp *sealedDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + return dp.getFetchIndex().findLIDs(ids), nil +} + type sealedIDsIndex struct { fracName string table *seqids.Table diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go index 949e2412..f18bb99c 100644 --- a/fracmanager/proxy_frac.go +++ b/fracmanager/proxy_frac.go @@ -69,6 +69,10 @@ func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParam return p.impl.Search(ctx, params) } +func (p *fractionProxy) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, 
error) { + return p.impl.FindLIDs(ctx, ids) +} + // activeProxy manages an active (writable) fraction // Tracks pending write operations and provides freeze capability. // Lifecycle: Created when fraction becomes active, destroyed after sealing. @@ -172,3 +176,7 @@ func (emptyFraction) Search(_ context.Context, params processor.SearchParams) (* metric.CountersTotal.WithLabelValues("empty_data_provider").Inc() return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil } + +func (emptyFraction) FindLIDs(_ context.Context, _ []seq.ID) ([]seq.LID, error) { + return nil, nil +} diff --git a/fracmanager/searcher_test.go b/fracmanager/searcher_test.go index eb70a2d2..bdb07ec6 100644 --- a/fracmanager/searcher_test.go +++ b/fracmanager/searcher_test.go @@ -62,6 +62,10 @@ func (f *testFakeFrac) Search(context.Context, processor.SearchParams) (*seq.QPR return f.qpr, nil } +func (f *testFakeFrac) FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) { + return []seq.LID{}, nil +} + func newFakeFrac(from, to seq.MID, qpr *seq.QPR) *testFakeFrac { return &testFakeFrac{ info: &common.Info{From: from, To: to, DocsTotal: 1}, diff --git a/util/fs.go b/util/fs.go index 57fd7b17..b6e9eaca 100644 --- a/util/fs.go +++ b/util/fs.go @@ -6,30 +6,32 @@ package util import ( "errors" "os" + "path" + "path/filepath" "go.uber.org/zap" "github.com/ozontech/seq-db/logger" ) -func MustSyncPath(path string) { - if err := SyncPath(path); err != nil { - logger.Panic("cannot sync path", zap.String("path", path), zap.Error(err)) +func MustSyncPath(dirPath string) { + if err := SyncPath(dirPath); err != nil { + logger.Panic("cannot sync path", zap.String("path", dirPath), zap.Error(err)) } } -func MustRemoveFileByPath(path string) { - if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { +func MustRemoveFileByPath(fpath string) { + if err := os.Remove(fpath); err != nil && !errors.Is(err, os.ErrNotExist) { logger.Panic( "cannot remove file by path", - 
zap.String("path", path), + zap.String("path", fpath), zap.Error(err), ) } } -func SyncPath(path string) error { - d, err := os.Open(path) +func SyncPath(dirPath string) error { + d, err := os.Open(dirPath) if err != nil { return err } @@ -53,3 +55,75 @@ func RemoveFile(file string) { logger.Error("file removing error", zap.Error(err)) } } + +func MustWriteFileAtomic(fpath string, data []byte, tmpFileExt string) { + fpathTmp := fpath + tmpFileExt + + f, err := os.Create(fpathTmp) + if err != nil { + logger.Panic("can't create file", zap.Error(err)) + } + defer func() { + if err := f.Close(); err != nil { + logger.Panic("can't close file", zap.Error(err)) + } + }() + + if _, err := f.Write(data); err != nil { + logger.Panic("can't write to file", zap.Error(err)) + } + + if err := f.Sync(); err != nil { + logger.Panic("can't sync file", zap.Error(err)) + } + + if err := os.Rename(fpathTmp, fpath); err != nil { + logger.Panic("can't rename file", zap.Error(err)) + } + + absFpath, err := filepath.Abs(fpath) + if err != nil { + logger.Panic("can't get absolute path", zap.String("path", fpath), zap.Error(err)) + } + dir := path.Dir(absFpath) + MustFsyncFile(dir) +} + +func MustFsyncFile(fpath string) { + dirFile, err := os.Open(fpath) + if err != nil { + logger.Panic("can't open dir", zap.Error(err)) + } + if err := dirFile.Sync(); err != nil { + logger.Panic("can't sync dir", zap.Error(err)) + } + if err := dirFile.Close(); err != nil { + logger.Panic("can't close dir", zap.Error(err)) + } +} + +// MustCreateDir creates directory at dirPath. +// Handles the case when directory already exists. +func MustCreateDir(dirPath string) { + err := os.MkdirAll(dirPath, 0o777) + if err != nil && !os.IsExist(err) { + logger.Panic("can't create file", zap.Error(err)) + } +} + +// VisitFilesWithExt traverses all the files with `ext` extension in `des` directory and calls a `cb` func for each of files. 
// The entries in des are visited in slice order. Directories are skipped, and
// a file is passed to cb (by its base name, as returned by DirEntry.Name) only
// when filepath.Ext of that name equals ext exactly, so ext must include the
// leading dot. The first non-nil error returned by cb stops the traversal and
// is returned unchanged; otherwise the result is nil.
func VisitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error {
	for _, de := range des {
		if de.IsDir() {
			continue
		}
		// Entry names are OS file names, so the extension is extracted with
		// filepath rather than the slash-only path package.
		name := de.Name()
		if filepath.Ext(name) != ext {
			continue
		}
		if err := cb(name); err != nil {
			return err
		}
	}
	return nil
}