Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs
}

type testDoc = struct {
id string
json string
message string
service string
Expand Down Expand Up @@ -280,19 +281,21 @@ func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time.
message := messages[rand.IntN(len(messages))]
level := rand.IntN(6)
timestamp := fromTime.Add(time.Duration(i) * time.Millisecond)
id := fmt.Sprintf("id-%d", i)
traceId := fmt.Sprintf("trace-%d", i%5000)
pod := fmt.Sprintf("pod-%d", i%50)
clientIp := fmt.Sprintf("192.168.%d.%d", rand.IntN(64), rand.IntN(256))
if i == numMessages-1 {
toTime = timestamp
}

json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"pod":%q,"client_ip":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), service, pod, clientIp, message, traceId, level)
json := fmt.Sprintf(`{"timestamp":%q,"id": %q, "service":%q,"pod":%q,"client_ip":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), id, service, pod, clientIp, message, traceId, level)

docs = append(docs, &testDoc{
json: json,
timestamp: timestamp,
id: id,
message: message,
service: service,
pod: pod,
Expand Down
28 changes: 28 additions & 0 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (s *FractionTestSuite) SetupTestCommon() {
seq.TokenizerTypePath: tokenizer.NewPathTokenizer(512, false, true),
}
s.mapping = seq.Mapping{
"id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"k8s_pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"k8s_namespace": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"k8s_container": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
Expand Down Expand Up @@ -1352,6 +1353,33 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets)
})

// Test large QPR with 25000 groups (all ids are unique)
s.Run("_exists_:service | group by id count()", func() {
countById := make(map[string]int)
for _, doc := range testDocs {
countById[doc.id]++
}

var expectedBuckets []seq.AggregationBucket
for id, cnt := range countById {
expectedBuckets = append(expectedBuckets, seq.AggregationBucket{
Name: id,
Value: float64(cnt),
NotExists: 0,
})
}

searchParams := s.query(
"_exists_:service",
withTo(toTime.Format(time.RFC3339Nano)),
withAggQuery(processor.AggQuery{
GroupBy: aggField("id"),
Func: seq.AggFuncCount,
}))

s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncCount}, expectedBuckets)
})

s.Run("NOT message:retry | group by service avg(level)", func() {
levelsByService := make(map[string][]int)
for _, doc := range testDocs {
Expand Down
15 changes: 12 additions & 3 deletions frac/sealed/token/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,19 @@ func (t Table) GetEntryByTID(tid uint32) *TableEntry {
if tid == 0 {
return nil
}
// todo: use bin search (we must have ordered slice here)
for _, data := range t {
for _, entry := range data.Entries {
if tid >= entry.StartTID && tid < entry.StartTID+entry.ValCount {
from := data.Entries[0].StartTID
to := data.Entries[len(data.Entries)-1].getLastTID()
if tid < from || tid > to {
continue
}

i := sort.Search(len(data.Entries), func(j int) bool {
return data.Entries[j].StartTID > tid
})
if i > 0 {
entry := data.Entries[i-1]
if tid <= entry.getLastTID() {
return entry
Comment on lines +88 to 91
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this check is redundant. We checked earlier on line 79 that from <= tid <= to, so there MUST exist an entry with StartTID <= tid && tid <= LastTID; in particular Entries[0].StartTID <= tid, which means the search predicate is false at index 0. So we can rewrite it more simply:

for _, data := range t {
	from := data.Entries[0].StartTID
	to := data.Entries[len(data.Entries)-1].getLastTID()

	if tid < from || tid > to {
		continue
	}

	i := sort.Search(len(data.Entries), func(j int) bool {
		return data.Entries[j].StartTID > tid
	})

	return data.Entries[i-1]
}

And what's important is that for sort.Search we have the following documentation:

Search uses binary search to find and return the smallest index i in [0, n) at which f(i) is true, assuming that on the range [0, n), f(i) == true implies f(i+1) == true.

And also there is:

If there is no such index, Search returns n.

So sort.Search cannot return an index that is less than or equal to 0 in this case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would add an invariant check here as well:

for _, data := range t {
	from := data.Entries[0].StartTID
	to := data.Entries[len(data.Entries)-1].getLastTID()

	if tid < from || tid > to {
		continue
	}

	i := sort.Search(len(data.Entries), func(j int) bool {
		return data.Entries[j].StartTID > tid
	})

	if i <= 0 {
		panic("invariant violation")
	}

	return data.Entries[i-1]
}

but that's unnecessary of course.

}
}
Expand Down
Loading