feat(java): expose ArrowArrayStream export on LanceScanner#7259
feat(java): expose ArrowArrayStream export on LanceScanner#7259sezruby wants to merge 7 commits into
Conversation
Add public LanceScanner#exportArrowStream(ArrowArrayStream) wrapping the existing private native openStream(long) call. Lets callers populate a stream they allocated themselves (typically from their own BufferAllocator) instead of going through scanBatches(), which immediately imports into a Java ArrowReader backed by Lance's allocator. The motivation is consumers loaded under a different classloader / pinned to a different Apache Arrow version. Sharing org.apache.arrow.vector.* classes across classloader boundaries is not safe, but the C Data Interface struct is stable across Arrow versions — so handing the C struct's memory address through is the only correct integration boundary. A concrete consumer is the ongoing gluten-spark/Velox integration tracked at apache/gluten#12263, which needs to import Lance scan output into its own Arrow 15 + Velox runtime; gluten-spark is built against Arrow 15 while Lance is on Arrow 18. Test exercises the full path end-to-end: caller allocates a stream from its own RootAllocator, scanner fills the C struct, caller imports into an ArrowReader and validates batch contents.
|
@hamersaw @jackye1995 Could you review the PR? The PR is to support Lance Reader in Gluten/Spark. I'll open a lance-spark PR after this PR is merged. Thanks! |
|
Gentle ping — @wjones127 would you have a chance to take a look? Small Java-only change exposing the existing |
| * @param stream the caller-allocated stream to populate | ||
| * @throws IOException if the native scan fails to start | ||
| */ | ||
| public void exportArrowStream(ArrowArrayStream stream) throws IOException { |
There was a problem hiding this comment.
The public API still requires Lance's ArrowArrayStream Java type, so callers using a different Arrow version or classloader cannot pass their own stream despite this being the integration boundary the method documents.
There was a problem hiding this comment.
Good catch — this defeated the method's whole purpose. Changed the signature to take a raw long streamAddress instead of a typed ArrowArrayStream. The caller now allocates the stream with its own Arrow runtime and passes stream.memoryAddress(), so only the address crosses into Lance and the two sides never share a Java Arrow type. Documented the reasoning in the Javadoc. Fixed in f725d96.
| Preconditions.checkNotNull(stream); | ||
| try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { | ||
| Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed"); | ||
| openStream(stream.memoryAddress()); |
There was a problem hiding this comment.
This writes into the provided C stream without rejecting an already-populated stream, so a second export can overwrite the existing release callback and leak the first producer.
There was a problem hiding this comment.
Fixed. inner_open_stream now reads the existing struct's release field and rejects with IllegalArgumentException if it's already set (also rejects a null address). A freshly-allocated ArrowArrayStream has a null release per the C Data Interface, so that's the "empty" contract. The read is done unaligned via addr_of! without forming a reference, since the struct lives in a possibly-unaligned ArrowBuf (same reason the write below uses write_unaligned). Added a test that exports twice into the same stream and asserts the second call throws while the first producer stays intact and drainable. Fixed in f725d96.
| FieldVector fieldVector = fieldVectors.get(0); | ||
| assertEquals( | ||
| ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); | ||
| assertEquals(batchRows, fieldVector.getValueCount()); |
There was a problem hiding this comment.
This test assumes every non-strict scan batch has exactly the requested size, so a valid scanner change that returns smaller batches would fail this API test.
There was a problem hiding this comment.
Agreed, that was brittle. Reworked the assertions into a shared drainIdStream helper that asserts only batch size <= requested size and that the full row set is correct, without assuming any batch count or that batches are full. Applied across all the new cases. Fixed in f725d96.
Review feedback from @Xuanwo on lance-format#7259: 1. Take a raw `long streamAddress` instead of a typed `ArrowArrayStream`. A typed parameter is `org.apache.arrow.c.ArrowArrayStream` loaded by Lance's classloader / Arrow version. The whole point of the method is to serve callers on a *different* Arrow version or classloader (Spark + a native engine bundling its own Arrow), who cannot construct that exact type and would fail at the boundary the method exists to cross. The C Data Interface ABI is version-stable, so only the address crosses into Lance and the two sides stay decoupled. 2. Reject an already-populated stream. `openStream` writes the C struct in place with `ptr::write_unaligned`, which runs no destructor on the prior contents; if the caller's stream already had a `release` callback, overwriting it would leak the first producer. The native side now reads only the `release` field (unaligned, via `addr_of!`, no reference formed into the possibly-misaligned ArrowBuf) and rejects with IllegalArgumentException when it is non-null. Also rejects a null address. 3. Tests no longer assume an exact per-batch row count (batch size is a scanner hint, not a guarantee). The shared drain helper asserts only that no batch exceeds the requested size and that the full row set is correct. Expanded test coverage in ScannerTest: - basic export with batch-size-agnostic assertions - multiple fragments (4 fragments, batch size not a divisor of fragment size) - filter pushdown (id < 20) - limit + offset - projection (single column, schema asserted) - empty result - rejects exporting into an already-populated stream (first producer stays intact and drainable) - rejects null address - rejects a closed scanner
| // `release` field through an unaligned read: `addr_of!` computes the field address without | ||
| // creating an intermediate (mis)aligned reference, and the field is an `Option<fn>` which is | ||
| // `Copy` with no destructor, so reading a copy of it leaves the caller's stream untouched. | ||
| let release_is_set = unsafe { |
There was a problem hiding this comment.
The populated-stream guard checks the release callback before constructing and writing the new stream, so two concurrent exports to the same empty stream can both pass the check and then overwrite each other. That can leak the first producer and leave the caller draining whichever stream won the race.
There was a problem hiding this comment.
You're right that the check-then-write isn't atomic against a concurrent second export into the same stream. I don't think this one can be fixed on the Lance side, though: the ArrowArrayStream is a plain C struct in caller-owned memory with no synchronization, so two concurrent exports into the same struct are a data race regardless of what the guard does — the same constraint Arrow's C Data Interface itself imposes (a stream is single-producer, drained by one consumer). The guard's purpose is to catch the sequential "export twice into the same stream" mistake, which it does.
Rather than imply a safety we can't provide, I documented the contract on exportArrowStream: the stream must not be shared across concurrent exports; use a separate stream per export. (scanBatches() has the same shape — it allocates a fresh stream each call.) Let me know if you'd prefer a different framing. Updated in bc42f5c.
| List<Integer> ids = new ArrayList<>(); | ||
| try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { | ||
| VectorSchemaRoot root = reader.getVectorSchemaRoot(); | ||
| while (reader.loadNextBatch()) { |
There was a problem hiding this comment.
The schema assertions only run after a batch is loaded, so the empty-result case never validates the exported stream schema. A regression that returns the wrong schema for zero-row scans would still pass.
There was a problem hiding this comment.
Good catch. Moved the schema assertions in the drainIdStream helper to run on the imported reader before the first loadNextBatch(), so they execute even when the scan returns zero batches — the empty-result test now validates the zero-row schema. (ArrowReader#getVectorSchemaRoot() exposes the schema as soon as the stream is imported, before any batch is loaded.) Fixed in bc42f5c.
| scanner.exportArrowStream(stream.memoryAddress()); | ||
| List<Integer> ids = drainIdStream(allocator, stream, batchRows); | ||
| assertEquals(totalRows, ids.size()); | ||
| Collections.sort(ids); |
There was a problem hiding this comment.
The multi-fragment export test sorts the collected ids before asserting them, so it would pass even if the default ordered scan returned fragments out of order.
There was a problem hiding this comment.
Agreed — sorting defeated the point of a multi-fragment ordering test. Removed Collections.sort from both the multi-fragment and filter tests; they now assert the exact scan order (0..39 / 0..19), matching how the existing validateScanResults asserts order without sorting.
While here I also removed an unsound assertion in the shared helper that assumed every batch was <= the requested batch size — the export tests scan with the default (non-strict) batch size, which only guarantees the total row count, so that was the same brittleness in another spot. Added testExportArrowStreamStrictBatchSize to cover the per-batch-size contract properly via strictBatchSize(true), plus testExportArrowStreamPreservesNulls for null round-tripping. Fixed in bc42f5c.
Follow-up to @Xuanwo's second review pass on lance-format#7259. - Empty-result schema (review): drainIdStream now asserts the projected schema on the imported reader *before* the first loadNextBatch(), so the empty-result test actually validates the zero-batch schema. Previously the schema assertions were inside the batch loop and never ran for a zero-row scan, so a wrong-schema-on-empty regression would have passed. - Multi-fragment / filter ordering (review): removed Collections.sort from the multi-fragment and filter export tests. Sorting masked out-of-order fragment results; the baseline validateScanResults asserts strict scan order without sorting, so these now assert the exact 0..39 / 0..19 order. - Concurrency (review): documented on exportArrowStream that the stream must not be shared across concurrent exports. The already-populated guard catches the sequential "export twice" mistake, but two concurrent exports into one caller-owned C struct are a caller-side data race that the guard cannot make safe — same contract as Arrow's C Data Interface. Use a separate stream per concurrent export. - Latent bug in the test helper: drainIdStream asserted every batch was <= the requested batch size, but the export tests scan with the default (non-strict) batch size, which only guarantees the total row count, not per-batch sizes (see testStrictBatchSize). Removed that unsound assertion and the now-unused maxBatchRows parameter. New tests: - testExportArrowStreamPreservesNulls: nulls in id and name survive the C-data round-trip (validity bitmaps), via writeSortByDataset. - testExportArrowStreamStrictBatchSize: with strictBatchSize(true) the exported stream splits into batches no larger than the requested size and still returns every row in order — the one place per-batch size is part of the contract. Added Javadoc to every export test and the drain helper explaining what each case guards and why (no-sort, schema-before-loop, single-producer).
The typos linter parses the `(mis)` fragment in `(mis)aligned` as a standalone word and flags it as a misspelling of `miss`/`mist`. Reword to "possibly-unaligned reference" to keep the same meaning without the parenthetical token. Comment-only change.
|
@Xuanwo all review comments addressed across |
Summary
Add
LanceScanner#exportArrowStream(ArrowArrayStream)— a public wrapper around the existing private nativeopenStream(long)JNI call. Lets callers populate a stream they allocated themselves instead of going throughscanBatches(), which immediately imports the result into a JavaArrowReaderbacked by Lance'sBufferAllocator.Why
Consumers loaded under a different classloader and/or pinned to a different Apache Arrow version cannot safely share
org.apache.arrow.vector.*classes with Lance — the JVM treats them as distinct types even when the bytecode is identical. The C Data Interface struct is stable across Arrow versions, so handing the C struct's memory address across the boundary is the only correct integration shape.A concrete consumer is the gluten-spark / Velox integration tracked at apache/gluten#12263. gluten-spark builds against Arrow 15 (matching what Spark 3.5 ships and Velox uses); Lance Java SDK is on Arrow 18. With this method, gluten can:
…where
glutenAllocatoris a Spark-task-managedBufferAllocator(ArrowReservationListenerplumbing for memory accounting). Lance never sees Java Arrow on this side; ownership stays with the caller via the C Data Interface release callback.What changed
LanceScanner#exportArrowStream(ArrowArrayStream)— new public method, ~7 lines + Javadoc with usage example. Mirrors the body ofscanBatches()minus the local stream allocation and theData.importArrayStreamstep.testDatasetScannerExportArrowStreamexercises the full path: caller allocates the C stream from its ownRootAllocator, scanner fills the C struct, caller imports into anArrowReaderand validates batch contents (40 rows over 2 batches of 20).Backwards compatibility
Pure addition.
scanBatches(),schema(),countRows(),getStats(),close()all unchanged. No native ABI change.Test plan
./mvnw test -Dtest=ScannerTest#testDatasetScannerExportArrowStream— passes locally (Java compile + spotless clean; full test run depends on a workinglance-jniRust build, which had an unrelatedaws-smithy-typesregistry issue on my machine, so I'm relying on CI for the JNI-linked verification).testDatasetScannerColumnscovers thescanBatches()path so any regression in the sharedopenStreamJNI call would surface there.