From 8efe9a37da3a68011f246dbc6a09a3eefd795e3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Tue, 18 Feb 2025 15:04:58 +0100 Subject: [PATCH 1/7] feat: allow scaling of the search service --- services/search/pkg/config/engine.go | 1 + services/search/pkg/engine/bleve.go | 167 +++++++++++++++--- .../search/pkg/service/grpc/v0/service.go | 6 +- 3 files changed, 147 insertions(+), 27 deletions(-) diff --git a/services/search/pkg/config/engine.go b/services/search/pkg/config/engine.go index 15cd29a378a..549b498a181 100644 --- a/services/search/pkg/config/engine.go +++ b/services/search/pkg/config/engine.go @@ -9,4 +9,5 @@ type Engine struct { // EngineBleve configures the bleve engine type EngineBleve struct { Datapath string `yaml:"data_path" env:"SEARCH_ENGINE_BLEVE_DATA_PATH" desc:"The directory where the filesystem will store search data. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH/search." introductionVersion:"pre5.0"` + Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the bleve index. If set to false, the service will have exclusive write access to the index as long as the service is running, locking out other processes. Defaults to false." introductionVersion:"%%NEXT%%"` } diff --git a/services/search/pkg/engine/bleve.go b/services/search/pkg/engine/bleve.go index c6f765b5db5..fab352d7cd8 100644 --- a/services/search/pkg/engine/bleve.go +++ b/services/search/pkg/engine/bleve.go @@ -36,15 +36,16 @@ import ( // Bleve represents a search engine which utilizes bleve to search and store resources. type Bleve struct { + indexPath string index bleve.Index queryCreator searchQuery.Creator[query.Query] } -// NewBleveIndex returns a new bleve index +// newBleveIndex returns a new bleve index // given path must exist. 
-func NewBleveIndex(root string) (bleve.Index, error) { +func newBleveIndex(root string, params map[string]interface{}) (bleve.Index, error) { destination := filepath.Join(root, "bleve") - index, err := bleve.Open(destination) + index, err := bleve.OpenUsing(destination, params) if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { m, err := BuildBleveMapping() if err != nil { @@ -62,11 +63,34 @@ func NewBleveIndex(root string) (bleve.Index, error) { } // NewBleveEngine creates a new Bleve instance -func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query]) *Bleve { +// If scalable is set to true, one connection to the index is created and +// closed per operation, so multiple operations can be executed in parallel. +// If set to false, only one write connection is created for the whole +// service, which will lock the index for other processes. In this case, +// you must close the engine yourself. +func NewBleveEngine(indexPath string, queryCreator searchQuery.Creator[query.Query], scalable bool) (*Bleve, error) { + var idx bleve.Index + var err error + + if !scalable { + idx, err = newBleveIndex(indexPath, nil) + if err != nil { + return nil, err + } + } + return &Bleve{ - index: index, + indexPath: indexPath, + index: idx, queryCreator: queryCreator, + }, nil +} + +func (b *Bleve) Close() error { + if b.index != nil { + return b.index.Close() } + return nil } // BuildBleveMapping builds a bleve index mapping which can be used for indexing @@ -123,6 +147,21 @@ func BuildBleveMapping() (mapping.IndexMapping, error) { // Search executes a search request operation within the index. // Returns a SearchIndexResponse object or an error. 
func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) { + var bleveIndex bleve.Index + var biErr error + + if b.index == nil { + bleveIndex, biErr = newBleveIndex(b.indexPath, map[string]interface{}{ + "read_only": true, + }) + if biErr != nil { + return nil, biErr + } + defer bleveIndex.Close() + } else { + bleveIndex = b.index + } + createdQuery, err := b.queryCreator.Create(sir.Query) if err != nil { if searchQuery.IsValidationError(err) { @@ -169,7 +208,7 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques } bleveReq.Fields = []string{"*"} - res, err := b.index.Search(bleveReq) + res, err := bleveIndex.Search(bleveReq) if err != nil { return nil, err } @@ -237,19 +276,45 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques // Upsert indexes or stores Resource data fields. func (b *Bleve) Upsert(id string, r Resource) error { - return b.index.Index(id, r) + var bleveIndex bleve.Index + var biErr error + + if b.index == nil { + bleveIndex, biErr = newBleveIndex(b.indexPath, nil) + if biErr != nil { + return biErr + } + defer bleveIndex.Close() + } else { + bleveIndex = b.index + } + + return bleveIndex.Index(id, r) } // Move updates the resource location and all of its necessary fields. 
func (b *Bleve) Move(id string, parentid string, target string) error { - r, err := b.getResource(id) + var bleveIndex bleve.Index + var biErr error + + if b.index == nil { + bleveIndex, biErr = newBleveIndex(b.indexPath, nil) + if biErr != nil { + return biErr + } + defer bleveIndex.Close() + } else { + bleveIndex = b.index + } + + r, err := b.getResource(bleveIndex, id) if err != nil { return err } currentPath := r.Path nextPath := utils.MakeRelativePath(target) - r, err = b.updateEntity(id, func(r *Resource) { + r, err = b.updateEntity(bleveIndex, id, func(r *Resource) { r.Path = nextPath r.Name = path.Base(nextPath) r.ParentID = parentid @@ -266,13 +331,13 @@ func (b *Bleve) Move(id string, parentid string, target string) error { bleveReq := bleve.NewSearchRequest(q) bleveReq.Size = math.MaxInt bleveReq.Fields = []string{"*"} - res, err := b.index.Search(bleveReq) + res, err := bleveIndex.Search(bleveReq) if err != nil { return err } for _, h := range res.Hits { - _, err := b.updateEntity(h.ID, func(r *Resource) { + _, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) { r.Path = strings.Replace(r.Path, currentPath, nextPath, 1) }) if err != nil { @@ -289,29 +354,83 @@ func (b *Bleve) Move(id string, parentid string, target string) error { // instead of removing the resource it just marks it as deleted! // can be undone func (b *Bleve) Delete(id string) error { - return b.setDeleted(id, true) + var bleveIndex bleve.Index + var biErr error + + if b.index == nil { + bleveIndex, biErr = newBleveIndex(b.indexPath, nil) + if biErr != nil { + return biErr + } + defer bleveIndex.Close() + } else { + bleveIndex = b.index + } + + return b.setDeleted(bleveIndex, id, true) } // Restore is the counterpart to Delete. // It restores the resource which makes it available again. 
func (b *Bleve) Restore(id string) error { - return b.setDeleted(id, false) + var bleveIndex bleve.Index + var biErr error + + if b.index == nil { + bleveIndex, biErr = newBleveIndex(b.indexPath, nil) + if biErr != nil { + return biErr + } + defer bleveIndex.Close() + } else { + bleveIndex = b.index + } + + return b.setDeleted(bleveIndex, id, false) } // Purge removes a resource from the index, irreversible operation. func (b *Bleve) Purge(id string) error { - return b.index.Delete(id) + var bleveIndex bleve.Index + var biErr error + + if b.index == nil { + bleveIndex, biErr = newBleveIndex(b.indexPath, nil) + if biErr != nil { + return biErr + } + defer bleveIndex.Close() + } else { + bleveIndex = b.index + } + + return bleveIndex.Delete(id) } // DocCount returns the number of resources in the index. func (b *Bleve) DocCount() (uint64, error) { - return b.index.DocCount() + var bleveIndex bleve.Index + var biErr error + + if b.index == nil { + bleveIndex, biErr = newBleveIndex(b.indexPath, map[string]interface{}{ + "read_only": true, + }) + if biErr != nil { + return 0, biErr + } + defer bleveIndex.Close() + } else { + bleveIndex = b.index + } + + return bleveIndex.DocCount() } -func (b *Bleve) getResource(id string) (*Resource, error) { +func (b *Bleve) getResource(bleveIndex bleve.Index, id string) (*Resource, error) { req := bleve.NewSearchRequest(bleve.NewDocIDQuery([]string{id})) req.Fields = []string{"*"} - res, err := b.index.Search(req) + res, err := bleveIndex.Search(req) if err != nil { return nil, err } @@ -446,19 +565,19 @@ func getPhotoValue[T any](fields map[string]interface{}) *T { return nil } -func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource, error) { - it, err := b.getResource(id) +func (b *Bleve) updateEntity(bleveIndex bleve.Index, id string, mutateFunc func(r *Resource)) (*Resource, error) { + it, err := b.getResource(bleveIndex, id) if err != nil { return nil, err } mutateFunc(it) - return it, 
b.index.Index(it.ID, it) + return it, bleveIndex.Index(it.ID, it) } -func (b *Bleve) setDeleted(id string, deleted bool) error { - it, err := b.updateEntity(id, func(r *Resource) { +func (b *Bleve) setDeleted(bleveIndex bleve.Index, id string, deleted bool) error { + it, err := b.updateEntity(bleveIndex, id, func(r *Resource) { r.Deleted = deleted }) if err != nil { @@ -473,13 +592,13 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { bleveReq := bleve.NewSearchRequest(q) bleveReq.Size = math.MaxInt bleveReq.Fields = []string{"*"} - res, err := b.index.Search(bleveReq) + res, err := bleveIndex.Search(bleveReq) if err != nil { return err } for _, h := range res.Hits { - _, err := b.updateEntity(h.ID, func(r *Resource) { + _, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) { r.Deleted = deleted }) if err != nil { diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 82dfd3249dc..e95786e2c9b 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -45,16 +45,16 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) var eng engine.Engine switch cfg.Engine.Type { case "bleve": - idx, err := engine.NewBleveIndex(cfg.Engine.Bleve.Datapath) + bleveEngine, err := engine.NewBleveEngine(cfg.Engine.Bleve.Datapath, bleve.DefaultCreator, cfg.Engine.Bleve.Scale) if err != nil { return nil, teardown, err } teardown = func() { - _ = idx.Close() + _ = bleveEngine.Close() } + eng = bleveEngine - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) default: return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type) } From 29fc141123e26c931fa153f813db3ec84bb53b77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Wed, 19 Feb 2025 13:56:15 +0100 Subject: [PATCH 2/7] feat: refactor scaling code also for tests --- services/search/pkg/engine/bleve.go | 165 
++++++------------ services/search/pkg/engine/bleve/index.go | 150 ++++++++++++++++ services/search/pkg/engine/bleve/option.go | 21 +++ services/search/pkg/engine/bleve_test.go | 14 +- .../search/pkg/service/grpc/v0/service.go | 11 +- 5 files changed, 236 insertions(+), 125 deletions(-) create mode 100644 services/search/pkg/engine/bleve/index.go create mode 100644 services/search/pkg/engine/bleve/option.go diff --git a/services/search/pkg/engine/bleve.go b/services/search/pkg/engine/bleve.go index fab352d7cd8..d3c58758170 100644 --- a/services/search/pkg/engine/bleve.go +++ b/services/search/pkg/engine/bleve.go @@ -5,7 +5,6 @@ import ( "errors" "math" "path" - "path/filepath" "reflect" "strings" "time" @@ -31,66 +30,41 @@ import ( searchMessage "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0" searchService "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" "github.com/owncloud/ocis/v2/services/search/pkg/content" + bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve" searchQuery "github.com/owncloud/ocis/v2/services/search/pkg/query" ) // Bleve represents a search engine which utilizes bleve to search and store resources. type Bleve struct { - indexPath string - index bleve.Index + indexGetter bleveEngine.IndexGetter queryCreator searchQuery.Creator[query.Query] } -// newBleveIndex returns a new bleve index -// given path must exist. 
-func newBleveIndex(root string, params map[string]interface{}) (bleve.Index, error) { - destination := filepath.Join(root, "bleve") - index, err := bleve.OpenUsing(destination, params) - if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { - m, err := BuildBleveMapping() - if err != nil { - return nil, err - } - index, err = bleve.New(destination, m) - if err != nil { - return nil, err - } - - return index, nil - } - - return index, err -} - // NewBleveEngine creates a new Bleve instance // If scalable is set to true, one connection to the index is created and // closed per operation, so multiple operations can be executed in parallel. // If set to false, only one write connection is created for the whole // service, which will lock the index for other processes. In this case, // you must close the engine yourself. -func NewBleveEngine(indexPath string, queryCreator searchQuery.Creator[query.Query], scalable bool) (*Bleve, error) { - var idx bleve.Index - var err error - - if !scalable { - idx, err = newBleveIndex(indexPath, nil) - if err != nil { - return nil, err - } - } - +func NewBleveEngine(indexGetter bleveEngine.IndexGetter, queryCreator searchQuery.Creator[query.Query]) *Bleve { return &Bleve{ - indexPath: indexPath, - index: idx, + indexGetter: indexGetter, queryCreator: queryCreator, - }, nil + } } +// Close will get the index and close it. If the indexGetter is returning +// new instances, this method will close just the new returned instance but +// not any other instances that might be in use. +// +// This method is useful if "memory" and "persistent" (not "persistentScale") +// index getters are used. 
func (b *Bleve) Close() error { - if b.index != nil { - return b.index.Close() + bleveIndex, err := b.indexGetter.GetIndex() + if err != nil { + return err } - return nil + return bleveIndex.Close() } // BuildBleveMapping builds a bleve index mapping which can be used for indexing @@ -147,19 +121,12 @@ func BuildBleveMapping() (mapping.IndexMapping, error) { // Search executes a search request operation within the index. // Returns a SearchIndexResponse object or an error. func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) { - var bleveIndex bleve.Index - var biErr error - - if b.index == nil { - bleveIndex, biErr = newBleveIndex(b.indexPath, map[string]interface{}{ - "read_only": true, - }) - if biErr != nil { - return nil, biErr - } + bleveIndex, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) + if err != nil { + return nil, err + } + if b.indexGetter.IndexCanBeClosed() { defer bleveIndex.Close() - } else { - bleveIndex = b.index } createdQuery, err := b.queryCreator.Create(sir.Query) @@ -276,17 +243,12 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques // Upsert indexes or stores Resource data fields. func (b *Bleve) Upsert(id string, r Resource) error { - var bleveIndex bleve.Index - var biErr error - - if b.index == nil { - bleveIndex, biErr = newBleveIndex(b.indexPath, nil) - if biErr != nil { - return biErr - } + bleveIndex, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + if b.indexGetter.IndexCanBeClosed() { defer bleveIndex.Close() - } else { - bleveIndex = b.index } return bleveIndex.Index(id, r) @@ -294,17 +256,12 @@ func (b *Bleve) Upsert(id string, r Resource) error { // Move updates the resource location and all of its necessary fields. 
func (b *Bleve) Move(id string, parentid string, target string) error { - var bleveIndex bleve.Index - var biErr error - - if b.index == nil { - bleveIndex, biErr = newBleveIndex(b.indexPath, nil) - if biErr != nil { - return biErr - } + bleveIndex, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + if b.indexGetter.IndexCanBeClosed() { defer bleveIndex.Close() - } else { - bleveIndex = b.index } r, err := b.getResource(bleveIndex, id) @@ -354,17 +311,12 @@ func (b *Bleve) Move(id string, parentid string, target string) error { // instead of removing the resource it just marks it as deleted! // can be undone func (b *Bleve) Delete(id string) error { - var bleveIndex bleve.Index - var biErr error - - if b.index == nil { - bleveIndex, biErr = newBleveIndex(b.indexPath, nil) - if biErr != nil { - return biErr - } + bleveIndex, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + if b.indexGetter.IndexCanBeClosed() { defer bleveIndex.Close() - } else { - bleveIndex = b.index } return b.setDeleted(bleveIndex, id, true) @@ -373,17 +325,12 @@ func (b *Bleve) Delete(id string) error { // Restore is the counterpart to Delete. // It restores the resource which makes it available again. func (b *Bleve) Restore(id string) error { - var bleveIndex bleve.Index - var biErr error - - if b.index == nil { - bleveIndex, biErr = newBleveIndex(b.indexPath, nil) - if biErr != nil { - return biErr - } + bleveIndex, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + if b.indexGetter.IndexCanBeClosed() { defer bleveIndex.Close() - } else { - bleveIndex = b.index } return b.setDeleted(bleveIndex, id, false) @@ -391,17 +338,12 @@ func (b *Bleve) Restore(id string) error { // Purge removes a resource from the index, irreversible operation. 
func (b *Bleve) Purge(id string) error { - var bleveIndex bleve.Index - var biErr error - - if b.index == nil { - bleveIndex, biErr = newBleveIndex(b.indexPath, nil) - if biErr != nil { - return biErr - } + bleveIndex, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + if b.indexGetter.IndexCanBeClosed() { defer bleveIndex.Close() - } else { - bleveIndex = b.index } return bleveIndex.Delete(id) @@ -409,19 +351,12 @@ func (b *Bleve) Purge(id string) error { // DocCount returns the number of resources in the index. func (b *Bleve) DocCount() (uint64, error) { - var bleveIndex bleve.Index - var biErr error - - if b.index == nil { - bleveIndex, biErr = newBleveIndex(b.indexPath, map[string]interface{}{ - "read_only": true, - }) - if biErr != nil { - return 0, biErr - } + bleveIndex, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) + if err != nil { + return 0, err + } + if b.indexGetter.IndexCanBeClosed() { defer bleveIndex.Close() - } else { - bleveIndex = b.index } return bleveIndex.DocCount() diff --git a/services/search/pkg/engine/bleve/index.go b/services/search/pkg/engine/bleve/index.go new file mode 100644 index 00000000000..d7c27c80a7c --- /dev/null +++ b/services/search/pkg/engine/bleve/index.go @@ -0,0 +1,150 @@ +package bleve + +import ( + "errors" + "path/filepath" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/mapping" +) + +// IndexGetter is an interface that provides a way to get an index. +// Implementations might differ in how the index is created and how the +// index is gotten (reused, created on the fly, etc). +// +// Some implementations might require the index to be kept opened, meaning +// the index should be closed only when the application is shutting down. In +// this case, IndexCanBeClosed should return false. If the index can be +// closed and reopened safely at any time, IndexCanBeClosed should +// return true. 
+type IndexGetter interface { + GetIndex(opts ...GetIndexOption) (bleve.Index, error) + IndexCanBeClosed() bool +} + +type IndexGetterMemory struct { + mapping mapping.IndexMapping + index bleve.Index +} + +// NewIndexGetterMemory creates a new IndexGetterMemory. This implementation +// creates a new in-memory index every time GetIndex is called. As such, the +// index must be kept opened. Closing the index will result in wiping the +// data. +func NewIndexGetterMemory(mapping mapping.IndexMapping) *IndexGetterMemory { + return &IndexGetterMemory{ + mapping: mapping, + } +} + +// GetIndex creates a new in-memory index every time it is called. +// The options are ignored in this implementation. +func (i *IndexGetterMemory) GetIndex(opts ...GetIndexOption) (bleve.Index, error) { + if i.index != nil { + return i.index, nil + } + + index, err := bleve.NewMemOnly(i.mapping) + if err != nil { + return nil, err + } + + i.index = index + return i.index, nil +} + +// IndexCanBeClosed returns false, meaning the index must be kept opened. +func (i *IndexGetterMemory) IndexCanBeClosed() bool { + return false +} + +type IndexGetterPersistent struct { + rootDir string + mapping mapping.IndexMapping + index bleve.Index +} + +// NewIndexGetterPersistent creates a new IndexGetterPersistent. The index +// will be persisted on the FS. If the index does not exist, it will be +// created. If the index exists, it will be opened. +// +// The index will be cached and reused every time GetIndex is called. You +// should not close the index unless you are shutting down the application. +func NewIndexGetterPersistent(rootDir string, mapping mapping.IndexMapping) *IndexGetterPersistent { + return &IndexGetterPersistent{ + rootDir: rootDir, + mapping: mapping, + } +} + +// GetIndex returns the cached index. The options are ignored in this +// implementation. 
+func (i *IndexGetterPersistent) GetIndex(opts ...GetIndexOption) (bleve.Index, error) { + if i.index != nil { + return i.index, nil + } + + destination := filepath.Join(i.rootDir, "bleve") + index, err := bleve.Open(destination) + if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { + index, err = bleve.New(destination, i.mapping) + if err != nil { + return nil, err + } + } + + i.index = index + return i.index, nil +} + +// IndexCanBeClosed returns false, meaning the index must be kept opened. +func (i *IndexGetterPersistent) IndexCanBeClosed() bool { + return false +} + +type IndexGetterPersistentScale struct { + rootDir string + mapping mapping.IndexMapping +} + +// NewIndexGetterPersistentScale creates a new IndexGetterPersistentScale. +// The index will be persisted on the FS. If the index does not exist, it will +// be created. If the index exists, it will be opened. +// The GetIndex method will create a new connection to the index every time +// it is called. That connection must be closed after use. +func NewIndexGetterPersistentScale(rootDir string, mapping mapping.IndexMapping) *IndexGetterPersistentScale { + return &IndexGetterPersistentScale{ + rootDir: rootDir, + mapping: mapping, + } +} + +// GetIndex creates a new connection to the index every time it is called. +// You can use the ReadOnly option to open the index in read-only mode. This +// allow read-only operations to be performed in parallel. +// In order to avoid blocking write operations, you should close the index +// as soon as you are done with it. +func (i *IndexGetterPersistentScale) GetIndex(opts ...GetIndexOption) (bleve.Index, error) { + options := newGetIndexOptions(opts...) 
+ destination := filepath.Join(i.rootDir, "bleve") + params := map[string]interface{}{ + "read_only": options.ReadOnly, + } + index, err := bleve.OpenUsing(destination, params) + if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { + index, err = bleve.New(destination, i.mapping) + if err != nil { + return nil, err + } + + return index, nil + } + + return index, err +} + +// IndexCanBeClosed returns true, meaning the index can be closed and +// reopened. You should close the index as soon as you are done with it. +func (i *IndexGetterPersistentScale) IndexCanBeClosed() bool { + return true +} diff --git a/services/search/pkg/engine/bleve/option.go b/services/search/pkg/engine/bleve/option.go new file mode 100644 index 00000000000..791d38278dc --- /dev/null +++ b/services/search/pkg/engine/bleve/option.go @@ -0,0 +1,21 @@ +package bleve + +type GetIndexOption func(o *GetIndexOptions) + +type GetIndexOptions struct { + ReadOnly bool +} + +func ReadOnly(b bool) GetIndexOption { + return func(o *GetIndexOptions) { + o.ReadOnly = b + } +} + +func newGetIndexOptions(opts ...GetIndexOption) GetIndexOptions { + o := GetIndexOptions{} + for _, opt := range opts { + opt(&o) + } + return o +} diff --git a/services/search/pkg/engine/bleve_test.go b/services/search/pkg/engine/bleve_test.go index 7209487a5d0..015c6cd1c80 100644 --- a/services/search/pkg/engine/bleve_test.go +++ b/services/search/pkg/engine/bleve_test.go @@ -15,6 +15,7 @@ import ( searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" "github.com/owncloud/ocis/v2/services/search/pkg/content" "github.com/owncloud/ocis/v2/services/search/pkg/engine" + bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve" "github.com/owncloud/ocis/v2/services/search/pkg/query/bleve" ) @@ -59,10 +60,12 @@ var _ = Describe("Bleve", func() { mapping, err := engine.BuildBleveMapping() Expect(err).ToNot(HaveOccurred()) - idx, err = bleveSearch.NewMemOnly(mapping) + indexGetter := 
bleveEngine.NewIndexGetterMemory(mapping) + + idx, err = indexGetter.GetIndex() Expect(err).ToNot(HaveOccurred()) - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) + eng = engine.NewBleveEngine(indexGetter, bleve.DefaultCreator) Expect(err).ToNot(HaveOccurred()) rootResource = engine.Resource{ @@ -91,13 +94,6 @@ var _ = Describe("Bleve", func() { } }) - Describe("New", func() { - It("returns a new index instance", func() { - b := engine.NewBleveEngine(idx, bleve.DefaultCreator) - Expect(b).ToNot(BeNil()) - }) - }) - Describe("Search", func() { Context("by other fields than filename", func() { It("finds files by tags", func() { diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index e95786e2c9b..e2f901d1b93 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -30,6 +30,7 @@ import ( "github.com/owncloud/ocis/v2/services/search/pkg/config" "github.com/owncloud/ocis/v2/services/search/pkg/content" "github.com/owncloud/ocis/v2/services/search/pkg/engine" + bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve" "github.com/owncloud/ocis/v2/services/search/pkg/query/bleve" "github.com/owncloud/ocis/v2/services/search/pkg/search" ) @@ -45,11 +46,19 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) var eng engine.Engine switch cfg.Engine.Type { case "bleve": - bleveEngine, err := engine.NewBleveEngine(cfg.Engine.Bleve.Datapath, bleve.DefaultCreator, cfg.Engine.Bleve.Scale) + bleveMapping, err := engine.BuildBleveMapping() if err != nil { return nil, teardown, err } + var indexGetter bleveEngine.IndexGetter + indexGetter = bleveEngine.NewIndexGetterPersistent(cfg.Engine.Bleve.Datapath, bleveMapping) + if cfg.Engine.Bleve.Scale { + indexGetter = bleveEngine.NewIndexGetterPersistentScale(cfg.Engine.Bleve.Datapath, bleveMapping) + } + + bleveEngine := engine.NewBleveEngine(indexGetter, 
bleve.DefaultCreator) + teardown = func() { _ = bleveEngine.Close() } From 6ddae69088b51185439457d704a45acfaa21c098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Wed, 19 Feb 2025 15:24:35 +0100 Subject: [PATCH 3/7] docs: adjust description of the config option --- services/search/pkg/config/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/search/pkg/config/engine.go b/services/search/pkg/config/engine.go index 549b498a181..670e44e8cad 100644 --- a/services/search/pkg/config/engine.go +++ b/services/search/pkg/config/engine.go @@ -9,5 +9,5 @@ type Engine struct { // EngineBleve configures the bleve engine type EngineBleve struct { Datapath string `yaml:"data_path" env:"SEARCH_ENGINE_BLEVE_DATA_PATH" desc:"The directory where the filesystem will store search data. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH/search." introductionVersion:"pre5.0"` - Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the bleve index. If set to false, the service will have exclusive write access to the index as long as the service is running, locking out other processes. Defaults to false." introductionVersion:"%%NEXT%%"` + Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service will lock out other processes trying to access the index as long it is running." 
introductionVersion:"%%NEXT%%"` } From 758bda94ba2498c7d6eb396497fd87533ace8a72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Tue, 1 Apr 2025 18:31:34 +0200 Subject: [PATCH 4/7] refactor: use a closeFn in the GetIndex instead of checking value The closeFn implicitly checks the value as the implementation decides what to do (either do nothing or close the index). Calling the closeFn is safe and we don't need conditions. --- services/search/pkg/engine/bleve.go | 45 +++++-------- services/search/pkg/engine/bleve/index.go | 73 ++++++++++++---------- services/search/pkg/engine/bleve/option.go | 6 ++ 3 files changed, 62 insertions(+), 62 deletions(-) diff --git a/services/search/pkg/engine/bleve.go b/services/search/pkg/engine/bleve.go index d3c58758170..e5ebe3f51df 100644 --- a/services/search/pkg/engine/bleve.go +++ b/services/search/pkg/engine/bleve.go @@ -60,7 +60,8 @@ func NewBleveEngine(indexGetter bleveEngine.IndexGetter, queryCreator searchQuer // This method is useful if "memory" and "persistent" (not "persistentScale") // index getters are used. func (b *Bleve) Close() error { - bleveIndex, err := b.indexGetter.GetIndex() + // regardless of the implementation, we want to close the index + bleveIndex, _, err := b.indexGetter.GetIndex() if err != nil { return err } @@ -121,13 +122,11 @@ func BuildBleveMapping() (mapping.IndexMapping, error) { // Search executes a search request operation within the index. // Returns a SearchIndexResponse object or an error. 
func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) { - bleveIndex, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) + bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) if err != nil { return nil, err } - if b.indexGetter.IndexCanBeClosed() { - defer bleveIndex.Close() - } + defer closeFn() createdQuery, err := b.queryCreator.Create(sir.Query) if err != nil { @@ -243,26 +242,22 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques // Upsert indexes or stores Resource data fields. func (b *Bleve) Upsert(id string, r Resource) error { - bleveIndex, err := b.indexGetter.GetIndex() + bleveIndex, closeFn, err := b.indexGetter.GetIndex() if err != nil { return err } - if b.indexGetter.IndexCanBeClosed() { - defer bleveIndex.Close() - } + defer closeFn() return bleveIndex.Index(id, r) } // Move updates the resource location and all of its necessary fields. func (b *Bleve) Move(id string, parentid string, target string) error { - bleveIndex, err := b.indexGetter.GetIndex() + bleveIndex, closeFn, err := b.indexGetter.GetIndex() if err != nil { return err } - if b.indexGetter.IndexCanBeClosed() { - defer bleveIndex.Close() - } + defer closeFn() r, err := b.getResource(bleveIndex, id) if err != nil { @@ -311,13 +306,11 @@ func (b *Bleve) Move(id string, parentid string, target string) error { // instead of removing the resource it just marks it as deleted! // can be undone func (b *Bleve) Delete(id string) error { - bleveIndex, err := b.indexGetter.GetIndex() + bleveIndex, closeFn, err := b.indexGetter.GetIndex() if err != nil { return err } - if b.indexGetter.IndexCanBeClosed() { - defer bleveIndex.Close() - } + defer closeFn() return b.setDeleted(bleveIndex, id, true) } @@ -325,39 +318,33 @@ func (b *Bleve) Delete(id string) error { // Restore is the counterpart to Delete. // It restores the resource which makes it available again. 
func (b *Bleve) Restore(id string) error { - bleveIndex, err := b.indexGetter.GetIndex() + bleveIndex, closeFn, err := b.indexGetter.GetIndex() if err != nil { return err } - if b.indexGetter.IndexCanBeClosed() { - defer bleveIndex.Close() - } + defer closeFn() return b.setDeleted(bleveIndex, id, false) } // Purge removes a resource from the index, irreversible operation. func (b *Bleve) Purge(id string) error { - bleveIndex, err := b.indexGetter.GetIndex() + bleveIndex, closeFn, err := b.indexGetter.GetIndex() if err != nil { return err } - if b.indexGetter.IndexCanBeClosed() { - defer bleveIndex.Close() - } + defer closeFn() return bleveIndex.Delete(id) } // DocCount returns the number of resources in the index. func (b *Bleve) DocCount() (uint64, error) { - bleveIndex, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) + bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) if err != nil { return 0, err } - if b.indexGetter.IndexCanBeClosed() { - defer bleveIndex.Close() - } + defer closeFn() return bleveIndex.DocCount() } diff --git a/services/search/pkg/engine/bleve/index.go b/services/search/pkg/engine/bleve/index.go index d7c27c80a7c..b58568c232e 100644 --- a/services/search/pkg/engine/bleve/index.go +++ b/services/search/pkg/engine/bleve/index.go @@ -12,16 +12,26 @@ import ( // Implementations might differ in how the index is created and how the // index is gotten (reused, created on the fly, etc). // +// The GetIndex method returns a function that must be called to close the index. // Some implementations might require the index to be kept opened, meaning // the index should be closed only when the application is shutting down. In -// this case, IndexCanBeClosed should return false. If the index can be -// closed and reopened safely at any time, IndexCanBeClosed should -// return true. +// this case, the returned function to close the index should do nothing (not +// closing the index). 
If the index can be closed and reopened safely at any +// time, the returned function should close the index. +// Calling the returned function to close the index is fine regardless of the +// implementation, and it will act as a no-op if the index should be kept opened. type IndexGetter interface { - GetIndex(opts ...GetIndexOption) (bleve.Index, error) - IndexCanBeClosed() bool + GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) } +// IndexGetterMemory is an implementation of IndexGetter that uses an in-memory +// index. The implementation caches the index and returns the same index every +// time GetIndex is called. +// The data won't be persisted between runs, and closing the index will wipe +// the data. +// The close function returned by GetIndex won't do anything. The index should +// be kept opened until the application is shutting down. +// This is useful for testing and small datasets. type IndexGetterMemory struct { mapping mapping.IndexMapping index bleve.Index @@ -39,25 +49,26 @@ func NewIndexGetterMemory(mapping mapping.IndexMapping) *IndexGetterMemory { // GetIndex creates a new in-memory index every time it is called. // The options are ignored in this implementation. -func (i *IndexGetterMemory) GetIndex(opts ...GetIndexOption) (bleve.Index, error) { +func (i *IndexGetterMemory) GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) { + closeFn := func() {} // no-op if i.index != nil { - return i.index, nil + return i.index, closeFn, nil } index, err := bleve.NewMemOnly(i.mapping) if err != nil { - return nil, err + return nil, closeFn, err } i.index = index - return i.index, nil -} - -// IndexCanBeClosed returns false, meaning the index must be kept opened. -func (i *IndexGetterMemory) IndexCanBeClosed() bool { - return false + return i.index, closeFn, nil } +// IndexGetterPersistent is an implementation of IndexGetter that persists the +// index on the filesystem. 
The implementation caches the index and returns the +// same index every time GetIndex is called. +// The close function returned by GetIndex won't do anything. The index should +// be kept opened until the application is shutting down. type IndexGetterPersistent struct { rootDir string mapping mapping.IndexMapping @@ -79,9 +90,10 @@ func NewIndexGetterPersistent(rootDir string, mapping mapping.IndexMapping) *Ind // GetIndex returns the cached index. The options are ignored in this // implementation. -func (i *IndexGetterPersistent) GetIndex(opts ...GetIndexOption) (bleve.Index, error) { +func (i *IndexGetterPersistent) GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) { + closeFn := func() {} // no-op if i.index != nil { - return i.index, nil + return i.index, closeFn, nil } destination := filepath.Join(i.rootDir, "bleve") @@ -89,19 +101,19 @@ func (i *IndexGetterPersistent) GetIndex(opts ...GetIndexOption) (bleve.Index, e if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { index, err = bleve.New(destination, i.mapping) if err != nil { - return nil, err + return nil, closeFn, err } } i.index = index - return i.index, nil -} - -// IndexCanBeClosed returns false, meaning the index must be kept opened. -func (i *IndexGetterPersistent) IndexCanBeClosed() bool { - return false + return i.index, closeFn, nil } +// IndexGetterPersistentScale is an implementation of IndexGetter that persists +// the index on the filesystem. The implementation does not cache the index and +// creates a new connection to the index every time GetIndex is called. +// The close function returned by GetIndex must be called to close the index, as +// soon as the operations on the index are done. type IndexGetterPersistentScale struct { rootDir string mapping mapping.IndexMapping @@ -124,7 +136,7 @@ func NewIndexGetterPersistentScale(rootDir string, mapping mapping.IndexMapping) // allow read-only operations to be performed in parallel.
// In order to avoid blocking write operations, you should close the index // as soon as you are done with it. -func (i *IndexGetterPersistentScale) GetIndex(opts ...GetIndexOption) (bleve.Index, error) { +func (i *IndexGetterPersistentScale) GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) { options := newGetIndexOptions(opts...) destination := filepath.Join(i.rootDir, "bleve") params := map[string]interface{}{ @@ -134,17 +146,12 @@ func (i *IndexGetterPersistentScale) GetIndex(opts ...GetIndexOption) (bleve.Ind if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { index, err = bleve.New(destination, i.mapping) if err != nil { - return nil, err + closeFn := func() {} // no-op + return nil, closeFn, err } - return index, nil + return index, func() { index.Close() }, nil } - return index, err -} - -// IndexCanBeClosed returns true, meaning the index can be closed and -// reopened. You should close the index as soon as you are done with it. -func (i *IndexGetterPersistentScale) IndexCanBeClosed() bool { - return true + return index, func() { index.Close() }, err } diff --git a/services/search/pkg/engine/bleve/option.go b/services/search/pkg/engine/bleve/option.go index 791d38278dc..b22302548f6 100644 --- a/services/search/pkg/engine/bleve/option.go +++ b/services/search/pkg/engine/bleve/option.go @@ -1,17 +1,23 @@ package bleve +// GetIndexOption is a function that sets some option for the GetIndex method. type GetIndexOption func(o *GetIndexOptions) +// GetIndexOptions contains the options for the GetIndex method. type GetIndexOptions struct { ReadOnly bool } +// ReadOnly is an option to open the index in read-only mode. +// This option should allow running multiple read-only operations in parallel. +// The behavior of write operations is not defined when this option is used. func ReadOnly(b bool) GetIndexOption { return func(o *GetIndexOptions) { o.ReadOnly = b } } +// newGetIndexOptions creates a new GetIndexOptions with the given options.
func newGetIndexOptions(opts ...GetIndexOption) GetIndexOptions { o := GetIndexOptions{} for _, opt := range opts { From 2ca9390c75c443d995caff8ae27277145d786c7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Wed, 2 Apr 2025 12:36:29 +0200 Subject: [PATCH 5/7] fix: adjust unit tests --- services/search/pkg/engine/bleve_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/search/pkg/engine/bleve_test.go b/services/search/pkg/engine/bleve_test.go index 015c6cd1c80..937901a0f8b 100644 --- a/services/search/pkg/engine/bleve_test.go +++ b/services/search/pkg/engine/bleve_test.go @@ -62,7 +62,7 @@ var _ = Describe("Bleve", func() { indexGetter := bleveEngine.NewIndexGetterMemory(mapping) - idx, err = indexGetter.GetIndex() + idx, _, err = indexGetter.GetIndex() // IndexGetterMemory ignores closeFn Expect(err).ToNot(HaveOccurred()) eng = engine.NewBleveEngine(indexGetter, bleve.DefaultCreator) From 69d5ada28037171cd3d71dbba54de7558e92bc06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Tue, 6 May 2025 09:12:37 +0200 Subject: [PATCH 6/7] chore: adjust description of configuration option --- services/search/pkg/config/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/search/pkg/config/engine.go b/services/search/pkg/config/engine.go index 670e44e8cad..fc7623d8ac9 100644 --- a/services/search/pkg/config/engine.go +++ b/services/search/pkg/config/engine.go @@ -9,5 +9,5 @@ type Engine struct { // EngineBleve configures the bleve engine type EngineBleve struct { Datapath string `yaml:"data_path" env:"SEARCH_ENGINE_BLEVE_DATA_PATH" desc:"The directory where the filesystem will store search data. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH/search." introductionVersion:"pre5.0"` - Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). 
If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service will lock out other processes trying to access the index as long it is running." introductionVersion:"%%NEXT%%"` + Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service has exclusive access to the index as long as it is running. This locks out other search processes trying to access the index." introductionVersion:"%%NEXT%%"` } From 620b9ad4c0cb388118831c1ccbdc8e5cdfb271fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Thu, 15 May 2025 15:53:43 +0200 Subject: [PATCH 7/7] fix: error handling --- services/search/pkg/engine/bleve/index.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/services/search/pkg/engine/bleve/index.go b/services/search/pkg/engine/bleve/index.go index b58568c232e..9685b7dd1ab 100644 --- a/services/search/pkg/engine/bleve/index.go +++ b/services/search/pkg/engine/bleve/index.go @@ -49,7 +49,7 @@ func NewIndexGetterMemory(mapping mapping.IndexMapping) *IndexGetterMemory { // GetIndex creates a new in-memory index every time it is called. // The options are ignored in this implementation.
-func (i *IndexGetterMemory) GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) { +func (i *IndexGetterMemory) GetIndex(_ ...GetIndexOption) (bleve.Index, func(), error) { closeFn := func() {} // no-op if i.index != nil { return i.index, closeFn, nil @@ -90,7 +90,7 @@ func NewIndexGetterPersistent(rootDir string, mapping mapping.IndexMapping) *Ind // GetIndex returns the cached index. The options are ignored in this // implementation. -func (i *IndexGetterPersistent) GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) { +func (i *IndexGetterPersistent) GetIndex(_ ...GetIndexOption) (bleve.Index, func(), error) { closeFn := func() {} // no-op if i.index != nil { return i.index, closeFn, nil @@ -103,6 +103,8 @@ func (i *IndexGetterPersistent) GetIndex(opts ...GetIndexOption) (bleve.Index, f if err != nil { return nil, closeFn, err } + } else if err != nil { + return nil, closeFn, err } i.index = index @@ -142,16 +144,17 @@ func (i *IndexGetterPersistentScale) GetIndex(opts ...GetIndexOption) (bleve.Ind params := map[string]interface{}{ "read_only": options.ReadOnly, } + + closeFn := func() {} // no-op index, err := bleve.OpenUsing(destination, params) if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { index, err = bleve.New(destination, i.mapping) if err != nil { - closeFn := func() {} // no-op return nil, closeFn, err } - - return index, func() { index.Close() }, nil + } else if err != nil { + return nil, closeFn, err } - return index, func() { index.Close() }, err + return index, func() { index.Close() }, nil }