Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,17 @@ message FragmentReuseIndexDetails {
uint64 dataset_version = 1;

repeated Group groups = 3;

// Fragment IDs that some index covered but that were no longer present in
// the dataset when this version was committed, and were not part of any
// rewrite group (e.g. a fragment that was fully deleted before compaction,
// so it never entered a compaction task / Group.old_fragments).
//
// Such fragments are gone from the manifest, so their rows cannot be
// enumerated by address. Their stale index entries are pruned by fragment
// ID instead: the fragment-reuse index maps every row address whose
// fragment ID is in this list to "deleted" (None).
repeated uint32 removed_fragments = 4;
}
}

Expand Down
44 changes: 44 additions & 0 deletions rust/lance-table/src/system_index/frag_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array};
use lance_core::deepsize::{Context, DeepSizeOf};
use lance_core::utils::address::RowAddress;
use lance_core::{Error, Result};
use lance_select::RowAddrTreeMap;
use roaring::{RoaringBitmap, RoaringTreemap};
Expand Down Expand Up @@ -105,13 +106,18 @@ impl TryFrom<pb::fragment_reuse_index_details::Group> for FragReuseGroup {
pub struct FragReuseVersion {
/// Dataset version this entry is recorded against; serialized as the
/// protobuf `dataset_version` field.
/// NOTE(review): presumably the version at which the rewrite was committed —
/// confirm against the code that writes this index.
pub dataset_version: u64,
/// Rewrite groups belonging to this version; each one is converted to and
/// from the protobuf `Group` message when the version is (de)serialized.
pub groups: Vec<FragReuseGroup>,
/// Fragments an index covered that were gone from the manifest at commit
/// time and not part of any rewrite group (e.g. fully deleted before
/// compaction). Pruned by ID since their rows can't be enumerated by address.
/// Serialized as the protobuf `removed_fragments` field.
pub removed_frags: Vec<u32>,
}

impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version {
    /// Builds the protobuf representation of a [`FragReuseVersion`].
    ///
    /// The source is only borrowed: every group is converted through its own
    /// `Into` implementation and the removed-fragment IDs are cloned.
    fn from(version: &FragReuseVersion) -> Self {
        let groups = version.groups.iter().map(Into::into).collect();
        let removed_fragments = version.removed_frags.clone();

        Self {
            dataset_version: version.dataset_version,
            groups,
            removed_fragments,
        }
    }
}
Expand All @@ -127,6 +133,7 @@ impl TryFrom<pb::fragment_reuse_index_details::Version> for FragReuseVersion {
.into_iter()
.map(FragReuseGroup::try_from)
.collect::<Result<_>>()?,
removed_frags: version.removed_fragments,
})
}
}
Expand Down Expand Up @@ -194,6 +201,15 @@ impl FragReuseIndexDetails {
.flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)),
)
}

/// Union of every version's `removed_frags`.
pub fn removed_frag_bitmap(&self) -> RoaringBitmap {
RoaringBitmap::from_iter(
self.versions
.iter()
.flat_map(|v| v.removed_frags.iter().copied()),
)
}
}

/// An index that stores row ID maps.
Expand All @@ -204,6 +220,9 @@ pub struct FragReuseIndex {
pub uuid: Uuid,
pub row_id_maps: Vec<HashMap<u64, Option<u64>>>,
pub details: FragReuseIndexDetails,
/// Cache of `details.removed_frag_bitmap()`; rebuilt in `new`, so skipped.
#[serde(skip)]
removed_frags: RoaringBitmap,
Comment on lines +223 to +225

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably unrelated to this PR but why do we store the FRI as a collection of versions in the first place? Why not merge them all together at write time?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the git history, the code comments, and my own understanding, the main benefit is that versioning allows the system to trim the FRI as different indices catch up, so the size of the FRI does not grow unbounded before a full index job kicks in.

Versioning also makes the FRI ~append-only, so it inherits all the consistency benefits that come with that.

}

impl DeepSizeOf for FragReuseIndex {
Expand All @@ -218,10 +237,12 @@ impl FragReuseIndex {
row_id_maps: Vec<HashMap<u64, Option<u64>>>,
details: FragReuseIndexDetails,
) -> Self {
let removed_frags = details.removed_frag_bitmap();
Self {
uuid,
row_id_maps,
details,
removed_frags,
}
}

Expand All @@ -236,6 +257,20 @@ impl FragReuseIndex {
}
}

// Prune rows whose final address lands in a fully-deleted fragment
// (tracked by ID in `removed_frags`). Checked after chaining, since an
// earlier compaction may have rewritten the row into a fragment that was
// later deleted. Fragment IDs are never reused, so a live row is never
// pruned.
if let Some(addr) = mapped_value
&& !self.removed_frags.is_empty()
&& self
.removed_frags
.contains(RowAddress::new_from_u64(addr).fragment_id())
{
return None;
}

mapped_value
}

Expand Down Expand Up @@ -307,6 +342,13 @@ impl FragReuseIndex {
}

pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> {
// Orphaned fragments (fully deleted before compaction) are deliberately
// NOT removed from index coverage here. Their rows are already pruned
// from the index data by `remap_row_id`, and a fragment that no longer
// exists is masked out at query time by `effective_fragment_bitmap`'s
// intersection with the live fragments. Stripping a deleted fragment's
// coverage instead makes the planner treat its (now nonexistent) row
// range as an unindexed flat-scan fallback, which fails on `take`.
for version in self.details.versions.iter() {
for group in version.groups.iter() {
let mut removed = 0;
Expand Down Expand Up @@ -378,6 +420,7 @@ mod tests {
},
],
}],
removed_frags: vec![],
};

let version2 = FragReuseVersion {
Expand All @@ -402,6 +445,7 @@ mod tests {
},
],
}],
removed_frags: vec![],
};

// Create FragReuseIndexDetails with versions in reverse order
Expand Down
Loading
Loading