diff --git a/protos/table.proto b/protos/table.proto index d298809d5d8..69f5323c06e 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -514,6 +514,17 @@ message FragmentReuseIndexDetails { uint64 dataset_version = 1; repeated Group groups = 3; + + // Fragment IDs that some index covered but that were no longer present in + // the dataset when this version was committed, and were not part of any + // rewrite group (e.g. a fragment that was fully deleted before compaction, + // so it never entered a compaction task / Group.old_fragments). + // + // Such fragments are gone from the manifest, so their rows cannot be + // enumerated by address. Their stale index entries are pruned by fragment + // ID instead: the fragment-reuse index maps every row address whose + // fragment ID is in this list to "deleted" (None). + repeated uint32 removed_fragments = 4; } } diff --git a/rust/lance-table/src/system_index/frag_reuse.rs b/rust/lance-table/src/system_index/frag_reuse.rs index 40bbc4f58b6..c6ed7c414bf 100644 --- a/rust/lance-table/src/system_index/frag_reuse.rs +++ b/rust/lance-table/src/system_index/frag_reuse.rs @@ -7,6 +7,7 @@ use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array}; use lance_core::deepsize::{Context, DeepSizeOf}; +use lance_core::utils::address::RowAddress; use lance_core::{Error, Result}; use lance_select::RowAddrTreeMap; use roaring::{RoaringBitmap, RoaringTreemap}; @@ -105,6 +106,10 @@ impl TryFrom for FragReuseGroup { pub struct FragReuseVersion { pub dataset_version: u64, pub groups: Vec, + /// Fragments an index covered that were gone from the manifest at commit + /// time and not part of any rewrite group (e.g. fully deleted before + /// compaction). Pruned by ID since their rows can't be enumerated by address. + pub removed_frags: Vec, } impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version { @@ -112,6 +117,7 @@ impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version { Self { dataset_version: version.dataset_version, groups: version.groups.iter().map(|g| g.into()).collect(), + removed_fragments: version.removed_frags.clone(), } } } @@ -127,6 +133,7 @@ impl TryFrom for FragReuseVersion { .into_iter() .map(FragReuseGroup::try_from) .collect::>()?, + removed_frags: version.removed_fragments, }) } } @@ -194,6 +201,15 @@ impl FragReuseIndexDetails { .flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)), ) } + + /// Union of every version's `removed_frags`. + pub fn removed_frag_bitmap(&self) -> RoaringBitmap { + RoaringBitmap::from_iter( + self.versions + .iter() + .flat_map(|v| v.removed_frags.iter().copied()), + ) + } } /// An index that stores row ID maps. @@ -204,6 +220,9 @@ pub struct FragReuseIndex { pub uuid: Uuid, pub row_id_maps: Vec>>, pub details: FragReuseIndexDetails, + /// Cache of `details.removed_frag_bitmap()`; rebuilt in `new`, so skipped. + #[serde(skip)] + removed_frags: RoaringBitmap, } impl DeepSizeOf for FragReuseIndex { @@ -218,10 +237,12 @@ impl FragReuseIndex { row_id_maps: Vec>>, details: FragReuseIndexDetails, ) -> Self { + let removed_frags = details.removed_frag_bitmap(); Self { uuid, row_id_maps, details, + removed_frags, } } @@ -236,6 +257,20 @@ impl FragReuseIndex { } } + // Prune rows whose final address lands in a fully-deleted fragment + // (tracked by ID in `removed_frags`). Checked after chaining, since an + // earlier compaction may have rewritten the row into a fragment that was + // later deleted. Fragment IDs are never reused, so a live row is never + // pruned. + if let Some(addr) = mapped_value + && !self.removed_frags.is_empty() + && self + .removed_frags + .contains(RowAddress::new_from_u64(addr).fragment_id()) + { + return None; + } + mapped_value } @@ -307,6 +342,13 @@ impl FragReuseIndex { } pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> { + // Orphaned fragments (fully deleted before compaction) are deliberately + // NOT removed from index coverage here. Their rows are already pruned + // from the index data by `remap_row_id`, and a fragment that no longer + // exists is masked out at query time by `effective_fragment_bitmap`'s + // intersection with the live fragments. Stripping a deleted fragment's + // coverage instead makes the planner treat its (now nonexistent) row + // range as an unindexed flat-scan fallback, which fails on `take`. for version in self.details.versions.iter() { for group in version.groups.iter() { let mut removed = 0; @@ -378,6 +420,7 @@ mod tests { }, ], }], + removed_frags: vec![], }; let version2 = FragReuseVersion { @@ -402,6 +445,7 @@ mod tests { }, ], }], + removed_frags: vec![], }; // Create FragReuseIndexDetails with versions in reverse order diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 87dda8e7e57..7d823ae54e8 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -114,6 +114,7 @@ use lance_core::datatypes::{BlobHandling, BlobKind}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::utils::tracing::{DATASET_COMPACTING_EVENT, TRACE_DATASET_EVENTS}; use lance_index::frag_reuse::FragReuseGroup; +use lance_index::is_system_index; use lance_table::format::{Fragment, RowIdMeta}; use roaring::{RoaringBitmap, RoaringTreemap}; use serde::{Deserialize, Serialize}; @@ -2027,7 +2028,36 @@ pub async fn commit_compaction( }; let frag_reuse_index = if options.defer_index_remap { - Some(build_new_frag_reuse_index(dataset, frag_reuse_groups, new_fragment_bitmap).await?) + // Fragments an index still covers but that are gone from the manifest + // and never entered a rewrite group (e.g. fully deleted before + // compaction) must be pruned by ID, else their stale entries resurface + // as dangling references. `dataset` is pre-commit here, so fragments + // about to be compacted are still present and not mistaken for removed. + let live_frags: RoaringBitmap = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); + let mut covered = RoaringBitmap::new(); + for index in dataset.load_indices().await?.iter() { + if is_system_index(index) { + continue; + } + if let Some(bitmap) = &index.fragment_bitmap { + covered |= bitmap; + } + } + let removed_frags: Vec = (covered - live_frags).into_iter().collect(); + + Some( + build_new_frag_reuse_index( + dataset, + frag_reuse_groups, + removed_frags, + new_fragment_bitmap, + ) + .await?, + ) } else { None }; @@ -3279,6 +3309,357 @@ mod tests { assert_eq!(current_scalar_index.uuid, original_scalar_uuid); } + // ---- Shared helpers for deferred-remap deleted-fragment tests ---- + + /// Open the fragment-reuse index of a dataset that has one. + async fn open_dataset_fri(dataset: &Dataset) -> lance_index::frag_reuse::FragReuseIndex { + let meta = dataset + .load_index_by_name(FRAG_REUSE_INDEX_NAME) + .await + .unwrap() + .expect("fragment reuse index must exist after deferred compaction"); + let details = load_frag_reuse_index_details(dataset, &meta).await.unwrap(); + open_frag_reuse_index(meta.uuid, details.as_ref()) + .await + .unwrap() + } + + /// Assert the FRI maps every probed address of `deleted_frags` to None and + /// keeps (maps to Some) the first address of each `live_frags`. + async fn assert_fri_prunes(dataset: &Dataset, deleted_frags: &[u32], live_frags: &[u32]) { + let fri = open_dataset_fri(dataset).await; + let mut leaked = Vec::new(); + for &f in deleted_frags { + for off in [0u32, 1, 50, 99] { + let addr = u64::from(RowAddress::new_from_parts(f, off)); + if let Some(mapped) = fri.remap_row_id(addr) { + leaked.push((f, off, mapped)); + } + } + } + assert!( + leaked.is_empty(), + "deleted fragments {deleted_frags:?} not pruned by the FRI; \ + remap_row_id returned Some for {leaked:?}", + ); + for &f in live_frags { + let addr = u64::from(RowAddress::new_from_parts(f, 0)); + assert!( + fri.remap_row_id(addr).is_some(), + "surviving fragment {f} was over-pruned by the FRI", + ); + } + } + + /// `n` fragments of `rows_per_frag` rows (i = 0..n*rpf) with a scalar BTREE + /// index on `i`. A cheap stand-in (no vector training) when only FRI-level + /// pruning is under test. + async fn dataset_with_btree(n: usize, rows_per_frag: usize) -> Dataset { + let mut data_gen = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("i".to_owned()))); + let mut dataset = Dataset::write( + data_gen.batch((n * rows_per_frag) as i32), + "memory://test/table", + Some(WriteParams { + max_rows_per_file: rows_per_frag, + ..Default::default() + }), + ) + .await + .unwrap(); + dataset + .create_index( + &["i"], + IndexType::BTree, + Some("i_idx".into()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + dataset + } + + /// 10 fragments x 1000 rows (i = 0..10_000) with a `vec` column and an + /// IVF_PQ index on it -- the faithful shape of the #7374 reproduction. + async fn dataset_with_ivf_pq() -> Dataset { + let mut data_gen = BatchGenerator::new() + .col(Box::new( + RandomVector::new().vec_width(128).named("vec".to_owned()), + )) + .col(Box::new(IncrementingInt32::new().named("i".to_owned()))); + let mut dataset = Dataset::write( + data_gen.batch(10_000), + "memory://test/table", + Some(WriteParams { + max_rows_per_file: 1_000, + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), 10); + dataset + .create_index( + &["vec"], + IndexType::Vector, + Some("vector".into()), + &VectorIndexParams::ivf_pq(10, 8, 8, MetricType::L2, 50), + false, + ) + .await + .unwrap(); + dataset + } + + /// Top-200 KNN projecting `i` (the projection forces a `take`, so a stale + /// row address would error). Asserts no result id falls in `ghost_range` + /// and at least one live row is returned. + async fn assert_knn_no_ghosts(dataset: &Dataset, ghost_range: std::ops::Range) { + let batch = dataset + .scan() + .nearest("vec", &Float32Array::from(vec![0.0f32; 128]), 200) + .unwrap() + .project(&["i"]) + .unwrap() + .try_into_batch() + .await + .expect("vector search after deferred compaction must succeed"); + let i_col = batch.column(0).as_primitive::(); + let ghosts: Vec = i_col + .values() + .iter() + .copied() + .filter(|v| ghost_range.contains(v)) + .collect(); + assert!( + ghosts.is_empty(), + "vector search returned ids from the deleted range {ghost_range:?}: {ghosts:?}", + ); + assert!(!i_col.is_empty(), "vector search returned no live rows"); + } + + // Regression test for https://github.com/lancedb/lance/issues/7374 + // "Vector index can become corrupted when compaction is deferred" + // A fully-deleted fragment is removed from the manifest at delete time + // (apply_deletions returns None for it), so deferred compaction never sees + // it and the inline remap that would normally prune it is skipped. The FRI + // must map every address of the fragment to None, both during the FRI window + // and after the physical remap + trim that closes it. + #[tokio::test] + async fn test_defer_index_remap_fully_deleted_fragment() { + let mut dataset = dataset_with_ivf_pq().await; + // Fully delete fragment 4 (i in [4000,5000)). + dataset.delete("i >= 4000 AND i < 5000").await.unwrap(); + + let metrics = compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 100_000, + materialize_deletions: true, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + assert!(metrics.fragments_removed > 0); + assert!(metrics.fragments_added > 0); + + // During the FRI window: fragment 4 pruned, survivor 0 kept, no ghosts. + assert_fri_prunes(&dataset, &[4], &[0]).await; + assert_knn_no_ghosts(&dataset, 4000..5000).await; + + // Close the window: physical remap + FRI trim. With the FRI gone nothing + // masks stale entries at load time, so this must stay correct too. + remapping::remap_column_index(&mut dataset, &["vec"], Some("vector".into())) + .await + .unwrap(); + cleanup_frag_reuse_index(&mut dataset).await.unwrap(); + assert_knn_no_ghosts(&dataset, 4000..5000).await; + } + + // A fragment that was first MERGED by an earlier deferred compaction and + // then fully deleted is orphaned at its *post-merge* fragment ID, not its + // original one. The FRI must prune by the final mapped address (after + // chaining through every version), otherwise stale index entries chain + // forward into the removed merge target and resurface as dangling refs. + #[tokio::test] + async fn test_defer_index_remap_deleted_merged_fragment() { + let mut dataset = dataset_with_ivf_pq().await; + + // Compaction 1 (deferred): bin the 10 fragments in pairs, so original + // fragments {0,1} merge into a single new fragment covering i[0,2000). + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 2_000, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + assert_eq!( + dataset.get_fragments().len(), + 5, + "compaction 1 should bin 10 fragments into 5 (pairs)" + ); + + // Fully delete the merged fragment covering i[0,2000) (= original frags + // 0 and 1). This removes that post-merge fragment from the manifest. + dataset.delete("i >= 0 AND i < 2000").await.unwrap(); + assert_eq!( + dataset.get_fragments().len(), + 4, + "deleting i[0,2000) should fully remove exactly one merged fragment" + ); + + // Compaction 2 (deferred): merge the remaining 4 fragments into 1. Its + // orphaned-fragment computation must flag the merge target deleted above. + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 100_000, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + // Original fragments 0 and 1 were merged (comp 1) into the fragment that + // was then deleted, so their addresses chain forward to the removed merge + // target and must map to None. Fragment 2 survived both compactions and + // must still remap (not be over-pruned). + assert_fri_prunes(&dataset, &[0, 1], &[2]).await; + assert_knn_no_ghosts(&dataset, 0..2000).await; + } + + // Several whole-fragment deletions before one deferred compaction: every + // deleted fragment must be tracked, since `removed_frags` is a union. + #[tokio::test] + async fn test_defer_index_remap_multiple_fully_deleted_fragments() { + let mut dataset = dataset_with_btree(10, 100).await; + assert_eq!(dataset.get_fragments().len(), 10); + + // Fully delete fragments 2 (i[200,300)) and 7 (i[700,800)). + dataset + .delete("(i >= 200 AND i < 300) OR (i >= 700 AND i < 800)") + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), 8); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1_000_000, + materialize_deletions: true, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_fri_prunes(&dataset, &[2, 7], &[0, 9]).await; + } + + // A fragment left untouched by an earlier deferred compaction and then fully + // deleted is orphaned at its ORIGINAL id (it never entered a group, so it + // reaches `remap_row_id` by pass-through, not by chaining). It must still be + // pruned. + #[tokio::test] + async fn test_defer_index_remap_survivor_deleted_across_versions() { + let mut dataset = dataset_with_btree(5, 100).await; + + // Compaction 1: target 200 bins {0,1} and {2,3}; fragment 4 is left alone. + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 200, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + assert_eq!( + dataset.get_fragments().len(), + 3, + "compaction 1 should leave the trailing fragment 4 untouched" + ); + + // Fully delete the surviving original fragment 4 (i[400,500)). + dataset.delete("i >= 400 AND i < 500").await.unwrap(); + assert_eq!(dataset.get_fragments().len(), 2); + + // Compaction 2 merges the two remaining (merged) fragments. + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1_000_000, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + assert_fri_prunes(&dataset, &[4], &[0]).await; + } + + // Reverse ordering: deferred compaction FIRST, then a delete during the FRI + // window. The delete lands on the new merged fragment; its deletion vector + // must be honored on read so deleted rows don't resurface. + #[tokio::test] + async fn test_defer_index_remap_then_delete_during_window() { + let mut dataset = dataset_with_btree(6, 100).await; // 600 rows + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 1_000_000, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + assert_eq!( + dataset.get_fragments().len(), + 1, + "all fragments merge into one" + ); + assert!( + dataset + .load_index_by_name(FRAG_REUSE_INDEX_NAME) + .await + .unwrap() + .is_some(), + "deferred compaction must leave a fragment-reuse index" + ); + + // Delete a slice of the merged fragment within the FRI window. + dataset.delete("i >= 100 AND i < 200").await.unwrap(); + assert_eq!(dataset.count_rows(None).await.unwrap(), 500); + assert_eq!( + dataset + .count_rows(Some("i >= 100 AND i < 200".to_owned())) + .await + .unwrap(), + 0, + "rows deleted during the FRI window resurfaced" + ); + } + #[tokio::test] async fn test_defer_index_remap_multiple_compactions() { let mut data_gen = BatchGenerator::new() diff --git a/rust/lance/src/index/frag_reuse.rs b/rust/lance/src/index/frag_reuse.rs index 23a8fec5145..2ab333c2793 100644 --- a/rust/lance/src/index/frag_reuse.rs +++ b/rust/lance/src/index/frag_reuse.rs @@ -95,11 +95,13 @@ pub(crate) async fn open_frag_reuse_index( pub(crate) async fn build_new_frag_reuse_index( dataset: &mut Dataset, frag_reuse_groups: Vec, + removed_frags: Vec, new_fragment_bitmap: RoaringBitmap, ) -> lance_core::Result { let new_version = FragReuseVersion { dataset_version: dataset.manifest.version, groups: frag_reuse_groups, + removed_frags, }; let index_meta = dataset.load_indices().await.map(|indices| {