During index remap we normally prune fragments that have been deleted. When the FRI (fragment reuse index, used when compaction defers index remapping) is in use, this remap phase is skipped, so the deleted fragment is never pruned from the index. The index then returns row references into the deleted fragment, and queries fail with errors like "take received reference to fragment that does not exist".
#!/usr/bin/env python3
"""Reproduce index corruption via compaction with deferred remap."""
import numpy as np
import lance
import pyarrow as pa
import shutil
import os
# Scratch location for the reproduction dataset; main() wipes it before each run.
DATASET_PATH = "/tmp/corruption_test"

# Dataset layout: NUM_FRAGMENTS fragments of ROWS_PER_FRAGMENT rows each,
# every row carrying a DIM-dimensional vector.
NUM_FRAGMENTS = 10
ROWS_PER_FRAGMENT = 1_000
DIM = 128

# Fragment (0-indexed) whose rows are all deleted before compaction.
FRAGMENT_TO_DELETE = 4
def _fragment_table(frag_idx, rng):
    """Build the ROWS_PER_FRAGMENT-row table written as fragment *frag_idx*.

    Ids are contiguous, starting at ``frag_idx * ROWS_PER_FRAGMENT``, so a
    fragment's rows can later be selected with a simple id range. Vectors are
    standard-normal float32 samples drawn from *rng*.
    """
    start_id = frag_idx * ROWS_PER_FRAGMENT
    ids = np.arange(start_id, start_id + ROWS_PER_FRAGMENT, dtype=np.int64)
    vecs = rng.standard_normal((ROWS_PER_FRAGMENT, DIM)).astype(np.float32)
    # Wrap the flat float32 buffer as a fixed-size-list column directly rather
    # than round-tripping through vecs.tolist(), which boxes every float.
    vec_col = pa.FixedSizeListArray.from_arrays(pa.array(vecs.ravel()), DIM)
    return pa.table({"id": ids, "vec": vec_col})


def _print_fragments(ds, label):
    """Print the fragment count of *ds* under *label*, then each fragment's id and row count."""
    frags = ds.get_fragments()
    print(f" {label}: {len(frags)}")
    for i, frag in enumerate(frags):
        print(f" Fragment {i}: id={frag.fragment_id}, rows={frag.count_rows()}")


def main():
    """Reproduce index corruption caused by compaction with deferred index remap.

    Steps:
      1. Write NUM_FRAGMENTS fragments of ROWS_PER_FRAGMENT rows each.
      2. Build an IVF_PQ index on the ``vec`` column.
      3. Delete every row of fragment FRAGMENT_TO_DELETE.
      4. Compact into a single fragment with ``defer_index_remap=True``.
      5. Run a vector search and report either the error it raises or any
         returned ids that belong to the rows deleted in step 3.

    An exception from the step-5 search is printed rather than propagated,
    because that error is the symptom this script is meant to surface.
    """
    # Clean up any previous run so every run starts from an empty dataset.
    if os.path.exists(DATASET_PATH):
        shutil.rmtree(DATASET_PATH)

    # Step 1: Create the dataset, one write per fragment.
    print(f"=== Step 1: Creating dataset with {NUM_FRAGMENTS} fragments ===")
    rng = np.random.default_rng(42)
    schema = pa.schema([
        pa.field("id", pa.int64()),
        pa.field("vec", pa.list_(pa.float32(), DIM)),
    ])
    ds = None
    for frag_idx in range(NUM_FRAGMENTS):
        table = _fragment_table(frag_idx, rng)
        # The first write creates the dataset; later writes append to it.
        if ds is None:
            ds = lance.write_dataset(table, DATASET_PATH, schema=schema)
        else:
            ds = lance.write_dataset(table, DATASET_PATH, mode="append")
    print(f" Total rows: {ds.count_rows()}")
    _print_fragments(ds, "Fragments")

    # Step 2: Create the vector index over all fragments.
    print("\n=== Step 2: Creating vector index ===")
    ds.create_index(
        "vec",
        index_type="IVF_PQ",
        num_partitions=10,
        num_sub_vectors=8,
        replace=True,
    )
    print(f" Indices: {ds.list_indices()}")

    # Step 3: Delete all rows of one fragment. Ids were assigned contiguously
    # per fragment in step 1, so an inclusive id range selects exactly them.
    print(f"\n=== Step 3: Deleting all rows from fragment {FRAGMENT_TO_DELETE} ===")
    row_start = FRAGMENT_TO_DELETE * ROWS_PER_FRAGMENT
    row_end = row_start + ROWS_PER_FRAGMENT - 1  # inclusive
    ds.delete(f"id >= {row_start} AND id <= {row_end}")
    ds = lance.dataset(DATASET_PATH)
    print(f" Rows after delete: {ds.count_rows()}")
    _print_fragments(ds, "Fragments after delete")

    # Step 4: Compact with the index remap deferred.
    print("\n=== Step 4: Compacting with defer_index_remap=True ===")
    metrics = ds.optimize.compact_files(
        target_rows_per_fragment=ROWS_PER_FRAGMENT * NUM_FRAGMENTS,  # merge all into one
        materialize_deletions=True,
        defer_index_remap=True,
    )
    print(f" Compaction metrics: {metrics}")
    ds = lance.dataset(DATASET_PATH)
    print(f" Rows after compaction: {ds.count_rows()}")
    print(f" Fragments after compaction: {len(ds.get_fragments())}")

    # Step 5: Search through the index.
    print("\n=== Step 5: Searching dataset (expect corruption / errors) ===")
    query = np.random.default_rng(99).standard_normal(DIM).astype(np.float32)
    try:
        results = ds.scanner(
            nearest={"column": "vec", "q": query, "k": 10},
        ).to_table()
    except Exception as e:
        # Deliberately best-effort: the failure is the reproduction, so report
        # it instead of aborting with a traceback.
        print(f" ERROR during search: {type(e).__name__}: {e}")
        return

    result_ids = results["id"].to_pylist()
    print(f" Search returned {len(results)} rows")
    print(f" Result ids: {result_ids}")
    # Any id from the range deleted in step 3 means the search surfaced
    # rows that should no longer exist.
    deleted_ids = set(range(row_start, row_end + 1))
    ghost_ids = set(result_ids) & deleted_ids
    if ghost_ids:
        print(f" CORRUPTION DETECTED: returned deleted ids: {ghost_ids}")
    else:
        print(" No ghost ids in results (may still be corrupt in other ways)")


if __name__ == "__main__":
    main()
During index remap we normally prune fragments that have been deleted. When the FRI (fragment reuse index, used when compaction defers index remapping) is in use, this remap phase is skipped, so the deleted fragment is never pruned from the index. The index then returns row references into the deleted fragment, and queries fail with errors like "take received reference to fragment that does not exist".
Reproduction Script