Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7316,4 +7316,93 @@ mod tests {
]
);
}

// Compaction should materialize a fully-deleted fragment: drop it from both
// the index's data and its coverage, so searches stay on the fast path
// instead of masking a fragment that no longer exists.
#[tokio::test]
async fn test_compaction_materializes_deleted_fragment() {
let mut data_gen =
BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("i".to_owned())));
// 10 fragments x 100 rows, i = 0..1000. Fragment 4 holds i in [400,500).
let mut dataset = Dataset::write(
data_gen.batch(1_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 100,
..Default::default()
}),
)
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), 10);

dataset
.create_index(
&["i"],
IndexType::BTree,
Some("i_idx".into()),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();

// Fully delete fragment 4 (removed from the manifest, never compacted).
dataset.delete("i >= 400 AND i < 500").await.unwrap();
assert_eq!(dataset.get_fragments().len(), 9);

// Inline compaction (not deferred): indexes are remapped immediately.
compact_files(
&mut dataset,
CompactionOptions {
target_rows_per_fragment: 100_000,
materialize_deletions: true,
defer_index_remap: false,
..Default::default()
},
None,
)
.await
.unwrap();

// Coverage: the deleted fragment 4 must be gone, leaving exactly the
// live fragments (without the fix, fragment 4 lingers in the bitmap).
let index = dataset
.load_index_by_name("i_idx")
.await
.unwrap()
.expect("index must exist");
let bitmap = index
.fragment_bitmap
.expect("index must store a fragment bitmap");
assert!(
!bitmap.contains(4),
"deleted fragment 4 still covered by the index: {bitmap:?}"
);
let live: roaring::RoaringBitmap = dataset
.get_fragments()
.iter()
.map(|f| f.id() as u32)
.collect();
assert_eq!(
bitmap, live,
"index coverage should equal the live fragments"
);

// Data: an indexed filtered scan resolves row addresses and takes. If the
// deleted fragment's rows survived in the index data while its coverage
// was trimmed, this take would error on a missing fragment.
let batch = dataset
.scan()
.filter("i >= 400 AND i < 500")
.unwrap()
.project(&["i"])
.unwrap()
.try_into_batch()
.await
.expect("indexed scan over the deleted range must succeed");
assert_eq!(batch.num_rows(), 0, "deleted fragment rows resurfaced");
assert_eq!(dataset.count_rows(None).await.unwrap(), 900);
}
}
32 changes: 28 additions & 4 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2070,17 +2070,27 @@ impl Transaction {
next_row_id.as_ref(),
)?;

let live_fragments: RoaringBitmap =
final_fragments.iter().map(|f| f.id as u32).collect();
if next_row_id.is_some() {
// We can re-use indices, but need to rewrite the fragment bitmaps
debug_assert!(rewritten_indices.is_empty());
for index in final_indices.iter_mut() {
if let Some(fragment_bitmap) = &mut index.fragment_bitmap {
*fragment_bitmap =
Self::recalculate_fragment_bitmap(fragment_bitmap, groups)?;
*fragment_bitmap = Self::recalculate_fragment_bitmap(
fragment_bitmap,
groups,
&live_fragments,
)?;
}
}
} else {
Self::handle_rewrite_indices(&mut final_indices, rewritten_indices, groups)?;
Self::handle_rewrite_indices(
&mut final_indices,
rewritten_indices,
groups,
&live_fragments,
)?;
}

if let Some(frag_reuse_index) = frag_reuse_index {
Expand Down Expand Up @@ -2744,6 +2754,7 @@ impl Transaction {
fn recalculate_fragment_bitmap(
old: &RoaringBitmap,
groups: &[RewriteGroup],
live_fragments: &RoaringBitmap,
) -> Result<RoaringBitmap> {
let mut new_bitmap = old.clone();
for group in groups {
Expand Down Expand Up @@ -2772,13 +2783,20 @@ impl Transaction {
}
}
}
// Drop fragments that are no longer in the dataset (fully deleted before
// this compaction). They were never part of a rewrite group, so the loop
// above leaves them in coverage; trimming them here keeps index searches
// on the fast path. The corresponding index data is dropped during the
// remap (see index::deleted_fragment_row_id_drops).
new_bitmap &= live_fragments;
Ok(new_bitmap)
}

fn handle_rewrite_indices(
indices: &mut [IndexMetadata],
rewritten_indices: &[RewrittenIndex],
groups: &[RewriteGroup],
live_fragments: &RoaringBitmap,
) -> Result<()> {
let mut modified_indices = HashSet::new();

Expand Down Expand Up @@ -2806,6 +2824,7 @@ impl Transaction {
))
})?,
groups,
live_fragments,
)?);
index.uuid = rewritten_index.new_id;
// Update file sizes to match the new index files. When not available
Expand Down Expand Up @@ -4816,7 +4835,12 @@ mod tests {
}];

// Should succeed (skip missing index) instead of error
let result = Transaction::handle_rewrite_indices(&mut indices, &rewritten_indices, &[]);
let result = Transaction::handle_rewrite_indices(
&mut indices,
&rewritten_indices,
&[],
&RoaringBitmap::new(),
);
assert!(result.is_ok());
assert!(indices.is_empty());
}
Expand Down
59 changes: 59 additions & 0 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,46 @@ pub trait IndexBuilder {
async fn build(&self) -> Result<()>;
}

/// `addr -> None` drops for every row of a fragment this index still covers
/// that is no longer in the dataset (fully deleted before compaction). The
/// fragments are gone from the current manifest, so their rows are enumerated
/// from the dataset version the index was built at. Returns `None` if that
/// version can't be loaded (e.g. already cleaned up), leaving those rows to
/// query-time masking as before.
async fn deleted_fragment_row_id_drops(
dataset: &Dataset,
index: &IndexMetadata,
) -> Result<Option<HashMap<u64, Option<u64>>>> {
let Some(coverage) = index.fragment_bitmap.as_ref() else {
return Ok(None);
};
let live: RoaringBitmap = dataset
.get_fragments()
.iter()
.map(|f| f.id() as u32)
.collect();
let dead = coverage - &live;
if dead.is_empty() {
return Ok(None);
}
let Ok(old) = dataset.checkout_version(index.dataset_version).await else {
return Ok(None);
};
let mut drops = HashMap::new();
for frag in old.get_fragments() {
let fid = frag.id() as u32;
if !dead.contains(fid) {
continue;
}
if let Some(rows) = frag.metadata().physical_rows {
for offset in 0..rows as u32 {
drops.insert(u64::from(RowAddress::new_from_parts(fid, offset)), None);
}
}
}
Ok(Some(drops))
}

pub(crate) async fn remap_index(
dataset: &Dataset,
index_id: &Uuid,
Expand All @@ -492,6 +532,25 @@ pub(crate) async fn remap_index(
));
}

// Materialize fully-deleted fragments. A fragment deleted before this
// compaction is gone from the manifest but may still be covered by this
// index: it was never part of a rewrite group, so its rows are absent from
// `row_id_map` and would survive the rebuild, keeping the index's coverage
// pinned to a fragment that no longer exists and forcing searches onto the
// masked slow path. Map every such row to None so it is dropped from the
// rebuilt index; coverage is trimmed to match at commit time (see
// Transaction::recalculate_fragment_bitmap).
let augmented_map;
let row_id_map = match deleted_fragment_row_id_drops(dataset, matched).await? {
Some(extra) if !extra.is_empty() => {
let mut m = row_id_map.clone();
m.extend(extra);
augmented_map = m;
&augmented_map
}
_ => row_id_map,
};

if row_id_map.values().all(|v| v.is_none()) {
let deleted_bitmap = RoaringBitmap::from_iter(
row_id_map
Expand Down
Loading