feat(zonemap): public API + driver helper for build-time consolidation#6779
LuciferYang wants to merge 3 commits into
Wire lance-core's computeZonemapBatch + writeZonemapIndexFromBatches
APIs into AddIndexExec. When spark.lance.zonemap.consolidate.enabled=true,
the consumer routes through runZonemapConsolidated:
- executors call dataset.computeZonemapBatch on their fragment and
return per-zone min/max stats as Arrow-IPC-encoded bytes
- driver decodes every batch into VectorSchemaRoots and calls
dataset.writeZonemapIndexFromBatches once, producing a single
<uuid>/zonemap.lance file covering the union of all fragments
- driver commits exactly one IndexMetadata entry via the same
AddIndexOperation path used by runZonemapDistributed
Default off: preserves the multi-segment distributed shape the read
path has served for the entire history of this code.
sf=100 store_sales A/B (ss_sold_date_sk, local[*], Spark 4.0):
| metric | distributed | consolidated |
|---------------------|-------------|--------------|
| wall-clock | 15.0 s | 28.1 s |
| index segments | 234 | 1 |
| manifest-referenced | 1,099,920 B | 137,835 B |
The 8x footprint shrink comes from amortising Lance file overhead
(header + footer + schema metadata) across one consolidated file
instead of paying it 234 times. Wall-clock regression is the expected
trade-off: parallel per-fragment writes become a single driver-side
write. At larger scales and on object stores with high per-PUT
latency, manifest- and listing-cost wins on the read side should
pay this back.
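
As a quick check of the figures in the table, the ratios can be recomputed directly. This is only arithmetic on the numbers quoted above; the function names are illustrative, and the per-segment figure is an average over the whole footprint, not a measured overhead.

```rust
// Figures copied from the A/B table above (manifest-referenced bytes).
const DISTRIBUTED_BYTES: u64 = 1_099_920;
const CONSOLIDATED_BYTES: u64 = 137_835;
const DISTRIBUTED_SEGMENTS: u64 = 234;

/// Average manifest-referenced bytes per segment in the distributed layout.
/// This is payload plus file overhead, not overhead alone.
fn bytes_per_distributed_segment() -> f64 {
    DISTRIBUTED_BYTES as f64 / DISTRIBUTED_SEGMENTS as f64
}

/// How many times smaller the consolidated footprint is.
fn footprint_ratio() -> f64 {
    DISTRIBUTED_BYTES as f64 / CONSOLIDATED_BYTES as f64
}

/// How many times slower the consolidated build was in this run.
fn wall_clock_ratio() -> f64 {
    28.1 / 15.0
}
```

The footprint ratio comes out just under 8, the average distributed segment is about 4.7 kB, and the consolidated build took a little under 1.9x the wall-clock time.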
Depends on the new lance-core APIs landing upstream (see
lance-format/lance#6779 and #6780).
use roaring::RoaringBitmap;
use uuid::Uuid;

// Validate the indexed column exists in the dataset schema and capture its field id for
The helper accepts a stats batch whose min/max type does not match the indexed column. Such a segment can prune matching rows because zonemap comparisons return false for cross-type ScalarValue comparisons.
Good point. Added a type check up front: it re-derives the column's scanned type the same way compute_zonemap_batch does (so dict→primitive and extension columns still pass) and rejects the batch when min/max don't match, instead of letting the cross-type comparison silently prune matching rows. Covered by test_write_consolidated_zonemap_rejects_value_type_mismatch.
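
To make the failure mode concrete, here is a minimal sketch of why a cross-type comparison prunes a matching zone. The `Scalar` enum is a hypothetical stand-in, not DataFusion's `ScalarValue`, and `check_value_type` is an illustrative guard rather than the check added in this PR; the only behaviour assumed from the thread is that cross-type comparisons evaluate to false.

```rust
use std::cmp::Ordering;
use std::mem::discriminant;

/// Hypothetical typed scalar; a stand-in for the real scalar type.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int32(i32),
    Int64(i64),
}

impl PartialOrd for Scalar {
    /// Values of different types are unordered, so `<=` between them is false.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match (self, other) {
            (Scalar::Int32(a), Scalar::Int32(b)) => a.partial_cmp(b),
            (Scalar::Int64(a), Scalar::Int64(b)) => a.partial_cmp(b),
            _ => None,
        }
    }
}

/// A zone survives pruning only if `min <= value <= max`.
fn zone_may_contain(min: &Scalar, max: &Scalar, value: &Scalar) -> bool {
    min <= value && value <= max
}

/// Illustrative up-front guard: min and max must have the column's scanned type.
fn check_value_type(column_sample: &Scalar, min: &Scalar, max: &Scalar) -> Result<(), String> {
    let expected = discriminant(column_sample);
    if discriminant(min) != expected || discriminant(max) != expected {
        return Err(format!(
            "min/max type mismatch: column {:?}, min {:?}, max {:?}",
            column_sample, min, max
        ));
    }
    Ok(())
}
```

With Int32 stats on an Int64 column, `zone_may_contain` returns false for a value that lies inside the zone, which is the silent pruning described above; the guard turns that into an error at write time.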
    params: &ZoneMapIndexBuilderParams,
    index_store: &dyn IndexStore,
) -> Result<()> {
    validate_zonemap_stats_schema(record_batch.schema().as_ref())?;
The batch writer validates only column shape and trusts zone bounds from the caller. Invalid zone_start or zone_length values can create wrong row-address ranges or huge ranges at query time.
Agreed. The writer now validates the zone bounds before writing: zone_length must be within 1..=rows_per_zone, and zone_start + zone_length must stay inside the fragment's 2^32 offset space — the read path turns each zone into (fragment_id << 32) + zone_start .. + zone_length, so a bad length or an offset that spills past the fragment boundary would otherwise become a wrong/huge range at query time. The zero-length, over-capacity, and overflow cases all have tests.
Correcting my note above — I had the zone_length bound wrong. zone_length is the offset span (last - first + 1), not the live-row count, so with deletions a zone of rows_per_zone live rows can span a wider offset range and legitimately exceed rows_per_zone (CI caught this on test_zonemap_with_deletions). I dropped the <= rows_per_zone check; the writer now validates only what's actually invariant — zone_length >= 1 and zone_start + zone_length <= 2^32 so a zone can't spill past its fragment's offset space — plus a regression test asserting a wider-than-rows_per_zone span is accepted.
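
The two invariants that remain can be sketched as follows. This is a standalone illustration, not the code in the PR: the function names, the `u64` parameter types, and the `String` error type are assumptions; only the checks themselves and the row-address formula come from the discussion above.

```rust
use std::ops::Range;

/// Number of row offsets addressable inside one fragment (2^32).
const FRAGMENT_OFFSET_SPACE: u64 = 1 << 32;

/// Checks only what is invariant: a non-empty zone that stays inside its
/// fragment's offset space. The length is deliberately not capped at
/// rows_per_zone, because it is an offset span and deletions can widen it.
fn validate_zone_bounds(zone_start: u64, zone_length: u64) -> Result<(), String> {
    if zone_length == 0 {
        return Err("zone_length must be >= 1".to_string());
    }
    let end = zone_start
        .checked_add(zone_length)
        .ok_or_else(|| "zone_start + zone_length overflows u64".to_string())?;
    if end > FRAGMENT_OFFSET_SPACE {
        return Err(format!("zone end {end} exceeds the fragment's 2^32 offsets"));
    }
    Ok(())
}

/// Row-address range for a zone, as the read path is described to compute it:
/// (fragment_id << 32) + zone_start .. + zone_length.
fn zone_row_range(fragment_id: u32, zone_start: u64, zone_length: u64) -> Result<Range<u64>, String> {
    validate_zone_bounds(zone_start, zone_length)?;
    // After validation zone_start < 2^32, so this addition cannot overflow.
    let begin = (u64::from(fragment_id) << 32) + zone_start;
    let end = begin
        .checked_add(zone_length)
        .ok_or_else(|| "row address range overflows u64".to_string())?;
    Ok(begin..end)
}
```

Without the bounds check, a zone with `zone_start = 1` and `zone_length = 2^32` would produce a range that runs into the next fragment's address space.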
let mut file_schema = record_batch.schema().as_ref().clone();
file_schema.metadata.insert(
    ZONEMAP_SIZE_META_KEY.to_string(),
    params.rows_per_zone().to_string(),
The batch writer persists rows_per_zone without enforcing the nonzero invariant used by normal training. It can create a zonemap that loads successfully but fails on later update or rebuild.
Right — added the same rows_per_zone != 0 guard that ZoneTrainer::new already enforces on the training path, so we can't persist a zone size of 0 that loads fine but breaks on the next update/rebuild. Test: test_validate_zone_bounds_rejects_zero_rows_per_zone.
    )
})?;
let mut fragment_bitmap = RoaringBitmap::new();
for f in frag_array.values() {
The fragment bitmap is derived only from batch contents and is not checked against the current dataset fragments. A stale or foreign batch can return commit-ready metadata that claims coverage outside the dataset.
Makes sense. It now builds the set of the dataset's current fragment ids and rejects any fragment_id in the batch that isn't one of them, so a stale or foreign batch can't hand back commit-ready metadata claiming fragments the dataset doesn't have. Test: test_write_consolidated_zonemap_rejects_foreign_fragment.
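
A minimal sketch of that membership check, using a `BTreeSet<u32>` as a stand-in for the `RoaringBitmap` the real code builds. The function name and the assumption that the batch's `fragment_id` values arrive as `u64` are illustrative, not taken from the PR.

```rust
use std::collections::BTreeSet;

/// Derives the covered-fragment set from the batch's fragment ids, rejecting
/// any id the dataset does not currently have. Holes in the input stay holes
/// in the result; nothing is widened to "all fragments".
fn fragment_coverage(
    dataset_fragments: &BTreeSet<u32>,
    batch_fragment_ids: &[u64],
) -> Result<BTreeSet<u32>, String> {
    let mut covered = BTreeSet::new();
    for &raw in batch_fragment_ids {
        let id = u32::try_from(raw)
            .map_err(|_| format!("fragment id {raw} does not fit in u32"))?;
        if !dataset_fragments.contains(&id) {
            return Err(format!("fragment id {id} is not in the dataset"));
        }
        covered.insert(id);
    }
    Ok(covered)
}
```

For a dataset with fragments {0, 1, 2}, a batch carrying ids [0, 0, 2] yields coverage {0, 2}, while a batch carrying id 7 is rejected before any metadata is returned.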
Expose the API surface needed for a coordinator (e.g. a Spark driver in lance-spark) to consolidate per-fragment zonemap batches computed in parallel by worker tasks into a single committed zonemap.lance index file, without re-running the train phase on the driver.

Public additions in lance-index::scalar::zonemap:
- `ZONEMAP_FILENAME` const: the canonical filename inside the IndexStore, so external callers don't hardcode the string.
- `zonemap_stats_schema(value_type)`: canonical Arrow schema factory for the on-disk zone stats batch. min/max take a caller-supplied type (the indexed column's type); the remaining metadata columns are fixed types and non-nullable.
- `validate_zonemap_stats_schema(schema)`: strict structural validator (7 columns in canonical order, fixed metadata types and nullability, min/max sharing one type). Public so coordinators can pre-flight validate before write.
- `write_zonemap_index_from_batch(batch, params, store)`: free fn that writes a pre-computed batch via the IndexStore without re-running the train phase. Takes `params: &ZoneMapIndexBuilderParams` so future ZoneMap knobs that affect on-disk metadata can extend the parameter set without breaking the signature.
- `ZoneMapIndexBuilder::zonemap_stats_as_batch` made `pub` for the worker-side `compute_zonemap_batch` consumer.

Public addition in lance::index::scalar:
- `compute_zonemap_batch(dataset, column, fragment_ids, params)`: computes the zone-stats RecordBatch for a fragment subset WITHOUT writing. The min/max value type is derived from the post-scan training stream schema (not `dataset.schema()`), so dictionary → primitive type adaptation in the scanner is reflected correctly.

Tests:
- Validator: 7 unit tests covering each rejection branch plus the documented permissiveness on min/max nullability.
- Round-trip: writes a consolidated zonemap.lance from concatenated per-fragment batches, reads it back, asserts schema is preserved and per-fragment zone counts match ceil(rows_per_frag/rows_per_zone) via a BTreeMap multiset (set-membership would mask duplicated rows). Uses rows_per_zone=4 with 10 rows per fragment so each fragment contributes multiple zones, covering the multi-zone-per-fragment shape rather than the trivial single-zone case.
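
The multiset check described for the round-trip test can be sketched like this. It is an illustration of the counting idea only; the helper names are hypothetical, and the real test reads the `fragment_id` column back from the written file.

```rust
use std::collections::BTreeMap;

/// ceil(rows_per_fragment / rows_per_zone); rows_per_zone must be non-zero.
fn expected_zones(rows_per_fragment: u64, rows_per_zone: u64) -> u64 {
    assert!(rows_per_zone > 0, "rows_per_zone must be non-zero");
    (rows_per_fragment + rows_per_zone - 1) / rows_per_zone
}

/// Counts how many zone rows each fragment contributes (a multiset keyed by
/// fragment id), so a duplicated zone row shows up as a wrong count.
fn zones_per_fragment(fragment_ids: &[u64]) -> BTreeMap<u64, u64> {
    let mut counts = BTreeMap::new();
    for &id in fragment_ids {
        *counts.entry(id).or_insert(0) += 1;
    }
    counts
}

/// True when every fragment contributes exactly the expected number of zones.
fn counts_match(fragment_ids: &[u64], rows_per_fragment: u64, rows_per_zone: u64) -> bool {
    let want = expected_zones(rows_per_fragment, rows_per_zone);
    zones_per_fragment(fragment_ids).values().all(|&n| n == want)
}
```

With 10 rows per fragment and `rows_per_zone = 4`, each fragment should contribute 3 zones. A `fragment_id` column of [0, 0, 0, 1, 1, 1, 1] has the same set of ids as a correct one, but the count for fragment 1 is 4, so the multiset comparison fails where a set comparison would pass.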
Completes the build-time-consolidation API surface paired with the
existing worker-side compute_zonemap_batch. The coordinator (Spark
driver, Python driver, etc.) does:
1. Per-fragment-subset workers call compute_zonemap_batch and
return Arrow batches.
2. Coordinator concatenates them.
3. Coordinator calls write_consolidated_zonemap_segment, which:
- validates the indexed column exists, captures its field id
- derives the IndexMetadata.fragment_bitmap from the batch's
fragment_id column (so a fragment hole in the inputs ⇒ a
hole in the bitmap, no implicit "all fragments" fiction)
- allocates a fresh UUID directory under indices/
- writes zonemap.lance via the existing
write_zonemap_index_from_batch (which structurally validates
the batch on the way in)
- returns a full IndexMetadata populated with the canonical
ZoneMap index_version, prost-encoded ZoneMapIndexDetails,
and the file listing for skip-HEAD optimisations.
4. Coordinator passes the returned IndexMetadata to the existing
commitExistingIndexSegments to land one IndexMetadata entry
covering all input fragments — a single segment instead of N.
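
The four steps above can be sketched end to end with plain standard-library types. Everything here is a hypothetical stand-in: `ZoneStats`, `SegmentMeta`, and the three functions model the shape of the flow (workers compute, coordinator concatenates, one write returns one metadata record) and are not the lance API; threads stand in for worker tasks and `i64` for the indexed column's type.

```rust
use std::collections::BTreeSet;
use std::thread;

/// Hypothetical per-zone statistics row (stand-in for one row of the Arrow batch).
#[derive(Debug, Clone, PartialEq)]
struct ZoneStats {
    fragment_id: u32,
    zone_start: u64,
    zone_length: u64,
    min: i64,
    max: i64,
}

/// Hypothetical result of the single consolidated write (stand-in for IndexMetadata).
#[derive(Debug, PartialEq)]
struct SegmentMeta {
    fragment_coverage: BTreeSet<u32>,
    zones: Vec<ZoneStats>,
}

/// Worker side (step 1): compute zone stats for one fragment without writing.
fn compute_zone_stats(fragment_id: u32, values: &[i64], rows_per_zone: usize) -> Vec<ZoneStats> {
    assert!(rows_per_zone > 0, "rows_per_zone must be non-zero");
    values
        .chunks(rows_per_zone)
        .enumerate()
        .map(|(i, chunk)| ZoneStats {
            fragment_id,
            zone_start: (i * rows_per_zone) as u64,
            zone_length: chunk.len() as u64,
            min: *chunk.iter().min().expect("chunks are never empty"),
            max: *chunk.iter().max().expect("chunks are never empty"),
        })
        .collect()
}

/// Coordinator side (step 3): one write, coverage derived from the rows themselves.
fn write_consolidated(zones: Vec<ZoneStats>) -> SegmentMeta {
    let fragment_coverage = zones.iter().map(|z| z.fragment_id).collect();
    SegmentMeta { fragment_coverage, zones }
}

/// Steps 1 to 3 together: fan out to workers, concatenate, write once.
fn build_consolidated(fragments: Vec<(u32, Vec<i64>)>, rows_per_zone: usize) -> SegmentMeta {
    let handles: Vec<_> = fragments
        .into_iter()
        .map(|(id, values)| thread::spawn(move || compute_zone_stats(id, &values, rows_per_zone)))
        .collect();
    let mut all_zones = Vec::new();
    for handle in handles {
        // Step 2: concatenate worker results in submission order.
        all_zones.extend(handle.join().expect("worker thread panicked"));
    }
    write_consolidated(all_zones)
}
```

Calling `build_consolidated` with fragments 0 and 2 (10 rows each, `rows_per_zone = 4`) returns one `SegmentMeta` with six zones and coverage {0, 2}: the missing fragment 1 stays a hole, and the caller has a single record to commit in step 4.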
ZONEMAP_INDEX_VERSION is now pub so this helper (and any future
external coordinator) can populate IndexMetadata.index_version
without duplicating the literal. Other scalar index versions stay
file-internal until they grow a similar need.
End-to-end test verifies (a) every input fragment lands in the
returned bitmap, (b) the field id matches the schema lookup,
(c) the index_version equals ZONEMAP_INDEX_VERSION, (d) the
freshly written directory contains zonemap.lance.
Address review feedback on the build-time-consolidation API: validate caller-supplied inputs up front so a malformed or foreign batch is rejected with a clear error instead of silently corrupting query results.

- write_consolidated_zonemap_segment cross-checks the batch min/max value type against the column's scanned type (re-derived as compute_zonemap_batch does, after dict->primitive / extension adaptation). A mismatch would otherwise prune matching rows via cross-type ScalarValue comparisons.
- write_consolidated_zonemap_segment rejects fragment ids absent from the dataset's current fragments, so a stale or foreign batch cannot return commit-ready metadata claiming coverage the dataset lacks.
- write_zonemap_index_from_batch validates zone bounds (validate_zone_bounds): rows_per_zone != 0 (matching ZoneTrainer's invariant), zone_length >= 1, and zone_start + zone_length within the per-fragment 2^32 row-offset space. zone_length is the offset span (last - first + 1), not the live-row count, so it is intentionally not capped at rows_per_zone -- with deletions a zone can span a wider offset range than the zone capacity.

Adds 7 unit tests covering each rejection branch plus the post-deletion wide-span acceptance case.

Post-rebase fixups for main: ZONEMAP_INDEX_VERSION is now pub (used by the consolidation helper) and from_dataset_for_new takes &Uuid.
Summary
Currently any external consumer that wants to build a zonemap from multiple workers has to either live in-tree (to reach internal items) or write one segment per fragment, ending up with N `IndexMetadata` entries per logical index. This PR exposes the surface needed to consolidate worker-produced batches into a single committed segment.

What's new

In `lance-index::scalar::zonemap`:
- `pub const ZONEMAP_FILENAME`: canonical filename inside the index directory
- `pub const ZONEMAP_INDEX_VERSION: u32`: on-disk format version
- `pub fn zonemap_stats_schema(value_type)`: canonical Arrow schema factory
- `pub fn validate_zonemap_stats_schema(schema)`: strict structural validator
- `pub fn write_zonemap_index_from_batch(batch, params, store)`: write a `zonemap.lance` from a pre-computed batch, no training phase
- `ZoneMapIndexBuilder::zonemap_stats_as_batch` promoted to `pub`

In `lance::index::scalar`:
- `pub async fn compute_zonemap_batch(...)`: worker side, compute zone stats for a fragment subset without writing
- `pub async fn write_consolidated_zonemap_segment(...)`: driver side, take concatenated worker batches, write one `zonemap.lance` under a fresh UUID, return the `IndexMetadata` ready for `commit_existing_index_segments`

Coordinator pattern

Result: one `IndexMetadata` entry covering every input fragment, not N.

Input validation

Because these entry points are now public, they reject malformed or stale input up front instead of writing a segment that silently corrupts query results:
- `write_consolidated_zonemap_segment` cross-checks the batch's min/max value type against the column's scanned type, re-derived the same way `compute_zonemap_batch` does, so a legitimately adapted column (dict→primitive, extension unwrap) isn't false-rejected. A mismatched type would otherwise prune matching rows via cross-type `ScalarValue` comparisons.
- `write_consolidated_zonemap_segment` rejects `fragment_id`s that aren't in the dataset's current fragments, so a stale or foreign batch can't return commit-ready metadata claiming coverage the dataset doesn't have.
- `write_zonemap_index_from_batch` validates the zone bounds: `rows_per_zone != 0` (matching the invariant `ZoneTrainer` enforces on the training path), `zone_length >= 1`, and `zone_start + zone_length` within the fragment's 2^32 row-offset space. The read path computes a zone's row range as `(fragment_id << 32) + zone_start .. + zone_length`, so a start or length outside the fragment would produce wrong row-address ranges. (`zone_length` is the offset span `last - first + 1`, not a row count, so it is intentionally not capped at `rows_per_zone`: after deletions a zone of `rows_per_zone` live rows can span a wider offset range.)

Tests

- Unit tests for `validate_zonemap_stats_schema` covering each rejection branch and the documented permissiveness on min/max nullability.
- Zone-bounds tests: zero `rows_per_zone`, zero-length zone, a span wider than `rows_per_zone` (accepted: the post-deletion case), and per-fragment offset overflow.
- Round-trip test for `write_zonemap_index_from_batch` using `rows_per_zone=4` so each fragment contributes multiple zones; the default `rows_per_zone=8192` produces only one zone per fragment and hides multi-zone bugs.
- End-to-end test for `write_consolidated_zonemap_segment` covering fragment-bitmap completeness, schema-derived field id, `index_version`, and file presence under the freshly written UUID directory.
- Rejection tests for `write_consolidated_zonemap_segment`: a min/max type that doesn't match the column, and a `fragment_id` outside the dataset.

Additive only; no breaking changes.