diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index d338ff0fc4b..fa4ce383fb9 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -3646,3 +3646,276 @@ fn inner_get_zonemap_stats<'local>( Ok(array_list) } + +///////////////////////////////////////////// +// Build-time consolidation: compute batch // +///////////////////////////////////////////// + +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeComputeZonemapBatch( + mut env: JNIEnv, + java_dataset: JObject, + jcolumn_name: JString, + jfragment_ids: JObject, + jparams_json: JString, + array_addr: jlong, + schema_addr: jlong, +) { + ok_or_throw_without_return!( + env, + inner_compute_zonemap_batch( + &mut env, + java_dataset, + jcolumn_name, + jfragment_ids, + jparams_json, + array_addr, + schema_addr, + ) + ) +} + +fn inner_compute_zonemap_batch( + env: &mut JNIEnv<'_>, + java_dataset: JObject, + jcolumn_name: JString, + jfragment_ids: JObject, + jparams_json: JString, + array_addr: jlong, + schema_addr: jlong, +) -> Result<()> { + use arrow::array::StructArray; + use arrow::ffi::FFI_ArrowArray; + use arrow_array::Array; + use jni::objects::JLongArray; + use lance::index::scalar::compute_zonemap_batch; + use lance_index::scalar::zonemap::ZoneMapIndexBuilderParams; + + let column_name: String = jcolumn_name.extract(env)?; + let params_json: String = jparams_json.extract(env)?; + + // Empty/missing params JSON means "use defaults". Mirrors the Rust plugin's + // unwrap_or_default() so callers without configuration overrides don't have to + // hand-serialize a default object. + let params: ZoneMapIndexBuilderParams = if params_json.is_empty() { + ZoneMapIndexBuilderParams::default() + } else { + serde_json::from_str(¶ms_json) + .map_err(|e| Error::input_error(format!("Invalid zonemap params JSON: {}", e)))? + }; + + // null fragment_ids array means "all fragments" (matches the Rust Option> + // contract where None covers every fragment). + let fragment_ids: Option> = if jfragment_ids.is_null() { + None + } else { + let jarr = JLongArray::from(jfragment_ids); + let len = env.get_array_length(&jarr)? as usize; + let mut buf = vec![0i64; len]; + env.get_long_array_region(&jarr, 0, &mut buf)?; + // Lance fragment ids are u32 on disk; reject negative values from Java rather + // than silently wrapping. + let ids = buf + .into_iter() + .map(|v| { + if v < 0 || v > u32::MAX as i64 { + Err(Error::input_error(format!( + "fragment id {} out of u32 range", + v + ))) + } else { + Ok(v as u32) + } + }) + .collect::>>()?; + Some(ids) + }; + + let dataset = { + let dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + dataset_guard.inner.clone() + }; + + let batch = RT + .block_on(async move { + compute_zonemap_batch(&dataset, &column_name, fragment_ids, params).await + }) + .map_err(Error::from)?; + + // Export to the Java-allocated FFI buffers. The Java consumer reconstructs a + // VectorSchemaRoot via Data.importVectorSchemaRoot(allocator, ArrowArray, ArrowSchema, null), + // which expects the FFI array to be a struct array over the batch's columns and the + // FFI schema to describe the batch's top-level Schema. + let schema = batch.schema(); + let struct_array: StructArray = batch.into(); + let array_data = struct_array.into_data(); + let ffi_array = FFI_ArrowArray::new(&array_data); + let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?; + + unsafe { + std::ptr::write_unaligned(array_addr as *mut FFI_ArrowArray, ffi_array); + std::ptr::write_unaligned(schema_addr as *mut FFI_ArrowSchema, ffi_schema); + } + Ok(()) +} + +///////////////////////////////////////////// +// Build-time consolidation: write segment // +///////////////////////////////////////////// + +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeWriteZonemapIndexFromBatches<'local>( + mut env: JNIEnv<'local>, + java_dataset: JObject<'local>, + jindex_name: JString, + jcolumn_name: JString, + jarray_addrs: JObject<'local>, + jschema_addrs: JObject<'local>, + jparams_json: JString, +) -> JObject<'local> { + ok_or_throw!( + env, + inner_write_zonemap_index_from_batches( + &mut env, + java_dataset, + jindex_name, + jcolumn_name, + jarray_addrs, + jschema_addrs, + jparams_json, + ) + ) +} + +fn inner_write_zonemap_index_from_batches<'local>( + env: &mut JNIEnv<'local>, + java_dataset: JObject<'local>, + jindex_name: JString, + jcolumn_name: JString, + jarray_addrs: JObject<'local>, + jschema_addrs: JObject<'local>, + jparams_json: JString, +) -> Result> { + use arrow::array::{RecordBatch, StructArray}; + use arrow::compute::concat_batches; + use arrow::ffi::{FFI_ArrowArray, from_ffi_and_data_type}; + use jni::objects::JLongArray; + use lance::index::scalar::write_consolidated_zonemap_segment; + use lance_index::scalar::zonemap::ZoneMapIndexBuilderParams; + + let index_name: String = jindex_name.extract(env)?; + let column_name: String = jcolumn_name.extract(env)?; + let params_json: String = jparams_json.extract(env)?; + + let params: ZoneMapIndexBuilderParams = if params_json.is_empty() { + ZoneMapIndexBuilderParams::default() + } else { + serde_json::from_str(¶ms_json) + .map_err(|e| Error::input_error(format!("Invalid zonemap params JSON: {}", e)))? + }; + + if jarray_addrs.is_null() || jschema_addrs.is_null() { + return Err(Error::input_error( + "arrayAddrs / schemaAddrs must not be null".to_string(), + )); + } + let jarrs = JLongArray::from(jarray_addrs); + let jschs = JLongArray::from(jschema_addrs); + let n_batches = env.get_array_length(&jarrs)? as usize; + let n_schemas = env.get_array_length(&jschs)? as usize; + if n_batches != n_schemas { + return Err(Error::input_error(format!( + "arrayAddrs length {} != schemaAddrs length {}", + n_batches, n_schemas + ))); + } + if n_batches == 0 { + return Err(Error::input_error( + "must provide at least one worker batch".to_string(), + )); + } + let mut array_buf = vec![0i64; n_batches]; + let mut schema_buf = vec![0i64; n_batches]; + env.get_long_array_region(&jarrs, 0, &mut array_buf)?; + env.get_long_array_region(&jschs, 0, &mut schema_buf)?; + + // Reconstruct each Arrow batch from the caller-allocated FFI buffers. `FFI_ArrowArray:: + // from_raw` moves out of the raw pointer's contents into an owned Rust value and replaces + // the source location with an empty (release-pointer null) struct. When the Rust value + // drops at end-of-scope its destructor fires the original release callback; Java's + // try-with-resources later sees a null release pointer and does nothing — single + // invocation, no double-release. This mirrors the inner_write_batch ownership contract in + // file_writer.rs. + let mut batches: Vec = Vec::with_capacity(n_batches); + for i in 0..n_batches { + let arr_ptr = array_buf[i] as *mut FFI_ArrowArray; + let sch_ptr = schema_buf[i] as *mut FFI_ArrowSchema; + if arr_ptr.is_null() || sch_ptr.is_null() { + return Err(Error::input_error(format!( + "null FFI pointer at index {}", + i + ))); + } + let c_array = unsafe { FFI_ArrowArray::from_raw(arr_ptr) }; + let c_schema = unsafe { FFI_ArrowSchema::from_raw(sch_ptr) }; + let data_type = DataType::try_from(&c_schema)?; + let array_data = unsafe { from_ffi_and_data_type(c_array, data_type) }?; + batches.push(RecordBatch::from(StructArray::from(array_data))); + } + + let dataset = { + let dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + dataset_guard.inner.clone() + }; + + // Use the FIRST worker batch's schema as the concat join target. compute_zonemap_batch + // derives min/max value_type from the post-scan training stream (dictionary→primitive + // adaptation, extension-type unwrap, etc.) rather than from `dataset.schema()`, so the + // dataset-schema-derived type can differ from what workers actually emit. Worker batches + // come from the same scan path on every node, so their schemas are mutually consistent. + // Fall back to zonemap_stats_schema(dataset.schema()-derived value_type) only when no + // worker exists, which the up-front non-empty check has already ruled out. + let canonical_schema = batches[0].schema(); + // Defence-in-depth: surface a clear error if a worker emitted a divergent schema + // (e.g. a producer that bypassed compute_zonemap_batch and hand-constructed batches). + // Use strict equality, not Schema::contains — the latter is an asymmetric superset + // check that would let metadata-only mismatches slip through and then surface as a + // less helpful concat_batches error deeper in the stack. + for (i, b) in batches.iter().enumerate().skip(1) { + if b.schema().as_ref() != canonical_schema.as_ref() { + return Err(Error::input_error(format!( + "worker batch {} schema {:?} differs from batch 0 schema {:?}", + i, + b.schema().as_ref(), + canonical_schema.as_ref(), + ))); + } + } + // write_consolidated_zonemap_segment validates `column_name` exists in the dataset schema + // as its first action — no UUID directory has been allocated and no zonemap.lance has + // been written when that check fires. Duplicating the check here would not save any + // observable work, so we let the helper handle it. + let concatenated = concat_batches(&canonical_schema, batches.iter()).map_err(|e| { + Error::input_error(format!( + "failed to concatenate worker batches against canonical schema: {}", + e + )) + })?; + + let metadata = RT + .block_on(async { + write_consolidated_zonemap_segment( + &dataset, + &index_name, + &column_name, + concatenated, + ¶ms, + ) + .await + }) + .map_err(Error::from)?; + + (&metadata).into_java(env) +} diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 4943962da44..706624fe5b6 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -38,12 +38,14 @@ import org.lance.schema.SqlExpressions; import org.lance.util.JsonUtils; +import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.pojo.Field; @@ -1491,6 +1493,233 @@ public List getZonemapStats(String columnName) { private native List nativeGetZonemapStats(String columnName); + /** + * Compute the zonemap stats batch for a column over the given fragments WITHOUT writing it to + * disk. Worker-side entry point for build-time consolidation: a coordinator (e.g. a Spark driver) + * fans this call out across worker tasks with disjoint fragment id subsets, then concatenates the + * returned batches and writes a single consolidated zonemap index, bypassing the per-segment + * commit shape that scales poorly with fragment count at plan time. + * + * @param columnName the column to scan + * @param fragmentIds fragment ids to include, or {@code null} for every fragment in the dataset. + * An explicit empty array ({@code new long[0]}) is rejected — pass {@code null} when you mean + * "every fragment", and a non-empty array when you mean a specific subset. The all-fragments + * and zero-fragments cases mean different things and confusing them is almost always a + * coordinator bug. + * @param paramsJson serialized {@code ZoneMapIndexBuilderParams} (e.g. {@code + * "{\"rows_per_zone\": 8192}"}) or empty string for defaults + * @param allocator buffer allocator that will own the returned VectorSchemaRoot + * @return a VectorSchemaRoot containing zone records (one row per zone) over the requested + * fragments, with the canonical zonemap stats schema ({@code min}, {@code max}, {@code + * null_count}, {@code nan_count}, {@code fragment_id}, {@code zone_start}, {@code + * zone_length}). Caller owns and must close. + */ + public VectorSchemaRoot computeZonemapBatch( + String columnName, long[] fragmentIds, String paramsJson, BufferAllocator allocator) { + Preconditions.checkArgument( + columnName != null && !columnName.isEmpty(), "columnName cannot be null or empty"); + Preconditions.checkArgument( + fragmentIds == null || fragmentIds.length > 0, + "fragmentIds must be null (all fragments) or non-empty; got an empty array"); + Preconditions.checkNotNull(allocator, "allocator cannot be null"); + String params = paramsJson == null ? "" : paramsJson; + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + // Track each handle inside the try block so an OOM on the second allocation cannot + // strand the first one unreleased. Previously the second allocateNew() sat outside any + // exception handler — the same anti-pattern R21 fixed in writeZonemapIndexFromBatches. + ArrowSchema arrowSchema = null; + ArrowArray arrowArray = null; + boolean importedOk = false; + try { + arrowSchema = ArrowSchema.allocateNew(allocator); + arrowArray = ArrowArray.allocateNew(allocator); + nativeComputeZonemapBatch( + columnName, + fragmentIds, + params, + arrowArray.memoryAddress(), + arrowSchema.memoryAddress()); + VectorSchemaRoot root = + Data.importVectorSchemaRoot(allocator, arrowArray, arrowSchema, null); + importedOk = true; + return root; + } finally { + // Cleanup contract notes: + // - ArrowSchema: Data.importVectorSchemaRoot internally calls importField, whose own + // try/finally always calls release() + close() on the schema — including on the + // failure path. So we MUST NOT call schema.release() ourselves; doing so on the + // failure path would call memoryAddress() on a wrapper with data=null and throw + // NullPointerException ("ArrowArray is already closed" — Arrow Java's + // Preconditions.checkNotNull message), masking the underlying exception. close() + // alone is idempotent (no-op if already closed), so it's safe to call defensively + // in case native threw before the import step ran. + // - ArrowArray: the import consumes it on success (release callback nulled by + // from_raw, struct closed by the consumer). On a partial failure inside + // importIntoVectorSchemaRoot the array may still carry a live producer callback; + // release()-then-close() fires it. If the array was already consumed and closed, + // release() throws NullPointerException via the same Preconditions.checkNotNull + // path — swallow it so the original cause propagates. + if (arrowArray != null) { + if (!importedOk) { + try { + arrowArray.release(); + } catch (NullPointerException alreadyConsumed) { + // Import path consumed the array before throwing — release callback already + // ran via the consumer's drop. The NPE comes from Arrow Java's + // Preconditions.checkNotNull(data, "ArrowArray is already closed"). + } + } + arrowArray.close(); + } + if (arrowSchema != null) { + arrowSchema.close(); + } + } + } + } + + private native void nativeComputeZonemapBatch( + String columnName, + long[] fragmentIds, + String paramsJson, + long arrayAddress, + long schemaAddress); + + /** + * Write a consolidated zonemap index segment from one or more worker-computed batches and return + * its {@link Index} metadata, WITHOUT committing the manifest. + * + *

Driver-side entry point for build-time zonemap consolidation. The coordinator calls {@link + * #computeZonemapBatch} on each worker to obtain per-fragment-subset {@code VectorSchemaRoot}s, + * hands the full list here, and the call (a) concatenates them Rust-side against the canonical + * zonemap stats schema, (b) writes a single {@code zonemap.lance} file under a freshly allocated + * UUID directory, and (c) returns an {@code Index} whose {@code fragment_bitmap} is the union of + * every fragment id appearing in the input batches. + * + *

The returned {@code Index} is uncommitted — to land it in the manifest, pass it (along with + * any other segments) to {@link #commitExistingIndexSegments(String, String, List)}. + * + *

Each batch is exported to the Rust side through the Arrow C Data Interface. The export + * creates a separate Arrow C view that shares buffers with the source via refcount; the source + * {@code VectorSchemaRoot}s remain valid and usable after this method returns. Callers manage + * their lifecycle normally (typically try-with-resources at the worker call site) — no special + * handling around this method. + * + *

{@code paramsJson} should match the params each worker passed to {@link + * #computeZonemapBatch}. The actual zones in the batch have already been laid out by the worker + * pass; the driver value is written into the file metadata header so downstream readers see the + * same {@code rows_per_zone} the workers configured. Passing a different value here records a lie + * in the file header and will mislead consumers — it is not a supported way to "rewrite" the zone + * layout. + * + *

Pre-flight validation (null / empty list, null elements) runs before any FFI export, so + * rejection on those grounds leaves all input batches untouched. Once the export loop begins, an + * allocation or export failure mid-loop leaves the partial state visible only through the + * exception's stack trace; FFI handles allocated up to that point are released in {@code + * finally}. Callers may safely retry against the same source {@code VectorSchemaRoot} instances + * since the export does not mutate them. + * + *

Fragment id coverage is whatever the {@code fragment_id} column in the concatenated batch + * contains. The driver does NOT enforce that worker batches cover disjoint fragment subsets — + * overlapping fragment ids across workers will produce duplicated zone rows in the consolidated + * segment and surface as confused stats at read time. Coordinators are responsible for + * partitioning fragments cleanly. + * + * @param indexName the logical index name + * @param column the indexed column name. This MUST be the same column every worker passed + * to {@link #computeZonemapBatch}. The method does NOT verify the provenance of the + * batch's min/max values — only that the column exists in the dataset schema. Passing column + * "B" with batches that were computed against column "A" produces a silently-corrupted + * segment: the manifest records that the index applies to B, but the on-disk stats are A's, + * and there is no read-time marker that can detect the mismatch. Coordinators must thread the + * column name consistently from the {@code computeZonemapBatch} call sites through to this + * call. + * @param batches per-worker zonemap batches, typically returned by {@link #computeZonemapBatch}. + * Every batch must share the same schema (the case when all workers used the same params and + * the same indexed column on the same Dataset version, which is the only supported + * configuration). Must be non-null and non-empty. + * @param paramsJson serialized {@code ZoneMapIndexBuilderParams} matching the workers' params, or + * empty string for defaults + * @param allocator buffer allocator used to allocate the FFI export buffers + * @return the {@link Index} metadata for the freshly written, uncommitted segment + */ + public Index writeZonemapIndexFromBatches( + String indexName, + String column, + List batches, + String paramsJson, + BufferAllocator allocator) { + Preconditions.checkArgument( + indexName != null && !indexName.isEmpty(), "indexName cannot be null or empty"); + Preconditions.checkArgument( + column != null && !column.isEmpty(), "column cannot be null or empty"); + Preconditions.checkNotNull(batches, "batches cannot be null"); + Preconditions.checkArgument(!batches.isEmpty(), "batches cannot be empty"); + Preconditions.checkNotNull(allocator, "allocator cannot be null"); + String params = paramsJson == null ? "" : paramsJson; + + // Validate every batch reference up front. Mixing the null check with the FFI export + // would half-execute the export loop on a mid-list NPE — earlier elements have already + // had ArrowArray/ArrowSchema handles allocated and refcount-shared via + // Data.exportVectorSchemaRoot, and unwinding through the finally block would leave the + // caller wondering whether the source list had been partially traversed. Two-pass keeps + // the "rejection-on-input-validation leaves all batches and allocator state untouched" + // invariant intact. + for (int i = 0; i < batches.size(); i++) { + Preconditions.checkNotNull(batches.get(i), "batches[%s] is null", i); + } + + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + + // Allocate one (ArrowArray, ArrowSchema) pair per batch. Track them in two parallel + // lists so the close-loop runs in finally whether the native call succeeds or not. + List arrays = new java.util.ArrayList<>(batches.size()); + List schemas = new java.util.ArrayList<>(batches.size()); + try { + long[] arrayAddrs = new long[batches.size()]; + long[] schemaAddrs = new long[batches.size()]; + for (int i = 0; i < batches.size(); i++) { + VectorSchemaRoot root = batches.get(i); + // Track each handle in the cleanup list IMMEDIATELY after allocation, before any + // subsequent allocation can throw. Previously the ArrowSchema.allocateNew call + // could OOM with the just-allocated ArrowArray sitting untracked, leaking it past + // the finally block. + ArrowArray a = ArrowArray.allocateNew(allocator); + arrays.add(a); + ArrowSchema s = ArrowSchema.allocateNew(allocator); + schemas.add(s); + Data.exportVectorSchemaRoot(allocator, root, null, a, s); + arrayAddrs[i] = a.memoryAddress(); + schemaAddrs[i] = s.memoryAddress(); + } + return nativeWriteZonemapIndexFromBatches( + indexName, column, arrayAddrs, schemaAddrs, params); + } finally { + // For each FFI handle: release() fires the producer-side release callback that + // Data.exportVectorSchemaRoot set (so any retained ref to source buffers is dropped), + // then close() frees the 80-byte struct holder itself. close() alone only frees the + // holder — if Rust never consumed the handle via from_raw (e.g. JNI errored before + // the import loop ran), close() would leak the producer-side private_data and its + // retained buffer refs. release() is safe to call after from_raw consumed the struct + // because from_raw nulls the release pointer; the underlying releaseArray JNI call + // no-ops on a null release callback. + for (ArrowArray a : arrays) { + a.release(); + a.close(); + } + for (ArrowSchema s : schemas) { + s.release(); + s.close(); + } + } + } + } + + private native Index nativeWriteZonemapIndexFromBatches( + String indexName, String column, long[] arrayAddrs, long[] schemaAddrs, String paramsJson); + /** * Get the table config of the dataset. * diff --git a/java/src/test/java/org/lance/index/ComputeZonemapBatchTest.java b/java/src/test/java/org/lance/index/ComputeZonemapBatchTest.java new file mode 100644 index 00000000000..80701e45fb2 --- /dev/null +++ b/java/src/test/java/org/lance/index/ComputeZonemapBatchTest.java @@ -0,0 +1,244 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.index; + +import org.lance.Dataset; +import org.lance.Fragment; +import org.lance.FragmentMetadata; +import org.lance.FragmentOperation; +import org.lance.WriteParams; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.*; + +/** Tests for {@link Dataset#computeZonemapBatch(String, long[], String, BufferAllocator)}. */ +public class ComputeZonemapBatchTest { + + private static Schema intSchema() { + return new Schema( + Arrays.asList( + Field.nullable("id", new ArrowType.Int(32, true)), + Field.nullable("value", new ArrowType.Int(32, true))), + null); + } + + private Dataset writeIntFragment( + BufferAllocator allocator, String path, long version, int startValue, int rowCount) { + Schema schema = intSchema(); + List metas; + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + IntVector idVec = (IntVector) root.getVector("id"); + IntVector valVec = (IntVector) root.getVector("value"); + for (int i = 0; i < rowCount; i++) { + idVec.setSafe(i, startValue + i); + valVec.setSafe(i, (startValue + i) * 10); + } + root.setRowCount(rowCount); + metas = Fragment.create(path, allocator, root, new WriteParams.Builder().build()); + } + FragmentOperation.Append appendOp = new FragmentOperation.Append(metas); + return Dataset.commit(allocator, path, appendOp, Optional.of(version)); + } + + /** + * Null fragmentIds + explicit rows_per_zone — should compute over every fragment. We pass an + * explicit zone size (rather than relying on the default) because Lance honours the {@code + * LANCE_ZONEMAP_DEFAULT_ROWS_PER_ZONE} env var as the default zone size, so a CI environment that + * sets it would otherwise flip the expected zone count and break this test. + */ + @Test + public void allFragmentsExplicitParams(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("all_frags").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 20).close(); + writeIntFragment(allocator, path, 2, 20, 20).close(); + writeIntFragment(allocator, path, 3, 40, 20).close(); + + try (Dataset dataset = Dataset.open(path, allocator); + VectorSchemaRoot batch = + dataset.computeZonemapBatch("value", null, "{\"rows_per_zone\": 8192}", allocator)) { + Schema s = batch.getSchema(); + // Canonical schema: min, max, null_count, nan_count, fragment_id, zone_start, zone_length + // Pin column types so a regression from UInt32→Int32 on null_count etc. is caught here + // rather than silently round-tripping through FFI. + assertEquals(7, s.getFields().size(), "canonical zonemap stats schema has 7 columns"); + // Field 0: min (Int32, nullable — an all-NULL batch is legitimate) + assertEquals("min", s.getFields().get(0).getName()); + assertEquals(new ArrowType.Int(32, true), s.getFields().get(0).getType(), "min: Int32"); + assertTrue(s.getFields().get(0).isNullable(), "min must be nullable"); + // Field 1: max (Int32, nullable — same rationale as min) + assertEquals("max", s.getFields().get(1).getName()); + assertEquals(new ArrowType.Int(32, true), s.getFields().get(1).getType(), "max: Int32"); + assertTrue(s.getFields().get(1).isNullable(), "max must be nullable"); + // Field 2: null_count (UInt32, NON-nullable — metadata column) + assertEquals("null_count", s.getFields().get(2).getName()); + assertEquals( + new ArrowType.Int(32, false), s.getFields().get(2).getType(), "null_count: UInt32"); + assertFalse(s.getFields().get(2).isNullable(), "null_count must be non-nullable"); + // Field 3: nan_count (UInt32, NON-nullable) + assertEquals("nan_count", s.getFields().get(3).getName()); + assertEquals( + new ArrowType.Int(32, false), s.getFields().get(3).getType(), "nan_count: UInt32"); + assertFalse(s.getFields().get(3).isNullable(), "nan_count must be non-nullable"); + // Field 4: fragment_id (UInt64, NON-nullable) + assertEquals("fragment_id", s.getFields().get(4).getName()); + assertEquals( + new ArrowType.Int(64, false), s.getFields().get(4).getType(), "fragment_id: UInt64"); + assertFalse(s.getFields().get(4).isNullable(), "fragment_id must be non-nullable"); + // Field 5: zone_start (UInt64, NON-nullable) + assertEquals("zone_start", s.getFields().get(5).getName()); + assertEquals( + new ArrowType.Int(64, false), s.getFields().get(5).getType(), "zone_start: UInt64"); + assertFalse(s.getFields().get(5).isNullable(), "zone_start must be non-nullable"); + // Field 6: zone_length (UInt64, NON-nullable) + assertEquals("zone_length", s.getFields().get(6).getName()); + assertEquals( + new ArrowType.Int(64, false), s.getFields().get(6).getType(), "zone_length: UInt64"); + assertFalse(s.getFields().get(6).isNullable(), "zone_length must be non-nullable"); + + // Default rows_per_zone (8192) is >> 60 rows total, so we get one zone per fragment. + assertEquals(3, batch.getRowCount(), "one zone per fragment under default zone size"); + } + } + } + + /** Explicit fragment subset + custom rows_per_zone — verifies both knobs reach the Rust side. */ + @Test + public void fragmentSubsetWithSmallZoneSize(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("subset").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 20).close(); + writeIntFragment(allocator, path, 2, 20, 20).close(); + writeIntFragment(allocator, path, 3, 40, 20).close(); + + // Pick the first and third committed fragment ids by discovery (same pattern as the + // WriteZonemapIndexFromBatchesTest tests). Hardcoded {0L, 2L} would silently break if + // Lance ever changed fragment-id allocation. + java.util.List fragIds = new java.util.ArrayList<>(); + try (Dataset dataset = Dataset.open(path, allocator)) { + for (org.lance.Fragment f : dataset.getFragments()) { + fragIds.add(f.getId()); + } + } + assertEquals(3, fragIds.size(), "expected 3 fragments committed"); + long firstFragId = fragIds.get(0).longValue(); + long thirdFragId = fragIds.get(2).longValue(); + + try (Dataset dataset = Dataset.open(path, allocator); + VectorSchemaRoot batch = + dataset.computeZonemapBatch( + "value", + new long[] {firstFragId, thirdFragId}, + "{\"rows_per_zone\": 5}", + allocator)) { + // 2 fragments × ceil(20/5) = 4 zones each = 8 zones total. + assertEquals(8, batch.getRowCount(), "two fragments × 4 zones each"); + + // Verify the returned batch's fragment_id column actually reflects the REQUESTED + // subset — not just any pair that happens to sum to 8 zones. A regression where the + // JNI swapped fragment-id-arg order or ignored the array would pass the row-count + // assertion above but fail here. + org.apache.arrow.vector.UInt8Vector fragVec = + (org.apache.arrow.vector.UInt8Vector) batch.getVector("fragment_id"); + java.util.Set frags = new java.util.HashSet<>(); + for (int row = 0; row < batch.getRowCount(); row++) { + frags.add(fragVec.getValueAsLong(row)); + } + assertEquals( + new java.util.HashSet<>(java.util.Arrays.asList(firstFragId, thirdFragId)), + frags, + "fragment_id column must contain exactly the requested subset"); + } + } + } + + /** + * Explicit empty fragmentIds array must be rejected: it would otherwise reach the JNI as {@code + * Some(vec![])}, meaning "zero fragments to scan", which is silently distinct from {@code null} + * ("all fragments"). Callers passing an empty list are almost certainly confused about the + * all-vs-none distinction. + */ + @Test + public void emptyFragmentIdsArrayRejected(@TempDir Path tempDir) { + String path = tempDir.resolve("empty_frags").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset dataset = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + IllegalArgumentException thrown = + assertThrows( + IllegalArgumentException.class, + () -> dataset.computeZonemapBatch("value", new long[0], "", allocator)); + assertTrue( + thrown.getMessage() != null && thrown.getMessage().toLowerCase().contains("empty"), + "error must mention empty; got: " + thrown.getMessage()); + } + } + } + + @Test + public void nullColumnRejected(@TempDir Path tempDir) { + String path = tempDir.resolve("null_col").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset dataset = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + assertThrows( + IllegalArgumentException.class, + () -> dataset.computeZonemapBatch(null, null, "", allocator)); + } + } + } + + @Test + public void emptyParamsJsonUsesDefaults(@TempDir Path tempDir) throws Exception { + // Both "" and null paramsJson must take the default-params path without throwing. + String path = tempDir.resolve("empty_params").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 30).close(); + + try (Dataset dataset = Dataset.open(path, allocator)) { + try (VectorSchemaRoot a = dataset.computeZonemapBatch("value", null, "", allocator); + VectorSchemaRoot b = dataset.computeZonemapBatch("value", null, null, allocator)) { + assertEquals(a.getRowCount(), b.getRowCount()); + } + } + } + } +} diff --git a/java/src/test/java/org/lance/index/WriteZonemapIndexFromBatchesTest.java b/java/src/test/java/org/lance/index/WriteZonemapIndexFromBatchesTest.java new file mode 100644 index 00000000000..6e187093a52 --- /dev/null +++ b/java/src/test/java/org/lance/index/WriteZonemapIndexFromBatchesTest.java @@ -0,0 +1,795 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.index; + +import org.lance.Dataset; +import org.lance.Fragment; +import org.lance.FragmentMetadata; +import org.lance.FragmentOperation; +import org.lance.WriteParams; +import org.lance.index.scalar.ZoneStats; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for the build-time zonemap consolidation flow: {@link Dataset#computeZonemapBatch} (worker) + * → {@link Dataset#writeZonemapIndexFromBatches} (driver) → {@link + * Dataset#commitExistingIndexSegments} (driver). + */ +public class WriteZonemapIndexFromBatchesTest { + + private static Schema intSchema() { + return new Schema( + Arrays.asList( + Field.nullable("id", new ArrowType.Int(32, true)), + Field.nullable("value", new ArrowType.Int(32, true))), + null); + } + + private Dataset writeIntFragment( + BufferAllocator allocator, String path, long version, int startValue, int rowCount) { + Schema schema = intSchema(); + List metas; + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + IntVector idVec = (IntVector) root.getVector("id"); + IntVector valVec = (IntVector) root.getVector("value"); + for (int i = 0; i < rowCount; i++) { + idVec.setSafe(i, startValue + i); + valVec.setSafe(i, (startValue + i) * 10); + } + root.setRowCount(rowCount); + metas = Fragment.create(path, allocator, root, new WriteParams.Builder().build()); + } + FragmentOperation.Append appendOp = new FragmentOperation.Append(metas); + return Dataset.commit(allocator, path, appendOp, Optional.of(version)); + } + + /** + * End-to-end: 4 fragments, two workers each compute batches for half, driver consolidates + + * writes + commits one IndexMetadata, then read via getZonemapStats and verify it covers all 4 + * fragments via the single returned segment. + */ + @Test + public void endToEndConsolidation(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("e2e").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 20).close(); + writeIntFragment(allocator, path, 2, 20, 20).close(); + writeIntFragment(allocator, path, 3, 40, 20).close(); + writeIntFragment(allocator, path, 4, 60, 20).close(); + + // Build a fragId → startValue map by reading getFragments() in commit-iteration order. + // This is a SMALLER assumption than "fragment ids are monotonically allocated from 0": + // we only need getFragments() to return fragments in the order they were committed, + // not that the ids themselves are sequential. A future Lance change that allocates + // non-monotonic ids (e.g. random UUIDs) would still satisfy this test as long as the + // listing order matches the commit order. If even that assumption breaks, the right + // fix is to read identifying data out of each fragment instead of relying on listing + // order at all. + java.util.Map startValueByFragId = new java.util.HashMap<>(); + try (Dataset dataset = Dataset.open(path, allocator)) { + List frags = dataset.getFragments(); + assertEquals(4, frags.size(), "expected 4 fragments to be committed"); + for (int i = 0; i < frags.size(); i++) { + // Fragments are returned in commit order; we wrote startValue=0/20/40/60 in that order. + startValueByFragId.put(frags.get(i).getId(), i * 20); + } + } + + try (Dataset dataset = Dataset.open(path, allocator)) { + // Worker 1: first two fragments by id. Worker 2: last two fragments by id. + java.util.List sortedFragIds = + new java.util.ArrayList<>(startValueByFragId.keySet()); + java.util.Collections.sort(sortedFragIds); + long[] w1Frags = + new long[] {sortedFragIds.get(0).longValue(), sortedFragIds.get(1).longValue()}; + long[] w2Frags = + new long[] {sortedFragIds.get(2).longValue(), sortedFragIds.get(3).longValue()}; + String paramsJson = "{\"rows_per_zone\": 5}"; + try (VectorSchemaRoot w1 = + dataset.computeZonemapBatch("value", w1Frags, paramsJson, allocator); + VectorSchemaRoot w2 = + dataset.computeZonemapBatch("value", w2Frags, paramsJson, allocator)) { + + assertEquals(8, w1.getRowCount(), "worker 1: 2 frags × 4 zones"); + assertEquals(8, w2.getRowCount(), "worker 2: 2 frags × 4 zones"); + + Index segment = + dataset.writeZonemapIndexFromBatches( + "value_zm", "value", Arrays.asList(w1, w2), paramsJson, allocator); + + // Returned segment should have all 4 fragments in its bitmap and a real UUID. + assertNotNull(segment.uuid(), "segment uuid must be non-null"); + assertTrue(segment.fragments().isPresent(), "segment fragments must be populated"); + Set fragIds = new HashSet<>(segment.fragments().get()); + assertEquals( + new HashSet<>(startValueByFragId.keySet()), + fragIds, + "bitmap must cover every input fragment"); + assertEquals("value_zm", segment.name()); + + // Commit it. One IndexMetadata covering all 4 fragments — not 4 separate segments. + dataset.commitExistingIndexSegments( + "value_zm", "value", java.util.Collections.singletonList(segment)); + } + } + + // Re-open and confirm (a) the committed manifest has exactly one IndexMetadata for + // value_zm, and (b) getZonemapStats reflects the union of fragments through that single + // segment. Combined with the earlier `segment.fragments() == {0,1,2,3}` assertion this + // is what makes the consolidation property end-to-end: ONE segment, not ONE per + // fragment, covers the full fragment set. + try (Dataset dataset = Dataset.open(path, allocator)) { + long segmentCount = + dataset.getIndexes().stream().filter(i -> "value_zm".equals(i.name())).count(); + assertEquals( + 1, + segmentCount, + "committed manifest must hold exactly one IndexMetadata for value_zm; got " + + segmentCount); + + List stats = dataset.getZonemapStats("value"); + assertEquals(16, stats.size(), "4 fragments × 4 zones each at rows_per_zone=5"); + + Set fragIds = new HashSet<>(); + for (ZoneStats z : stats) { + fragIds.add(z.getFragmentId()); + // The dataset stores value = (startValue + i) * 10. startValue per fragment comes + // from the startValueByFragId map captured at commit time — robust to Lance changing + // its fragment-id allocation strategy (currently monotonic with commit order, but + // we do not want to bake that into the test). + Integer startValue = startValueByFragId.get(z.getFragmentId()); + assertNotNull( + startValue, "fragment id " + z.getFragmentId() + " was not in the committed set"); + // PER-ZONE bound: zone covers rows [zone_start, zone_start + zone_length) within the + // fragment, and values are (startValue + i)*10. A regression that emitted constant + // fragment-wide min/max for every zone (ignoring zone_start) would pass a looser + // fragment-range check but fail this per-zone bound. + long zoneStart = z.getZoneStart(); + long zoneLength = z.getZoneLength(); + int expectedZoneMin = (startValue + (int) zoneStart) * 10; + int expectedZoneMax = (startValue + (int) zoneStart + (int) zoneLength - 1) * 10; + long zoneMin = ((Number) z.getMin()).longValue(); + long zoneMax = ((Number) z.getMax()).longValue(); + assertEquals( + expectedZoneMin, + zoneMin, + "zone min for fragment " + z.getFragmentId() + " starting at row " + zoneStart); + assertEquals( + expectedZoneMax, + zoneMax, + "zone max for fragment " + z.getFragmentId() + " starting at row " + zoneStart); + } + assertEquals( + new HashSet<>(startValueByFragId.keySet()), + fragIds, + "every fragment must appear in the committed consolidated zonemap"); + + // Across each fragment's zones, min should be monotone-nondecreasing with zone_start + // (data is in row-address order and values are monotone within a fragment). + for (Integer fragmentId : startValueByFragId.keySet()) { + final int frag = fragmentId; + List perFrag = new java.util.ArrayList<>(); + for (ZoneStats z : stats) { + if (z.getFragmentId() == frag) perFrag.add(z); + } + perFrag.sort(java.util.Comparator.comparingLong(ZoneStats::getZoneStart)); + long prevMin = Long.MIN_VALUE; + for (ZoneStats z : perFrag) { + long m = ((Number) z.getMin()).longValue(); + assertTrue( + m >= prevMin, + "fragment " + frag + ": zone min must be monotone-nondecreasing along zone_start"); + prevMin = m; + } + } + } + } + } + + /** + * Single-worker case: the natural Spark setup where {@code num_partitions=1} skips the + * consolidation rationale but the API must still work. Single-batch concat is a no-op but we + * still want a properly committed segment. + */ + @Test + public void singleWorkerBatch(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("single").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 20).close(); + writeIntFragment(allocator, path, 2, 20, 20).close(); + + // Capture actual fragment ids in commit-iteration order — consistent with + // endToEndConsolidation. Tests should not bake "fragment ids are 0,1,2,..." into the + // bitmap-expectation assertion below. + Set committedFragIds = new HashSet<>(); + try (Dataset dataset = Dataset.open(path, allocator)) { + for (Fragment f : dataset.getFragments()) { + committedFragIds.add(f.getId()); + } + } + + try (Dataset dataset = Dataset.open(path, allocator)) { + String paramsJson = "{\"rows_per_zone\": 5}"; + try (VectorSchemaRoot w1 = + dataset.computeZonemapBatch("value", null, paramsJson, allocator)) { + Index segment = + dataset.writeZonemapIndexFromBatches( + "value_zm", + "value", + java.util.Collections.singletonList(w1), + paramsJson, + allocator); + assertEquals(committedFragIds, new HashSet<>(segment.fragments().get())); + dataset.commitExistingIndexSegments( + "value_zm", "value", java.util.Collections.singletonList(segment)); + } + } + + try (Dataset dataset = Dataset.open(path, allocator)) { + assertEquals( + 1, dataset.getIndexes().stream().filter(i -> "value_zm".equals(i.name())).count()); + assertEquals(8, dataset.getZonemapStats("value").size(), "2 frags × 4 zones"); + } + } + } + + /** + * Fragment-hole scenario: workers cover {0, 2}; fragment 1 has data but was deliberately not + * indexed. The committed segment's bitmap should hold a literal hole rather than expanding to {0, + * 1, 2}. + */ + @Test + public void fragmentIdHolePreserved(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("hole").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 20).close(); + writeIntFragment(allocator, path, 2, 20, 20).close(); + writeIntFragment(allocator, path, 3, 40, 20).close(); + + // Discover the three actual fragment ids in commit order — same pattern as + // endToEndConsolidation / singleWorkerBatch. We index the FIRST and THIRD; the MIDDLE + // one should remain a hole in the committed bitmap. + java.util.List fragIds = new java.util.ArrayList<>(); + try (Dataset dataset = Dataset.open(path, allocator)) { + for (Fragment f : dataset.getFragments()) { + fragIds.add(f.getId()); + } + } + assertEquals(3, fragIds.size(), "expected 3 fragments committed"); + int indexedA = fragIds.get(0); + int holeFragId = fragIds.get(1); + int indexedB = fragIds.get(2); + Set expectedBitmap = new HashSet<>(Arrays.asList(indexedA, indexedB)); + + try (Dataset dataset = Dataset.open(path, allocator)) { + // Note: middle fragment deliberately excluded — that's the hole this test pins. + try (VectorSchemaRoot batch = + dataset.computeZonemapBatch( + "value", + new long[] {(long) indexedA, (long) indexedB}, + "{\"rows_per_zone\": 5}", + allocator)) { + Index segment = + dataset.writeZonemapIndexFromBatches( + "value_zm", + "value", + java.util.Collections.singletonList(batch), + "{\"rows_per_zone\": 5}", + allocator); + assertEquals( + expectedBitmap, + new HashSet<>(segment.fragments().get()), + "in-memory segment bitmap must not invent fragments"); + dataset.commitExistingIndexSegments( + "value_zm", "value", java.util.Collections.singletonList(segment)); + } + } + + // Reopen and verify the hole survives on disk through both the IndexMetadata bitmap and + // the read-back zone-stats listing (which is what consumers actually see). + try (Dataset dataset = Dataset.open(path, allocator)) { + Index committed = + dataset.getIndexes().stream() + .filter(i -> "value_zm".equals(i.name())) + .findFirst() + .orElseThrow(); + assertEquals( + expectedBitmap, + new HashSet<>(committed.fragments().get()), + "committed manifest must preserve the literal hole"); + assertFalse( + committed.fragments().get().contains(holeFragId), + "the deliberately unindexed fragment must NOT appear in the committed bitmap"); + + Set statFragIds = new HashSet<>(); + for (ZoneStats z : dataset.getZonemapStats("value")) { + statFragIds.add(z.getFragmentId()); + } + assertEquals( + expectedBitmap, + statFragIds, + "getZonemapStats must not return zones for the unindexed fragment"); + } + } + } + + /** + * All-NULL column: the doc claims min/max are nullable because an entire batch may be all-NULL. + * Verify the round-trip through compute → write → commit → re-read works for that shape without + * crashing. + */ + @Test + public void allNullColumn(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("all_null").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + Schema schema = intSchema(); + try (Dataset ds = + Dataset.create(allocator, path, schema, new WriteParams.Builder().build())) { + // empty + } + // Write a fragment whose 'value' column is entirely NULL. + List metas; + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + IntVector idVec = (IntVector) root.getVector("id"); + IntVector valVec = (IntVector) root.getVector("value"); + for (int i = 0; i < 20; i++) { + idVec.setSafe(i, i); + valVec.setNull(i); + } + root.setRowCount(20); + metas = Fragment.create(path, allocator, root, new WriteParams.Builder().build()); + } + Dataset.commit(allocator, path, new FragmentOperation.Append(metas), Optional.of(1L)).close(); + + try (Dataset dataset = Dataset.open(path, allocator); + VectorSchemaRoot batch = + dataset.computeZonemapBatch("value", null, "{\"rows_per_zone\": 5}", allocator)) { + // Even with all-NULL data we still get one zone per zone-sized chunk. + assertTrue(batch.getRowCount() > 0, "all-NULL fragment must still produce zones"); + Index segment = + dataset.writeZonemapIndexFromBatches( + "value_zm", + "value", + java.util.Collections.singletonList(batch), + "{\"rows_per_zone\": 5}", + allocator); + dataset.commitExistingIndexSegments( + "value_zm", "value", java.util.Collections.singletonList(segment)); + } + + try (Dataset dataset = Dataset.open(path, allocator)) { + List stats = dataset.getZonemapStats("value"); + assertFalse(stats.isEmpty(), "all-NULL fragment must produce readable stats"); + for (ZoneStats z : stats) { + assertEquals( + z.getZoneLength(), + z.getNullCount(), + "every zone in an all-NULL column must have nullCount == zoneLength"); + assertNull(z.getMin(), "min must be null when every value is null"); + assertNull(z.getMax(), "max must be null when every value is null"); + } + } + } + } + + @Test + public void rejectsEmptyBatchList(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("empty").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset dataset = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + assertThrows( + IllegalArgumentException.class, + () -> + dataset.writeZonemapIndexFromBatches( + "zm", "value", java.util.Collections.emptyList(), "", allocator)); + } + } + } + + @Test + public void rejectsNullBatchInList(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("null_batch").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 10).close(); + try (Dataset dataset = Dataset.open(path, allocator); + VectorSchemaRoot batch = dataset.computeZonemapBatch("value", null, "", allocator)) { + List withNull = Arrays.asList(batch, null); + assertThrows( + NullPointerException.class, + () -> dataset.writeZonemapIndexFromBatches("zm", "value", withNull, "", allocator)); + } + } + } + + /** + * Exercises the defence-in-depth schema check on the Rust side: feed two batches whose schemas + * differ (one from `value`, one from a different column) and expect a clear error rather than a + * confusing concat_batches failure deeper in the stack. + */ + @Test + public void rejectsDivergentBatchSchemas(@TempDir Path tempDir) throws Exception { + // Multi-column dataset so workers can compute zonemap batches with different min/max + // value types (Int32 for `value`, Int32 — but on a different Lance column — for `id`). + // The two output batches have identical Arrow schema shapes, so to force a real divergence + // we use a column with a wider integer type for one worker. + String path = tempDir.resolve("divergent").toString(); + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("id_i32", new ArrowType.Int(32, true)), + Field.nullable("val_i64", new ArrowType.Int(64, true))), + null); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, schema, new WriteParams.Builder().build())) { + // empty + } + // Write a tiny fragment so computeZonemapBatch has data over which to compute. + List metas; + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + ((IntVector) root.getVector("id_i32")).setSafe(0, 1); + ((org.apache.arrow.vector.BigIntVector) root.getVector("val_i64")).setSafe(0, 100L); + root.setRowCount(1); + metas = Fragment.create(path, allocator, root, new WriteParams.Builder().build()); + } + Dataset.commit(allocator, path, new FragmentOperation.Append(metas), Optional.of(1L)).close(); + + try (Dataset dataset = Dataset.open(path, allocator); + VectorSchemaRoot wIdInt32 = dataset.computeZonemapBatch("id_i32", null, "", allocator); + VectorSchemaRoot wValInt64 = + dataset.computeZonemapBatch("val_i64", null, "", allocator)) { + // The two batches have different min/max types (Int32 vs Int64). Native-side error + // bubbles up as IllegalArgumentException (the Rust JNI maps Error::input_error there). + // Tightening the expected class beats `Exception.class` — the latter would also accept + // an unrelated IOException whose message happens to contain "schema". + IllegalArgumentException thrown = + assertThrows( + IllegalArgumentException.class, + () -> + dataset.writeZonemapIndexFromBatches( + "zm", "id_i32", Arrays.asList(wIdInt32, wValInt64), "", allocator)); + assertTrue( + thrown.getMessage() != null && thrown.getMessage().toLowerCase().contains("schema"), + "error must mention schema; got: " + thrown.getMessage()); + } + } + } + + /** + * Coordinator-bug scenario: two workers were accidentally assigned overlapping fragment ids. The + * current contract is that the consolidated batch faithfully reflects whatever the caller hands + * it (no implicit dedup), so the resulting segment contains duplicated zones for the shared + * fragment ids and the bitmap dedups (RoaringBitmap is a set). This test pins that observable + * behaviour so a future change to either policy (silent-dedup vs explicit-reject) shows up here. + */ + @Test + public void overlappingFragmentIdsAcrossWorkers(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("overlap").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 20).close(); + writeIntFragment(allocator, path, 2, 20, 20).close(); + + // Discover the two fragment ids in commit order. Worker 1 indexes both; worker 2 + // overlaps on the SECOND one — the coordinator-bug shape this test pins. + java.util.List fragIds = new java.util.ArrayList<>(); + try (Dataset dataset = Dataset.open(path, allocator)) { + for (Fragment f : dataset.getFragments()) { + fragIds.add(f.getId()); + } + } + assertEquals(2, fragIds.size(), "expected 2 fragments committed"); + int fragA = fragIds.get(0); + int fragB = fragIds.get(1); + + try (Dataset dataset = Dataset.open(path, allocator)) { + String paramsJson = "{\"rows_per_zone\": 5}"; + try (VectorSchemaRoot w1 = + dataset.computeZonemapBatch( + "value", new long[] {(long) fragA, (long) fragB}, paramsJson, allocator); + // Worker 2 overlaps with worker 1 on fragB — coordinator bug. + VectorSchemaRoot w2 = + dataset.computeZonemapBatch( + "value", new long[] {(long) fragB}, paramsJson, allocator)) { + + Index segment = + dataset.writeZonemapIndexFromBatches( + "value_zm", "value", Arrays.asList(w1, w2), paramsJson, allocator); + + // Bitmap dedups (it's a set). The DATA in the segment contains duplicated zone rows + // for fragB — that's a coordinator bug surfacing as confused read-time stats + // rather than a write-time error. + assertEquals( + new HashSet<>(Arrays.asList(fragA, fragB)), + new HashSet<>(segment.fragments().get()), + "bitmap dedups to the union of fragment ids"); + dataset.commitExistingIndexSegments( + "value_zm", "value", java.util.Collections.singletonList(segment)); + } + } + + try (Dataset dataset = Dataset.open(path, allocator)) { + List stats = dataset.getZonemapStats("value"); + // 12 zone rows total: 8 from worker 1 (fragA + fragB, 4 zones each) + 4 from worker + // 2 (fragB, 4 zones). The duplication for fragB is preserved on disk. + assertEquals( + 12, + stats.size(), + "overlapping fragment partitions produce duplicated zone rows — caller bug surfaces " + + "at read time, not write time"); + // Verify the duplication is on the OVERLAPPED fragment, not somewhere unexpected. A bug + // that produced 12 zones all on fragA would pass a size-only assertion. + long zonesForA = stats.stream().filter(z -> z.getFragmentId() == fragA).count(); + long zonesForB = stats.stream().filter(z -> z.getFragmentId() == fragB).count(); + assertEquals(4, zonesForA, "fragA indexed once → 4 zones"); + assertEquals(8, zonesForB, "fragB indexed twice → 8 zones (duplicate evidence)"); + } + } + } + + /** + * NaN is not conflated with NULL through the FFI / commit / re-read pipeline. A Float64 column + * carrying NaN values but no NULLs must report {@code nullCount=0} on every zone — proving NaN + * counting lives in its own column and is not being merged into the null lane somewhere along the + * export path. The Java {@link ZoneStats} type currently exposes only {@code nullCount} (the + * {@code nan_count} column is not surfaced), so the assertion is indirect; the test name reflects + * what is actually verified. + */ + @Test + public void nanNotConflatedWithNull(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("nan").toString(); + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("id", new ArrowType.Int(32, true)), + Field.nullable( + "val_f64", + new ArrowType.FloatingPoint( + org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE))), + null); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, schema, new WriteParams.Builder().build())) { + // empty + } + // Write a 20-row fragment where rows 5..10 are NaN. + List metas; + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + IntVector idVec = (IntVector) root.getVector("id"); + org.apache.arrow.vector.Float8Vector valVec = + (org.apache.arrow.vector.Float8Vector) root.getVector("val_f64"); + for (int i = 0; i < 20; i++) { + idVec.setSafe(i, i); + if (i >= 5 && i < 10) { + valVec.setSafe(i, Double.NaN); + } else { + valVec.setSafe(i, (double) i); + } + } + root.setRowCount(20); + metas = Fragment.create(path, allocator, root, new WriteParams.Builder().build()); + } + Dataset.commit(allocator, path, new FragmentOperation.Append(metas), Optional.of(1L)).close(); + + try (Dataset dataset = Dataset.open(path, allocator); + VectorSchemaRoot batch = + dataset.computeZonemapBatch("val_f64", null, "{\"rows_per_zone\": 5}", allocator)) { + Index segment = + dataset.writeZonemapIndexFromBatches( + "val_zm", + "val_f64", + java.util.Collections.singletonList(batch), + "{\"rows_per_zone\": 5}", + allocator); + dataset.commitExistingIndexSegments( + "val_zm", "val_f64", java.util.Collections.singletonList(segment)); + } + + try (Dataset dataset = Dataset.open(path, allocator)) { + // 20 rows / 5 = 4 zones. Zone covering rows 5..10 has 5 NaN values; zone covering + // 10..15 has 0 (since NaN range was 5..10 exclusive). + // ZoneStats currently exposes nullCount but not nanCount via the public Java type; + // verify indirectly by confirming that zones containing the NaN range still produce + // valid min/max for the non-NaN portion and have zero null_count. + List stats = dataset.getZonemapStats("val_f64"); + assertEquals(4, stats.size(), "20 rows / rows_per_zone=5 → 4 zones"); + for (ZoneStats z : stats) { + // No row in this fragment is NULL; only NaN. So null_count must remain 0 for every + // zone — confirming NaN is NOT being conflated with NULL through the FFI pipeline. + assertEquals( + 0, + z.getNullCount(), + "zone " + + z.getZoneStart() + + ": NaN must not be counted as NULL — got nullCount=" + + z.getNullCount()); + } + } + } + } + + /** + * Malformed paramsJson causes the JNI to error BEFORE the FFI from_raw loop runs — i.e. the Java + * side has already allocated ArrowArray/ArrowSchema handles and exported batch data into them, + * but Rust never consumes via from_raw. The finally block must release all of those handles + * cleanly so RootAllocator.close() doesn't fail with a leak; if Arrow Java's {@code close()} + * alone weren't enough, this test would fail at the RootAllocator's try-with-resources exit. + */ + @Test + public void invalidParamsJsonDoesNotLeakFfiHandles(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("bad_params").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 10).close(); + writeIntFragment(allocator, path, 2, 10, 10).close(); + + // Discover fragment ids via getFragments() so this test doesn't bake in the assumption + // that ids are allocated sequentially from 0 — same pattern as the other tests in this + // suite. If Lance ever changes allocation strategy the test still exercises the + // leak-window contract (malformed paramsJson AFTER successful FFI export). + java.util.List discoveredFragIds = new java.util.ArrayList<>(); + try (Dataset dataset = Dataset.open(path, allocator)) { + for (Fragment f : dataset.getFragments()) { + discoveredFragIds.add((long) f.getId()); + } + } + assertEquals(2, discoveredFragIds.size(), "expected 2 fragments committed"); + + try (Dataset dataset = Dataset.open(path, allocator); + VectorSchemaRoot w1 = + dataset.computeZonemapBatch( + "value", new long[] {discoveredFragIds.get(0)}, "", allocator); + VectorSchemaRoot w2 = + dataset.computeZonemapBatch( + "value", new long[] {discoveredFragIds.get(1)}, "", allocator)) { + // Force the JNI's serde_json parse step to fail AFTER Java has finished exporting + // both batches into FFI handles. + IllegalArgumentException thrown = + assertThrows( + IllegalArgumentException.class, + () -> + dataset.writeZonemapIndexFromBatches( + "zm", "value", Arrays.asList(w1, w2), "{not valid json", allocator)); + assertTrue( + thrown.getMessage() != null && thrown.getMessage().toLowerCase().contains("params"), + "error must mention params JSON; got: " + thrown.getMessage()); + } + } + } + + /** + * Upper-bound fragment id rejection: {@code long} values greater than {@code u32::MAX} + * (4294967295) must be rejected before the JNI silently truncates. Counterpart to the negative-id + * test below; together they pin the full out-of-range contract. + */ + @Test + public void aboveU32MaxFragmentIdRejected(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("oversize_frag").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 10).close(); + try (Dataset dataset = Dataset.open(path, allocator)) { + long aboveU32Max = (1L << 32); // u32::MAX is 2^32 - 1; one past = 2^32. + IllegalArgumentException thrown = + assertThrows( + IllegalArgumentException.class, + () -> + dataset.computeZonemapBatch("value", new long[] {aboveU32Max}, "", allocator)); + assertTrue( + thrown.getMessage() != null && thrown.getMessage().toLowerCase().contains("fragment"), + "error must mention fragment id; got: " + thrown.getMessage()); + } + } + } + + /** + * Column-not-found is rejected at the Rust layer (Java only checks null/empty string). Verifies + * the typo-catching error propagates back as IllegalArgumentException with a clear message rather + * than e.g. an NPE deeper in the import path. + */ + @Test + public void unknownColumnRejected(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("unknown_col").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 10).close(); + try (Dataset dataset = Dataset.open(path, allocator)) { + IllegalArgumentException thrown = + assertThrows( + IllegalArgumentException.class, + () -> dataset.computeZonemapBatch("not_a_column", null, "", allocator)); + assertTrue( + thrown.getMessage() != null + && thrown.getMessage().toLowerCase().contains("not_a_column"), + "error must name the missing column; got: " + thrown.getMessage()); + } + } + } + + /** + * Negative fragment id rejection — Java accepts a {@code long[]} so a negative element is + * reachable; the JNI must reject rather than silently u32-wrap. + */ + @Test + public void negativeFragmentIdRejected(@TempDir Path tempDir) throws Exception { + String path = tempDir.resolve("neg_frag").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + try (Dataset ds = + Dataset.create(allocator, path, intSchema(), new WriteParams.Builder().build())) { + // empty + } + writeIntFragment(allocator, path, 1, 0, 10).close(); + try (Dataset dataset = Dataset.open(path, allocator)) { + IllegalArgumentException thrown = + assertThrows( + IllegalArgumentException.class, + () -> dataset.computeZonemapBatch("value", new long[] {-1L}, "", allocator)); + assertTrue( + thrown.getMessage() != null && thrown.getMessage().toLowerCase().contains("fragment"), + "error must mention fragment id; got: " + thrown.getMessage()); + } + } + } +} diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 45ded3b0db5..75442100327 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -47,9 +47,38 @@ use roaring::RoaringBitmap; use super::zoned::{ZoneBound, ZoneProcessor, ZoneTrainer, rebuild_zones, search_zones}; const ROWS_PER_ZONE_DEFAULT: u64 = 8192; // 1 zone every two batches -const ZONEMAP_FILENAME: &str = "zonemap.lance"; +/// Filename of the on-disk zonemap index file written under `/`. Public because +/// external coordinators (e.g. JNI bindings, custom writers) need to refer to it by the same +/// name the read path expects. +pub const ZONEMAP_FILENAME: &str = "zonemap.lance"; const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone"; -const ZONEMAP_INDEX_VERSION: u32 = 0; +/// On-disk format version for zonemap indices. Public so external coordinators that +/// assemble an [`IndexMetadata`](lance_table::format::IndexMetadata) for a freshly written +/// consolidated zonemap segment can populate the `index_version` field consistently with +/// what the plugin's `train_index` emits — without duplicating the literal. +pub const ZONEMAP_INDEX_VERSION: u32 = 0; + +/// Canonical schema of a zone-stats record batch. The on-disk `zonemap.lance` file's record +/// layout, the in-memory `ZoneMapIndexBuilder::zonemap_stats_as_batch` output, and any externally- +/// produced batch fed into [`write_zonemap_index_from_batch`] all share this schema. Treat the +/// column names and order as a stability contract: the on-disk reader matches columns by name, +/// and [`validate_zonemap_stats_schema`] enforces full conformance up front so writes whose +/// batch deviates from this shape fail before producing an unloadable file. +/// +/// `min` / `max` are nullable because an entire batch may be all-NULL; the caller supplies the +/// indexed column's type via `value_type`. The remaining metadata columns (`null_count`, +/// `nan_count`, `fragment_id`, `zone_start`, `zone_length`) have fixed types and are non-nullable. +pub fn zonemap_stats_schema(value_type: &DataType) -> Arc { + Arc::new(arrow_schema::Schema::new(vec![ + Field::new("min", value_type.clone(), true), + Field::new("max", value_type.clone(), true), + Field::new("null_count", DataType::UInt32, false), + Field::new("nan_count", DataType::UInt32, false), + Field::new("fragment_id", DataType::UInt64, false), + Field::new("zone_start", DataType::UInt64, false), + Field::new("zone_length", DataType::UInt64, false), + ])) +} /// Basic stats about zonemap index #[derive(Debug, PartialEq, Clone)] @@ -683,7 +712,14 @@ impl ZoneMapIndexBuilder { Ok(()) } - fn zonemap_stats_as_batch(&self) -> Result { + /// Drain the in-memory zone statistics into a record batch with the canonical zonemap schema + /// (`min`, `max`, `null_count`, `nan_count`, `fragment_id`, `zone_start`, `zone_length`). + /// + /// Public so external coordinators consolidating per-fragment zone batches from parallel + /// workers can extract the trained state without forcing the in-place file write that + /// [`Self::write_index`] performs. The companion [`write_zonemap_index_from_batch`] free + /// function consumes the same shape. + pub fn zonemap_stats_as_batch(&self) -> Result { // Flush self.maps as a RecordBatch let mins = if self.maps.is_empty() { new_empty_array(&self.items_type) @@ -709,16 +745,7 @@ impl ZoneMapIndexBuilder { let zone_starts = UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.start)); - let schema = Arc::new(arrow_schema::Schema::new(vec![ - // min and max can be null if the entire batch is null values - Field::new("min", self.items_type.clone(), true), - Field::new("max", self.items_type.clone(), true), - Field::new("null_count", DataType::UInt32, false), - Field::new("nan_count", DataType::UInt32, false), - Field::new("fragment_id", DataType::UInt64, false), - Field::new("zone_start", DataType::UInt64, false), - Field::new("zone_length", DataType::UInt64, false), - ])); + let schema = zonemap_stats_schema(&self.items_type); let columns: Vec = vec![ mins, @@ -734,20 +761,118 @@ impl ZoneMapIndexBuilder { pub async fn write_index(self, index_store: &dyn IndexStore) -> Result<()> { let record_batch = self.zonemap_stats_as_batch()?; + write_zonemap_index_from_batch(record_batch, &self.options, index_store).await + } +} - let mut file_schema = record_batch.schema().as_ref().clone(); - file_schema.metadata.insert( - ZONEMAP_SIZE_META_KEY.to_string(), - self.options.rows_per_zone.to_string(), - ); +/// Write a `zonemap.lance` file from a pre-computed zone-stats record batch. +/// +/// The record batch must conform to the canonical zonemap stats schema (see +/// [`zonemap_stats_schema`]). Column names, order, and types are validated up front, and the +/// fixed-type metadata columns are also required to be non-nullable; a mismatched batch returns +/// an `invalid_input` error rather than silently writing a file the read path cannot load. +/// `min` and `max` may carry any data type as long as both share the indexed column's type; +/// their nullability is not constrained (the canonical schema declares them nullable so an +/// all-NULL batch is representable, but a caller whose data has no nulls may pass non-nullable +/// fields). +/// +/// Underlying writer extracted from [`ZoneMapIndexBuilder::write_index`] so external coordinators +/// that consolidate per-fragment zone batches produced by parallel workers can write a single +/// consolidated file without re-running the train phase. `params` is taken by reference rather +/// than `rows_per_zone: u64` so future ZoneMap knobs that affect on-disk metadata can extend +/// the parameter set without breaking this signature. +pub async fn write_zonemap_index_from_batch( + record_batch: RecordBatch, + params: &ZoneMapIndexBuilderParams, + index_store: &dyn IndexStore, +) -> Result<()> { + validate_zonemap_stats_schema(record_batch.schema().as_ref())?; + + let mut file_schema = record_batch.schema().as_ref().clone(); + file_schema.metadata.insert( + ZONEMAP_SIZE_META_KEY.to_string(), + params.rows_per_zone().to_string(), + ); + + let mut index_file = index_store + .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema)) + .await?; + index_file.write_record_batch(record_batch).await?; + index_file.finish().await?; + Ok(()) +} - let mut index_file = index_store - .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema)) - .await?; - index_file.write_record_batch(record_batch).await?; - index_file.finish().await?; +/// Validate that `schema` matches the canonical zonemap stats shape: 7 columns in canonical +/// order, the metadata columns at fixed types and non-nullable, and `min`/`max` sharing one +/// (caller-determined) data type. Public so coordinators can validate locally before calling +/// [`write_zonemap_index_from_batch`] (which would discover the mismatch only after opening +/// the output index file). +pub fn validate_zonemap_stats_schema(schema: &arrow_schema::Schema) -> Result<()> { + let fields = schema.fields(); + const EXPECTED_NAMES: [&str; 7] = [ + "min", + "max", + "null_count", + "nan_count", + "fragment_id", + "zone_start", + "zone_length", + ]; + if fields.len() != EXPECTED_NAMES.len() { + return Err(Error::invalid_input(format!( + "zonemap stats batch has {} columns, expected {}: {:?}", + fields.len(), + EXPECTED_NAMES.len(), + EXPECTED_NAMES + ))); + } + for (i, expected) in EXPECTED_NAMES.iter().enumerate() { + if fields[i].name() != expected { + return Err(Error::invalid_input(format!( + "zonemap stats batch column {} is '{}', expected '{}'", + i, + fields[i].name(), + expected + ))); + } + } + + // min and max: both share the indexed column type (caller-determined); both nullable + // (an all-NULL batch is legitimate). + if fields[0].data_type() != fields[1].data_type() { + return Err(Error::invalid_input(format!( + "zonemap stats batch 'min' type {:?} != 'max' type {:?}", + fields[0].data_type(), + fields[1].data_type() + ))); + } + + // Fixed-type non-nullable metadata columns. Allowing nullable here would let null values + // silently coerce to 0 in the read path's UInt{32,64}Array::value() calls, producing + // corrupted zone stats with no error. + fn check_metadata_field(field: &Field, expected_ty: &DataType) -> Result<()> { + if field.data_type() != expected_ty { + return Err(Error::invalid_input(format!( + "zonemap stats batch column '{}' has type {:?}, expected {:?}", + field.name(), + field.data_type(), + expected_ty + ))); + } + if field.is_nullable() { + return Err(Error::invalid_input(format!( + "zonemap stats batch column '{}' must be non-nullable", + field.name() + ))); + } Ok(()) } + check_metadata_field(&fields[2], &DataType::UInt32)?; // null_count + check_metadata_field(&fields[3], &DataType::UInt32)?; // nan_count + check_metadata_field(&fields[4], &DataType::UInt64)?; // fragment_id + check_metadata_field(&fields[5], &DataType::UInt64)?; // zone_start + check_metadata_field(&fields[6], &DataType::UInt64)?; // zone_length + Ok(()) } /// Index-specific processor that computes min/max statistics for each zone while the @@ -2534,4 +2659,93 @@ mod tests { // All max characters assert_eq!(compute_next_prefix("\u{10FFFF}\u{10FFFF}"), None); } + + /// Build a schema with the canonical column ordering, then mutate via `mutate` and return. + /// Used by the validate_zonemap_stats_schema rejection-branch tests below. + fn canonical_with( + value_type: DataType, + mutate: impl FnOnce(Vec) -> Vec, + ) -> Schema { + let canonical = super::zonemap_stats_schema(&value_type); + let fields = canonical + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect(); + Schema::new(mutate(fields)) + } + + #[test] + fn test_validate_schema_accepts_canonical() { + let schema = super::zonemap_stats_schema(&DataType::Int32); + super::validate_zonemap_stats_schema(schema.as_ref()) + .expect("canonical schema must validate"); + } + + #[test] + fn test_validate_schema_rejects_wrong_column_count() { + let schema = Schema::new(vec![Field::new("min", DataType::Int32, true)]); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("schema with wrong column count must be rejected"); + assert!(err.to_string().contains("expected 7"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_wrong_name() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[2] = Field::new("nullCount", DataType::UInt32, false); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("schema with renamed metadata column must be rejected"); + assert!(err.to_string().contains("'nullCount'"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_min_max_type_mismatch() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[1] = Field::new("max", DataType::Int64, true); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("min/max with different types must be rejected"); + assert!(err.to_string().contains("'min' type"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_wrong_metadata_type() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[4] = Field::new("fragment_id", DataType::UInt32, false); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("metadata column with wrong type must be rejected"); + assert!(err.to_string().contains("fragment_id"), "got: {}", err); + } + + #[test] + fn test_validate_schema_rejects_nullable_metadata() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[2] = Field::new("null_count", DataType::UInt32, true); + f + }); + let err = super::validate_zonemap_stats_schema(&schema) + .expect_err("nullable metadata column must be rejected"); + assert!(err.to_string().contains("non-nullable"), "got: {}", err); + } + + /// The validator's doc claims min/max nullability is not constrained (an all-NULL batch + /// is legitimate, but a caller producing a batch where the column happens to have no NULLs + /// may also declare those fields non-nullable). This test locks in that contract — both + /// the all-nullable canonical and a non-nullable-min/max variant must validate. + #[test] + fn test_validate_schema_accepts_non_nullable_min_max() { + let schema = canonical_with(DataType::Int32, |mut f| { + f[0] = Field::new("min", DataType::Int32, false); + f[1] = Field::new("max", DataType::Int32, false); + f + }); + super::validate_zonemap_stats_schema(&schema) + .expect("non-nullable min/max with matching types must validate"); + } } diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 50d4f095c69..b7dccd9e9a3 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -225,6 +225,151 @@ pub(crate) async fn load_training_data( } } +/// Compute the zone-stats record batch for a column over the given fragments WITHOUT writing +/// the result to a file. +/// +/// Companion to [`lance_index::scalar::zonemap::write_zonemap_index_from_batch`]. Together they +/// support a build-time-consolidation pipeline: parallel workers each call this for a fragment +/// subset, the resulting batches are concatenated by a coordinator, and one consolidated +/// `zonemap.lance` is written. Compared to the standard `build_scalar_index` flow — which trains +/// AND writes a per-fragment file — returning the in-memory batch lets the coordinator avoid the +/// per-segment read-time round-trips that scale linearly with fragment count. +/// +/// `fragment_ids = None` indexes every fragment in the dataset, mirroring `build_scalar_index`. +pub async fn compute_zonemap_batch( + dataset: &Dataset, + column: &str, + fragment_ids: Option>, + params: lance_index::scalar::zonemap::ZoneMapIndexBuilderParams, +) -> Result { + // Validate the column exists up front for a clear error; the value type itself is taken + // from the post-scan stream below, NOT from the dataset schema. This matches the plugin's + // train_zonemap_index path: scan-time type adaptation (dictionary -> primitive, extension + // type unwrap, nullability changes) means the dataset schema and the actual data stream + // can disagree, and the builder must be configured for the latter. + if dataset.schema().field(column).is_none() { + return Err(Error::invalid_input_source( + format!("No column with name {}", column).into(), + )); + } + + // ZoneMap requires row-address ordering during scan so per-zone bounds correspond to + // contiguous physical row ranges (the same TrainingCriteria the plugin's TrainingRequest + // sets up internally — see ZoneMapIndexTrainingRequest::new). + let criteria = TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(); + + let training_data = + load_training_data(dataset, column, &criteria, None, true, fragment_ids).await?; + + // Derive value_type from the actual stream schema, matching ZoneMapIndexPlugin's + // train_zonemap_index. The first field is the scanned column (subsequent fields like + // _rowaddr are training-criteria additions). + let value_type = training_data.schema().field(0).data_type().clone(); + + let mut builder = + lance_index::scalar::zonemap::ZoneMapIndexBuilder::try_new(params, value_type)?; + builder.train(training_data).await?; + builder.zonemap_stats_as_batch() +} + +/// Driver-side companion to [`compute_zonemap_batch`]: take a pre-computed (typically +/// coordinator-concatenated) zone-stats batch and persist it as a single uncommitted zonemap +/// index segment, returning the [`IndexMetadata`] that the caller can later commit via +/// `Dataset::commit_existing_index_segments`. +/// +/// This is the path that completes build-time consolidation: parallel workers each produce a +/// per-fragment-subset batch via `compute_zonemap_batch`; the coordinator concatenates them +/// and hands the result here. The output is one `zonemap.lance` file under a fresh UUID-named +/// directory in the dataset's `indices/` tree, and an `IndexMetadata` whose `fragment_bitmap` +/// is the union of every fragment id appearing in the batch's `fragment_id` column. No +/// manifest write happens here — that is `commit_existing_index_segments`' job. +/// +/// `batch` must conform to [`zonemap_stats_schema`](lance_index::scalar::zonemap::zonemap_stats_schema) +/// (this is enforced by the inner writer). +/// +/// **Returned `IndexMetadata` fields that survive commit:** only `uuid`, `fragment_bitmap`, +/// `index_details`, and `index_version` are carried into the committed manifest by +/// `commitExistingIndexSegments`. `name`, `dataset_version`, `created_at`, `files`, and +/// `base_id` are re-derived by the commit path from the segment template + the dataset's +/// current state. Callers should not depend on round-tripping the returned `name` etc. +/// through commit unchanged. +/// +/// **Column-provenance contract:** `column` MUST be the same column every batch's stats +/// were computed against (via [`compute_zonemap_batch`]). This function only verifies that +/// `column` exists in the dataset schema; it cannot detect a coordinator that fed batches +/// computed for column A into a write call for column B. The resulting segment's +/// `IndexMetadata.fields` would point at B's field id while the on-disk stats describe A, +/// and there is NO read-time marker that would reveal the mismatch. Coordinators must thread +/// the column name through consistently. +pub async fn write_consolidated_zonemap_segment( + dataset: &Dataset, + name: &str, + column: &str, + batch: arrow_array::RecordBatch, + params: &lance_index::scalar::zonemap::ZoneMapIndexBuilderParams, +) -> Result { + use arrow_array::cast::AsArray; + use arrow_array::types::UInt64Type; + use lance_index::scalar::IndexStore; + use lance_index::scalar::zonemap::{ZONEMAP_INDEX_VERSION, write_zonemap_index_from_batch}; + use roaring::RoaringBitmap; + use uuid::Uuid; + + // Validate the indexed column exists in the dataset schema and capture its field id for + // IndexMetadata.fields. We deliberately do NOT cross-check the batch's min/max type + // against the column type — write_zonemap_index_from_batch validates structural shape, and + // value-type adaptation (dict→primitive etc.) is the writer's domain. + let field = dataset.schema().field(column).ok_or_else(|| { + Error::invalid_input_source(format!("No column with name {}", column).into()) + })?; + let field_id = field.id; + + // Derive fragment bitmap from the batch's fragment_id column BEFORE consuming the batch + // in write_zonemap_index_from_batch. The schema validator inside the writer will reject a + // missing column, but we need values here to build the bitmap, so a missing column shows + // up as a clearer error than the inner validator's "expected column 5 to be ..." message. + let frag_col = batch.column_by_name("fragment_id").ok_or_else(|| { + Error::invalid_input_source( + "consolidated zonemap batch missing 'fragment_id' column".into(), + ) + })?; + let frag_array = frag_col.as_primitive_opt::().ok_or_else(|| { + Error::invalid_input_source( + "consolidated zonemap batch 'fragment_id' must be UInt64".into(), + ) + })?; + let mut fragment_bitmap = RoaringBitmap::new(); + for f in frag_array.values() { + if *f > u32::MAX as u64 { + return Err(Error::invalid_input_source( + format!("fragment_id {} exceeds u32::MAX", f).into(), + )); + } + fragment_bitmap.insert(*f as u32); + } + + let uuid = Uuid::new_v4(); + let index_store = LanceIndexStore::from_dataset_for_new(dataset, &uuid.to_string())?; + write_zonemap_index_from_batch(batch, params, &index_store).await?; + + let index_details = + prost_types::Any::from_msg(&lance_index::pbold::ZoneMapIndexDetails::default()) + .map_err(|e| Error::internal(format!("failed to encode ZoneMapIndexDetails: {}", e)))?; + + Ok(IndexMetadata { + uuid, + fields: vec![field_id], + name: name.to_string(), + dataset_version: dataset.version_id(), + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(Arc::new(index_details)), + index_version: ZONEMAP_INDEX_VERSION as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: Some(index_store.list_files_with_sizes().await?), + }) +} + // TODO: Allow users to register their own plugins static SCALAR_INDEX_PLUGIN_REGISTRY: LazyLock> = LazyLock::new(IndexPluginRegistry::with_default_plugins); @@ -971,6 +1116,185 @@ mod tests { } } + #[tokio::test] + async fn test_compute_zonemap_batch_round_trip() { + // Round-trip the new build-time-consolidation API surface: + // 1. Per-fragment compute_zonemap_batch produces conformant batches + // 2. Concatenated batches feed write_zonemap_index_from_batch successfully + // 3. The resulting zonemap.lance is readable via IndexStore::open_index_file with + // the canonical schema preserved + // 4. Read-back fragment_id column equals the union of input fragment ids + use arrow::compute::concat_batches; + use lance_index::scalar::IndexStore; + use lance_index::scalar::lance_format::LanceIndexStore; + use lance_index::scalar::zonemap::{ + ZONEMAP_FILENAME, ZoneMapIndexBuilderParams, validate_zonemap_stats_schema, + write_zonemap_index_from_batch, zonemap_stats_schema, + }; + + // 4 fragments × 10 rows. We deliberately pick rows_per_zone=4 (not the default) so each + // fragment spans multiple zones (10/4 = 3 zones — two full + one trailing). This + // exercises the path that matters for the consolidation claim: a zonemap batch where + // fragment_id repeats across consecutive rows. A previous version of this test used the + // default rows_per_zone (8192), which only ever produced one zone per fragment — a + // pathological case that hides per-fragment-multi-zone bugs. + const ROWS_PER_FRAG: u64 = 10; + const ROWS_PER_ZONE: u64 = 4; + let zones_per_frag = ROWS_PER_FRAG.div_ceil(ROWS_PER_ZONE) as usize; // 3 + + let dataset = lance_datagen::gen_batch() + .col("values", array::step::()) + .into_ram_dataset( + FragmentCount::from(4), + FragmentRowCount::from(ROWS_PER_FRAG as u32), + ) + .await + .unwrap(); + + let params = ZoneMapIndexBuilderParams::new(ROWS_PER_ZONE); + + // 1. Compute per-fragment-subset batches. + let batch_0_1 = + compute_zonemap_batch(&dataset, "values", Some(vec![0u32, 1]), params.clone()) + .await + .unwrap(); + let batch_2_3 = + compute_zonemap_batch(&dataset, "values", Some(vec![2u32, 3]), params.clone()) + .await + .unwrap(); + + // Each batch must validate against the canonical schema. + validate_zonemap_stats_schema(batch_0_1.schema().as_ref()).unwrap(); + validate_zonemap_stats_schema(batch_2_3.schema().as_ref()).unwrap(); + + // 2. Concatenate. The canonical schema with the actual value type is the join target. + let canonical = zonemap_stats_schema(&DataType::Int32); + let concatenated = concat_batches(&canonical, [&batch_0_1, &batch_2_3]).unwrap(); + + // 3. Write a consolidated zonemap.lance. + let test_dir = TempStrDir::default(); + let object_store = Arc::new(lance_io::object_store::ObjectStore::local()); + let index_dir = object_store::path::Path::parse(test_dir.as_str()).unwrap(); + let store = LanceIndexStore::new( + object_store.clone(), + index_dir.clone(), + Arc::new(lance_core::cache::LanceCache::no_cache()), + ); + write_zonemap_index_from_batch(concatenated.clone(), ¶ms, &store) + .await + .unwrap(); + + // 4. The written file should round-trip read with the same schema and row count, AND + // the fragment-id column should produce the exact multiset of fragment ids from the + // inputs. We compare counts (not just set membership) so a regression that accidentally + // duplicated a fragment row — same union, wrong cardinality — would fail loudly. + let read_back = store.open_index_file(ZONEMAP_FILENAME).await.unwrap(); + let read_batch = read_back + .read_range(0..read_back.num_rows(), None) + .await + .unwrap(); + assert_eq!( + read_batch.num_rows(), + concatenated.num_rows(), + "round-trip read row count must match consolidated batch" + ); + validate_zonemap_stats_schema(read_batch.schema().as_ref()).unwrap(); + let mut frag_counts: std::collections::BTreeMap = + std::collections::BTreeMap::new(); + for fid in read_batch + .column_by_name("fragment_id") + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + { + *frag_counts.entry(fid).or_insert(0) += 1; + } + // With ROWS_PER_FRAG=10 and ROWS_PER_ZONE=4 each fragment contributes + // ceil(10/4) = 3 zones; total 4 × 3 = 12 zones across the consolidated batch. + let expected: std::collections::BTreeMap = + (0u64..4u64).map(|f| (f, zones_per_frag)).collect(); + assert_eq!( + frag_counts, expected, + "consolidated zonemap must contain ceil(ROWS_PER_FRAG/ROWS_PER_ZONE) zones per \ + input fragment" + ); + assert_eq!( + read_batch.num_rows(), + (zones_per_frag * 4), + "consolidated batch total zone count must equal zones_per_frag × num_fragments" + ); + } + + #[tokio::test] + async fn test_write_consolidated_zonemap_segment_end_to_end() { + // End-to-end check of the driver-side helper: per-fragment compute → concat → write → + // returned IndexMetadata captures (a) every input fragment in the bitmap, (b) the + // correct field id for the indexed column, (c) the canonical ZoneMap index_version + // and (d) at least one file entry under the freshly allocated UUID directory. + use crate::index::scalar::{compute_zonemap_batch, write_consolidated_zonemap_segment}; + use arrow::compute::concat_batches; + use lance_index::scalar::zonemap::{ZONEMAP_INDEX_VERSION, ZoneMapIndexBuilderParams}; + + let dataset = lance_datagen::gen_batch() + .col("values", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(10)) + .await + .unwrap(); + + let params = ZoneMapIndexBuilderParams::new(4); + let batch_0_1 = + compute_zonemap_batch(&dataset, "values", Some(vec![0u32, 1]), params.clone()) + .await + .unwrap(); + let batch_2_3 = + compute_zonemap_batch(&dataset, "values", Some(vec![2u32, 3]), params.clone()) + .await + .unwrap(); + let canonical = lance_index::scalar::zonemap::zonemap_stats_schema(&DataType::Int32); + let concatenated = concat_batches(&canonical, [&batch_0_1, &batch_2_3]).unwrap(); + + let metadata = write_consolidated_zonemap_segment( + &dataset, + "values_zm", + "values", + concatenated, + ¶ms, + ) + .await + .unwrap(); + + assert_eq!(metadata.name, "values_zm"); + assert_eq!(metadata.index_version as u32, ZONEMAP_INDEX_VERSION); + assert_eq!(metadata.dataset_version, dataset.version_id()); + assert!(metadata.created_at.is_some()); + + let field_id = dataset.schema().field("values").unwrap().id; + assert_eq!(metadata.fields, vec![field_id]); + + let bitmap = metadata.fragment_bitmap.expect("bitmap must be set"); + let frags: Vec = bitmap.iter().collect(); + assert_eq!( + frags, + vec![0, 1, 2, 3], + "bitmap must cover every fragment in the input batches" + ); + + // The writer should have produced at least zonemap.lance under the new UUID. + let files = metadata + .files + .expect("files must be populated for a freshly written segment"); + assert!( + files.iter().any(|f| f + .path + .ends_with(lance_index::scalar::zonemap::ZONEMAP_FILENAME)), + "freshly written segment must contain {}; got {:?}", + lance_index::scalar::zonemap::ZONEMAP_FILENAME, + files.iter().map(|f| &f.path).collect::>() + ); + } + #[tokio::test] async fn test_load_training_data_addr_sort() { // Create test data using lance_datagen