Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
876432d
microseconds support
cheb0 Oct 30, 2025
3ab6ba1
uncomment test
cheb0 Oct 31, 2025
17d181d
seq_id string in micros, but support legacy millis
cheb0 Oct 31, 2025
eece147
dirty if-convert MID in proxy search
cheb0 Oct 31, 2025
50b3777
dirty if in streaming_doc.go
cheb0 Oct 31, 2025
95eea7e
dbg qpr out
cheb0 Nov 1, 2025
f70c8fe
dbg print
cheb0 Nov 1, 2025
52a3cb9
remove dbg print
cheb0 Nov 1, 2025
3060b12
dirty if for MID conversion for hist
cheb0 Nov 5, 2025
0ba6c6f
do not use cached info from frac cache if binary version is different
cheb0 Nov 5, 2025
5ec9ec1
better parsing of seq.ID
cheb0 Nov 5, 2025
d579050
fix millis to micros conversion overflow
cheb0 Nov 5, 2025
82d6765
grpc header for reporting MID precision
cheb0 Nov 5, 2025
38b5b16
rename header
cheb0 Nov 5, 2025
ef91c7d
report MID precision on streaming RPC call
cheb0 Nov 5, 2025
cbfcfa8
MID now is in nanoseconds
cheb0 Nov 5, 2025
7b17f7b
v1 QPR compatibility
cheb0 Nov 6, 2025
8e82b05
support mid precision "ms"
cheb0 Nov 6, 2025
56babec
handle compatibility with milliseconds
cheb0 Nov 6, 2025
30a2157
omit json tag
cheb0 Nov 6, 2025
91643ce
Merge branch 'main' into 232-nanos-support-phase-2
cheb0 Nov 7, 2025
7492d0f
merge main
cheb0 Nov 7, 2025
28634df
frac cache keeps from and to in milliseconds in JSON file
cheb0 Nov 7, 2025
9c53433
overflow fixes, tests
cheb0 Nov 10, 2025
0423be3
test fixes
cheb0 Nov 11, 2025
9120932
use nanosecond step in tests
cheb0 Nov 11, 2025
c92fe56
move to protocol version
cheb0 Nov 21, 2025
84255d8
Merge branch 'main' into 232-nanos-support-phase-2
cheb0 Dec 1, 2025
6c13141
carry PR fixes from phase 1, add tests
cheb0 Dec 1, 2025
65f7236
redo json marshal for common.Info, fixes
cheb0 Dec 2, 2025
2e9dd32
Merge branch 'main' into 232-nanos-support-phase-2
cheb0 Dec 3, 2025
e637cc6
Merge branch 'refs/heads/main' into 232-nanos-support-phase-2
cheb0 Dec 3, 2025
9a6393e
fomatting fixes
cheb0 Dec 3, 2025
613f831
Merge branch 'main' into 232-nanos-support-phase-2
cheb0 Dec 5, 2025
63b8330
store protocol for single, linter, test issues
cheb0 Dec 5, 2025
575f66d
Remove unneded conversion
cheb0 Dec 15, 2025
88f7e87
Merge branch 'main' into 232-nanos-support-phase-2
cheb0 Dec 15, 2025
33a31f3
PR review: rename type alias
cheb0 Dec 17, 2025
41da960
PR review: cast hist interval to seq.MID
cheb0 Dec 17, 2025
a8dc1d7
PR review: remove unnecessary MillisToMID calls
cheb0 Dec 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions asyncsearcher/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func newAsyncSearchInfo(r AsyncSearchRequest, list fracmanager.List) asyncSearch
}
ctx, cancel := context.WithCancel(context.Background())
return asyncSearchInfo{
Version: infoVersion1,
Version: infoVersion2,
Finished: false,
Error: "",
CanceledAt: time.Time{},
Expand Down Expand Up @@ -591,10 +591,10 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
if info.Version == 0 {
info.Version = infoVersion1
}
if info.Version == infoVersion2 {
info.Request.Params.From = seq.NanosToMID(uint64(info.Request.Params.From))
info.Request.Params.To = seq.NanosToMID(uint64(info.Request.Params.To))
info.Version = infoVersion1
if info.Version == infoVersion1 {
info.Request.Params.From = seq.MillisToMID(uint64(info.Request.Params.From))
info.Request.Params.To = seq.MillisToMID(uint64(info.Request.Params.To))
info.Version = infoVersion2
}

info.merged.Store(areQPRsMerged[requestID])
Expand Down
14 changes: 7 additions & 7 deletions asyncsearcher/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var availableVersions = map[qprBinVersion]struct{}{
}

func marshalQPR(q *seq.QPR, dst []byte) []byte {
dst = append(dst, uint8(qprBinVersion1))
dst = append(dst, uint8(qprBinVersion2))

blocksLenPos := len(dst)
dst = append(dst, make([]byte, 8)...)
Expand Down Expand Up @@ -238,8 +238,8 @@ func unmarshalIDsDelta(dst seq.IDSources, block []byte, version qprBinVersion) (
block = block[hintSize:]

var midValue seq.MID
if version == qprBinVersion2 {
midValue = seq.NanosToMID(uint64(mid))
if version == qprBinVersion1 {
midValue = seq.MillisToMID(uint64(mid))
} else {
midValue = seq.MID(mid)
}
Expand Down Expand Up @@ -296,8 +296,8 @@ func unmarshalHistogram(src []byte, version qprBinVersion) (map[seq.MID]uint64,
}

var midValue seq.MID
if version == qprBinVersion2 {
midValue = seq.NanosToMID(uint64(mid))
if version == qprBinVersion1 {
midValue = seq.MillisToMID(uint64(mid))
} else {
midValue = seq.MID(mid)
}
Expand Down Expand Up @@ -460,8 +460,8 @@ func unmarshalAggregatableSamples(q *seq.AggregatableSamples, src []byte, versio
src = tail

var midValue seq.MID
if version == qprBinVersion2 {
midValue = seq.NanosToMID(uint64(mid))
if version == qprBinVersion1 {
midValue = seq.MillisToMID(uint64(mid))
} else {
midValue = seq.MID(mid)
}
Expand Down
43 changes: 23 additions & 20 deletions asyncsearcher/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,34 +92,34 @@ func TestQPRMarshalUnmarshal(t *testing.T) {
}
}

// TestQPRVersion2Compatibility tests that it's possible to unmarshall and read version 2 async search result encoded.
// MIDs in IDs and a histogram must be converted to milliseconds
func TestQPRVersion2Compatibility(t *testing.T) {
// TestQPRVersion1Compatibility tests that it's possible to unmarshall and read version 1 async search result encoded.
// MIDs in IDs and a histogram must be converted from millis to nanos
func TestQPRVersion1Compatibility(t *testing.T) {
qpr := seq.QPR{
IDs: seq.IDSources{
{
ID: seq.ID{MID: 1761812502573000000, RID: 34734732392},
ID: seq.ID{MID: seq.MID(1761812502573), RID: 34734732392},
},
},
Histogram: map[seq.MID]uint64{
1761812502573000000: 433,
1761812502463000000: 743,
seq.MID(1761812502573): 433,
seq.MID(1761812502463): 743,
},
Aggs: []seq.AggregatableSamples{
{
SamplesByBin: map[seq.AggBin]*seq.SamplesContainer{
{Token: "_not_exists"}: {
Total: 1,
},
{Token: "seq-db store", MID: seq.MID(1761812502953000000)}: {
{Token: "seq-db store", MID: seq.MID(1761812502953)}: {
Min: 3,
Max: 5,
Sum: 794,
Total: 1,
NotExists: 7,
Samples: []float64{324},
},
{Token: "seq-db store", MID: seq.MID(1761812502456000000)}: {
{Token: "seq-db store", MID: seq.MID(1761812502456)}: {
Min: 2,
Max: 6,
Sum: 544,
Expand All @@ -132,27 +132,30 @@ func TestQPRVersion2Compatibility(t *testing.T) {
},
},
}

rawQPR := marshalQPR(&qpr, nil)
rawQPR[0] = uint8(qprBinVersion2)
var outQpr seq.QPR
tail, err := unmarshalQPR(&outQpr, rawQPR, math.MaxInt)
rawQPR[0] = uint8(qprBinVersion1)

var outQPR seq.QPR
tail, err := unmarshalQPR(&outQPR, rawQPR, math.MaxInt)
require.NoError(t, err)
require.Equal(t, 0, len(tail))
require.Equal(t, seq.MID(1761812502573), outQpr.IDs[0].ID.MID, "mid doesn't match, should convert to milliseconds")

require.Equal(t, 2, len(outQpr.Histogram))
require.Equal(t, uint64(433), outQpr.Histogram[seq.MID(1761812502573)], "histogram bucket doesn't match")
require.Equal(t, uint64(743), outQpr.Histogram[seq.MID(1761812502463)], "histogram bucket doesn't match")
require.Equal(t, seq.MID(1761812502573000000), outQPR.IDs[0].ID.MID, "mid doesn't match, should convert to nanoseconds")

require.Len(t, outQPR.Histogram, 2)
require.Equal(t, uint64(433), outQPR.Histogram[seq.MID(1761812502573000000)], "histogram bucket doesn't match")
require.Equal(t, uint64(743), outQPR.Histogram[seq.MID(1761812502463000000)], "histogram bucket doesn't match")

require.Equal(t, 1, len(outQpr.Aggs), "should have one AggregatableSamples")
agg := outQpr.Aggs[0]
require.Equal(t, 3, len(agg.SamplesByBin), "should have 3 samples in bin")
require.Len(t, outQPR.Aggs, 1, "should have one AggregatableSamples")
agg := outQPR.Aggs[0]
require.Len(t, agg.SamplesByBin, 3, "should have 3 samples in bin")

notExistsBin := seq.AggBin{Token: "_not_exists"}
require.Equal(t, int64(1), agg.SamplesByBin[notExistsBin].Total, "bucket doesn't match")
bin1 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502953)}
bin1 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502953000000)}
require.Equal(t, int64(1), agg.SamplesByBin[bin1].Total, "bucket doesn't match")
bin2 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502456)}
bin2 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502456000000)}
require.Equal(t, int64(2), agg.SamplesByBin[bin2].Total, "bucket doesn't match")
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/distribution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ func main() {
zap.String("name", info.Name()),
zap.String("ver", info.Ver),
zap.Uint32("docs_total", info.DocsTotal),
zap.String("from", util.MsTsToESFormat(uint64(info.From))),
zap.String("to", util.MsTsToESFormat(uint64(info.To))),
zap.String("from", util.NsTsToESFormat(uint64(info.From))),
zap.String("to", util.NsTsToESFormat(uint64(info.To))),
zap.String("creation_time", util.MsTsToESFormat(info.CreationTime)),
)
}
Expand Down
2 changes: 1 addition & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ const (
BinaryDataV2
)

const CurrentFracVersion = BinaryDataV1
const CurrentFracVersion = BinaryDataV2
2 changes: 1 addition & 1 deletion docs/en/internal/common.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Common

* ID: MID-RID .
* MID - milliseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
* MID - nanoseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
* RID - random part of ID, generated by ingestor before sending to store.
* docParam - link of ID and block position, position of doc in block.
* Only active fraction has meta file. It is used for restoring index in memory and in process of sealing fraction it is used to form index file.
Expand Down
2 changes: 1 addition & 1 deletion docs/en/internal/search.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Some basic overview of nodes:
>
> **ID** (document ID) - full id of a document, that you can use on proxy to find this specific doc. Consists of two parts: mid and rid.
>
> **MID** (milliseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
> **MID** (nanoseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
>
> **RID** (random ID) - random part of an id

Expand Down
2 changes: 1 addition & 1 deletion docs/ru/internal/common.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Common

* ID: MID-RID .
* MID - milliseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
* MID - nanoseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
* RID - random part of ID, generated by ingestor before sending to store.
* docParam - link of ID and block position, position of doc in block.
* Only active fraction has meta file. It is used for restoring index in memory and in process of sealing fraction it is used to form index file.
Expand Down
2 changes: 1 addition & 1 deletion docs/ru/internal/search.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Some basic overview of nodes:
>
> **ID** (document ID) - full id of a document, that you can use on proxy to find this specific doc. Consists of two parts: mid and rid.
>
> **MID** (milliseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
> **MID** (nanoseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
>
> **RID** (random ID) - random part of an id

Expand Down
35 changes: 34 additions & 1 deletion frac/common/info.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"encoding/json"
"fmt"
"math"
"path"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *Info) BuildDistribution(mids []uint64) {
}

func (s *Info) InitEmptyDistribution() bool {
from := time.UnixMilli(int64(s.From))
from := s.From.Time()
creationTime := time.UnixMilli(int64(s.CreationTime))
if creationTime.Sub(from) < DistributionSpreadThreshold { // no big spread in past
return false
Expand Down Expand Up @@ -117,3 +118,35 @@ func (s *Info) IsIntersecting(from, to seq.MID) bool {
// check with distribution
return s.Distribution.IsIntersecting(from, to)
}

// MarshalJSON implements custom JSON marshaling to always store From and To in milliseconds
func (s *Info) MarshalJSON() ([]byte, error) {
type TmpInfo Info // type alias to avoid infinite recursion

tmp := TmpInfo(*s)

// We convert "from" and "to" to milliseconds in order to guarantee we can rollback on deploy.
// When converting nanos to millis we must round "from" down (floor) and round "to" up (ceiling).
// This guarantees that a fraction time range (checked on search with Contains and IsIntersecting methods) is not narrowed down,
// and we do not lose messages on search.
tmp.From = seq.MID(seq.MIDToMillis(s.From))
tmp.To = seq.MID(seq.MIDToCeilingMillis(s.To))

return json.Marshal(tmp)
}

// UnmarshalJSON implements custom JSON unmarshaling to convert From and To from milliseconds to nanoseconds
func (s *Info) UnmarshalJSON(data []byte) error {
type TmpInfo Info // type alias to avoid infinite recursion
var tmp TmpInfo

err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}

*s = Info(tmp)
s.From = seq.MillisToMID(uint64(tmp.From))
s.To = seq.MillisToMID(uint64(tmp.To))
return nil
}
118 changes: 118 additions & 0 deletions frac/common/info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package common

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/seq"
)

func TestInfo_MarshalJSON(t *testing.T) {
info := &Info{
Path: "test-frac",
Ver: "2",
DocsTotal: 100,
DocsOnDisk: 1000,
DocsRaw: 2000,
MetaOnDisk: 500,
IndexOnDisk: 1500,
From: seq.MID(1761812502000000000),
To: seq.MID(1761812503000000000),
CreationTime: 1666193044479,
SealingTime: 1666193045000,
}

jsonBytes, err := json.Marshal(info)
require.NoError(t, err)

var jsonMap map[string]interface{}
err = json.Unmarshal(jsonBytes, &jsonMap)
require.NoError(t, err)

fromRaw, ok := jsonMap["from"].(float64)
require.True(t, ok, "from should be a number")
assert.Equal(t, float64(1761812502000), fromRaw, "should scale from from millis on marshal")
toRaw, ok := jsonMap["to"].(float64)
require.True(t, ok, "to should be a number")
assert.Equal(t, float64(1761812503000), toRaw, "should scale from to millis on marshal")

// validate that original fields are not changed while marshaling (safety check)
assert.Equal(t, seq.MID(1761812502000000000), info.From, "must not change while marshaling")
assert.Equal(t, seq.MID(1761812503000000000), info.To, "must not change while marshaling")
}

func TestInfo_UnmarshalJSON(t *testing.T) {
jsonData := `{
"name": "test-frac",
"ver": "2",
"docs_total": 100,
"docs_on_disk": 1000,
"docs_raw": 2000,
"meta_on_disk": 500,
"index_on_disk": 1500,
"from": 1761812502000,
"to": 1761812503000,
"creation_time": 1666193044479,
"sealing_time": 1666193045000
}`

var info Info
err := json.Unmarshal([]byte(jsonData), &info)
require.NoError(t, err)

assert.Equal(t, seq.MID(1761812502000000000), info.From, "should scale to nanoseconds")
assert.Equal(t, seq.MID(1761812503000000000), info.To, "should scale to nanoseconds")
assert.Equal(t, "test-frac", info.Path)
assert.Equal(t, uint32(100), info.DocsTotal)
}

func TestInfo_MarshalUnmarshal(t *testing.T) {
original := &Info{
Path: "test-frac",
Ver: "2",
DocsTotal: 100,
DocsOnDisk: 1000,
DocsRaw: 2000,
MetaOnDisk: 500,
IndexOnDisk: 1500,
From: seq.MID(1761812502000000000),
To: seq.MID(1761812503000000000),
CreationTime: 1666193044479,
SealingTime: 1666193045000,
}

jsonBytes, err := json.Marshal(original)
require.NoError(t, err)

var unmarshaled Info
err = json.Unmarshal(jsonBytes, &unmarshaled)
require.NoError(t, err)

assert.EqualExportedValues(t, original, &unmarshaled, "should match after marshal/unmarshal")
}

func TestInfo_MarshalUnmarshalWithNanos(t *testing.T) {
original := &Info{
Path: "test-frac",
Ver: "2",
From: seq.MID(1761812502000000777),
To: seq.MID(1761812503000000777),
CreationTime: 1666193044479,
SealingTime: 1666193045000,
}

jsonBytes, err := json.Marshal(original)
require.NoError(t, err)

var unmarshaled Info
err = json.Unmarshal(jsonBytes, &unmarshaled)
require.NoError(t, err)

// we can't represent nanos in millis while saving, so "from" is floored (rounded down) to near millisecond,
// while "to" is ceiled (rounded up) to near millisecond
assert.Equal(t, seq.MID(1761812502000000000), unmarshaled.From)
assert.Equal(t, seq.MID(1761812503001000000), unmarshaled.To)
}
Loading
Loading