diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 8e7e20c211a..50c60bde82c 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -48,9 +48,38 @@ use roaring::RoaringBitmap; use super::zoned::{ZoneBound, ZoneProcessor, ZoneTrainer, rebuild_zones, search_zones}; const ROWS_PER_ZONE_DEFAULT: u64 = 8192; // 1 zone every two batches -const ZONEMAP_FILENAME: &str = "zonemap.lance"; +/// Filename of the on-disk zonemap index file written under `/`. Public because +/// external coordinators (e.g. JNI bindings, custom writers) need to refer to it by the same +/// name the read path expects. +pub const ZONEMAP_FILENAME: &str = "zonemap.lance"; const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone"; -const ZONEMAP_INDEX_VERSION: u32 = 0; +/// On-disk format version for zonemap indices. Public so external coordinators that +/// assemble an [`IndexMetadata`](lance_table::format::IndexMetadata) for a freshly written +/// consolidated zonemap segment can populate the `index_version` field consistently with +/// what the plugin's `train_index` emits — without duplicating the literal. +pub const ZONEMAP_INDEX_VERSION: u32 = 0; + +/// Canonical schema of a zone-stats record batch. The on-disk `zonemap.lance` file's record +/// layout, the in-memory `ZoneMapIndexBuilder::zonemap_stats_as_batch` output, and any externally- +/// produced batch fed into [`write_zonemap_index_from_batch`] all share this schema. Treat the +/// column names and order as a stability contract: the on-disk reader matches columns by name, +/// and [`validate_zonemap_stats_schema`] enforces full conformance up front so writes whose +/// batch deviates from this shape fail before producing an unloadable file. +/// +/// `min` / `max` are nullable because an entire batch may be all-NULL; the caller supplies the +/// indexed column's type via `value_type`. The remaining metadata columns (`null_count`, +/// `nan_count`, `fragment_id`, `zone_start`, `zone_length`) have fixed types and are non-nullable. +pub fn zonemap_stats_schema(value_type: &DataType) -> Arc { + Arc::new(arrow_schema::Schema::new(vec![ + Field::new("min", value_type.clone(), true), + Field::new("max", value_type.clone(), true), + Field::new("null_count", DataType::UInt32, false), + Field::new("nan_count", DataType::UInt32, false), + Field::new("fragment_id", DataType::UInt64, false), + Field::new("zone_start", DataType::UInt64, false), + Field::new("zone_length", DataType::UInt64, false), + ])) +} /// Basic stats about zonemap index #[derive(Debug, PartialEq, Clone)] @@ -765,7 +794,14 @@ impl ZoneMapIndexBuilder { Ok(()) } - fn zonemap_stats_as_batch(&self) -> Result { + /// Drain the in-memory zone statistics into a record batch with the canonical zonemap schema + /// (`min`, `max`, `null_count`, `nan_count`, `fragment_id`, `zone_start`, `zone_length`). + /// + /// Public so external coordinators consolidating per-fragment zone batches from parallel + /// workers can extract the trained state without forcing the in-place file write that + /// [`Self::write_index`] performs. The companion [`write_zonemap_index_from_batch`] free + /// function consumes the same shape. + pub fn zonemap_stats_as_batch(&self) -> Result { // Flush self.maps as a RecordBatch let mins = if self.maps.is_empty() { new_empty_array(&self.items_type) @@ -791,16 +827,7 @@ impl ZoneMapIndexBuilder { let zone_starts = UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.start)); - let schema = Arc::new(arrow_schema::Schema::new(vec![ - // min and max can be null if the entire batch is null values - Field::new("min", self.items_type.clone(), true), - Field::new("max", self.items_type.clone(), true), - Field::new("null_count", DataType::UInt32, false), - Field::new("nan_count", DataType::UInt32, false), - Field::new("fragment_id", DataType::UInt64, false), - Field::new("zone_start", DataType::UInt64, false), - Field::new("zone_length", DataType::UInt64, false), - ])); + let schema = zonemap_stats_schema(&self.items_type); let columns: Vec = vec![ mins, @@ -816,19 +843,169 @@ impl ZoneMapIndexBuilder { pub async fn write_index(self, index_store: &dyn IndexStore) -> Result { let record_batch = self.zonemap_stats_as_batch()?; + write_zonemap_index_from_batch(record_batch, &self.options, index_store).await + } +} - let mut file_schema = record_batch.schema().as_ref().clone(); - file_schema.metadata.insert( - ZONEMAP_SIZE_META_KEY.to_string(), - self.options.rows_per_zone.to_string(), - ); +/// Write a `zonemap.lance` file from a pre-computed zone-stats record batch. +/// +/// The record batch must conform to the canonical zonemap stats schema (see +/// [`zonemap_stats_schema`]). Column names, order, and types are validated up front, and the +/// fixed-type metadata columns are also required to be non-nullable; a mismatched batch returns +/// an `invalid_input` error rather than silently writing a file the read path cannot load. +/// `min` and `max` may carry any data type as long as both share the indexed column's type; +/// their nullability is not constrained (the canonical schema declares them nullable so an +/// all-NULL batch is representable, but a caller whose data has no nulls may pass non-nullable +/// fields). +/// +/// Underlying writer extracted from [`ZoneMapIndexBuilder::write_index`] so external coordinators +/// that consolidate per-fragment zone batches produced by parallel workers can write a single +/// consolidated file without re-running the train phase. `params` is taken by reference rather +/// than `rows_per_zone: u64` so future ZoneMap knobs that affect on-disk metadata can extend +/// the parameter set without breaking this signature. +pub async fn write_zonemap_index_from_batch( + record_batch: RecordBatch, + params: &ZoneMapIndexBuilderParams, + index_store: &dyn IndexStore, +) -> Result { + validate_zonemap_stats_schema(record_batch.schema().as_ref())?; + validate_zone_bounds(&record_batch, params.rows_per_zone())?; + + let mut file_schema = record_batch.schema().as_ref().clone(); + file_schema.metadata.insert( + ZONEMAP_SIZE_META_KEY.to_string(), + params.rows_per_zone().to_string(), + ); + + let mut index_file = index_store + .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema)) + .await?; + index_file.write_record_batch(record_batch).await?; + index_file.finish().await +} - let mut index_file = index_store - .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema)) - .await?; - index_file.write_record_batch(record_batch).await?; - index_file.finish().await +/// Validate the per-zone bounds carried in `record_batch` against `rows_per_zone`. +/// +/// [`validate_zonemap_stats_schema`] only checks column shape; the read path's zone-address +/// arithmetic (`(fragment_id << 32) + zone_start .. + zone_length`, see `zoned.rs`) needs more. +/// The caller has already validated the schema, so columns 5/6 are non-null `UInt64`: +/// - `rows_per_zone` must be non-zero — the invariant [`ZoneTrainer::new`] enforces for trained +/// zones; persisting zero produces a zonemap that loads but breaks on a later update/rebuild. +/// - every `zone_length` must be at least 1; it is the offset span `(last - first + 1)`, not the +/// live-row count, so after deletions it can legitimately exceed `rows_per_zone` and is +/// intentionally not capped at the zone capacity here. A zero-length zone would match nothing. +/// - `zone_start + zone_length <= 2^32` — `zone_start` is a within-fragment row offset, so a zone +/// must stay inside its fragment's 32-bit offset space and not spill into the next fragment's +/// row addresses. +fn validate_zone_bounds(record_batch: &RecordBatch, rows_per_zone: u64) -> Result<()> { + if rows_per_zone == 0 { + return Err(Error::invalid_input( + "zonemap rows_per_zone must be greater than zero", + )); } + + let zone_start = record_batch + .column(5) + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::invalid_input("zonemap stats batch 'zone_start' is not UInt64"))?; + let zone_length = record_batch + .column(6) + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::invalid_input("zonemap stats batch 'zone_length' is not UInt64"))?; + + // Row addresses pack the fragment id into the high 32 bits, leaving 2^32 offsets per fragment. + const FRAGMENT_ROW_SPACE: u64 = 1 << 32; + for i in 0..record_batch.num_rows() { + let start = zone_start.value(i); + let length = zone_length.value(i); + if length == 0 { + return Err(Error::invalid_input(format!( + "zonemap zone {i} has zone_length 0; every zone must cover at least one row" + ))); + } + if start >= FRAGMENT_ROW_SPACE || start + length > FRAGMENT_ROW_SPACE { + return Err(Error::invalid_input(format!( + "zonemap zone {i} row range [{start}, {start} + {length}) overflows the \ + per-fragment 2^32 row-offset space" + ))); + } + } + Ok(()) +} + +/// Validate that `schema` matches the canonical zonemap stats shape: 7 columns in canonical +/// order, the metadata columns at fixed types and non-nullable, and `min`/`max` sharing one +/// (caller-determined) data type. Public so coordinators can validate locally before calling +/// [`write_zonemap_index_from_batch`] (which would discover the mismatch only after opening +/// the output index file). +pub fn validate_zonemap_stats_schema(schema: &arrow_schema::Schema) -> Result<()> { + let fields = schema.fields(); + const EXPECTED_NAMES: [&str; 7] = [ + "min", + "max", + "null_count", + "nan_count", + "fragment_id", + "zone_start", + "zone_length", + ]; + if fields.len() != EXPECTED_NAMES.len() { + return Err(Error::invalid_input(format!( + "zonemap stats batch has {} columns, expected {}: {:?}", + fields.len(), + EXPECTED_NAMES.len(), + EXPECTED_NAMES + ))); + } + for (i, expected) in EXPECTED_NAMES.iter().enumerate() { + if fields[i].name() != expected { + return Err(Error::invalid_input(format!( + "zonemap stats batch column {} is '{}', expected '{}'", + i, + fields[i].name(), + expected + ))); + } + } + + // min and max: both share the indexed column type (caller-determined); both nullable + // (an all-NULL batch is legitimate). + if fields[0].data_type() != fields[1].data_type() { + return Err(Error::invalid_input(format!( + "zonemap stats batch 'min' type {:?} != 'max' type {:?}", + fields[0].data_type(), + fields[1].data_type() + ))); + } + + // Fixed-type non-nullable metadata columns. Allowing nullable here would let null values + // silently coerce to 0 in the read path's UInt{32,64}Array::value() calls, producing + // corrupted zone stats with no error. + fn check_metadata_field(field: &Field, expected_ty: &DataType) -> Result<()> { + if field.data_type() != expected_ty { + return Err(Error::invalid_input(format!( + "zonemap stats batch column '{}' has type {:?}, expected {:?}", + field.name(), + field.data_type(), + expected_ty + ))); + } + if field.is_nullable() { + return Err(Error::invalid_input(format!( + "zonemap stats batch column '{}' must be non-nullable", + field.name() + ))); + } + Ok(()) + } + check_metadata_field(&fields[2], &DataType::UInt32)?; // null_count + check_metadata_field(&fields[3], &DataType::UInt32)?; // nan_count + check_metadata_field(&fields[4], &DataType::UInt64)?; // fragment_id + check_metadata_field(&fields[5], &DataType::UInt64)?; // zone_start + check_metadata_field(&fields[6], &DataType::UInt64)?; // zone_length + Ok(()) } /// Index-specific processor that computes min/max statistics for each zone while the @@ -2636,4 +2813,157 @@ mod tests { // All max characters assert_eq!(compute_next_prefix("\u{10FFFF}\u{10FFFF}"), None); } + + /// Build a schema with the canonical column ordering, then mutate via `mutate` and return. + /// Used by the validate_zonemap_stats_schema rejection-branch tests below. + fn canonical_with( + value_type: DataType, + mutate: impl FnOnce(Vec) -> Vec, + ) -> Schema { + let canonical = super::zonemap_stats_schema(&value_type); + let fields = canonical + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect(); + Schema::new(mutate(fields)) + } + + #[test] + fn test_validate_schema_accepts_canonical() { + let schema = super::zonemap_stats_schema(&DataType::Int32); + super::validate_zonemap_stats_schema(schema.as_ref()) + .expect("canonical schema must validate"); + } + + #[test] + fn test_validate_schema_rejects_wrong_column_count() { + let schema = Schema::new(vec![Field::new("min", DataType::Int32, true)]); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("schema with wrong column count must be rejected"); + assert!(err.to_string().contains("expected 7"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_wrong_name() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[2] = Field::new("nullCount", DataType::UInt32, false); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("schema with renamed metadata column must be rejected"); + assert!(err.to_string().contains("'nullCount'"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_min_max_type_mismatch() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[1] = Field::new("max", DataType::Int64, true); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("min/max with different types must be rejected"); + assert!(err.to_string().contains("'min' type"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_wrong_metadata_type() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[4] = Field::new("fragment_id", DataType::UInt32, false); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("metadata column with wrong type must be rejected"); + assert!(err.to_string().contains("fragment_id"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_nullable_metadata() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[2] = Field::new("null_count", DataType::UInt32, true); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("nullable metadata column must be rejected"); + assert!(err.to_string().contains("non-nullable"), "got: {}", err); + } + + /// The validator's doc claims min/max nullability is not constrained (an all-NULL batch + /// is legitimate, but a caller producing a batch where the column happens to have no NULLs + /// may also declare those fields non-nullable). This test locks in that contract — both + /// the all-nullable canonical and a non-nullable-min/max variant must validate. + #[test] + fn test_validate_schema_accepts_non_nullable_min_max() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[0] = Field::new("min", DataType::Int32, false); + f[1] = Field::new("max", DataType::Int32, false); + f + }); + super::validate_zonemap_stats_schema(&schema) + .expect("non-nullable min/max with matching types must validate"); + } + + /// Build a single-zone canonical stats batch with the given zone bounds, for the + /// `validate_zone_bounds` rejection tests below. + fn one_zone_stats_batch(zone_start: u64, zone_length: u64) -> RecordBatch { + let schema = super::zonemap_stats_schema(&DataType::Int32); + RecordBatch::try_new( + schema, + vec![ + Arc::new(arrow_array::Int32Array::from(vec![Some(0)])), + Arc::new(arrow_array::Int32Array::from(vec![Some(10)])), + Arc::new(arrow_array::UInt32Array::from(vec![0u32])), + Arc::new(arrow_array::UInt32Array::from(vec![0u32])), + Arc::new(UInt64Array::from(vec![0u64])), + Arc::new(UInt64Array::from(vec![zone_start])), + Arc::new(UInt64Array::from(vec![zone_length])), + ], + ) + .unwrap() + } + + #[test] + fn test_validate_zone_bounds_accepts_valid() { + let batch = one_zone_stats_batch(0, 8); + super::validate_zone_bounds(&batch, 8).expect("valid zone bounds must pass"); + } + + #[test] + fn test_validate_zone_bounds_rejects_zero_rows_per_zone() { + let batch = one_zone_stats_batch(0, 8); + let err = + super::validate_zone_bounds(&batch, 0).expect_err("rows_per_zone 0 must be rejected"); + assert!( + err.to_string().contains("greater than zero"), + "got: {}", + err + ); + } + + #[test] + fn test_validate_zone_bounds_rejects_zero_length() { + let batch = one_zone_stats_batch(0, 0); + let err = + super::validate_zone_bounds(&batch, 8).expect_err("zero-length zone must be rejected"); + assert!(err.to_string().contains("zone_length 0"), "got: {}", err); + } + + #[test] + fn test_validate_zone_bounds_accepts_span_exceeding_rows_per_zone() { + // zone_length is an offset span (last - first + 1), not a live-row count: after deletions + // a zone of `rows_per_zone` live rows can span a wider offset range, so a length above + // rows_per_zone is legitimate and must NOT be rejected (regression guard). + let batch = one_zone_stats_batch(0, 16383); + super::validate_zone_bounds(&batch, 8192) + .expect("a zone span wider than rows_per_zone (post-deletion) must be accepted"); + } + + #[test] + fn test_validate_zone_bounds_rejects_fragment_offset_overflow() { + // zone_start + zone_length must stay within the 2^32 per-fragment row-offset space. + let batch = one_zone_stats_batch((1u64 << 32) - 4, 8); + let err = super::validate_zone_bounds(&batch, 8) + .expect_err("zone spilling past 2^32 must be rejected"); + assert!(err.to_string().contains("overflows"), "got: {}", err); + } } diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index ae2478589fb..e3bacfb34b2 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -234,6 +234,186 @@ pub(crate) async fn load_training_data( } } +/// Compute the zone-stats record batch for a column over the given fragments WITHOUT writing +/// the result to a file. +/// +/// Companion to [`lance_index::scalar::zonemap::write_zonemap_index_from_batch`]. Together they +/// support a build-time-consolidation pipeline: parallel workers each call this for a fragment +/// subset, the resulting batches are concatenated by a coordinator, and one consolidated +/// `zonemap.lance` is written. Compared to the standard `build_scalar_index` flow — which trains +/// AND writes a per-fragment file — returning the in-memory batch lets the coordinator avoid the +/// per-segment read-time round-trips that scale linearly with fragment count. +/// +/// `fragment_ids = None` indexes every fragment in the dataset, mirroring `build_scalar_index`. +pub async fn compute_zonemap_batch( + dataset: &Dataset, + column: &str, + fragment_ids: Option>, + params: lance_index::scalar::zonemap::ZoneMapIndexBuilderParams, +) -> Result { + // Validate the column exists up front for a clear error; the value type itself is taken + // from the post-scan stream below, NOT from the dataset schema. This matches the plugin's + // train_zonemap_index path: scan-time type adaptation (dictionary -> primitive, extension + // type unwrap, nullability changes) means the dataset schema and the actual data stream + // can disagree, and the builder must be configured for the latter. + if dataset.schema().field(column).is_none() { + return Err(Error::invalid_input_source( + format!("No column with name {}", column).into(), + )); + } + + // ZoneMap requires row-address ordering during scan so per-zone bounds correspond to + // contiguous physical row ranges (the same TrainingCriteria the plugin's TrainingRequest + // sets up internally — see ZoneMapIndexTrainingRequest::new). + let criteria = TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(); + + let training_data = + load_training_data(dataset, column, &criteria, None, true, fragment_ids).await?; + + // Derive value_type from the actual stream schema, matching ZoneMapIndexPlugin's + // train_zonemap_index. The first field is the scanned column (subsequent fields like + // _rowaddr are training-criteria additions). + let value_type = training_data.schema().field(0).data_type().clone(); + + let mut builder = + lance_index::scalar::zonemap::ZoneMapIndexBuilder::try_new(params, value_type)?; + builder.train(training_data).await?; + builder.zonemap_stats_as_batch() +} + +/// Driver-side companion to [`compute_zonemap_batch`]: take a pre-computed (typically +/// coordinator-concatenated) zone-stats batch and persist it as a single uncommitted zonemap +/// index segment, returning the [`IndexMetadata`] that the caller can later commit via +/// `Dataset::commit_existing_index_segments`. +/// +/// This is the path that completes build-time consolidation: parallel workers each produce a +/// per-fragment-subset batch via `compute_zonemap_batch`; the coordinator concatenates them +/// and hands the result here. The output is one `zonemap.lance` file under a fresh UUID-named +/// directory in the dataset's `indices/` tree, and an `IndexMetadata` whose `fragment_bitmap` +/// is the union of every fragment id appearing in the batch's `fragment_id` column. No +/// manifest write happens here — that is `commit_existing_index_segments`' job. +/// +/// `batch` must conform to [`zonemap_stats_schema`](lance_index::scalar::zonemap::zonemap_stats_schema) +/// (this is enforced by the inner writer). +pub async fn write_consolidated_zonemap_segment( + dataset: &Dataset, + name: &str, + column: &str, + batch: arrow_array::RecordBatch, + params: &lance_index::scalar::zonemap::ZoneMapIndexBuilderParams, +) -> Result { + use arrow_array::cast::AsArray; + use arrow_array::types::UInt64Type; + use lance_index::scalar::IndexStore; + use lance_index::scalar::zonemap::{ZONEMAP_INDEX_VERSION, write_zonemap_index_from_batch}; + use roaring::RoaringBitmap; + use uuid::Uuid; + + // Validate the indexed column exists in the dataset schema and capture its field id for + // IndexMetadata.fields. + let field = dataset.schema().field(column).ok_or_else(|| { + Error::invalid_input_source(format!("No column with name {}", column).into()) + })?; + let field_id = field.id; + + // Cross-check the batch's min/max value type against the type the read path compares query + // values against. That is the *scanned* column type (after dict→primitive / extension unwrap + // / nullability adaptation), so we re-derive it exactly as compute_zonemap_batch does — from + // the training stream's schema — rather than from the raw dataset schema, which can + // legitimately differ. A batch whose min/max type disagrees would silently prune matching + // rows at query time because cross-type ScalarValue comparisons return false. + let criteria = TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(); + let expected_value_type = load_training_data(dataset, column, &criteria, None, true, None) + .await? + .schema() + .field(0) + .data_type() + .clone(); + let batch_value_type = batch + .schema() + .field_with_name("min") + .map_err(|_| { + Error::invalid_input_source("consolidated zonemap batch missing 'min' column".into()) + })? + .data_type() + .clone(); + if batch_value_type != expected_value_type { + return Err(Error::invalid_input_source( + format!( + "consolidated zonemap batch min/max type {:?} does not match indexed column '{}' \ + scan type {:?}", + batch_value_type, column, expected_value_type + ) + .into(), + )); + } + + // Derive fragment bitmap from the batch's fragment_id column BEFORE consuming the batch + // in write_zonemap_index_from_batch. The schema validator inside the writer will reject a + // missing column, but we need values here to build the bitmap, so a missing column shows + // up as a clearer error than the inner validator's "expected column 5 to be ..." message. + let frag_col = batch.column_by_name("fragment_id").ok_or_else(|| { + Error::invalid_input_source( + "consolidated zonemap batch missing 'fragment_id' column".into(), + ) + })?; + let frag_array = frag_col.as_primitive_opt::().ok_or_else(|| { + Error::invalid_input_source( + "consolidated zonemap batch 'fragment_id' must be UInt64".into(), + ) + })?; + // The dataset's current fragment ids. A stale or foreign batch referencing fragments outside + // this dataset version must be rejected rather than yield commit-ready metadata claiming + // coverage the dataset does not actually have. + let mut dataset_fragments = RoaringBitmap::new(); + for frag in dataset.get_fragments() { + dataset_fragments.insert(frag.id() as u32); + } + + let mut fragment_bitmap = RoaringBitmap::new(); + for f in frag_array.values() { + if *f > u32::MAX as u64 { + return Err(Error::invalid_input_source( + format!("fragment_id {} exceeds u32::MAX", f).into(), + )); + } + let frag_id = *f as u32; + if !dataset_fragments.contains(frag_id) { + return Err(Error::invalid_input_source( + format!( + "consolidated zonemap batch references fragment {} not present in dataset \ + version {}", + f, + dataset.version_id() + ) + .into(), + )); + } + fragment_bitmap.insert(frag_id); + } + + let uuid = Uuid::new_v4(); + let index_store = LanceIndexStore::from_dataset_for_new(dataset, &uuid)?; + write_zonemap_index_from_batch(batch, params, &index_store).await?; + + let index_details = + prost_types::Any::from_msg(&lance_index::pbold::ZoneMapIndexDetails::default()) + .map_err(|e| Error::internal(format!("failed to encode ZoneMapIndexDetails: {}", e)))?; + + Ok(IndexMetadata { + uuid, + fields: vec![field_id], + name: name.to_string(), + dataset_version: dataset.version_id(), + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(Arc::new(index_details)), + index_version: ZONEMAP_INDEX_VERSION as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: Some(index_store.list_files_with_sizes().await?), + }) +} + // TODO: Allow users to register their own plugins static SCALAR_INDEX_PLUGIN_REGISTRY: LazyLock> = LazyLock::new(IndexPluginRegistry::with_default_plugins); @@ -1038,6 +1218,266 @@ mod tests { } } + #[tokio::test] + async fn test_compute_zonemap_batch_round_trip() { + // Round-trip the new build-time-consolidation API surface: + // 1. Per-fragment compute_zonemap_batch produces conformant batches + // 2. Concatenated batches feed write_zonemap_index_from_batch successfully + // 3. The resulting zonemap.lance is readable via IndexStore::open_index_file with + // the canonical schema preserved + // 4. Read-back fragment_id column equals the union of input fragment ids + use arrow::compute::concat_batches; + use lance_index::scalar::IndexStore; + use lance_index::scalar::lance_format::LanceIndexStore; + use lance_index::scalar::zonemap::{ + ZONEMAP_FILENAME, ZoneMapIndexBuilderParams, validate_zonemap_stats_schema, + write_zonemap_index_from_batch, zonemap_stats_schema, + }; + + // 4 fragments × 10 rows. We deliberately pick rows_per_zone=4 (not the default) so each + // fragment spans multiple zones (10/4 = 3 zones — two full + one trailing). This + // exercises the path that matters for the consolidation claim: a zonemap batch where + // fragment_id repeats across consecutive rows. A previous version of this test used the + // default rows_per_zone (8192), which only ever produced one zone per fragment — a + // pathological case that hides per-fragment-multi-zone bugs. + const ROWS_PER_FRAG: u64 = 10; + const ROWS_PER_ZONE: u64 = 4; + let zones_per_frag = ROWS_PER_FRAG.div_ceil(ROWS_PER_ZONE) as usize; // 3 + + let dataset = lance_datagen::gen_batch() + .col("values", array::step::()) + .into_ram_dataset( + FragmentCount::from(4), + FragmentRowCount::from(ROWS_PER_FRAG as u32), + ) + .await + .unwrap(); + + let params = ZoneMapIndexBuilderParams::new(ROWS_PER_ZONE); + + // 1. Compute per-fragment-subset batches. + let batch_0_1 = + compute_zonemap_batch(&dataset, "values", Some(vec![0u32, 1]), params.clone()) + .await + .unwrap(); + let batch_2_3 = + compute_zonemap_batch(&dataset, "values", Some(vec![2u32, 3]), params.clone()) + .await + .unwrap(); + + // Each batch must validate against the canonical schema. + validate_zonemap_stats_schema(batch_0_1.schema().as_ref()).unwrap(); + validate_zonemap_stats_schema(batch_2_3.schema().as_ref()).unwrap(); + + // 2. Concatenate. The canonical schema with the actual value type is the join target. + let canonical = zonemap_stats_schema(&DataType::Int32); + let concatenated = concat_batches(&canonical, [&batch_0_1, &batch_2_3]).unwrap(); + + // 3. Write a consolidated zonemap.lance. + let test_dir = TempStrDir::default(); + let object_store = Arc::new(lance_io::object_store::ObjectStore::local()); + let index_dir = object_store::path::Path::parse(test_dir.as_str()).unwrap(); + let store = LanceIndexStore::new( + object_store.clone(), + index_dir.clone(), + Arc::new(lance_core::cache::LanceCache::no_cache()), + ); + write_zonemap_index_from_batch(concatenated.clone(), ¶ms, &store) + .await + .unwrap(); + + // 4. The written file should round-trip read with the same schema and row count, AND + // the fragment-id column should produce the exact multiset of fragment ids from the + // inputs. We compare counts (not just set membership) so a regression that accidentally + // duplicated a fragment row — same union, wrong cardinality — would fail loudly. + let read_back = store.open_index_file(ZONEMAP_FILENAME).await.unwrap(); + let read_batch = read_back + .read_range(0..read_back.num_rows(), None) + .await + .unwrap(); + assert_eq!( + read_batch.num_rows(), + concatenated.num_rows(), + "round-trip read row count must match consolidated batch" + ); + validate_zonemap_stats_schema(read_batch.schema().as_ref()).unwrap(); + let mut frag_counts: std::collections::BTreeMap = + std::collections::BTreeMap::new(); + for fid in read_batch + .column_by_name("fragment_id") + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + { + *frag_counts.entry(fid).or_insert(0) += 1; + } + // With ROWS_PER_FRAG=10 and ROWS_PER_ZONE=4 each fragment contributes + // ceil(10/4) = 3 zones; total 4 × 3 = 12 zones across the consolidated batch. + let expected: std::collections::BTreeMap = + (0u64..4u64).map(|f| (f, zones_per_frag)).collect(); + assert_eq!( + frag_counts, expected, + "consolidated zonemap must contain ceil(ROWS_PER_FRAG/ROWS_PER_ZONE) zones per \ + input fragment" + ); + assert_eq!( + read_batch.num_rows(), + (zones_per_frag * 4), + "consolidated batch total zone count must equal zones_per_frag × num_fragments" + ); + } + + #[tokio::test] + async fn test_write_consolidated_zonemap_segment_end_to_end() { + // End-to-end check of the driver-side helper: per-fragment compute → concat → write → + // returned IndexMetadata captures (a) every input fragment in the bitmap, (b) the + // correct field id for the indexed column, (c) the canonical ZoneMap index_version + // and (d) at least one file entry under the freshly allocated UUID directory. + use crate::index::scalar::{compute_zonemap_batch, write_consolidated_zonemap_segment}; + use arrow::compute::concat_batches; + use lance_index::scalar::zonemap::{ZONEMAP_INDEX_VERSION, ZoneMapIndexBuilderParams}; + + let dataset = lance_datagen::gen_batch() + .col("values", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(10)) + .await + .unwrap(); + + let params = ZoneMapIndexBuilderParams::new(4); + let batch_0_1 = + compute_zonemap_batch(&dataset, "values", Some(vec![0u32, 1]), params.clone()) + .await + .unwrap(); + let batch_2_3 = + compute_zonemap_batch(&dataset, "values", Some(vec![2u32, 3]), params.clone()) + .await + .unwrap(); + let canonical = lance_index::scalar::zonemap::zonemap_stats_schema(&DataType::Int32); + let concatenated = concat_batches(&canonical, [&batch_0_1, &batch_2_3]).unwrap(); + + let metadata = write_consolidated_zonemap_segment( + &dataset, + "values_zm", + "values", + concatenated, + ¶ms, + ) + .await + .unwrap(); + + assert_eq!(metadata.name, "values_zm"); + assert_eq!(metadata.index_version as u32, ZONEMAP_INDEX_VERSION); + assert_eq!(metadata.dataset_version, dataset.version_id()); + assert!(metadata.created_at.is_some()); + + let field_id = dataset.schema().field("values").unwrap().id; + assert_eq!(metadata.fields, vec![field_id]); + + let bitmap = metadata.fragment_bitmap.expect("bitmap must be set"); + let frags: Vec = bitmap.iter().collect(); + assert_eq!( + frags, + vec![0, 1, 2, 3], + "bitmap must cover every fragment in the input batches" + ); + + // The writer should have produced at least zonemap.lance under the new UUID. + let files = metadata + .files + .expect("files must be populated for a freshly written segment"); + assert!( + files.iter().any(|f| f + .path + .ends_with(lance_index::scalar::zonemap::ZONEMAP_FILENAME)), + "freshly written segment must contain {}; got {:?}", + lance_index::scalar::zonemap::ZONEMAP_FILENAME, + files.iter().map(|f| &f.path).collect::>() + ); + } + + #[tokio::test] + async fn test_write_consolidated_zonemap_rejects_value_type_mismatch() { + // A batch whose min/max type disagrees with the indexed column's scanned type must be + // rejected — otherwise cross-type comparisons silently prune matching rows at query time. + use crate::index::scalar::write_consolidated_zonemap_segment; + use lance_index::scalar::zonemap::{ZoneMapIndexBuilderParams, zonemap_stats_schema}; + + let dataset = lance_datagen::gen_batch() + .col("values", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(10)) + .await + .unwrap(); + let params = ZoneMapIndexBuilderParams::new(4); + + // Structurally valid batch, but min/max are Int64 while "values" scans as Int32. + let schema = zonemap_stats_schema(&DataType::Int64); + let bad = arrow_array::RecordBatch::try_new( + schema, + vec![ + Arc::new(arrow_array::Int64Array::from(vec![Some(0i64)])), + Arc::new(arrow_array::Int64Array::from(vec![Some(10i64)])), + Arc::new(arrow_array::UInt32Array::from(vec![0u32])), + Arc::new(arrow_array::UInt32Array::from(vec![0u32])), + Arc::new(arrow_array::UInt64Array::from(vec![0u64])), + Arc::new(arrow_array::UInt64Array::from(vec![0u64])), + Arc::new(arrow_array::UInt64Array::from(vec![4u64])), + ], + ) + .unwrap(); + + let err = write_consolidated_zonemap_segment(&dataset, "values_zm", "values", bad, ¶ms) + .await + .expect_err("min/max type mismatch must be rejected"); + assert!( + err.to_string().contains("does not match indexed column"), + "got: {}", + err + ); + } + + #[tokio::test] + async fn test_write_consolidated_zonemap_rejects_foreign_fragment() { + // A batch referencing a fragment outside the dataset must be rejected rather than yield + // commit-ready metadata claiming coverage the dataset does not actually have. + use crate::index::scalar::write_consolidated_zonemap_segment; + use lance_index::scalar::zonemap::{ZoneMapIndexBuilderParams, zonemap_stats_schema}; + + let dataset = lance_datagen::gen_batch() + .col("values", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(10)) + .await + .unwrap(); + let params = ZoneMapIndexBuilderParams::new(4); + + // Correct value type, but fragment_id 99 does not exist in the 4-fragment dataset. + let schema = zonemap_stats_schema(&DataType::Int32); + let foreign = arrow_array::RecordBatch::try_new( + schema, + vec![ + Arc::new(arrow_array::Int32Array::from(vec![Some(0)])), + Arc::new(arrow_array::Int32Array::from(vec![Some(10)])), + Arc::new(arrow_array::UInt32Array::from(vec![0u32])), + Arc::new(arrow_array::UInt32Array::from(vec![0u32])), + Arc::new(arrow_array::UInt64Array::from(vec![99u64])), + Arc::new(arrow_array::UInt64Array::from(vec![0u64])), + Arc::new(arrow_array::UInt64Array::from(vec![4u64])), + ], + ) + .unwrap(); + + let err = + write_consolidated_zonemap_segment(&dataset, "values_zm", "values", foreign, ¶ms) + .await + .expect_err("foreign fragment must be rejected"); + assert!( + err.to_string().contains("not present in dataset"), + "got: {}", + err + ); + } + #[tokio::test] async fn test_load_training_data_addr_sort() { // Create test data using lance_datagen