From 576589ab963c2ac7c597fbc4a4f27252f70562da Mon Sep 17 00:00:00 2001 From: Eunjin Song Date: Fri, 12 Jun 2026 16:08:26 -0700 Subject: [PATCH 1/4] feat(java): expose ArrowArrayStream export on LanceScanner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add public LanceScanner#exportArrowStream(ArrowArrayStream) wrapping the existing private native openStream(long) call. Lets callers populate a stream they allocated themselves (typically from their own BufferAllocator) instead of going through scanBatches(), which immediately imports into a Java ArrowReader backed by Lance's allocator. The motivation is consumers loaded under a different classloader / pinned to a different Apache Arrow version. Sharing org.apache.arrow.vector.* classes across classloader boundaries is not safe, but the C Data Interface struct is stable across Arrow versions — so handing the C struct's memory address through is the only correct integration boundary. A concrete consumer is the ongoing gluten-spark/Velox integration tracked at apache/gluten#12263, which needs to import Lance scan output into its own Arrow 15 + Velox runtime; gluten-spark is built against Arrow 15 while Lance is on Arrow 18. Test exercises the full path end-to-end: caller allocates a stream from its own RootAllocator, scanner fills the C struct, caller imports into an ArrowReader and validates batch contents. 
--- .../main/java/org/lance/ipc/LanceScanner.java | 38 +++++++++++++++ java/src/test/java/org/lance/ScannerTest.java | 47 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index 3a413e0ccfd..a6db09f2ea8 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -146,6 +146,44 @@ public ArrowReader scanBatches() { } } + /** + * Populate a caller-provided {@link ArrowArrayStream} with this scan's results, using the C Data + * Interface release callback to return ownership. + * + *

Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created: the caller + * supplies a stream that they allocated (typically from their own {@link + * org.apache.arrow.memory.BufferAllocator}), and Lance writes the C struct directly into it. This + * lets a downstream consumer drive the read loop with their own Arrow runtime, which is required + * when the caller and Lance are loaded by different classloaders / different Arrow versions. + * + *

<p>The caller owns the stream and is responsible for closing it. The release callback installed + * on the C struct routes back through Lance's native side. + * + *

<p>Example: + * + *

<pre>{@code
+   * try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(callerAllocator)) {
+   *   scanner.exportArrowStream(stream);
+   *   try (ArrowReader reader = Data.importArrayStream(callerAllocator, stream)) {
+   *     while (reader.loadNextBatch()) {
+   *       VectorSchemaRoot batch = reader.getVectorSchemaRoot();
+   *       // ...
+   *     }
+   *   }
+   * }
+   * }</pre>
+ * + * @param stream the caller-allocated stream to populate + * @throws IOException if the native scan fails to start + */ + public void exportArrowStream(ArrowArrayStream stream) throws IOException { + Preconditions.checkNotNull(stream); + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed"); + openStream(stream.memoryAddress()); + } + } + private native void openStream(long streamAddress) throws IOException; @Override diff --git a/java/src/test/java/org/lance/ScannerTest.java b/java/src/test/java/org/lance/ScannerTest.java index 00434034b64..82452385e1a 100644 --- a/java/src/test/java/org/lance/ScannerTest.java +++ b/java/src/test/java/org/lance/ScannerTest.java @@ -22,6 +22,8 @@ import org.lance.ipc.ScanOptions; import org.lance.ipc.ScanStats; +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -158,6 +160,51 @@ void testDatasetScannerSchema(@TempDir Path tempDir) throws Exception { } } + @Test + void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("dataset_scanner_export_stream").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + int totalRows = 40; + int batchRows = 20; + try (Dataset dataset = testDataset.write(1, totalRows)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(batchRows) + .columns(Arrays.asList("id")) + .build())) { + // Caller allocates the C stream from their own allocator; the scanner only fills the + // C struct. 
This is the path callers loaded by a different classloader use to avoid + // sharing Java Arrow vector classes with Lance. + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream); + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + int index = 0; + while (reader.loadNextBatch()) { + List fieldVectors = root.getFieldVectors(); + assertEquals(1, fieldVectors.size()); + FieldVector fieldVector = fieldVectors.get(0); + assertEquals( + ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); + assertEquals(batchRows, fieldVector.getValueCount()); + IntVector vector = (IntVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + assertEquals(index, vector.get(i)); + index++; + } + } + assertEquals(totalRows, index); + } + } + } + } + } + } + @Test void testDatasetScannerCountRows(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("dataset_scanner_count").toString(); From f725d96f4499626dcbbb54ee472ef21d1a473795 Mon Sep 17 00:00:00 2001 From: Eunjin Song Date: Fri, 19 Jun 2026 18:22:18 -0700 Subject: [PATCH 2/4] refactor(java): address review on exportArrowStream Review feedback from @Xuanwo on #7259: 1. Take a raw `long streamAddress` instead of a typed `ArrowArrayStream`. A typed parameter is `org.apache.arrow.c.ArrowArrayStream` loaded by Lance's classloader / Arrow version. The whole point of the method is to serve callers on a *different* Arrow version or classloader (Spark + a native engine bundling its own Arrow), who cannot construct that exact type and would fail at the boundary the method exists to cross. The C Data Interface ABI is version-stable, so only the address crosses into Lance and the two sides stay decoupled. 2. Reject an already-populated stream. 
`openStream` writes the C struct in place with `ptr::write_unaligned`, which runs no destructor on the prior contents; if the caller's stream already had a `release` callback, overwriting it would leak the first producer. The native side now reads only the `release` field (unaligned, via `addr_of!`, no reference formed into the possibly-misaligned ArrowBuf) and rejects with IllegalArgumentException when it is non-null. Also rejects a null address. 3. Tests no longer assume an exact per-batch row count (batch size is a scanner hint, not a guarantee). The shared drain helper asserts only that no batch exceeds the requested size and that the full row set is correct. Expanded test coverage in ScannerTest: - basic export with batch-size-agnostic assertions - multiple fragments (4 fragments, batch size not a divisor of fragment size) - filter pushdown (id < 20) - limit + offset - projection (single column, schema asserted) - empty result - rejects exporting into an already-populated stream (first producer stays intact and drainable) - rejects null address - rejects a closed scanner --- java/lance-jni/src/blocking_scanner.rs | 31 ++ .../main/java/org/lance/ipc/LanceScanner.java | 44 ++- java/src/test/java/org/lance/ScannerTest.java | 268 ++++++++++++++++-- 3 files changed, 309 insertions(+), 34 deletions(-) diff --git a/java/lance-jni/src/blocking_scanner.rs b/java/lance-jni/src/blocking_scanner.rs index 335cb2a4fa3..c94b51ebe7e 100644 --- a/java/lance-jni/src/blocking_scanner.rs +++ b/java/lance-jni/src/blocking_scanner.rs @@ -589,6 +589,37 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_openStream( } fn inner_open_stream(env: &mut JNIEnv, j_scanner: JObject, stream_addr: jlong) -> Result<()> { + if stream_addr == 0 { + return Err(Error::input_error( + "ArrowArrayStream address must not be null".to_string(), + )); + } + + // Reject a stream that already holds a producer. 
We write the C struct in place below with + // `ptr::write_unaligned`, which does not run any destructor on the previous contents. If the + // caller passed a stream whose `release` callback is already set (e.g. it was populated by an + // earlier export and not yet released), overwriting it would drop that callback and leak the + // first producer's resources. A freshly-allocated `ArrowArrayStream` has a null `release`, per + // the Arrow C Data Interface, so requiring `release == None` is the contract for "empty". + // + // The struct is allocated by Arrow Java inside an ArrowBuf and is not guaranteed to be aligned + // (hence `write_unaligned` below), so we must not form a reference to it. We read only the + // `release` field through an unaligned read: `addr_of!` computes the field address without + // creating an intermediate (mis)aligned reference, and the field is an `Option` which is + // `Copy` with no destructor, so reading a copy of it leaves the caller's stream untouched. + let release_is_set = unsafe { + let stream_ptr = stream_addr as *const FFI_ArrowArrayStream; + let release = std::ptr::read_unaligned(std::ptr::addr_of!((*stream_ptr).release)); + release.is_some() + }; + if release_is_set { + return Err(Error::input_error( + "ArrowArrayStream is already populated; exporting into it would leak the existing \ + producer. Pass a freshly-allocated, empty stream." 
+ .to_string(), + )); + } + let record_batch_stream = { let scanner_guard = unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }?; diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index a6db09f2ea8..15b29557719 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -147,23 +147,36 @@ public ArrowReader scanBatches() { } /** - * Populate a caller-provided {@link ArrowArrayStream} with this scan's results, using the C Data - * Interface release callback to return ownership. + * Export this scan's results into a caller-owned Arrow C stream identified by its memory address, + * using the Arrow C Data Interface release callback to transfer ownership. * - *

Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created: the caller - * supplies a stream that they allocated (typically from their own {@link - * org.apache.arrow.memory.BufferAllocator}), and Lance writes the C struct directly into it. This - * lets a downstream consumer drive the read loop with their own Arrow runtime, which is required - * when the caller and Lance are loaded by different classloaders / different Arrow versions. + *

This method intentionally takes a raw {@code streamAddress} (an {@code ArrowArrayStream} + * memory address) rather than a Java {@link ArrowArrayStream} object. A typed parameter would be + * an {@code org.apache.arrow.c.ArrowArrayStream} loaded by Lance's classloader / Arrow + * version; a caller running a different Arrow version (or under a different classloader, e.g. + * Spark + a native engine bundling its own Arrow) cannot construct that exact type and would hit + * a {@code ClassCastException}/{@code NoSuchMethodError} at the very boundary this method exists + * to cross. The C Data Interface ABI is stable across Arrow versions, so passing the C struct's + * address keeps the two sides fully decoupled: the caller allocates the stream with its + * own Arrow runtime and only the {@code long} address crosses into Lance. See gluten#12263 + * for the cross-Arrow-version integration that motivated this. * - *

<p>The caller owns the stream and is responsible for closing it. The release callback installed - * on the C struct routes back through Lance's native side. + *

Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created on Lance's side: + * Lance writes the C struct directly at {@code streamAddress} and the caller drives the read loop + * with its own Arrow runtime. * - *

<p>Example: + *

The {@code streamAddress} must point to a freshly-allocated, empty {@code ArrowArrayStream} + * (its {@code release} callback must be null). Exporting into a stream that already holds a + * producer is rejected with an {@link IllegalArgumentException}, because overwriting the struct + * would drop the existing {@code release} callback and leak the first producer. The caller owns + * the stream and is responsible for closing it; the release callback installed by this call + * routes back through Lance's native side. + * + *

<p>Example (caller on its own Arrow version / allocator): * *

<pre>{@code
    * try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(callerAllocator)) {
-   *   scanner.exportArrowStream(stream);
+   *   scanner.exportArrowStream(stream.memoryAddress());
    *   try (ArrowReader reader = Data.importArrayStream(callerAllocator, stream)) {
    *     while (reader.loadNextBatch()) {
    *       VectorSchemaRoot batch = reader.getVectorSchemaRoot();
@@ -173,14 +186,15 @@ public ArrowReader scanBatches() {
    * }
    * }</pre>
* - * @param stream the caller-allocated stream to populate + * @param streamAddress the memory address of a freshly-allocated, empty {@code ArrowArrayStream} + * to populate + * @throws IllegalArgumentException if the scanner is closed or the stream is already populated * @throws IOException if the native scan fails to start */ - public void exportArrowStream(ArrowArrayStream stream) throws IOException { - Preconditions.checkNotNull(stream); + public void exportArrowStream(long streamAddress) throws IOException { try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed"); - openStream(stream.memoryAddress()); + openStream(streamAddress); } } diff --git a/java/src/test/java/org/lance/ScannerTest.java b/java/src/test/java/org/lance/ScannerTest.java index 82452385e1a..fa16c482189 100644 --- a/java/src/test/java/org/lance/ScannerTest.java +++ b/java/src/test/java/org/lance/ScannerTest.java @@ -51,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class ScannerTest { @@ -160,9 +161,41 @@ void testDatasetScannerSchema(@TempDir Path tempDir) throws Exception { } } + /** + * Reads every batch from a caller-owned C stream populated by {@link + * LanceScanner#exportArrowStream(long)} and returns the {@code id} values in stream order. + * + *

Asserts the projected schema is exactly {@code id: int32} and that no batch exceeds {@code + * maxBatchRows}, but does not assume any particular batch count or that batches are full — batch + * size is a scanner hint, not a guarantee, so over-asserting on it makes the test brittle. + */ + private static List drainIdStream( + BufferAllocator allocator, ArrowArrayStream stream, int maxBatchRows) throws IOException { + List ids = new ArrayList<>(); + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + while (reader.loadNextBatch()) { + List fieldVectors = root.getFieldVectors(); + assertEquals(1, fieldVectors.size()); + FieldVector fieldVector = fieldVectors.get(0); + assertEquals("id", fieldVector.getField().getName()); + assertEquals(ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); + int rowsInBatch = fieldVector.getValueCount(); + assertTrue( + rowsInBatch <= maxBatchRows, + "batch of " + rowsInBatch + " rows exceeded requested batch size " + maxBatchRows); + IntVector vector = (IntVector) fieldVector; + for (int i = 0; i < rowsInBatch; i++) { + ids.add(vector.get(i)); + } + } + } + return ids; + } + @Test - void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception { - String datasetPath = tempDir.resolve("dataset_scanner_export_stream").toString(); + void testExportArrowStream(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_basic").toString(); try (BufferAllocator allocator = new RootAllocator()) { TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath); @@ -176,28 +209,133 @@ void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception .batchSize(batchRows) .columns(Arrays.asList("id")) .build())) { - // Caller allocates the C stream from their own allocator; the scanner only fills the - // C struct. 
This is the path callers loaded by a different classloader use to avoid - // sharing Java Arrow vector classes with Lance. + // The caller allocates the C stream from its own allocator and passes only the memory + // address; the scanner fills the C struct in place. This is the cross-Arrow-version / + // cross-classloader boundary the API exists to serve. + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, batchRows); + assertEquals(totalRows, ids.size()); + for (int i = 0; i < totalRows; i++) { + assertEquals(i, ids.get(i)); + } + } + } + } + } + } + + @Test + void testExportArrowStreamMultipleFragments(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_multi_fragment").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + int totalRows = 40; + // maxRowsPerFile < totalRows forces multiple fragments (4 fragments of 10 rows). 
+ List fragments = testDataset.createNewFragment(totalRows, 10); + assertEquals(4, fragments.size()); + FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments); + try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) { + int batchRows = 7; // deliberately not a divisor of any fragment size + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(batchRows) + .columns(Arrays.asList("id")) + .build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, batchRows); + assertEquals(totalRows, ids.size()); + Collections.sort(ids); + for (int i = 0; i < totalRows; i++) { + assertEquals(i, ids.get(i)); + } + } + } + } + } + } + + @Test + void testExportArrowStreamWithFilter(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_filter").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(50) + .columns(Arrays.asList("id")) + .filter("id < 20") + .build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, 50); + assertEquals(20, ids.size()); + Collections.sort(ids); + for (int i = 0; i < 20; i++) { + assertEquals(i, ids.get(i)); + } + } + } + } + } + } + + @Test + void testExportArrowStreamWithLimitOffset(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_limit_offset").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + 
TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(50) + .columns(Arrays.asList("id")) + .limit(5) + .offset(10) + .build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, 50); + assertEquals(Arrays.asList(10, 11, 12, 13, 14), ids); + } + } + } + } + } + + @Test + void testExportArrowStreamProjectsRequestedColumnsOnly(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_projection").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 10)) { + // Project only "name"; the exported stream's schema must contain exactly that column. 
+ try (LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("name")).build())) { try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { - scanner.exportArrowStream(stream); + scanner.exportArrowStream(stream.memoryAddress()); try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); - int index = 0; + assertEquals(1, root.getSchema().getFields().size()); + assertEquals("name", root.getSchema().getFields().get(0).getName()); + int rows = 0; while (reader.loadNextBatch()) { - List fieldVectors = root.getFieldVectors(); - assertEquals(1, fieldVectors.size()); - FieldVector fieldVector = fieldVectors.get(0); - assertEquals( - ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); - assertEquals(batchRows, fieldVector.getValueCount()); - IntVector vector = (IntVector) fieldVector; - for (int i = 0; i < batchRows; i++) { - assertEquals(index, vector.get(i)); - index++; - } + rows += root.getRowCount(); } - assertEquals(totalRows, index); + assertEquals(10, rows); } } } @@ -205,6 +343,98 @@ void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception } } + @Test + void testExportArrowStreamEmptyResult(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_empty").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder().columns(Arrays.asList("id")).filter("id < 0").build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, 1024); + assertTrue(ids.isEmpty()); + } + 
} + } + } + } + + @Test + void testExportArrowStreamRejectsPopulatedStream(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_reject_populated").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + // First export populates the stream and installs a release callback. + scanner.exportArrowStream(stream.memoryAddress()); + // Exporting again into the same (already-populated) stream must be rejected rather + // than silently overwriting and leaking the first producer's release callback. + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> scanner.exportArrowStream(stream.memoryAddress())); + assertTrue(ex.getMessage().toLowerCase().contains("already populated")); + // The first producer is still intact and drainable. 
+ try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + int rows = 0; + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + while (reader.loadNextBatch()) { + rows += root.getRowCount(); + } + assertEquals(40, rows); + } + } + } + } + } + } + + @Test + void testExportArrowStreamRejectsNullAddress(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_reject_null").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 10)) { + try (LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build())) { + assertThrows(IllegalArgumentException.class, () -> scanner.exportArrowStream(0L)); + } + } + } + } + + @Test + void testExportArrowStreamRejectsClosedScanner(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_reject_closed").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 10)) { + LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build()); + scanner.close(); + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + assertThrows( + IllegalArgumentException.class, + () -> scanner.exportArrowStream(stream.memoryAddress())); + } + } + } + } + @Test void testDatasetScannerCountRows(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("dataset_scanner_count").toString(); From bc42f5c645d7b6c4376e6ab0812ddd7d2f67f9db Mon Sep 17 00:00:00 2001 From: Eunjin Song Date: Sun, 21 Jun 2026 09:28:15 -0700 Subject: [PATCH 3/4] 
test(java): address review round 2 on exportArrowStream tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to @Xuanwo's second review pass on #7259. - Empty-result schema (review): drainIdStream now asserts the projected schema on the imported reader *before* the first loadNextBatch(), so the empty-result test actually validates the zero-batch schema. Previously the schema assertions were inside the batch loop and never ran for a zero-row scan, so a wrong-schema-on-empty regression would have passed. - Multi-fragment / filter ordering (review): removed Collections.sort from the multi-fragment and filter export tests. Sorting masked out-of-order fragment results; the baseline validateScanResults asserts strict scan order without sorting, so these now assert the exact 0..39 / 0..19 order. - Concurrency (review): documented on exportArrowStream that the stream must not be shared across concurrent exports. The already-populated guard catches the sequential "export twice" mistake, but two concurrent exports into one caller-owned C struct are a caller-side data race that the guard cannot make safe — same contract as Arrow's C Data Interface. Use a separate stream per concurrent export. - Latent bug in the test helper: drainIdStream asserted every batch was <= the requested batch size, but the export tests scan with the default (non-strict) batch size, which only guarantees the total row count, not per-batch sizes (see testStrictBatchSize). Removed that unsound assertion and the now-unused maxBatchRows parameter. New tests: - testExportArrowStreamPreservesNulls: nulls in id and name survive the C-data round-trip (validity bitmaps), via writeSortByDataset. - testExportArrowStreamStrictBatchSize: with strictBatchSize(true) the exported stream splits into batches no larger than the requested size and still returns every row in order — the one place per-batch size is part of the contract. 
Added Javadoc to every export test and the drain helper explaining what each case guards and why (no-sort, schema-before-loop, single-producer). --- .../main/java/org/lance/ipc/LanceScanner.java | 8 + java/src/test/java/org/lance/ScannerTest.java | 227 +++++++++++++++--- 2 files changed, 207 insertions(+), 28 deletions(-) diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index 15b29557719..fbc271e85f4 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -172,6 +172,14 @@ public ArrowReader scanBatches() { * the stream and is responsible for closing it; the release callback installed by this call * routes back through Lance's native side. * + *

<p>The provided stream must not be shared across concurrent exports. An {@code + * ArrowArrayStream} is a plain C struct in caller-owned memory with no internal synchronization, + * so a single stream must be exported into, then drained, by one thread at a time. The + * already-populated check above guards the sequential "export twice" mistake, but it cannot make + * two concurrent exports into the same struct safe — that is a caller-side data race on + * caller-owned memory, the same contract as Arrow's C Data Interface itself. Use a separate + * stream per concurrent export. + *

<p>Example (caller on its own Arrow version / allocator): * *

<pre>{@code
diff --git a/java/src/test/java/org/lance/ScannerTest.java b/java/src/test/java/org/lance/ScannerTest.java
index fa16c482189..d824d8bb13f 100644
--- a/java/src/test/java/org/lance/ScannerTest.java
+++ b/java/src/test/java/org/lance/ScannerTest.java
@@ -41,6 +41,7 @@
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -162,29 +163,34 @@ void testDatasetScannerSchema(@TempDir Path tempDir) throws Exception {
   }
 
   /**
-   * Reads every batch from a caller-owned C stream populated by {@link
-   * LanceScanner#exportArrowStream(long)} and returns the {@code id} values in stream order.
+   * Imports a caller-owned C stream populated by {@link LanceScanner#exportArrowStream(long)} and
+   * returns the {@code id} values in the order the stream produced them.
    *
-   * 

Asserts the projected schema is exactly {@code id: int32} and that no batch exceeds {@code - * maxBatchRows}, but does not assume any particular batch count or that batches are full — batch - * size is a scanner hint, not a guarantee, so over-asserting on it makes the test brittle. + *

The projected schema is asserted to be exactly a single {@code id: int32} field, and the + * assertion is made on the imported reader before the first {@code loadNextBatch()} call + * so that it still runs for an empty (zero-batch) result — a regression that exported the wrong + * schema for an empty scan would otherwise slip through. See {@code + * org.apache.arrow.vector.ipc.ArrowReader#getVectorSchemaRoot()}, which exposes the schema as + * soon as the stream is imported. + * + *

This helper intentionally makes no assertion about per-batch row counts. The + * scanner's {@code batchSize} is only a hint unless {@code strictBatchSize(true)} is set, so the + * number of batches and the rows per batch are not part of the contract being tested here; that + * dimension is covered separately by {@link #testExportArrowStreamStrictBatchSize}. Row ordering + * and exact values are asserted by the callers against the returned list. */ - private static List drainIdStream( - BufferAllocator allocator, ArrowArrayStream stream, int maxBatchRows) throws IOException { + private static List drainIdStream(BufferAllocator allocator, ArrowArrayStream stream) + throws IOException { List ids = new ArrayList<>(); try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); + List fields = root.getSchema().getFields(); + assertEquals(1, fields.size()); + assertEquals("id", fields.get(0).getName()); + assertEquals(ArrowType.ArrowTypeID.Int, fields.get(0).getType().getTypeID()); while (reader.loadNextBatch()) { - List fieldVectors = root.getFieldVectors(); - assertEquals(1, fieldVectors.size()); - FieldVector fieldVector = fieldVectors.get(0); - assertEquals("id", fieldVector.getField().getName()); - assertEquals(ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); - int rowsInBatch = fieldVector.getValueCount(); - assertTrue( - rowsInBatch <= maxBatchRows, - "batch of " + rowsInBatch + " rows exceeded requested batch size " + maxBatchRows); - IntVector vector = (IntVector) fieldVector; + IntVector vector = (IntVector) root.getVector("id"); + int rowsInBatch = vector.getValueCount(); for (int i = 0; i < rowsInBatch; i++) { ids.add(vector.get(i)); } @@ -193,6 +199,12 @@ private static List drainIdStream( return ids; } + /** + * Happy path: a single-fragment ordered scan exported through a caller-owned C stream returns + * every row exactly once, in scan order. 
The caller allocates the {@link ArrowArrayStream} from + * its own allocator and passes only the memory address; the scanner fills the C struct in place. + * This is the cross-Arrow-version / cross-classloader boundary the API exists to serve. + */ @Test void testExportArrowStream(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_basic").toString(); @@ -209,12 +221,11 @@ void testExportArrowStream(@TempDir Path tempDir) throws Exception { .batchSize(batchRows) .columns(Arrays.asList("id")) .build())) { - // The caller allocates the C stream from its own allocator and passes only the memory - // address; the scanner fills the C struct in place. This is the cross-Arrow-version / - // cross-classloader boundary the API exists to serve. try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { scanner.exportArrowStream(stream.memoryAddress()); - List ids = drainIdStream(allocator, stream, batchRows); + // SimpleTestDataset writes id = 0..totalRows-1; an ordered scan must return them in + // exactly that sequence, so assert the exact ordering (no sort). + List ids = drainIdStream(allocator, stream); assertEquals(totalRows, ids.size()); for (int i = 0; i < totalRows; i++) { assertEquals(i, ids.get(i)); @@ -225,6 +236,16 @@ void testExportArrowStream(@TempDir Path tempDir) throws Exception { } } + /** + * A scan that spans multiple fragments is exported as a single C stream that concatenates the + * fragments in fragment order. {@code createNewFragment(40, 10)} produces 4 fragments of 10 rows + * (ids 0-9, 10-19, 20-29, 30-39), and an ordered scan must return 0..39 in exactly that order. + * + *

The expected ids are asserted in stream order without sorting: sorting would mask a + * regression that returned fragments out of order, which is exactly the kind of bug this test + * exists to catch. A non-divisor batch size (7) is used so batch boundaries do not line up with + * fragment boundaries, exercising the stream's batch stitching across fragments. + */ @Test void testExportArrowStreamMultipleFragments(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_multi_fragment").toString(); @@ -247,11 +268,11 @@ void testExportArrowStreamMultipleFragments(@TempDir Path tempDir) throws Except .build())) { try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { scanner.exportArrowStream(stream.memoryAddress()); - List ids = drainIdStream(allocator, stream, batchRows); + List ids = drainIdStream(allocator, stream); assertEquals(totalRows, ids.size()); - Collections.sort(ids); + // Assert exact scan order (no sort) so out-of-order fragments would fail. for (int i = 0; i < totalRows; i++) { - assertEquals(i, ids.get(i)); + assertEquals(i, ids.get(i), "row " + i + " out of expected scan order"); } } } @@ -259,6 +280,11 @@ void testExportArrowStreamMultipleFragments(@TempDir Path tempDir) throws Except } } + /** + * A pushed-down filter is honored by the exported stream: only matching rows cross the C-data + * boundary. {@code id < 20} over ids 0..39 must yield exactly 0..19 in order. Asserted in scan + * order without sorting so a filter/ordering regression cannot hide behind a sort. 
+ */ @Test void testExportArrowStreamWithFilter(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_filter").toString(); @@ -276,9 +302,8 @@ void testExportArrowStreamWithFilter(@TempDir Path tempDir) throws Exception { .build())) { try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { scanner.exportArrowStream(stream.memoryAddress()); - List ids = drainIdStream(allocator, stream, 50); + List ids = drainIdStream(allocator, stream); assertEquals(20, ids.size()); - Collections.sort(ids); for (int i = 0; i < 20; i++) { assertEquals(i, ids.get(i)); } @@ -288,6 +313,11 @@ void testExportArrowStreamWithFilter(@TempDir Path tempDir) throws Exception { } } + /** + * Pushed-down limit and offset are honored by the exported stream. Over ids 0..39, {@code + * offset(10).limit(5)} must yield exactly [10, 11, 12, 13, 14] in order — asserted as an exact + * ordered list so both the window bounds and the ordering are checked. + */ @Test void testExportArrowStreamWithLimitOffset(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_limit_offset").toString(); @@ -306,7 +336,7 @@ void testExportArrowStreamWithLimitOffset(@TempDir Path tempDir) throws Exceptio .build())) { try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { scanner.exportArrowStream(stream.memoryAddress()); - List ids = drainIdStream(allocator, stream, 50); + List ids = drainIdStream(allocator, stream); assertEquals(Arrays.asList(10, 11, 12, 13, 14), ids); } } @@ -314,6 +344,12 @@ void testExportArrowStreamWithLimitOffset(@TempDir Path tempDir) throws Exceptio } } + /** + * Column projection is reflected in the exported stream's schema. {@code SimpleTestDataset} has + * columns {@code (id, name)}; projecting only {@code name} must produce a stream whose schema is + * exactly that one column. 
The schema is checked on the imported reader before draining, and the + * full row count is verified after. + */ @Test void testExportArrowStreamProjectsRequestedColumnsOnly(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_projection").toString(); @@ -343,6 +379,13 @@ void testExportArrowStreamProjectsRequestedColumnsOnly(@TempDir Path tempDir) th } } + /** + * A scan that matches no rows ({@code id < 0}) still exports a valid, well-formed stream that + * yields zero rows. {@link #drainIdStream} asserts the projected schema ({@code id: int32}) on + * the imported reader before any {@code loadNextBatch()}, so this case also guards the empty-scan + * schema — a regression that exported a wrong or absent schema for zero-row results would fail + * here even though no batch is ever produced. + */ @Test void testExportArrowStreamEmptyResult(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_empty").toString(); @@ -356,7 +399,7 @@ void testExportArrowStreamEmptyResult(@TempDir Path tempDir) throws Exception { new ScanOptions.Builder().columns(Arrays.asList("id")).filter("id < 0").build())) { try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { scanner.exportArrowStream(stream.memoryAddress()); - List ids = drainIdStream(allocator, stream, 1024); + List ids = drainIdStream(allocator, stream); assertTrue(ids.isEmpty()); } } @@ -364,6 +407,17 @@ void testExportArrowStreamEmptyResult(@TempDir Path tempDir) throws Exception { } } + /** + * Guards against the sequential "export twice into the same stream" mistake. After the first + * export installs a producer (non-null {@code release} callback), a second export into the same + * stream must be rejected with {@link IllegalArgumentException} rather than overwriting the C + * struct in place — overwriting would drop the first producer's release callback and leak it. + * + *

The test also verifies the rejection is non-destructive: the first producer is still intact + * and fully drainable (all 40 rows) after the rejected second call. This is the single-threaded + * misuse case; concurrent exports into one caller-owned stream are the caller's responsibility, + * as documented on {@link LanceScanner#exportArrowStream(long)}. + */ @Test void testExportArrowStreamRejectsPopulatedStream(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_reject_populated").toString(); @@ -399,6 +453,10 @@ void testExportArrowStreamRejectsPopulatedStream(@TempDir Path tempDir) throws E } } + /** + * A null (0) stream address is rejected with {@link IllegalArgumentException} before any native + * dereference, so a caller mistake cannot turn into a native null-pointer write. + */ @Test void testExportArrowStreamRejectsNullAddress(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_reject_null").toString(); @@ -415,6 +473,11 @@ void testExportArrowStreamRejectsNullAddress(@TempDir Path tempDir) throws Excep } } + /** + * Exporting from a closed scanner is rejected with {@link IllegalArgumentException} (the native + * scanner handle is zero after {@code close()}), rather than dereferencing a freed handle. The + * scanner is closed explicitly here, so it is intentionally not in a try-with-resources. + */ @Test void testExportArrowStreamRejectsClosedScanner(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("export_stream_reject_closed").toString(); @@ -435,6 +498,114 @@ void testExportArrowStreamRejectsClosedScanner(@TempDir Path tempDir) throws Exc } } + /** + * Null values survive the C-data export round-trip. {@code writeSortByDataset} writes 10 rows + * (insertion order) in which {@code id} is null at rows 2 and 5 and {@code name} is null at rows + * 0 and 6. 
An unordered scan returns rows in insertion order, so the exported stream must + * reproduce both the non-null values and the null positions exactly — null/validity bitmaps are a + * common casualty of an incorrect C-data export, so this guards them explicitly. + */ + @Test + void testExportArrowStreamPreservesNulls(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_nulls").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.writeSortByDataset(1)) { + // Insertion order, row -> (id, name): + // 0 -> (0, null) 3 -> (2, "P2") 6 -> (3, null) 9 -> (5, "P5") + // 1 -> (1, "P0") 4 -> (2, "P3") 7 -> (4, "P4") + // 2 -> (null,"P1") 5 -> (null,"P3") 8 -> (4, "P5") + Integer[] expectedIds = {0, 1, null, 2, 2, null, 3, 4, 4, 5}; + String[] expectedNames = {null, "P0", "P1", "P2", "P3", "P3", null, "P4", "P5", "P5"}; + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder().columns(Arrays.asList("id", "name")).build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertEquals(2, root.getSchema().getFields().size()); + int row = 0; + while (reader.loadNextBatch()) { + IntVector idVector = (IntVector) root.getVector("id"); + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + for (int i = 0; i < root.getRowCount(); i++, row++) { + if (expectedIds[row] == null) { + assertTrue(idVector.isNull(i), "id should be null at row " + row); + } else { + assertEquals( + expectedIds[row].intValue(), idVector.get(i), "id mismatch at row " + row); + } + if (expectedNames[row] == null) { + 
assertTrue(nameVector.isNull(i), "name should be null at row " + row); + } else { + assertEquals( + expectedNames[row], + new String(nameVector.get(i), StandardCharsets.UTF_8), + "name mismatch at row " + row); + } + } + } + assertEquals(expectedIds.length, row); + } + } + } + } + } + } + + /** + * With {@code strictBatchSize(true)}, the exported stream must split into batches no larger than + * the requested batch size, and still reproduce every row in order. This is the one place the + * per-batch size is part of the contract; the other export tests deliberately leave batch sizing + * unasserted because it is only a hint by default. Mirrors {@link #testStrictBatchSize} but over + * the C-data export path. A batch size of 10 over 25 rows yields batches of at most 10. + */ + @Test + void testExportArrowStreamStrictBatchSize(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_strict_batch").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + int totalRows = 25; + int batchSize = 10; + try (Dataset dataset = testDataset.write(1, totalRows)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(batchSize) + .strictBatchSize(true) + .columns(Arrays.asList("id")) + .build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + List ids = new ArrayList<>(); + while (reader.loadNextBatch()) { + int rowsInBatch = root.getRowCount(); + assertTrue( + rowsInBatch <= batchSize, + "strict: batch of " + rowsInBatch + " should be <= " + batchSize); + IntVector idVector = (IntVector) root.getVector("id"); + for (int i = 0; i < 
rowsInBatch; i++) { + ids.add(idVector.get(i)); + } + } + assertEquals(totalRows, ids.size()); + for (int i = 0; i < totalRows; i++) { + assertEquals(i, ids.get(i)); + } + } + } + } + } + } + } + @Test void testDatasetScannerCountRows(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("dataset_scanner_count").toString(); From a31bd1fffe0a3f6996463c51542081f392adfdcc Mon Sep 17 00:00:00 2001 From: Eunjin Song Date: Mon, 22 Jun 2026 08:55:57 -0700 Subject: [PATCH 4/4] style(java): rephrase comment to satisfy typos spell-check The typos linter parses the `(mis)` fragment in `(mis)aligned` as a standalone word and flags it as a misspelling of `miss`/`mist`. Reword to "possibly-unaligned reference" to keep the same meaning without the parenthetical token. Comment-only change. --- java/lance-jni/src/blocking_scanner.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/java/lance-jni/src/blocking_scanner.rs b/java/lance-jni/src/blocking_scanner.rs index c94b51ebe7e..bc1f7656743 100644 --- a/java/lance-jni/src/blocking_scanner.rs +++ b/java/lance-jni/src/blocking_scanner.rs @@ -605,8 +605,9 @@ fn inner_open_stream(env: &mut JNIEnv, j_scanner: JObject, stream_addr: jlong) - // The struct is allocated by Arrow Java inside an ArrowBuf and is not guaranteed to be aligned // (hence `write_unaligned` below), so we must not form a reference to it. We read only the // `release` field through an unaligned read: `addr_of!` computes the field address without - // creating an intermediate (mis)aligned reference, and the field is an `Option` which is - // `Copy` with no destructor, so reading a copy of it leaves the caller's stream untouched. + // creating an intermediate, possibly-unaligned reference, and the field is an `Option` + // which is `Copy` with no destructor, so reading a copy of it leaves the caller's stream + // untouched. 
let release_is_set = unsafe { let stream_ptr = stream_addr as *const FFI_ArrowArrayStream; let release = std::ptr::read_unaligned(std::ptr::addr_of!((*stream_ptr).release));