Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/seqproxyapi/v1/seq_proxy_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ message StartAsyncSearchRequest {
// Set this to true to enable document retrieval via FetchAsyncSearch.
// Note: enabling this may significantly increase disk space usage.
bool with_docs = 5;
// Maximum number of documents to find. Doesn't affect aggs and hist.
// Ignored if with_docs was set to false.
int64 size = 6;
}

message StartAsyncSearchResponse {
Expand Down
3 changes: 3 additions & 0 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ message StartAsyncSearchRequest {
repeated AggQuery aggs = 6;
int64 histogram_interval = 7;
bool with_docs = 8;
int64 size = 9;
}

message StartAsyncSearchResponse {}
Expand Down Expand Up @@ -184,6 +185,7 @@ message FetchAsyncSearchResultResponse {
google.protobuf.Timestamp to = 13;
google.protobuf.Duration retention = 14;
bool with_docs = 15;
int64 size = 16;
}

message CancelAsyncSearchRequest{
Expand Down Expand Up @@ -225,6 +227,7 @@ message AsyncSearchesListItem {
google.protobuf.Timestamp to = 13;
google.protobuf.Duration retention = 14;
bool with_docs = 15;
int64 size = 16;
}

message IdWithHint {
Expand Down
11 changes: 6 additions & 5 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,12 @@ func startProxy(

pconfig := proxyapi.IngestorConfig{
API: proxyapi.APIConfig{
SearchTimeout: consts.DefaultSearchTimeout,
ExportTimeout: consts.DefaultExportTimeout,
QueryRateLimit: cfg.Limits.QueryRate,
EsVersion: cfg.API.ESVersion,
GatewayAddr: cfg.Address.GRPC,
SearchTimeout: consts.DefaultSearchTimeout,
ExportTimeout: consts.DefaultExportTimeout,
QueryRateLimit: cfg.Limits.QueryRate,
EsVersion: cfg.API.ESVersion,
GatewayAddr: cfg.Address.GRPC,
AsyncSearchMaxDocumentsPerRequest: cfg.AsyncSearch.MaxDocumentsPerRequest,
},
Search: search.Config{
HotStores: hotStores,
Expand Down
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,11 @@ type Config struct {
AsyncSearch struct {
// DataDir specifies directory that contains data for asynchronous searches.
// By default will be subdirectory in [Config.Storage.DataDir].
DataDir string `config:"data_dir"`
Concurrency int `config:"concurrency"`
MaxTotalSize Bytes `config:"max_total_size" default:"1GiB"`
MaxSizePerRequest Bytes `config:"max_size_per_request" default:"100MiB"`
DataDir string `config:"data_dir"`
Concurrency int `config:"concurrency"`
MaxTotalSize Bytes `config:"max_total_size" default:"1GiB"`
MaxSizePerRequest Bytes `config:"max_size_per_request" default:"100MiB"`
MaxDocumentsPerRequest int64 `config:"max_documents_per_request" default:"100000"`
} `config:"async_search"`

API struct {
Expand Down
1 change: 1 addition & 0 deletions docs/en/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Configuration for asynchronous search operations.
| `async_search.concurrency` | int | - | Concurrency level for async searches |
| `async_search.max_total_size` | Bytes | `1GiB` | - |
| `async_search.max_size_per_request` | Bytes | `100MiB` | - |
| `async_search.max_documents_per_request` | int | `100000` | - |

## API Configuration

Expand Down
12 changes: 8 additions & 4 deletions fracmanager/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -134,6 +133,7 @@ type AsyncSearchRequest struct {
Params processor.SearchParams
Query string
Retention time.Duration
WithDocs bool
}

type fracSearchState struct {
Expand Down Expand Up @@ -655,6 +655,7 @@ type FetchSearchResultResponse struct {
To seq.MID
Retention time.Duration
WithDocs bool
Size int64
}

func (as *AsyncSearcher) FetchSearchResult(r FetchSearchResultRequest) (FetchSearchResultResponse, bool) {
Expand Down Expand Up @@ -702,7 +703,8 @@ func (as *AsyncSearcher) FetchSearchResult(r FetchSearchResultRequest) (FetchSea
From: info.Request.Params.From,
To: info.Request.Params.To,
Retention: info.Request.Retention,
WithDocs: info.Request.Params.Limit == math.MaxInt,
WithDocs: info.Request.WithDocs,
Size: int64(info.Request.Params.Limit),
}, true
}

Expand Down Expand Up @@ -794,7 +796,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) {
qprPath := path.Join(as.config.DataDir, qprFilename)
qprs = append(qprs, qprPath)
}
qpr, sizeBefore := as.loadSearchResult(qprs, math.MaxInt, seq.DocsOrderDesc)
qpr, sizeBefore := as.loadSearchResult(qprs, job.Info.Request.Params.Limit, seq.DocsOrderDesc)

var sizeAfter int
storeMQPR := func(compressed []byte) error {
Expand Down Expand Up @@ -949,6 +951,7 @@ type AsyncSearchesListItem struct {
To seq.MID
Retention time.Duration
WithDocs bool
Size int64
}

func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []*AsyncSearchesListItem {
Expand Down Expand Up @@ -1004,7 +1007,8 @@ func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []*
From: info.Request.Params.From,
To: info.Request.Params.To,
Retention: info.Request.Retention,
WithDocs: info.Request.Params.Limit == math.MaxInt,
WithDocs: info.Request.WithDocs,
Size: int64(info.Request.Params.Limit),
})
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/seqproxyapi/v1/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestFetchAsyncSearchResultResponseMarshalJSON(t *testing.T) {
Aggs: []*AggQuery{},
Hist: nil,
WithDocs: true,
Size: 100,
},
Response: &ComplexSearchResponse{
Docs: []*Document{
Expand All @@ -141,6 +142,6 @@ func TestFetchAsyncSearchResultResponseMarshalJSON(t *testing.T) {
Progress: 1,
DiskUsage: 488,
},
`{"status":"AsyncSearchStatusCanceled","request":{"retention":"3600s","query":{"query":"message:some_message","from":"2025-07-01T05:20:00Z","to":"2025-08-01T05:20:00Z","explain":false},"aggs":[],"withDocs":true},"response":{"docs":[{"id":"46e48be997010000-e70163d0fa7582e4","data":{"message":"some_message","level":3},"time":"2025-07-08T10:19:08.742Z"}],"hist":{}},"progress":1,"disk_usage":"488","started_at":"2025-07-25T12:25:57.672Z","expires_at":"2025-07-25T13:25:57.672Z","canceled_at":"2025-07-25T12:34:26.577Z"}`,
`{"status":"AsyncSearchStatusCanceled","request":{"retention":"3600s","query":{"query":"message:some_message","from":"2025-07-01T05:20:00Z","to":"2025-08-01T05:20:00Z","explain":false},"aggs":[],"withDocs":true,"size":"100"},"response":{"docs":[{"id":"46e48be997010000-e70163d0fa7582e4","data":{"message":"some_message","level":3},"time":"2025-07-08T10:19:08.742Z"}],"hist":{}},"progress":1,"disk_usage":"488","started_at":"2025-07-25T12:25:57.672Z","expires_at":"2025-07-25T13:25:57.672Z","canceled_at":"2025-07-25T12:34:26.577Z"}`,
)
}
Loading
Loading