diff --git a/main.go b/main.go index 5bad0501..c0f72876 100644 --- a/main.go +++ b/main.go @@ -153,6 +153,7 @@ func main() { webui.WithChunkOverlap(chunkOverlap), webui.WithDatabaseURL(databaseURL), webui.WithCollectionAPIKeys(collectionAPIKeys...), + webui.WithLocalRAGURL(localRAG), ) // Single RAG provider: HTTP client when URL set, in-process when not diff --git a/pkg/localrag/client.go b/pkg/localrag/client.go index 892ba8f2..b30be0f4 100644 --- a/pkg/localrag/client.go +++ b/pkg/localrag/client.go @@ -516,3 +516,105 @@ func (c *Client) Store(collection, filePath string) error { return nil } + +// SourceInfo represents an external source for a collection (LocalRecall API contract). +type SourceInfo struct { + URL string `json:"url"` + UpdateInterval int `json:"update_interval"` // minutes + LastUpdate string `json:"last_update"` // RFC3339 +} + +// AddSource registers an external source for a collection. +func (c *Client) AddSource(collection, url string, updateIntervalMinutes int) error { + reqURL := fmt.Sprintf("%s/api/collections/%s/sources", c.BaseURL, collection) + var body struct { + URL string `json:"url"` + UpdateInterval int `json:"update_interval"` + } + body.URL = url + body.UpdateInterval = updateIntervalMinutes + if body.UpdateInterval < 1 { + body.UpdateInterval = 60 + } + payload, err := json.Marshal(body) + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, reqURL, bytes.NewBuffer(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + c.addAuthHeader(req) + resp, err := (&http.Client{}).Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return parseAPIError(resp, b, "failed to add source") + } + return nil +} + +// RemoveSource removes an external source from a collection. +func (c *Client) RemoveSource(collection, url string) error { + reqURL := fmt.Sprintf("%s/api/collections/%s/sources", c.BaseURL, collection) + payload, err := json.Marshal(map[string]string{"url": url}) + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodDelete, reqURL, bytes.NewBuffer(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + c.addAuthHeader(req) + resp, err := (&http.Client{}).Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return parseAPIError(resp, b, "failed to remove source") + } + return nil +} + +// ListSources returns external sources for a collection. +func (c *Client) ListSources(collection string) ([]SourceInfo, error) { + reqURL := fmt.Sprintf("%s/api/collections/%s/sources", c.BaseURL, collection) + req, err := http.NewRequest(http.MethodGet, reqURL, nil) + if err != nil { + return nil, err + } + c.addAuthHeader(req) + resp, err := (&http.Client{}).Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return nil, parseAPIError(resp, body, "failed to list sources") + } + var wrap apiResponse + if err := json.Unmarshal(body, &wrap); err != nil || !wrap.Success { + if wrap.Error != nil { + return nil, errors.New(wrap.Error.Message) + } + return nil, fmt.Errorf("invalid response: %w", err) + } + var data struct { + Sources []SourceInfo `json:"sources"` + } + if err := json.Unmarshal(wrap.Data, &data); err != nil { + return nil, err + } + return data.Sources, nil +} diff --git a/webui/collections_backend.go b/webui/collections_backend.go new file mode 100644 index 00000000..dea11c46 --- /dev/null +++ b/webui/collections_backend.go @@ -0,0 +1,39 @@ +package webui + +import ( + "io" + "time" +) + +// CollectionSearchResult is a single search result (content + metadata) for API responses. +type CollectionSearchResult struct { + Content string `json:"content"` + Metadata map[string]string `json:"metadata,omitempty"` + ID string `json:"id,omitempty"` + Similarity float32 `json:"similarity,omitempty"` +} + +// CollectionSourceInfo is a single external source for a collection. +type CollectionSourceInfo struct { + URL string `json:"url"` + UpdateInterval int `json:"update_interval"` // minutes + LastUpdate time.Time `json:"last_update"` +} + +// CollectionsBackend is the interface used by REST handlers for collection operations. +// It is implemented by in-process state (embedded) or by an HTTP client (when LocalRAG URL is set). +type CollectionsBackend interface { + ListCollections() ([]string, error) + CreateCollection(name string) error + Upload(collection, filename string, fileBody io.Reader) error + ListEntries(collection string) ([]string, error) + GetEntryContent(collection, entry string) (content string, chunkCount int, err error) + Search(collection, query string, maxResults int) ([]CollectionSearchResult, error) + Reset(collection string) error + DeleteEntry(collection, entry string) (remainingEntries []string, err error) + AddSource(collection, url string, intervalMin int) error + RemoveSource(collection, url string) error + ListSources(collection string) ([]CollectionSourceInfo, error) + // EntryExists is used by upload handler to avoid duplicate entries. + EntryExists(collection, entry string) bool +} diff --git a/webui/collections_backend_http.go b/webui/collections_backend_http.go new file mode 100644 index 00000000..e00348a4 --- /dev/null +++ b/webui/collections_backend_http.go @@ -0,0 +1,128 @@ +package webui + +import ( + "io" + "os" + "path/filepath" + "time" + + "github.com/mudler/LocalAGI/pkg/localrag" +) + +// collectionsBackendHTTP implements CollectionsBackend using the LocalRAG HTTP API. +type collectionsBackendHTTP struct { + client *localrag.Client +} + +var _ CollectionsBackend = (*collectionsBackendHTTP)(nil) + +// NewCollectionsBackendHTTP returns a CollectionsBackend that delegates to the given HTTP client. +func NewCollectionsBackendHTTP(client *localrag.Client) CollectionsBackend { + return &collectionsBackendHTTP{client: client} +} + +func (b *collectionsBackendHTTP) ListCollections() ([]string, error) { + return b.client.ListCollections() +} + +func (b *collectionsBackendHTTP) CreateCollection(name string) error { + return b.client.CreateCollection(name) +} + +func (b *collectionsBackendHTTP) Upload(collection, filename string, fileBody io.Reader) error { + tmpDir, err := os.MkdirTemp("", "localagi-upload") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + tmpPath := filepath.Join(tmpDir, filename) + out, err := os.Create(tmpPath) + if err != nil { + return err + } + if _, err := io.Copy(out, fileBody); err != nil { + out.Close() + return err + } + if err := out.Close(); err != nil { + return err + } + return b.client.Store(collection, tmpPath) +} + +func (b *collectionsBackendHTTP) ListEntries(collection string) ([]string, error) { + return b.client.ListEntries(collection) +} + +func (b *collectionsBackendHTTP) GetEntryContent(collection, entry string) (string, int, error) { + return b.client.GetEntryContent(collection, entry) +} + +func (b *collectionsBackendHTTP) Search(collection, query string, maxResults int) ([]CollectionSearchResult, error) { + if maxResults <= 0 { + maxResults = 5 + } + results, err := b.client.Search(collection, query, maxResults) + if err != nil { + return nil, err + } + out := make([]CollectionSearchResult, 0, len(results)) + for _, r := range results { + out = append(out, CollectionSearchResult{ + ID: r.ID, + Content: r.Content, + Metadata: r.Metadata, + Similarity: r.Similarity, + }) + } + return out, nil +} + +func (b *collectionsBackendHTTP) Reset(collection string) error { + return b.client.Reset(collection) +} + +func (b *collectionsBackendHTTP) DeleteEntry(collection, entry string) ([]string, error) { + return b.client.DeleteEntry(collection, entry) +} + +func (b *collectionsBackendHTTP) AddSource(collection, url string, intervalMin int) error { + return b.client.AddSource(collection, url, intervalMin) +} + +func (b *collectionsBackendHTTP) RemoveSource(collection, url string) error { + return b.client.RemoveSource(collection, url) +} + +func (b *collectionsBackendHTTP) ListSources(collection string) ([]CollectionSourceInfo, error) { + srcs, err := b.client.ListSources(collection) + if err != nil { + return nil, err + } + out := make([]CollectionSourceInfo, 0, len(srcs)) + for _, s := range srcs { + var lastUpdate time.Time + if s.LastUpdate != "" { + lastUpdate, _ = time.Parse(time.RFC3339, s.LastUpdate) + } + out = append(out, CollectionSourceInfo{ + URL: s.URL, + UpdateInterval: s.UpdateInterval, + LastUpdate: lastUpdate, + }) + } + return out, nil +} + +func (b *collectionsBackendHTTP) EntryExists(collection, entry string) bool { + entries, err := b.client.ListEntries(collection) + if err != nil { + return false + } + for _, e := range entries { + if e == entry { + return true + } + } + return false +} diff --git a/webui/collections_backend_inprocess.go b/webui/collections_backend_inprocess.go new file mode 100644 index 00000000..ddd4c331 --- /dev/null +++ b/webui/collections_backend_inprocess.go @@ -0,0 +1,251 @@ +package webui + +import ( + "fmt" + "io" + "os" + "path/filepath" + "time" + + "github.com/mudler/localrecall/rag" + "github.com/mudler/localrecall/rag/sources" + "github.com/mudler/xlog" + "github.com/sashabaranov/go-openai" +) + +func newVectorEngine( + vectorEngineType string, + llmClient *openai.Client, + apiURL, apiKey, collectionName, dbPath, fileAssets, embeddingModel, databaseURL string, + maxChunkSize, chunkOverlap int, +) *rag.PersistentKB { + switch vectorEngineType { + case "chromem": + xlog.Info("Chromem collection", "collectionName", collectionName, "dbPath", dbPath) + return rag.NewPersistentChromeCollection(llmClient, collectionName, dbPath, fileAssets, embeddingModel, maxChunkSize, chunkOverlap) + case "localai": + xlog.Info("LocalAI collection", "collectionName", collectionName, "apiURL", apiURL) + return rag.NewPersistentLocalAICollection(llmClient, apiURL, apiKey, collectionName, dbPath, fileAssets, embeddingModel, maxChunkSize, chunkOverlap) + case "postgres": + if databaseURL == "" { + xlog.Error("DATABASE_URL is required for PostgreSQL engine") + return nil + } + xlog.Info("PostgreSQL collection", "collectionName", collectionName, "databaseURL", databaseURL) + return rag.NewPersistentPostgresCollection(llmClient, collectionName, dbPath, fileAssets, embeddingModel, maxChunkSize, chunkOverlap, databaseURL) + default: + xlog.Error("Unknown vector engine", "engine", vectorEngineType) + return nil + } +} + +// collectionsBackendInProcess implements CollectionsBackend using in-process state. +type collectionsBackendInProcess struct { + state *collectionsState + cfg *Config + openAIClient *openai.Client +} + +var _ CollectionsBackend = (*collectionsBackendInProcess)(nil) + +func (b *collectionsBackendInProcess) ListCollections() ([]string, error) { + return rag.ListAllCollections(b.cfg.CollectionDBPath), nil +} + +func (b *collectionsBackendInProcess) CreateCollection(name string) error { + collection := newVectorEngine(b.cfg.VectorEngine, b.openAIClient, b.cfg.LLMAPIURL, b.cfg.LLMAPIKey, name, b.cfg.CollectionDBPath, b.cfg.FileAssets, b.cfg.EmbeddingModel, b.cfg.DatabaseURL, b.cfg.MaxChunkingSize, b.cfg.ChunkOverlap) + if collection == nil { + return fmt.Errorf("unsupported or misconfigured vector engine") + } + b.state.mu.Lock() + b.state.collections[name] = collection + b.state.sourceManager.RegisterCollection(name, collection) + b.state.mu.Unlock() + return nil +} + +func (b *collectionsBackendInProcess) Upload(collection, filename string, fileBody io.Reader) error { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return fmt.Errorf("collection not found: %s", collection) + } + filePath := filepath.Join(b.cfg.FileAssets, filename) + out, err := os.Create(filePath) + if err != nil { + return err + } + defer out.Close() + if _, err := io.Copy(out, fileBody); err != nil { + return err + } + now := time.Now().Format(time.RFC3339) + return kb.Store(filePath, map[string]string{"created_at": now}) +} + +func (b *collectionsBackendInProcess) ListEntries(collection string) ([]string, error) { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return nil, fmt.Errorf("collection not found: %s", collection) + } + return kb.ListDocuments(), nil +} + +func (b *collectionsBackendInProcess) GetEntryContent(collection, entry string) (string, int, error) { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return "", 0, fmt.Errorf("collection not found: %s", collection) + } + return kb.GetEntryFileContent(entry) +} + +func (b *collectionsBackendInProcess) Search(collection, query string, maxResults int) ([]CollectionSearchResult, error) { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return nil, fmt.Errorf("collection not found: %s", collection) + } + if maxResults <= 0 { + entries := kb.ListDocuments() + if len(entries) >= 5 { + maxResults = 5 + } else { + maxResults = 1 + } + } + results, err := kb.Search(query, maxResults) + if err != nil { + return nil, err + } + out := make([]CollectionSearchResult, 0, len(results)) + for _, r := range results { + out = append(out, CollectionSearchResult{ + ID: r.ID, + Content: r.Content, + Metadata: r.Metadata, + Similarity: r.Similarity, + }) + } + return out, nil +} + +func (b *collectionsBackendInProcess) Reset(collection string) error { + b.state.mu.Lock() + kb, exists := b.state.collections[collection] + if exists { + delete(b.state.collections, collection) + } + b.state.mu.Unlock() + if !exists { + return fmt.Errorf("collection not found: %s", collection) + } + return kb.Reset() +} + +func (b *collectionsBackendInProcess) DeleteEntry(collection, entry string) ([]string, error) { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return nil, fmt.Errorf("collection not found: %s", collection) + } + if err := kb.RemoveEntry(entry); err != nil { + return nil, err + } + return kb.ListDocuments(), nil +} + +func (b *collectionsBackendInProcess) AddSource(collection, url string, intervalMin int) error { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return fmt.Errorf("collection not found: %s", collection) + } + b.state.sourceManager.RegisterCollection(collection, kb) + return b.state.sourceManager.AddSource(collection, url, time.Duration(intervalMin)*time.Minute) +} + +func (b *collectionsBackendInProcess) RemoveSource(collection, url string) error { + return b.state.sourceManager.RemoveSource(collection, url) +} + +func (b *collectionsBackendInProcess) ListSources(collection string) ([]CollectionSourceInfo, error) { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return nil, fmt.Errorf("collection not found: %s", collection) + } + srcs := kb.GetExternalSources() + out := make([]CollectionSourceInfo, 0, len(srcs)) + for _, s := range srcs { + out = append(out, CollectionSourceInfo{ + URL: s.URL, + UpdateInterval: int(s.UpdateInterval.Minutes()), + LastUpdate: s.LastUpdate, + }) + } + return out, nil +} + +func (b *collectionsBackendInProcess) EntryExists(collection, entry string) bool { + b.state.mu.RLock() + kb, exists := b.state.collections[collection] + b.state.mu.RUnlock() + if !exists { + return false + } + return kb.EntryExists(entry) +} + +// NewInProcessCollectionsBackend creates in-process state (load from disk, start sourceManager) and returns +// a CollectionsBackend and the state. The caller should set app.collectionsState = state for RAG provider. +func NewInProcessCollectionsBackend(cfg *Config) (CollectionsBackend, *collectionsState) { + state := &collectionsState{ + collections: collectionList{}, + sourceManager: rag.NewSourceManager(&sources.Config{}), + } + + openaiConfig := openai.DefaultConfig(cfg.LLMAPIKey) + openaiConfig.BaseURL = cfg.LLMAPIURL + openAIClient := openai.NewClientWithConfig(openaiConfig) + + os.MkdirAll(cfg.CollectionDBPath, 0755) + os.MkdirAll(cfg.FileAssets, 0755) + + colls := rag.ListAllCollections(cfg.CollectionDBPath) + for _, c := range colls { + collection := newVectorEngine(cfg.VectorEngine, openAIClient, cfg.LLMAPIURL, cfg.LLMAPIKey, c, cfg.CollectionDBPath, cfg.FileAssets, cfg.EmbeddingModel, cfg.DatabaseURL, cfg.MaxChunkingSize, cfg.ChunkOverlap) + if collection != nil { + state.collections[c] = collection + state.sourceManager.RegisterCollection(c, collection) + } + } + + state.ensureCollection = func(name string) (*rag.PersistentKB, bool) { + state.mu.Lock() + defer state.mu.Unlock() + if kb, ok := state.collections[name]; ok && kb != nil { + return kb, true + } + collection := newVectorEngine(cfg.VectorEngine, openAIClient, cfg.LLMAPIURL, cfg.LLMAPIKey, name, cfg.CollectionDBPath, cfg.FileAssets, cfg.EmbeddingModel, cfg.DatabaseURL, cfg.MaxChunkingSize, cfg.ChunkOverlap) + if collection == nil { + return nil, false + } + state.collections[name] = collection + state.sourceManager.RegisterCollection(name, collection) + return collection, true + } + + state.sourceManager.Start() + + backend := &collectionsBackendInProcess{state: state, cfg: cfg, openAIClient: openAIClient} + return backend, state +} diff --git a/webui/collections_handlers.go b/webui/collections_handlers.go index 87ceed54..7254ebd5 100644 --- a/webui/collections_handlers.go +++ b/webui/collections_handlers.go @@ -3,19 +3,14 @@ package webui import ( "crypto/subtle" "fmt" - "io" "net/url" - "os" - "path/filepath" "strings" "sync" "time" "github.com/gofiber/fiber/v2" "github.com/mudler/localrecall/rag" - "github.com/mudler/localrecall/rag/sources" "github.com/mudler/xlog" - "github.com/sashabaranov/go-openai" ) type collectionList map[string]*rag.PersistentKB @@ -69,78 +64,8 @@ func collectionsErrorResponse(code, message, details string) collectionsAPIRespo } } -func newVectorEngine( - vectorEngineType string, - llmClient *openai.Client, - apiURL, apiKey, collectionName, dbPath, fileAssets, embeddingModel, databaseURL string, - maxChunkSize, chunkOverlap int, -) *rag.PersistentKB { - switch vectorEngineType { - case "chromem": - xlog.Info("Chromem collection", "collectionName", collectionName, "dbPath", dbPath) - return rag.NewPersistentChromeCollection(llmClient, collectionName, dbPath, fileAssets, embeddingModel, maxChunkSize, chunkOverlap) - case "localai": - xlog.Info("LocalAI collection", "collectionName", collectionName, "apiURL", apiURL) - return rag.NewPersistentLocalAICollection(llmClient, apiURL, apiKey, collectionName, dbPath, fileAssets, embeddingModel, maxChunkSize, chunkOverlap) - case "postgres": - if databaseURL == "" { - xlog.Error("DATABASE_URL is required for PostgreSQL engine") - return nil - } - xlog.Info("PostgreSQL collection", "collectionName", collectionName, "databaseURL", databaseURL) - return rag.NewPersistentPostgresCollection(llmClient, collectionName, dbPath, fileAssets, embeddingModel, maxChunkSize, chunkOverlap, databaseURL) - default: - xlog.Error("Unknown vector engine", "engine", vectorEngineType) - return nil - } -} - -// RegisterCollectionRoutes mounts /api/collections* routes and initializes collections state. -func (app *App) RegisterCollectionRoutes(webapp *fiber.App, cfg *Config) { - state := &collectionsState{ - collections: collectionList{}, - sourceManager: rag.NewSourceManager(&sources.Config{}), - } - - openaiConfig := openai.DefaultConfig(cfg.LLMAPIKey) - openaiConfig.BaseURL = cfg.LLMAPIURL - openAIClient := openai.NewClientWithConfig(openaiConfig) - - // Ensure dirs exist - os.MkdirAll(cfg.CollectionDBPath, 0755) - os.MkdirAll(cfg.FileAssets, 0755) - - // Load existing collections from disk - colls := rag.ListAllCollections(cfg.CollectionDBPath) - for _, c := range colls { - collection := newVectorEngine(cfg.VectorEngine, openAIClient, cfg.LLMAPIURL, cfg.LLMAPIKey, c, cfg.CollectionDBPath, cfg.FileAssets, cfg.EmbeddingModel, cfg.DatabaseURL, cfg.MaxChunkingSize, cfg.ChunkOverlap) - if collection != nil { - state.collections[c] = collection - state.sourceManager.RegisterCollection(c, collection) - } - } - - // Get-or-create for internal RAG (agents use collection name = agent name) - state.ensureCollection = func(name string) (*rag.PersistentKB, bool) { - state.mu.Lock() - defer state.mu.Unlock() - if kb, ok := state.collections[name]; ok && kb != nil { - return kb, true - } - collection := newVectorEngine(cfg.VectorEngine, openAIClient, cfg.LLMAPIURL, cfg.LLMAPIKey, name, cfg.CollectionDBPath, cfg.FileAssets, cfg.EmbeddingModel, cfg.DatabaseURL, cfg.MaxChunkingSize, cfg.ChunkOverlap) - if collection == nil { - return nil, false - } - state.collections[name] = collection - state.sourceManager.RegisterCollection(name, collection) - return collection, true - } - - state.sourceManager.Start() - - app.collectionsState = state - - // Optional API key middleware for /api/collections +// RegisterCollectionRoutes mounts /api/collections* routes. backend is either from NewInProcessCollectionsBackend or NewCollectionsBackendHTTP. +func (app *App) RegisterCollectionRoutes(webapp *fiber.App, cfg *Config, backend CollectionsBackend) { apiKeys := cfg.CollectionAPIKeys if len(apiKeys) == 0 { apiKeys = cfg.ApiKeys @@ -158,21 +83,33 @@ func (app *App) RegisterCollectionRoutes(webapp *fiber.App, cfg *Config) { }) } - // Route handlers close over state and config - webapp.Post("/api/collections", app.createCollection(state, cfg, openAIClient)) - webapp.Get("/api/collections", app.listCollections(cfg)) - webapp.Post("/api/collections/:name/upload", app.uploadFile(state, cfg)) - webapp.Get("/api/collections/:name/entries", app.listFiles(state)) - webapp.Get("/api/collections/:name/entries/*", app.getEntryContent(state)) - webapp.Post("/api/collections/:name/search", app.searchCollection(state)) - webapp.Post("/api/collections/:name/reset", app.resetCollection(state)) - webapp.Delete("/api/collections/:name/entry/delete", app.deleteEntryFromCollection(state)) - webapp.Post("/api/collections/:name/sources", app.registerExternalSource(state)) - webapp.Delete("/api/collections/:name/sources", app.removeExternalSource(state)) - webapp.Get("/api/collections/:name/sources", app.listSources(state)) + webapp.Post("/api/collections", app.createCollection(backend)) + webapp.Get("/api/collections", app.listCollections(backend)) + webapp.Post("/api/collections/:name/upload", app.uploadFile(backend)) + webapp.Get("/api/collections/:name/entries", app.listFiles(backend)) + webapp.Get("/api/collections/:name/entries/*", app.getEntryContent(backend)) + webapp.Post("/api/collections/:name/search", app.searchCollection(backend)) + webapp.Post("/api/collections/:name/reset", app.resetCollection(backend)) + webapp.Delete("/api/collections/:name/entry/delete", app.deleteEntryFromCollection(backend)) + webapp.Post("/api/collections/:name/sources", app.registerExternalSource(backend)) + webapp.Delete("/api/collections/:name/sources", app.removeExternalSource(backend)) + webapp.Get("/api/collections/:name/sources", app.listSources(backend)) } -func (app *App) createCollection(state *collectionsState, cfg *Config, client *openai.Client) func(c *fiber.Ctx) error { +func collectionErrStatus(err error, collection string) int { + if err == nil { + return 0 + } + if strings.Contains(err.Error(), "collection not found") { + return fiber.StatusNotFound + } + if strings.Contains(err.Error(), "entry not found") { + return fiber.StatusNotFound + } + return fiber.StatusInternalServerError +} + +func (app *App) createCollection(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { var r struct { Name string `json:"name"` @@ -180,17 +117,9 @@ func (app *App) createCollection(state *collectionsState, cfg *Config, client *o if err := c.BodyParser(&r); err != nil { return c.Status(fiber.StatusBadRequest).JSON(collectionsErrorResponse(errCodeInvalidRequest, "Invalid request", err.Error())) } - - collection := newVectorEngine(cfg.VectorEngine, client, cfg.LLMAPIURL, cfg.LLMAPIKey, r.Name, cfg.CollectionDBPath, cfg.FileAssets, cfg.EmbeddingModel, cfg.DatabaseURL, cfg.MaxChunkingSize, cfg.ChunkOverlap) - if collection == nil { - return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to create collection", "unsupported or misconfigured vector engine")) + if err := backend.CreateCollection(r.Name); err != nil { + return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to create collection", err.Error())) } - - state.mu.Lock() - state.collections[r.Name] = collection - state.sourceManager.RegisterCollection(r.Name, collection) - state.mu.Unlock() - return c.Status(fiber.StatusCreated).JSON(collectionsSuccessResponse("Collection created successfully", map[string]interface{}{ "name": r.Name, "created_at": time.Now().Format(time.RFC3339), @@ -198,9 +127,12 @@ func (app *App) createCollection(state *collectionsState, cfg *Config, client *o } } -func (app *App) listCollections(cfg *Config) func(c *fiber.Ctx) error { +func (app *App) listCollections(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { - collectionsList := rag.ListAllCollections(cfg.CollectionDBPath) + collectionsList, err := backend.ListCollections() + if err != nil { + return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to list collections", err.Error())) + } return c.JSON(collectionsSuccessResponse("Collections retrieved successfully", map[string]interface{}{ "collections": collectionsList, "count": len(collectionsList), @@ -208,22 +140,14 @@ func (app *App) listCollections(cfg *Config) func(c *fiber.Ctx) error { } } -func (app *App) uploadFile(state *collectionsState, cfg *Config) func(c *fiber.Ctx) error { +func (app *App) uploadFile(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - state.mu.RLock() - collection, exists := state.collections[name] - state.mu.RUnlock() - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) - } - file, err := c.FormFile("file") if err != nil { xlog.Error("Failed to read file", err) return c.Status(fiber.StatusBadRequest).JSON(collectionsErrorResponse(errCodeInvalidRequest, "Failed to read file", err.Error())) } - f, err := file.Open() if err != nil { xlog.Error("Failed to open file", err) @@ -231,31 +155,20 @@ func (app *App) uploadFile(state *collectionsState, cfg *Config) func(c *fiber.C } defer f.Close() - filePath := filepath.Join(cfg.FileAssets, file.Filename) - out, err := os.Create(filePath) - if err != nil { - xlog.Error("Failed to create file", err) - return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to create file", err.Error())) - } - defer out.Close() - - _, err = io.Copy(out, f) - if err != nil { - xlog.Error("Failed to copy file", err) - return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to copy file", err.Error())) - } - - if collection.EntryExists(file.Filename) { + if backend.EntryExists(name, file.Filename) { xlog.Info("Entry already exists") return c.Status(fiber.StatusBadRequest).JSON(collectionsErrorResponse(errCodeConflict, "Entry already exists", fmt.Sprintf("File '%s' has already been uploaded to collection '%s'", file.Filename, name))) } - now := time.Now().Format(time.RFC3339) - if err := collection.Store(filePath, map[string]string{"created_at": now}); err != nil { + if err := backend.Upload(name, file.Filename, f); err != nil { + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + return c.Status(status).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + } xlog.Error("Failed to store file", err) return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to store file", err.Error())) } + now := time.Now().Format(time.RFC3339) return c.JSON(collectionsSuccessResponse("File uploaded successfully", map[string]interface{}{ "filename": file.Filename, "collection": name, @@ -264,17 +177,16 @@ func (app *App) uploadFile(state *collectionsState, cfg *Config) func(c *fiber.C } } -func (app *App) listFiles(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) listFiles(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - state.mu.RLock() - collection, exists := state.collections[name] - state.mu.RUnlock() - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + entries, err := backend.ListEntries(name) + if err != nil { + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + return c.Status(status).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + } + return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to list entries", err.Error())) } - - entries := collection.ListDocuments() return c.JSON(collectionsSuccessResponse("Entries retrieved successfully", map[string]interface{}{ "collection": name, "entries": entries, @@ -284,7 +196,7 @@ func (app *App) listFiles(state *collectionsState) func(c *fiber.Ctx) error { } // getEntryContent handles GET /api/collections/:name/entries/:entry (Fiber uses * for the rest of path). -func (app *App) getEntryContent(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) getEntryContent(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") entryParam := c.Params("*") @@ -296,17 +208,13 @@ func (app *App) getEntryContent(state *collectionsState) func(c *fiber.Ctx) erro entry = entryParam } - state.mu.RLock() - collection, exists := state.collections[name] - state.mu.RUnlock() - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) - } - - content, chunkCount, err := collection.GetEntryFileContent(entry) + content, chunkCount, err := backend.GetEntryContent(name, entry) if err != nil { - if strings.Contains(err.Error(), "entry not found") { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Entry not found", fmt.Sprintf("Entry '%s' does not exist in collection '%s'", entry, name))) + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + if strings.Contains(err.Error(), "entry not found") { + return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Entry not found", fmt.Sprintf("Entry '%s' does not exist in collection '%s'", entry, name))) + } + return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) } if strings.Contains(err.Error(), "not implemented") || strings.Contains(err.Error(), "unsupported file type") { return c.Status(fiber.StatusNotImplemented).JSON(collectionsErrorResponse(errCodeInternalError, "Not supported", err.Error())) @@ -323,16 +231,9 @@ func (app *App) getEntryContent(state *collectionsState) func(c *fiber.Ctx) erro } } -func (app *App) searchCollection(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) searchCollection(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - state.mu.RLock() - collection, exists := state.collections[name] - state.mu.RUnlock() - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) - } - var r struct { Query string `json:"query"` MaxResults int `json:"max_results"` @@ -341,16 +242,11 @@ func (app *App) searchCollection(state *collectionsState) func(c *fiber.Ctx) err return c.Status(fiber.StatusBadRequest).JSON(collectionsErrorResponse(errCodeInvalidRequest, "Invalid request", err.Error())) } - if r.MaxResults == 0 { - if len(collection.ListDocuments()) >= 5 { - r.MaxResults = 5 - } else { - r.MaxResults = 1 - } - } - - results, err := collection.Search(r.Query, r.MaxResults) + results, err := backend.Search(name, r.Query, r.MaxResults) if err != nil { + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + return c.Status(status).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + } return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to search collection", err.Error())) } @@ -363,24 +259,15 @@ func (app *App) searchCollection(state *collectionsState) func(c *fiber.Ctx) err } } -func (app *App) resetCollection(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) resetCollection(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - state.mu.Lock() - collection, exists := state.collections[name] - if exists { - delete(state.collections, name) - } - state.mu.Unlock() - - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) - } - - if err := collection.Reset(); err != nil { + if err := backend.Reset(name); err != nil { + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + return c.Status(status).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + } return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to reset collection", err.Error())) } - return c.JSON(collectionsSuccessResponse("Collection reset successfully", map[string]interface{}{ "collection": name, "reset_at": time.Now().Format(time.RFC3339), @@ -388,16 +275,9 @@ func (app *App) resetCollection(state *collectionsState) func(c *fiber.Ctx) erro } } -func (app *App) deleteEntryFromCollection(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) deleteEntryFromCollection(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - state.mu.RLock() - collection, exists := state.collections[name] - state.mu.RUnlock() - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) - } - var r struct { Entry string `json:"entry"` } @@ -405,11 +285,14 @@ func (app *App) deleteEntryFromCollection(state *collectionsState) func(c *fiber return c.Status(fiber.StatusBadRequest).JSON(collectionsErrorResponse(errCodeInvalidRequest, "Invalid request", err.Error())) } - if err := collection.RemoveEntry(r.Entry); err != nil { + remainingEntries, err := backend.DeleteEntry(name, r.Entry) + if err != nil { + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + return c.Status(status).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + } return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to remove entry", err.Error())) } - remainingEntries := collection.ListDocuments() return c.JSON(collectionsSuccessResponse("Entry deleted successfully", map[string]interface{}{ "deleted_entry": r.Entry, "remaining_entries": remainingEntries, @@ -418,16 +301,9 @@ func (app *App) deleteEntryFromCollection(state *collectionsState) func(c *fiber } } -func (app *App) registerExternalSource(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) registerExternalSource(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - state.mu.RLock() - collection, exists := state.collections[name] - state.mu.RUnlock() - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) - } - var r struct { URL string `json:"url"` UpdateInterval int `json:"update_interval"` @@ -435,13 +311,14 @@ func (app *App) registerExternalSource(state *collectionsState) func(c *fiber.Ct if err := c.BodyParser(&r); err != nil { return c.Status(fiber.StatusBadRequest).JSON(collectionsErrorResponse(errCodeInvalidRequest, "Invalid request", err.Error())) } - if r.UpdateInterval < 1 { r.UpdateInterval = 60 } - state.sourceManager.RegisterCollection(name, collection) - if err := state.sourceManager.AddSource(name, r.URL, time.Duration(r.UpdateInterval)*time.Minute); err != nil { + if err := backend.AddSource(name, r.URL, r.UpdateInterval); err != nil { + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + return c.Status(status).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + } return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to register source", err.Error())) } @@ -453,10 +330,9 @@ func (app *App) registerExternalSource(state *collectionsState) func(c *fiber.Ct } } -func (app *App) removeExternalSource(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) removeExternalSource(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - var r struct { URL string `json:"url"` } @@ -464,7 +340,7 @@ func (app *App) removeExternalSource(state *collectionsState) func(c *fiber.Ctx) return c.Status(fiber.StatusBadRequest).JSON(collectionsErrorResponse(errCodeInvalidRequest, "Invalid request", err.Error())) } - if err := state.sourceManager.RemoveSource(name, r.URL); err != nil { + if err := backend.RemoveSource(name, r.URL); err != nil { return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to remove source", err.Error())) } @@ -475,22 +351,22 @@ func (app *App) removeExternalSource(state *collectionsState) func(c *fiber.Ctx) } } -func (app *App) listSources(state *collectionsState) func(c *fiber.Ctx) error { +func (app *App) listSources(backend CollectionsBackend) func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error { name := c.Params("name") - state.mu.RLock() - collection, exists := state.collections[name] - state.mu.RUnlock() - if !exists { - return c.Status(fiber.StatusNotFound).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + srcs, err := backend.ListSources(name) + if err != nil { + if status := collectionErrStatus(err, name); status == fiber.StatusNotFound { + return c.Status(status).JSON(collectionsErrorResponse(errCodeNotFound, "Collection not found", fmt.Sprintf("Collection '%s' does not exist", name))) + } + return c.Status(fiber.StatusInternalServerError).JSON(collectionsErrorResponse(errCodeInternalError, "Failed to list sources", err.Error())) } - srcs := collection.GetExternalSources() sourcesList := make([]map[string]interface{}, 0, len(srcs)) for _, source := range srcs { sourcesList = append(sourcesList, map[string]interface{}{ "url": source.URL, - "update_interval": int(source.UpdateInterval.Minutes()), + "update_interval": source.UpdateInterval, "last_update": source.LastUpdate.Format(time.RFC3339), }) } diff --git a/webui/options.go b/webui/options.go index c32811b3..4fb3f50f 100644 --- a/webui/options.go +++ b/webui/options.go @@ -20,14 +20,16 @@ type Config struct { ConversationStoreDuration time.Duration // Collections / knowledge base (LocalRecall) - CollectionDBPath string - FileAssets string - VectorEngine string - EmbeddingModel string - MaxChunkingSize int - ChunkOverlap int - CollectionAPIKeys []string - DatabaseURL string + CollectionDBPath string + FileAssets string + VectorEngine string + EmbeddingModel string + MaxChunkingSize int + ChunkOverlap int + CollectionAPIKeys []string + DatabaseURL string + // LocalRAGURL when set uses HTTP backend for collections API; when empty uses in-process backend. + LocalRAGURL string } type Option func(*Config) @@ -144,6 +146,12 @@ func WithDatabaseURL(url string) Option { } } +func WithLocalRAGURL(url string) Option { + return func(c *Config) { + c.LocalRAGURL = url + } +} + func (c *Config) Apply(opts ...Option) { for _, opt := range opts { opt(c) diff --git a/webui/routes.go b/webui/routes.go index 9deff93d..f0669511 100644 --- a/webui/routes.go +++ b/webui/routes.go @@ -17,6 +17,7 @@ import ( "github.com/mudler/LocalAGI/core/state" "github.com/mudler/LocalAGI/core/types" + "github.com/mudler/LocalAGI/pkg/localrag" "github.com/mudler/LocalAGI/services" "github.com/mudler/xlog" ) @@ -213,8 +214,17 @@ func (app *App) registerRoutes(pool *state.AgentPool, webapp *fiber.App) { webapp.Post("/api/git-repos/:id/sync", app.SyncGitRepo) webapp.Post("/api/git-repos/:id/toggle", app.ToggleGitRepo) - // Collections / knowledge base API (LocalRecall-compatible) - app.RegisterCollectionRoutes(webapp, app.config) + // Collections / knowledge base API (LocalRecall-compatible). Same interface for in-process or remote. + var collectionsBackend CollectionsBackend + if app.config.LocalRAGURL != "" { + client := localrag.NewClient(app.config.LocalRAGURL, app.config.LLMAPIKey) + collectionsBackend = NewCollectionsBackendHTTP(client) + } else { + var state *collectionsState + collectionsBackend, state = NewInProcessCollectionsBackend(app.config) + app.collectionsState = state + } + app.RegisterCollectionRoutes(webapp, app.config, collectionsBackend) } var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")