Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/seqproxyapi/v1/seq_proxy_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ message SearchRequest {
bool with_total = 4; // Should total number of documents be returned in response.
Order order = 5; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 6; // ID offset for pagination.
repeated string regions = 7; // Required when proxy uses experimental.regions: list of region names to query.
}

message ComplexSearchRequest {
Expand All @@ -227,6 +228,7 @@ message ComplexSearchRequest {
bool with_total = 6; // Should total number of documents be returned in response.
Order order = 7; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 8; // ID offset for pagination.
repeated string regions = 9; // Required when proxy uses experimental.regions: list of region names to query.
}

message SearchResponse {
Expand Down Expand Up @@ -352,6 +354,7 @@ message AsyncSearchesListItem {
message GetAggregationRequest {
SearchQuery query = 1; // Search query.
repeated AggQuery aggs = 2; // List of aggregation queries.
repeated string regions = 3; // Required when proxy uses experimental.regions: list of region names to query.
}

message GetAggregationResponse {
Expand All @@ -364,6 +367,7 @@ message GetAggregationResponse {
message GetHistogramRequest {
SearchQuery query = 1; // Search query.
HistQuery hist = 2; // Histogram query.
repeated string regions = 3; // Required when proxy uses experimental.regions: list of region names to query.
}

message GetHistogramResponse {
Expand Down Expand Up @@ -415,6 +419,7 @@ message ExportRequest {
SearchQuery query = 1; // Search query.
int64 size = 2; // Maximum number of documents to return.
int64 offset = 3; // Search offset.
repeated string regions = 4; // Required when proxy uses experimental.regions: list of region names to query.
}

message ExportResponse {
Expand Down
94 changes: 64 additions & 30 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"runtime"
"sort"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -87,6 +88,7 @@ func main() {
config.MaxRequestedDocuments = cfg.Limits.SearchDocs
config.MaxRegexTokensCheck = cfg.Experimental.MaxRegexTokensCheck
config.FailPartialResponse = cfg.Cluster.FailPartialResponse
config.UseRegions = cfg.Experimental.Regions.UseRegions

backoff.DefaultConfig.MaxDelay = 10 * time.Second

Expand Down Expand Up @@ -155,48 +157,21 @@ func startProxy(
) *proxyapi.Ingestor {
logger.Info("max queries per second", zap.Float64("limit", cfg.Limits.QueryRate))

hotReplicasNum := cfg.Cluster.Replicas
if cfg.Cluster.HotReplicas > 0 {
hotReplicasNum = cfg.Cluster.HotReplicas
}

hotStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.HotStores, ","), hotReplicasNum)
hotReadStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.HotReadStores, ","), hotReplicasNum)
readStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.ReadStores, ","), cfg.Cluster.Replicas)
writeStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.WriteStores, ","), cfg.Cluster.Replicas)

logger.Info("stores data",
zap.String("read_stores", readStores.String()),
zap.String("write_stores", writeStores.String()),
zap.String("hot_stores", hotStores.String()),
zap.String("hot_read_stores", hotReadStores.String()),
)

err := validateIngestorTopology(hotStores, hotReadStores, readStores, writeStores)
if err != nil {
logger.Fatal("validating topology", zap.Error(err))
}

pconfig := proxyapi.IngestorConfig{
pConfig := proxyapi.IngestorConfig{
API: proxyapi.APIConfig{
SearchTimeout: consts.DefaultSearchTimeout,
ExportTimeout: consts.DefaultExportTimeout,
QueryRateLimit: cfg.Limits.QueryRate,
EsVersion: cfg.API.ESVersion,
GatewayAddr: cfg.Address.GRPC,
AsyncSearchMaxDocumentsPerRequest: cfg.AsyncSearch.MaxDocumentsPerRequest,
MaxRegionsPerRequest: cfg.Experimental.Regions.MaxRegionsPerRequest,
},
Search: search.Config{
HotStores: hotStores,
HotReadStores: hotReadStores,
ReadStores: readStores,
WriteStores: writeStores,
ShuffleReplicas: cfg.Cluster.ShuffleReplicas,
MirrorAddr: cfg.Cluster.MirrorAddress,
},
Bulk: bulk.IngestorConfig{
HotStores: hotStores,
WriteStores: writeStores,
BulkCircuit: circuitbreaker.Config{
Timeout: cfg.CircuitBreaker.Bulk.ShardTimeout,
MaxConcurrent: int64(cfg.Limits.InflightBulks),
Expand All @@ -219,7 +194,66 @@ func startProxy(
},
}

ingestor, err := proxyapi.NewIngestor(pconfig, inMemory)
regionsCfg := cfg.Experimental.Regions
// without regions
if !config.UseRegions {
hotReplicasNum := cfg.Cluster.Replicas
if cfg.Cluster.HotReplicas > 0 {
hotReplicasNum = cfg.Cluster.HotReplicas
}

hotStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.HotStores, ","), hotReplicasNum)
hotReadStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.HotReadStores, ","), hotReplicasNum)
readStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.ReadStores, ","), cfg.Cluster.Replicas)
writeStores := stores.NewStoresFromString(strings.Join(cfg.Cluster.WriteStores, ","), cfg.Cluster.Replicas)

logger.Info("stores data",
zap.String("read_stores", readStores.String()),
zap.String("write_stores", writeStores.String()),
zap.String("hot_stores", hotStores.String()),
zap.String("hot_read_stores", hotReadStores.String()),
)

err := validateIngestorTopology(hotStores, hotReadStores, readStores, writeStores)
if err != nil {
logger.Fatal("validating topology", zap.Error(err))
}

pConfig.Search.HotStores = hotStores
pConfig.Search.HotReadStores = hotReadStores
pConfig.Search.ReadStores = readStores
pConfig.Search.WriteStores = writeStores

pConfig.Bulk.HotStores = hotStores
pConfig.Bulk.WriteStores = writeStores
} else {
var regionStores *stores.Stores
var regionNames []string

names := make([]string, 0, len(regionsCfg.Regions))
for name := range regionsCfg.Regions {
names = append(names, strings.TrimSpace(name))
}
sort.Strings(names)
regionNames = names
regionStores = &stores.Stores{
Shards: make([][]string, len(names)),
Vers: make([]string, len(names)),
}
for i, name := range names {
regionStores.Shards[i] = []string{regionsCfg.Regions[name]}
regionStores.Vers[i] = ""
}

pConfig.Search.RegionStores = regionStores
pConfig.Search.RegionNames = regionNames

logger.Info("experimental.regions enabled",
zap.Strings("regions", regionNames),
)
}

ingestor, err := proxyapi.NewIngestor(pConfig, inMemory)
if err != nil {
logger.Panic("failed to init ingestor", zap.Error(err))
}
Expand Down
13 changes: 13 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,19 @@ type Config struct {
// Specify how many tokens can be checked using regular expressions.
// If zero then there is no limit.
MaxRegexTokensCheck int `config:"max_regex_tokens_check" default:"0"`

// Regions configures the proxy to query only remote regions (other seq-proxies via store API).
// When set, hot_stores, hot_read_stores, write_stores, read_stores must be empty. No bulk; search only.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Even with the proxy setup, I can still configure hot_stores:

cluster:
  replicas: 1
  hot_stores:
    - localhost:9004
    - localhost:9008

However, I can't use them:

{
  "code": 3,
  "message": "no regions specified in request (required when using experimental.regions)"
}

I guess we could validate the config so that hot_stores can't be set when regions are enabled.

Regions struct {
// UseRegions is whether to use regions or not
UseRegions bool `config:"use_regions"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, config looks like that:

experimental:
    regions:
        use_regions: true

We repeat regions twice here.

I think it would be cleaner to have just:

experimental:
    regions:
        enabled: true

// Regions maps region name to store API gRPC address (e.g. "eu" -> "eu-proxy:9004").
Regions map[string]string `config:"regions"`
// MaxConcurrent limits how many regions are queried in parallel per search (0 = no limit).
MaxConcurrent int `config:"max_concurrent" default:"0"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's hard to tell what max_concurrent means without reading a code comment.

experimental:
  regions:
    use_regions: true
    ...
    max_concurrent: 5

// MaxRegionsPerRequest is the maximum number of regions a client can specify in one search request (e.g. 5).
MaxRegionsPerRequest int `config:"max_regions_per_request" default:"5"`
} `config:"regions"`
} `config:"experimental"`
}

Expand Down
2 changes: 2 additions & 0 deletions config/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ var (
MaxRegexTokensCheck int

FailPartialResponse = false

UseRegions = false
)
2 changes: 2 additions & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,6 @@ var (
ErrTooManyGroupTokens = errors.New("aggregation has too many group tokens")
ErrTooManyFractionTokens = errors.New("aggregation has too many fraction tokens")
ErrTooManyFractionsHit = errors.New("too many fractions hit")
ErrNoRegionsSpecified = errors.New("no regions specified in request (required when using experimental.regions)")
ErrTooManyRegionsRequested = errors.New("too many regions in request (exceeds max_regions_per_request)")
)
Loading
Loading