Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 107 additions & 8 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::array::{RecordBatch, RecordBatchIterator, StructArray};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi_and_data_type};
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_schema::{DataType, Schema as ArrowSchema};
use jni::objects::{JIntArray, JValue, JValueGen};
use jni::objects::{JByteArray, JIntArray, JValue, JValueGen};
use jni::{
JNIEnv,
objects::{JClass, JLongArray, JObject, JString},
Expand All @@ -19,6 +19,8 @@ use lance_io::utils::CachedFileSize;
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::iter::once;

use roaring::RoaringBitmap;

use lance::dataset::fragment::write::FragmentCreateBuilder;
use lance::io::ObjectStoreParams;
use lance_datafusion::utils::StreamingWriteSource;
Expand Down Expand Up @@ -48,8 +50,8 @@ pub(crate) struct FragmentMergeResult {
pub(crate) struct FragmentUpdateResult {
updated_fragment: Fragment,
fields_modified: Vec<u32>,
/// Physical row offsets that received column updates (from `_rowaddr` low bits).
updated_row_offsets: Vec<i64>,
/// Matched row offsets serialized as portable RoaringBitmap bytes.
updated_row_offset_bytes: Vec<u8>,
}

//////////////////
Expand Down Expand Up @@ -539,15 +541,111 @@ fn inner_update_column<'local>(
let right_on_str: String = right_on.extract(env)?;
let r =
RT.block_on(fragment.update_columns_with_offsets(reader, &left_on_str, &right_on_str))?;
let updated_row_offsets: Vec<i64> = r.matched_offsets.iter().map(|o| o as i64).collect();
let updated_row_offset_bytes = serialize_matched_offsets(&r.matched_offsets)?;
let result = FragmentUpdateResult {
updated_fragment: r.fragment,
fields_modified: r.fields_modified,
updated_row_offsets,
updated_row_offset_bytes,
};
result.into_java(env)
}

/// Serializes `bitmap` into a freshly allocated byte vector using
/// `RoaringBitmap::serialize_into`.
///
/// # Errors
///
/// Returns a runtime error carrying the underlying I/O error message if
/// serialization fails.
fn serialize_matched_offsets(bitmap: &RoaringBitmap) -> Result<Vec<u8>> {
    // The encoded size is known up front, so reserve it once instead of letting
    // the vector regrow while the bitmap is written.
    let mut buf = Vec::with_capacity(bitmap.serialized_size());
    bitmap.serialize_into(&mut buf).map_err(|e| {
        Error::runtime_error(format!(
            "failed to serialize matched row offsets RoaringBitmap: {e}"
        ))
    })?;
    Ok(buf)
}

/// Decodes serialized RoaringBitmap bytes back into a bitmap.
///
/// An empty slice is accepted and yields an empty bitmap without invoking the
/// decoder.
///
/// # Errors
///
/// Returns an input error if `bytes` is non-empty and cannot be decoded.
fn deserialize_row_offset_bytes(bytes: &[u8]) -> Result<RoaringBitmap> {
    match bytes {
        // No payload at all means "no offsets".
        [] => Ok(RoaringBitmap::new()),
        payload => RoaringBitmap::deserialize_from(payload).map_err(|e| {
            Error::input_error(format!(
                "invalid updatedRowOffsetBytes RoaringBitmap bytes: {e}"
            ))
        }),
    }
}

/// Expands every value stored in `bitmap` into a `Vec<i64>`, in the bitmap's
/// iteration order.
fn expand_row_offset_bytes_to_i64(bitmap: &RoaringBitmap) -> Vec<i64> {
    // u32 -> i64 is a lossless widening conversion.
    let widened = bitmap.iter().map(i64::from);
    widened.collect()
}

/// JNI entry point for `org.lance.fragment.FragmentUpdateResult.expandRowOffsetsFromBytes`.
///
/// Delegates to `inner_expand_updated_row_offset_bytes`, which decodes `jbytes`
/// as serialized RoaringBitmap bytes and returns the stored values as a Java
/// `long[]`.
///
/// NOTE(review): `ok_or_throw_with_return!` is defined outside this view;
/// presumably it raises a Java exception when the inner call returns `Err` and
/// hands back the third argument as the placeholder return value — confirm
/// against the macro definition.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_fragment_FragmentUpdateResult_expandRowOffsetsFromBytes<
'local,
>(
mut env: JNIEnv<'local>,
_cls: JClass,
jbytes: JByteArray,
) -> JLongArray<'local> {
ok_or_throw_with_return!(
env,
inner_expand_updated_row_offset_bytes(&mut env, jbytes),
// SAFETY: the raw pointer is null, so no JNI reference is wrapped here.
// NOTE(review): confirm that `JLongArray::from_raw` documents null as an
// accepted input for the `jni` crate version in use.
unsafe { JLongArray::from_raw(std::ptr::null_mut()) }
)
}

fn inner_expand_updated_row_offset_bytes<'local>(
env: &mut JNIEnv<'local>,
jbytes: JByteArray,
) -> Result<JLongArray<'local>> {
let buf = env.convert_byte_array(&jbytes)?;
let bitmap = deserialize_row_offset_bytes(&buf)?;
let offsets = expand_row_offset_bytes_to_i64(&bitmap);
let arr = env.new_long_array(offsets.len() as i32)?;
if !offsets.is_empty() {
env.set_long_array_region(&arr, 0, &offsets)?;
}
Ok(arr)
}

/// JNI entry point for `org.lance.fragment.FragmentUpdateResult.encodeRowOffsetsToBytes`.
///
/// Delegates to `inner_encode_updated_row_offset_bytes`, which collects the
/// values of `joffsets` into a RoaringBitmap and returns its serialized bytes
/// as a Java `byte[]`.
///
/// NOTE(review): `ok_or_throw_with_return!` is defined outside this view;
/// presumably it raises a Java exception when the inner call returns `Err` and
/// hands back the third argument as the placeholder return value — confirm
/// against the macro definition.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_fragment_FragmentUpdateResult_encodeRowOffsetsToBytes<
'local,
>(
mut env: JNIEnv<'local>,
_cls: JClass,
joffsets: JLongArray,
) -> JByteArray<'local> {
ok_or_throw_with_return!(
env,
inner_encode_updated_row_offset_bytes(&mut env, joffsets),
// SAFETY: the raw pointer is null, so no JNI reference is wrapped here.
// NOTE(review): confirm that `JByteArray::from_raw` documents null as an
// accepted input for the `jni` crate version in use.
unsafe { JByteArray::from_raw(std::ptr::null_mut()) }
)
}

/// Encodes a Java `long[]` of row offsets as serialized RoaringBitmap bytes.
///
/// Every element of `joffsets` must lie in `0..=u32::MAX`. Duplicate values
/// collapse into a single bitmap entry.
///
/// # Errors
///
/// Returns an input error if any offset is negative or larger than `u32::MAX`,
/// and propagates JNI or serialization failures.
fn inner_encode_updated_row_offset_bytes<'local>(
    env: &mut JNIEnv<'local>,
    joffsets: JLongArray,
) -> Result<JByteArray<'local>> {
    let count = env.get_array_length(&joffsets)?;
    let mut raw_offsets = vec![0_i64; count as usize];
    if count > 0 {
        env.get_long_array_region(&joffsets, 0, &mut raw_offsets)?;
    }

    let mut bitmap = RoaringBitmap::new();
    for offset in raw_offsets {
        if offset < 0 {
            return Err(Error::input_error(format!(
                "updatedRowOffsets must be non-negative, got {offset}"
            )));
        }
        // Use a checked conversion rather than a wrapping cast: an offset above
        // u32::MAX must be rejected, otherwise it would silently be recorded as
        // a different row.
        let Ok(row) = u32::try_from(offset) else {
            return Err(Error::input_error(format!(
                "updatedRowOffsets value {offset} exceeds u32::MAX"
            )));
        };
        bitmap.insert(row);
    }

    let bytes = serialize_matched_offsets(&bitmap)?;
    Ok(env.byte_array_from_slice(&bytes)?)
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_fragment_RowIdMeta_nativeEncodeRowIds(
mut env: JNIEnv,
Expand Down Expand Up @@ -591,7 +689,7 @@ const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResul
const FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG: &str =
"(Lorg/lance/FragmentMetadata;Lorg/lance/schema/LanceSchema;)V";
const FRAGMENT_UPDATE_RESULT_CLASS: &str = "org/lance/fragment/FragmentUpdateResult";
const FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;[J[J)V";
const FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;[J[B)V";

impl IntoJava for &FragmentMergeResult {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
Expand All @@ -612,14 +710,15 @@ impl IntoJava for &FragmentUpdateResult {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
let java_updated_fragment = self.updated_fragment.into_java(env)?;
let java_fields_modified = JLance(self.fields_modified.clone()).into_java(env)?;
let java_updated_row_offsets = JLance(self.updated_row_offsets.clone()).into_java(env)?;
let java_updated_row_offset_bytes =
env.byte_array_from_slice(&self.updated_row_offset_bytes)?;
Ok(env.new_object(
FRAGMENT_UPDATE_RESULT_CLASS,
FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG,
&[
JValueGen::Object(&java_updated_fragment),
JValueGen::Object(&java_fields_modified),
JValueGen::Object(&java_updated_row_offsets),
JValueGen::Object(&java_updated_row_offset_bytes),
],
)?)
}
Expand Down
101 changes: 97 additions & 4 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use jni::sys::{jboolean, jint, jlong};
use lance::dataset::CommitBuilder;
use lance::dataset::transaction::{
DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder,
UpdateMap, UpdateMapEntry, UpdateMode,
UpdateMap, UpdateMapEntry, UpdateMode, UpdatedFragmentOffsets,
};
use lance::io::ObjectStoreParams;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
Expand Down Expand Up @@ -433,7 +433,7 @@ fn convert_to_java_operation_inner<'local>(
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: _,
updated_fragment_offsets: _,
updated_fragment_offsets,
} => {
let removed_ids: Vec<JLance<i64>> = removed_fragment_ids
.iter()
Expand All @@ -457,16 +457,56 @@ fn convert_to_java_operation_inner<'local>(
&[JValue::Object(&update_mode)],
)?
.l()?;
// Serialize updated_fragment_offsets to Java Map<Long, byte[]>.
// Values are portable RoaringBitmap bytes so the JNI boundary stays O(bitmap size)
// rather than O(n rows). Empty HashMap when None so the Java constructor always
// receives a non-null map.
let java_offsets_map = {
let java_map = env.new_object("java/util/HashMap", "()V", &[])?;
if let Some(UpdatedFragmentOffsets(ref map)) = updated_fragment_offsets {
for (frag_id, bitmap) in map {
let mut buf: Vec<u8> = Vec::new();
bitmap.serialize_into(&mut buf).map_err(|e| {
Error::runtime_error(format!(
"failed to serialize updatedFragmentOffsets for fragment \
{frag_id}: {e}"
))
})?;
// JNI byte arrays are signed i8; reinterpret without copying.
let buf_i8: &[i8] = unsafe {
std::slice::from_raw_parts(buf.as_ptr() as *const i8, buf.len())
};
env.with_local_frame(4, |env| {
let java_key = env.new_object(
"java/lang/Long",
"(J)V",
&[JValue::Long(*frag_id as i64)],
)?;
let java_arr = env.new_byte_array(buf_i8.len() as i32)?;
env.set_byte_array_region(&java_arr, 0, buf_i8)?;
env.call_method(
&java_map,
"put",
"(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
&[JValue::Object(&java_key), JValue::Object(&*java_arr)],
)?;
Ok::<JObject, Error>(JObject::null())
})?;
}
}
java_map
};
Ok(env.new_object(
"org/lance/operation/Update",
"(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;)V",
"(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;Ljava/util/Map;)V",
&[
JValue::Object(&removed_fragment_ids_obj),
JValue::Object(&updated_fragments_obj),
JValue::Object(&new_fragments_obj),
JValueGen::Object(&fields_modified),
JValueGen::Object(&fields_for_preserving_frag_bitmap),
JValue::Object(&update_mode_optional),
JValue::Object(&java_offsets_map),
],
)?)
}
Expand Down Expand Up @@ -1238,6 +1278,59 @@ fn convert_to_rust_operation(
update_mode.extract_object(env)
})?;

let updated_fragment_offsets = {
let offsets_obj = env
.call_method(
java_operation,
"updatedFragmentOffsets",
"()Ljava/util/Map;",
&[],
)?
.l()?;
if offsets_obj.is_null() {
None
} else {
let jmap = JMap::from_env(env, &offsets_obj)?;
let mut iter = jmap.iter(env)?;
let mut offsets: HashMap<u64, RoaringBitmap> = HashMap::new();
// Per-iteration local frame: iterator key/value JNI refs are released each
// loop so large multi-fragment maps cannot exhaust the local reference table.
loop {
let entry = env.with_local_frame(
8,
|env| -> Result<Option<(u64, RoaringBitmap)>> {
let Some((key, value)) = iter.next(env)? else {
return Ok(None);
};
let frag_id =
env.call_method(&key, "longValue", "()J", &[])?.j()? as u64;
let buf: Vec<u8> =
env.convert_byte_array(JByteArray::from(value))?;
let bitmap = RoaringBitmap::deserialize_from(buf.as_slice())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit path accepts arbitrary offset bitmaps and later expands them into a full offset vector before checking fragment bounds. A compact valid Roaring bitmap can force huge allocations during a Java RewriteColumns commit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

.map_err(|e| {
Error::input_error(format!(
"invalid updatedFragmentOffsets RoaringBitmap bytes \
for fragment {frag_id}: {e}"
))
})?;
Ok(Some((frag_id, bitmap)))
},
)?;
match entry {
None => break,
Some((frag_id, bitmap)) => {
offsets.insert(frag_id, bitmap);
}
}
}
if offsets.is_empty() {
None
} else {
Some(UpdatedFragmentOffsets(offsets))
}
}
};

Operation::Update {
removed_fragment_ids,
updated_fragments,
Expand All @@ -1247,7 +1340,7 @@ fn convert_to_rust_operation(
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: None,
updated_fragment_offsets: None,
updated_fragment_offsets,
}
}
"DataReplacement" => {
Expand Down
Loading
Loading