diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index 1d12fec8..93870684 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -183,7 +183,7 @@ func newAsyncSearchInfo(r AsyncSearchRequest, list fracmanager.List) asyncSearch } ctx, cancel := context.WithCancel(context.Background()) return asyncSearchInfo{ - Version: infoVersion1, + Version: infoVersion2, Finished: false, Error: "", CanceledAt: time.Time{}, @@ -591,10 +591,10 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { if info.Version == 0 { info.Version = infoVersion1 } - if info.Version == infoVersion2 { - info.Request.Params.From = seq.NanosToMID(uint64(info.Request.Params.From)) - info.Request.Params.To = seq.NanosToMID(uint64(info.Request.Params.To)) - info.Version = infoVersion1 + if info.Version == infoVersion1 { + info.Request.Params.From = seq.MillisToMID(uint64(info.Request.Params.From)) + info.Request.Params.To = seq.MillisToMID(uint64(info.Request.Params.To)) + info.Version = infoVersion2 } info.merged.Store(areQPRsMerged[requestID]) diff --git a/asyncsearcher/encoding.go b/asyncsearcher/encoding.go index d30eb72d..92772deb 100644 --- a/asyncsearcher/encoding.go +++ b/asyncsearcher/encoding.go @@ -27,7 +27,7 @@ var availableVersions = map[qprBinVersion]struct{}{ } func marshalQPR(q *seq.QPR, dst []byte) []byte { - dst = append(dst, uint8(qprBinVersion1)) + dst = append(dst, uint8(qprBinVersion2)) blocksLenPos := len(dst) dst = append(dst, make([]byte, 8)...) 
@@ -238,8 +238,8 @@ func unmarshalIDsDelta(dst seq.IDSources, block []byte, version qprBinVersion) ( block = block[hintSize:] var midValue seq.MID - if version == qprBinVersion2 { - midValue = seq.NanosToMID(uint64(mid)) + if version == qprBinVersion1 { + midValue = seq.MillisToMID(uint64(mid)) } else { midValue = seq.MID(mid) } @@ -296,8 +296,8 @@ func unmarshalHistogram(src []byte, version qprBinVersion) (map[seq.MID]uint64, } var midValue seq.MID - if version == qprBinVersion2 { - midValue = seq.NanosToMID(uint64(mid)) + if version == qprBinVersion1 { + midValue = seq.MillisToMID(uint64(mid)) } else { midValue = seq.MID(mid) } @@ -460,8 +460,8 @@ func unmarshalAggregatableSamples(q *seq.AggregatableSamples, src []byte, versio src = tail var midValue seq.MID - if version == qprBinVersion2 { - midValue = seq.NanosToMID(uint64(mid)) + if version == qprBinVersion1 { + midValue = seq.MillisToMID(uint64(mid)) } else { midValue = seq.MID(mid) } diff --git a/asyncsearcher/encoding_test.go b/asyncsearcher/encoding_test.go index 5fbfa832..5b34578b 100644 --- a/asyncsearcher/encoding_test.go +++ b/asyncsearcher/encoding_test.go @@ -92,18 +92,18 @@ func TestQPRMarshalUnmarshal(t *testing.T) { } } -// TestQPRVersion2Compatibility tests that it's possible to unmarshall and read version 2 async search result encoded. -// MIDs in IDs and a histogram must be converted to milliseconds -func TestQPRVersion2Compatibility(t *testing.T) { +// TestQPRVersion1Compatibility tests that it's possible to unmarshall and read version 1 async search result encoded. 
+// MIDs in IDs and a histogram must be converted from millis to nanos +func TestQPRVersion1Compatibility(t *testing.T) { qpr := seq.QPR{ IDs: seq.IDSources{ { - ID: seq.ID{MID: 1761812502573000000, RID: 34734732392}, + ID: seq.ID{MID: seq.MID(1761812502573), RID: 34734732392}, }, }, Histogram: map[seq.MID]uint64{ - 1761812502573000000: 433, - 1761812502463000000: 743, + seq.MID(1761812502573): 433, + seq.MID(1761812502463): 743, }, Aggs: []seq.AggregatableSamples{ { @@ -111,7 +111,7 @@ func TestQPRVersion2Compatibility(t *testing.T) { {Token: "_not_exists"}: { Total: 1, }, - {Token: "seq-db store", MID: seq.MID(1761812502953000000)}: { + {Token: "seq-db store", MID: seq.MID(1761812502953)}: { Min: 3, Max: 5, Sum: 794, @@ -119,7 +119,7 @@ func TestQPRVersion2Compatibility(t *testing.T) { NotExists: 7, Samples: []float64{324}, }, - {Token: "seq-db store", MID: seq.MID(1761812502456000000)}: { + {Token: "seq-db store", MID: seq.MID(1761812502456)}: { Min: 2, Max: 6, Sum: 544, @@ -132,27 +132,30 @@ func TestQPRVersion2Compatibility(t *testing.T) { }, }, } + rawQPR := marshalQPR(&qpr, nil) - rawQPR[0] = uint8(qprBinVersion2) - var outQpr seq.QPR - tail, err := unmarshalQPR(&outQpr, rawQPR, math.MaxInt) + rawQPR[0] = uint8(qprBinVersion1) + + var outQPR seq.QPR + tail, err := unmarshalQPR(&outQPR, rawQPR, math.MaxInt) require.NoError(t, err) require.Equal(t, 0, len(tail)) - require.Equal(t, seq.MID(1761812502573), outQpr.IDs[0].ID.MID, "mid doesn't match, should convert to milliseconds") - require.Equal(t, 2, len(outQpr.Histogram)) - require.Equal(t, uint64(433), outQpr.Histogram[seq.MID(1761812502573)], "histogram bucket doesn't match") - require.Equal(t, uint64(743), outQpr.Histogram[seq.MID(1761812502463)], "histogram bucket doesn't match") + require.Equal(t, seq.MID(1761812502573000000), outQPR.IDs[0].ID.MID, "mid doesn't match, should convert to nanoseconds") + + require.Len(t, outQPR.Histogram, 2) + require.Equal(t, uint64(433), 
outQPR.Histogram[seq.MID(1761812502573000000)], "histogram bucket doesn't match") + require.Equal(t, uint64(743), outQPR.Histogram[seq.MID(1761812502463000000)], "histogram bucket doesn't match") - require.Equal(t, 1, len(outQpr.Aggs), "should have one AggregatableSamples") - agg := outQpr.Aggs[0] - require.Equal(t, 3, len(agg.SamplesByBin), "should have 3 samples in bin") + require.Len(t, outQPR.Aggs, 1, "should have one AggregatableSamples") + agg := outQPR.Aggs[0] + require.Len(t, agg.SamplesByBin, 3, "should have 3 samples in bin") notExistsBin := seq.AggBin{Token: "_not_exists"} require.Equal(t, int64(1), agg.SamplesByBin[notExistsBin].Total, "bucket doesn't match") - bin1 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502953)} + bin1 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502953000000)} require.Equal(t, int64(1), agg.SamplesByBin[bin1].Total, "bucket doesn't match") - bin2 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502456)} + bin2 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502456000000)} require.Equal(t, int64(2), agg.SamplesByBin[bin2].Total, "bucket doesn't match") } diff --git a/cmd/distribution/main.go b/cmd/distribution/main.go index c8caad0b..527c484f 100644 --- a/cmd/distribution/main.go +++ b/cmd/distribution/main.go @@ -183,8 +183,8 @@ func main() { zap.String("name", info.Name()), zap.String("ver", info.Ver), zap.Uint32("docs_total", info.DocsTotal), - zap.String("from", util.MsTsToESFormat(uint64(info.From))), - zap.String("to", util.MsTsToESFormat(uint64(info.To))), + zap.String("from", util.NsTsToESFormat(uint64(info.From))), + zap.String("to", util.NsTsToESFormat(uint64(info.To))), zap.String("creation_time", util.MsTsToESFormat(info.CreationTime)), ) } diff --git a/config/frac_version.go b/config/frac_version.go index 073235d9..d3ff1b14 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -11,4 +11,4 @@ const ( BinaryDataV2 ) -const CurrentFracVersion = BinaryDataV1 +const 
CurrentFracVersion = BinaryDataV2 diff --git a/docs/en/internal/common.md b/docs/en/internal/common.md index 899670bc..b8f6eec1 100644 --- a/docs/en/internal/common.md +++ b/docs/en/internal/common.md @@ -3,7 +3,7 @@ ## Common * ID: MID-RID . -* MID - milliseconds part of ID, generated (extracted from doc) by ingestor before sending to store. +* MID - nanoseconds part of ID, generated (extracted from doc) by ingestor before sending to store. * RID - random part of ID, generated by ingestor before sending to store. * docParam - link of ID and block position, position of doc in block. * Only active fraction has meta file. It is used for restoring index in memory and in process of sealing fraction it is used to form index file. diff --git a/docs/en/internal/search.md b/docs/en/internal/search.md index f59331d9..34dfaaf1 100644 --- a/docs/en/internal/search.md +++ b/docs/en/internal/search.md @@ -26,7 +26,7 @@ Some basic overview of nodes: > > **ID** (document ID) - full id of a document, that you can use on proxy to find this specific doc. Consists of two parts: mid and rid. > -> **MID** (milliseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine. +> **MID** (nanoseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine. > > **RID** (random ID) - random part of an id diff --git a/docs/ru/internal/common.md b/docs/ru/internal/common.md index 899670bc..b8f6eec1 100644 --- a/docs/ru/internal/common.md +++ b/docs/ru/internal/common.md @@ -3,7 +3,7 @@ ## Common * ID: MID-RID . -* MID - milliseconds part of ID, generated (extracted from doc) by ingestor before sending to store. 
+* MID - nanoseconds part of ID, generated (extracted from doc) by ingestor before sending to store. * RID - random part of ID, generated by ingestor before sending to store. * docParam - link of ID and block position, position of doc in block. * Only active fraction has meta file. It is used for restoring index in memory and in process of sealing fraction it is used to form index file. diff --git a/docs/ru/internal/search.md b/docs/ru/internal/search.md index f59331d9..34dfaaf1 100644 --- a/docs/ru/internal/search.md +++ b/docs/ru/internal/search.md @@ -26,7 +26,7 @@ Some basic overview of nodes: > > **ID** (document ID) - full id of a document, that you can use on proxy to find this specific doc. Consists of two parts: mid and rid. > -> **MID** (milliseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine. +> **MID** (nanoseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine. 
> > **RID** (random ID) - random part of an id diff --git a/frac/common/info.go b/frac/common/info.go index 81bd34eb..69121408 100644 --- a/frac/common/info.go +++ b/frac/common/info.go @@ -1,6 +1,7 @@ package common import ( + "encoding/json" "fmt" "math" "path" @@ -80,7 +81,7 @@ func (s *Info) BuildDistribution(mids []uint64) { } func (s *Info) InitEmptyDistribution() bool { - from := time.UnixMilli(int64(s.From)) + from := s.From.Time() creationTime := time.UnixMilli(int64(s.CreationTime)) if creationTime.Sub(from) < DistributionSpreadThreshold { // no big spread in past return false @@ -117,3 +118,35 @@ func (s *Info) IsIntersecting(from, to seq.MID) bool { // check with distribution return s.Distribution.IsIntersecting(from, to) } + +// MarshalJSON implements custom JSON marshaling to always store From and To in milliseconds +func (s *Info) MarshalJSON() ([]byte, error) { + type TmpInfo Info // type alias to avoid infinite recursion + + tmp := TmpInfo(*s) + + // We convert "from" and "to" to milliseconds in order to guarantee we can rollback on deploy. + // When converting nanos to millis we must round "from" down (floor) and round "to" up (ceiling). + // This guarantees that a fraction time range (checked on search with Contains and IsIntersecting methods) is not narrowed down, + // and we do not lose messages on search. 
+ tmp.From = seq.MID(seq.MIDToMillis(s.From)) + tmp.To = seq.MID(seq.MIDToCeilingMillis(s.To)) + + return json.Marshal(tmp) +} + +// UnmarshalJSON implements custom JSON unmarshaling to convert From and To from milliseconds to nanoseconds +func (s *Info) UnmarshalJSON(data []byte) error { + type TmpInfo Info // type alias to avoid infinite recursion + var tmp TmpInfo + + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + + *s = Info(tmp) + s.From = seq.MillisToMID(uint64(tmp.From)) + s.To = seq.MillisToMID(uint64(tmp.To)) + return nil +} diff --git a/frac/common/info_test.go b/frac/common/info_test.go new file mode 100644 index 00000000..b22eb6ab --- /dev/null +++ b/frac/common/info_test.go @@ -0,0 +1,118 @@ +package common + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestInfo_MarshalJSON(t *testing.T) { + info := &Info{ + Path: "test-frac", + Ver: "2", + DocsTotal: 100, + DocsOnDisk: 1000, + DocsRaw: 2000, + MetaOnDisk: 500, + IndexOnDisk: 1500, + From: seq.MID(1761812502000000000), + To: seq.MID(1761812503000000000), + CreationTime: 1666193044479, + SealingTime: 1666193045000, + } + + jsonBytes, err := json.Marshal(info) + require.NoError(t, err) + + var jsonMap map[string]interface{} + err = json.Unmarshal(jsonBytes, &jsonMap) + require.NoError(t, err) + + fromRaw, ok := jsonMap["from"].(float64) + require.True(t, ok, "from should be a number") + assert.Equal(t, float64(1761812502000), fromRaw, "should scale from from millis on marshal") + toRaw, ok := jsonMap["to"].(float64) + require.True(t, ok, "to should be a number") + assert.Equal(t, float64(1761812503000), toRaw, "should scale from to millis on marshal") + + // validate that original fields are not changed while marshaling (safety check) + assert.Equal(t, seq.MID(1761812502000000000), info.From, "must not change while marshaling") + assert.Equal(t, 
seq.MID(1761812503000000000), info.To, "must not change while marshaling") +} + +func TestInfo_UnmarshalJSON(t *testing.T) { + jsonData := `{ + "name": "test-frac", + "ver": "2", + "docs_total": 100, + "docs_on_disk": 1000, + "docs_raw": 2000, + "meta_on_disk": 500, + "index_on_disk": 1500, + "from": 1761812502000, + "to": 1761812503000, + "creation_time": 1666193044479, + "sealing_time": 1666193045000 + }` + + var info Info + err := json.Unmarshal([]byte(jsonData), &info) + require.NoError(t, err) + + assert.Equal(t, seq.MID(1761812502000000000), info.From, "should scale to nanoseconds") + assert.Equal(t, seq.MID(1761812503000000000), info.To, "should scale to nanoseconds") + assert.Equal(t, "test-frac", info.Path) + assert.Equal(t, uint32(100), info.DocsTotal) +} + +func TestInfo_MarshalUnmarshal(t *testing.T) { + original := &Info{ + Path: "test-frac", + Ver: "2", + DocsTotal: 100, + DocsOnDisk: 1000, + DocsRaw: 2000, + MetaOnDisk: 500, + IndexOnDisk: 1500, + From: seq.MID(1761812502000000000), + To: seq.MID(1761812503000000000), + CreationTime: 1666193044479, + SealingTime: 1666193045000, + } + + jsonBytes, err := json.Marshal(original) + require.NoError(t, err) + + var unmarshaled Info + err = json.Unmarshal(jsonBytes, &unmarshaled) + require.NoError(t, err) + + assert.EqualExportedValues(t, original, &unmarshaled, "should match after marshal/unmarshal") +} + +func TestInfo_MarshalUnmarshalWithNanos(t *testing.T) { + original := &Info{ + Path: "test-frac", + Ver: "2", + From: seq.MID(1761812502000000777), + To: seq.MID(1761812503000000777), + CreationTime: 1666193044479, + SealingTime: 1666193045000, + } + + jsonBytes, err := json.Marshal(original) + require.NoError(t, err) + + var unmarshaled Info + err = json.Unmarshal(jsonBytes, &unmarshaled) + require.NoError(t, err) + + // we can't represent nanos in millis while saving, so "from" is floored (rounded down) to near millisecond, + // while "to" is ceiled (rounded up) to near millisecond + assert.Equal(t, 
seq.MID(1761812502000000000), unmarshaled.From) + assert.Equal(t, seq.MID(1761812503001000000), unmarshaled.To) +} diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 701e71b1..b8b41159 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -457,6 +457,36 @@ func (s *FractionTestSuite) TestSearchFromTo() { assertSearch(`NOT trace_id:0 AND NOT trace_id:2`, 3, 5, []int{5, 4, 3}) } +// TestSearchFromToNanoseconds tests if SearchParams "from" and "to" params can be specified up to nanoseconds since they are of seq.MID type. +// However, seq-db API doesn't support searching with queries with "from" and "to" specified in nanos. Only millis are supported. +func (s *FractionTestSuite) TestSearchFromToNanoseconds() { + docs := []string{ + /*0*/ `{"timestamp":"2000-01-01T13:00:00.000000000Z","message":"bad","level":"1","trace_id":"0","service":"0"}`, + /*1*/ `{"timestamp":"2000-01-01T13:00:00.000000001Z","message":"good","level":"2","trace_id":"0","service":"1"}`, + /*2*/ `{"timestamp":"2000-01-01T13:00:00.000000002Z","message":"bad","level":"3","trace_id":"0","service":"2"}`, + /*3*/ `{"timestamp":"2000-01-01T13:00:00.000000003Z","message":"good","level":"4","trace_id":"1","service":"0"}`, + /*4*/ `{"timestamp":"2000-01-01T13:00:00.000000004Z","message":"bad","level":"5","trace_id":"1","service":"1"}`, + /*5*/ `{"timestamp":"2000-01-01T13:00:00.000000005Z","message":"good","level":"6","trace_id":"1","service":"2"}`, + /*6*/ `{"timestamp":"2000-01-01T13:00:00.000000006Z","message":"bad","level":"7","trace_id":"2","service":"0"}`, + /*7*/ `{"timestamp":"2000-01-01T13:00:00.000000007Z","message":"good","level":"8","trace_id":"2","service":"1"}`, + } + + s.insertDocuments(docs) + + assertSearch := func(query string, fromOffset, toOffset int, expectedIndexes []int) { + s.AssertSearch(s.query( + query, + withFrom(fmt.Sprintf("2000-01-01T13:00:00.000000%03dZ", fromOffset)), + withTo(fmt.Sprintf("2000-01-01T13:00:00.000000%03dZ", toOffset))), + docs, 
expectedIndexes) + } + + assertSearch(`message:good`, 0, 7, []int{7, 5, 3, 1}) + assertSearch(`message:bad`, 0, 7, []int{6, 4, 2, 0}) + assertSearch(`message:good`, 0, 6, []int{5, 3, 1}) + assertSearch(`message:bad`, 1, 7, []int{6, 4, 2}) +} + func (s *FractionTestSuite) TestSearchWithLimit() { docs := []string{ /*0*/ `{"timestamp":"2000-01-01T13:00:00.000Z","message":"bad","level":"1","trace_id":"0","service":"0"}`, @@ -1026,6 +1056,112 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { s.AssertSearch(s.query("level:5", withLimit(100)), docs, level5Indexes[:100]) } +func (s *FractionTestSuite) TestIntersectingNanoseconds() { + docs := []string{ + `{"timestamp":"2000-01-01T13:00:00.000000000Z","message":"bad","level":"1"}`, + `{"timestamp":"2000-01-01T13:00:00.000000001Z","message":"good","level":"2"}`, + `{"timestamp":"2000-01-01T13:00:00.000000002Z","message":"ok","level":"1"}`, + `{"timestamp":"2000-01-01T13:00:00.000000003Z","message":"err","level":"2"}`, + `{"timestamp":"2000-01-01T13:00:00.000000004Z","message":"success","level":"3"}`, + `{"timestamp":"2000-01-01T13:00:00.001000000Z","message":"err","level":"2"}`, + `{"timestamp":"2000-01-01T13:00:00.001000001Z","message":"bad","level":"1"}`, + `{"timestamp":"2000-01-01T13:00:00.001000002Z","message":"good","level":"2"}`, + `{"timestamp":"2000-01-01T13:00:00.002000000Z","message":"bad","level":"1"}`, + `{"timestamp":"2000-01-01T13:00:00.002000000Z","message":"err","level":"1"}`, + } + + s.insertDocuments(docs) + + s.Require().Equal(uint64(946731600000000000), uint64(s.fraction.Info().From)) + s.Require().Equal(uint64(946731600002000000), uint64(s.fraction.Info().To)) + + s.Require().True(s.fraction.IsIntersecting( + seq.TimeToMID(mustParseTime("2000-01-01T12:59:59.000000000Z")), + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000000000Z"))), + "must intersect at info.From") + // 1 ns before the fraction range. 
Should not overlap, since MID distribution is not built for fractions with short lifetime, + // and it only covers the last 24h from now + s.Require().False(s.fraction.IsIntersecting( + seq.TimeToMID(mustParseTime("2000-01-01T12:59:59.000000000Z")), + seq.TimeToMID(mustParseTime("2000-01-01T12:59:59.999999999Z"))), + "must not overlap (outside of range)") + // overlaps at the only point at info.To + s.Require().True(s.fraction.IsIntersecting( + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.002000000Z")), + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.999999999Z"))), + "must intersect at info.To") + s.Require().False(s.fraction.IsIntersecting( + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.002000001Z")), + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.999999999Z"))), + "must not intersect (1 ns outside of range)") + s.Require().True(s.fraction.IsIntersecting( + seq.TimeToMID(mustParseTime("2000-01-01T12:59:59.999999999Z")), + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000000001Z"))), + "must intersect due to overlapping") + s.Require().True(s.fraction.IsIntersecting( + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.001000000Z")), + seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.999999999Z"))), + "must intersect due to overlapping") + + // double check for seq.MID built from raw nanoseconds + s.Require().True(s.fraction.IsIntersecting(seq.MID(946731500000000000), seq.MID(946731600000000000))) + s.Require().True(s.fraction.IsIntersecting(seq.MID(946731600002000000), seq.MID(946731699999999999))) +} + +func (s *FractionTestSuite) TestContainsWithMIDDistribution() { + now := time.Now().Truncate(time.Minute) + docs := []string{ + fmt.Sprintf(`{"timestamp":%q,"message":"apple juice"}`, now.Add(-60*time.Minute).Format(time.RFC3339Nano)), + fmt.Sprintf(`{"timestamp":%q,"message":"orange juice"}`, now.Add(-61*time.Minute).Format(time.RFC3339Nano)), + fmt.Sprintf(`{"timestamp":%q,"message":"cider"}`, now.Add(-65*time.Minute).Format(time.RFC3339Nano)), 
+ fmt.Sprintf(`{"timestamp":%q,"message":"wine"}`, now.Add(-123*time.Minute).Format(time.RFC3339Nano)), + fmt.Sprintf(`{"timestamp":%q,"message":"cola"}`, now.Add(-365*time.Minute).Format(time.RFC3339Nano)), + fmt.Sprintf(`{"timestamp":%q,"message":"cola"}`, now.Add(-30*time.Hour).Format(time.RFC3339Nano)), + } + + s.insertDocuments(docs) + + s.Require().True(s.fraction.Contains(seq.TimeToMID(now.Add(-60 * time.Minute)))) + s.Require().True(s.fraction.Contains(seq.TimeToMID(now.Add(-61 * time.Minute)))) + s.Require().True(s.fraction.Contains(seq.TimeToMID(now.Add(-123 * time.Minute)))) + // also true, MID distribution bucket is 1 minute + s.Require().True(s.fraction.Contains(seq.TimeToMID(now.Add(-60 * time.Minute).Add(-30 * time.Second)))) + // contains=true: outside MID distribution but within from-to range + s.Require().True(s.fraction.Contains(seq.TimeToMID(now.Add(-27 * time.Hour)))) + s.Require().True(s.fraction.Contains(seq.TimeToMID(now.Add(-30 * time.Hour)))) + // contains=false: outside MID distribution AND outside from-to range + s.Require().False(s.fraction.Contains(seq.TimeToMID(now.Add(-30 * time.Hour).Add(-1 * time.Minute)))) +} + +func (s *FractionTestSuite) TestContainsNanoseconds() { + docs := []string{ + `{"timestamp":"2000-01-01T13:00:00.000000000Z","message":"bad","level":"1"}`, + `{"timestamp":"2000-01-01T13:00:00.000000001Z","message":"good","level":"2"}`, + `{"timestamp":"2000-01-01T13:00:00.000000004Z","message":"success","level":"3"}`, + `{"timestamp":"2000-01-01T13:10:00.000000000Z","message":"err","level":"2"}`, + `{"timestamp":"2000-01-01T13:20:00.000000001Z","message":"bad","level":"1"}`, + `{"timestamp":"2000-01-01T13:30:00.000000002Z","message":"good","level":"2"}`, + `{"timestamp":"2000-01-01T13:40:00.000000001Z","message":"bad","level":"1"}`, + `{"timestamp":"2000-01-01T13:50:00.000000002Z","message":"err","level":"1"}`, + } + + s.insertDocuments(docs) + + 
s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000000000Z"))), "frac must contain first doc") + s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000000001Z"))), "frac must contain second doc") + s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:10:00.000000000Z"))), "frac must contain third doc") + s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:50:00.000000002Z"))), "frac must contain last doc") + + s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:30:00.000000002Z"))), "frac must contain sixth doc") + // round doc nano to milli, still Contains returns true + s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:30:00.000000000Z"))), "frac must contain sixth doc (rounded to milli)") + + // still Contains returns true even though the timestamp is 5 minute far from nearest doc + // MID distribution only covers the last 24h, so Contains return true here + s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:15:00.000000000Z")))) + s.Require().True(s.fraction.Contains(seq.TimeToMID(mustParseTime("2000-01-01T13:25:00.000000000Z")))) +} + func (s *FractionTestSuite) TestMIDDistribution() { now := time.Now().Truncate(time.Minute) docs := []string{ @@ -1073,8 +1209,8 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(300), "doc on disk doesn't match. 
actual value: %d", info.DocsOnDisk) s.Require().Equal(uint64(583), info.DocsRaw, "doc raw doesn't match") - s.Require().Equal(seq.MID(946731625000), info.From, "from doesn't match") - s.Require().Equal(seq.MID(946731654000), info.To, "to doesn't match") + s.Require().Equal(seq.MID(946731625000000000), info.From, "from doesn't match") + s.Require().Equal(seq.MID(946731654000000000), info.To, "to doesn't match") switch s.fraction.(type) { case *Active: @@ -1084,7 +1220,7 @@ func (s *FractionTestSuite) TestFractionInfo() { case *Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1600), - "index on disk doesn't match. actual value: %d", info.MetaOnDisk) + "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1500), @@ -1117,7 +1253,7 @@ func (s *FractionTestSuite) query(queryString string, options ...searchOption) * func withFrom(from string) searchOption { return func(p *processor.SearchParams) error { - t, err := time.Parse(time.RFC3339, from) + t, err := time.Parse(time.RFC3339Nano, from) if err != nil { return err } @@ -1128,7 +1264,7 @@ func withFrom(from string) searchOption { func withTo(to string) searchOption { return func(p *processor.SearchParams) error { - t, err := time.Parse(time.RFC3339, to) + t, err := time.Parse(time.RFC3339Nano, to) if err != nil { return err } @@ -1175,6 +1311,14 @@ func withAggQuery(aggQuery processor.AggQuery) searchOption { } } +func mustParseTime(timeStr string) time.Time { + t, err := time.Parse(time.RFC3339Nano, timeStr) + if err != nil { + panic(fmt.Sprintf("could not parse timestamp %s", timeStr)) + } + return t +} + func (s *FractionTestSuite) AssertSearch(queryObject interface{}, originalDocs []string, 
expectedIndexes []int) { switch q := queryObject.(type) { case string: diff --git a/frac/processor/aggregator.go b/frac/processor/aggregator.go index 4e607df1..59dd0d52 100644 --- a/frac/processor/aggregator.go +++ b/frac/processor/aggregator.go @@ -434,6 +434,6 @@ func provideExtractTimeFunc(sw *stopwatch.Stopwatch, idx idsIndex, interval int6 timer.Start() mid := idx.GetMID(seq.LID(lid)) timer.Stop() - return mid - (mid % seq.MID(interval)) + return mid - (mid % seq.MillisToMID(uint64(interval))) }) } diff --git a/frac/processor/search.go b/frac/processor/search.go index 9dd472c7..6df32c07 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -135,12 +135,13 @@ func convertHistToMap(params SearchParams, hist []uint64) map[seq.MID]uint64 { return nil } res := make(map[seq.MID]uint64, len(hist)) - bucket := params.From - params.From%seq.MID(params.HistInterval) + histIntervalMID := seq.MillisToMID(params.HistInterval) + bucket := params.From - params.From%histIntervalMID for _, cnt := range hist { if cnt > 0 { res[bucket] = cnt } - bucket += seq.MID(params.HistInterval) + bucket += histIntervalMID } return res } @@ -157,12 +158,14 @@ func iterateEvalTree( needScanAllRange := params.IsScanAllRequest() var ( - histBase uint64 - histogram []uint64 + histBase uint64 + histogram []uint64 + histInterval seq.MID ) if hasHist { - histBase = uint64(params.From) / params.HistInterval - histSize := uint64(params.To)/params.HistInterval - histBase + 1 + histInterval = seq.MillisToMID(params.HistInterval) + histBase = uint64(params.From) / uint64(histInterval) + histSize := uint64(params.To)/uint64(histInterval) - histBase + 1 histogram = make([]uint64, histSize) } @@ -206,7 +209,7 @@ func iterateEvalTree( zap.Time("mid", mid.Time())) continue } - bucketIndex := uint64(mid)/params.HistInterval - histBase + bucketIndex := uint64(mid)/uint64(histInterval) - histBase histogram[bucketIndex]++ } @@ -274,7 +277,7 @@ func MergeQPRs(qprs []*seq.QPR, params 
SearchParams) *seq.QPR { } qpr := qprs[0] if len(qprs) > 1 { - seq.MergeQPRs(qpr, qprs[1:], params.Limit, seq.MID(params.HistInterval), params.Order) + seq.MergeQPRs(qpr, qprs[1:], params.Limit, seq.MillisToMID(params.HistInterval), params.Order) } return qpr } diff --git a/frac/sealed/seqids/blocks.go b/frac/sealed/seqids/blocks.go index 3c656da3..f17f1be5 100644 --- a/frac/sealed/seqids/blocks.go +++ b/frac/sealed/seqids/blocks.go @@ -22,19 +22,11 @@ func (b BlockMIDs) Pack(dst []byte) []byte { } func (b *BlockMIDs) Unpack(data []byte, fracVersion config.BinaryDataVersion) error { - values, err := unpackRawIDsVarint(data, b.Values) + values, err := unpackRawMIDsVarint(data, b.Values, fracVersion) if err != nil { return err } b.Values = values - - // v2 (nanosecond MIDs) compatibility - convert nanos to millis - if fracVersion >= config.BinaryDataV2 { - for i := range b.Values { - b.Values[i] = uint64(seq.NanosToMID(b.Values[i])) - } - } - return nil } @@ -85,6 +77,35 @@ func (b *BlockParams) Unpack(data []byte) error { return nil } +// unpackRawMIDsVarint is a dedicated method for unpacking delta encoded MIDs. The reason a dedicated method exists +// is that we want to unpack values and potentially convert legacy frac version in one pass. 
+func unpackRawMIDsVarint(src []byte, dst []uint64, fracVersion config.BinaryDataVersion) ([]uint64, error) { + dst = dst[:0] + id := uint64(0) + for len(src) != 0 { + udelta, n := binary.Uvarint(src) + if n <= 0 { + return nil, errors.New("varint decoded with error") + } + + delta := int64(udelta >> 1) + if udelta&1 != 0 { + delta = ^delta + } + + id += uint64(delta) + if fracVersion >= config.BinaryDataV2 { + dst = append(dst, id) + } else { + // Legacy format - scale millis to nanos + dst = append(dst, uint64(seq.MillisToMID(id))) + } + + src = src[n:] + } + return dst, nil +} + func unpackRawIDsVarint(src []byte, dst []uint64) ([]uint64, error) { dst = dst[:0] id := uint64(0) diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index e863c46e..ae639862 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -98,9 +98,9 @@ func (l *Loader) loadIDs(fracVersion config.BinaryDataVersion) (idsTable seqids. var mid seq.MID if fracVersion < config.BinaryDataV2 { - mid = seq.MID(header.GetExt1()) + mid = seq.MillisToMID(header.GetExt1()) } else { - mid = seq.NanosToMID(header.GetExt1()) + mid = seq.MID(header.GetExt1()) } idsTable.MinBlockIDs = append(idsTable.MinBlockIDs, seq.ID{ diff --git a/fracmanager/frac_info_cache_test.go b/fracmanager/frac_info_cache_test.go index 126171ad..397c37dc 100644 --- a/fracmanager/frac_info_cache_test.go +++ b/fracmanager/frac_info_cache_test.go @@ -8,9 +8,10 @@ import ( "sync" "testing" - insaneJSON "github.com/ozontech/insane-json" "github.com/stretchr/testify/assert" + insaneJSON "github.com/ozontech/insane-json" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" @@ -282,7 +283,10 @@ func TestFracInfoSavedToCache(t *testing.T) { totalSize := uint64(0) cnt := 1 for totalSize < maxSize { - addDummyDoc(t, fm, dp, seq.SimpleID(cnt)) + // increase doc id by 1000000 (1 milli in nanos) instead of 1 + // otherwise all docs fall into same millisecond and test 
breaks + id := seq.SimpleID(int64(seq.MillisToMID(uint64(cnt)))) + addDummyDoc(t, fm, dp, id) cnt++ fracInstance := rotateAndSeal(fm) totalSize += fracInstance.Info().FullSize() @@ -354,7 +358,7 @@ func TestExtraFractionsRemoved(t *testing.T) { infos := map[string]*common.Info{} for i := 1; i < times+1; i++ { - addDummyDoc(t, fm, dp, seq.SimpleID(i)) + addDummyDoc(t, fm, dp, seq.SimpleID(int64(i))) fracInstance := rotateAndSeal(fm) info := fracInstance.Info() q.Add(item{ @@ -407,7 +411,7 @@ func TestMissingCacheFilesDeleted(t *testing.T) { defer insaneJSON.Release(metaRoot) for i := 1; i < times+1; i++ { - addDummyDoc(t, fm, dp, seq.SimpleID(i)) + addDummyDoc(t, fm, dp, seq.SimpleID(int64(i))) rotateAndSeal(fm) dp.TryReset() } diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index c8e7088f..ace4dc10 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -109,7 +109,7 @@ func TestCapacityExceeded(t *testing.T) { dp := indexer.NewTestDocProvider() makeSealedFrac := func(fm *FracManager, docsPerFrac int) { for i := 0; i < docsPerFrac; i++ { - addDummyDoc(t, fm, dp, seq.SimpleID(id)) + addDummyDoc(t, fm, dp, seq.SimpleID(int64(id))) id++ } fm.seal(fm.rotate()) diff --git a/fracmanager/loader_test.go b/fracmanager/loader_test.go index de92ad19..f1039885 100644 --- a/fracmanager/loader_test.go +++ b/fracmanager/loader_test.go @@ -31,7 +31,7 @@ func appendDocs(t *testing.T, active *frac.Active, docCount int) { doc := []byte("{\"timestamp\": 0, \"message\": \"msg\"}") docRoot, err := insaneJSON.DecodeBytes(doc) assert.NoError(t, err) - dp.Append(doc, docRoot, seq.SimpleID(i), "service:100500", "k8s_pod", "_all_:") + dp.Append(doc, docRoot, seq.SimpleID(int64(i)), "service:100500", "k8s_pod", "_all_:") } docs, metas := dp.Provide() diff --git a/fracmanager/searcher.go b/fracmanager/searcher.go index 82a202c5..e6aa1619 100644 --- a/fracmanager/searcher.go +++ b/fracmanager/searcher.go @@ -61,7 +61,7 @@ func (s 
*Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params return nil, err } - seq.MergeQPRs(total, subQPRs, origLimit, seq.MID(params.HistInterval), params.Order) + seq.MergeQPRs(total, subQPRs, origLimit, seq.MillisToMID(params.HistInterval), params.Order) // reduce the limit on the number of ensured docs in response params.Limit = origLimit - calcEnsuredIDsCount(total.IDs, remainingFracs, params.Order) diff --git a/fracmanager/searcher_test.go b/fracmanager/searcher_test.go index e584e9a1..eceff4b4 100644 --- a/fracmanager/searcher_test.go +++ b/fracmanager/searcher_test.go @@ -336,7 +336,7 @@ func newFakeQPRwithTotal(ids []int, total uint64) *seq.QPR { func newFakeQPR(ids ...int) *seq.QPR { idsWithSource := make(seq.IDSources, len(ids)) for i, mid := range ids { - idsWithSource[i] = seq.IDSource{ID: seq.SimpleID(mid)} + idsWithSource[i] = seq.IDSource{ID: seq.SimpleID(int64(mid))} } return &seq.QPR{IDs: idsWithSource} } @@ -344,7 +344,7 @@ func newFakeQPR(ids ...int) *seq.QPR { func newFakeQPRWithHist(ids []int, histogram map[seq.MID]uint64) *seq.QPR { idsWithSource := make(seq.IDSources, len(ids)) for i, mid := range ids { - idsWithSource[i] = seq.IDSource{ID: seq.SimpleID(mid)} + idsWithSource[i] = seq.IDSource{ID: seq.SimpleID(int64(mid))} } return &seq.QPR{ IDs: idsWithSource, diff --git a/indexer/meta_data.go b/indexer/meta_data.go index 594d0034..aa30d646 100644 --- a/indexer/meta_data.go +++ b/indexer/meta_data.go @@ -35,7 +35,7 @@ func (m *MetaData) MarshalBinaryTo(b []byte) []byte { b = binary.LittleEndian.AppendUint16(b, metadataMagic) // Append current binary version of the metadata. - const version = 1 + const version = 2 b = binary.LittleEndian.AppendUint16(b, version) // Encode seq.ID. 
@@ -77,17 +77,17 @@ func (m *MetaData) UnmarshalBinary(b []byte) error { } func (m *MetaData) unmarshalVersion1(b []byte) error { - return m.unmarshal(b) -} - -func (m *MetaData) unmarshalVersion2(b []byte) error { if err := m.unmarshal(b); err != nil { return err } - m.ID.MID = seq.NanosToMID(uint64(m.ID.MID)) + m.ID.MID = seq.MillisToMID(uint64(m.ID.MID)) return nil } +func (m *MetaData) unmarshalVersion2(b []byte) error { + return m.unmarshal(b) +} + func (m *MetaData) unmarshal(b []byte) error { // Decode seq.ID. m.ID.MID = seq.MID(binary.LittleEndian.Uint64(b)) diff --git a/proxy/bulk/ingestor_test.go b/proxy/bulk/ingestor_test.go index 4361eff4..e38a9180 100644 --- a/proxy/bulk/ingestor_test.go +++ b/proxy/bulk/ingestor_test.go @@ -88,7 +88,7 @@ func TestProcessDocuments(t *testing.T) { now := time.Now().UTC() - id := seq.SimpleID(int(now.UnixMilli())) + id := seq.SimpleID(now.UnixNano()) type TestPayload struct { InDocs []string diff --git a/proxy/search/async.go b/proxy/search/async.go index d9ef5241..25996cfa 100644 --- a/proxy/search/async.go +++ b/proxy/search/async.go @@ -55,7 +55,7 @@ func (si *Ingestor) StartAsyncSearch(ctx context.Context, r AsyncRequest) (Async From: r.From.UnixMilli(), To: r.To.UnixMilli(), Aggs: convertToAggsQuery(r.Aggregations), - HistogramInterval: int64(r.HistogramInterval), + HistogramInterval: int64(seq.MIDToMillis(r.HistogramInterval)), Retention: durationpb.New(r.Retention), WithDocs: r.WithDocs, Size: r.Size, @@ -173,16 +173,16 @@ func (si *Ingestor) FetchAsyncSearchResult( protocolVersion = config.ParseStoreProtocolVersion(protocolVersionValues[0]) } - if protocolVersion == config.StoreProtocolVersion2 { + if protocolVersion == config.StoreProtocolVersion1 { response := storeResp.Response for _, id := range response.IdSources { - id.Id.Mid = uint64(seq.NanosToMID(id.Id.Mid)) + id.Id.Mid = uint64(seq.MillisToMID(id.Id.Mid)) } if len(response.Histogram) > 0 { newHist := make(map[uint64]uint64, len(response.Histogram)) for 
mid, v := range response.Histogram { - newHist[uint64(seq.NanosToMID(mid))] = v + newHist[uint64(seq.MillisToMID(mid))] = v } response.Histogram = newHist } @@ -213,7 +213,7 @@ func (si *Ingestor) FetchAsyncSearchResult( fracsInQueue += int(sr.FracsQueue) fracsDone += int(sr.FracsDone) - histInterval = seq.MID(sr.HistogramInterval) + histInterval = seq.MillisToMID(uint64(sr.HistogramInterval)) ss := sr.Status.MustAsyncSearchStatus() pr.Status = mergeAsyncSearchStatus(pr.Status, ss) @@ -417,7 +417,7 @@ func (si *Ingestor) GetAsyncSearchesList( From: s.From.AsTime(), To: s.To.AsTime(), Aggregations: buildRequestAggs(s.Aggs), - HistogramInterval: seq.MID(s.HistogramInterval), + HistogramInterval: seq.MillisToMID(uint64(s.HistogramInterval)), WithDocs: s.WithDocs, Size: s.Size, } @@ -479,7 +479,7 @@ func buildRequestAggs(in []*storeapi.AggQuery) []AggQuery { GroupBy: agg.GroupBy, Func: agg.Func.MustAggFunc(), Quantiles: agg.Quantiles, - Interval: seq.MID(agg.Interval), + Interval: seq.MillisToMID(uint64(agg.Interval)), }) } return reqAggs diff --git a/proxy/search/ingestor.go b/proxy/search/ingestor.go index f1694659..1dce3039 100644 --- a/proxy/search/ingestor.go +++ b/proxy/search/ingestor.go @@ -513,7 +513,7 @@ func responseToQPR(resp *storeapi.SearchResponse, source uint64, explain bool) * pbhist := bin.Hist tbin := seq.AggBin{ - MID: seq.MID(bin.Ts.AsTime().UnixMilli()), + MID: seq.MID(bin.Ts.AsTime().UnixNano()), Token: bin.Label, } @@ -635,22 +635,22 @@ func (si *Ingestor) searchHost(ctx context.Context, req *storeapi.SearchRequest, return nil, 0, err } - // Check the store's protocol version from response header - // If header indicates protocol version 2 (MID in nanoseconds), then convert to milliseconds + // If header indicates protocol version 1 (MID in milliseconds), then convert to nanoseconds protocolVersion := config.StoreProtocolVersion1 if storeProtocolValues := md.Get(consts.StoreProtocolVersionHeader); len(storeProtocolValues) > 0 { protocolVersion = 
config.ParseStoreProtocolVersion(storeProtocolValues[0]) } - if protocolVersion == config.StoreProtocolVersion2 { + // Convert legacy store response (protocol version 1) to nanoseconds MID + if protocolVersion == config.StoreProtocolVersion1 { for _, id := range data.IdSources { - id.Id.Mid = uint64(seq.NanosToMID(id.Id.Mid)) + id.Id.Mid = uint64(seq.MillisToMID(id.Id.Mid)) } if len(data.Histogram) > 0 { newHist := make(map[uint64]uint64, len(data.Histogram)) for mid, v := range data.Histogram { - newHist[uint64(seq.NanosToMID(mid))] = v + newHist[uint64(seq.MillisToMID(mid))] = v } data.Histogram = newHist } diff --git a/proxy/search/search_request.go b/proxy/search/search_request.go index 5f8b9141..88061e67 100644 --- a/proxy/search/search_request.go +++ b/proxy/search/search_request.go @@ -32,11 +32,11 @@ type SearchRequest struct { func (sr *SearchRequest) GetAPISearchRequest() *storeapi.SearchRequest { return &storeapi.SearchRequest{ Query: util.ByteToStringUnsafe(sr.Q), - From: int64(sr.From), - To: int64(sr.To), + From: int64(seq.MIDToMillis(sr.From)), + To: int64(seq.MIDToMillis(sr.To)), Size: int64(sr.Size), Offset: int64(sr.Offset), - Interval: int64(sr.Interval), + Interval: int64(seq.MIDToMillis(sr.Interval)), Aggs: convertToAggsQuery(sr.AggQ), Explain: sr.Explain, WithTotal: sr.WithTotal, @@ -63,7 +63,7 @@ func convertToAggsQuery(aggs []AggQuery) []*storeapi.AggQuery { buf[i].Func = storeapi.AggFunc(query.Func) buf[i].Quantiles = query.Quantiles - buf[i].Interval = int64(query.Interval) + buf[i].Interval = int64(seq.MIDToMillis(query.Interval)) aggQ[i] = &buf[i] } diff --git a/proxy/search/streaming_doc.go b/proxy/search/streaming_doc.go index b177fd0d..0d27762a 100644 --- a/proxy/search/streaming_doc.go +++ b/proxy/search/streaming_doc.go @@ -32,9 +32,11 @@ func unpackDoc(data []byte, source uint64, protocolVersion config.StoreProtocolV block := storage.DocBlock(data) mid := block.GetExt1() - if protocolVersion == config.StoreProtocolVersion2 { - mid 
= uint64(seq.NanosToMID(mid)) + // Convert from milliseconds to nanoseconds if store (protocol version 1) operates in milliseconds + if protocolVersion == config.StoreProtocolVersion1 { + mid = uint64(seq.MillisToMID(mid)) } + doc := StreamingDoc{ ID: seq.ID{ MID: seq.MID(mid), diff --git a/proxyapi/grpc_async_search.go b/proxyapi/grpc_async_search.go index 7e86fa26..b58e1f14 100644 --- a/proxyapi/grpc_async_search.go +++ b/proxyapi/grpc_async_search.go @@ -51,7 +51,7 @@ func (g *grpcV1) StartAsyncSearch( From: r.GetQuery().GetFrom().AsTime(), To: r.GetQuery().GetTo().AsTime(), Aggregations: aggs, - HistogramInterval: seq.MID(histInterval.Milliseconds()), + HistogramInterval: seq.MID(histInterval.Nanoseconds()), WithDocs: r.WithDocs, Size: r.Size, }) diff --git a/proxyapi/grpc_complex_search_test.go b/proxyapi/grpc_complex_search_test.go index 07478c3e..43c1eec1 100644 --- a/proxyapi/grpc_complex_search_test.go +++ b/proxyapi/grpc_complex_search_test.go @@ -130,8 +130,8 @@ func prepareComplexSearchTestData(t *testing.T, cData cSearchTestCaseData) cSear Q: []byte(req.Query.Query), Size: int(req.Size), Offset: int(req.Offset), - From: seq.MID(req.Query.From.AsTime().UnixMilli()), - To: seq.MID(req.Query.To.AsTime().UnixMilli()), + From: seq.MID(req.Query.From.AsTime().UnixNano()), + To: seq.MID(req.Query.To.AsTime().UnixNano()), WithTotal: req.WithTotal, ShouldFetch: true, } diff --git a/proxyapi/grpc_export_test.go b/proxyapi/grpc_export_test.go index e2c607f9..86b95a15 100644 --- a/proxyapi/grpc_export_test.go +++ b/proxyapi/grpc_export_test.go @@ -76,8 +76,8 @@ func prepareExportTestData(cData exportTestCaseData) exportTestData { Q: []byte(req.Query.Query), Offset: int(req.Offset), Size: int(req.Size), - From: seq.MID(req.Query.From.AsTime().UnixMilli()), - To: seq.MID(req.Query.To.AsTime().UnixMilli()), + From: seq.MID(req.Query.From.AsTime().UnixNano()), + To: seq.MID(req.Query.To.AsTime().UnixNano()), ShouldFetch: true, }, ret: siSearchRet{ diff --git 
a/proxyapi/grpc_fetch_test.go b/proxyapi/grpc_fetch_test.go index b2aa3e6b..a34ff1f6 100644 --- a/proxyapi/grpc_fetch_test.go +++ b/proxyapi/grpc_fetch_test.go @@ -43,7 +43,7 @@ func prepareFetchTestData(cData fetchTestCaseData) fetchTestData { docs := [][]byte{} apiDocs := make([]*seqproxyapi.Document, 0) for i := 0; i < cData.size; i++ { - id := seq.SimpleID(cData.startID + i) + id := seq.SimpleID(int64(cData.startID + i)) ids = append(ids, seq.IDSource{ID: id}) idsStr = append(idsStr, id.String()) if !cData.noResp { diff --git a/proxyapi/grpc_get_aggregation_test.go b/proxyapi/grpc_get_aggregation_test.go index 1cf8bbbc..251d25a3 100644 --- a/proxyapi/grpc_get_aggregation_test.go +++ b/proxyapi/grpc_get_aggregation_test.go @@ -92,8 +92,8 @@ func prepareGetAggregationTestData(t *testing.T, cData getAggregationTestCaseDat sr := &search.SearchRequest{ Explain: req.Query.Explain, Q: []byte(req.Query.Query), - From: seq.MID(req.Query.From.AsTime().UnixMilli()), - To: seq.MID(req.Query.To.AsTime().UnixMilli()), + From: seq.MID(req.Query.From.AsTime().UnixNano()), + To: seq.MID(req.Query.To.AsTime().UnixNano()), } if len(cData.aggQ) > 0 { for _, query := range cData.aggQ { diff --git a/proxyapi/grpc_get_histogram_test.go b/proxyapi/grpc_get_histogram_test.go index 5fdd91fe..f498d4ba 100644 --- a/proxyapi/grpc_get_histogram_test.go +++ b/proxyapi/grpc_get_histogram_test.go @@ -98,8 +98,8 @@ func prepareGetHistogramTestData(t *testing.T, cData getHistogramTestCaseData) g sr := &search.SearchRequest{ Explain: req.Query.Explain, Q: []byte(req.Query.Query), - From: seq.MID(req.Query.From.AsTime().UnixMilli()), - To: seq.MID(req.Query.To.AsTime().UnixMilli()), + From: seq.MID(req.Query.From.AsTime().UnixNano()), + To: seq.MID(req.Query.To.AsTime().UnixNano()), Interval: seq.DurationToMID(intervalDur), } siSearchMock = &siSearchMockData{ diff --git a/proxyapi/grpc_main_test.go b/proxyapi/grpc_main_test.go index e0d13dbb..4bd9caa2 100644 --- a/proxyapi/grpc_main_test.go +++ 
b/proxyapi/grpc_main_test.go @@ -197,7 +197,7 @@ func makeSearchRespData(size int) *testSearchResp { docs := make([][]byte, 0) respDocs := make([]*seqproxyapi.Document, 0) for i := 0; i < size; i++ { - id := seq.SimpleID(i) + id := seq.SimpleID(int64(i)) ids = append(ids, seq.IDSource{ID: id, Source: 0}) data := []byte("doc" + strconv.Itoa(i)) docs = append(docs, data) @@ -233,7 +233,7 @@ func makeGetHistRespData(interval string, totalSize, fromTs, toTs int64) (*testG docCnt := totalSize / int64(bucketsCnt) remainCnt := totalSize - docCnt*(int64(bucketsCnt)-1) bucketKey := fromTs - qprHist[seq.MID(bucketKey)] = uint64(remainCnt) + qprHist[seq.MillisToMID(uint64(bucketKey))] = uint64(remainCnt) ts := time.UnixMilli(bucketKey) bucket := &seqproxyapi.Histogram_Bucket{ DocCount: uint64(remainCnt), @@ -243,7 +243,7 @@ func makeGetHistRespData(interval string, totalSize, fromTs, toTs int64) (*testG for i := 1; i < bucketsCnt; i++ { bucketKey := fromTs + int64(i)*intervalMS ts := time.UnixMilli(bucketKey) - qprHist[seq.MID(bucketKey)] = uint64(docCnt) + qprHist[seq.MillisToMID(uint64(bucketKey))] = uint64(docCnt) bucket := &seqproxyapi.Histogram_Bucket{ DocCount: uint64(docCnt), Ts: timestamppb.New(ts), @@ -313,7 +313,7 @@ func makeExportRespData(size int) *testExportResp { docs := make([][]byte, size) resp := make([]*seqproxyapi.ExportResponse, size) for i := range size { - id := seq.SimpleID(i) + id := seq.SimpleID(int64(i)) ids[i] = seq.IDSource{ID: id, Source: 0} data := []byte("doc" + strconv.Itoa(i)) diff --git a/proxyapi/grpc_search_test.go b/proxyapi/grpc_search_test.go index 1340a21a..0f99932a 100644 --- a/proxyapi/grpc_search_test.go +++ b/proxyapi/grpc_search_test.go @@ -88,8 +88,8 @@ func prepareSearchTestData(t *testing.T, cData searchTestCaseData) searchTestDat Q: []byte(req.Query.Query), Size: int(req.Size), Offset: int(req.Offset), - From: seq.MID(req.Query.From.AsTime().UnixMilli()), - To: seq.MID(req.Query.To.AsTime().UnixMilli()), + From: 
seq.MID(req.Query.From.AsTime().UnixNano()), + To: seq.MID(req.Query.To.AsTime().UnixNano()), WithTotal: req.WithTotal, ShouldFetch: true, } diff --git a/proxyapi/grpc_v1.go b/proxyapi/grpc_v1.go index f23dc4c1..4bf27f9e 100644 --- a/proxyapi/grpc_v1.go +++ b/proxyapi/grpc_v1.go @@ -226,8 +226,8 @@ func (g *grpcV1) doSearch( proxyReq := &search.SearchRequest{ Q: []byte(req.Query.Query), - From: seq.MID(fromTime.UnixMilli()), - To: seq.MID(toTime.UnixMilli()), + From: seq.MID(fromTime.UnixNano()), + To: seq.MID(toTime.UnixNano()), Explain: req.Query.Explain, Size: int(req.Size), Offset: int(req.Offset), @@ -253,7 +253,7 @@ func (g *grpcV1) doSearch( err, ) } - proxyReq.Interval = seq.MID(intervalDuration.Milliseconds()) + proxyReq.Interval = seq.MID(intervalDuration.Nanoseconds()) } qpr, docsStream, _, err := g.searchIngestor.Search(ctx, proxyReq, tr) @@ -329,7 +329,7 @@ func convertAggsQuery(aggs []*seqproxyapi.AggQuery) ([]search.AggQuery, error) { ) } - aggQuery.Interval = seq.MID(interval.Milliseconds()) + aggQuery.Interval = seq.MID(interval.Nanoseconds()) result = append(result, aggQuery) } return result, nil diff --git a/seq/mids_distribution_test.go b/seq/mids_distribution_test.go index bda3b95f..8c93a377 100644 --- a/seq/mids_distribution_test.go +++ b/seq/mids_distribution_test.go @@ -17,7 +17,7 @@ func getTime(s string) time.Time { } func getMID(s string) MID { - return MID(getTime(s).UnixMilli()) + return MillisToMID(uint64(getTime(s).UnixMilli())) } func TestMIDsDistribution(t *testing.T) { diff --git a/seq/qpr.go b/seq/qpr.go index b2b1b772..3a8ab00d 100644 --- a/seq/qpr.go +++ b/seq/qpr.go @@ -399,6 +399,13 @@ func MergeQPRs(dst *QPR, qprs []*QPR, limit int, histInterval MID, order DocsOrd dst.IDs = ids[:l] } +// remove repetition from histogram +func removeHistogramRepetition(repetition IDSource, histogram map[MID]uint64, histInterval MID) { + bucket := repetition.ID.MID + bucket -= bucket % histInterval + histogram[bucket]-- +} + // removes 
repetitions from both ids and histogram func removeRepetitionsAdvanced(ids IDSources, histogram map[MID]uint64, histInterval MID) (IDSources, uint64) { if len(ids) == 0 { @@ -423,10 +430,3 @@ func removeRepetitionsAdvanced(ids IDSources, histogram map[MID]uint64, histInte return ids[:len(ids)-removeCount], uint64(removeCount) } - -// remove repetition from histogram -func removeHistogramRepetition(repetition IDSource, histogram map[MID]uint64, histInterval MID) { - bucket := repetition.ID.MID - bucket -= bucket % histInterval - histogram[bucket]-- -} diff --git a/seq/seq.go b/seq/seq.go index e406ca2a..2545c47a 100644 --- a/seq/seq.go +++ b/seq/seq.go @@ -15,17 +15,13 @@ type ID struct { RID RID } -type MID uint64 // milliseconds part of ID +type MID uint64 // nanoseconds part of ID type RID uint64 // random part of ID type LID uint32 // local id for a fraction func (m MID) Time() time.Time { - if uint64(m) <= math.MaxInt64 { - return time.UnixMilli(int64(m)) - } else { - // since MaxInt64 is 292278994 year in milliseconds, so we assume this MID is "infinite future" - return time.UnixMilli(math.MaxInt64) - } + nanosPerSecond := uint64(time.Second) + return time.Unix(int64(uint64(m)/nanosPerSecond), int64(uint64(m)%nanosPerSecond)) } func (d ID) String() string { @@ -48,7 +44,7 @@ func (d ID) Bytes() []byte { n := hex.Encode(hexBuf, numBuf) final := append(make([]byte, 0), hexBuf[:n]...) - final = append(final, '-') + final = append(final, '_') binary.LittleEndian.PutUint64(numBuf, uint64(d.RID)) n = hex.Encode(hexBuf, numBuf) @@ -91,10 +87,10 @@ func FromString(x string) (ID, error) { switch delimiter := x[16]; delimiter { case '_': - // new format, MID in nanoseconds. convert to milliseconds - id.MID = NanosToMID(binary.LittleEndian.Uint64(mid)) - case '-': id.MID = MID(binary.LittleEndian.Uint64(mid)) + case '-': + // legacy format, MID in millis. 
Scale to nanoseconds + id.MID = MillisToMID(binary.LittleEndian.Uint64(mid)) default: return id, fmt.Errorf("unknown delimiter %c", delimiter) } @@ -103,31 +99,50 @@ func FromString(x string) (ID, error) { return id, nil } -func SimpleID(i int) ID { +func SimpleID(i int64) ID { return ID{ MID: MID(i), RID: 0, } } -func NanosToMID(nanos uint64) MID { - return MID(nanos / uint64(time.Millisecond)) +func MillisToMID(millis uint64) MID { + if millis <= math.MaxUint64/uint64(time.Millisecond) { + return MID(millis * uint64(time.Millisecond)) + } else { + // math.MaxUint64/1000000 milliseconds of Unix time falls in the year 2554, so anything larger is just an "infinite" future for us. + // We can't scale such a value to nanoseconds without overflow, so we leave it as it is + return MID(millis) + } } func TimeToMID(t time.Time) MID { - return MID(t.UnixNano() / int64(time.Millisecond)) + return MID(t.UnixNano()) } func DurationToMID(d time.Duration) MID { - return MID(d / time.Millisecond) + return MID(d) } func MIDToTime(t MID) time.Time { return t.Time() } +func MIDToMillis(t MID) uint64 { + return uint64(t) / uint64(time.Millisecond) +} + +func MIDToCeilingMillis(t MID) uint64 { + millis := uint64(t) / uint64(time.Millisecond) + nanosPartOfMilli := uint64(t) % uint64(time.Millisecond) + if nanosPartOfMilli != 0 { + millis += 1 + } + return millis +} + func MIDToDuration(t MID) time.Duration { - return time.Duration(t) * time.Millisecond + return time.Duration(t) } func NewID(t time.Time, randomness uint64) ID { @@ -136,6 +151,7 @@ func NewID(t time.Time, randomness uint64) ID { return ID{MID: mid, RID: RID(randomness)} } +// String prints MID to ESFormat. Nanosecond part will not be printed.
func (m MID) String() string { - return util.MsTsToESFormat(uint64(m)) + return util.NsTsToESFormat(uint64(m)) } diff --git a/seq/seq_test.go b/seq/seq_test.go index 537276b8..2265bce2 100644 --- a/seq/seq_test.go +++ b/seq/seq_test.go @@ -8,26 +8,58 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMIDFromString(t *testing.T) { +func TestLegacyMIDFromString(t *testing.T) { id, err := FromString("abaf05877b010000-2402dc02d60615cc") assert.NoError(t, err) - assert.Equal(t, MID(1630057901995), id.MID) + // converted legacy (milliseconds MID) to nanoseconds + assert.Equal(t, MID(1630057901995000000), id.MID) } -func TestNewMIDFromString(t *testing.T) { +func TestMIDFromString(t *testing.T) { id, err := FromString("abaf05877b010000_2402dc02d60615cc") assert.NoError(t, err) - // new format, should convert 1630057901995 nanoseconds to millis - assert.Equal(t, MID(1630057), id.MID) + // no conversion, the raw value is used as nanoseconds + assert.Equal(t, MID(1630057901995), id.MID) +} + +func TestMillisToMID(t *testing.T) { + assert.Equal(t, MID(1761812502000000000), MillisToMID(1761812502000)) + + // we can scale this value + assert.Equal(t, MID(math.MaxUint64/3000000*1000000), MillisToMID(math.MaxUint64/3000000)) + + // greatest milliseconds value (year 2554) we can scale to nanoseconds + assert.Equal(t, MID(18446744073709000000), MillisToMID(math.MaxUint64/1000000)) + + // we can't scale millis this high to nanoseconds (overflow), so we expect that a user just wants an "infinite future" + assert.Equal(t, MID(math.MaxUint64), MillisToMID(math.MaxUint64)) + assert.Equal(t, MID(math.MaxUint64/1000), MillisToMID(math.MaxUint64/1000)) + +} + +func TestTimeToMIDConversionOverflow(t *testing.T) { + timestamp := time.Now() + assert.EqualExportedValues(t, timestamp, MID(timestamp.UnixNano()).Time()) + + // check that we do not overflow on huge values + maxMID := MID(math.MaxUint64) + assert.Equal(t, 2554, maxMID.Time().Year()) + assert.Equal(t, 2554, MIDToTime(maxMID).Year()) +} + +func
TestMIDToCeilingMillis(t *testing.T) { + assert.Equal(t, uint64(14), MIDToCeilingMillis(MID(14000000))) + assert.Equal(t, uint64(15), MIDToCeilingMillis(MID(14000001))) + assert.Equal(t, uint64(15), MIDToCeilingMillis(MID(14999999))) } func TestTimeToMIDConversion(t *testing.T) { timestampNow := time.Now() assert.EqualExportedValues(t, timestampNow, MID(timestampNow.UnixNano()).Time()) - timestamp2 := MID(1763984556395).Time().UTC() + timestamp2 := MID(1763984556395000000).Time().UTC() assert.Equal(t, 2025, timestamp2.Year()) assert.Equal(t, time.Month(11), timestamp2.Month()) assert.Equal(t, 24, timestamp2.Day()) @@ -38,6 +70,6 @@ func TestTimeToMIDConversion(t *testing.T) { // check that we do not overflow on huge values maxMID := MID(math.MaxUint64) - assert.Equal(t, 292278994, maxMID.Time().Year()) - assert.Equal(t, 292278994, MIDToTime(maxMID).Year()) + assert.Equal(t, 2554, maxMID.Time().Year()) + assert.Equal(t, 2554, MIDToTime(maxMID).Year()) } diff --git a/storeapi/client.go b/storeapi/client.go index 3dd22c97..3da87929 100644 --- a/storeapi/client.go +++ b/storeapi/client.go @@ -9,6 +9,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/emptypb" + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/pkg/storeapi" ) @@ -20,37 +22,55 @@ func NewClient(store *Store) storeapi.StoreApiClient { return &inMemoryAPIClient{store: store} } -func (i inMemoryAPIClient) Bulk(ctx context.Context, in *storeapi.BulkRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { +func (i inMemoryAPIClient) Bulk(ctx context.Context, in *storeapi.BulkRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { // NOTE: We copy `Metas` to prevent dataraces because `store` might work // with this memory even when it returned response to client. in.Metas = slices.Clone(in.Metas) + setProtocolVersionHeader(opts...) 
return i.store.GrpcV1().Bulk(ctx, in) } -func (i inMemoryAPIClient) Search(ctx context.Context, in *storeapi.SearchRequest, _ ...grpc.CallOption) (*storeapi.SearchResponse, error) { +func (i inMemoryAPIClient) Search(ctx context.Context, in *storeapi.SearchRequest, opts ...grpc.CallOption) (*storeapi.SearchResponse, error) { + setProtocolVersionHeader(opts...) return i.store.GrpcV1().Search(ctx, in) } -func (i inMemoryAPIClient) StartAsyncSearch(ctx context.Context, in *storeapi.StartAsyncSearchRequest, _ ...grpc.CallOption) (*storeapi.StartAsyncSearchResponse, error) { +func (i inMemoryAPIClient) StartAsyncSearch(ctx context.Context, in *storeapi.StartAsyncSearchRequest, opts ...grpc.CallOption) (*storeapi.StartAsyncSearchResponse, error) { + setProtocolVersionHeader(opts...) return i.store.GrpcV1().StartAsyncSearch(ctx, in) } -func (i inMemoryAPIClient) FetchAsyncSearchResult(ctx context.Context, in *storeapi.FetchAsyncSearchResultRequest, _ ...grpc.CallOption) (*storeapi.FetchAsyncSearchResultResponse, error) { +func (i inMemoryAPIClient) FetchAsyncSearchResult(ctx context.Context, in *storeapi.FetchAsyncSearchResultRequest, opts ...grpc.CallOption) (*storeapi.FetchAsyncSearchResultResponse, error) { + setProtocolVersionHeader(opts...) return i.store.GrpcV1().FetchAsyncSearchResult(ctx, in) } -func (i inMemoryAPIClient) CancelAsyncSearch(ctx context.Context, in *storeapi.CancelAsyncSearchRequest, _ ...grpc.CallOption) (*storeapi.CancelAsyncSearchResponse, error) { +func (i inMemoryAPIClient) CancelAsyncSearch(ctx context.Context, in *storeapi.CancelAsyncSearchRequest, opts ...grpc.CallOption) (*storeapi.CancelAsyncSearchResponse, error) { + setProtocolVersionHeader(opts...) 
return i.store.GrpcV1().CancelAsyncSearch(ctx, in) } -func (i inMemoryAPIClient) DeleteAsyncSearch(ctx context.Context, in *storeapi.DeleteAsyncSearchRequest, _ ...grpc.CallOption) (*storeapi.DeleteAsyncSearchResponse, error) { +func (i inMemoryAPIClient) DeleteAsyncSearch(ctx context.Context, in *storeapi.DeleteAsyncSearchRequest, opts ...grpc.CallOption) (*storeapi.DeleteAsyncSearchResponse, error) { + setProtocolVersionHeader(opts...) return i.store.GrpcV1().DeleteAsyncSearch(ctx, in) } -func (i inMemoryAPIClient) GetAsyncSearchesList(ctx context.Context, in *storeapi.GetAsyncSearchesListRequest, _ ...grpc.CallOption) (*storeapi.GetAsyncSearchesListResponse, error) { +func (i inMemoryAPIClient) GetAsyncSearchesList(ctx context.Context, in *storeapi.GetAsyncSearchesListRequest, opts ...grpc.CallOption) (*storeapi.GetAsyncSearchesListResponse, error) { + setProtocolVersionHeader(opts...) return i.store.GrpcV1().GetAsyncSearchesList(ctx, in) } +func setProtocolVersionHeader(opts ...grpc.CallOption) { + for _, opt := range opts { + if headerOpt, ok := opt.(grpc.HeaderCallOption); ok && headerOpt.HeaderAddr != nil { + if *headerOpt.HeaderAddr == nil { + *headerOpt.HeaderAddr = make(metadata.MD) + } + (*headerOpt.HeaderAddr)[consts.StoreProtocolVersionHeader] = []string{config.StoreProtocolVersion2.String()} + } + } +} + type storeAPIFetchServer struct { grpc.ServerStream ctx context.Context @@ -81,7 +101,9 @@ func newStoreAPIFetchClient(b []*storeapi.BinaryData) *storeAPIFetchClient { } func (x *storeAPIFetchClient) Header() (metadata.MD, error) { - return nil, nil + md := make(metadata.MD) + md[consts.StoreProtocolVersionHeader] = []string{config.StoreProtocolVersion2.String()} + return md, nil } func (x *storeAPIFetchClient) Recv() (*storeapi.BinaryData, error) { @@ -95,8 +117,9 @@ func (x *storeAPIFetchClient) Recv() (*storeapi.BinaryData, error) { return res, nil } -func (i inMemoryAPIClient) Fetch(ctx context.Context, in *storeapi.FetchRequest, _ 
...grpc.CallOption) (storeapi.StoreApi_FetchClient, error) { +func (i inMemoryAPIClient) Fetch(ctx context.Context, in *storeapi.FetchRequest, opts ...grpc.CallOption) (storeapi.StoreApi_FetchClient, error) { s := newStoreAPIFetchServer(ctx) + setProtocolVersionHeader(opts...) if err := i.store.GrpcV1().Fetch(in, s); err != nil { return nil, err } diff --git a/storeapi/grpc_async_search.go b/storeapi/grpc_async_search.go index ef9669a4..518ed19f 100644 --- a/storeapi/grpc_async_search.go +++ b/storeapi/grpc_async_search.go @@ -32,8 +32,8 @@ func (g *GrpcV1) StartAsyncSearch( AST: nil, // Parse AST later. AggQ: aggs, HistInterval: uint64(r.HistogramInterval), - From: seq.MID(r.From), - To: seq.MID(r.To), + From: seq.MillisToMID(uint64(r.From)), + To: seq.MillisToMID(uint64(r.To)), Limit: limit, WithTotal: r.WithDocs, // return total if docs needed Order: seq.DocsOrderDesc, @@ -46,7 +46,7 @@ func (g *GrpcV1) StartAsyncSearch( Retention: r.Retention.AsDuration(), WithDocs: r.WithDocs, } - fracs := g.fracManager.Fractions().FilterInRange(seq.MID(r.From), seq.MID(r.To)) + fracs := g.fracManager.Fractions().FilterInRange(seq.MillisToMID(uint64(r.From)), seq.MillisToMID(uint64(r.To))) if err := g.asyncSearcher.StartSearch(req, fracs); err != nil { return nil, err } diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index ea9ff2e3..1c640b1b 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -79,7 +79,7 @@ func (g *GrpcV1) doSearch( start := time.Now() - from := seq.MID(req.From) + from := seq.MillisToMID(uint64(req.From)) // in store mode hot we return error in case request wants data, that we've already rotated if g.config.StoreMode == StoreModeHot { @@ -89,7 +89,7 @@ func (g *GrpcV1) doSearch( } } - to := seq.MID(req.To) + to := seq.MillisToMID(uint64(req.To)) limit := int(req.Size + req.Offset) if req.Explain { @@ -108,8 +108,8 @@ func (g *GrpcV1) doSearch( return nil, err } - fromTime := seq.MIDToTime(seq.MID(req.From)) - toTime := 
seq.MIDToTime(seq.MID(req.To)) + fromTime := seq.MIDToTime(from) + toTime := seq.MIDToTime(to) toTimeFilter := g.config.Filter.To fromTimeFilter := g.config.Filter.From @@ -205,8 +205,8 @@ func (g *GrpcV1) doSearch( zap.Int64("took_ms", took.Milliseconds()), zap.Object("req", (*searchRequestMarshaler)(req)), zap.Uint64("found", qpr.Total), - zap.String("from", seq.MID(req.From).String()), - zap.String("to", seq.MID(req.To).String()), + zap.String("from", seq.MillisToMID(uint64(req.From)).String()), + zap.String("to", seq.MillisToMID(uint64(req.To)).String()), zap.Int64("offset", req.Offset), zap.Int64("size", req.Size), zap.Bool("with_total", req.WithTotal), diff --git a/storeapi/grpc_server.go b/storeapi/grpc_server.go index 83331d9b..cb561fb1 100644 --- a/storeapi/grpc_server.go +++ b/storeapi/grpc_server.go @@ -40,11 +40,11 @@ func newGRPCServer(cfg APIConfig, fracManager *fracmanager.FracManager, mappingP func initServer() *grpc.Server { interceptors := []grpc.UnaryServerInterceptor{ - grpcutil.StoreProtocolHeaderUnaryServerInterceptor(config.StoreProtocolVersion1), + grpcutil.StoreProtocolHeaderUnaryServerInterceptor(config.StoreProtocolVersion2), grpcutil.ReturnToVTPoolUnaryServerInterceptor(), } streamInterceptors := []grpc.StreamServerInterceptor{ - grpcutil.StoreProtocolHeaderStreamServerInterceptor(config.StoreProtocolVersion1), + grpcutil.StoreProtocolHeaderStreamServerInterceptor(config.StoreProtocolVersion2), } opts := []grpc.ServerOption{ grpc.ChainUnaryInterceptor(interceptors...), diff --git a/storeapi/grpc_v1_test.go b/storeapi/grpc_v1_test.go index e8c69038..9a5baca2 100644 --- a/storeapi/grpc_v1_test.go +++ b/storeapi/grpc_v1_test.go @@ -54,7 +54,7 @@ func makeBulkRequest(cnt int) *storeapi.BulkRequest { dp := indexer.NewTestDocProvider() for i := 0; i < cnt; i++ { - id := seq.SimpleID(i + 1) + id := seq.SimpleID(int64(i + 1)) doc := []byte("document") dp.Append(doc, nil, id, "_all_:", "service:100500", "k8s_pod:"+strconv.Itoa(i)) } diff --git 
a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index 51a2c4dd..87139f34 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -354,7 +354,7 @@ func (s *IntegrationTestSuite) envWithDummyDocs(n int) (*setup.TestingEnv, []str origDocs := make([]string, 0, allDocsNum) docsBulk := make([]string, 2*n) - getNextTs := getAutoTsGenerator(time.Now(), -time.Second) + getNextTs := getAutoTsGenerator(time.Now(), -time.Nanosecond) for i := 0; i < bulksNum; i++ { @@ -431,7 +431,7 @@ func (s *IntegrationTestSuite) TestFetchNotFound() { func (s *IntegrationTestSuite) TestMulti() { // ingest - getNextTs := getAutoTsGenerator(time.Now(), -time.Second) + getNextTs := getAutoTsGenerator(time.Now(), -time.Nanosecond) origDocs := []string{ fmt.Sprintf(`{"service":"b1", "k8s_pod":"pod1", "yyyy":"xxxx1", "ts":%q}`, getNextTs()), fmt.Sprintf(`{"service":"b2", "k8s_pod":"pod2", "yyyy":"xxxx2", "ts":%q}`, getNextTs()), @@ -770,7 +770,7 @@ func (s *IntegrationTestSuite) TestTimeseries() { qpr, _, _, err := env.Search(`service:"nginx-count"`, 1024, setup.WithAggQuery(search.AggQuery{ GroupBy: "level", Func: seq.AggFuncCount, - Interval: 30 * 1000, // 30 sec interval + Interval: seq.DurationToMID(30 * time.Second), })) require.NoError(t, err) @@ -790,7 +790,7 @@ func (s *IntegrationTestSuite) TestTimeseries() { Field: "level", GroupBy: "service", Func: seq.AggFuncMin, - Interval: 30 * 1000, // 30 sec interval + Interval: seq.DurationToMID(30 * time.Second), })) require.NoError(t, err) @@ -810,7 +810,7 @@ func (s *IntegrationTestSuite) TestTimeseries() { qpr, _, _, err := env.Search(`service:"nginx-max"`, 1024, setup.WithAggQuery(search.AggQuery{ Field: "level", Func: seq.AggFuncMax, - Interval: 30 * 1000, // 30 sec interval + Interval: seq.DurationToMID(30 * time.Second), })) require.NoError(t, err) @@ -829,7 +829,7 @@ func (s *IntegrationTestSuite) TestTimeseries() { qpr, _, _, err := 
env.Search(`service:"nginx-avg"`, 1024, setup.WithAggQuery(search.AggQuery{ Field: "level", Func: seq.AggFuncAvg, - Interval: 30 * 1000, // 30 sec interval + Interval: seq.DurationToMID(30 * time.Second), })) require.NoError(t, err) @@ -848,7 +848,7 @@ func (s *IntegrationTestSuite) TestTimeseries() { qpr, _, _, err := env.Search(`service:"nginx-sum"`, 1024, setup.WithAggQuery(search.AggQuery{ Field: "level", Func: seq.AggFuncSum, - Interval: 30 * 1000, // 30 sec interval + Interval: seq.DurationToMID(30 * time.Second), })) require.NoError(t, err) @@ -868,7 +868,7 @@ func (s *IntegrationTestSuite) TestTimeseries() { Field: "level", Func: seq.AggFuncQuantile, Quantiles: []float64{0.5}, - Interval: 30 * 1000, // 30 sec interval + Interval: seq.DurationToMID(30 * time.Second), })) require.NoError(t, err) @@ -1521,7 +1521,7 @@ func (s *IntegrationTestSuite) TestAsyncSearch() { Quantiles: []float64{0.99, 0.95, 0.50}, }, }, - HistogramInterval: seq.MID(time.Second.Milliseconds()), + HistogramInterval: seq.MID(time.Second.Nanoseconds()), WithDocs: true, Size: 100, } diff --git a/tests/integration_tests/single_test.go b/tests/integration_tests/single_test.go index 288c0dc2..81bc0fed 100644 --- a/tests/integration_tests/single_test.go +++ b/tests/integration_tests/single_test.go @@ -157,7 +157,7 @@ func (s *SingleTestSuite) TestSearchNestedWithAND() { doc = `{"timestamp":%q, "trace_id": "%d", "spans": [%s]}` ) docs := make([]string, 0, numTraces) - getNextTs := getAutoTsGenerator(time.Now(), time.Second) + getNextTs := getAutoTsGenerator(time.Now(), time.Nanosecond) for i := range numTraces { spans := make([]string, 0, numSpans) for j := range numSpans { diff --git a/tests/integration_tests/sub_search_test.go b/tests/integration_tests/sub_search_test.go index c03c81c4..3b859f31 100644 --- a/tests/integration_tests/sub_search_test.go +++ b/tests/integration_tests/sub_search_test.go @@ -75,7 +75,7 @@ func makeHist(data []time.Time, interval time.Duration) map[seq.MID]uint64 { 
for _, ts := range data { t := ts.UnixMilli() t -= t % int64(mid) - r[seq.MID(t)]++ + r[seq.MillisToMID(uint64(t))]++ } return r } diff --git a/tests/setup/env.go b/tests/setup/env.go index d7af955a..56e8ae4a 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -545,14 +545,14 @@ func WithAggQuery(aggQueries ...any) SearchOption { func WithInterval(interval time.Duration) SearchOption { return func(sr *search.SearchRequest) { - sr.Interval = seq.MID(interval / time.Millisecond) + sr.Interval = seq.DurationToMID(interval) } } func WithTimeRange(from, to time.Time) SearchOption { return func(sr *search.SearchRequest) { - sr.From = seq.MID(from.UnixMilli()) - sr.To = seq.MID(to.UnixMilli()) + sr.From = seq.TimeToMID(from) + sr.To = seq.TimeToMID(to) } } diff --git a/util/util.go b/util/util.go index c20f0530..34433213 100644 --- a/util/util.go +++ b/util/util.go @@ -117,6 +117,12 @@ func MsTsToESFormat(ts uint64) string { return time.UnixMilli(int64(ts)).Format(consts.ESTimeFormat) } +// NsTsToESFormat converts timestamp in nanoseconds to ES time format string. Nanosecond part will not be printed. +func NsTsToESFormat(ts uint64) string { + nanosPerSec := uint64(time.Second) + return time.Unix(int64(ts/nanosPerSec), int64(ts%nanosPerSec)).Format(consts.ESTimeFormat) +} + func BinSearchInRange(from, to int, fn func(i int) bool) int { n := to - from + 1 i := sort.Search(n, func(i int) bool { return fn(from + i) })