Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ enum SearchErrorCode {
TOO_MANY_GROUP_TOKENS = 5;
TOO_MANY_FRACTION_TOKENS = 6;
TOO_MANY_FIELD_VALUES = 7;
MEMORY_LIMIT_EXCEEDED = 8;
}

message StartAsyncSearchRequest {
Expand Down
1 change: 1 addition & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func startStore(
WorkersCount: cfg.Resources.SearchWorkers,
MaxFractionHits: cfg.Limits.FractionHits,
FractionsPerIteration: config.NumCPU,
MaxQprMemory: uint64(cfg.Limits.QprMemoryUsage),
RequestsLimit: uint64(cfg.Limits.SearchRequests),
LogThreshold: cfg.SlowLogs.SearchThreshold,
Async: asyncsearcher.AsyncSearcherConfig{
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type Config struct {
// DocSize specifies maximum possible size for single document.
// Document larger than this threshold will be skipped.
DocSize Bytes `config:"doc_size" default:"128KiB"`
// QprMemoryUsage specifies maximum heap memory which a single QPR (query partial result)
// can use in either store or proxy.
QprMemoryUsage Bytes `config:"qpr_memory_usage" default:"0B"`

Aggregation struct {
// FieldTokens specifies maximum amount of unique field tokens
Expand Down
1 change: 1 addition & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,5 @@ var (
ErrTooManyGroupTokens = errors.New("aggregation has too many group tokens")
ErrTooManyFractionTokens = errors.New("aggregation has too many fraction tokens")
ErrTooManyFractionsHit = errors.New("too many fractions hit")
ErrMemoryLimitExceeded = errors.New("memory limit exceeded")
)
8 changes: 8 additions & 0 deletions fracmanager/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type SearcherCfg struct {
MaxFractionHits int // the maximum number of fractions used in the search
FractionsPerIteration int
MaxQprMemory uint64 // max heap memory a single QPR can use. 0 if no limit set
}

type Searcher struct {
Expand Down Expand Up @@ -59,6 +60,7 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params

var totalSearchTimeNanos int64
var totalWaitTimeNanos int64
var totalMemUsage uint64

for len(remainingFracs) > 0 && (scanAll || params.Limit > 0) {
subQPRs, searchTimeNanos, waitTimeNanos, err := s.searchDocsAsync(ctx, remainingFracs.Shift(fracsChunkSize), params)
Expand All @@ -71,6 +73,12 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params

seq.MergeQPRs(total, subQPRs, origLimit, seq.MillisToMID(params.HistInterval), params.Order)

totalMemUsage = total.MemUsage()

if s.cfg.MaxQprMemory > 0 && totalMemUsage > s.cfg.MaxQprMemory {
return nil, fmt.Errorf("%w: used %d bytes, limit %d", consts.ErrMemoryLimitExceeded, total.MemUsage(), s.cfg.MaxQprMemory)
}

// reduce the limit on the number of ensured docs in response
params.Limit = origLimit - calcEnsuredIDsCount(total.IDs, remainingFracs, params.Order)

Expand Down
7 changes: 7 additions & 0 deletions metric/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ var (
Help: "Search request duration time (only successful searches)",
Buckets: SecondsBuckets,
})
SearchQprMemSize = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "seq_db_store",
Subsystem: "search",
Name: "qpr_mem_size_bytes",
Help: "QPR heap memory size in bytes",
Buckets: prometheus.ExponentialBuckets(1024, 3, 21),
})

SearchRangesSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "seq_db_store",
Expand Down
115 changes: 60 additions & 55 deletions pkg/storeapi/store_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions proxy/search/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ func (si *Ingestor) searchShard(
if errMessage == consts.ErrTooManyFractionTokens.Error() {
return nil, source, fmt.Errorf("store forbids aggregation request: %w", consts.ErrTooManyFractionTokens)
}
if errMessage == consts.ErrMemoryLimitExceeded.Error() {
return nil, source, fmt.Errorf("store forbids search request: %w", consts.ErrMemoryLimitExceeded)
}
errs = append(errs, err)
continue
}
Expand All @@ -620,6 +623,8 @@ func (si *Ingestor) searchShard(
return nil, source, fmt.Errorf("store forbids aggregation request: %w", consts.ErrTooManyGroupTokens)
case storeapi.SearchErrorCode_TOO_MANY_FRACTION_TOKENS:
return nil, source, fmt.Errorf("store forbids aggregation request: %w", consts.ErrTooManyFractionTokens)
case storeapi.SearchErrorCode_MEMORY_LIMIT_EXCEEDED:
return nil, source, fmt.Errorf("store forbids search request: %w", consts.ErrMemoryLimitExceeded)
case storeapi.SearchErrorCode_TOO_MANY_FRACTIONS_HIT:
return nil, source, fmt.Errorf("store forbids request: %w", consts.ErrTooManyFractionsHit)
}
Expand Down
Loading
Loading