diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 38f02634..4f10eefc 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -253,6 +253,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs } type testDoc = struct { + id string json string message string service string @@ -280,6 +281,7 @@ func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time. message := messages[rand.IntN(len(messages))] level := rand.IntN(6) timestamp := fromTime.Add(time.Duration(i) * time.Millisecond) + id := fmt.Sprintf("id-%d", i) traceId := fmt.Sprintf("trace-%d", i%5000) pod := fmt.Sprintf("pod-%d", i%50) clientIp := fmt.Sprintf("192.168.%d.%d", rand.IntN(64), rand.IntN(256)) @@ -287,12 +289,13 @@ func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time. toTime = timestamp } - json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"pod":%q,"client_ip":%q,"message":%q,"trace_id": %q,"level":"%d"}`, - timestamp.Format(time.RFC3339Nano), service, pod, clientIp, message, traceId, level) + json := fmt.Sprintf(`{"timestamp":%q,"id": %q, "service":%q,"pod":%q,"client_ip":%q,"message":%q,"trace_id": %q,"level":"%d"}`, + timestamp.Format(time.RFC3339Nano), id, service, pod, clientIp, message, traceId, level) docs = append(docs, &testDoc{ json: json, timestamp: timestamp, + id: id, message: message, service: service, pod: pod, diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8d7a782f..e409996c 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -65,6 +65,7 @@ func (s *FractionTestSuite) SetupTestCommon() { seq.TokenizerTypePath: tokenizer.NewPathTokenizer(512, false, true), } s.mapping = seq.Mapping{ + "id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "k8s_pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "k8s_namespace": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "k8s_container": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), @@ -1352,6 +1353,33 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets) }) + // Test large QPR with 25000 groups (all ids are unique) + s.Run("_exists_:service | group by id count()", func() { + countById := make(map[string]int) + for _, doc := range testDocs { + countById[doc.id]++ + } + + var expectedBuckets []seq.AggregationBucket + for id, cnt := range countById { + expectedBuckets = append(expectedBuckets, seq.AggregationBucket{ + Name: id, + Value: float64(cnt), + NotExists: 0, + }) + } + + searchParams := s.query( + "_exists_:service", + withTo(toTime.Format(time.RFC3339Nano)), + withAggQuery(processor.AggQuery{ + GroupBy: aggField("id"), + Func: seq.AggFuncCount, + })) + + s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncCount}, expectedBuckets) + }) + s.Run("NOT message:retry | group by service avg(level)", func() { levelsByService := make(map[string][]int) for _, doc := range testDocs { diff --git a/frac/sealed/token/table.go b/frac/sealed/token/table.go index 92c6102d..be8617d8 100644 --- a/frac/sealed/token/table.go +++ b/frac/sealed/token/table.go @@ -75,10 +75,19 @@ func (t Table) GetEntryByTID(tid uint32) *TableEntry { if tid == 0 { return nil } - // todo: use bin search (we must have ordered slice here) for _, data := range t { - for _, entry := range data.Entries { - if tid >= entry.StartTID && tid < entry.StartTID+entry.ValCount { + from := data.Entries[0].StartTID + to := data.Entries[len(data.Entries)-1].getLastTID() + if tid < from || tid > to { + continue + } + + i := sort.Search(len(data.Entries), func(j int) bool { + return data.Entries[j].StartTID > tid + }) + if i > 0 { + entry := data.Entries[i-1] + if tid <= entry.getLastTID() { return entry } }