Skip to content

feat(java): expose ArrowArrayStream export on LanceScanner#7259

Open
sezruby wants to merge 7 commits into
lance-format:mainfrom
sezruby:feat-java-export-arrow-stream
Open

feat(java): expose ArrowArrayStream export on LanceScanner#7259
sezruby wants to merge 7 commits into
lance-format:mainfrom
sezruby:feat-java-export-arrow-stream

Conversation

@sezruby

@sezruby sezruby commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Summary

Add LanceScanner#exportArrowStream(ArrowArrayStream) — a public wrapper around the existing private native openStream(long) JNI call. Lets callers populate a stream they allocated themselves instead of going through scanBatches(), which immediately imports the result into a Java ArrowReader backed by Lance's BufferAllocator.

Why

Consumers loaded under a different classloader and/or pinned to a different Apache Arrow version cannot safely share org.apache.arrow.vector.* classes with Lance — the JVM treats them as distinct types even when the bytecode is identical. The C Data Interface struct is stable across Arrow versions, so handing the C struct's memory address across the boundary is the only correct integration shape.

A concrete consumer is the gluten-spark / Velox integration tracked at apache/gluten#12263. gluten-spark builds against Arrow 15 (matching what Spark 3.5 ships and Velox uses); Lance Java SDK is on Arrow 18. With this method, gluten can:

try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(glutenAllocator)) {
  scanner.exportArrowStream(stream);
  try (ArrowReader reader = Data.importArrayStream(glutenAllocator, stream)) {
    // import each batch into Velox via gluten's own Arrow 15 stack
  }
}

…where glutenAllocator is a Spark-task-managed BufferAllocator (ArrowReservationListener plumbing for memory accounting). Lance never sees Java Arrow on this side; ownership stays with the caller via the C Data Interface release callback.

What changed

  • LanceScanner#exportArrowStream(ArrowArrayStream) — new public method, ~7 lines + Javadoc with usage example. Mirrors the body of scanBatches() minus the local stream allocation and the Data.importArrayStream step.
  • No native code touched. The underlying JNI hook already existed; it was just not reachable from outside the class.
  • Test testDatasetScannerExportArrowStream exercises the full path: caller allocates the C stream from its own RootAllocator, scanner fills the C struct, caller imports into an ArrowReader and validates batch contents (40 rows over 2 batches of 20).

Backwards compatibility

Pure addition. scanBatches(), schema(), countRows(), getStats(), close() all unchanged. No native ABI change.

Test plan

  • ./mvnw test -Dtest=ScannerTest#testDatasetScannerExportArrowStream — passes locally (Java compile + spotless clean; full test run depends on a working lance-jni Rust build, which had an unrelated aws-smithy-types registry issue on my machine, so I'm relying on CI for the JNI-linked verification).
  • Existing testDatasetScannerColumns covers the scanBatches() path so any regression in the shared openStream JNI call would surface there.

Add public LanceScanner#exportArrowStream(ArrowArrayStream) wrapping the
existing private native openStream(long) call. Lets callers populate a
stream they allocated themselves (typically from their own BufferAllocator)
instead of going through scanBatches(), which immediately imports into a
Java ArrowReader backed by Lance's allocator.

The motivation is consumers loaded under a different classloader / pinned
to a different Apache Arrow version. Sharing org.apache.arrow.vector.*
classes across classloader boundaries is not safe, but the C Data Interface
struct is stable across Arrow versions — so handing the C struct's memory
address through is the only correct integration boundary.

A concrete consumer is the ongoing gluten-spark/Velox integration tracked
at apache/gluten#12263, which needs to import Lance scan output into its
own Arrow 15 + Velox runtime; gluten-spark is built against Arrow 15 while
Lance is on Arrow 18.

Test exercises the full path end-to-end: caller allocates a stream from
its own RootAllocator, scanner fills the C struct, caller imports into an
ArrowReader and validates batch contents.
@github-actions github-actions Bot added A-java Java bindings + JNI enhancement New feature or request labels Jun 12, 2026
@sezruby

sezruby commented Jun 14, 2026

Copy link
Copy Markdown
Contributor Author

@hamersaw @jackye1995 Could you review the PR? The PR is to support Lance Reader in Gluten/Spark. I'll open a lance-spark PR after this PR is merged. Thanks!

@sezruby

sezruby commented Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Gentle ping — @wjones127 would you have a chance to take a look? Small Java-only change exposing the existing openStream JNI as a public exportArrowStream(ArrowArrayStream) on LanceScanner, same shape as the recent Java API exposures. (CI red is the unrelated aws-smithy-types/time E0119 issue from #7255, not this change.) Thanks!

* @param stream the caller-allocated stream to populate
* @throws IOException if the native scan fails to start
*/
public void exportArrowStream(ArrowArrayStream stream) throws IOException {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The public API still requires Lance's ArrowArrayStream Java type, so callers using a different Arrow version or classloader cannot pass their own stream despite this being the integration boundary the method documents.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — this defeated the method's whole purpose. Changed the signature to take a raw long streamAddress instead of a typed ArrowArrayStream. The caller now allocates the stream with its own Arrow runtime and passes stream.memoryAddress(), so only the address crosses into Lance and the two sides never share a Java Arrow type. Documented the reasoning in the Javadoc. Fixed in f725d96.

Preconditions.checkNotNull(stream);
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed");
openStream(stream.memoryAddress());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This writes into the provided C stream without rejecting an already-populated stream, so a second export can overwrite the existing release callback and leak the first producer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. inner_open_stream now reads the existing struct's release field and rejects with IllegalArgumentException if it's already set (also rejects a null address). A freshly-allocated ArrowArrayStream has a null release per the C Data Interface, so that's the "empty" contract. The read is done unaligned via addr_of! without forming a reference, since the struct lives in a possibly-unaligned ArrowBuf (same reason the write below uses write_unaligned). Added a test that exports twice into the same stream and asserts the second call throws while the first producer stays intact and drainable. Fixed in f725d96.

FieldVector fieldVector = fieldVectors.get(0);
assertEquals(
ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID());
assertEquals(batchRows, fieldVector.getValueCount());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test assumes every non-strict scan batch has exactly the requested size, so a valid scanner change that returns smaller batches would fail this API test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, that was brittle. Reworked the assertions into a shared drainIdStream helper that asserts only batch size <= requested size and that the full row set is correct, without assuming any batch count or that batches are full. Applied across all the new cases. Fixed in f725d96.

Review feedback from @Xuanwo on lance-format#7259:

1. Take a raw `long streamAddress` instead of a typed `ArrowArrayStream`.
   A typed parameter is `org.apache.arrow.c.ArrowArrayStream` loaded by
   Lance's classloader / Arrow version. The whole point of the method is to
   serve callers on a *different* Arrow version or classloader (Spark + a
   native engine bundling its own Arrow), who cannot construct that exact
   type and would fail at the boundary the method exists to cross. The C
   Data Interface ABI is version-stable, so only the address crosses into
   Lance and the two sides stay decoupled.

2. Reject an already-populated stream. `openStream` writes the C struct in
   place with `ptr::write_unaligned`, which runs no destructor on the prior
   contents; if the caller's stream already had a `release` callback,
   overwriting it would leak the first producer. The native side now reads
   only the `release` field (unaligned, via `addr_of!`, no reference formed
   into the possibly-misaligned ArrowBuf) and rejects with
   IllegalArgumentException when it is non-null. Also rejects a null address.

3. Tests no longer assume an exact per-batch row count (batch size is a
   scanner hint, not a guarantee). The shared drain helper asserts only that
   no batch exceeds the requested size and that the full row set is correct.

Expanded test coverage in ScannerTest:
- basic export with batch-size-agnostic assertions
- multiple fragments (4 fragments, batch size not a divisor of fragment size)
- filter pushdown (id < 20)
- limit + offset
- projection (single column, schema asserted)
- empty result
- rejects exporting into an already-populated stream (first producer stays
  intact and drainable)
- rejects null address
- rejects a closed scanner
// `release` field through an unaligned read: `addr_of!` computes the field address without
// creating an intermediate (mis)aligned reference, and the field is an `Option<fn>` which is
// `Copy` with no destructor, so reading a copy of it leaves the caller's stream untouched.
let release_is_set = unsafe {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The populated-stream guard checks the release callback before constructing and writing the new stream, so two concurrent exports to the same empty stream can both pass the check and then overwrite each other. That can leak the first producer and leave the caller draining whichever stream won the race.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that the check-then-write isn't atomic against a concurrent second export into the same stream. I don't think this one can be fixed on the Lance side, though: the ArrowArrayStream is a plain C struct in caller-owned memory with no synchronization, so two concurrent exports into the same struct are a data race regardless of what the guard does — the same constraint Arrow's C Data Interface itself imposes (a stream is single-producer, drained by one consumer). The guard's purpose is to catch the sequential "export twice into the same stream" mistake, which it does.

Rather than imply a safety we can't provide, I documented the contract on exportArrowStream: the stream must not be shared across concurrent exports; use a separate stream per export. (scanBatches() has the same shape — it allocates a fresh stream each call.) Let me know if you'd prefer a different framing. Updated in bc42f5c.

List<Integer> ids = new ArrayList<>();
try (ArrowReader reader = Data.importArrayStream(allocator, stream)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
while (reader.loadNextBatch()) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema assertions only run after a batch is loaded, so the empty-result case never validates the exported stream schema. A regression that returns the wrong schema for zero-row scans would still pass.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Moved the schema assertions in the drainIdStream helper to run on the imported reader before the first loadNextBatch(), so they execute even when the scan returns zero batches — the empty-result test now validates the zero-row schema. (ArrowReader#getVectorSchemaRoot() exposes the schema as soon as the stream is imported, before any batch is loaded.) Fixed in bc42f5c.

scanner.exportArrowStream(stream.memoryAddress());
List<Integer> ids = drainIdStream(allocator, stream, batchRows);
assertEquals(totalRows, ids.size());
Collections.sort(ids);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multi-fragment export test sorts the collected ids before asserting them, so it would pass even if the default ordered scan returned fragments out of order.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — sorting defeated the point of a multi-fragment ordering test. Removed Collections.sort from both the multi-fragment and filter tests; they now assert the exact scan order (0..39 / 0..19), matching how the existing validateScanResults asserts order without sorting.

While here I also removed an unsound assertion in the shared helper that assumed every batch was <= the requested batch size — the export tests scan with the default (non-strict) batch size, which only guarantees the total row count, so that was the same brittleness in another spot. Added testExportArrowStreamStrictBatchSize to cover the per-batch-size contract properly via strictBatchSize(true), plus testExportArrowStreamPreservesNulls for null round-tripping. Fixed in bc42f5c.

sezruby and others added 3 commits June 21, 2026 09:28
Follow-up to @Xuanwo's second review pass on lance-format#7259.

- Empty-result schema (review): drainIdStream now asserts the projected
  schema on the imported reader *before* the first loadNextBatch(), so the
  empty-result test actually validates the zero-batch schema. Previously the
  schema assertions were inside the batch loop and never ran for a zero-row
  scan, so a wrong-schema-on-empty regression would have passed.

- Multi-fragment / filter ordering (review): removed Collections.sort from
  the multi-fragment and filter export tests. Sorting masked out-of-order
  fragment results; the baseline validateScanResults asserts strict scan
  order without sorting, so these now assert the exact 0..39 / 0..19 order.

- Concurrency (review): documented on exportArrowStream that the stream must
  not be shared across concurrent exports. The already-populated guard
  catches the sequential "export twice" mistake, but two concurrent exports
  into one caller-owned C struct are a caller-side data race that the guard
  cannot make safe — same contract as Arrow's C Data Interface. Use a
  separate stream per concurrent export.

- Latent bug in the test helper: drainIdStream asserted every batch was
  <= the requested batch size, but the export tests scan with the default
  (non-strict) batch size, which only guarantees the total row count, not
  per-batch sizes (see testStrictBatchSize). Removed that unsound assertion
  and the now-unused maxBatchRows parameter.

New tests:
- testExportArrowStreamPreservesNulls: nulls in id and name survive the
  C-data round-trip (validity bitmaps), via writeSortByDataset.
- testExportArrowStreamStrictBatchSize: with strictBatchSize(true) the
  exported stream splits into batches no larger than the requested size and
  still returns every row in order — the one place per-batch size is part of
  the contract.

Added Javadoc to every export test and the drain helper explaining what each
case guards and why (no-sort, schema-before-loop, single-producer).
The typos linter parses the `(mis)` fragment in `(mis)aligned` as a standalone
word and flags it as a misspelling of `miss`/`mist`. Reword to
"possibly-unaligned reference" to keep the same meaning without the
parenthetical token. Comment-only change.
@sezruby

sezruby commented Jun 23, 2026

Copy link
Copy Markdown
Contributor Author

@Xuanwo all review comments addressed across f725d96 and bc42f5c6, CI green. Mind taking another look when you have a chance? (The concurrency one I handled via docs rather than code — flag me if you'd prefer otherwise.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-java Java bindings + JNI enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants