Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion rust/lance-index/src/frag_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,40 @@
use std::any::Any;
use std::sync::Arc;

use arrow_array::RecordBatch;
use async_trait::async_trait;
use lance_core::{Error, Result};
use roaring::RoaringBitmap;
use lance_select::RowAddrTreeMap;
use roaring::{RoaringBitmap, RoaringTreemap};
use serde::Serialize;

pub use lance_table::system_index::frag_reuse::*;

use crate::scalar::RowIdRemapper;
use crate::{Index, IndexType};

impl RowIdRemapper for FragReuseIndex {
fn remap_row_id(&self, row_id: u64) -> Option<u64> {
self.remap_row_id(row_id)
}

fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap {
self.remap_row_addrs_tree_map(row_addrs)
}

fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap {
self.remap_row_ids_roaring_tree_map(row_ids)
}

fn remap_row_ids_record_batch(
&self,
batch: RecordBatch,
row_id_idx: usize,
) -> Result<RecordBatch> {
self.remap_row_ids_record_batch(batch, row_id_idx)
}
}

#[derive(Serialize)]
struct FragReuseStatistics {
num_versions: usize,
Expand Down
23 changes: 21 additions & 2 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use lance_core::deepsize::DeepSizeOf;
use lance_core::{Error, Result};
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_select::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
use roaring::RoaringBitmap;
use roaring::{RoaringBitmap, RoaringTreemap};
use serde::Serialize;

use crate::metrics::MetricsCollector;
Expand All @@ -48,7 +48,6 @@ pub mod rtree;
pub mod zoned;
pub mod zonemap;

use crate::frag_reuse::FragReuseIndex;
pub use inverted::tokenizer::InvertedIndexParams;
use lance_datafusion::udf::CONTAINS_TOKENS_UDF;

Expand Down Expand Up @@ -1076,3 +1075,23 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
/// with the same configuration on another dataset.
fn derive_index_params(&self) -> Result<ScalarIndexParams>;
}

/// Abstraction over any type that can remap row IDs during index loading.
///
/// This decouples scalar index plugins from the table-level [`crate::frag_reuse::FragReuseIndex`]
/// type. [`crate::frag_reuse::FragReuseIndex`] implements this trait, but callers may also
/// supply custom implementations for testing or other remapping strategies.
pub trait RowIdRemapper: Send + Sync + std::fmt::Debug {
/// Remap a single row id. Returns `None` if the row was deleted.
fn remap_row_id(&self, row_id: u64) -> Option<u64>;
/// Remap all addresses in a [`RowAddrTreeMap`], dropping deleted rows.
fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap;
/// Remap all row ids in a [`RoaringTreemap`], dropping deleted rows.
fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap;
/// Remap the row-id column at `row_id_idx` inside `batch`, dropping deleted rows.
fn remap_row_ids_record_batch(
&self,
batch: RecordBatch,
row_id_idx: usize,
) -> Result<RecordBatch>;
}
15 changes: 7 additions & 8 deletions rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ use super::{
use crate::pbold;
use crate::{Index, IndexType, metrics::MetricsCollector};
use crate::{
frag_reuse::FragReuseIndex,
progress::IndexBuildProgress,
scalar::{
CreatedIndex, UpdateCriteria,
CreatedIndex, RowIdRemapper, UpdateCriteria,
expression::SargableQueryParser,
registry::{
ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
Expand Down Expand Up @@ -125,7 +124,7 @@ pub struct BitmapIndex {

index_cache: WeakLanceCache,

frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,

lazy_reader: LazyIndexReader,
}
Expand Down Expand Up @@ -200,7 +199,7 @@ impl BitmapIndexState {
&self,
store: Arc<dyn IndexStore>,
index_cache: &LanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Arc<BitmapIndex>> {
Ok(Arc::new(BitmapIndex::new(
self.index_map.clone(),
Expand Down Expand Up @@ -335,7 +334,7 @@ impl BitmapIndex {
value_type: DataType,
store: Arc<dyn IndexStore>,
index_cache: WeakLanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Self {
let lazy_reader = LazyIndexReader::new(store.clone());
Self {
Expand All @@ -351,7 +350,7 @@ impl BitmapIndex {

pub(crate) async fn load(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>> {
let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
Expand Down Expand Up @@ -1766,7 +1765,7 @@ impl ScalarIndexPlugin for BitmapIndexPlugin {
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
Expand All @@ -1775,7 +1774,7 @@ impl ScalarIndexPlugin for BitmapIndexPlugin {
async fn get_from_cache(
&self,
index_store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Option<Arc<dyn ScalarIndex>>> {
let Some(state) = cache.get_with_key(&BitmapIndexStateKey).await else {
Expand Down
7 changes: 3 additions & 4 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@

use std::sync::LazyLock;

use datafusion::execution::SendableRecordBatchStream;

Check warning on line 27 in rust/lance-index/src/scalar/bloomfilter.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/bloomfilter.rs
use std::{collections::HashMap, sync::Arc};

use crate::scalar::FragReuseIndex;
use crate::scalar::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult};
use crate::scalar::{AnyQuery, IndexStore, MetricsCollector, RowIdRemapper, ScalarIndex, SearchResult};
use crate::vector::VectorIndex;
use crate::{Index, IndexType};
use arrow_array::{ArrayRef, RecordBatch};
Expand Down Expand Up @@ -90,7 +89,7 @@
impl BloomFilterIndex {
async fn load(
store: Arc<dyn IndexStore>,
_fri: Option<Arc<FragReuseIndex>>,
_fri: Option<Arc<dyn RowIdRemapper>>,
_index_cache: &LanceCache,
) -> Result<Arc<Self>> {
let index_file = store.open_index_file(BLOOMFILTER_FILENAME).await?;
Expand Down Expand Up @@ -1107,7 +1106,7 @@
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(
Expand Down
21 changes: 10 additions & 11 deletions rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
AnyQuery, BuiltinIndexType, IndexFile, IndexReader, IndexStore, IndexWriter, MetricsCollector,
OldIndexDataFilter, SargableQuery, ScalarIndex, ScalarIndexParams, SearchResult,
compute_next_prefix,
};

Check warning on line 17 in rust/lance-index/src/scalar/btree.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/btree.rs
use crate::cache_pb::{BTreeIndexHeader, RangeToFile};
use crate::{Index, IndexType};
use crate::{
frag_reuse::FragReuseIndex,
progress::{IndexBuildProgress, noop_progress},
scalar::{
CreatedIndex, UpdateCriteria,
CreatedIndex, RowIdRemapper, UpdateCriteria,
expression::{SargableQueryParser, ScalarQueryParser},
registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME},

Check warning on line 25 in rust/lance-index/src/scalar/btree.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/btree.rs
},
};
use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria};
Expand Down Expand Up @@ -1393,7 +1392,7 @@
&self,
store: Arc<dyn IndexStore>,
index_cache: &LanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Arc<dyn ScalarIndex>> {
let index = BTreeIndex::try_from_serialized(
self.lookup_batch.clone(),
Expand Down Expand Up @@ -1519,7 +1518,7 @@
/// - The local page_idx is calculated: `142 - 100 = 42`.
/// - The system now knows to read page `42` from the file `part_2_page_file.lance`.
ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
}

impl DeepSizeOf for BTreeIndex {
Expand All @@ -1540,7 +1539,7 @@
index_cache: WeakLanceCache,
batch_size: u64,
ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Self {
Self {
page_lookup,
Expand Down Expand Up @@ -1696,7 +1695,7 @@
index_cache: &LanceCache,
batch_size: u64,
ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Self> {
let data_type = data.column(0).data_type().clone();
let page_lookup = Arc::new(BTreeLookup::try_new(data)?);
Expand All @@ -1714,7 +1713,7 @@

async fn load(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>> {
let (page_lookup_file, standalone_partition_page_file) =
Expand Down Expand Up @@ -1950,7 +1949,7 @@

fn remap_row_ids(
stream: SendableRecordBatchStream,
frag_reuse_index: Arc<FragReuseIndex>,
frag_reuse_index: Arc<dyn RowIdRemapper>,
) -> SendableRecordBatchStream {
let schema = stream.schema();
let remapped = stream.map(move |batch_result| {
Expand Down Expand Up @@ -3291,7 +3290,7 @@
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(BTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
Expand All @@ -3300,7 +3299,7 @@
async fn get_from_cache(
&self,
index_store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Option<Arc<dyn ScalarIndex>>> {
let Some(state) = cache.get_with_key(&BTreeIndexStateKey).await else {
Expand Down Expand Up @@ -6289,10 +6288,10 @@
ranges_to_files: index.ranges_to_files.clone(),
};

// Remap row 0 -> row 5000 (outside the original [0, 1000) range so no collision).

Check warning on line 6291 in rust/lance-index/src/scalar/btree.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/btree.rs
// Querying for value == 0 should now return row 5000, confirming reconstruct threaded
// the FragReuseIndex through to the rebuilt BTreeIndex.
let frag_reuse_index = Arc::new(FragReuseIndex::new(
let frag_reuse_index: Arc<dyn crate::scalar::RowIdRemapper> = Arc::new(FragReuseIndex::new(
Uuid::new_v4(),
vec![HashMap::from([(0u64, Some(5000u64))])],
FragReuseIndexDetails { versions: vec![] },
Expand Down
7 changes: 3 additions & 4 deletions rust/lance-index/src/scalar/fmindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use lance_core::deepsize::DeepSizeOf;
use lance_core::{Error, ROW_ADDR, Result};
use roaring::RoaringBitmap;

use crate::frag_reuse::FragReuseIndex;
use crate::metrics::MetricsCollector;
use crate::pb;
use crate::scalar::expression::{ScalarQueryParser, TextQueryParser};
Expand All @@ -44,7 +43,7 @@ use crate::scalar::registry::{
};
use crate::scalar::{
AnyQuery, BuiltinIndexType, CreatedIndex, IndexFile, IndexStore, OldIndexDataFilter,
ScalarIndex, ScalarIndexParams, SearchResult, TextQuery, UpdateCriteria,
RowIdRemapper, ScalarIndex, ScalarIndexParams, SearchResult, TextQuery, UpdateCriteria,
};
use crate::vector::VectorIndex;
use crate::{Index, IndexType};
Expand Down Expand Up @@ -1258,7 +1257,7 @@ impl FMIndexScalarIndex {

async fn load(
store: Arc<dyn IndexStore>,
_fri: Option<Arc<FragReuseIndex>>,
_fri: Option<Arc<dyn RowIdRemapper>>,
_cache: &LanceCache,
) -> Result<Arc<Self>> {
let files = store.list_files_with_sizes().await?;
Expand Down Expand Up @@ -1737,7 +1736,7 @@ impl ScalarIndexPlugin for FMIndexPlugin {
&self,
store: Arc<dyn IndexStore>,
details: &prost_types::Any,
fri: Option<Arc<FragReuseIndex>>,
fri: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
let _ = details
Expand Down
5 changes: 2 additions & 3 deletions rust/lance-index/src/scalar/inverted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,12 @@
}

use lance_core::Error;

Check warning on line 114 in rust/lance-index/src/scalar/inverted.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/inverted.rs
use crate::pbold;
use crate::progress::IndexBuildProgress;
use crate::{
frag_reuse::FragReuseIndex,
scalar::{
CreatedIndex, ScalarIndex,
CreatedIndex, RowIdRemapper, ScalarIndex,
expression::{FtsQueryParser, ScalarQueryParser},
registry::{ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest},
},
Expand Down Expand Up @@ -289,7 +288,7 @@
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(
Expand Down
14 changes: 7 additions & 7 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ use super::{
builder::{InnerBuilder, PositionRecorder},
iter::CompressedPostingListIterator,
};
use crate::frag_reuse::FragReuseIndex;
use crate::pbold;
use crate::progress::IndexBuildProgress;
use crate::scalar::inverted::scorer::MemBM25Scorer;
use crate::scalar::inverted::tokenizer::document_tokenizer::LanceTokenizer;
use crate::scalar::{
AnyQuery, BuiltinIndexType, CreatedIndex, IndexReader, IndexStore, MetricsCollector,
OldIndexDataFilter, ScalarIndex, ScalarIndexParams, SearchResult, TokenQuery, UpdateCriteria,
OldIndexDataFilter, RowIdRemapper, ScalarIndex, ScalarIndexParams, SearchResult, TokenQuery,
UpdateCriteria,
};
use crate::{FtsPrewarmOptions, Index};
use crate::{prefilter::PreFilter, scalar::inverted::iter::take_fst_keys};
Expand Down Expand Up @@ -821,7 +821,7 @@ impl InvertedIndex {

async fn load_legacy_index(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>> {
log::warn!("loading legacy FTS index");
Expand Down Expand Up @@ -888,7 +888,7 @@ impl InvertedIndex {

pub async fn load(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>>
where
Expand Down Expand Up @@ -1257,7 +1257,7 @@ impl InvertedPartition {
pub async fn load(
store: Arc<dyn IndexStore>,
id: u64,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
index_cache: &LanceCache,
token_set_format: TokenSetFormat,
) -> Result<Self> {
Expand Down Expand Up @@ -4698,7 +4698,7 @@ impl DocSet {
pub async fn load(
reader: Arc<dyn IndexReader>,
is_legacy: bool,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Self> {
let batch = reader.read_range(0..reader.num_rows(), None).await?;
let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
Expand Down Expand Up @@ -4730,7 +4730,7 @@ impl DocSet {
row_id_col: &UInt64Array,
num_tokens_col: &arrow_array::UInt32Array,
is_legacy: bool,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Self> {
// for legacy format, the row id is doc id; sorting keeps binary search viable
if is_legacy {
Expand Down
Loading
Loading