Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3646,3 +3646,276 @@ fn inner_get_zonemap_stats<'local>(

Ok(array_list)
}

/////////////////////////////////////////////
// Build-time consolidation: compute batch //
/////////////////////////////////////////////

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeComputeZonemapBatch(
mut env: JNIEnv,
java_dataset: JObject,
jcolumn_name: JString,
jfragment_ids: JObject,
jparams_json: JString,
array_addr: jlong,
schema_addr: jlong,
) {
ok_or_throw_without_return!(
env,
inner_compute_zonemap_batch(
&mut env,
java_dataset,
jcolumn_name,
jfragment_ids,
jparams_json,
array_addr,
schema_addr,
)
)
}

fn inner_compute_zonemap_batch(
env: &mut JNIEnv<'_>,
java_dataset: JObject,
jcolumn_name: JString,
jfragment_ids: JObject,
jparams_json: JString,
array_addr: jlong,
schema_addr: jlong,
) -> Result<()> {
use arrow::array::StructArray;
use arrow::ffi::FFI_ArrowArray;
use arrow_array::Array;
use jni::objects::JLongArray;
use lance::index::scalar::compute_zonemap_batch;
use lance_index::scalar::zonemap::ZoneMapIndexBuilderParams;

let column_name: String = jcolumn_name.extract(env)?;
let params_json: String = jparams_json.extract(env)?;

// Empty/missing params JSON means "use defaults". Mirrors the Rust plugin's
// unwrap_or_default() so callers without configuration overrides don't have to
// hand-serialize a default object.
let params: ZoneMapIndexBuilderParams = if params_json.is_empty() {
ZoneMapIndexBuilderParams::default()
} else {
serde_json::from_str(&params_json)
.map_err(|e| Error::input_error(format!("Invalid zonemap params JSON: {}", e)))?
};

// null fragment_ids array means "all fragments" (matches the Rust Option<Vec<u32>>
// contract where None covers every fragment).
let fragment_ids: Option<Vec<u32>> = if jfragment_ids.is_null() {
None
} else {
let jarr = JLongArray::from(jfragment_ids);
let len = env.get_array_length(&jarr)? as usize;
let mut buf = vec![0i64; len];
env.get_long_array_region(&jarr, 0, &mut buf)?;
// Lance fragment ids are u32 on disk; reject negative values from Java rather
// than silently wrapping.
let ids = buf
.into_iter()
.map(|v| {
if v < 0 || v > u32::MAX as i64 {
Err(Error::input_error(format!(
"fragment id {} out of u32 range",
v
)))
} else {
Ok(v as u32)
}
})
.collect::<Result<Vec<u32>>>()?;
Some(ids)
};

let dataset = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.inner.clone()
};

let batch = RT
.block_on(async move {
compute_zonemap_batch(&dataset, &column_name, fragment_ids, params).await
})
.map_err(Error::from)?;

// Export to the Java-allocated FFI buffers. The Java consumer reconstructs a
// VectorSchemaRoot via Data.importVectorSchemaRoot(allocator, ArrowArray, ArrowSchema, null),
// which expects the FFI array to be a struct array over the batch's columns and the
// FFI schema to describe the batch's top-level Schema.
let schema = batch.schema();
let struct_array: StructArray = batch.into();
let array_data = struct_array.into_data();
let ffi_array = FFI_ArrowArray::new(&array_data);
let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;

unsafe {
std::ptr::write_unaligned(array_addr as *mut FFI_ArrowArray, ffi_array);
std::ptr::write_unaligned(schema_addr as *mut FFI_ArrowSchema, ffi_schema);
}
Ok(())
}

/////////////////////////////////////////////
// Build-time consolidation: write segment //
/////////////////////////////////////////////

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeWriteZonemapIndexFromBatches<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject<'local>,
jindex_name: JString,
jcolumn_name: JString,
jarray_addrs: JObject<'local>,
jschema_addrs: JObject<'local>,
jparams_json: JString,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_write_zonemap_index_from_batches(
&mut env,
java_dataset,
jindex_name,
jcolumn_name,
jarray_addrs,
jschema_addrs,
jparams_json,
)
)
}

/// Imports the worker zonemap batches from caller-allocated Arrow C Data Interface structs,
/// concatenates them, and writes one consolidated zonemap segment for `jcolumn_name`.
///
/// # Parameters
/// * `java_dataset` — Java object holding the native `BlockingDataset` handle.
/// * `jindex_name` / `jcolumn_name` — index name and indexed column.
/// * `jarray_addrs` / `jschema_addrs` — Java `long[]` arrays of equal, non-zero length
///   holding the addresses of `FFI_ArrowArray` / `FFI_ArrowSchema` structs, one pair per
///   worker batch.
/// * `jparams_json` — JSON for `ZoneMapIndexBuilderParams`; the empty string selects
///   `ZoneMapIndexBuilderParams::default()`.
///
/// # Returns
/// The metadata returned by `write_consolidated_zonemap_segment`, converted to a Java object
/// via `into_java`.
///
/// # Errors
/// Returns an input error when an address array is null, the two arrays differ in length,
/// no batch is supplied, any address is null, an imported array is not a struct array or
/// has top-level nulls, the batch schemas differ, or concatenation fails. Errors from JNI
/// access, FFI import, and the segment write are propagated.
fn inner_write_zonemap_index_from_batches<'local>(
    env: &mut JNIEnv<'local>,
    java_dataset: JObject<'local>,
    jindex_name: JString,
    jcolumn_name: JString,
    jarray_addrs: JObject<'local>,
    jschema_addrs: JObject<'local>,
    jparams_json: JString,
) -> Result<JObject<'local>> {
    use arrow::array::{RecordBatch, StructArray};
    use arrow::compute::concat_batches;
    use arrow::ffi::{FFI_ArrowArray, from_ffi_and_data_type};
    use jni::objects::JLongArray;
    use lance::index::scalar::write_consolidated_zonemap_segment;
    use lance_index::scalar::zonemap::ZoneMapIndexBuilderParams;

    let index_name: String = jindex_name.extract(env)?;
    let column_name: String = jcolumn_name.extract(env)?;
    let params_json: String = jparams_json.extract(env)?;

    // Empty params JSON means "use defaults".
    let params: ZoneMapIndexBuilderParams = if params_json.is_empty() {
        ZoneMapIndexBuilderParams::default()
    } else {
        serde_json::from_str(&params_json)
            .map_err(|e| Error::input_error(format!("Invalid zonemap params JSON: {}", e)))?
    };

    if jarray_addrs.is_null() || jschema_addrs.is_null() {
        return Err(Error::input_error(
            "arrayAddrs / schemaAddrs must not be null".to_string(),
        ));
    }
    let jarrs = JLongArray::from(jarray_addrs);
    let jschs = JLongArray::from(jschema_addrs);
    let n_batches = env.get_array_length(&jarrs)? as usize;
    let n_schemas = env.get_array_length(&jschs)? as usize;
    if n_batches != n_schemas {
        return Err(Error::input_error(format!(
            "arrayAddrs length {} != schemaAddrs length {}",
            n_batches, n_schemas
        )));
    }
    if n_batches == 0 {
        return Err(Error::input_error(
            "must provide at least one worker batch".to_string(),
        ));
    }
    let mut array_buf = vec![0i64; n_batches];
    let mut schema_buf = vec![0i64; n_batches];
    env.get_long_array_region(&jarrs, 0, &mut array_buf)?;
    env.get_long_array_region(&jschs, 0, &mut schema_buf)?;

    // Validate every pointer pair BEFORE taking ownership of any FFI struct, so a bad
    // address never leaves the caller with a partially consumed set of batches.
    let first_null = array_buf.iter().zip(&schema_buf).position(|(&arr, &sch)| {
        (arr as *mut FFI_ArrowArray).is_null() || (sch as *mut FFI_ArrowSchema).is_null()
    });
    if let Some(i) = first_null {
        return Err(Error::input_error(format!(
            "null FFI pointer at index {}",
            i
        )));
    }

    // Reconstruct each Arrow batch from the caller-allocated FFI buffers. `FFI_ArrowArray::
    // from_raw` moves out of the raw pointer's contents into an owned Rust value and replaces
    // the source location with an empty (release-pointer null) struct. When the Rust value
    // drops at end-of-scope its destructor fires the original release callback; Java's
    // try-with-resources later sees a null release pointer and does nothing — single
    // invocation, no double-release. This mirrors the inner_write_batch ownership contract in
    // file_writer.rs.
    let mut batches: Vec<RecordBatch> = Vec::with_capacity(n_batches);
    for (i, (&arr_addr, &sch_addr)) in array_buf.iter().zip(&schema_buf).enumerate() {
        // SAFETY: both addresses were checked to be non-null above; the caller contract is
        // that each points to an initialized FFI struct of the matching type.
        let c_array = unsafe { FFI_ArrowArray::from_raw(arr_addr as *mut FFI_ArrowArray) };
        let c_schema = unsafe { FFI_ArrowSchema::from_raw(sch_addr as *mut FFI_ArrowSchema) };
        let data_type = DataType::try_from(&c_schema)?;
        // `StructArray::from(ArrayData)` panics on a non-struct type, and a panic on a JNI
        // call path would abort the JVM. Report the problem as an input error instead.
        if !matches!(data_type, DataType::Struct(_)) {
            return Err(Error::input_error(format!(
                "worker batch {} must be a struct array, got {:?}",
                i, data_type
            )));
        }
        // SAFETY: `c_array` was produced by the exporter alongside `c_schema`, whose data
        // type is passed here (caller contract).
        let array_data = unsafe { from_ffi_and_data_type(c_array, data_type) }?;
        // `RecordBatch::from(StructArray)` panics when the struct array has top-level
        // nulls; reject that input up front for the same reason as above.
        if array_data.null_count() != 0 {
            return Err(Error::input_error(format!(
                "worker batch {} has {} top-level nulls; a record batch cannot be nullable",
                i,
                array_data.null_count()
            )));
        }
        batches.push(RecordBatch::from(StructArray::from(array_data)));
    }

    // Clone the inner dataset inside a block so the guard returned by `get_rust_field`
    // is dropped before the blocking call below.
    let dataset = {
        let dataset_guard =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
        dataset_guard.inner.clone()
    };

    // Use the FIRST worker batch's schema as the concat join target. compute_zonemap_batch
    // derives min/max value_type from the post-scan training stream (dictionary→primitive
    // adaptation, extension-type unwrap, etc.) rather than from `dataset.schema()`, so the
    // dataset-schema-derived type can differ from what workers actually emit. Worker batches
    // come from the same scan path on every node, so their schemas are mutually consistent.
    // Indexing is safe: the `n_batches == 0` check above guarantees at least one batch.
    let canonical_schema = batches[0].schema();
    // Defence-in-depth: surface a clear error if a worker emitted a divergent schema
    // (e.g. a producer that bypassed compute_zonemap_batch and hand-constructed batches).
    // Use strict equality, not Schema::contains — the latter is an asymmetric superset
    // check that would let metadata-only mismatches slip through and then surface as a
    // less helpful concat_batches error deeper in the stack.
    for (i, b) in batches.iter().enumerate().skip(1) {
        if b.schema().as_ref() != canonical_schema.as_ref() {
            return Err(Error::input_error(format!(
                "worker batch {} schema {:?} differs from batch 0 schema {:?}",
                i,
                b.schema().as_ref(),
                canonical_schema.as_ref(),
            )));
        }
    }
    // write_consolidated_zonemap_segment validates `column_name` exists in the dataset schema
    // as its first action — no UUID directory has been allocated and no zonemap.lance has
    // been written when that check fires. Duplicating the check here would not save any
    // observable work, so we let the helper handle it.
    let concatenated = concat_batches(&canonical_schema, batches.iter()).map_err(|e| {
        Error::input_error(format!(
            "failed to concatenate worker batches against canonical schema: {}",
            e
        ))
    })?;

    let metadata = RT
        .block_on(async {
            write_consolidated_zonemap_segment(
                &dataset,
                &index_name,
                &column_name,
                concatenated,
                &params,
            )
            .await
        })
        .map_err(Error::from)?;

    (&metadata).into_java(env)
}
Loading
Loading