diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 87dda8e7e57..a0bc5986aac 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -7316,4 +7316,93 @@ mod tests {
             ]
         );
     }
+
+    // Compaction should materialize a fully-deleted fragment: drop it from both
+    // the index's data and its coverage, so searches stay on the fast path
+    // instead of masking a fragment that no longer exists.
+    #[tokio::test]
+    async fn test_compaction_materializes_deleted_fragment() {
+        let mut data_gen =
+            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("i".to_owned())));
+        // 10 fragments x 100 rows, i in [0, 1000). Fragment 4 holds i in [400, 500).
+        let mut dataset = Dataset::write(
+            data_gen.batch(1_000),
+            "memory://test/table",
+            Some(WriteParams {
+                max_rows_per_file: 100,
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+        assert_eq!(dataset.get_fragments().len(), 10);
+
+        dataset
+            .create_index(
+                &["i"],
+                IndexType::BTree,
+                Some("i_idx".into()),
+                &ScalarIndexParams::default(),
+                false,
+            )
+            .await
+            .unwrap();
+
+        // Fully delete fragment 4: it leaves the manifest without being compacted.
+        dataset.delete("i >= 400 AND i < 500").await.unwrap();
+        assert_eq!(dataset.get_fragments().len(), 9);
+
+        // Inline compaction (not deferred): indexes are remapped immediately.
+        compact_files(
+            &mut dataset,
+            CompactionOptions {
+                target_rows_per_fragment: 100_000,
+                materialize_deletions: true,
+                defer_index_remap: false,
+                ..Default::default()
+            },
+            None,
+        )
+        .await
+        .unwrap();
+
+        // Coverage: the deleted fragment 4 must be gone, leaving exactly the
+        // live fragments (without the fix, fragment 4 lingers in the bitmap).
+        let index = dataset
+            .load_index_by_name("i_idx")
+            .await
+            .unwrap()
+            .expect("index must exist");
+        let bitmap = index
+            .fragment_bitmap
+            .expect("index must store a fragment bitmap");
+        assert!(
+            !bitmap.contains(4),
+            "deleted fragment 4 still covered by the index: {bitmap:?}"
+        );
+        let live: roaring::RoaringBitmap = dataset
+            .get_fragments()
+            .iter()
+            .map(|f| f.id() as u32)
+            .collect();
+        assert_eq!(
+            bitmap, live,
+            "index coverage should equal the live fragments"
+        );
+
+        // Data: an indexed filtered scan resolves row addresses and takes. If the
+        // deleted fragment's rows survived in the index data while its coverage
+        // was trimmed, this take would error on a missing fragment.
+        let batch = dataset
+            .scan()
+            .filter("i >= 400 AND i < 500")
+            .unwrap()
+            .project(&["i"])
+            .unwrap()
+            .try_into_batch()
+            .await
+            .expect("indexed scan over the deleted range must succeed");
+        assert_eq!(batch.num_rows(), 0, "deleted fragment rows resurfaced");
+        assert_eq!(dataset.count_rows(None).await.unwrap(), 900);
+    }
 }
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index 4555cd7ee6c..7548df83843 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -2070,17 +2070,27 @@ impl Transaction {
                     next_row_id.as_ref(),
                 )?;
 
+                let live_fragments: RoaringBitmap =
+                    final_fragments.iter().map(|f| f.id as u32).collect();
                 if next_row_id.is_some() {
                     // We can re-use indices, but need to rewrite the fragment bitmaps
                     debug_assert!(rewritten_indices.is_empty());
                     for index in final_indices.iter_mut() {
                         if let Some(fragment_bitmap) = &mut index.fragment_bitmap {
-                            *fragment_bitmap =
-                                Self::recalculate_fragment_bitmap(fragment_bitmap, groups)?;
+                            *fragment_bitmap = Self::recalculate_fragment_bitmap(
+                                fragment_bitmap,
+                                groups,
+                                &live_fragments,
+                            )?;
                         }
                     }
                 } else {
-                    Self::handle_rewrite_indices(&mut final_indices, rewritten_indices, groups)?;
+                    Self::handle_rewrite_indices(
+                        &mut
final_indices,
+                        rewritten_indices,
+                        groups,
+                        &live_fragments,
+                    )?;
                 }
 
                 if let Some(frag_reuse_index) = frag_reuse_index {
@@ -2744,6 +2754,7 @@ impl Transaction {
     fn recalculate_fragment_bitmap(
         old: &RoaringBitmap,
         groups: &[RewriteGroup],
+        live_fragments: &RoaringBitmap,
     ) -> Result<RoaringBitmap> {
         let mut new_bitmap = old.clone();
         for group in groups {
@@ -2772,6 +2783,12 @@ impl Transaction {
                 }
             }
         }
+        // Drop fragments that are no longer in the dataset (fully deleted before
+        // this compaction). They were never part of a rewrite group, so the loop
+        // above leaves them in coverage; trimming them here keeps index searches
+        // on the fast path. The corresponding index data is dropped during the
+        // remap (see index::deleted_fragment_row_id_drops).
+        new_bitmap &= live_fragments;
         Ok(new_bitmap)
     }
 
@@ -2779,6 +2796,7 @@ impl Transaction {
         indices: &mut [IndexMetadata],
         rewritten_indices: &[RewrittenIndex],
         groups: &[RewriteGroup],
+        live_fragments: &RoaringBitmap,
     ) -> Result<()> {
         let mut modified_indices = HashSet::new();
 
@@ -2806,6 +2824,7 @@ impl Transaction {
                     ))
                 })?,
                 groups,
+                live_fragments,
             )?);
             index.uuid = rewritten_index.new_id;
             // Update file sizes to match the new index files. When not available
@@ -4816,7 +4835,12 @@ mod tests {
         }];
 
         // Should succeed (skip missing index) instead of error
-        let result = Transaction::handle_rewrite_indices(&mut indices, &rewritten_indices, &[]);
+        let result = Transaction::handle_rewrite_indices(
+            &mut indices,
+            &rewritten_indices,
+            &[],
+            &RoaringBitmap::new(),
+        );
         assert!(result.is_ok());
         assert!(indices.is_empty());
     }
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index 1a3a3aa54ec..01cdb878675 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -474,6 +474,60 @@ pub trait IndexBuilder {
     async fn build(&self) -> Result<()>;
 }
 
+/// Builds the `addr -> None` drops for every row of a fragment this index still
+/// covers that is no longer in the dataset (fully deleted before compaction).
+///
+/// The fragments are gone from the current manifest, so their rows are
+/// enumerated from the dataset version the index was built at.
+///
+/// Returns `Ok(None)` when there is nothing to drop: the index stores no
+/// fragment bitmap, every covered fragment is still live, the version the index
+/// was built at can't be loaded (e.g. already cleaned up), or no row of a dead
+/// fragment could be enumerated.
+async fn deleted_fragment_row_id_drops(
+    dataset: &Dataset,
+    index: &IndexMetadata,
+) -> Result<Option<HashMap<u64, Option<u64>>>> {
+    let Some(coverage) = index.fragment_bitmap.as_ref() else {
+        return Ok(None);
+    };
+    let live: RoaringBitmap = dataset
+        .get_fragments()
+        .iter()
+        .map(|f| f.id() as u32)
+        .collect();
+    let dead = coverage - &live;
+    if dead.is_empty() {
+        return Ok(None);
+    }
+    // Deliberately best effort: any checkout failure means "drop nothing".
+    // NOTE(review): coverage is still trimmed at commit time in that case (and
+    // for fragments without `physical_rows` below) -- confirm the rows left in
+    // the index data are still masked at query time.
+    let Ok(old) = dataset.checkout_version(index.dataset_version).await else {
+        return Ok(None);
+    };
+    let mut drops = HashMap::new();
+    for frag in old.get_fragments() {
+        let fid = frag.id() as u32;
+        if !dead.contains(fid) {
+            continue;
+        }
+        if let Some(rows) = frag.metadata().physical_rows {
+            // Row offsets are 32-bit: saturate instead of letting `as u32` wrap
+            // a larger count into a too-short range.
+            let rows = u32::try_from(rows).unwrap_or(u32::MAX);
+            // `extend` lets the map reserve up front from the range's size hint.
+            drops.extend((0..rows).map(|offset| {
+                let addr = u64::from(RowAddress::new_from_parts(fid, offset));
+                (addr, None)
+            }));
+        }
+    }
+    // An empty map means nothing was enumerated; report it as "nothing to drop".
+    Ok((!drops.is_empty()).then_some(drops))
+}
+
 pub(crate) async fn remap_index(
     dataset: &Dataset,
     index_id: &Uuid,
@@ -492,6 +546,25 @@ pub(crate) async fn remap_index(
         ));
     }
 
+    // Materialize fully-deleted fragments. A fragment deleted before this
+    // compaction is gone from the manifest but may still be covered by this
+    // index: it was never part of a rewrite group, so its rows are absent from
+    // `row_id_map` and would survive the rebuild, keeping the index's coverage
+    // pinned to a fragment that no longer exists and forcing searches onto the
+    // masked slow path. Map every such row to None so it is dropped from the
+    // rebuilt index; coverage is trimmed to match at commit time (see
+    // Transaction::recalculate_fragment_bitmap).
+    let augmented_map;
+    let row_id_map = match deleted_fragment_row_id_drops(dataset, matched).await?
{ + Some(extra) if !extra.is_empty() => { + let mut m = row_id_map.clone(); + m.extend(extra); + augmented_map = m; + &augmented_map + } + _ => row_id_map, + }; + if row_id_map.values().all(|v| v.is_none()) { let deleted_bitmap = RoaringBitmap::from_iter( row_id_map