-
Notifications
You must be signed in to change notification settings - Fork 729
fix(java): expose updatedFragmentOffsets on Update operation for RewriteColumns #6748
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1884a7d
61dad6a
09e99cd
7768393
e80b5b8
af17fd3
06b34e8
f292536
d2c16f9
3d0445f
f7165bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ use jni::sys::{jboolean, jint, jlong}; | |
| use lance::dataset::CommitBuilder; | ||
| use lance::dataset::transaction::{ | ||
| DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder, | ||
| UpdateMap, UpdateMapEntry, UpdateMode, | ||
| UpdateMap, UpdateMapEntry, UpdateMode, UpdatedFragmentOffsets, | ||
| }; | ||
| use lance::io::ObjectStoreParams; | ||
| use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore; | ||
|
|
@@ -433,7 +433,7 @@ fn convert_to_java_operation_inner<'local>( | |
| fields_for_preserving_frag_bitmap, | ||
| update_mode, | ||
| inserted_rows_filter: _, | ||
| updated_fragment_offsets: _, | ||
| updated_fragment_offsets, | ||
| } => { | ||
| let removed_ids: Vec<JLance<i64>> = removed_fragment_ids | ||
| .iter() | ||
|
|
@@ -457,16 +457,56 @@ fn convert_to_java_operation_inner<'local>( | |
| &[JValue::Object(&update_mode)], | ||
| )? | ||
| .l()?; | ||
| // Serialize updated_fragment_offsets to Java Map<Long, byte[]>. | ||
| // Values are portable RoaringBitmap bytes so the JNI boundary stays O(bitmap size) | ||
| // rather than O(n rows). Empty HashMap when None so the Java constructor always | ||
| // receives a non-null map. | ||
| let java_offsets_map = { | ||
| let java_map = env.new_object("java/util/HashMap", "()V", &[])?; | ||
| if let Some(UpdatedFragmentOffsets(ref map)) = updated_fragment_offsets { | ||
| for (frag_id, bitmap) in map { | ||
| let mut buf: Vec<u8> = Vec::new(); | ||
| bitmap.serialize_into(&mut buf).map_err(|e| { | ||
| Error::runtime_error(format!( | ||
| "failed to serialize updatedFragmentOffsets for fragment \ | ||
| {frag_id}: {e}" | ||
| )) | ||
| })?; | ||
| // JNI byte arrays are signed i8; reinterpret without copying. | ||
| let buf_i8: &[i8] = unsafe { | ||
| std::slice::from_raw_parts(buf.as_ptr() as *const i8, buf.len()) | ||
| }; | ||
| env.with_local_frame(4, |env| { | ||
| let java_key = env.new_object( | ||
| "java/lang/Long", | ||
| "(J)V", | ||
| &[JValue::Long(*frag_id as i64)], | ||
| )?; | ||
| let java_arr = env.new_byte_array(buf_i8.len() as i32)?; | ||
| env.set_byte_array_region(&java_arr, 0, buf_i8)?; | ||
| env.call_method( | ||
| &java_map, | ||
| "put", | ||
| "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", | ||
| &[JValue::Object(&java_key), JValue::Object(&*java_arr)], | ||
| )?; | ||
| Ok::<JObject, Error>(JObject::null()) | ||
| })?; | ||
| } | ||
| } | ||
| java_map | ||
| }; | ||
| Ok(env.new_object( | ||
| "org/lance/operation/Update", | ||
| "(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;)V", | ||
| "(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;Ljava/util/Map;)V", | ||
| &[ | ||
| JValue::Object(&removed_fragment_ids_obj), | ||
| JValue::Object(&updated_fragments_obj), | ||
| JValue::Object(&new_fragments_obj), | ||
| JValueGen::Object(&fields_modified), | ||
| JValueGen::Object(&fields_for_preserving_frag_bitmap), | ||
| JValue::Object(&update_mode_optional), | ||
| JValue::Object(&java_offsets_map), | ||
| ], | ||
| )?) | ||
| } | ||
|
|
@@ -1238,6 +1278,59 @@ fn convert_to_rust_operation( | |
| update_mode.extract_object(env) | ||
| })?; | ||
|
|
||
| let updated_fragment_offsets = { | ||
| let offsets_obj = env | ||
| .call_method( | ||
| java_operation, | ||
| "updatedFragmentOffsets", | ||
| "()Ljava/util/Map;", | ||
| &[], | ||
| )? | ||
| .l()?; | ||
| if offsets_obj.is_null() { | ||
| None | ||
| } else { | ||
| let jmap = JMap::from_env(env, &offsets_obj)?; | ||
| let mut iter = jmap.iter(env)?; | ||
| let mut offsets: HashMap<u64, RoaringBitmap> = HashMap::new(); | ||
| // Per-iteration local frame: iterator key/value JNI refs are released each | ||
| // loop so large multi-fragment maps cannot exhaust the local reference table. | ||
| loop { | ||
| let entry = env.with_local_frame( | ||
| 8, | ||
| |env| -> Result<Option<(u64, RoaringBitmap)>> { | ||
| let Some((key, value)) = iter.next(env)? else { | ||
| return Ok(None); | ||
| }; | ||
| let frag_id = | ||
| env.call_method(&key, "longValue", "()J", &[])?.j()? as u64; | ||
| let buf: Vec<u8> = | ||
| env.convert_byte_array(JByteArray::from(value))?; | ||
| let bitmap = RoaringBitmap::deserialize_from(buf.as_slice()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The commit path accepts arbitrary offset bitmaps and later expands them into a full offset vector before checking fragment bounds. A compact valid Roaring bitmap can force huge allocations during a Java RewriteColumns commit.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
| .map_err(|e| { | ||
| Error::input_error(format!( | ||
| "invalid updatedFragmentOffsets RoaringBitmap bytes \ | ||
| for fragment {frag_id}: {e}" | ||
| )) | ||
| })?; | ||
| Ok(Some((frag_id, bitmap))) | ||
| }, | ||
| )?; | ||
| match entry { | ||
| None => break, | ||
| Some((frag_id, bitmap)) => { | ||
| offsets.insert(frag_id, bitmap); | ||
| } | ||
| } | ||
| } | ||
| if offsets.is_empty() { | ||
| None | ||
| } else { | ||
| Some(UpdatedFragmentOffsets(offsets)) | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| Operation::Update { | ||
| removed_fragment_ids, | ||
| updated_fragments, | ||
|
|
@@ -1247,7 +1340,7 @@ fn convert_to_rust_operation( | |
| fields_for_preserving_frag_bitmap, | ||
| update_mode, | ||
| inserted_rows_filter: None, | ||
| updated_fragment_offsets: None, | ||
| updated_fragment_offsets, | ||
| } | ||
| } | ||
| "DataReplacement" => { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deprecated long[] encoder casts every non-negative value to u32, so offsets above u32::MAX silently become different rows and can corrupt last_updated metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.