Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e0c8c4f
[CORELOG-3155] Normalize timeseries values
Jan 16, 2026
e95f3c0
Revert "[CORELOG-3155] Normalize timeseries values"
Jan 16, 2026
69cfca6
Normalize timeseries values
Jan 16, 2026
2feeb09
add http agg ts
Jan 21, 2026
0e63d8b
refactor + remove AggField From tests
Jan 23, 2026
09c1440
Merge branch 'main' of https://github.com/ozontech/seq-ui into normal…
Jan 23, 2026
108501d
add agg_func check
Jan 23, 2026
734e951
fix tests
Jan 23, 2026
87f4f24
add normalization in Search
Jan 27, 2026
a9d86be
add test
Jan 28, 2026
3991907
refactor
Feb 5, 2026
6a3424a
fixes
Feb 5, 2026
2b2c5ea
add bucket_quantity parameter + test
Feb 6, 2026
714e4c8
gen swagger
Feb 6, 2026
307426f
fix lint
Feb 6, 2026
e4f530b
Merge branch 'main' of https://github.com/ozontech/seq-ui into normal…
Feb 16, 2026
1aab589
add buckets unit to aggregation request
Feb 16, 2026
7b9d921
add buckets unit to aggregation response
Feb 16, 2026
39e6433
fix makefile
Feb 16, 2026
31054ba
fix tests
Feb 16, 2026
de5c757
refactor + fix tests
Feb 16, 2026
88fe2be
refactor bucket_unit
timggggggg Feb 20, 2026
ce0b875
update doc
timggggggg Feb 20, 2026
1f054c9
fix doc
timggggggg Feb 20, 2026
8c36470
Merge branch 'main' into normalize-timeseries-values
timggggggg Feb 20, 2026
f62bdf7
fix tests
timggggggg Feb 20, 2026
8b86657
Merge branch 'normalize-timeseries-values' of https://github.com/ozon…
timggggggg Feb 20, 2026
41421e5
fix generate
timggggggg Feb 20, 2026
e14d6cd
refactor
timggggggg Feb 26, 2026
7308aec
fix
timggggggg Feb 26, 2026
16e3993
fix
timggggggg Feb 26, 2026
062507d
remove GetBucketUnits from http aggregation ts
timggggggg Feb 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/seqapi/v1/seq_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message Aggregation {

repeated Bucket buckets = 1;
int64 not_exists = 2;
string bucket_unit = 3;
}

enum AggFunc {
Expand All @@ -107,6 +108,7 @@ message AggregationQuery {
AggFunc func = 4;
repeated double quantiles = 5;
optional string interval = 6;
optional string bucket_unit = 7;
}

message SearchRequest {
Expand Down
6 changes: 6 additions & 0 deletions docs/en/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ Config for `/seqapi` API handlers.

Max allowed buckets per aggregation with timeseries request. The number of buckets is calculated as (`to`-`from`) / `interval`. If set to zero or negative value, then it will be reset to `default`.

+ **`default_bucket_unit`** *`string`* *`default="1s"`*

The unit to which bucket values ​​are normalized in aggregation with timeseries request if not specified in the request. The initial bucket unit is `count/interval`; after normalization, it is `count/bucket_unit`. If set to zero or negative value, then it will be reset to `default`.

The value must be passed in duration format: `<number>(ms|s|m|h)`.

+ **`events_cache_ttl`** *`string`* *`default="24h"`*

TTL for events caching. If not set or set to zero, then it will be reset to `default`.
Expand Down
6 changes: 6 additions & 0 deletions docs/ru/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ handlers:

Максимальное количество бакетов за один запрос аггрегации с таймсерией. Количество бакетов рассчитывается как (`to`-`from`) / `interval`. Если установлено нулевое или отрицательное значение, то оно будет сброшено на `default`.

+ **`default_bucket_unit`** *`string`* *`default="1s"`*

Единица измерения к которой нормализуются значения бакетов в аггрегациях с таймсерией, если она не задана в поисковом запросе. Изначальная единица измерения бакета равна `count/interval`, после нормализации равна `count/bucket_unit`. Если установлено нулевое или отрицательное значение, то оно будет сброшено на `default`.

> Значение должно быть передано в `duration`-формате: `<число>(ms|s|m|h)`.

+ **`events_cache_ttl`** *`string`* *`default="24h"`*

TTL кэширования событий. Если установлено нулевое или отрицательное значение, то оно будет сброшено на `default`.
Expand Down
96 changes: 96 additions & 0 deletions internal/api/seqapi/v1/aggregation_ts/normalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package aggregation_ts

import (
"fmt"
"time"

"github.com/ozontech/seq-ui/pkg/seqapi/v1"
)

type aggQuery interface {
GetFunc() seqapi.AggFunc
GetInterval() string
GetBucketUnit() string
}

func NormalizeBuckets[T aggQuery](aggQueries []T, aggs []*seqapi.Aggregation, defaultBucketUnit time.Duration) error {
bucketUnits, err := getBucketUnits(aggQueries, defaultBucketUnit)
if err != nil {
return fmt.Errorf("failed to get bucket units: %w", err)
}

aggIntervals, err := getIntervals(aggQueries)
if err != nil {
return fmt.Errorf("failed to get intervals: %w", err)
}

for i, agg := range aggs {
if agg == nil || agg.Buckets == nil || aggIntervals[i] == 0 {
continue
}

bucketUnitDenominator := time.Second
if bucketUnits[i] != 0 {
bucketUnitDenominator = bucketUnits[i]
agg.BucketUnit = bucketUnits[i].String()
}

for _, bucket := range agg.Buckets {
if bucket == nil || bucket.Value == nil {
continue
}

*bucket.Value = *bucket.Value * float64(bucketUnitDenominator) / float64(aggIntervals[i])
}
}

return nil
}

func getBucketUnits[T aggQuery](aggQueries []T, defaultBucketUnit time.Duration) ([]time.Duration, error) {
aggBucketUnits := make([]time.Duration, 0, len(aggQueries))
for _, agg := range aggQueries {
if agg.GetFunc() != seqapi.AggFunc_AGG_FUNC_COUNT {
aggBucketUnits = append(aggBucketUnits, 0)
continue
}
bucketUnitRaw := agg.GetBucketUnit()
if bucketUnitRaw == "" {
aggBucketUnits = append(aggBucketUnits, defaultBucketUnit)
continue
}

bucketUnit, err := time.ParseDuration(bucketUnitRaw)
if err != nil {
return nil, err
}

aggBucketUnits = append(aggBucketUnits, bucketUnit)
}

return aggBucketUnits, nil
}

func getIntervals[T aggQuery](aggQueries []T) ([]time.Duration, error) {
aggIntervals := make([]time.Duration, 0, len(aggQueries))
for _, agg := range aggQueries {
if agg.GetFunc() != seqapi.AggFunc_AGG_FUNC_COUNT {
aggIntervals = append(aggIntervals, 0)
continue
}
intervalRaw := agg.GetInterval()
if intervalRaw == "" {
aggIntervals = append(aggIntervals, time.Second)
continue
}

interval, err := time.ParseDuration(intervalRaw)
if err != nil {
return nil, err
}

aggIntervals = append(aggIntervals, interval)
}

return aggIntervals, nil
}
6 changes: 6 additions & 0 deletions internal/api/seqapi/v1/grpc/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"

"github.com/ozontech/seq-ui/internal/api/seqapi/v1/aggregation_ts"
"github.com/ozontech/seq-ui/internal/api/seqapi/v1/api_error"
"github.com/ozontech/seq-ui/pkg/seqapi/v1"
"github.com/ozontech/seq-ui/tracing"
Expand Down Expand Up @@ -90,5 +91,10 @@ func (a *API) GetAggregation(ctx context.Context, req *seqapi.GetAggregationRequ
}
}

err = aggregation_ts.NormalizeBuckets(req.Aggregations, resp.Aggregations, a.config.DefaultAggregationTsBucketUnit)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

return resp, nil
}
154 changes: 133 additions & 21 deletions internal/api/seqapi/v1/grpc/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,6 @@ func TestGetAggregation(t *testing.T) {

cfg config.SeqAPI
}{
{
name: "ok_single_agg",
req: &seqapi.GetAggregationRequest{
Query: query,
From: timestamppb.New(from),
To: timestamppb.New(to),
AggField: "test1",
},
resp: &seqapi.GetAggregationResponse{
Aggregation: test.MakeAggregation(2, nil),
Aggregations: test.MakeAggregations(1, 2, nil),
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_NO,
},
},
},
{
name: "ok_multi_agg",
req: &seqapi.GetAggregationRequest{
Expand All @@ -60,7 +44,6 @@ func TestGetAggregation(t *testing.T) {
},
},
resp: &seqapi.GetAggregationResponse{
Aggregation: test.MakeAggregation(3, nil),
Aggregations: test.MakeAggregations(2, 3, nil),
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_NO,
Expand All @@ -87,10 +70,15 @@ func TestGetAggregation(t *testing.T) {
{
name: "err_client",
req: &seqapi.GetAggregationRequest{
Query: query,
From: timestamppb.New(from),
To: timestamppb.New(to),
AggField: "test2",
Query: query,
From: timestamppb.New(from),
To: timestamppb.New(to),
Aggregations: []*seqapi.AggregationQuery{
{Field: "test2"},
},
},
cfg: config.SeqAPI{
MaxAggregationsPerRequest: 1,
},
clientErr: errors.New("client error"),
},
Expand Down Expand Up @@ -127,3 +115,127 @@ func TestGetAggregation(t *testing.T) {
})
}
}

func TestGetAggregationWithNormalization(t *testing.T) {
query := "message:error"
from := time.Now()
to := from.Add(time.Second)
bucketUnit := "3s"

tests := []struct {
name string

req *seqapi.GetAggregationRequest
resp *seqapi.GetAggregationResponse
normalized_resp *seqapi.GetAggregationResponse

apiErr bool
clientErr error

cfg config.SeqAPI
}{
{
name: "ok_normalize",
req: &seqapi.GetAggregationRequest{
Query: query,
From: timestamppb.New(from),
To: timestamppb.New(to),
Aggregations: []*seqapi.AggregationQuery{
{Field: "test1", Func: seqapi.AggFunc_AGG_FUNC_COUNT, BucketUnit: &bucketUnit},
{Field: "test2", Func: seqapi.AggFunc_AGG_FUNC_COUNT, BucketUnit: &bucketUnit},
},
},
resp: &seqapi.GetAggregationResponse{
Aggregations: test.MakeAggregations(2, 3, &test.MakeAggOpts{
BucketUnit: bucketUnit,
}),
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_NO,
},
},
normalized_resp: &seqapi.GetAggregationResponse{
Aggregations: test.MakeAggregations(2, 3, &test.MakeAggOpts{
BucketUnit: bucketUnit,
Values: []float64{
3,
6,
9,
},
}),
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_NO,
},
},
cfg: config.SeqAPI{
MaxAggregationsPerRequest: 3,
DefaultAggregationTsBucketUnit: time.Second,
},
},
{
name: "ok_normalize_default_bucket_unit",
req: &seqapi.GetAggregationRequest{
Query: query,
From: timestamppb.New(from),
To: timestamppb.New(to),
Aggregations: []*seqapi.AggregationQuery{
{Field: "test1", Func: seqapi.AggFunc_AGG_FUNC_COUNT},
{Field: "test2", Func: seqapi.AggFunc_AGG_FUNC_COUNT},
},
},
resp: &seqapi.GetAggregationResponse{
Aggregations: test.MakeAggregations(2, 3, nil),
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_NO,
},
},
normalized_resp: &seqapi.GetAggregationResponse{
Aggregations: test.MakeAggregations(2, 3, &test.MakeAggOpts{
BucketUnit: "4s",
Values: []float64{
4,
8,
12,
},
}),
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_NO,
},
},
cfg: config.SeqAPI{
MaxAggregationsPerRequest: 3,
DefaultAggregationTsBucketUnit: 4 * time.Second,
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

seqData := test.APITestData{
Cfg: tt.cfg,
}

if !tt.apiErr {
ctrl := gomock.NewController(t)

seqDbMock := mock_seqdb.NewMockClient(ctrl)
seqDbMock.EXPECT().GetAggregation(gomock.Any(), proto.Clone(tt.req)).
Return(proto.Clone(tt.resp), tt.clientErr).Times(1)

seqData.Mocks.SeqDB = seqDbMock
}

s := initTestAPI(seqData)

resp, err := s.GetAggregation(context.Background(), tt.req)
if tt.apiErr {
require.True(t, err != nil)
return
}

require.Equal(t, tt.clientErr, err)
require.True(t, proto.Equal(resp, tt.normalized_resp))
})
}
}
6 changes: 6 additions & 0 deletions internal/api/seqapi/v1/grpc/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"

"github.com/ozontech/seq-ui/internal/api/seqapi/v1/aggregation_ts"
"github.com/ozontech/seq-ui/internal/api/seqapi/v1/api_error"
"github.com/ozontech/seq-ui/pkg/seqapi/v1"
"github.com/ozontech/seq-ui/tracing"
Expand Down Expand Up @@ -101,5 +102,10 @@ func (a *API) Search(ctx context.Context, req *seqapi.SearchRequest) (*seqapi.Se
}
}

err = aggregation_ts.NormalizeBuckets(req.Aggregations, resp.Aggregations, a.config.DefaultAggregationTsBucketUnit)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

return resp, nil
}
2 changes: 1 addition & 1 deletion internal/api/seqapi/v1/grpc/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestSearch(t *testing.T) {
Events: test.MakeEvents(int(limit), eventTime),
Total: int64(limit),
Histogram: test.MakeHistogram(2),
Aggregations: test.MakeAggregations(3, 2, nil),
Aggregations: test.MakeAggregations(2, 2, nil),
Error: &seqapi.Error{
Code: seqapi.ErrorCode_ERROR_CODE_NO,
},
Expand Down
Loading