From 55c248212d7bb115ba8705faef5e904e64fc15cd Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 22 Jun 2026 14:04:31 +0000 Subject: [PATCH] refactor(index): introduce RowIdRemapper trait to decouple ScalarIndexPlugin from FragReuseIndex Replaces the concrete `FragReuseIndex` parameter in `ScalarIndexPlugin::load_index` and `get_from_cache` with an abstract `RowIdRemapper` trait, removing the direct dependency of scalar index infrastructure on the table-level fragment-reuse type. `FragReuseIndex` implements the new trait and the call site in lance casts before passing through. Co-Authored-By: Claude Sonnet 4.6 --- rust/lance-index/src/frag_reuse.rs | 27 ++++++++++++++++++- rust/lance-index/src/scalar.rs | 23 ++++++++++++++-- rust/lance-index/src/scalar/bitmap.rs | 15 +++++------ rust/lance-index/src/scalar/bloomfilter.rs | 7 +++-- rust/lance-index/src/scalar/btree.rs | 21 +++++++-------- rust/lance-index/src/scalar/fmindex.rs | 7 +++-- rust/lance-index/src/scalar/inverted.rs | 5 ++-- rust/lance-index/src/scalar/inverted/index.rs | 14 +++++----- .../src/scalar/inverted/lazy_docset.rs | 6 ++--- rust/lance-index/src/scalar/json.rs | 5 ++-- rust/lance-index/src/scalar/label_list.rs | 13 +++++---- rust/lance-index/src/scalar/ngram.rs | 13 +++++---- rust/lance-index/src/scalar/registry.rs | 6 ++--- rust/lance-index/src/scalar/rtree.rs | 9 +++---- rust/lance-index/src/scalar/zonemap.rs | 10 +++---- rust/lance/src/index/scalar.rs | 7 +++-- 16 files changed, 113 insertions(+), 75 deletions(-) diff --git a/rust/lance-index/src/frag_reuse.rs b/rust/lance-index/src/frag_reuse.rs index d09d8dc0684..50f8e787a6f 100644 --- a/rust/lance-index/src/frag_reuse.rs +++ b/rust/lance-index/src/frag_reuse.rs @@ -10,15 +10,40 @@ use std::any::Any; use std::sync::Arc; +use arrow_array::RecordBatch; use async_trait::async_trait; use lance_core::{Error, Result}; -use roaring::RoaringBitmap; +use lance_select::RowAddrTreeMap; +use roaring::{RoaringBitmap, RoaringTreemap}; use serde::Serialize; pub use lance_table::system_index::frag_reuse::*; +use crate::scalar::RowIdRemapper; use crate::{Index, IndexType}; +impl RowIdRemapper for FragReuseIndex { + fn remap_row_id(&self, row_id: u64) -> Option { + self.remap_row_id(row_id) + } + + fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap { + self.remap_row_addrs_tree_map(row_addrs) + } + + fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap { + self.remap_row_ids_roaring_tree_map(row_ids) + } + + fn remap_row_ids_record_batch( + &self, + batch: RecordBatch, + row_id_idx: usize, + ) -> Result { + self.remap_row_ids_record_batch(batch, row_id_idx) + } +} + #[derive(Serialize)] struct FragReuseStatistics { num_versions: usize, diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index a287d277a81..5fb549bfc16 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -24,7 +24,7 @@ use lance_core::deepsize::DeepSizeOf; use lance_core::{Error, Result}; use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter}; use lance_select::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps}; -use roaring::RoaringBitmap; +use roaring::{RoaringBitmap, RoaringTreemap}; use serde::Serialize; use crate::metrics::MetricsCollector; @@ -48,7 +48,6 @@ pub mod rtree; pub mod zoned; pub mod zonemap; -use crate::frag_reuse::FragReuseIndex; pub use inverted::tokenizer::InvertedIndexParams; use lance_datafusion::udf::CONTAINS_TOKENS_UDF; @@ -1076,3 +1075,23 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { /// with the same configuration on another dataset. fn derive_index_params(&self) -> Result; } + +/// Abstraction over any type that can remap row IDs during index loading. +/// +/// This decouples scalar index plugins from the table-level [`crate::frag_reuse::FragReuseIndex`] +/// type. [`crate::frag_reuse::FragReuseIndex`] implements this trait, but callers may also +/// supply custom implementations for testing or other remapping strategies. +pub trait RowIdRemapper: Send + Sync + std::fmt::Debug { + /// Remap a single row id. Returns `None` if the row was deleted. + fn remap_row_id(&self, row_id: u64) -> Option; + /// Remap all addresses in a [`RowAddrTreeMap`], dropping deleted rows. + fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap; + /// Remap all row ids in a [`RoaringTreemap`], dropping deleted rows. + fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap; + /// Remap the row-id column at `row_id_idx` inside `batch`, dropping deleted rows. + fn remap_row_ids_record_batch( + &self, + batch: RecordBatch, + row_id_idx: usize, + ) -> Result; +} diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index c2a6e80e82b..7a363feb711 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -42,10 +42,9 @@ use super::{ use crate::pbold; use crate::{Index, IndexType, metrics::MetricsCollector}; use crate::{ - frag_reuse::FragReuseIndex, progress::IndexBuildProgress, scalar::{ - CreatedIndex, UpdateCriteria, + CreatedIndex, RowIdRemapper, UpdateCriteria, expression::SargableQueryParser, registry::{ ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, @@ -125,7 +124,7 @@ pub struct BitmapIndex { index_cache: WeakLanceCache, - frag_reuse_index: Option>, + frag_reuse_index: Option>, lazy_reader: LazyIndexReader, } @@ -200,7 +199,7 @@ impl BitmapIndexState { &self, store: Arc, index_cache: &LanceCache, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result> { Ok(Arc::new(BitmapIndex::new( self.index_map.clone(), @@ -335,7 +334,7 @@ impl BitmapIndex { value_type: DataType, store: Arc, index_cache: WeakLanceCache, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Self { let lazy_reader = LazyIndexReader::new(store.clone()); Self { @@ -351,7 +350,7 @@ impl BitmapIndex { pub(crate) async fn load( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result> { let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?; @@ -1766,7 +1765,7 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc) @@ -1775,7 +1774,7 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { async fn get_from_cache( &self, index_store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result>> { let Some(state) = cache.get_with_key(&BitmapIndexStateKey).await else { diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 856f08af772..8d4f1c3f601 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -27,8 +27,7 @@ use std::sync::LazyLock; use datafusion::execution::SendableRecordBatchStream; use std::{collections::HashMap, sync::Arc}; -use crate::scalar::FragReuseIndex; -use crate::scalar::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult}; +use crate::scalar::{AnyQuery, IndexStore, MetricsCollector, RowIdRemapper, ScalarIndex, SearchResult}; use crate::vector::VectorIndex; use crate::{Index, IndexType}; use arrow_array::{ArrayRef, RecordBatch}; @@ -90,7 +89,7 @@ impl DeepSizeOf for BloomFilterIndex { impl BloomFilterIndex { async fn load( store: Arc, - _fri: Option>, + _fri: Option>, _index_cache: &LanceCache, ) -> Result> { let index_file = store.open_index_file(BLOOMFILTER_FILENAME).await?; @@ -1107,7 +1106,7 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok( diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 85c42e9b048..10c66db0e76 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -18,10 +18,9 @@ use super::{ use crate::cache_pb::{BTreeIndexHeader, RangeToFile}; use crate::{Index, IndexType}; use crate::{ - frag_reuse::FragReuseIndex, progress::{IndexBuildProgress, noop_progress}, scalar::{ - CreatedIndex, UpdateCriteria, + CreatedIndex, RowIdRemapper, UpdateCriteria, expression::{SargableQueryParser, ScalarQueryParser}, registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME}, }, @@ -1393,7 +1392,7 @@ impl BTreeIndexState { &self, store: Arc, index_cache: &LanceCache, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result> { let index = BTreeIndex::try_from_serialized( self.lookup_batch.clone(), @@ -1519,7 +1518,7 @@ pub struct BTreeIndex { /// - The local page_idx is calculated: `142 - 100 = 42`. /// - The system now knows to read page `42` from the file `part_2_page_file.lance`. ranges_to_files: Option>>, - frag_reuse_index: Option>, + frag_reuse_index: Option>, } impl DeepSizeOf for BTreeIndex { @@ -1540,7 +1539,7 @@ impl BTreeIndex { index_cache: WeakLanceCache, batch_size: u64, ranges_to_files: Option>>, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Self { Self { page_lookup, @@ -1696,7 +1695,7 @@ impl BTreeIndex { index_cache: &LanceCache, batch_size: u64, ranges_to_files: Option>>, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result { let data_type = data.column(0).data_type().clone(); let page_lookup = Arc::new(BTreeLookup::try_new(data)?); @@ -1714,7 +1713,7 @@ impl BTreeIndex { async fn load( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result> { let (page_lookup_file, standalone_partition_page_file) = @@ -1950,7 +1949,7 @@ fn filter_keeps_nothing(filter: &Option) -> bool { fn remap_row_ids( stream: SendableRecordBatchStream, - frag_reuse_index: Arc, + frag_reuse_index: Arc, ) -> SendableRecordBatchStream { let schema = stream.schema(); let remapped = stream.map(move |batch_result| { @@ -3291,7 +3290,7 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok(BTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc) @@ -3300,7 +3299,7 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { async fn get_from_cache( &self, index_store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result>> { let Some(state) = cache.get_with_key(&BTreeIndexStateKey).await else { @@ -6292,7 +6291,7 @@ mod tests { // Remap row 0 -> row 5000 (outside the original [0, 1000) range so no collision). // Querying for value == 0 should now return row 5000, confirming reconstruct threaded // the FragReuseIndex through to the rebuilt BTreeIndex. - let frag_reuse_index = Arc::new(FragReuseIndex::new( + let frag_reuse_index: Arc = Arc::new(FragReuseIndex::new( Uuid::new_v4(), vec![HashMap::from([(0u64, Some(5000u64))])], FragReuseIndexDetails { versions: vec![] }, diff --git a/rust/lance-index/src/scalar/fmindex.rs b/rust/lance-index/src/scalar/fmindex.rs index cdf19f0304c..f16c2236d2a 100644 --- a/rust/lance-index/src/scalar/fmindex.rs +++ b/rust/lance-index/src/scalar/fmindex.rs @@ -34,7 +34,6 @@ use lance_core::deepsize::DeepSizeOf; use lance_core::{Error, ROW_ADDR, Result}; use roaring::RoaringBitmap; -use crate::frag_reuse::FragReuseIndex; use crate::metrics::MetricsCollector; use crate::pb; use crate::scalar::expression::{ScalarQueryParser, TextQueryParser}; @@ -44,7 +43,7 @@ use crate::scalar::registry::{ }; use crate::scalar::{ AnyQuery, BuiltinIndexType, CreatedIndex, IndexFile, IndexStore, OldIndexDataFilter, - ScalarIndex, ScalarIndexParams, SearchResult, TextQuery, UpdateCriteria, + RowIdRemapper, ScalarIndex, ScalarIndexParams, SearchResult, TextQuery, UpdateCriteria, }; use crate::vector::VectorIndex; use crate::{Index, IndexType}; @@ -1258,7 +1257,7 @@ impl FMIndexScalarIndex { async fn load( store: Arc, - _fri: Option>, + _fri: Option>, _cache: &LanceCache, ) -> Result> { let files = store.list_files_with_sizes().await?; @@ -1737,7 +1736,7 @@ impl ScalarIndexPlugin for FMIndexPlugin { &self, store: Arc, details: &prost_types::Any, - fri: Option>, + fri: Option>, cache: &LanceCache, ) -> Result> { let _ = details diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index d0bb0e40d3a..4625f3b14c6 100644 --- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -115,9 +115,8 @@ use lance_core::Error; use crate::pbold; use crate::progress::IndexBuildProgress; use crate::{ - frag_reuse::FragReuseIndex, scalar::{ - CreatedIndex, ScalarIndex, + CreatedIndex, RowIdRemapper, ScalarIndex, expression::{FtsQueryParser, ScalarQueryParser}, registry::{ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest}, }, @@ -289,7 +288,7 @@ impl ScalarIndexPlugin for InvertedIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok( diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 41a18c3bd68..b7a4c7a8fdd 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -69,14 +69,14 @@ use super::{ builder::{InnerBuilder, PositionRecorder}, iter::CompressedPostingListIterator, }; -use crate::frag_reuse::FragReuseIndex; use crate::pbold; use crate::progress::IndexBuildProgress; use crate::scalar::inverted::scorer::MemBM25Scorer; use crate::scalar::inverted::tokenizer::document_tokenizer::LanceTokenizer; use crate::scalar::{ AnyQuery, BuiltinIndexType, CreatedIndex, IndexReader, IndexStore, MetricsCollector, - OldIndexDataFilter, ScalarIndex, ScalarIndexParams, SearchResult, TokenQuery, UpdateCriteria, + OldIndexDataFilter, RowIdRemapper, ScalarIndex, ScalarIndexParams, SearchResult, TokenQuery, + UpdateCriteria, }; use crate::{FtsPrewarmOptions, Index}; use crate::{prefilter::PreFilter, scalar::inverted::iter::take_fst_keys}; @@ -821,7 +821,7 @@ impl InvertedIndex { async fn load_legacy_index( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result> { log::warn!("loading legacy FTS index"); @@ -888,7 +888,7 @@ impl InvertedIndex { pub async fn load( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result> where @@ -1257,7 +1257,7 @@ impl InvertedPartition { pub async fn load( store: Arc, id: u64, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, token_set_format: TokenSetFormat, ) -> Result { @@ -4698,7 +4698,7 @@ impl DocSet { pub async fn load( reader: Arc, is_legacy: bool, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result { let batch = reader.read_range(0..reader.num_rows(), None).await?; let row_id_col = batch[ROW_ID].as_primitive::(); @@ -4730,7 +4730,7 @@ impl DocSet { row_id_col: &UInt64Array, num_tokens_col: &arrow_array::UInt32Array, is_legacy: bool, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result { // for legacy format, the row id is doc id; sorting keeps binary search viable if is_legacy { diff --git a/rust/lance-index/src/scalar/inverted/lazy_docset.rs b/rust/lance-index/src/scalar/inverted/lazy_docset.rs index 7a0ee41efd8..b711d7f1a98 100644 --- a/rust/lance-index/src/scalar/inverted/lazy_docset.rs +++ b/rust/lance-index/src/scalar/inverted/lazy_docset.rs @@ -24,8 +24,8 @@ use lance_core::ROW_ID; use lance_core::Result; use tokio::sync::OnceCell; -use crate::frag_reuse::FragReuseIndex; use crate::scalar::inverted::index::{DocSet, NUM_TOKEN_COL}; +use crate::scalar::RowIdRemapper; use crate::scalar::{IndexReader, IndexStore}; use lance_select::mask::RowAddrMask; @@ -63,7 +63,7 @@ pub struct DeferredDocSet { store: Arc, docs_path: String, is_legacy: bool, - frag_reuse_index: Option>, + frag_reuse_index: Option>, /// Doc count cached at construction so `len()` stays sync + IO-free. num_rows: usize, /// `sum(num_tokens)` cached on first compute. @@ -122,7 +122,7 @@ impl LazyDocSet { docs_path: String, num_rows: usize, is_legacy: bool, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Self { Self::Deferred(Box::new(DeferredDocSet { store, diff --git a/rust/lance-index/src/scalar/json.rs b/rust/lance-index/src/scalar/json.rs index 7adf055db61..9c2cf8d51eb 100644 --- a/rust/lance-index/src/scalar/json.rs +++ b/rust/lance-index/src/scalar/json.rs @@ -33,11 +33,10 @@ use lance_core::{Error, ROW_ID, Result, cache::LanceCache, error::LanceOptionExt use crate::{ Index, IndexType, - frag_reuse::FragReuseIndex, metrics::MetricsCollector, registry::IndexPluginRegistry, scalar::{ - AnyQuery, CreatedIndex, IndexStore, ScalarIndex, SearchResult, UpdateCriteria, + AnyQuery, CreatedIndex, IndexStore, RowIdRemapper, ScalarIndex, SearchResult, UpdateCriteria, expression::{IndexedExpression, ScalarIndexExpr, ScalarIndexSearch, ScalarQueryParser}, registry::{ScalarIndexPlugin, TrainingCriteria, TrainingRequest, VALUE_COLUMN_NAME}, }, @@ -793,7 +792,7 @@ impl ScalarIndexPlugin for JsonIndexPlugin { &self, index_store: Arc, index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { let registry = self.registry().unwrap(); diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index 8e07a607bff..1c77b674dd9 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -31,7 +31,6 @@ use tracing::instrument; use super::{AnyQuery, IndexFile, IndexStore, LabelListQuery, ScalarIndex, bitmap::BitmapIndex}; use super::{BuiltinIndexType, SargableQuery, ScalarIndexParams}; use super::{MetricsCollector, SearchResult}; -use crate::frag_reuse::FragReuseIndex; use crate::pbold; use crate::scalar::bitmap::{BitmapIndexPlugin, BitmapIndexState}; use crate::scalar::expression::{LabelListQueryParser, ScalarQueryParser}; @@ -39,7 +38,7 @@ use crate::scalar::registry::{ DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME, }; -use crate::scalar::{CreatedIndex, UpdateCriteria}; +use crate::scalar::{CreatedIndex, RowIdRemapper, UpdateCriteria}; use crate::{Index, IndexType}; pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance"; @@ -90,7 +89,7 @@ impl LabelListIndex { async fn load( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result> { let values_index = @@ -446,7 +445,7 @@ fn unnest_chunks( async fn read_list_nulls( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result { let reader = store.open_index_file(BITMAP_LOOKUP_NAME).await?; if let Some(buffer_idx_str) = reader.schema().metadata.get(LABEL_LIST_NULLS_METADATA_KEY) { @@ -523,7 +522,7 @@ impl LabelListIndexState { self, store: Arc, index_cache: &LanceCache, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result> { let bitmap = self .bitmap_state @@ -693,7 +692,7 @@ impl ScalarIndexPlugin for LabelListIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok( @@ -705,7 +704,7 @@ impl ScalarIndexPlugin for LabelListIndexPlugin { async fn get_from_cache( &self, index_store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result>> { let Some(state) = cache.get_with_key(&LabelListIndexStateKey).await else { diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index b452ef78c85..82a5ad7d362 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -15,7 +15,6 @@ use super::{ AnyQuery, BuiltinIndexType, IndexFile, IndexReader, IndexStore, IndexWriter, MetricsCollector, ScalarIndex, ScalarIndexParams, SearchResult, TextQuery, }; -use crate::frag_reuse::FragReuseIndex; use crate::metrics::NoOpMetricsCollector; use crate::pbold; use crate::scalar::expression::{ScalarQueryParser, TextQueryParser}; @@ -23,7 +22,7 @@ use crate::scalar::registry::{ DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME, }; -use crate::scalar::{CreatedIndex, UpdateCriteria}; +use crate::scalar::{CreatedIndex, RowIdRemapper, UpdateCriteria}; use crate::vector::VectorIndex; use crate::{Index, IndexType}; use arrow::array::{AsArray, UInt32Builder}; @@ -187,7 +186,7 @@ impl CacheKey for NGramPostingListKey { impl NGramPostingList { fn try_from_batch( batch: RecordBatch, - frag_reuse_index: Option>, + frag_reuse_index: Option>, ) -> Result { let bitmap_bytes = batch.column(0).as_binary::().value(0); let mut bitmap = RoaringTreemap::deserialize_from(bitmap_bytes) @@ -214,7 +213,7 @@ impl NGramPostingList { /// Reads on-demand ngram posting lists from storage (and stores them in a cache) struct NGramPostingListReader { reader: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: WeakLanceCache, } @@ -299,7 +298,7 @@ impl DeepSizeOf for NGramIndex { impl NGramIndex { async fn from_store( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result { let tokens = store.open_index_file(POSTINGS_FILENAME).await?; @@ -375,7 +374,7 @@ impl NGramIndex { async fn load( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result> where @@ -1358,7 +1357,7 @@ impl ScalarIndexPlugin for NGramIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok(NGramIndex::load(index_store, frag_reuse_index, cache).await? as Arc) diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index 0add98d8ab3..f1b148b88f0 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -14,8 +14,8 @@ use lance_core::{ use crate::progress::IndexBuildProgress; use crate::registry::IndexPluginRegistry; +use crate::scalar::RowIdRemapper; use crate::{ - frag_reuse::FragReuseIndex, scalar::{CreatedIndex, IndexStore, ScalarIndex, expression::ScalarQueryParser}, }; @@ -158,7 +158,7 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { &self, index_store: Arc, index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result>; @@ -177,7 +177,7 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { async fn get_from_cache( &self, _index_store: Arc, - _frag_reuse_index: Option>, + _frag_reuse_index: Option>, cache: &LanceCache, ) -> Result>> { Ok(cache.get_unsized_with_key(&ScalarIndexCacheKey).await) diff --git a/rust/lance-index/src/scalar/rtree.rs b/rust/lance-index/src/scalar/rtree.rs index 5d5ac2a3a92..28f2155a407 100644 --- a/rust/lance-index/src/scalar/rtree.rs +++ b/rust/lance-index/src/scalar/rtree.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use crate::frag_reuse::FragReuseIndex; use crate::metrics::{MetricsCollector, NoOpMetricsCollector}; use crate::scalar::expression::{GeoQueryParser, ScalarQueryParser}; use crate::scalar::lance_format::LanceIndexStore; @@ -11,7 +10,7 @@ use crate::scalar::registry::{ use crate::scalar::rtree::sort::Sorter; use crate::scalar::{ AnyQuery, BuiltinIndexType, CreatedIndex, GeoQuery, IndexFile, IndexReader, IndexStore, - IndexWriter, ScalarIndex, ScalarIndexParams, SearchResult, UpdateCriteria, + IndexWriter, RowIdRemapper, ScalarIndex, ScalarIndexParams, SearchResult, UpdateCriteria, }; use crate::vector::VectorIndex; use crate::{Index, IndexType, pb}; @@ -259,7 +258,7 @@ impl CacheKey for RTreeCacheKey { pub struct RTreeIndex { pub(crate) metadata: Arc, store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: WeakLanceCache, pages_reader: Arc, nulls_reader: Arc, @@ -277,7 +276,7 @@ impl std::fmt::Debug for RTreeIndex { impl RTreeIndex { pub async fn load( store: Arc, - frag_reuse_index: Option>, + frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result> { let pages_reader = store.open_index_file(RTREE_PAGES_NAME).await?; @@ -997,7 +996,7 @@ impl ScalarIndexPlugin for RTreeIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok(RTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc) diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 8e7e20c211a..ec708296506 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -36,7 +36,7 @@ use datafusion_common::ScalarValue; use std::{collections::HashMap, sync::Arc}; use super::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult}; -use crate::scalar::FragReuseIndex; +use crate::scalar::RowIdRemapper; use crate::vector::VectorIndex; use crate::{Index, IndexType}; use async_trait::async_trait; @@ -108,7 +108,7 @@ pub struct ZoneMapIndex { // The maximum rows per zone provided by user rows_per_zone: u64, store: Arc, - fri: Option>, + fri: Option>, index_cache: WeakLanceCache, } @@ -410,7 +410,7 @@ impl ZoneMapIndex { /// Load the scalar index from storage async fn load( store: Arc, - fri: Option>, + fri: Option>, index_cache: &LanceCache, ) -> Result> where @@ -439,7 +439,7 @@ impl ZoneMapIndex { fn try_from_serialized( data: RecordBatch, store: Arc, - fri: Option>, + fri: Option>, index_cache: &LanceCache, rows_per_zone: u64, ) -> Result { @@ -1037,7 +1037,7 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { &self, index_store: Arc, _index_details: &prost_types::Any, - frag_reuse_index: Option>, + frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { Ok(ZoneMapIndex::load(index_store, frag_reuse_index, cache).await? as Arc) diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index ae2478589fb..f528ea81b46 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -50,8 +50,8 @@ use lance_index::scalar::registry::{ }; use lance_index::scalar::{BuiltinIndexType, CreatedIndex, InvertedIndexParams}; use lance_index::scalar::{ - ScalarIndex, ScalarIndexParams, bitmap::BITMAP_LOOKUP_NAME, inverted::INVERT_LIST_FILE, - lance_format::LanceIndexStore, + RowIdRemapper, ScalarIndex, ScalarIndexParams, bitmap::BITMAP_LOOKUP_NAME, + inverted::INVERT_LIST_FILE, lance_format::LanceIndexStore, }; use lance_index::{IndexCriteria, IndexType}; use lance_table::format::{Fragment, IndexMetadata}; @@ -448,6 +448,9 @@ pub async fn open_scalar_index( .index_cache .for_index(&index.uuid, frag_reuse_index.as_ref().map(|f| &f.uuid)); + let frag_reuse_index: Option> = + frag_reuse_index.map(|f| f as Arc); + if let Some(index) = plugin .get_from_cache(index_store.clone(), frag_reuse_index.clone(), &index_cache) .await?