Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions api/seqproxyapi/v1/seq_proxy_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,14 @@ message SearchQuery {

// Aggregation function used in request.
enum AggFunc {
AGG_FUNC_COUNT = 0; // Returns how many times `field` was equal to particular value.
AGG_FUNC_SUM = 1; // Performs an addition operation on `field`, among documents with same `group_by` field.
AGG_FUNC_MIN = 2; // Finds minimum value for `field`, among documents with same `group_by` field.
AGG_FUNC_MAX = 3; // Finds maximum value for `field`, among documents with same `group_by` field.
AGG_FUNC_AVG = 4; // Finds average value for `field`, among documents with same `group_by` field.
AGG_FUNC_QUANTILE = 5; // Finds quantiles for `field`, among documents with same `group_by` field.
AGG_FUNC_UNIQUE = 6; // Finds unique values for `group_by` field.
AGG_FUNC_COUNT = 0; // Returns how many times `field` was equal to particular value.
AGG_FUNC_SUM = 1; // Performs an addition operation on `field`, among documents with same `group_by` field.
AGG_FUNC_MIN = 2; // Finds minimum value for `field`, among documents with same `group_by` field.
AGG_FUNC_MAX = 3; // Finds maximum value for `field`, among documents with same `group_by` field.
AGG_FUNC_AVG = 4; // Finds average value for `field`, among documents with same `group_by` field.
AGG_FUNC_QUANTILE = 5; // Finds quantiles for `field`, among documents with same `group_by` field.
AGG_FUNC_UNIQUE = 6; // Finds unique values for `group_by` field.
AGG_FUNC_UNIQUE_COUNT = 7; // Counts unique values of `field`, among documents with same `group_by` field.
}

// Order of document sorting.
Expand Down
4 changes: 4 additions & 0 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ enum AggFunc {
AGG_FUNC_AVG = 4;
AGG_FUNC_QUANTILE = 5;
AGG_FUNC_UNIQUE = 6;
AGG_FUNC_UNIQUE_COUNT = 7;
}

enum Order {
Expand Down Expand Up @@ -96,6 +97,7 @@ message SearchResponse {
int64 total = 4;
int64 not_exists = 5;
repeated double samples = 6;
repeated uint32 values = 7;
}

message Bin {
Expand All @@ -115,6 +117,7 @@ message SearchResponse {
// { (foo, ts2) -> (val) }
// ]
repeated Bin timeseries = 4;
repeated string values_pool = 5;
}

bytes data = 1 [deprecated = true];
Expand All @@ -140,6 +143,7 @@ enum SearchErrorCode {
TOO_MANY_FIELD_TOKENS = 4;
TOO_MANY_GROUP_TOKENS = 5;
TOO_MANY_FRACTION_TOKENS = 6;
TOO_MANY_FIELD_VALUES = 7;
}

message StartAsyncSearchRequest {
Expand Down
1 change: 1 addition & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func startStore(
Search: frac.SearchConfig{
AggLimits: frac.AggLimits{
MaxFieldTokens: cfg.Limits.Aggregation.FieldTokens,
MaxFieldValues: cfg.Limits.Aggregation.FieldValues,
MaxGroupTokens: cfg.Limits.Aggregation.GroupTokens,
MaxTIDsPerFraction: cfg.Limits.Aggregation.FractionTokens,
},
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ type Config struct {
// that can be processed in single aggregation requests.
// Setting this field to 0 disables limit.
FieldTokens int `config:"field_tokens" default:"1000000"`
// FieldValues specifies maximum amount of unique field values
// that partial aggregation results (buckets) can contain in single aggregation requests.
// Setting this field to 0 disables limit.
FieldValues int `config:"field_values" default:"1000000"`
// GroupTokens specifies maximum amount of unique group tokens
// that can be processed in single aggregation requests.
// Setting this field to 0 disables limit.
Expand Down
1 change: 1 addition & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ var (
ErrInvalidAggQuery = errors.New("invalid agg query")
ErrInvalidArgument = errors.New("invalid argument")
ErrTooManyFieldTokens = errors.New("aggregation has too many field tokens")
ErrTooManyFieldValues = errors.New("aggregation has too many field values in memory")
ErrTooManyGroupTokens = errors.New("aggregation has too many group tokens")
ErrTooManyFractionTokens = errors.New("aggregation has too many fraction tokens")
ErrTooManyFractionsHit = errors.New("too many fractions hit")
Expand Down
41 changes: 41 additions & 0 deletions docs/en/10-public-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ Supported aggregation functions:
- `AGG_FUNC_MAX` — maximum value of the field
- `AGG_FUNC_QUANTILE` — quantile computation for the field
- `AGG_FUNC_UNIQUE` — computation of unique field values (not supported in timeseries)
- `AGG_FUNC_UNIQUE_COUNT` — count of unique field values (can be used with group by)
- `AGG_FUNC_COUNT` — count of documents per group

#### Aggregation Examples
Expand Down Expand Up @@ -443,6 +444,46 @@ grpcurl -plaintext -d '
}
```

##### UNIQUE COUNT

> For `AGG_FUNC_UNIQUE_COUNT` both `group_by` and `field` are required.

**Request:**

```sh
grpcurl -plaintext -d '
{
"query": {
"from": "2000-01-01T00:00:00Z",
"to": "2077-01-01T00:00:00Z",
"query": "*"
},
"aggs": [
{
"func": "AGG_FUNC_UNIQUE_COUNT",
"group_by": "service",
      "field": "pod"
}
]
}' localhost:9004 seqproxyapi.v1.SeqProxyApi/GetAggregation
```

**Response:**

```json
{
"aggs": [
{
"buckets": [
{"key": "svc1", "value": 2},
{"key": "svc2", "value": 2},
{"key": "svc3", "value": 1}
]
}
]
}
```

##### COUNT (with interval)

**Request:**
Expand Down
1 change: 1 addition & 0 deletions frac/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type SearchConfig struct {

type AggLimits struct {
MaxFieldTokens int // MaxFieldTokens max AggQuery.Field uniq values to parse.
MaxFieldValues int // MaxFieldValues max AggQuery.Field uniq values to hold per aggregation request.
MaxGroupTokens int // MaxGroupTokens max AggQuery.GroupBy unique values.
MaxTIDsPerFraction int // MaxTIDsPerFraction max number of tokens per fraction.
}
32 changes: 20 additions & 12 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ func TestConcurrentAppendAndQuery(t *testing.T) {
)

mapping := seq.Mapping{
"service": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"message": seq.NewSingleType(seq.TokenizerTypeText, "", 100),
"level": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"trace_id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"service": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"client_ip": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"message": seq.NewSingleType(seq.TokenizerTypeText, "", 100),
"level": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"trace_id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
}
tokenizers := map[seq.TokenizerType]tokenizer.Tokenizer{
seq.TokenizerTypeText: tokenizer.NewTextTokenizer(1024, false, true, 8192),
Expand Down Expand Up @@ -145,7 +147,7 @@ func TestConcurrentAppendAndQuery(t *testing.T) {
readTest(t, sealed, numReaders, numQueries, docs, fromTime, toTime, mapping)
}

func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs []testDoc, fromTime, toTime time.Time, mapping seq.Mapping) {
func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs []*testDoc, fromTime, toTime time.Time, mapping seq.Mapping) {
readersGroup, ctx := errgroup.WithContext(t.Context())

type queryFilter func(doc *testDoc) bool
Expand Down Expand Up @@ -232,7 +234,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs
// find docs by time range and provided query filter to match against fetched docs
var expectedDocs []string
for i := len(docs) - 1; i >= 0 && len(expectedDocs) < searchParams.Limit; i-- {
if (docs[i].timestamp.Before(queryTime) || docs[i].timestamp.Equal(queryTime)) && filter(&docs[i]) {
if (docs[i].timestamp.Before(queryTime) || docs[i].timestamp.Equal(queryTime)) && filter(docs[i]) {
expectedDocs = append(expectedDocs, docs[i].json)
}
}
Expand All @@ -254,13 +256,15 @@ type testDoc = struct {
json string
message string
service string
pod string
clientIp string
level int
traceId string
timestamp time.Time
}

func generatesMessages(numMessages, bulkSize int) ([]testDoc, [][]string, time.Time, time.Time) {
services := []string{"gateway", "proxy", "scheduler", "database", "bus"}
func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time.Time, time.Time) {
services := []string{"gateway", "proxy", "scheduler", "database", "bus", "kafka"}
messages := []string{
"request started", "request completed", "processing timed out",
"processing data", "processing failed", "processing retry",
Expand All @@ -269,26 +273,30 @@ func generatesMessages(numMessages, bulkSize int) ([]testDoc, [][]string, time.T
fromTime := time.Date(2000, 1, 1, 13, 0, 0, 0, time.UTC)
var toTime time.Time

docs := make([]testDoc, 0, numMessages)
docs := make([]*testDoc, 0, numMessages)

for i := 0; i < numMessages; i++ {
service := services[rand.IntN(len(services))]
message := messages[rand.IntN(len(messages))]
level := rand.IntN(6)
timestamp := fromTime.Add(time.Duration(i) * time.Millisecond)
traceId := fmt.Sprintf("trace-%d", i%5000)
pod := fmt.Sprintf("pod-%d", i%50)
clientIp := fmt.Sprintf("192.168.%d.%d", rand.IntN(64), rand.IntN(256))
if i == numMessages-1 {
toTime = timestamp
}

json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), service, message, traceId, level)
json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"pod":%q,"client_ip":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), service, pod, clientIp, message, traceId, level)

docs = append(docs, testDoc{
docs = append(docs, &testDoc{
json: json,
timestamp: timestamp,
message: message,
service: service,
pod: pod,
clientIp: clientIp,
level: level,
traceId: traceId,
})
Expand Down
Loading
Loading