diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock new file mode 100644 index 00000000000..c124dbac81f --- /dev/null +++ b/.claude/scheduled_tasks.lock @@ -0,0 +1 @@ +{"sessionId":"23aa895c-8c08-462b-ae20-e4ad399e466f","pid":1,"procStart":"52028919","acquiredAt":1782134409076} \ No newline at end of file diff --git a/rust/lance-index/benches/geo.rs b/rust/lance-index/benches/geo.rs index a9c403cc6bf..078aaefc6fb 100644 --- a/rust/lance-index/benches/geo.rs +++ b/rust/lance-index/benches/geo.rs @@ -15,7 +15,7 @@ use geoarrow_schema::Dimension; use lance_core::cache::LanceCache; use lance_core::{Error, ROW_ID}; use lance_index::scalar::lance_format::LanceIndexStore; -use lance_index::scalar::registry::ScalarIndexPlugin; +use lance_index::scalar::registry::BasicTrainer; use lance_index::scalar::rtree::{BoundingBox, RTreeIndex, RTreeIndexPlugin, RTreeTrainingRequest}; use lance_index::scalar::{GeoQuery, RelationQuery, ScalarIndex}; use lance_io::object_store::ObjectStore; diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index c2a6e80e82b..29f577d3f35 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -48,7 +48,7 @@ use crate::{ CreatedIndex, UpdateCriteria, expression::SargableQueryParser, registry::{ - ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, + BasicTrainer, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME, }, }, @@ -1676,11 +1676,7 @@ pub async fn merge_bitmap_indices( } #[async_trait] -impl ScalarIndexPlugin for BitmapIndexPlugin { - fn name(&self) -> &str { - "Bitmap" - } - +impl BasicTrainer for BitmapIndexPlugin { fn new_training_request( &self, params: &str, @@ -1699,26 +1695,6 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { Ok(Box::new(BitmapTrainingRequest::new(params))) } - fn provides_exact_answer(&self) -> bool { - true - } - - fn version(&self) -> u32 { - 
BITMAP_INDEX_VERSION - } - - fn new_query_parser( - &self, - index_name: String, - _index_details: &prost_types::Any, - ) -> Option> { - Some(Box::new(SargableQueryParser::new( - index_name, - self.name().to_string(), - false, - ))) - } - async fn train_index( &self, data: SendableRecordBatchStream, @@ -1760,6 +1736,37 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { files: vec![file], }) } +} + +#[async_trait] +impl ScalarIndexPlugin for BitmapIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "Bitmap" + } + + fn provides_exact_answer(&self) -> bool { + true + } + + fn version(&self) -> u32 { + BITMAP_INDEX_VERSION + } + + fn new_query_parser( + &self, + index_name: String, + _index_details: &prost_types::Any, + ) -> Option> { + Some(Box::new(SargableQueryParser::new( + index_name, + self.name().to_string(), + false, + ))) + } /// Load an index from storage async fn load_index( diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 856f08af772..b0fd4d9abe8 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -9,7 +9,7 @@ use crate::scalar::expression::{BloomFilterQueryParser, ScalarQueryParser}; use crate::scalar::registry::{ - ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, + BasicTrainer, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, }; use crate::scalar::{ BloomFilterQuery, BuiltinIndexType, CreatedIndex, IndexFile, ScalarIndexParams, UpdateCriteria, @@ -995,11 +995,7 @@ impl BloomFilterIndexPlugin { } #[async_trait] -impl ScalarIndexPlugin for BloomFilterIndexPlugin { - fn name(&self) -> &str { - "BloomFilter" - } - +impl BasicTrainer for BloomFilterIndexPlugin { fn new_training_request( &self, params: &str, @@ -1082,6 +1078,13 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin { files: vec![file], }) } +} + +#[async_trait] +impl 
ScalarIndexPlugin for BloomFilterIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } fn provides_exact_answer(&self) -> bool { false @@ -1091,6 +1094,10 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin { BLOOMFILTER_INDEX_VERSION } + fn name(&self) -> &str { + "BloomFilter" + } + fn new_query_parser( &self, index_name: String, diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 85c42e9b048..3d13b2b7400 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -23,7 +23,9 @@ use crate::{ scalar::{ CreatedIndex, UpdateCriteria, expression::{SargableQueryParser, ScalarQueryParser}, - registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME}, + registry::{ + BasicTrainer, ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME, + }, }, }; use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria}; @@ -3202,11 +3204,7 @@ impl TrainingRequest for BTreeTrainingRequest { pub struct BTreeIndexPlugin; #[async_trait] -impl ScalarIndexPlugin for BTreeIndexPlugin { - fn name(&self) -> &str { - "BTree" - } - +impl BasicTrainer for BTreeIndexPlugin { fn new_training_request( &self, params: &str, @@ -3222,26 +3220,6 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { Ok(Box::new(BTreeTrainingRequest::new(params))) } - fn provides_exact_answer(&self) -> bool { - true - } - - fn version(&self) -> u32 { - BTREE_INDEX_VERSION - } - - fn new_query_parser( - &self, - index_name: String, - _index_details: &prost_types::Any, - ) -> Option> { - Some(Box::new(SargableQueryParser::new( - index_name, - self.name().to_string(), - false, - ))) - } - async fn train_index( &self, data: SendableRecordBatchStream, @@ -3286,6 +3264,37 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { files, }) } +} + +#[async_trait] +impl ScalarIndexPlugin for BTreeIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + 
Some(self) + } + + fn name(&self) -> &str { + "BTree" + } + + fn provides_exact_answer(&self) -> bool { + true + } + + fn version(&self) -> u32 { + BTREE_INDEX_VERSION + } + + fn new_query_parser( + &self, + index_name: String, + _index_details: &prost_types::Any, + ) -> Option> { + Some(Box::new(SargableQueryParser::new( + index_name, + self.name().to_string(), + false, + ))) + } async fn load_index( &self, diff --git a/rust/lance-index/src/scalar/fmindex.rs b/rust/lance-index/src/scalar/fmindex.rs index cdf19f0304c..3a4c3b1629a 100644 --- a/rust/lance-index/src/scalar/fmindex.rs +++ b/rust/lance-index/src/scalar/fmindex.rs @@ -39,8 +39,8 @@ use crate::metrics::MetricsCollector; use crate::pb; use crate::scalar::expression::{ScalarQueryParser, TextQueryParser}; use crate::scalar::registry::{ - DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, - VALUE_COLUMN_NAME, + BasicTrainer, DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, + TrainingRequest, VALUE_COLUMN_NAME, }; use crate::scalar::{ AnyQuery, BuiltinIndexType, CreatedIndex, IndexFile, IndexStore, OldIndexDataFilter, @@ -1676,10 +1676,7 @@ async fn write_empty_fmindex_partition(store: &dyn IndexStore) -> Result &str { - "Fm" - } +impl BasicTrainer for FMIndexPlugin { fn new_training_request( &self, _params: &str, @@ -1713,6 +1710,17 @@ impl ScalarIndexPlugin for FMIndexPlugin { files, }) } +} + +#[async_trait] +impl ScalarIndexPlugin for FMIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "Fm" + } fn provides_exact_answer(&self) -> bool { true } @@ -1766,6 +1774,7 @@ mod tests { use std::sync::Arc; use crate::scalar::lance_format::LanceIndexStore; + use crate::scalar::registry::BasicTrainer; #[test] fn test_fmindex_build_and_search() { diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index d0bb0e40d3a..deedf20b97c 100644 
--- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -119,7 +119,9 @@ use crate::{ scalar::{ CreatedIndex, ScalarIndex, expression::{FtsQueryParser, ScalarQueryParser}, - registry::{ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest}, + registry::{ + BasicTrainer, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, + }, }, }; @@ -193,11 +195,7 @@ impl TrainingRequest for InvertedIndexTrainingRequest { } #[async_trait] -impl ScalarIndexPlugin for InvertedIndexPlugin { - fn name(&self) -> &str { - "Inverted" - } - +impl BasicTrainer for InvertedIndexPlugin { fn new_training_request( &self, params: &str, @@ -219,33 +217,6 @@ impl ScalarIndexPlugin for InvertedIndexPlugin { Ok(Box::new(InvertedIndexTrainingRequest::new(params))) } - fn provides_exact_answer(&self) -> bool { - false - } - - fn version(&self) -> u32 { - max_supported_fts_format_version().index_version() - } - - fn new_query_parser( - &self, - index_name: String, - _index_details: &prost_types::Any, - ) -> Option> { - let Ok(index_details) = _index_details.to_msg::() else { - return None; - }; - - if Self::can_accelerate_queries(&index_details) { - Some(Box::new(FtsQueryParser::new( - index_name, - self.name().to_string(), - ))) - } else { - None - } - } - /// Train a new index /// /// The provided data must fulfill all the criteria returned by `training_criteria`. 
@@ -280,6 +251,44 @@ impl ScalarIndexPlugin for InvertedIndexPlugin { ) .await } +} + +#[async_trait] +impl ScalarIndexPlugin for InvertedIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "Inverted" + } + + fn provides_exact_answer(&self) -> bool { + false + } + + fn version(&self) -> u32 { + max_supported_fts_format_version().index_version() + } + + fn new_query_parser( + &self, + index_name: String, + _index_details: &prost_types::Any, + ) -> Option> { + let Ok(index_details) = _index_details.to_msg::() else { + return None; + }; + + if Self::can_accelerate_queries(&index_details) { + Some(Box::new(FtsQueryParser::new( + index_name, + self.name().to_string(), + ))) + } else { + None + } + } /// Load an index from storage /// diff --git a/rust/lance-index/src/scalar/json.rs b/rust/lance-index/src/scalar/json.rs index 7adf055db61..662cd3a37d9 100644 --- a/rust/lance-index/src/scalar/json.rs +++ b/rust/lance-index/src/scalar/json.rs @@ -39,7 +39,9 @@ use crate::{ scalar::{ AnyQuery, CreatedIndex, IndexStore, ScalarIndex, SearchResult, UpdateCriteria, expression::{IndexedExpression, ScalarIndexExpr, ScalarIndexSearch, ScalarQueryParser}, - registry::{ScalarIndexPlugin, TrainingCriteria, TrainingRequest, VALUE_COLUMN_NAME}, + registry::{ + BasicTrainer, ScalarIndexPlugin, TrainingCriteria, TrainingRequest, VALUE_COLUMN_NAME, + }, }, }; @@ -670,11 +672,7 @@ impl JsonIndexPlugin { } #[async_trait] -impl ScalarIndexPlugin for JsonIndexPlugin { - fn name(&self) -> &str { - "Json" - } - +impl BasicTrainer for JsonIndexPlugin { fn new_training_request( &self, params: &str, @@ -692,7 +690,12 @@ impl ScalarIndexPlugin for JsonIndexPlugin { let params = serde_json::from_str::(params)?; let registry = self.registry()?; let target_plugin = registry.get_plugin_by_name(¶ms.target_index_type)?; - let target_request = target_plugin.new_training_request( + let target_trainer = 
target_plugin.basic_trainer().ok_or_else(|| { + Error::invalid_input_source( + format!("The '{}' index type does not support basic training, please refer to the index's documentation for more details on how to create this index.", params.target_index_type).into(), + ) + })?; + let target_request = target_trainer.new_training_request( params.target_index_parameters.as_deref().unwrap_or("{}"), &Field::new("", target_type, true), )?; @@ -700,39 +703,6 @@ impl ScalarIndexPlugin for JsonIndexPlugin { Ok(Box::new(JsonTrainingRequest::new(params, target_request))) } - fn provides_exact_answer(&self) -> bool { - // TODO: Need to lookup target plugin via details to figure this out correctly - true - } - - fn attach_registry(&self, registry: Arc) { - let mut reg_ref = self.registry.lock().unwrap(); - *reg_ref = Some(registry); - } - - fn version(&self) -> u32 { - JSON_INDEX_VERSION - } - - fn new_query_parser( - &self, - index_name: String, - index_details: &prost_types::Any, - ) -> Option> { - // TODO: Allow return Result here - let registry = self.registry().unwrap(); - let json_details = - crate::pb::JsonIndexDetails::decode(index_details.value.as_slice()).unwrap(); - let target_details = json_details.target_details.as_ref().expect_ok().unwrap(); - let target_plugin = registry.get_plugin_by_details(target_details).unwrap(); - // TODO: Use something like ${index_name}_${path} for the index name? 
Don't have access to path here tho - let target_parser = target_plugin.new_query_parser(index_name, index_details)?; - Some(Box::new(JsonQueryParser::new( - json_details.path.clone(), - target_parser, - )) as Box) - } - async fn train_index( &self, data: SendableRecordBatchStream, @@ -759,7 +729,12 @@ impl ScalarIndexPlugin for JsonIndexPlugin { let target_plugin = registry.get_plugin_by_name(&request.parameters.target_index_type)?; // Create a new training request with the inferred type - let target_request = target_plugin.new_training_request( + let target_trainer = target_plugin.basic_trainer().ok_or_else(|| { + Error::invalid_input_source( + format!("The '{}' index type does not support basic training, please refer to the index's documentation for more details on how to create this index.", request.parameters.target_index_type).into(), + ) + })?; + let target_request = target_trainer.new_training_request( request .parameters .target_index_parameters @@ -768,7 +743,7 @@ impl ScalarIndexPlugin for JsonIndexPlugin { &Field::new("", inferred_type, true), )?; - let target_index = target_plugin + let target_index = target_trainer .train_index( converted_stream, index_store, @@ -788,6 +763,50 @@ impl ScalarIndexPlugin for JsonIndexPlugin { files: target_index.files, }) } +} + +#[async_trait] +impl ScalarIndexPlugin for JsonIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "Json" + } + + fn provides_exact_answer(&self) -> bool { + // TODO: Need to lookup target plugin via details to figure this out correctly + true + } + + fn attach_registry(&self, registry: Arc) { + let mut reg_ref = self.registry.lock().unwrap(); + *reg_ref = Some(registry); + } + + fn version(&self) -> u32 { + JSON_INDEX_VERSION + } + + fn new_query_parser( + &self, + index_name: String, + index_details: &prost_types::Any, + ) -> Option> { + // TODO: Allow return Result here + let registry = self.registry().unwrap(); + let 
json_details = + crate::pb::JsonIndexDetails::decode(index_details.value.as_slice()).unwrap(); + let target_details = json_details.target_details.as_ref().expect_ok().unwrap(); + let target_plugin = registry.get_plugin_by_details(target_details).unwrap(); + // TODO: Use something like ${index_name}_${path} for the index name? Don't have access to path here tho + let target_parser = target_plugin.new_query_parser(index_name, index_details)?; + Some(Box::new(JsonQueryParser::new( + json_details.path.clone(), + target_parser, + )) as Box) + } async fn load_index( &self, diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index 8e07a607bff..892df027f3e 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -36,8 +36,8 @@ use crate::pbold; use crate::scalar::bitmap::{BitmapIndexPlugin, BitmapIndexState}; use crate::scalar::expression::{LabelListQueryParser, ScalarQueryParser}; use crate::scalar::registry::{ - DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, - VALUE_COLUMN_NAME, + BasicTrainer, DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, + TrainingRequest, VALUE_COLUMN_NAME, }; use crate::scalar::{CreatedIndex, UpdateCriteria}; use crate::{Index, IndexType}; @@ -586,11 +586,7 @@ impl CacheKey for LabelListIndexStateKey { pub struct LabelListIndexPlugin; #[async_trait] -impl ScalarIndexPlugin for LabelListIndexPlugin { - fn name(&self) -> &str { - "LabelList" - } - +impl BasicTrainer for LabelListIndexPlugin { fn new_training_request( &self, _params: &str, @@ -612,25 +608,6 @@ impl ScalarIndexPlugin for LabelListIndexPlugin { ))) } - fn provides_exact_answer(&self) -> bool { - true - } - - fn version(&self) -> u32 { - LABEL_LIST_INDEX_VERSION - } - - fn new_query_parser( - &self, - index_name: String, - _index_details: &prost_types::Any, - ) -> Option> { - 
Some(Box::new(LabelListQueryParser::new( - index_name, - self.name().to_string(), - ))) - } - /// Train a new index /// /// The provided data must fulfill all the criteria returned by `training_criteria` @@ -687,6 +664,36 @@ impl ScalarIndexPlugin for LabelListIndexPlugin { files: vec![file], }) } +} + +#[async_trait] +impl ScalarIndexPlugin for LabelListIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "LabelList" + } + + fn provides_exact_answer(&self) -> bool { + true + } + + fn version(&self) -> u32 { + LABEL_LIST_INDEX_VERSION + } + + fn new_query_parser( + &self, + index_name: String, + _index_details: &prost_types::Any, + ) -> Option> { + Some(Box::new(LabelListQueryParser::new( + index_name, + self.name().to_string(), + ))) + } /// Load an index from storage async fn load_index( diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 2f82deb8403..0d6ba848f9c 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -549,7 +549,7 @@ mod tests { use crate::scalar::bitmap::BitmapIndexPlugin; use crate::scalar::btree::{BTreeIndexPlugin, BTreeParameters}; use crate::scalar::label_list::LabelListIndexPlugin; - use crate::scalar::registry::{ScalarIndexPlugin, VALUE_COLUMN_NAME}; + use crate::scalar::registry::{BasicTrainer, ScalarIndexPlugin, VALUE_COLUMN_NAME}; use crate::scalar::{ LabelListQuery, SargableQuery, ScalarIndex, SearchResult, bitmap::BitmapIndex, diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index b452ef78c85..153e9c95339 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -20,8 +20,8 @@ use crate::metrics::NoOpMetricsCollector; use crate::pbold; use crate::scalar::expression::{ScalarQueryParser, TextQueryParser}; use crate::scalar::registry::{ - DefaultTrainingRequest, ScalarIndexPlugin, 
TrainingCriteria, TrainingOrdering, TrainingRequest, - VALUE_COLUMN_NAME, + BasicTrainer, DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, + TrainingRequest, VALUE_COLUMN_NAME, }; use crate::scalar::{CreatedIndex, UpdateCriteria}; use crate::vector::VectorIndex; @@ -1286,11 +1286,7 @@ impl NGramIndexPlugin { } #[async_trait] -impl ScalarIndexPlugin for NGramIndexPlugin { - fn name(&self) -> &str { - "NGram" - } - +impl BasicTrainer for NGramIndexPlugin { fn new_training_request( &self, _params: &str, @@ -1308,29 +1304,6 @@ impl ScalarIndexPlugin for NGramIndexPlugin { ))) } - fn provides_exact_answer(&self) -> bool { - false - } - - fn version(&self) -> u32 { - NGRAM_INDEX_VERSION - } - - fn new_query_parser( - &self, - index_name: String, - _index_details: &prost_types::Any, - ) -> Option> { - Some(Box::new(TextQueryParser::new( - index_name, - self.name().to_string(), - // needs_recheck: ngram results are an inexact candidate superset. - true, - // supports_regex: the ngram index can answer regex queries. - true, - ))) - } - async fn train_index( &self, data: SendableRecordBatchStream, @@ -1353,6 +1326,40 @@ impl ScalarIndexPlugin for NGramIndexPlugin { files: vec![file], }) } +} + +#[async_trait] +impl ScalarIndexPlugin for NGramIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "NGram" + } + + fn provides_exact_answer(&self) -> bool { + false + } + + fn version(&self) -> u32 { + NGRAM_INDEX_VERSION + } + + fn new_query_parser( + &self, + index_name: String, + _index_details: &prost_types::Any, + ) -> Option> { + Some(Box::new(TextQueryParser::new( + index_name, + self.name().to_string(), + // needs_recheck: ngram results are an inexact candidate superset. + true, + // supports_regex: the ngram index can answer regex queries. 
+ true, + ))) + } async fn load_index( &self, diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index 0add98d8ab3..869fe5174c3 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -58,15 +58,12 @@ impl TrainingCriteria { } } -/// A trait that describes what criteria is needed to train an index +/// A trait object for plugin-specific training parameters and data requirements. /// -/// The training process has two steps. First, the parameters are given to the -/// plugin and it creates a TrainingRequest. Then, the caller prepares the training -/// data and calls train_index. -/// -/// The call to train_index will include the training request. This allows the plugin -/// to stash any deserialized parameter info in the request and fetch it later during -/// training by downcasting to the appropriate type. +/// Returned by [`BasicTrainer::new_training_request`]. The caller uses +/// [`criteria`](Self::criteria) to prepare the training data stream, then passes +/// the request back to [`BasicTrainer::train_index`], which may downcast +/// it to the plugin-specific concrete type to recover parsed parameters. pub trait TrainingRequest: std::any::Any + Send + Sync { fn as_any(&self) -> &dyn std::any::Any; fn criteria(&self) -> &TrainingCriteria; @@ -93,26 +90,35 @@ impl TrainingRequest for DefaultTrainingRequest { } } -/// A trait for scalar index plugins +/// Implemented by indexes that can train on a stream of column data. +/// +/// The training process has two stages. In the first stage, the caller provides +/// index parameters and receives a [`TrainingRequest`] that describes what criteria +/// the training data must satisfy (e.g. sort order, row-ID availability). In the +/// second stage, the caller prepares the data accordingly and calls +/// [`train_index`](Self::train_index). 
+/// +/// Any scalar index plugin that builds from a column data stream should implement +/// this trait. #[async_trait] -pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { - /// Creates a new training request from the given parameters +pub trait BasicTrainer: Send + Sync { + /// Creates a new training request from the given parameters. /// - /// This training request specifies the criteria that the data must satisfy to train the index. - /// For example, does the index require the input data to be sorted? + /// The returned request specifies the criteria the training data must satisfy. + /// It is the caller's responsibility to prepare data that meets those criteria + /// before calling [`train_index`](Self::train_index). fn new_training_request(&self, params: &str, field: &Field) -> Result>; - /// Train a new index - /// - /// The provided data must fulfill all the criteria returned by `training_criteria`. - /// It is the caller's responsibility to ensure this. + /// Train a new index from a prepared data stream. /// - /// Returns index details that describe the index. These details can potentially be - /// useful for planning (although this will currently require inside information on - /// the index type) and they will need to be provided when loading the index. + /// The provided data must fulfill all the criteria returned by + /// [`new_training_request`](Self::new_training_request). It is the caller's + /// responsibility to ensure this. /// - /// It is the caller's responsibility to store these details somewhere. + /// Returns index details describing the index. These details may be useful for + /// planning and must be provided when loading the index. It is the caller's + /// responsibility to store them. 
async fn train_index( &self, data: SendableRecordBatchStream, @@ -121,6 +127,36 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { fragment_ids: Option>, progress: Arc, ) -> Result; +} + +/// A trait for scalar index plugins +#[async_trait] +pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { + /// Returns this plugin's [`BasicTrainer`] implementation, if any. + /// + /// Training an index can be a complex process. For example, a btree index might + /// be trained using a shuffler from a distributed OLAP system such as + /// Spark or Ray. A vector index can be trained by sampling the column to create + /// a kmeans model and then streaming the vectors to assign partitions. Encapsulating + /// the entire set of possible approaches is beyond what this trait can model. + /// This is especially true because this is a low-level crate with no concept of a table + /// or a dataset. + /// + /// However, in many cases, an index can be trained on a (potentially sorted) stream + /// of column data. There is also significant utility in being able to provide users + /// with a simple generic "create an index" API. + /// + /// This method is a compromise. Indexes that support training on a stream of column + /// data should override this to return `Some(self)`. Indexes that need their own + /// individualized training approaches should return `None` and provide their own + /// methods for training. + /// + /// An index can take both approaches, providing a simple (but maybe less + /// efficient) stream-based trainer while also providing more specialized index + /// creation methods elsewhere.
+ fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + None + } /// A short name for the index /// diff --git a/rust/lance-index/src/scalar/rtree.rs b/rust/lance-index/src/scalar/rtree.rs index 5d5ac2a3a92..3e49b92fe50 100644 --- a/rust/lance-index/src/scalar/rtree.rs +++ b/rust/lance-index/src/scalar/rtree.rs @@ -6,7 +6,7 @@ use crate::metrics::{MetricsCollector, NoOpMetricsCollector}; use crate::scalar::expression::{GeoQueryParser, ScalarQueryParser}; use crate::scalar::lance_format::LanceIndexStore; use crate::scalar::registry::{ - ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, + BasicTrainer, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, }; use crate::scalar::rtree::sort::Sorter; use crate::scalar::{ @@ -915,11 +915,7 @@ impl RTreeIndexPlugin { } #[async_trait] -impl ScalarIndexPlugin for RTreeIndexPlugin { - fn name(&self) -> &str { - "RTree" - } - +impl BasicTrainer for RTreeIndexPlugin { fn new_training_request( &self, params: &str, @@ -973,6 +969,17 @@ impl ScalarIndexPlugin for RTreeIndexPlugin { files, }) } +} + +#[async_trait] +impl ScalarIndexPlugin for RTreeIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "RTree" + } fn provides_exact_answer(&self) -> bool { false diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 8e7e20c211a..62a6b912e34 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -16,7 +16,7 @@ use crate::Any; use crate::pbold; use crate::scalar::expression::{SargableQueryParser, ScalarQueryParser}; use crate::scalar::registry::{ - ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, + BasicTrainer, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, }; use crate::scalar::{ BuiltinIndexType, CreatedIndex, IndexFile, SargableQuery, ScalarIndexParams, UpdateCriteria, @@ -968,11 +968,7 @@ 
impl TrainingRequest for ZoneMapIndexTrainingRequest { } #[async_trait] -impl ScalarIndexPlugin for ZoneMapIndexPlugin { - fn name(&self) -> &str { - "ZoneMap" - } - +impl BasicTrainer for ZoneMapIndexPlugin { fn new_training_request( &self, params: &str, @@ -989,26 +985,6 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { Ok(Box::new(ZoneMapIndexTrainingRequest::new(params))) } - fn provides_exact_answer(&self) -> bool { - false - } - - fn version(&self) -> u32 { - ZONEMAP_INDEX_VERSION - } - - fn new_query_parser( - &self, - index_name: String, - _index_details: &prost_types::Any, - ) -> Option> { - Some(Box::new(SargableQueryParser::new( - index_name, - self.name().to_string(), - true, - ))) - } - async fn train_index( &self, data: SendableRecordBatchStream, @@ -1032,6 +1008,37 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { files: vec![file], }) } +} + +#[async_trait] +impl ScalarIndexPlugin for ZoneMapIndexPlugin { + fn basic_trainer(&self) -> Option<&dyn BasicTrainer> { + Some(self) + } + + fn name(&self) -> &str { + "ZoneMap" + } + + fn provides_exact_answer(&self) -> bool { + false + } + + fn version(&self) -> u32 { + ZONEMAP_INDEX_VERSION + } + + fn new_query_parser( + &self, + index_name: String, + _index_details: &prost_types::Any, + ) -> Option> { + Some(Box::new(SargableQueryParser::new( + index_name, + self.name().to_string(), + true, + ))) + } async fn load_index( &self, diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index aae924378da..59b04ae30f6 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -35,7 +35,7 @@ use lance_core::{Error, ROW_ID, Result}; use lance_index::progress::noop_progress; use lance_index::registry::IndexPluginRegistry; use lance_index::scalar::lance_format::LanceIndexStore; -use lance_index::scalar::registry::VALUE_COLUMN_NAME; +use lance_index::scalar::registry::{BasicTrainer, VALUE_COLUMN_NAME}; use 
lance_index::scalar::{BuiltinIndexType, CreatedIndex, ScalarIndexParams}; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; use lance_io::stream::RecordBatchStream as LanceRecordBatchStream; @@ -1177,10 +1177,21 @@ impl ManifestNamespace { index_uuid: Uuid, ) -> Result { let index_store = LanceIndexStore::from_dataset_for_new(dataset, &index_uuid)?; - let plugin = registry.get_plugin_by_name(&input.params.index_type)?; - let training_request = plugin + let trainer = registry + .get_plugin_by_name(&input.params.index_type)? + .basic_trainer() + .ok_or_else(|| { + lance_core::Error::invalid_input_source( + format!( + "The '{}' index type does not support basic training, please refer to the index's documentation for more details on how to create this index.", + input.params.index_type + ) + .into(), + ) + })?; + let training_request = trainer .new_training_request(input.params.params.as_deref().unwrap_or("{}"), &input.field)?; - let created_index = plugin + let created_index = trainer .train_index( input.stream, &index_store, diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index ae2478589fb..9490554d0bd 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -296,8 +296,13 @@ pub(super) async fn build_scalar_index( let index_store = LanceIndexStore::from_dataset_for_new(dataset, &uuid)?; let plugin = SCALAR_INDEX_PLUGIN_REGISTRY.get_plugin_by_name(¶ms.index_type)?; + let trainer = plugin.basic_trainer().ok_or_else(|| { + Error::invalid_input_source( + format!("The '{}' index type does not support basic training, please refer to the index's documentation for more details on how to create this index.", params.index_type).into(), + ) + })?; let training_request = - plugin.new_training_request(params.params.as_deref().unwrap_or("{}"), &field)?; + trainer.new_training_request(params.params.as_deref().unwrap_or("{}"), &field)?; progress.stage_start("load_data", None, "rows").await?; let training_data = 
match preprocessed_data { @@ -316,7 +321,7 @@ pub(super) async fn build_scalar_index( }; progress.stage_complete("load_data").await?; - let created_index = plugin + let created_index = trainer .train_index( training_data, &index_store, @@ -353,8 +358,13 @@ pub(super) async fn build_bitmap_index_segment( let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap); let plugin = SCALAR_INDEX_PLUGIN_REGISTRY.get_plugin_by_name(¶ms.index_type)?; + let trainer = plugin.basic_trainer().ok_or_else(|| { + Error::invalid_input_source( + format!("The '{}' index type does not support basic training, please refer to the index's documentation for more details on how to create this index.", params.index_type).into(), + ) + })?; let training_request = - plugin.new_training_request(params.params.as_deref().unwrap_or("{}"), &field)?; + trainer.new_training_request(params.params.as_deref().unwrap_or("{}"), &field)?; let criteria = training_request.criteria(); progress.stage_start("load_data", None, "rows").await?; @@ -363,7 +373,7 @@ pub(super) async fn build_bitmap_index_segment( progress.stage_complete("load_data").await?; let index_store = LanceIndexStore::from_dataset_for_new(dataset, &uuid)?; - plugin + trainer .train_index( training_data, &index_store,