Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions java/lance-jni/src/blocking_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,38 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_openStream(
}

fn inner_open_stream(env: &mut JNIEnv, j_scanner: JObject, stream_addr: jlong) -> Result<()> {
if stream_addr == 0 {
return Err(Error::input_error(
"ArrowArrayStream address must not be null".to_string(),
));
}

// Reject a stream that already holds a producer. We write the C struct in place below with
// `ptr::write_unaligned`, which does not run any destructor on the previous contents. If the
// caller passed a stream whose `release` callback is already set (e.g. it was populated by an
// earlier export and not yet released), overwriting it would drop that callback and leak the
// first producer's resources. A freshly-allocated `ArrowArrayStream` has a null `release`, per
// the Arrow C Data Interface, so requiring `release == None` is the contract for "empty".
//
// The struct is allocated by Arrow Java inside an ArrowBuf and is not guaranteed to be aligned
// (hence `write_unaligned` below), so we must not form a reference to it. We read only the
// `release` field through an unaligned read: `addr_of!` computes the field address without
// creating an intermediate, possibly-unaligned reference, and the field is an `Option<fn>`
// which is `Copy` with no destructor, so reading a copy of it leaves the caller's stream
// untouched.
let release_is_set = unsafe {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The populated-stream guard checks the release callback before constructing and writing the new stream, so two concurrent exports to the same empty stream can both pass the check and then overwrite each other. That can leak the first producer and leave the caller draining whichever stream won the race.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that the check-then-write isn't atomic against a concurrent second export into the same stream. I don't think this one can be fixed on the Lance side, though: the ArrowArrayStream is a plain C struct in caller-owned memory with no synchronization, so two concurrent exports into the same struct are a data race regardless of what the guard does — the same constraint Arrow's C Data Interface itself imposes (a stream is single-producer, drained by one consumer). The guard's purpose is to catch the sequential "export twice into the same stream" mistake, which it does.

Rather than imply a safety we can't provide, I documented the contract on exportArrowStream: the stream must not be shared across concurrent exports; use a separate stream per export. (scanBatches() has the same shape — it allocates a fresh stream each call.) Let me know if you'd prefer a different framing. Updated in bc42f5c.

let stream_ptr = stream_addr as *const FFI_ArrowArrayStream;
let release = std::ptr::read_unaligned(std::ptr::addr_of!((*stream_ptr).release));
release.is_some()
};
if release_is_set {
return Err(Error::input_error(
"ArrowArrayStream is already populated; exporting into it would leak the existing \
producer. Pass a freshly-allocated, empty stream."
.to_string(),
));
}

let record_batch_stream = {
let scanner_guard =
unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }?;
Expand Down
60 changes: 60 additions & 0 deletions java/src/main/java/org/lance/ipc/LanceScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,66 @@ public ArrowReader scanBatches() {
}
}

/**
* Export this scan's results into a caller-owned Arrow C stream identified by its memory address,
* using the Arrow C Data Interface release callback to transfer ownership.
*
* <p>This method intentionally takes a raw {@code streamAddress} (an {@code ArrowArrayStream}
* memory address) rather than a Java {@link ArrowArrayStream} object. A typed parameter would be
* an {@code org.apache.arrow.c.ArrowArrayStream} loaded by <em>Lance's</em> classloader / Arrow
* version; a caller running a different Arrow version (or under a different classloader, e.g.
* Spark + a native engine bundling its own Arrow) cannot construct that exact type and would hit
* a {@code ClassCastException}/{@code NoSuchMethodError} at the very boundary this method exists
* to cross. The C Data Interface ABI is stable across Arrow versions, so passing the C struct's
* address keeps the two sides fully decoupled: the caller allocates the stream with <em>its
* own</em> Arrow runtime and only the {@code long} address crosses into Lance. See gluten#12263
* for the cross-Arrow-version integration that motivated this.
*
* <p>Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created on Lance's side:
* Lance writes the C struct directly at {@code streamAddress} and the caller drives the read loop
* with its own Arrow runtime.
*
* <p>The {@code streamAddress} must point to a freshly-allocated, empty {@code ArrowArrayStream}
* (its {@code release} callback must be null). Exporting into a stream that already holds a
* producer is rejected with an {@link IllegalArgumentException}, because overwriting the struct
* would drop the existing {@code release} callback and leak the first producer. The caller owns
* the stream and is responsible for closing it; the release callback installed by this call
* routes back through Lance's native side.
*
* <p>The provided stream must not be shared across concurrent exports. An {@code
* ArrowArrayStream} is a plain C struct in caller-owned memory with no internal synchronization,
* so a single stream must be exported into, then drained, by one thread at a time. The
* already-populated check above guards the sequential "export twice" mistake, but it cannot make
* two concurrent exports into the same struct safe — that is a caller-side data race on
* caller-owned memory, the same contract as Arrow's C Data Interface itself. Use a separate
* stream per concurrent export.
*
* <p>Example (caller on its own Arrow version / allocator):
*
* <pre>{@code
* try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(callerAllocator)) {
* scanner.exportArrowStream(stream.memoryAddress());
* try (ArrowReader reader = Data.importArrayStream(callerAllocator, stream)) {
* while (reader.loadNextBatch()) {
* VectorSchemaRoot batch = reader.getVectorSchemaRoot();
* // ...
* }
* }
* }
* }</pre>
*
* @param streamAddress the memory address of a freshly-allocated, empty {@code ArrowArrayStream}
* to populate
* @throws IllegalArgumentException if the scanner is closed or the stream is already populated
* @throws IOException if the native scan fails to start
*/
public void exportArrowStream(long streamAddress) throws IOException {
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed");
openStream(streamAddress);
}
}

private native void openStream(long streamAddress) throws IOException;

@Override
Expand Down
Loading
Loading