From d8a20362f61ae6df4ef64aa3ddf87ca9760adb56 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Thu, 18 Jun 2026 05:54:22 -0700 Subject: [PATCH 1/4] fix(index): drop stale scalar index entries after stable-row-id update Under stable row ids an update deletes a row's old copy and rewrites it to a new fragment under the same row id. optimize_indices kept the old value->row_id entry, so queries for the old value returned the updated row and BTree optimize errored ("from_sorted_iter called with non-sorted input"). - build_stable_row_id_filter now subtracts each fragment's deletion vector so the old-row allow-list holds only live rows (fixes BTree). - BitmapIndex::update applies that filter to old postings via OldIndexDataFilter::retain_old_rows. - optimize routes FTS through InvertedIndex::merge_segments (which filters old partitions) instead of the reference-only update path. Adds a regression test covering all three index types. Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance-index/src/scalar.rs | 11 ++ rust/lance-index/src/scalar/bitmap.rs | 56 ++++++- rust/lance/src/index/append.rs | 227 +++++++++++++++++++++++++- 3 files changed, 281 insertions(+), 13 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index a287d277a81..3a6834129b3 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -960,6 +960,17 @@ impl OldIndexDataFilter { .collect(), } } + + /// Apply this filter in place to a set of existing (old) row ids/addresses, + /// retaining only the rows the filter selects to keep. Used by index types + /// that merge old postings directly (e.g. bitmap) instead of re-scanning a + /// row-id array through [`Self::filter_row_ids`]. + pub fn retain_old_rows(&self, rows: &mut RowAddrTreeMap) { + match self { + Self::Fragments { to_keep, .. 
} => rows.retain_fragments(to_keep.iter()), + Self::RowIds(valid_row_ids) => *rows &= valid_row_ids, + } + } } impl UpdateCriteria { diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 8729aadbca2..23b300a2d73 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -806,13 +806,14 @@ impl ScalarIndex for BitmapIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, - _old_data_filter: Option, + old_data_filter: Option, ) -> Result { let file = BitmapIndexPlugin::streaming_build_and_write( new_data, Some(self), dest_store, BITMAP_LOOKUP_NAME, + old_data_filter.as_ref(), ) .await?; @@ -1191,6 +1192,19 @@ async fn cleanup_bitmap_shard_files(store: &dyn IndexStore, shard_files: &[Strin #[derive(Debug, Default)] pub struct BitmapIndexPlugin; +/// Drop the rows an old posting should no longer expose -- rows whose fragment +/// was removed, or (under stable row ids) rows rewritten by an update -- keeping +/// only those `filter` still considers valid. A no-op when `filter` is `None`. 
+fn retain_valid( + mut bitmap: RowAddrTreeMap, + filter: Option<&super::OldIndexDataFilter>, +) -> RowAddrTreeMap { + if let Some(filter) = filter { + filter.retain_old_rows(&mut bitmap); + } + bitmap +} + impl BitmapIndexPlugin { fn get_batch_from_arrays( keys: Arc, @@ -1322,7 +1336,7 @@ impl BitmapIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, ) -> Result { - Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME).await + Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME, None).await } async fn train_bitmap_shard( @@ -1337,7 +1351,8 @@ impl BitmapIndexPlugin { progress .stage_start("build_bitmap_shard", None, "rows") .await?; - let file = Self::streaming_build_and_write(data, None, index_store, &file_name).await?; + let file = + Self::streaming_build_and_write(data, None, index_store, &file_name, None).await?; progress.stage_complete("build_bitmap_shard").await?; Ok(file) } @@ -1354,6 +1369,7 @@ impl BitmapIndexPlugin { old_index: Option<&BitmapIndex>, index_store: &dyn IndexStore, output_file_name: &str, + old_data_filter: Option<&super::OldIndexDataFilter>, ) -> Result { let value_type = data_source.schema().field(0).data_type().clone(); @@ -1400,6 +1416,7 @@ impl BitmapIndexPlugin { &mut old_pos, &mut emitted_null, &mut writer, + old_data_filter, ) .await?; } @@ -1422,6 +1439,7 @@ impl BitmapIndexPlugin { &mut old_pos, &mut emitted_null, &mut writer, + old_data_filter, ) .await?; } @@ -1429,7 +1447,13 @@ impl BitmapIndexPlugin { // Emit any remaining old-only entries. if let Some(idx) = old_index { while old_pos < old_keys.len() { - let old_bitmap = idx.load_bitmap(&old_keys[old_pos], None).await?; + let old_bitmap = retain_valid( + idx.load_bitmap(&old_keys[old_pos], None) + .await? 
+ .as_ref() + .clone(), + old_data_filter, + ); writer .emit(old_keys[old_pos].0.clone(), &old_bitmap) .await?; @@ -1444,7 +1468,8 @@ impl BitmapIndexPlugin { { let null_key = new_null_array(&value_type, 1); let null_key = ScalarValue::try_from_array(null_key.as_ref(), 0)?; - writer.emit(null_key, &idx.null_map).await?; + let null_bitmap = retain_valid((*idx.null_map).clone(), old_data_filter); + writer.emit(null_key, &null_bitmap).await?; } writer.finish().await @@ -1453,6 +1478,7 @@ impl BitmapIndexPlugin { /// Flush a completed value-run from the new data stream, emitting any /// old-only entries that sort before it and merging the old bitmap if the /// key exists in both old and new. + #[allow(clippy::too_many_arguments)] async fn finish_run( key: ScalarValue, bitmap: &mut RowAddrTreeMap, @@ -1461,13 +1487,14 @@ impl BitmapIndexPlugin { old_pos: &mut usize, emitted_null: &mut bool, writer: &mut BitmapBatchWriter, + old_data_filter: Option<&super::OldIndexDataFilter>, ) -> Result<()> { if key.is_null() { // Null values are stored separately in the old index's null_map. if let Some(idx) = old_index && !idx.null_map.is_empty() { - *bitmap |= &*idx.null_map; + *bitmap |= &retain_valid((*idx.null_map).clone(), old_data_filter); } *emitted_null = true; writer.emit(key, bitmap).await?; @@ -1476,7 +1503,13 @@ impl BitmapIndexPlugin { // Emit old-only entries that sort before this key. while *old_pos < old_keys.len() && old_keys[*old_pos] < orderable { - let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?; + let old_bitmap = retain_valid( + idx.load_bitmap(&old_keys[*old_pos], None) + .await? + .as_ref() + .clone(), + old_data_filter, + ); writer .emit(old_keys[*old_pos].0.clone(), &old_bitmap) .await?; @@ -1485,8 +1518,13 @@ impl BitmapIndexPlugin { // If the old index also has this key, merge its bitmap. 
if *old_pos < old_keys.len() && old_keys[*old_pos] == orderable { - let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?; - *bitmap |= &*old_bitmap; + *bitmap |= &retain_valid( + idx.load_bitmap(&old_keys[*old_pos], None) + .await? + .as_ref() + .clone(), + old_data_filter, + ); *old_pos += 1; } diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 99ff7bebe43..6e91868eb11 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -65,16 +65,89 @@ async fn build_stable_row_id_filter( .try_collect::>() .await?; - let row_id_maps = row_id_sequences - .iter() - .map(|(_, seq)| RowAddrTreeMap::from(seq.as_ref())) - .collect::>(); + let frag_by_id: std::collections::HashMap = dataset + .get_fragments() + .into_iter() + .map(|f| (f.id() as u32, f)) + .collect(); + + let mut row_id_maps = Vec::with_capacity(row_id_sequences.len()); + for (frag_id, seq) in &row_id_sequences { + row_id_maps.push(live_row_ids(frag_by_id.get(frag_id), seq).await?); + } let row_id_map_refs = row_id_maps.iter().collect::>(); // Merge all fragment-local row-id sets into one exact membership structure. Ok(::union_all(&row_id_map_refs)) } +/// The fragment's live row ids: its persisted row-id sequence minus the rows +/// its deletion vector marks gone. A persisted sequence covers every row the +/// fragment ever held, so a row whose old copy was deleted (e.g. rewritten by an +/// update under the same stable row id) would otherwise be retained as a stale +/// old-index entry. 
+async fn live_row_ids( + fragment: Option<&crate::dataset::fragment::FileFragment>, + seq: &lance_table::rowids::RowIdSequence, +) -> Result { + let deletion_vector = match fragment { + Some(f) if f.metadata().deletion_file.is_some() => { + f.get_deletion_vector().await.ok().flatten() + } + _ => None, + }; + Ok(match deletion_vector { + Some(dv) => seq + .iter() + .enumerate() + .filter(|(offset, _)| !dv.contains(*offset as u32)) + .map(|(_, row_id)| row_id) + .collect(), + None => RowAddrTreeMap::from(seq), + }) +} + +/// Open the selected inverted (FTS) segments and merge `new_data` into them +/// through the segment-merge primitive, which materializes each old partition +/// and applies `old_data_filter` (dropping stale rows -- e.g. updated rows under +/// stable row ids). The fast `ScalarIndex::update` path only references old +/// partitions by id and cannot honor a row-level `RowIds` filter, so it must not +/// be used when old rows need to be removed. +async fn open_and_merge_inverted_segments( + dataset: &Dataset, + field_path: &str, + segments: &[&IndexMetadata], + new_data: datafusion::execution::SendableRecordBatchStream, + new_store: &LanceIndexStore, + old_data_filter: Option, +) -> Result { + let mut source_indices = Vec::with_capacity(segments.len()); + for &segment in segments { + let scalar_index = dataset + .open_scalar_index(field_path, &segment.uuid, &NoOpMetricsCollector) + .await?; + let inverted = scalar_index + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::index(format!( + "Inverted merge: expected inverted segment {}, got {:?}", + segment.uuid, + scalar_index.index_type() + )) + })?; + source_indices.push(Arc::new(inverted.clone())); + } + InvertedIndex::merge_segments( + &source_indices, + new_data, + new_store, + old_data_filter, + Arc::new(NoopIndexBuildProgress), + ) + .await +} + /// Build the [`OldIndexDataFilter`] that must be applied to existing index /// rows when their owning fragments have been pruned by compaction 
or /// deletions. @@ -295,6 +368,17 @@ async fn merge_scalar_indices<'a>( ) .await? } + IndexType::Inverted => { + open_and_merge_inverted_segments( + dataset.as_ref(), + field_path, + selected_old_indices, + new_data_stream, + &new_store, + old_data_filter, + ) + .await? + } _ => { let old_data_filter = build_old_data_filter( dataset.as_ref(), @@ -1922,6 +2006,141 @@ mod tests { assert_eq!(query_id_count(&dataset, "song-42").await, 1); } + /// Under stable row ids, updating an indexed column and then calling + /// `optimize_indices` must not leave stale entries (old value -> updated row) + /// in the scalar index. An update deletes the old copy of each row and + /// rewrites it under the same stable row id, so the old index entry is stale + /// and must be dropped on merge. Covers BTree, Bitmap, and Inverted (FTS), + /// which take three different merge paths. + #[tokio::test] + async fn test_optimize_scalar_index_drops_stale_rows_after_update() { + use crate::dataset::UpdateBuilder; + use arrow_array::Int32Array; + use lance_index::scalar::FullTextSearchQuery; + use lance_index::scalar::inverted::InvertedIndexParams; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + // 100 rows: num == id; cat = "A" for id<50 else "B"; body = "alpha" for + // id<50 else "beta". 
+ let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("num", DataType::Int32, false), + Field::new("cat", DataType::Utf8, false), + Field::new("body", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(StringArray::from_iter_values( + (0..100).map(|i| if i < 50 { "A" } else { "B" }), + )), + Arc::new(StringArray::from_iter_values( + (0..100).map(|i| if i < 50 { "alpha" } else { "beta" }), + )), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + enable_stable_row_ids: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + dataset + .create_index( + &["num"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + dataset + .create_index( + &["cat"], + IndexType::Bitmap, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + dataset + .create_index( + &["body"], + IndexType::Inverted, + None, + &InvertedIndexParams::default(), + true, + ) + .await + .unwrap(); + + // Update the first 25 rows (id < 25): num -> -1, cat -> 'B', body -> 'beta'. + let res = UpdateBuilder::new(Arc::new(dataset.clone())) + .update_where("id < 25") + .unwrap() + .set("num", "-1") + .unwrap() + .set("cat", "'B'") + .unwrap() + .set("body", "'beta'") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + dataset = res.new_dataset.as_ref().clone(); + + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + + // BTree: `num >= 0` matches ids 25..99 (75 rows); the 25 updated rows + // hold num = -1 and must not appear. 
+ let btree_count = dataset + .scan() + .filter("num >= 0") + .unwrap() + .count_rows() + .await + .unwrap(); + assert_eq!(btree_count, 75, "btree returned stale/incorrect rows"); + + // Bitmap: only the 25 rows (ids 25..49) that still carry cat = 'A' match; + // the 25 rows updated to 'B' must not. + let bitmap_count = dataset + .scan() + .filter("cat = 'A'") + .unwrap() + .count_rows() + .await + .unwrap(); + assert_eq!(bitmap_count, 25, "bitmap returned stale rows"); + + // FTS: only the 25 rows (ids 25..49) whose body still reads "alpha" match; + // the 25 rows updated to "beta" must not. + let mut scan = dataset.scan(); + scan.full_text_search(FullTextSearchQuery::new("alpha".to_owned())) + .unwrap(); + let fts_count = scan.count_rows().await.unwrap(); + assert_eq!(fts_count, 25, "FTS index returned stale rows"); + } + #[tokio::test] async fn test_optimize_scalar_no_unindexed_fragments() { let test_dir = TempStrDir::default(); From 7377e0e70885713241c507f8713a33dad7dbf048 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 22 Jun 2026 15:25:47 -0700 Subject: [PATCH 2/4] fix(index): propagate deletion-vector read errors during optimize live_row_ids swallowed deletion-vector read failures via `.ok().flatten()`, falling back to the "no deletions" branch and putting the deleted rows back into the stable-row-id allow-list as stale entries. Propagate with `?` instead, and add a regression test covering an unreadable deletion file. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance/src/index/append.rs | 96 ++++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 6e91868eb11..42140562e04 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -90,10 +90,11 @@ async fn live_row_ids( fragment: Option<&crate::dataset::fragment::FileFragment>, seq: &lance_table::rowids::RowIdSequence, ) -> Result { + // Propagate a deletion-vector read failure rather than swallowing it: a + // swallowed error would fall through to the "no deletions" branch below, + // putting the deleted rows back into the allow-list as stale entries. let deletion_vector = match fragment { - Some(f) if f.metadata().deletion_file.is_some() => { - f.get_deletion_vector().await.ok().flatten() - } + Some(f) if f.metadata().deletion_file.is_some() => f.get_deletion_vector().await?, _ => None, }; Ok(match deletion_vector { @@ -2141,6 +2142,95 @@ mod tests { assert_eq!(fts_count, 25, "FTS index returned stale rows"); } + /// `optimize_indices` builds the stable-row-id allow-list by subtracting each + /// fragment's deletion vector. If a deletion vector cannot be read, the merge + /// must fail loudly: swallowing the error (treating the load as "no + /// deletions") would put every deleted row back into the allow-list and + /// silently reintroduce the stale entries this fix removes. Simulate an + /// unreadable deletion vector by deleting the file the manifest still + /// references, then assert optimize errors instead of succeeding. 
+ #[tokio::test] + async fn test_optimize_errors_when_deletion_vector_unreadable() { + use crate::dataset::UpdateBuilder; + use arrow_array::Int32Array; + use lance_table::io::deletion::deletion_file_path; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("num", DataType::Int32, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(Int32Array::from_iter_values(0..100)), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + enable_stable_row_ids: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + dataset + .create_index( + &["num"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + // Update rewrites the first 25 rows under the same stable row ids, + // leaving a deletion vector on the original fragment. + UpdateBuilder::new(Arc::new(dataset.clone())) + .update_where("id < 25") + .unwrap() + .set("num", "-1") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + + // Reload cold (nothing has cached the deletion vector), then remove the + // deletion file the manifest still references so the next read fails. 
+ let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + let mut removed = 0; + for fragment in dataset.get_fragments() { + if let Some(deletion_file) = fragment.metadata().deletion_file.clone() { + let path = + deletion_file_path(&dataset.base, fragment.metadata().id, &deletion_file); + dataset.object_store.delete(&path).await.unwrap(); + removed += 1; + } + } + assert_eq!( + removed, 1, + "update should have left exactly one deletion file" + ); + + let result = dataset.optimize_indices(&OptimizeOptions::default()).await; + assert!( + result.is_err(), + "optimize must fail when a deletion vector cannot be read, not \ + silently keep the deleted rows in the index" + ); + } + #[tokio::test] async fn test_optimize_scalar_no_unindexed_fragments() { let test_dir = TempStrDir::default(); From 54e0b47f112f9d8753289a3bee5f3df93537a2b4 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 22 Jun 2026 15:25:47 -0700 Subject: [PATCH 3/4] refactor(index): remove dead Inverted arm from merge_scalar_indices IndexType::Inverted is handled by a dedicated arm in merge_indices_with_unindexed_frags (#6737) and never reaches merge_scalar_indices, so its arm here and the open_and_merge_inverted_segments helper were unreachable. Remove them; no behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance/src/index/append.rs | 55 ++-------------------------------- 1 file changed, 3 insertions(+), 52 deletions(-) diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 42140562e04..8ff113b67b2 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -108,47 +108,6 @@ async fn live_row_ids( }) } -/// Open the selected inverted (FTS) segments and merge `new_data` into them -/// through the segment-merge primitive, which materializes each old partition -/// and applies `old_data_filter` (dropping stale rows -- e.g. updated rows under -/// stable row ids). 
The fast `ScalarIndex::update` path only references old -/// partitions by id and cannot honor a row-level `RowIds` filter, so it must not -/// be used when old rows need to be removed. -async fn open_and_merge_inverted_segments( - dataset: &Dataset, - field_path: &str, - segments: &[&IndexMetadata], - new_data: datafusion::execution::SendableRecordBatchStream, - new_store: &LanceIndexStore, - old_data_filter: Option, -) -> Result { - let mut source_indices = Vec::with_capacity(segments.len()); - for &segment in segments { - let scalar_index = dataset - .open_scalar_index(field_path, &segment.uuid, &NoOpMetricsCollector) - .await?; - let inverted = scalar_index - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::index(format!( - "Inverted merge: expected inverted segment {}, got {:?}", - segment.uuid, - scalar_index.index_type() - )) - })?; - source_indices.push(Arc::new(inverted.clone())); - } - InvertedIndex::merge_segments( - &source_indices, - new_data, - new_store, - old_data_filter, - Arc::new(NoopIndexBuildProgress), - ) - .await -} - /// Build the [`OldIndexDataFilter`] that must be applied to existing index /// rows when their owning fragments have been pruned by compaction or /// deletions. @@ -369,17 +328,9 @@ async fn merge_scalar_indices<'a>( ) .await? } - IndexType::Inverted => { - open_and_merge_inverted_segments( - dataset.as_ref(), - field_path, - selected_old_indices, - new_data_stream, - &new_store, - old_data_filter, - ) - .await? - } + // NOTE: IndexType::Inverted never reaches here -- it is handled by the + // dedicated arm in merge_indices_with_unindexed_frags before this + // function is called. 
_ => { let old_data_filter = build_old_data_filter( dataset.as_ref(), From 56aba6ef13b6becb1d1a0fcaac453f828230426d Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 23 Jun 2026 07:24:54 -0700 Subject: [PATCH 4/4] fix(index): drop stale rows from unmerged segments on optimize Default optimize merges only the tail segment, so under stable row ids an update to a row in an older, unmerged segment left that segment's stale postings in place. Also select any older segment still covering a deleted-from fragment, so only the affected segments are rewritten -- an edit to old data no longer forces a full reindex. Add multi-segment regression tests for BTree and FTS. Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance/src/index/append.rs | 265 ++++++++++++++++++++++++++++++--- 1 file changed, 247 insertions(+), 18 deletions(-) diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 8ff113b67b2..d3ecde030c1 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -231,6 +231,58 @@ async fn rebuild_scalar_segment( .await } +/// The index segments to rewrite in this optimize pass. +/// +/// Normally the trailing `num_indices_to_merge` segments. Under stable row ids, +/// any *older* segment that still covers a fragment carrying deletions is added +/// too: an update deletes a row's old copy (leaving a deletion vector) and +/// rewrites it under the same row id, so its stale old-value postings survive +/// until that segment is rewritten and filtered. Only the segments that actually +/// cover a deleted-from fragment are pulled in -- clean segments in between are +/// left untouched -- so an edit to old data does not force a full reindex. +/// +/// The deletion check is conservative (any current deletion vector on a covered +/// fragment), so a segment built after those deletions may be rewritten as a +/// harmless no-op; it never leaves a stale segment behind (PR #7359). 
+fn select_segments_to_merge<'a>( + dataset: &Dataset, + old_indices: &[&'a IndexMetadata], + options: &OptimizeOptions, +) -> Vec<&'a IndexMetadata> { + let num_to_merge = options + .num_indices_to_merge + .unwrap_or(1) + .min(old_indices.len()); + let tail_start = old_indices.len() - num_to_merge; + + // Address-style row ids mask stale postings at search time, and append mode + // (num_to_merge == 0) defers cleanup to a real merge; both keep the plain tail. + if num_to_merge == 0 || !dataset.manifest.uses_stable_row_ids() { + return old_indices[tail_start..].to_vec(); + } + + let deleted_frags: RoaringBitmap = dataset + .get_fragments() + .iter() + .filter(|f| f.metadata().deletion_file.is_some()) + .map(|f| f.id() as u32) + .collect(); + if deleted_frags.is_empty() { + return old_indices[tail_start..].to_vec(); + } + + let mut selected = Vec::new(); + for (i, idx) in old_indices.iter().enumerate() { + let covers_deleted = idx + .effective_fragment_bitmap(&dataset.fragment_bitmap) + .is_some_and(|eff| !eff.is_disjoint(&deleted_frags)); + if i >= tail_start || covers_deleted { + selected.push(*idx); + } + } + selected +} + #[allow(clippy::too_many_arguments)] async fn merge_scalar_indices<'a>( dataset: Arc, @@ -248,18 +300,13 @@ async fn merge_scalar_indices<'a>( )); } - let num_to_merge = options - .num_indices_to_merge - .unwrap_or(1) - .min(old_indices.len()); + let selected_old_indices = select_segments_to_merge(dataset.as_ref(), old_indices, options); // No new data + ≤1 old selected = rewriting one segment to itself. - if unindexed.is_empty() && num_to_merge <= 1 { + if unindexed.is_empty() && selected_old_indices.len() <= 1 { return Ok(None); } - let selected_old_indices = &old_indices[old_indices.len() - num_to_merge..]; - // For the delta case (`selected` empty) the reference is purely // for reading params; fall back to the last old index then. 
let reference_idx = selected_old_indices @@ -317,11 +364,11 @@ async fn merge_scalar_indices<'a>( match index_type { IndexType::BTree => { let (_, old_data_filters) = - build_per_segment_filters(dataset.as_ref(), selected_old_indices).await?; + build_per_segment_filters(dataset.as_ref(), &selected_old_indices).await?; crate::index::scalar::btree::open_and_merge_segments( dataset.as_ref(), field_path, - selected_old_indices, + &selected_old_indices, new_data_stream, &new_store, &old_data_filters, @@ -645,16 +692,11 @@ pub async fn merge_indices_with_unindexed_frags<'a>( let index_type = indices[0].index_type(); match index_type { IndexType::Inverted => { - let num_to_merge = options - .num_indices_to_merge - .unwrap_or(1) - .min(old_indices.len()); - if unindexed.is_empty() && num_to_merge <= 1 { + let selected_old_indices = + select_segments_to_merge(dataset.as_ref(), old_indices, options); + if unindexed.is_empty() && selected_old_indices.len() <= 1 { return Ok(None); } - - let selected_start = old_indices.len().saturating_sub(num_to_merge); - let selected_old_indices = &old_indices[selected_start..]; let reference_idx = selected_old_indices .first() .copied() @@ -710,7 +752,7 @@ pub async fn merge_indices_with_unindexed_frags<'a>( let mut frag_bitmap = base_unindexed_bitmap; let mut effective_old_frags = RoaringBitmap::new(); let mut selected_indices = Vec::with_capacity(selected_old_indices.len()); - for idx in selected_old_indices { + for idx in &selected_old_indices { if let Some(effective) = idx.effective_fragment_bitmap(&dataset.fragment_bitmap) { frag_bitmap |= &effective; @@ -2093,6 +2135,193 @@ mod tests { assert_eq!(fts_count, 25, "FTS index returned stale rows"); } + /// Multi-segment variant (Jack Ye's repro, PR #7359): with one BTree segment + /// per fragment, default optimize merges only the tail segment. 
A stable-row-id update to a row in an older segment's fragment must still drop that + /// segment's stale postings -- the merge has to reach back to cover it. + #[tokio::test] + async fn test_optimize_btree_drops_stale_rows_across_segments_after_update() { + use crate::dataset::UpdateBuilder; + use crate::index::CreateIndexBuilder; + use arrow_array::Int32Array; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("num", DataType::Int32, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(Int32Array::from_iter_values(0..100)), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + // Two fragments (0..49, 50..99) -> one BTree segment each. + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + enable_stable_row_ids: true, + max_rows_per_file: 50, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree); + let fragments = dataset.get_fragments(); + let mut segments = Vec::new(); + for fragment in &fragments { + segments.push( + CreateIndexBuilder::new(&mut dataset, &["num"], IndexType::BTree, &params) + .name("num_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + dataset + .commit_existing_index_segments("num_idx", "num", segments) + .await + .unwrap(); + + // Update the first 25 rows (in the first/older segment's fragment). 
+ let res = UpdateBuilder::new(Arc::new(dataset.clone())) + .update_where("id < 25") + .unwrap() + .set("num", "-1") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + dataset = res.new_dataset.as_ref().clone(); + + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + + assert_eq!( + dataset + .scan() + .filter("num = 0") + .unwrap() + .count_rows() + .await + .unwrap(), + 0, + "stale entry leaked from the older, unmerged segment" + ); + assert_eq!( + dataset + .scan() + .filter("num >= 0") + .unwrap() + .count_rows() + .await + .unwrap(), + 75 + ); + } + + /// Same multi-segment gap for FTS, which takes the separate Inverted dispatch + /// path. One Inverted segment per fragment; an update to the older segment's + /// fragment must not leave its old-token postings searchable. + #[tokio::test] + async fn test_optimize_fts_drops_stale_rows_across_segments_after_update() { + use crate::dataset::UpdateBuilder; + use crate::index::CreateIndexBuilder; + use arrow_array::Int32Array; + use lance_index::scalar::FullTextSearchQuery; + use lance_index::scalar::inverted::InvertedIndexParams; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("body", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(StringArray::from_iter_values( + (0..100).map(|i| if i < 50 { "alpha" } else { "beta" }), + )), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + enable_stable_row_ids: true, + max_rows_per_file: 50, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = InvertedIndexParams::default(); + let 
fragments = dataset.get_fragments(); + let mut segments = Vec::new(); + for fragment in &fragments { + segments.push( + CreateIndexBuilder::new(&mut dataset, &["body"], IndexType::Inverted, &params) + .name("body_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + dataset + .commit_existing_index_segments("body_idx", "body", segments) + .await + .unwrap(); + + // Update the first 25 rows (older segment's fragment): body -> "beta". + let res = UpdateBuilder::new(Arc::new(dataset.clone())) + .update_where("id < 25") + .unwrap() + .set("body", "'beta'") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + dataset = res.new_dataset.as_ref().clone(); + + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + + let mut scan = dataset.scan(); + scan.full_text_search(FullTextSearchQuery::new("alpha".to_owned())) + .unwrap(); + assert_eq!( + scan.count_rows().await.unwrap(), + 25, + "FTS stale rows leaked from the older, unmerged segment" + ); + } + /// `optimize_indices` builds the stable-row-id allow-list by subtracting each /// fragment's deletion vector. If a deletion vector cannot be read, the merge /// must fail loudly: swallowing the error (treating the load as "no