Skip to content

feat(zonemap): public API + driver helper for build-time consolidation#6779

Open
LuciferYang wants to merge 3 commits into
lance-format:mainfrom
LuciferYang:pr2/feat-zonemap-consolidation-api
Open

feat(zonemap): public API + driver helper for build-time consolidation#6779
LuciferYang wants to merge 3 commits into
lance-format:mainfrom
LuciferYang:pr2/feat-zonemap-consolidation-api

Conversation

@LuciferYang

@LuciferYang LuciferYang commented May 14, 2026

Copy link
Copy Markdown
Contributor

Summary

Currently any external consumer that wants to build a zonemap from multiple workers has to either live in-tree (to reach internal items) or write one segment per fragment, ending up with N IndexMetadata entries per logical index. This PR exposes the surface needed to consolidate worker-produced batches into a single committed segment.

What's new

In lance-index::scalar::zonemap:

  • pub const ZONEMAP_FILENAME — canonical filename inside the index directory
  • pub const ZONEMAP_INDEX_VERSION: u32 — on-disk format version
  • pub fn zonemap_stats_schema(value_type) — canonical Arrow schema factory
  • pub fn validate_zonemap_stats_schema(schema) — strict structural validator
  • pub fn write_zonemap_index_from_batch(batch, params, store) — write a zonemap.lance from a pre-computed batch, no training phase
  • ZoneMapIndexBuilder::zonemap_stats_as_batch promoted to pub

In lance::index::scalar:

  • pub async fn compute_zonemap_batch(...) — worker side: compute zone stats for a fragment subset without writing
  • pub async fn write_consolidated_zonemap_segment(...) — driver side: take concatenated worker batches, write one zonemap.lance under a fresh UUID, return the IndexMetadata ready for commit_existing_index_segments

Coordinator pattern

// Per worker, on a fragment subset:
let batch = compute_zonemap_batch(ds, column, &frag_ids, &params).await?;

// On the driver, after collecting the worker batches:
let segment = write_consolidated_zonemap_segment(ds, name, column, &batches, &params).await?;
ds.commit_existing_index_segments(name, column, vec![segment]).await?;

Result: one IndexMetadata entry covering every input fragment, not N.

Input validation

Because these entry points are now public, they reject malformed or stale input up front instead of writing a segment that silently corrupts query results:

  • write_consolidated_zonemap_segment cross-checks the batch's min/max value type against the column's scanned type — re-derived the same way compute_zonemap_batch does, so a legitimately adapted column (dict→primitive, extension unwrap) isn't false-rejected. A mismatched type would otherwise prune matching rows via cross-type ScalarValue comparisons.
  • write_consolidated_zonemap_segment rejects fragment_ids that aren't in the dataset's current fragments, so a stale or foreign batch can't return commit-ready metadata claiming coverage the dataset doesn't have.
  • write_zonemap_index_from_batch validates the zone bounds: rows_per_zone != 0 (matching the invariant ZoneTrainer enforces on the training path), zone_length >= 1, and zone_start + zone_length within the fragment's 2^32 row-offset space — the read path computes a zone's row range as (fragment_id << 32) + zone_start .. + zone_length, so a start or length outside the fragment would produce wrong row-address ranges. (zone_length is the offset span last - first + 1, not a row count, so it is intentionally not capped at rows_per_zone — after deletions a zone of rows_per_zone live rows can span a wider offset range.)

Tests

  • 7 unit tests for validate_zonemap_stats_schema covering each rejection branch and the documented permissiveness on min/max nullability.
  • 5 unit tests for the zone-bound validation: valid bounds, zero rows_per_zone, zero-length zone, a span wider than rows_per_zone (accepted — the post-deletion case), and per-fragment offset overflow.
  • Round-trip test for write_zonemap_index_from_batch using rows_per_zone=4 so each fragment contributes multiple zones — the default rows_per_zone=8192 produces only one zone per fragment and hides multi-zone bugs.
  • End-to-end test for write_consolidated_zonemap_segment covering fragment-bitmap completeness, schema-derived field id, index_version, and file presence under the freshly written UUID directory.
  • 2 rejection tests for write_consolidated_zonemap_segment: a min/max type that doesn't match the column, and a fragment_id outside the dataset.

Additive only — no breaking changes.

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@github-actions github-actions Bot added the enhancement New feature or request label May 14, 2026
@LuciferYang LuciferYang force-pushed the pr2/feat-zonemap-consolidation-api branch from e9fee8e to 4bbdafa Compare May 14, 2026 04:11
LuciferYang added a commit to LuciferYang/lance-spark that referenced this pull request May 14, 2026
Wire lance-core's computeZonemapBatch + writeZonemapIndexFromBatches
APIs into AddIndexExec. When spark.lance.zonemap.consolidate.enabled=true,
the consumer routes through runZonemapConsolidated:

  - executors call dataset.computeZonemapBatch on their fragment and
    return per-zone min/max stats as Arrow-IPC-encoded bytes
  - driver decodes every batch into VectorSchemaRoots and calls
    dataset.writeZonemapIndexFromBatches once, producing a single
    <uuid>/zonemap.lance file covering the union of all fragments
  - driver commits exactly one IndexMetadata entry via the same
    AddIndexOperation path used by runZonemapDistributed

Default off: preserves the multi-segment distributed shape the read
path has served for the entire history of this code.

sf=100 store_sales A/B (ss_sold_date_sk, local[*], Spark 4.0):

  | metric              | distributed | consolidated |
  |---------------------|-------------|--------------|
  | wall-clock          | 15.0 s      | 28.1 s       |
  | index segments      | 234         | 1            |
  | manifest-referenced | 1,099,920 B | 137,835 B    |

The 8x footprint shrink comes from amortising Lance file overhead
(header + footer + schema metadata) across one consolidated file
instead of paying it 234 times. Wall-clock regression is the expected
trade-off: parallel per-fragment writes become a single driver-side
write. At larger scales and on object stores with high per-PUT
latency, manifest- and listing-cost wins on the read side should
pay this back.

Depends on the new lance-core APIs landing upstream (see
lance-format/lance#6779 and #6780).
@codecov

codecov Bot commented May 14, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 92.08925% with 39 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
rust/lance/src/index/scalar.rs 89.37% 16 Missing and 13 partials ⚠️
rust/lance-index/src/scalar/zonemap.rs 95.45% 0 Missing and 10 partials ⚠️

📢 Thoughts on this report? Let us know!

LuciferYang added a commit to LuciferYang/lance-spark that referenced this pull request May 14, 2026
Wire lance-core's computeZonemapBatch + writeZonemapIndexFromBatches
APIs into AddIndexExec. When spark.lance.zonemap.consolidate.enabled=true,
the consumer routes through runZonemapConsolidated:

  - executors call dataset.computeZonemapBatch on their fragment and
    return per-zone min/max stats as Arrow-IPC-encoded bytes
  - driver decodes every batch into VectorSchemaRoots and calls
    dataset.writeZonemapIndexFromBatches once, producing a single
    <uuid>/zonemap.lance file covering the union of all fragments
  - driver commits exactly one IndexMetadata entry via the same
    AddIndexOperation path used by runZonemapDistributed

Default off: preserves the multi-segment distributed shape the read
path has served for the entire history of this code.

sf=100 store_sales A/B (ss_sold_date_sk, local[*], Spark 4.0):

  | metric              | distributed | consolidated |
  |---------------------|-------------|--------------|
  | wall-clock          | 15.0 s      | 28.1 s       |
  | index segments      | 234         | 1            |
  | manifest-referenced | 1,099,920 B | 137,835 B    |

The 8x footprint shrink comes from amortising Lance file overhead
(header + footer + schema metadata) across one consolidated file
instead of paying it 234 times. Wall-clock regression is the expected
trade-off: parallel per-fragment writes become a single driver-side
write. At larger scales and on object stores with high per-PUT
latency, manifest- and listing-cost wins on the read side should
pay this back.

Depends on the new lance-core APIs landing upstream (see
lance-format/lance#6779 and #6780).
use roaring::RoaringBitmap;
use uuid::Uuid;

// Validate the indexed column exists in the dataset schema and capture its field id for

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helper accepts a stats batch whose min/max type does not match the indexed column. Such a segment can prune matching rows because zonemap comparisons return false for cross-type ScalarValue comparisons.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Added a type check up front: it re-derives the column's scanned type the same way compute_zonemap_batch does (so dict→primitive and extension columns still pass) and rejects the batch when min/max don't match, instead of letting the cross-type comparison silently prune matching rows. Covered by test_write_consolidated_zonemap_rejects_value_type_mismatch.

params: &ZoneMapIndexBuilderParams,
index_store: &dyn IndexStore,
) -> Result<()> {
validate_zonemap_stats_schema(record_batch.schema().as_ref())?;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch writer validates only column shape and trusts zone bounds from the caller. Invalid zone_start or zone_length values can create wrong row-address ranges or huge ranges at query time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. The writer now validates the zone bounds before writing: zone_length must be within 1..=rows_per_zone, and zone_start + zone_length must stay inside the fragment's 2^32 offset space — the read path turns each zone into (fragment_id << 32) + zone_start .. + zone_length, so a bad length or an offset that spills past the fragment boundary would otherwise become a wrong/huge range at query time. The zero-length, over-capacity, and overflow cases all have tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correcting my note above — I had the zone_length bound wrong. zone_length is the offset span (last - first + 1), not the live-row count, so with deletions a zone of rows_per_zone live rows can span a wider offset range and legitimately exceed rows_per_zone (CI caught this on test_zonemap_with_deletions). I dropped the <= rows_per_zone check; the writer now validates only what's actually invariant — zone_length >= 1 and zone_start + zone_length <= 2^32 so a zone can't spill past its fragment's offset space — plus a regression test asserting a wider-than-rows_per_zone span is accepted.

let mut file_schema = record_batch.schema().as_ref().clone();
file_schema.metadata.insert(
ZONEMAP_SIZE_META_KEY.to_string(),
params.rows_per_zone().to_string(),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch writer persists rows_per_zone without enforcing the nonzero invariant used by normal training. It can create a zonemap that loads successfully but fails on later update or rebuild.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right — added the same rows_per_zone != 0 guard that ZoneTrainer::new already enforces on the training path, so we can't persist a zone size of 0 that loads fine but breaks on the next update/rebuild. Test: test_validate_zone_bounds_rejects_zero_rows_per_zone.

)
})?;
let mut fragment_bitmap = RoaringBitmap::new();
for f in frag_array.values() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fragment bitmap is derived only from batch contents and is not checked against the current dataset fragments. A stale or foreign batch can return commit-ready metadata that claims coverage outside the dataset.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. It now builds the set of the dataset's current fragment ids and rejects any fragment_id in the batch that isn't one of them, so a stale or foreign batch can't hand back commit-ready metadata claiming fragments the dataset doesn't have. Test: test_write_consolidated_zonemap_rejects_foreign_fragment.

Expose the API surface needed for a coordinator (e.g. a Spark driver
in lance-spark) to consolidate per-fragment zonemap batches computed
in parallel by worker tasks into a single committed zonemap.lance
index file, without re-running the train phase on the driver.

Public additions in lance-index::scalar::zonemap:
- `ZONEMAP_FILENAME` const — the canonical filename inside the
  IndexStore so external callers don't hardcode the string.
- `zonemap_stats_schema(value_type)` — canonical Arrow schema factory
  for the on-disk zone stats batch. min/max take a caller-supplied
  type (the indexed column's type); the remaining metadata columns
  are fixed types and non-nullable.
- `validate_zonemap_stats_schema(schema)` — strict structural
  validator (7 columns in canonical order, fixed metadata types and
  nullability, min/max sharing one type). Public so coordinators can
  pre-flight validate before write.
- `write_zonemap_index_from_batch(batch, params, store)` — free fn
  that writes a pre-computed batch via the IndexStore without
  re-running the train phase. Takes `params: &ZoneMapIndexBuilderParams`
  so future ZoneMap knobs that affect on-disk metadata can extend the
  parameter set without breaking the signature.
- `ZoneMapIndexBuilder::zonemap_stats_as_batch` made `pub` for the
  worker-side `compute_zonemap_batch` consumer.

Public addition in lance::index::scalar:
- `compute_zonemap_batch(dataset, column, fragment_ids, params)` —
  computes the zone-stats RecordBatch for a fragment subset WITHOUT
  writing. The min/max value type is derived from the post-scan
  training stream schema (not `dataset.schema()`), so dictionary →
  primitive type adaptation in the scanner is reflected correctly.

Tests:
- Validator: 7 unit tests covering each rejection branch plus the
  documented permissiveness on min/max nullability.
- Round-trip: writes a consolidated zonemap.lance from concatenated
  per-fragment batches, reads it back, asserts schema is preserved
  and per-fragment zone counts match ceil(rows_per_frag/rows_per_zone)
  via a BTreeMap multiset (set-membership would mask duplicated
  rows). Uses rows_per_zone=4 with 10 rows per fragment so each
  fragment contributes multiple zones — covering the multi-zone-per-
  fragment shape rather than the trivial single-zone case.
Completes the build-time-consolidation API surface paired with the
existing worker-side compute_zonemap_batch. The coordinator (Spark
driver, Python driver, etc.) does:

  1. Per-fragment-subset workers call compute_zonemap_batch and
     return Arrow batches.
  2. Coordinator concatenates them.
  3. Coordinator calls write_consolidated_zonemap_segment, which:
     - validates the indexed column exists, captures its field id
     - derives the IndexMetadata.fragment_bitmap from the batch's
       fragment_id column (so a fragment hole in the inputs ⇒ a
       hole in the bitmap, no implicit "all fragments" fiction)
     - allocates a fresh UUID directory under indices/
     - writes zonemap.lance via the existing
       write_zonemap_index_from_batch (which structurally validates
       the batch on the way in)
     - returns a full IndexMetadata populated with the canonical
       ZoneMap index_version, prost-encoded ZoneMapIndexDetails,
       and the file listing for skip-HEAD optimisations.
  4. Coordinator passes the returned IndexMetadata to the existing
     commitExistingIndexSegments to land one IndexMetadata entry
     covering all input fragments — a single segment instead of N.

ZONEMAP_INDEX_VERSION is now pub so this helper (and any future
external coordinator) can populate IndexMetadata.index_version
without duplicating the literal. Other scalar index versions stay
file-internal until they grow a similar need.

End-to-end test verifies (a) every input fragment lands in the
returned bitmap, (b) the field id matches the schema lookup,
(c) the index_version equals ZONEMAP_INDEX_VERSION, (d) the
freshly written directory contains zonemap.lance.
@LuciferYang LuciferYang force-pushed the pr2/feat-zonemap-consolidation-api branch from 4bbdafa to 75ce596 Compare June 22, 2026 15:39
@github-actions github-actions Bot added the A-index Vector index, linalg, tokenizer label Jun 22, 2026
Address review feedback on the build-time-consolidation API: validate
caller-supplied inputs up front so a malformed or foreign batch is rejected
with a clear error instead of silently corrupting query results.

- write_consolidated_zonemap_segment cross-checks the batch min/max value type
  against the column's scanned type (re-derived as compute_zonemap_batch does,
  after dict->primitive / extension adaptation). A mismatch would otherwise
  prune matching rows via cross-type ScalarValue comparisons.
- write_consolidated_zonemap_segment rejects fragment ids absent from the
  dataset's current fragments, so a stale or foreign batch cannot return
  commit-ready metadata claiming coverage the dataset lacks.
- write_zonemap_index_from_batch validates zone bounds (validate_zone_bounds):
  rows_per_zone != 0 (matching ZoneTrainer's invariant), zone_length >= 1, and
  zone_start + zone_length within the per-fragment 2^32 row-offset space.
  zone_length is the offset span (last - first + 1), not the live-row count, so
  it is intentionally not capped at rows_per_zone -- with deletions a zone can
  span a wider offset range than the zone capacity.

Adds 7 unit tests covering each rejection branch plus the post-deletion
wide-span acceptance case.

Post-rebase fixups for main: ZONEMAP_INDEX_VERSION is now pub (used by the
consolidation helper) and from_dataset_for_new takes &Uuid.
@LuciferYang LuciferYang force-pushed the pr2/feat-zonemap-consolidation-api branch from 75ce596 to 57a893a Compare June 23, 2026 06:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-index Vector index, linalg, tokenizer enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants