diff --git a/Cargo.lock b/Cargo.lock index 406de4b9..bf7a9ca7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -787,6 +787,7 @@ dependencies = [ "rayon", "rcgen", "regex", + "reqwest", "rmcp", "rustls", "serde", @@ -1663,8 +1664,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1674,9 +1677,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi 5.3.0", "wasip2", + "wasm-bindgen", ] [[package]] @@ -1932,6 +1937,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots 1.0.6", ] [[package]] @@ -2498,6 +2504,12 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "lz4_flex" version = "0.11.6" @@ -3451,6 +3463,61 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", 
+ "rand 0.9.2", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.45" @@ -3762,6 +3829,8 @@ dependencies = [ "native-tls", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", "rustls-pki-types", "serde", "serde_json", @@ -3769,6 +3838,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-rustls", "tokio-util", "tower", "tower-http", @@ -3778,6 +3848,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots 1.0.6", ] [[package]] @@ -3916,6 +3987,7 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ + "web-time", "zeroize", ] @@ -4740,6 +4812,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokenizers" version = "0.22.2" diff --git a/Cargo.toml b/Cargo.toml index 8a759a60..f4749bbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ tree-sitter-lua = "0.5.0" tree-sitter-clojure-orchard = "0.2.5" glob = "0.3.3" async-trait = "0.1.89" +reqwest = { version = "0.12", default-features = false, features 
= ["json", "rustls-tls"] } sysinfo = "0.38.4" indexmap = { version = "2.13.0", features = ["serde"] } diff --git a/src/cli/commands/index_parallel.rs b/src/cli/commands/index_parallel.rs index 22d9e926..fcc32243 100644 --- a/src/cli/commands/index_parallel.rs +++ b/src/cli/commands/index_parallel.rs @@ -7,9 +7,10 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use crate::config::Settings; +use crate::indexing::facade::{build_embedding_backend, resolve_remote_model_name}; use crate::indexing::pipeline::{IncrementalStats, Phase2Stats, Pipeline, PipelineConfig}; use crate::io::status_line::{ProgressBar, ProgressBarOptions, ProgressBarStyle}; -use crate::semantic::SimpleSemanticSearch; +use crate::semantic::{EmbeddingBackend, SemanticSearchError, SimpleSemanticSearch}; use crate::storage::DocumentIndex; /// Arguments for the index-parallel command. @@ -71,8 +72,10 @@ pub fn run(args: IndexParallelArgs, settings: &Settings) { } }; - // Create semantic search (for embeddings) - let semantic = create_semantic_search(settings, &semantic_path); + // Create semantic search (for storing/loading/searching embeddings) + // and a separate embedding backend for generating new embeddings. + let (semantic, embedding_backend) = + create_semantic_search(settings, &semantic_path); // Create pipeline let settings_arc = Arc::new(settings.clone()); @@ -102,7 +105,7 @@ pub fn run(args: IndexParallelArgs, settings: &Settings) { tracing::info!(target: "pipeline", "Indexing directory ({mode}): {}", path.display()); - match pipeline.index_incremental(path, Arc::clone(&index), semantic.clone(), None, force) { + match pipeline.index_incremental(path, Arc::clone(&index), semantic.clone(), embedding_backend.clone(), force) { Ok(stats) => { display_incremental_stats(&stats, progress); } @@ -119,42 +122,103 @@ pub fn run(args: IndexParallelArgs, settings: &Settings) { } } -/// Create semantic search instance if enabled in settings. 
+/// Create semantic search instance and embedding backend if enabled in settings. +/// +/// Returns `(semantic, backend)` where: +/// - `semantic` stores/loads/searches the embedding vectors +/// - `backend` generates new embeddings (local fastembed pool or remote HTTP) fn create_semantic_search( settings: &Settings, semantic_path: &Path, -) -> Option>> { +) -> (Option>>, Option>) { if !settings.semantic_search.enabled { tracing::debug!(target: "pipeline", "Semantic search disabled"); - return None; + return (None, None); } + let is_remote = std::env::var("CODANNA_EMBED_URL").is_ok() + || settings.semantic_search.remote_url.is_some(); + + // Build embedding backend (local pool or remote HTTP) + let backend = match build_embedding_backend(&settings.semantic_search) { + Ok(b) => Arc::new(b), + Err(e) => { + tracing::warn!(target: "pipeline", "Failed to initialize embedding backend: {e}"); + return (None, None); + } + }; + let model = &settings.semantic_search.model; - // Try to load existing embeddings first - if semantic_path.exists() { - match SimpleSemanticSearch::load(semantic_path) { - Ok(semantic) => { + // Load existing embeddings or create fresh instance. + // After loading, verify dimensions match the backend so we don't silently + // drop all new embeddings during an incremental run after a backend switch. + let semantic = if semantic_path.exists() { + // In remote mode load without initialising a local fastembed model + let load_result = if is_remote { + SimpleSemanticSearch::load_remote(semantic_path) + } else { + SimpleSemanticSearch::load(semantic_path) + }; + match load_result { + Ok(s) => { + let index_dim = s.dimensions(); + let backend_dim = backend.dimensions(); + if index_dim != backend_dim { + tracing::error!( + target: "pipeline", + "Semantic index dimension mismatch: index has {index_dim}d but backend produces {backend_dim}d. 
\ + Re-index with: codanna index-parallel --force" + ); + std::process::exit(1); + } + let index_is_remote = s.is_remote_index(); + if index_is_remote != is_remote { + tracing::warn!( + target: "pipeline", + "Backend kind changed (index={}, current={}). \ + Embedding spaces may differ — similarity scores could be inaccurate. \ + Re-index with --force to fix.", + if index_is_remote { "remote" } else { "local" }, + if is_remote { "remote" } else { "local" }, + ); + } tracing::debug!(target: "pipeline", "Loaded existing embeddings from {}", semantic_path.display()); - return Some(Arc::new(Mutex::new(semantic))); + Some(Arc::new(Mutex::new(s))) + } + Err(SemanticSearchError::DimensionMismatch { suggestion, .. }) => { + // Incompatible existing index — cannot continue silently as stored + // vectors are structurally wrong for this backend. + tracing::error!(target: "pipeline", "Semantic index incompatible: {suggestion}"); + std::process::exit(1); } Err(e) => { - tracing::warn!(target: "pipeline", "Failed to load embeddings: {e}"); + tracing::warn!(target: "pipeline", "Failed to load embeddings, continuing without semantic search: {e}"); + None } } - } - - // Create new semantic search instance - match SimpleSemanticSearch::from_model_name(model) { - Ok(semantic) => { - tracing::debug!(target: "pipeline", "Created new semantic search with model: {model}"); - Some(Arc::new(Mutex::new(semantic))) - } - Err(e) => { - tracing::warn!(target: "pipeline", "Failed to initialize semantic search: {e}"); - None + } else { + let new_result = if is_remote { + Ok(SimpleSemanticSearch::new_empty( + backend.dimensions(), + &resolve_remote_model_name(&settings.semantic_search), + )) + } else { + SimpleSemanticSearch::from_model_name(model) + }; + match new_result { + Ok(s) => { + tracing::debug!(target: "pipeline", "Created new semantic search with model: {model}"); + Some(Arc::new(Mutex::new(s))) + } + Err(e) => { + tracing::warn!(target: "pipeline", "Failed to initialize semantic 
search: {e}"); + None + } } - } + }; + + (semantic, Some(backend)) } fn display_incremental_stats(stats: &IncrementalStats, with_progress: bool) { diff --git a/src/config.rs b/src/config.rs index c8142131..3a3645b3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -205,6 +205,22 @@ pub struct SemanticSearchConfig { /// Number of parallel embedding model instances #[serde(default = "default_embedding_threads")] pub embedding_threads: usize, + + /// Remote embedding server URL (OpenAI-compatible, e.g. http://host:8100). + /// When set, local fastembed is bypassed and this endpoint is used instead. + /// Overrideable via CODANNA_EMBED_URL env var. + #[serde(default)] + pub remote_url: Option, + + /// Model name to send to the remote embedding server. + /// Overrideable via CODANNA_EMBED_MODEL env var. + #[serde(default)] + pub remote_model: Option, + + /// Output dimension of the remote embedding model. + /// Required when remote_url is set. Overrideable via CODANNA_EMBED_DIM env var. + #[serde(default)] + pub remote_dim: Option, } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -424,6 +440,9 @@ impl Default for SemanticSearchConfig { model: default_embedding_model(), threshold: default_similarity_threshold(), embedding_threads: default_embedding_threads(), + remote_url: None, + remote_model: None, + remote_dim: None, } } } diff --git a/src/documents/store.rs b/src/documents/store.rs index b5877d56..88e3f933 100644 --- a/src/documents/store.rs +++ b/src/documents/store.rs @@ -226,11 +226,14 @@ fn highlight_keywords(text: &str, query: &str) -> String { let mut merged: Vec<(usize, usize)> = Vec::new(); for (start, end) in matches { if let Some(last) = merged.last_mut() { - // Check if adjacent: only spaces/tabs between (no newlines) - let between = &text[last.1..start]; - let is_adjacent = start <= last.1 || between.chars().all(|c| c == ' ' || c == '\t'); + // Check overlap first — slice is only safe when start > last.1 + let is_adjacent = if start <= last.1 { + true 
// overlapping, merge unconditionally + } else { + // Adjacent: only spaces/tabs between ranges (no newlines) + text[last.1..start].chars().all(|c| c == ' ' || c == '\t') + }; if is_adjacent { - // Merge: extend the previous range last.1 = last.1.max(end); continue; } diff --git a/src/indexing/facade.rs b/src/indexing/facade.rs index ca55de54..fe277bf3 100644 --- a/src/indexing/facade.rs +++ b/src/indexing/facade.rs @@ -25,7 +25,8 @@ use crate::config::Settings; use crate::indexing::pipeline::Pipeline; -use crate::semantic::{EmbeddingPool, SimpleSemanticSearch}; +use crate::semantic::{EmbeddingBackend, EmbeddingPool, RemoteEmbedder, SemanticSearchError, SimpleSemanticSearch}; +use crate::semantic::remote::run_async; use crate::storage::{DocumentIndex, SearchResult}; use crate::symbol::context::{ContextIncludes, SymbolContext, SymbolRelationships}; use crate::{FileId, IndexError, RelationKind, Relationship, Symbol, SymbolId, SymbolKind}; @@ -79,7 +80,7 @@ pub struct IndexFacade { semantic_search: Option>>, /// Optional embedding pool for parallel embedding generation - embedding_pool: Option>, + embedding_pool: Option>, /// Configuration settings: Arc, @@ -89,6 +90,10 @@ pub struct IndexFacade { /// Base path for index storage index_base: PathBuf, + + /// Set to true when load_semantic_search fails with DimensionMismatch so + /// hot-reload and other callers do not retry on every reload cycle. 
+ semantic_incompatible: bool, } impl IndexFacade { @@ -118,6 +123,7 @@ impl IndexFacade { settings, indexed_paths: HashSet::new(), index_base, + semantic_incompatible: false, }) } @@ -142,6 +148,7 @@ impl IndexFacade { settings, indexed_paths: HashSet::new(), index_base, + semantic_incompatible: false, } } @@ -174,17 +181,25 @@ impl IndexFacade { let semantic_path = self.index_base.join("semantic"); std::fs::create_dir_all(&semantic_path)?; - let model = &self.settings.semantic_search.model; + let backend = build_embedding_backend(&self.settings.semantic_search)?; + let backend = Arc::new(backend); + + // In remote mode, skip local fastembed init; use new_empty so the + // SemanticSearch instance carries the correct dimension from the backend. + let is_remote = self.settings.semantic_search.remote_url.is_some() + || std::env::var("CODANNA_EMBED_URL").is_ok(); + let semantic = if is_remote { + SimpleSemanticSearch::new_empty( + backend.dimensions(), + &resolve_remote_model_name(&self.settings.semantic_search), + ) + } else { + let model = &self.settings.semantic_search.model; + SimpleSemanticSearch::from_model_name(model)? + }; - let semantic = SimpleSemanticSearch::from_model_name(model)?; self.semantic_search = Some(Arc::new(Mutex::new(semantic))); - - // Create embedding pool for parallel generation - let pool_size = self.settings.semantic_search.embedding_threads; - let embedding_model = crate::vector::parse_embedding_model(model) - .map_err(|e| IndexError::General(format!("Failed to parse embedding model: {e}")))?; - let pool = EmbeddingPool::new(pool_size, embedding_model)?; - self.embedding_pool = Some(Arc::new(pool)); + self.embedding_pool = Some(backend); Ok(()) } @@ -194,6 +209,12 @@ impl IndexFacade { self.semantic_search.is_some() } + /// Returns true if a previous load_semantic_search call failed with + /// DimensionMismatch, meaning retrying would always fail until re-indexed. 
+ pub fn is_semantic_incompatible(&self) -> bool { + self.semantic_incompatible + } + /// Save semantic search data to disk. pub fn save_semantic_search(&self, path: &Path) -> FacadeResult<()> { if let Some(ref semantic) = self.semantic_search { @@ -209,21 +230,92 @@ impl IndexFacade { /// Embedding pool for generating new embeddings is initialized lazily. pub fn load_semantic_search(&mut self, path: &Path) -> FacadeResult { if path.join("metadata.json").exists() { - match SimpleSemanticSearch::load(path) { + let is_remote = self.settings.semantic_search.remote_url.is_some() + || std::env::var("CODANNA_EMBED_URL").is_ok(); + let load_result = if is_remote { + SimpleSemanticSearch::load_remote(path) + } else { + SimpleSemanticSearch::load(path) + }; + match load_result { Ok(semantic) => { + // Restore the embedding backend so query-time remote embedding + // works immediately without waiting for a lazy reindex call. + if self.embedding_pool.is_none() { + match build_embedding_backend(&self.settings.semantic_search) { + Ok(b) => self.embedding_pool = Some(Arc::new(b)), + Err(e) => tracing::warn!("Failed to restore embedding backend: {e}"), + } + } + + // Verify dimension and backend kind compatibility. + if let Some(ref pool) = self.embedding_pool { + let backend_dim = pool.dimensions(); + let index_dim = semantic.dimensions(); + + if backend_dim != index_dim { + self.semantic_incompatible = true; + return Err(IndexError::SemanticSearch( + SemanticSearchError::DimensionMismatch { + expected: backend_dim, + actual: index_dim, + suggestion: format!( + "Index was built with {index_dim}-dimensional embeddings \ + but current backend produces {backend_dim}d. \ + Re-index with: codanna index --force" + ), + }, + )); + } + + // Warn when backend kind changed but dimensions happen to match. + // Embedding spaces differ between models so similarity scores may + // be meaningless. Only a --force re-index can fully fix this. 
+ let index_is_remote = semantic.is_remote_index(); + let backend_is_remote = matches!(pool.as_ref(), EmbeddingBackend::Remote(_)); + if index_is_remote != backend_is_remote { + tracing::warn!( + target: "semantic", + "Backend kind changed (index={}, current={}). \ + Embedding spaces may differ — similarity scores could be inaccurate. \ + Re-index with --force to fix.", + if index_is_remote { "remote" } else { "local" }, + if backend_is_remote { "remote" } else { "local" }, + ); + } + } + self.semantic_search = Some(Arc::new(Mutex::new(semantic))); - // Embedding pool is initialized lazily when needed return Ok(true); } + Err(SemanticSearchError::DimensionMismatch { expected, actual, ref suggestion }) => { + // Dimension mismatch: index is structurally incompatible with the + // current backend. Mark this facade so callers do not retry on every + // cycle. The error propagates upward; callers that need the process + // to survive (startup, hot-reload) swallow it and continue text-only. + // Callers that want to fail fast can treat this Err as fatal. + self.semantic_incompatible = true; + tracing::error!( + target: "semantic", + "Semantic index dimension mismatch (expected={expected}, actual={actual}): {suggestion}" + ); + return Err(IndexError::SemanticSearch(SemanticSearchError::DimensionMismatch { + expected, + actual, + suggestion: suggestion.to_string(), + })); + } Err(e) => { - tracing::warn!("Failed to load semantic search: {e}"); + // Other errors (missing file, corrupt data) — warn and continue + // without semantic search rather than blocking startup. + tracing::warn!("Failed to load semantic search, continuing without it: {e}"); } } } Ok(false) } - /// Ensure embedding pool is initialized for generating new embeddings. + /// Ensure embedding backend is initialized for generating new embeddings. /// /// Called lazily by methods that need to compute embeddings (reindexing, watcher). 
pub fn ensure_embedding_pool(&mut self) -> FacadeResult<()> { @@ -231,13 +323,9 @@ impl IndexFacade { return Ok(()); } - let model = &self.settings.semantic_search.model; - let pool_size = self.settings.semantic_search.embedding_threads; - let embedding_model = crate::vector::parse_embedding_model(model) - .map_err(|e| IndexError::General(format!("Failed to parse embedding model: {e}")))?; - let pool = EmbeddingPool::new(pool_size, embedding_model)?; - self.embedding_pool = Some(Arc::new(pool)); - tracing::debug!("Initialized embedding pool for incremental updates"); + let backend = build_embedding_backend(&self.settings.semantic_search)?; + self.embedding_pool = Some(Arc::new(backend)); + tracing::debug!("Initialized embedding backend for incremental updates"); Ok(()) } @@ -727,7 +815,23 @@ impl IndexFacade { .ok_or(IndexError::SemanticSearchNotEnabled)?; let sem = semantic.lock().map_err(|_| IndexError::lock_error())?; - let results = sem.search_with_language(query, limit, language_filter)?; + + // When the semantic search has no local model (built with remote embeddings), + // generate the query vector via the embedding backend regardless of whether + // the backend is currently remote or local — the pool just needs to produce + // a vector of the right dimension. + let results = if sem.has_local_model() { + sem.search_with_language(query, limit, language_filter)? + } else { + let pool = self.embedding_pool.as_ref().ok_or_else(|| { + IndexError::General( + "Remote-mode index requires an embedding backend for queries. \ + Set CODANNA_EMBED_URL or re-index with a local model.".to_string(), + ) + })?; + let query_vec = pool.embed_one(query)?; + sem.search_with_embedding_and_language(&query_vec, limit, language_filter)? 
+ }; let mut symbols = Vec::new(); for (symbol_id, score) in results { @@ -1096,3 +1200,75 @@ impl IndexFacade { Ok(()) } } + +// ── Embedding backend factory ────────────────────────────────────────────── + +/// Resolve the embedding backend from config + env var overrides. +/// +/// Env vars take precedence over settings.toml: +/// CODANNA_EMBED_URL — remote endpoint (enables remote mode) +/// CODANNA_EMBED_MODEL — model name for remote server +/// CODANNA_EMBED_DIM — expected output dimension +/// Resolve the effective remote model name, applying env-var-first precedence. +/// +/// Both `build_embedding_backend` and `new_empty` call sites use this so that +/// the model name embedded in saved metadata always matches what the backend uses.
+pub fn resolve_remote_model_name(cfg: &crate::config::SemanticSearchConfig) -> String { + std::env::var("CODANNA_EMBED_MODEL") + .ok() + .or_else(|| cfg.remote_model.clone()) + .unwrap_or_else(|| "text-embedding-ada-002".to_string()) +} + +pub fn build_embedding_backend( + cfg: &crate::config::SemanticSearchConfig, +) -> FacadeResult { + // Env vars override config file + let remote_url = std::env::var("CODANNA_EMBED_URL") + .ok() + .or_else(|| cfg.remote_url.clone()); + + if let Some(url) = remote_url { + let model = resolve_remote_model_name(cfg); + + let dim: Option = match std::env::var("CODANNA_EMBED_DIM") { + Ok(s) => { + let parsed = s.parse::().map_err(|_| { + IndexError::General(format!( + "CODANNA_EMBED_DIM must be a positive integer, got: {s:?}" + )) + })?; + if parsed == 0 { + return Err(IndexError::General( + "CODANNA_EMBED_DIM must be greater than zero".to_string(), + )); + } + Some(parsed) + } + Err(_) => cfg.remote_dim, + }; + + tracing::info!( + target: "semantic", + "Using remote embedding backend: url={url} model={model}" + ); + + let url_owned = url.clone(); + let model_owned = model.clone(); + let embedder = run_async(async move { + RemoteEmbedder::new(&url_owned, &model_owned, dim).await + }) + .map_err(|e| IndexError::General(format!("Remote embedder init failed: {e}")))?; + + return Ok(EmbeddingBackend::Remote(Arc::new(embedder))); + } + + // Local fastembed pool + let pool_size = cfg.embedding_threads; + let embedding_model = crate::vector::parse_embedding_model(&cfg.model) + .map_err(|e| IndexError::General(format!("Failed to parse embedding model: {e}")))?; + let pool = EmbeddingPool::new(pool_size, embedding_model) + .map_err(|e| IndexError::General(format!("Local embedding pool init failed: {e}")))?; + + Ok(EmbeddingBackend::Local(pool)) +} diff --git a/src/indexing/pipeline/mod.rs b/src/indexing/pipeline/mod.rs index c3d8c3e8..df177bc6 100644 --- a/src/indexing/pipeline/mod.rs +++ b/src/indexing/pipeline/mod.rs @@ -863,7 +863,7 @@ 
impl Pipeline { path: &Path, index: Arc, semantic: Option>>, - embedding_pool: Option>, + embedding_pool: Option>, ) -> PipelineResult { let start = Instant::now(); let semantic_path = self.settings.index_path.join("semantic"); @@ -974,7 +974,7 @@ impl Pipeline { // Generate embeddings let embeddings = pool.embed_parallel(&items); - // Store in semantic search + // store_embeddings warns internally on any dropped embeddings. if !embeddings.is_empty() { if let Ok(mut guard) = sem.lock() { guard.store_embeddings(embeddings); @@ -1030,7 +1030,7 @@ impl Pipeline { root: &Path, index: Arc, semantic: Option>>, - embedding_pool: Option>, + embedding_pool: Option>, force: bool, ) -> PipelineResult { self.index_incremental_with_progress(root, index, semantic, embedding_pool, force, None) @@ -1045,7 +1045,7 @@ impl Pipeline { root: &Path, index: Arc, semantic: Option>>, - embedding_pool: Option>, + embedding_pool: Option>, force: bool, show_progress: bool, total_files: usize, @@ -1283,7 +1283,7 @@ impl Pipeline { root: &Path, index: Arc, semantic: Option>>, - embedding_pool: Option>, + embedding_pool: Option>, force: bool, progress: Option>, ) -> PipelineResult { @@ -1439,7 +1439,7 @@ impl Pipeline { files: &[PathBuf], index: Arc, semantic: Option>>, - embedding_pool: Option>, + embedding_pool: Option>, progress: Option>, ) -> PipelineResult<(IndexStats, Vec, SymbolLookupCache)> { if files.is_empty() { @@ -1659,7 +1659,7 @@ impl Pipeline { root: &Path, index: Arc, semantic: Option>>, - embedding_pool: Option>, + embedding_pool: Option>, semantic_path: &Path, progress: Option>, ) -> PipelineResult { @@ -1761,7 +1761,7 @@ impl Pipeline { root: &Path, index: Arc, semantic: Arc>, - embedding_pool: Option>, + embedding_pool: Option>, progress: Option>, dual_progress: Option>, ) -> PipelineResult { @@ -2175,7 +2175,7 @@ impl Pipeline { config_paths: &[PathBuf], index: Arc, semantic: Option>>, - embedding_pool: Option>, + embedding_pool: Option>, _progress: bool, ) -> 
PipelineResult { use std::collections::HashSet; diff --git a/src/indexing/pipeline/stages/semantic_embed.rs b/src/indexing/pipeline/stages/semantic_embed.rs index 9ca44210..0de1fec4 100644 --- a/src/indexing/pipeline/stages/semantic_embed.rs +++ b/src/indexing/pipeline/stages/semantic_embed.rs @@ -3,11 +3,11 @@ //! COLLECT ─┬─> EMBED (this stage) ─> SimpleSemanticSearch //! └─> INDEX ─> Tantivy //! -//! Receives EmbeddingBatch from COLLECT, generates embeddings using EmbeddingPool, +//! Receives EmbeddingBatch from COLLECT, generates embeddings using EmbeddingBackend, //! stores them in SimpleSemanticSearch. Runs in parallel with INDEX stage. use crate::indexing::pipeline::types::{EmbeddingBatch, PipelineError, PipelineResult}; -use crate::semantic::{EmbeddingPool, SimpleSemanticSearch}; +use crate::semantic::{EmbeddingBackend, SimpleSemanticSearch}; use crossbeam_channel::Receiver; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -37,19 +37,19 @@ impl SemanticEmbedStats { /// Progress callback type for EMBED stage. pub type EmbedProgressCallback = Arc; -/// Semantic embedding stage using EmbeddingPool. +/// Semantic embedding stage — dispatches to local fastembed pool or remote HTTP backend. /// /// Receives EmbeddingBatch from COLLECT, generates embeddings in parallel, /// stores them in SimpleSemanticSearch. pub struct SemanticEmbedStage { - pool: Arc, + pool: Arc, semantic: Arc>, progress_callback: Option, } impl SemanticEmbedStage { /// Create a new semantic embed stage. 
- pub fn new(pool: Arc, semantic: Arc>) -> Self { + pub fn new(pool: Arc, semantic: Arc>) -> Self { Self { pool, semantic, @@ -143,18 +143,20 @@ impl SemanticEmbedStage { // Generate embeddings in parallel using pool let embeddings = self.pool.embed_parallel(&items); - let count = embeddings.len(); - // Store in semantic search - if !embeddings.is_empty() { + // Store in semantic search; use the returned count which excludes any + // embeddings dropped due to dimension mismatch (store_embeddings warns). + let stored = if !embeddings.is_empty() { let mut semantic = self.semantic.lock().map_err(|_| PipelineError::Parse { path: std::path::PathBuf::new(), reason: "Failed to lock semantic search".to_string(), })?; - semantic.store_embeddings(embeddings); - } + semantic.store_embeddings(embeddings) + } else { + 0 + }; - Ok(count) + Ok(stored) } } diff --git a/src/semantic/metadata.rs b/src/semantic/metadata.rs index 21c6cc01..ebe48972 100644 --- a/src/semantic/metadata.rs +++ b/src/semantic/metadata.rs @@ -8,12 +8,29 @@ use crate::semantic::SemanticSearchError; use serde::{Deserialize, Serialize}; use std::path::Path; +/// Embedding backend type recorded in metadata. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum EmbeddingBackendKind { + /// Local fastembed model (default for backward compat with old metadata). + #[default] + Local, + /// Remote OpenAI-compatible HTTP endpoint. + Remote, +} + /// Metadata for semantic search persistence #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SemanticMetadata { /// Name of the embedding model used pub model_name: String, + /// Embedding backend type — distinguishes local fastembed from remote HTTP. + /// Defaults to Local for backward compatibility with indexes written before + /// this field was added. 
+ #[serde(default)] + pub backend: EmbeddingBackendKind, + /// Dimension of embeddings pub dimension: usize, @@ -34,11 +51,26 @@ impl SemanticMetadata { /// Current metadata version const CURRENT_VERSION: u32 = 1; - /// Create new metadata with current timestamp + /// Create new metadata for a local fastembed index. pub fn new(model_name: String, dimension: usize, embedding_count: usize) -> Self { let now = get_utc_timestamp(); Self { model_name, + backend: EmbeddingBackendKind::Local, + dimension, + embedding_count, + created_at: now, + updated_at: now, + version: Self::CURRENT_VERSION, + } + } + + /// Create new metadata for a remote-embedding index. + pub fn new_remote(model_name: String, dimension: usize, embedding_count: usize) -> Self { + let now = get_utc_timestamp(); + Self { + model_name, + backend: EmbeddingBackendKind::Remote, dimension, embedding_count, created_at: now, @@ -47,6 +79,11 @@ impl SemanticMetadata { } } + /// Whether this index was built with a remote embedding backend. + pub fn is_remote(&self) -> bool { + self.backend == EmbeddingBackendKind::Remote + } + /// Update the metadata with new embedding count and timestamp pub fn update(&mut self, embedding_count: usize) { self.embedding_count = embedding_count; diff --git a/src/semantic/mod.rs b/src/semantic/mod.rs index 39666be9..fadd71d1 100644 --- a/src/semantic/mod.rs +++ b/src/semantic/mod.rs @@ -5,11 +5,13 @@ mod metadata; mod pool; +pub(crate) mod remote; mod simple; mod storage; -pub use metadata::SemanticMetadata; -pub use pool::EmbeddingPool; +pub use metadata::{EmbeddingBackendKind, SemanticMetadata}; +pub use pool::{EmbeddingBackend, EmbeddingPool}; +pub use remote::RemoteEmbedder; pub use simple::{SemanticSearchError, SimpleSemanticSearch}; pub use storage::SemanticVectorStorage; diff --git a/src/semantic/pool.rs b/src/semantic/pool.rs index 5cd6c90d..3ece6265 100644 --- a/src/semantic/pool.rs +++ b/src/semantic/pool.rs @@ -1,15 +1,115 @@ -//! 
Embedding model pool for parallel embedding generation +//! Embedding backend abstraction — local fastembed pool or remote HTTP endpoint. //! -//! Provides multiple TextEmbedding instances that can be used concurrently -//! by different threads, enabling parallel embedding generation. +//! `EmbeddingBackend` is the single type the rest of the codebase interacts with. +//! It dispatches to either: +//! - `EmbeddingPool` — local fastembed (parallel, thread-pool based) +//! - `RemoteEmbedder` — OpenAI-compatible HTTP server (async, batched) use crate::SymbolId; use crossbeam_channel::{Receiver, Sender, bounded}; use fastembed::{EmbeddingModel, InitOptions, TextEmbedding}; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use super::remote::{run_async, RemoteEmbedder}; use super::SemanticSearchError; +// ── EmbeddingBackend ─────────────────────────────────────────────────────── + +/// Unified embedding backend — wraps either a local fastembed pool or a remote +/// HTTP embedder. All callers use this type; the backend is chosen at startup +/// based on configuration. +pub enum EmbeddingBackend { + Local(EmbeddingPool), + Remote(Arc), +} + +impl EmbeddingBackend { + /// Output dimension of this backend's embeddings. + pub fn dimensions(&self) -> usize { + match self { + EmbeddingBackend::Local(pool) => pool.dimensions(), + EmbeddingBackend::Remote(r) => r.dim(), + } + } + + /// Log usage statistics (no-op for remote backend). + pub fn log_usage_stats(&self) { + if let EmbeddingBackend::Local(pool) = self { + pool.log_usage_stats(); + } + } + + /// Model name / URL for metadata and logging. + pub fn model_name(&self) -> &str { + match self { + EmbeddingBackend::Local(pool) => pool.model_name(), + EmbeddingBackend::Remote(_) => "remote", + } + } + + /// Embed a single text synchronously. + /// Remote backend blocks the calling thread via `tokio::task::block_in_place`. 
+ pub fn embed_one(&self, text: &str) -> Result<Vec<f32>, SemanticSearchError> { + match self { + EmbeddingBackend::Local(pool) => pool.embed_one(text), + EmbeddingBackend::Remote(r) => { + let r = Arc::clone(r); + let text = text.to_string(); + run_async(async move { + let results = r.embed(&[text]).await?; + results.into_iter().next().ok_or_else(|| { + SemanticSearchError::EmbeddingError("Remote embed returned empty".into()) + }) + }) + } + } + } + + /// Embed multiple items in parallel (local) or batched async (remote). + pub fn embed_parallel( + &self, + items: &[(SymbolId, &str, &str)], + ) -> Vec<(SymbolId, Vec<f32>, String)> { + match self { + EmbeddingBackend::Local(pool) => pool.embed_parallel(items), + EmbeddingBackend::Remote(r) => { + let r = Arc::clone(r); + let texts: Vec<String> = items.iter().map(|(_, t, _)| t.to_string()).collect(); + let dim = r.dim(); + + let embeddings = run_async(async move { r.embed(&texts).await }); + + match embeddings { + Ok(embs) => { + embs.into_iter() + .zip(items.iter()) + .filter_map(|(emb, (id, _, lang))| { + if emb.len() == dim { + Some((*id, emb, (*lang).to_string())) + } else { + tracing::warn!( + target: "semantic", + "Remote dim mismatch for {}: expected {dim}, got {}", + id.to_u32(), emb.len() + ); + None + } + }) + .collect() + } + Err(e) => { + tracing::error!(target: "semantic", "Remote embed_parallel failed: {e}"); + Vec::new() + } + } + } + } + } +} + +// ── EmbeddingPool (local fastembed) ─────────────────────────────────────── + /// Model instance with an ID for tracking struct ModelInstance { model: TextEmbedding, @@ -21,28 +121,15 @@ struct ModelInstance { /// Each model instance is expensive (~86MB), but having multiple allows /// true parallel embedding generation with rayon.
pub struct EmbeddingPool { - /// Channel to acquire models from the pool model_sender: Sender<ModelInstance>, model_receiver: Receiver<ModelInstance>, - /// Number of models in the pool pool_size: usize, - /// Model dimensions (all models have same dimensions) dimensions: usize, - /// Model name for metadata model_name: String, - /// Usage counters per model instance (for tracing) usage_counters: Vec<AtomicUsize>, } impl EmbeddingPool { - /// Create a new embedding pool with the specified number of model instances. - /// - /// # Arguments - /// * `pool_size` - Number of TextEmbedding instances to create - /// * `model` - The embedding model to use - /// - /// # Note - /// Each model instance uses ~86MB of memory for AllMiniLML6V2. pub fn new(pool_size: usize, model: EmbeddingModel) -> Result<Self, SemanticSearchError> { let pool_size = pool_size.max(1); let (sender, receiver) = bounded(pool_size); @@ -56,17 +143,14 @@ impl EmbeddingPool { ); let mut dimensions = 0; - - // Create usage counters for each model let usage_counters: Vec<AtomicUsize> = (0..pool_size).map(|_| AtomicUsize::new(0)).collect(); - // Create pool_size model instances for i in 0..pool_size { let mut text_model = TextEmbedding::try_new( InitOptions::new(model.clone()) .with_cache_dir(cache_dir.clone()) - .with_show_download_progress(i == 0), // Only show progress for first model + .with_show_download_progress(i == 0), ) .map_err(|e| { SemanticSearchError::ModelInitError(format!( @@ -76,7 +160,6 @@ )) })?; - // Get dimensions from first model if i == 0 { let test_embedding = text_model .embed(vec!["test"], None) @@ -84,12 +167,8 @@ dimensions = test_embedding.into_iter().next().unwrap().len(); } - let instance = ModelInstance { - model: text_model, - id: i, - }; sender - .send(instance) + .send(ModelInstance { model: text_model, id: i }) .expect("Pool channel should not be closed"); } @@ -108,52 +187,35 @@ }) } - /// Create a pool with default model (AllMiniLML6V2) pub fn with_size(pool_size: usize) -> Result<Self, SemanticSearchError> {
Self::new(pool_size, EmbeddingModel::AllMiniLML6V2) } - /// Acquire a model from the pool (blocks if none available) fn acquire(&self) -> ModelInstance { - let instance = self - .model_receiver - .recv() - .expect("Pool should not be empty"); - - // Increment usage counter for this model + let instance = self.model_receiver.recv().expect("Pool should not be empty"); self.usage_counters[instance.id].fetch_add(1, Ordering::Relaxed); - instance } - /// Return a model to the pool fn release(&self, instance: ModelInstance) { let _ = self.model_sender.send(instance); } - /// Get the embedding dimensions pub fn dimensions(&self) -> usize { self.dimensions } - /// Get the pool size pub fn pool_size(&self) -> usize { self.pool_size } - /// Get the model name pub fn model_name(&self) -> &str { &self.model_name } - /// Generate embedding for a single document using a pooled model. - /// - /// Thread-safe: acquires model, generates embedding, returns model. pub fn embed_one(&self, text: &str) -> Result<Vec<f32>, SemanticSearchError> { if text.trim().is_empty() { - return Err(SemanticSearchError::EmbeddingError( - "Empty text".to_string(), - )); + return Err(SemanticSearchError::EmbeddingError("Empty text".to_string())); } let mut instance = self.acquire(); @@ -166,7 +228,6 @@ impl EmbeddingPool { result.map(|mut v| v.remove(0)) } - /// Log usage statistics for all model instances. pub fn log_usage_stats(&self) { let counts: Vec<usize> = self .usage_counters @@ -181,7 +242,6 @@ .enumerate() .map(|(i, c)| format!("model[{i}]={c}")) .collect(); - tracing::info!( target: "semantic", "Embedding pool usage: {} (total: {total})", @@ -190,21 +250,14 @@ } } - /// Generate embeddings for multiple documents in parallel using rayon. - /// - /// Uses batched embedding (64 docs per model call) for optimal throughput. - /// Returns a Vec of (SymbolId, embedding, language) for successful embeddings. - /// Failed embeddings are logged and skipped.
pub fn embed_parallel( &self, items: &[(SymbolId, &str, &str)], ) -> Vec<(SymbolId, Vec, String)> { use rayon::prelude::*; - // Optimal batch size for embedding models (matches benchmark) const BATCH_SIZE: usize = 64; - // Filter out empty docs first let valid_items: Vec<_> = items .iter() .filter(|(_, doc, _)| !doc.trim().is_empty()) @@ -214,20 +267,16 @@ impl EmbeddingPool { return Vec::new(); } - // Process in batches of 64, parallelized across available model instances let results: Vec<_> = valid_items .chunks(BATCH_SIZE) .par_bridge() .flat_map(|batch| { - // Collect texts for batch embedding let texts: Vec<&str> = batch.iter().map(|(_, doc, _)| *doc).collect(); - // Acquire model, embed entire batch, release model let mut instance = self.acquire(); let embeddings_result = instance.model.embed(texts.clone(), None); self.release(instance); - // Process results match embeddings_result { Ok(embeddings) => { let mut results = Vec::with_capacity(batch.len()); @@ -248,20 +297,14 @@ impl EmbeddingPool { results } Err(e) => { - tracing::warn!( - target: "semantic", - "Batch embedding failed: {}", - e - ); + tracing::warn!(target: "semantic", "Batch embedding failed: {e}"); Vec::new() } } }) .collect(); - // Log usage stats after parallel embedding self.log_usage_stats(); - results } } @@ -275,21 +318,18 @@ mod tests { fn test_pool_creation() { let pool = EmbeddingPool::with_size(2).unwrap(); assert_eq!(pool.pool_size(), 2); - assert_eq!(pool.dimensions(), 384); // AllMiniLML6V2 + assert_eq!(pool.dimensions(), 384); } #[test] #[ignore = "Downloads 86MB model - run with --ignored"] fn test_parallel_embedding() { let pool = EmbeddingPool::with_size(2).unwrap(); - let items = vec![ (SymbolId::new(1).unwrap(), "Parse JSON data", "rust"), (SymbolId::new(2).unwrap(), "Connect to database", "rust"), - (SymbolId::new(3).unwrap(), "Calculate hash", "rust"), ]; - let results = pool.embed_parallel(&items); - assert_eq!(results.len(), 3); + assert_eq!(results.len(), 2); } } diff 
--git a/src/semantic/remote.rs b/src/semantic/remote.rs new file mode 100644 index 00000000..b057084c --- /dev/null +++ b/src/semantic/remote.rs @@ -0,0 +1,232 @@ +//! Remote embedding backend — OpenAI-compatible HTTP endpoint. +//! +//! Replaces local fastembed when `semantic_search.remote_url` is configured +//! (or the `CODANNA_EMBED_URL` environment variable is set). +//! +//! Compatible with Infinity, OpenAI, vLLM, and any server that serves +//! POST /v1/embeddings with the OpenAI request/response schema. + +use std::future::Future; +use std::time::Duration; + +use reqwest::Client; + +/// Run an async future from a sync context. +/// +/// `block_in_place` requires a multi-thread Tokio runtime. If the current +/// runtime is single-threaded (or there is no runtime), we fall back to +/// spawning a temporary `current_thread` runtime on this thread instead. +pub(crate) fn run_async<F, T>(f: F) -> T +where + F: Future<Output = T> + Send + 'static, + T: Send, +{ + match tokio::runtime::Handle::try_current() { + Ok(handle) => { + // block_in_place is valid only on multi-thread schedulers. + // Check the runtime flavor directly: only the multi-thread + // scheduler supports block_in_place. + if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread { + tokio::task::block_in_place(|| handle.block_on(f)) + } else { + // Current-thread runtime: we cannot block_in_place. + // Spawn a sibling thread with its own runtime instead.
+ std::thread::scope(|s| { + s.spawn(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime") + .block_on(f) + }) + .join() + .expect("async worker thread panicked") + }) + } + } + Err(_) => tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime") + .block_on(f), + } +} +use serde::{Deserialize, Serialize}; + +use super::SemanticSearchError; + +// ── Request / Response types ─────────────────────────────────────────────── + +#[derive(Serialize)] +struct EmbedRequest<'a> { + model: &'a str, + input: &'a [String], +} + +#[derive(Deserialize)] +struct EmbedResponse { + data: Vec<EmbedData>, +} + +#[derive(Deserialize)] +struct EmbedData { + index: usize, + embedding: Vec<f32>, +} + +// ── RemoteEmbedder ───────────────────────────────────────────────────────── + +/// Embedding client for an OpenAI-compatible HTTP server. +/// +/// All requests are batched in chunks of `BATCH_SIZE` to avoid hitting +/// server request-size limits. Each request has a 30-second timeout. +#[derive(Clone)] +pub struct RemoteEmbedder { + client: Client, + url: String, + model: String, + dim: usize, +} + +const BATCH_SIZE: usize = 64; +const REQUEST_TIMEOUT_SECS: u64 = 30; +const MAX_TEXT_CHARS: usize = 2000; + +impl RemoteEmbedder { + /// Build a RemoteEmbedder, probing the server to confirm the dimension + /// matches `expected_dim` when provided.
+ pub async fn new( + base_url: &str, + model: &str, + expected_dim: Option<usize>, + ) -> Result<Self, SemanticSearchError> { + let client = Client::builder() + .timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS)) + .build() + .map_err(|e| SemanticSearchError::ModelInitError(format!("HTTP client build failed: {e}")))?; + + let url = format!("{}/v1/embeddings", base_url.trim_end_matches('/')); + + // Probe with a single text to determine / validate dimension + let probe = Self::request(&client, &url, model, &["probe".to_string()]).await?; + let actual_dim = probe + .first() + .map(|v| v.len()) + .ok_or_else(|| SemanticSearchError::ModelInitError("Remote server returned empty embedding on probe".into()))?; + + if let Some(expected) = expected_dim { + if actual_dim != expected { + return Err(SemanticSearchError::ModelInitError(format!( + "Remote embedding dim mismatch: expected {expected}, server returned {actual_dim}" + ))); + } + } + + tracing::info!( + target: "semantic", + "Remote embedding backend ready: url={url} model={model} dim={actual_dim}" + ); + + Ok(Self { + client, + url, + model: model.to_string(), + dim: actual_dim, + }) + } + + /// Output dimension of this embedding model. + pub fn dim(&self) -> usize { + self.dim + } + + /// Embed a batch of texts, truncating each to `MAX_TEXT_CHARS` characters. + /// Sends requests in chunks of `BATCH_SIZE`. + pub async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, SemanticSearchError> { + let mut results: Vec<(usize, Vec<f32>)> = Vec::with_capacity(texts.len()); + + // Truncate by char count, not bytes, to avoid splitting multi-byte codepoints.
+ let truncated: Vec<String> = texts + .iter() + .map(|t| { + if t.chars().count() > MAX_TEXT_CHARS { + t.chars().take(MAX_TEXT_CHARS).collect() + } else { + t.clone() + } + }) + .collect(); + + for (chunk_start, chunk) in truncated.chunks(BATCH_SIZE).enumerate() { + let embeddings = Self::request(&self.client, &self.url, &self.model, chunk).await?; + + if embeddings.len() != chunk.len() { + return Err(SemanticSearchError::EmbeddingError(format!( + "Remote server returned {} embeddings for {} inputs", + embeddings.len(), + chunk.len() + ))); + } + + for (i, emb) in embeddings.into_iter().enumerate() { + if emb.len() != self.dim { + return Err(SemanticSearchError::EmbeddingError(format!( + "Remote embedding at index {} has dim {}, expected {}", + chunk_start * BATCH_SIZE + i, + emb.len(), + self.dim + ))); + } + results.push((chunk_start * BATCH_SIZE + i, emb)); + } + } + + // Sort by original index and return in order + results.sort_by_key(|(i, _)| *i); + Ok(results.into_iter().map(|(_, emb)| emb).collect()) + } + + async fn request( + client: &Client, + url: &str, + model: &str, + texts: &[String], + ) -> Result<Vec<Vec<f32>>, SemanticSearchError> { + let body = EmbedRequest { model, input: texts }; + + let resp = client + .post(url) + .json(&body) + .send() + .await + .map_err(|e| SemanticSearchError::EmbeddingError(format!("Remote embed request failed: {e}")))?; + + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + return Err(SemanticSearchError::EmbeddingError(format!( + "Remote embed server returned {status}: {text}" + ))); + } + + let parsed: EmbedResponse = resp + .json() + .await + .map_err(|e| SemanticSearchError::EmbeddingError(format!("Failed to parse embed response: {e}")))?; + + // Sort by index and validate contiguous range [0, len) + let mut data = parsed.data; + data.sort_by_key(|d| d.index); + + for (expected, d) in data.iter().enumerate() { + if d.index != expected { + return
Err(SemanticSearchError::EmbeddingError(format!( + "Remote embed response has non-contiguous index: expected {expected}, got {}", + d.index + ))); + } + } + + Ok(data.into_iter().map(|d| d.embedding).collect()) + } +} diff --git a/src/semantic/simple.rs b/src/semantic/simple.rs index 6ad0b5a1..1524e194 100644 --- a/src/semantic/simple.rs +++ b/src/semantic/simple.rs @@ -45,8 +45,9 @@ pub struct SimpleSemanticSearch { /// Language mapping for each symbol (for language-filtered search) symbol_languages: HashMap, - /// The embedding model (wrapped in Mutex for interior mutability) - model: Mutex, + /// The embedding model for query-time embedding (None in remote mode — caller + /// must use `search_with_embedding` and provide the query vector externally). + model: Option>, /// Model dimensions for validation dimensions: usize, @@ -141,7 +142,7 @@ impl SimpleSemanticSearch { Ok(Self { embeddings: HashMap::new(), symbol_languages: HashMap::new(), - model: Mutex::new(text_model), + model: Some(Mutex::new(text_model)), dimensions, metadata: Some(metadata), }) @@ -158,9 +159,13 @@ impl SimpleSemanticSearch { return Ok(()); } - // Generate embedding - let embeddings = self - .model + // Generate embedding — only available in local-model mode + let model = self.model.as_ref().ok_or_else(|| { + SemanticSearchError::ModelInitError( + "No local model — use EmbeddingBackend to generate embeddings in remote mode".to_string(), + ) + })?; + let embeddings = model .lock() .unwrap() .embed(vec![doc], None) @@ -200,24 +205,117 @@ impl SimpleSemanticSearch { Ok(()) } - /// Store pre-generated embeddings (from parallel pool generation). - /// - /// Used when embeddings are generated in parallel by EmbeddingPool. + /// Store pre-generated embeddings produced by an `EmbeddingBackend`. 
pub fn store_embeddings(&mut self, items: Vec<(SymbolId, Vec<f32>, String)>) -> usize { let mut count = 0; + let mut dropped = 0usize; for (symbol_id, embedding, language) in items { if embedding.len() == self.dimensions { self.embeddings.insert(symbol_id, embedding); self.symbol_languages.insert(symbol_id, language); count += 1; + } else { + dropped += 1; } } + if dropped > 0 { + // This typically means the backend dimension changed without a --force re-index. + tracing::warn!( + target: "semantic", + "store_embeddings dropped {dropped} embeddings due to dimension mismatch \ + (index={}, received=?). Re-index with --force to fix.", + self.dimensions + ); + } count } /// Search for similar documentation using a natural language query /// /// Returns symbol IDs with their similarity scores, sorted by score descending + /// Search using a pre-computed query embedding vector. + /// + /// Use this in remote-embedding mode where the caller obtains the query + /// vector via `EmbeddingBackend::embed_one` before calling this method. + pub fn search_with_embedding( + &self, + query_embedding: &[f32], + limit: usize, + threshold: f32, + ) -> Result<Vec<(SymbolId, f32)>, SemanticSearchError> { + if self.embeddings.is_empty() { + return Err(SemanticSearchError::NoEmbeddings); + } + if query_embedding.len() != self.dimensions { + return Err(SemanticSearchError::EmbeddingError(format!( + "Query embedding dimension {} does not match index dimension {}", + query_embedding.len(), + self.dimensions + ))); + } + let mut similarities: Vec<(SymbolId, f32)> = self + .embeddings + .iter() + .filter_map(|(id, emb)| { + let sim = cosine_similarity(query_embedding, emb); + if sim >= threshold { Some((*id, sim)) } else { None } + }) + .collect(); + similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); + similarities.truncate(limit); + Ok(similarities) + } + + /// Convenience wrapper: `search_with_embedding` + threshold filter. + pub fn search_with_embedding_threshold( + &self, + query_embedding: &[f32], + limit: usize, + threshold: f32, + ) -> Result<Vec<(SymbolId, f32)>, SemanticSearchError> { + let results = self.search_with_embedding(query_embedding, limit, threshold)?; + Ok(results.into_iter().filter(|(_, s)| *s >= threshold).collect()) + } + + /// Search using a pre-computed query vector with optional language pre-filtering. + /// + /// Language filtering is applied before similarity ranking so the result slice + /// respects `limit` after filtering, matching the behaviour of `search_with_language`. + pub fn search_with_embedding_and_language( + &self, + query_embedding: &[f32], + limit: usize, + language: Option<&str>, + ) -> Result<Vec<(SymbolId, f32)>, SemanticSearchError> { + if self.embeddings.is_empty() { + return Err(SemanticSearchError::NoEmbeddings); + } + if query_embedding.len() != self.dimensions { + return Err(SemanticSearchError::EmbeddingError(format!( + "Query embedding dimension {} does not match index dimension {}", + query_embedding.len(), + self.dimensions + ))); + } + let candidates: Vec<(&SymbolId, &Vec<f32>)> = if let Some(lang) = language { + self.embeddings + .iter() + .filter(|(id, _)| { + self.symbol_languages.get(id).is_some_and(|l| l == lang) + }) + .collect() + } else { + self.embeddings.iter().collect() + }; + let mut similarities: Vec<(SymbolId, f32)> = candidates + .into_iter() + .map(|(id, emb)| (*id, cosine_similarity(query_embedding, emb))) + .collect(); + similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); + similarities.truncate(limit); + Ok(similarities) + } + pub fn search( &self, query: &str, @@ -227,9 +325,14 @@ return Err(SemanticSearchError::NoEmbeddings); } + let model = self.model.as_ref().ok_or_else(|| { + SemanticSearchError::ModelInitError( + "No local model available — use search_with_embedding() in remote mode".to_string(), + ) + })?; + // Generate query embedding - let query_embeddings = self - .model + let query_embeddings = model .lock() .unwrap()
.embed(vec![query], None) @@ -268,9 +371,14 @@ impl SimpleSemanticSearch { return Err(SemanticSearchError::NoEmbeddings); } + let model = self.model.as_ref().ok_or_else(|| { + SemanticSearchError::ModelInitError( + "No local model available — use search_with_embedding() in remote mode".to_string(), + ) + })?; + // Generate query embedding - let query_embeddings = self - .model + let query_embeddings = model .lock() .unwrap() .embed(vec![query], None) @@ -308,7 +416,11 @@ impl SimpleSemanticSearch { Ok(similarities) } - /// Search with a similarity threshold + /// Search with a similarity threshold. + /// + /// Requires a local embedding model. In remote-embedding mode, use the + /// facade's `semantic_search_docs_with_threshold` which handles backend + /// dispatch, or call `search_with_embedding` with a pre-computed vector. pub fn search_with_threshold( &self, query: &str, @@ -323,6 +435,22 @@ impl SimpleSemanticSearch { } /// Get the number of indexed embeddings + /// Returns true when a local fastembed model is available for query embedding. + /// Returns false for remote-mode instances that require an external backend. + pub fn has_local_model(&self) -> bool { + self.model.is_some() + } + + /// Output dimension of embeddings stored in this index. + pub fn dimensions(&self) -> usize { + self.dimensions + } + + /// Returns true when this index was built with a remote embedding backend. 
+ pub fn is_remote_index(&self) -> bool { + self.metadata.as_ref().is_some_and(|m| m.is_remote()) + } + pub fn embedding_count(&self) -> usize { self.embeddings.len() } @@ -363,15 +491,18 @@ impl SimpleSemanticSearch { suggestion: "Check directory permissions".to_string(), })?; - // Save metadata with actual model name - let model_name = if let Some(ref meta) = self.metadata { - meta.model_name.clone() + let (model_name, is_remote_backend) = if let Some(ref meta) = self.metadata { + (meta.model_name.clone(), meta.is_remote()) } else { - // Fallback to AllMiniLML6V2 if metadata not set (shouldn't happen) - "AllMiniLML6V2".to_string() + // Legacy instance without metadata — infer from model field presence + ("AllMiniLML6V2".to_string(), self.model.is_none()) }; - let metadata = SemanticMetadata::new(model_name, self.dimensions, self.embeddings.len()); + let metadata = if is_remote_backend { + SemanticMetadata::new_remote(model_name, self.dimensions, self.embeddings.len()) + } else { + SemanticMetadata::new(model_name, self.dimensions, self.embeddings.len()) + }; metadata.save(path)?; // Create storage with our dimension @@ -423,12 +554,100 @@ impl SimpleSemanticSearch { /// /// # Arguments /// * `path` - Path where semantic data is stored + /// Create an empty semantic search instance for remote-embedding mode. + /// + /// `model_name` should identify the remote model (e.g. "bge-large-en-v1.5") + /// so it is preserved in saved metadata and visible in status output. + /// No local fastembed model is loaded. Queries must use `search_with_embedding`. + pub fn new_empty(dimensions: usize, model_name: &str) -> Self { + let metadata = crate::semantic::SemanticMetadata::new_remote( + model_name.to_string(), + dimensions, + 0, + ); + Self { + embeddings: HashMap::new(), + symbol_languages: HashMap::new(), + model: None, + dimensions, + metadata: Some(metadata), + } + } + + /// Load an existing semantic index without initialising a local embedding model. 
+ /// + /// Used in remote-embedding mode: stored vectors are loaded for similarity + /// search but query embedding is handled externally via `search_with_embedding`. + pub fn load_remote(path: &Path) -> Result<Self, SemanticSearchError> { + use crate::semantic::{SemanticMetadata, SemanticVectorStorage}; + + let metadata = SemanticMetadata::load(path)?; + let mut storage = SemanticVectorStorage::open(path)?; + + // Verify storage dimension matches metadata to catch corrupted indexes. + // Without this, cosine_similarity would silently zip vectors of mismatched + // lengths, producing garbage similarity scores. + if storage.dimension().get() != metadata.dimension { + return Err(SemanticSearchError::DimensionMismatch { + expected: metadata.dimension, + actual: storage.dimension().get(), + suggestion: format!( + "Remote index was built with {}-dimensional embeddings but storage has {}. Re-index with: codanna index --force", + metadata.dimension, + storage.dimension().get() + ), + }); + } + + let embeddings_vec = storage.load_all()?; + let mut embeddings = HashMap::with_capacity(embeddings_vec.len()); + for (id, embedding) in embeddings_vec { + embeddings.insert(id, embedding); + } + + let languages_path = path.join("languages.json"); + let symbol_languages = if languages_path.exists() { + let languages_json = std::fs::read_to_string(&languages_path).map_err(|e| { + SemanticSearchError::StorageError { + message: format!("Failed to read language mappings: {e}"), + suggestion: "Language mappings file may be corrupted".to_string(), + } + })?; + let languages_map: HashMap<u32, String> = serde_json::from_str(&languages_json) + .map_err(|e| SemanticSearchError::StorageError { + message: format!("Failed to parse language mappings: {e}"), + suggestion: "Try rebuilding the semantic index".to_string(), + })?; + languages_map + .into_iter() + .filter_map(|(id, lang)| SymbolId::new(id).map(|sid| (sid, lang))) + .collect() + } else { + HashMap::new() + }; + + Ok(Self { + embeddings, + symbol_languages, + model: None, //
no local model — caller uses search_with_embedding() + dimensions: metadata.dimension, + metadata: Some(metadata), + }) + } + pub fn load(path: &Path) -> Result<Self, SemanticSearchError> { use crate::semantic::{SemanticMetadata, SemanticVectorStorage}; // Load metadata first let metadata = SemanticMetadata::load(path)?; + // Delegate to load_remote for indexes explicitly built with a remote backend. + // The backend field defaults to Local for old metadata without this field, + // preserving backward compatibility. + if metadata.is_remote() { + return Self::load_remote(path); + } + // Parse model name from metadata let model = crate::vector::parse_embedding_model(&metadata.model_name) .map_err(|e| SemanticSearchError::StorageError { @@ -423,12 +554,100 @@ /// /// # Arguments /// * `path` - Path where semantic data is stored + /// Create an empty semantic search instance for remote-embedding mode. + /// + /// `model_name` should identify the remote model (e.g. "bge-large-en-v1.5") + /// so it is preserved in saved metadata and visible in status output. + /// No local fastembed model is loaded. Queries must use `search_with_embedding`. + pub fn new_empty(dimensions: usize, model_name: &str) -> Self { + let metadata = crate::semantic::SemanticMetadata::new_remote( + model_name.to_string(), + dimensions, + 0, + ); + Self { + embeddings: HashMap::new(), + symbol_languages: HashMap::new(), + model: None, + dimensions, + metadata: Some(metadata), + } + } + + /// Load an existing semantic index without initialising a local embedding model.
+ tracing::error!( + "[persistence] semantic search disabled — index incompatible: {suggestion}" + ); + } Err(e) => { tracing::warn!("[persistence] failed to load semantic search: {e}"); } diff --git a/src/watcher/hot_reload.rs b/src/watcher/hot_reload.rs index 7411f857..52718835 100644 --- a/src/watcher/hot_reload.rs +++ b/src/watcher/hot_reload.rs @@ -125,7 +125,7 @@ impl HotReloadWatcher { // Ensure semantic search stays attached after hot reloads let mut restored_semantic = false; - if !facade_guard.has_semantic_search() { + if !facade_guard.has_semantic_search() && !facade_guard.is_semantic_incompatible() { let semantic_path = self.index_path.join("semantic"); let metadata_exists = semantic_path.join("metadata.json").exists(); if metadata_exists { @@ -139,6 +139,16 @@ impl HotReloadWatcher { "semantic metadata present but reload returned false" ); } + Err(crate::IndexError::SemanticSearch( + crate::semantic::SemanticSearchError::DimensionMismatch { + ref suggestion, .. + }, + )) => { + warn!( + "Semantic index dimension mismatch after hot-reload: {suggestion}. \ + Semantic search disabled until re-indexed with --force." + ); + } Err(e) => { warn!("Failed to reload semantic search after index update: {e}"); }