Skip to content

Vector index can become corrupted when compaction is deferred #7374

Description

@westonpace

During remap we normally prune fragments that have been deleted. When using the FRI (fragment reuse index, i.e. compaction with a deferred index remap), we skip this remap phase, so the deleted fragment is never removed from the index. The index then returns references to the deleted fragment, and we get errors along the lines of "take received reference to fragment that does not exist".

Reproduction Script

#!/usr/bin/env python3
"""Reproduce index corruption via compaction with deferred remap."""

import numpy as np
import lance
import pyarrow as pa
import shutil
import os

# Scratch location of the test dataset; main() removes it before each run.
DATASET_PATH: str = "/tmp/corruption_test"

# Dataset shape: NUM_FRAGMENTS fragments of ROWS_PER_FRAGMENT rows each, where
# every row carries one DIM-dimensional float32 vector.
DIM: int = 128
NUM_FRAGMENTS: int = 10
ROWS_PER_FRAGMENT: int = 1000

# 0-indexed fragment whose rows are all deleted before compaction.
FRAGMENT_TO_DELETE: int = 4

def main():
    """Run the five-step reproduction and print the outcome of each step.

    Steps: build a multi-fragment dataset, create an IVF_PQ index on ``vec``,
    delete every row of one fragment, compact with ``defer_index_remap=True``,
    then run a nearest-neighbour search. Search failures are caught and
    printed rather than raised, so the script always runs to completion.
    """

    def report_fragments(fragments):
        # One line per fragment: list position, fragment id and row count.
        for position, fragment in enumerate(fragments):
            print(f"    Fragment {position}: id={fragment.fragment_id}, rows={fragment.count_rows()}")

    # Start from a clean slate: drop whatever an earlier run left behind.
    if os.path.exists(DATASET_PATH):
        shutil.rmtree(DATASET_PATH)

    # ---- Step 1: one write per fragment (first creates, the rest append) ----
    print("=== Step 1: Creating dataset with 10 fragments ===")
    generator = np.random.default_rng(42)  # fixed seed keeps the data reproducible

    vec_type = pa.list_(pa.float32(), DIM)
    schema = pa.schema([
        pa.field("id", pa.int64()),
        pa.field("vec", pa.list_(pa.float32(), DIM)),
    ])

    dataset = None
    for fragment_number in range(NUM_FRAGMENTS):
        # ids are contiguous per fragment: fragment k holds [k * ROWS, (k + 1) * ROWS).
        first_id = fragment_number * ROWS_PER_FRAGMENT
        id_column = np.arange(first_id, first_id + ROWS_PER_FRAGMENT, dtype=np.int64)
        vectors = generator.standard_normal((ROWS_PER_FRAGMENT, DIM)).astype(np.float32)

        batch = pa.table({
            "id": id_column,
            "vec": pa.array(vectors.tolist(), type=vec_type),
        })

        if dataset is not None:
            dataset = lance.write_dataset(batch, DATASET_PATH, mode="append")
        else:
            dataset = lance.write_dataset(batch, DATASET_PATH, schema=schema)

    fragments = dataset.get_fragments()
    print(f"  Total rows: {dataset.count_rows()}")
    print(f"  Fragments: {len(fragments)}")
    report_fragments(fragments)

    # ---- Step 2: build the vector index that later goes stale ----
    print("\n=== Step 2: Creating vector index ===")
    dataset.create_index(
        "vec",
        index_type="IVF_PQ",
        num_partitions=10,
        num_sub_vectors=8,
        replace=True,
    )
    print(f"  Indices: {dataset.list_indices()}")

    # ---- Step 3: delete every row of the chosen fragment via its id range ----
    print(f"\n=== Step 3: Deleting all rows from fragment {FRAGMENT_TO_DELETE} ===")
    row_start = FRAGMENT_TO_DELETE * ROWS_PER_FRAGMENT
    row_end = row_start + ROWS_PER_FRAGMENT - 1  # inclusive upper bound
    dataset.delete(f"id >= {row_start} AND id <= {row_end}")
    dataset = lance.dataset(DATASET_PATH)  # reopen the dataset from disk
    print(f"  Rows after delete: {dataset.count_rows()}")
    fragments = dataset.get_fragments()
    print(f"  Fragments after delete: {len(fragments)}")
    report_fragments(fragments)

    # ---- Step 4: compact while deferring the index remap ----
    print("\n=== Step 4: Compacting with defer_index_remap=True ===")
    compaction_metrics = dataset.optimize.compact_files(
        target_rows_per_fragment=ROWS_PER_FRAGMENT * NUM_FRAGMENTS,  # merge all into one
        materialize_deletions=True,
        defer_index_remap=True,
    )
    print(f"  Compaction metrics: {compaction_metrics}")
    dataset = lance.dataset(DATASET_PATH)
    print(f"  Rows after compaction: {dataset.count_rows()}")
    fragments = dataset.get_fragments()
    print(f"  Fragments after compaction: {len(fragments)}")

    # ---- Step 5: nearest-neighbour search over the compacted dataset ----
    print("\n=== Step 5: Searching dataset (expect corruption / errors) ===")
    query_rng = np.random.default_rng(99)
    query = query_rng.standard_normal(DIM).astype(np.float32)
    try:
        scanner = dataset.scanner(
            nearest={"column": "vec", "q": query, "k": 10},
        )
        results = scanner.to_table()
        print(f"  Search returned {len(results)} rows")
        result_ids = results["id"].to_pylist()
        print(f"  Result ids: {result_ids}")
        # Any returned id inside the deleted range is a row that should be gone.
        deleted_ids = set(range(row_start, row_start + ROWS_PER_FRAGMENT))
        returned_ids = set(result_ids)
        ghost_ids = returned_ids & deleted_ids
        if not ghost_ids:
            print("  No ghost ids in results (may still be corrupt in other ways)")
        else:
            print(f"  CORRUPTION DETECTED: returned deleted ids: {ghost_ids}")
    except Exception as err:
        # Deliberately broad: the point of the script is to surface whatever
        # error the search raises, not to crash on it.
        print(f"  ERROR during search: {type(err).__name__}: {err}")


if __name__ == "__main__":
    main()

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working · critical-fix: Bugs that cause crashes, security vulnerabilities, or incorrect data.

    Type

    No fields configured for Bug.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions