From e6fd395250b22f7c07d34ddf8cdfb900b10a7e45 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 14:58:46 +0530 Subject: [PATCH 1/7] Add bounded SEA API support for CloudFetch (UseBoundedSeaApi=0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part 1 of bounded SEA API compliance for CloudFetch: 1. New connection property UseBoundedSeaApi (default 0/off). When enabled: - Sends row_offset query parameter on GetResultData requests - Forces StreamingChunkProvider (which uses next_chunk_index, not total_chunk_count) even when streaming is explicitly disabled 2. StreamingChunkProvider already uses next_chunk_index for continuation and end-of-stream detection — no changes needed to its core logic. 3. Legacy RemoteChunkProvider (uses total_chunk_count) is bypassed when bounded SEA is enabled. row_offset is derived from the previous link's row_offset + row_count and sent as a query parameter on /sql/statements/{id}/result/chunks/{idx}. This is required for future >100GB results and cluster-side fetch. Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/DatabricksConnectionContext.java | 5 +++++ .../databricks/jdbc/api/impl/arrow/ArrowStreamResult.java | 4 +++- .../databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java | 4 ++-- .../jdbc/api/internal/IDatabricksConnectionContext.java | 3 +++ .../com/databricks/jdbc/common/DatabricksJdbcUrlParams.java | 4 ++++ .../jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java | 4 ++++ 6 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java index 956695ab51..6a793a1231 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java @@ -1440,6 +1440,11 @@ public boolean isCloudFetchEnabled() { return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1"); } + @Override + public boolean isBoundedSeaApiEnabled() { + return getParameter(DatabricksJdbcUrlParams.USE_BOUNDED_SEA_API).equals("1"); + } + @Override public int getThriftMaxBatchesInMemory() { try { diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index 1569b7efb8..bd8949f2f4 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -102,7 +102,9 @@ private static ChunkProvider createRemoteChunkProvider( IDatabricksConnectionContext connectionContext = session.getConnectionContext(); - if (connectionContext.isStreamingChunkProviderEnabled()) { + // Bounded SEA API forces StreamingChunkProvider — it doesn't rely on total_chunk_count + if (connectionContext.isStreamingChunkProviderEnabled() + || connectionContext.isBoundedSeaApiEnabled()) { LOGGER.info( "Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId()); diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java index 9be8dccf10..7df1586cca 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java @@ -32,7 +32,7 @@ public SeaChunkLinkFetcher(IDatabricksSession session, StatementId statementId) @Override public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset) throws SQLException { - // SEA uses startChunkIndex; startRowOffset is ignored + // SEA uses startChunkIndex; startRowOffset is passed through for bounded SEA API LOGGER.debug( "Fetching links starting from chunk index {} for statement {}", startChunkIndex, @@ -45,7 +45,7 @@ public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset @Override public ExternalLink refetchLink(long chunkIndex, long rowOffset) throws SQLException { - // SEA uses chunkIndex; rowOffset is ignored + // SEA uses chunkIndex; rowOffset is passed through for bounded SEA API LOGGER.info("Refetching expired link for chunk {} of statement {}", chunkIndex, statementId); ChunkLinkFetchResult result = diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java index 8ca5ae0cd2..5754ec0daf 100644 --- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java @@ -461,6 +461,9 @@ default int getHeartbeatIntervalSeconds() { */ boolean isCloudFetchEnabled(); + /** Returns whether bounded SEA API mode is enabled for CloudFetch. */ + boolean isBoundedSeaApiEnabled(); + /** * Returns the maximum number of batches to keep in memory for Thrift streaming. * diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index 639288248d..70ec2d4842 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -204,6 +204,10 @@ public enum DatabricksJdbcUrlParams { "EnableSeaSyncMetadata", "Enable x-databricks-sea-can-run-fully-sync header for synchronous metadata requests in SEA mode", "1"), + USE_BOUNDED_SEA_API( + "UseBoundedSeaApi", + "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count", + "0"), DISABLE_OAUTH_REFRESH_TOKEN( "DisableOauthRefreshToken", "Disable requesting OAuth refresh tokens (omit offline_access unless explicitly provided)", diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index 4b73982445..21cee3ce6c 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -529,6 +529,10 @@ public ChunkLinkFetchResult getResultChunks( GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); + // Bounded SEA API: send row_offset to support future >100GB results and cluster-side fetch + if (connectionContext.isBoundedSeaApiEnabled() && chunkStartRowOffset > 0) { + path = path + "?row_offset=" + chunkStartRowOffset; + } try { Request req = new Request(Request.GET, path, apiClient.serialize(request)); req.withHeaders(getHeaders("getStatementResultN")); From 5d577dc02d07960e0e2ef6489d3dfd06659a06e5 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 15:04:03 +0530 Subject: [PATCH 2/7] Add batched link refresh reconciliation for bounded SEA API During coalesced link refresh, the server may return links for chunks not yet in the provider's map (newly-discovered chunks beyond highestKnownChunkIndex). Previously these were silently skipped. Now: create new chunks from refresh response links, update highestKnownChunkIndex, and set endOfStreamReached from the response's hasMore flag. Follows the per-chunk state-machine reconciliation from the bounded SEA API spec. Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../impl/arrow/StreamingChunkProvider.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index dbb1bbe789..4e02d6e0af 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -596,12 +596,13 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti // Single batch FetchResults RPC from the lowest expired offset ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset); - // Update ALL pre-download chunks that received fresh links. - // Always overwrite even if the current link hasn't expired yet, since the - // server-provided link has a later expiry and prevents near-expiry races. + // Reconcile ALL links from the refresh response with local chunk state. for (ExternalLink link : result.getChunkLinks()) { ArrowResultChunk c = chunks.get(link.getChunkIndex()); if (c != null) { + // Existing chunk: update link only for pre-download states. + // DOWNLOADING stays as-is (download task owns the state machine). + // DOWNLOADED/RELEASED/etc. stay as-is (bytes already in memory). ChunkStatus status = c.getStatus(); if (status == ChunkStatus.PENDING || status == ChunkStatus.URL_FETCHED @@ -609,9 +610,26 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti || status == ChunkStatus.DOWNLOAD_RETRY) { c.setChunkLink(link); } + } else { + // New chunk from server not yet in our map — create it. + // This handles the bounded SEA case where the refresh response + // may include chunks beyond our current highestKnownChunkIndex. + try { + createChunkFromLink(link); + } catch (Exception e) { + LOGGER.debug( + "Failed to create chunk {} from refresh response: {}", + link.getChunkIndex(), + e.getMessage()); + } } } + // Update end-of-stream from refresh response + if (!result.hasMore()) { + endOfStreamReached = true; + } + // Check if our target chunk was refreshed by the batch targetChunk = chunks.get(chunkIndex); if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) { From 271c4c2838481ec5b9cc1f00f269be3e1d273b71 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 15:19:43 +0530 Subject: [PATCH 3/7] Bring StreamingChunkDownloadTask to parity with ChunkDownloadTask MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes 3 gaps found by comparing with the legacy ChunkDownloadTask: 1. Outer catch(Throwable) + exception chaining in finally: uncaught exceptions were lost — the finally block created a generic exception without the original cause. Now captures uncaughtException and chains it, matching ChunkDownloadTask's pattern. 2. Thread context propagation: download threads had no connection context or statementId for telemetry/logging. Now captures caller's context via DatabricksThreadContextHolder and clears in finally. 3. Download timing: added task-level timing log (totalMs, retries) matching ChunkDownloadTask's diagnostics. Also includes the RuntimeException catch (parity with PR #1302). Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../arrow/StreamingChunkDownloadTask.java | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java index efc664b247..15a7e315d0 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java @@ -1,6 +1,8 @@ package com.databricks.jdbc.api.impl.arrow; +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; import com.databricks.jdbc.common.CompressionCodec; +import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; import com.databricks.jdbc.dbclient.IDatabricksHttpClient; import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.log.JdbcLogger; @@ -29,6 +31,10 @@ public class StreamingChunkDownloadTask implements Callable { private final LinkRefresher linkRefresher; private final double cloudFetchSpeedThreshold; + // Capture caller's thread context for telemetry/logging on the download thread + private final IDatabricksConnectionContext connectionContext; + private final String statementId; + public StreamingChunkDownloadTask( ArrowResultChunk chunk, IDatabricksHttpClient httpClient, @@ -40,13 +46,21 @@ public StreamingChunkDownloadTask( this.compressionCodec = compressionCodec; this.linkRefresher = linkRefresher; this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold; + this.connectionContext = DatabricksThreadContextHolder.getConnectionContext(); + this.statementId = DatabricksThreadContextHolder.getStatementId(); } @Override public Void call() throws DatabricksSQLException { int retries = 0; boolean downloadSuccessful = false; + Throwable uncaughtException = null; + + // Propagate caller's thread context for telemetry/logging + DatabricksThreadContextHolder.setConnectionContext(this.connectionContext); + DatabricksThreadContextHolder.setStatementId(this.statementId); + long taskStartTime = System.nanoTime(); try { while (!downloadSuccessful) { try { @@ -62,9 +76,14 @@ public Void call() throws DatabricksSQLException { chunk.downloadData(httpClient, compressionCodec, cloudFetchSpeedThreshold); downloadSuccessful = true; - LOGGER.debug("Successfully downloaded chunk {}", chunk.getChunkIndex()); + long taskTotalMs = (System.nanoTime() - taskStartTime) / 1_000_000; + LOGGER.debug( + "Chunk download complete: chunkIndex={}, totalMs={}, retries={}", + chunk.getChunkIndex(), + taskTotalMs, + retries); - } catch (IOException | SQLException e) { + } catch (IOException | SQLException | RuntimeException e) { retries++; if (retries >= MAX_RETRIES) { LOGGER.error( @@ -72,7 +91,7 @@ public Void call() throws DatabricksSQLException { chunk.getChunkIndex(), MAX_RETRIES, e.getMessage()); - // Status will be set to DOWNLOAD_FAILED in the finally block + chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); throw new DatabricksSQLException( String.format( "Failed to download chunk %d after %d attempts", @@ -95,18 +114,28 @@ public Void call() throws DatabricksSQLException { } } } + } catch (Throwable t) { + uncaughtException = t; + throw t; } finally { if (downloadSuccessful) { chunk.getChunkReadyFuture().complete(null); } else { + LOGGER.info( + "Download failed for chunk {}: {}", + chunk.getChunkIndex(), + uncaughtException != null ? uncaughtException.getMessage() : "unknown"); chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); chunk .getChunkReadyFuture() .completeExceptionally( new DatabricksSQLException( "Download failed for chunk " + chunk.getChunkIndex(), + uncaughtException, DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR)); } + + DatabricksThreadContextHolder.clearAllContext(); } return null; From 95fcfc9a69a81ea51bff4af83fe063161b0d8354 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 17:05:23 +0530 Subject: [PATCH 4/7] Address review feedback on bounded SEA API PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0-1: Remove redundant chunk.setStatus(DOWNLOAD_FAILED) in inner catch — defer entirely to finally block. Fixes StreamingChunkDownloadTaskTest. P0-2: Add NEXT_CHANGELOG.md entry under ### Added for UseBoundedSeaApi. P1-1: Call triggerDownloads() after reconciliation creates new chunks from refresh response — prevents newly-discovered chunks sitting PENDING. P1-2/P1-3: Un-gated changes (new chunk creation, EOS from refresh, triggerDownloads) are intentional parity fixes for all EnableStreamingChunkProvider=1 users. EnableStreamingChunkProvider defaults to off, so default users are unaffected. P1-4: Revert RuntimeException from inner catch — DatabricksError is caught by outer catch(Throwable) and fails immediately (no retry), matching ChunkDownloadTask behavior exactly. NPE/ISE won't be retried. P2-1: Always send row_offset (even 0 for chunk 0) when bounded SEA enabled — explicit is safer than relying on server default. P2-3: Update nextLinkFetchIndex after reconciliation to avoid prefetch thread re-fetching chunks already discovered via refresh. P2-5: Add "Requires server support" to connection property help text. Co-authored-by: Isaac Signed-off-by: Gopal Lal --- NEXT_CHANGELOG.md | 5 +++++ .../impl/arrow/StreamingChunkDownloadTask.java | 4 ++-- .../api/impl/arrow/StreamingChunkProvider.java | 15 +++++++++++---- .../jdbc/common/DatabricksJdbcUrlParams.java | 2 +- .../impl/sqlexec/DatabricksSdkClient.java | 4 ++-- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index d27a25e263..708fbe50cd 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -3,6 +3,11 @@ ## [Unreleased] ### Added +- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running). +- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends, + ensuring consistent behavior for SQL warehouses regardless of underlying + protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`. +- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support. ### Updated diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java index 15a7e315d0..9d9c183ee5 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java @@ -83,7 +83,7 @@ public Void call() throws DatabricksSQLException { taskTotalMs, retries); - } catch (IOException | SQLException | RuntimeException e) { + } catch (IOException | SQLException e) { retries++; if (retries >= MAX_RETRIES) { LOGGER.error( @@ -91,7 +91,7 @@ public Void call() throws DatabricksSQLException { chunk.getChunkIndex(), MAX_RETRIES, e.getMessage()); - chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); + // Status set to DOWNLOAD_FAILED in the finally block throw new DatabricksSQLException( String.format( "Failed to download chunk %d after %d attempts", diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index 4e02d6e0af..21225cc849 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -611,9 +611,9 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti c.setChunkLink(link); } } else { - // New chunk from server not yet in our map — create it. - // This handles the bounded SEA case where the refresh response - // may include chunks beyond our current highestKnownChunkIndex. + // Server returned a chunk not yet in our map — create it. + // Handles cases where refresh response includes chunks beyond + // our current highestKnownChunkIndex. try { createChunkFromLink(link); } catch (Exception e) { @@ -625,11 +625,18 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti } } - // Update end-of-stream from refresh response + // Update end-of-stream and prefetch index from refresh response if (!result.hasMore()) { endOfStreamReached = true; + } else if (result.getNextFetchIndex() > nextLinkFetchIndex) { + // Avoid re-fetching chunks that the refresh already discovered + nextLinkFetchIndex = result.getNextFetchIndex(); + nextRowOffsetToFetch = result.getNextRowOffset(); } + // Trigger downloads for any newly-created chunks + triggerDownloads(); + // Check if our target chunk was refreshed by the batch targetChunk = chunks.get(chunkIndex); if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) { diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index 70ec2d4842..b9a70f1a8a 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -206,7 +206,7 @@ public enum DatabricksJdbcUrlParams { "1"), USE_BOUNDED_SEA_API( "UseBoundedSeaApi", - "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count", + "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count. Requires server support.", "0"), DISABLE_OAUTH_REFRESH_TOKEN( "DisableOauthRefreshToken", diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index 21cee3ce6c..b714f2a385 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -529,8 +529,8 @@ public ChunkLinkFetchResult getResultChunks( GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); - // Bounded SEA API: send row_offset to support future >100GB results and cluster-side fetch - if (connectionContext.isBoundedSeaApiEnabled() && chunkStartRowOffset > 0) { + // Bounded SEA API: always send row_offset (even 0 for chunk 0) + if (connectionContext.isBoundedSeaApiEnabled()) { path = path + "?row_offset=" + chunkStartRowOffset; } try { From 7e0b2c1160f222fe1330af7787f659a2c3ea47ea Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 13:54:56 +0530 Subject: [PATCH 5/7] Fix race condition in createChunkFromLink and non-atomic fetch position updates P1-1: Use putIfAbsent in createChunkFromLink to prevent double row-count and chunk replacement when called concurrently from prefetch and download threads. Without this, a race could leave a CompletableFuture that is never completed, causing consumer hangs. P1-2: Bundle nextLinkFetchIndex + nextRowOffsetToFetch into an immutable FetchPosition holder updated via volatile reference. This ensures the prefetch thread always reads a consistent (index, rowOffset) pair, which matters for bounded SEA API where row_offset is used by the server. P2-1: Bump reconciliation failure log from DEBUG to WARN for production visibility. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../impl/arrow/StreamingChunkProvider.java | 74 ++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index 21225cc849..a4d12e88bd 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -47,6 +47,17 @@ */ public class StreamingChunkProvider implements ChunkProvider { + /** Immutable holder for the next fetch position — ensures atomic reads of (index, rowOffset). */ + private static final class FetchPosition { + final long chunkIndex; + final long rowOffset; + + FetchPosition(long chunkIndex, long rowOffset) { + this.chunkIndex = chunkIndex; + this.rowOffset = rowOffset; + } + } + private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(StreamingChunkProvider.class); private static final String DOWNLOAD_THREAD_PREFIX = "databricks-jdbc-streaming-downloader-"; @@ -75,8 +86,11 @@ public class StreamingChunkProvider implements ChunkProvider { // - nextDownloadIndex: written only under downloadLock, but AtomicLong for consistency private final AtomicLong currentChunkIndex = new AtomicLong(-1); private final AtomicLong highestKnownChunkIndex = new AtomicLong(-1); - private volatile long nextLinkFetchIndex = 0; - private volatile long nextRowOffsetToFetch = 0; + // Bundled as an immutable pair for atomic reads/writes across threads. + // The prefetch thread reads this (fetchNextLinkBatch) while the download thread + // may update it (getRefreshedLink reconciliation). A volatile reference to an + // immutable holder ensures both fields are always read consistently. + private volatile FetchPosition nextFetchPosition = new FetchPosition(0, 0); private final AtomicLong nextDownloadIndex = new AtomicLong(0); // State flags @@ -347,11 +361,11 @@ private void linkPrefetchLoop() { long targetIndex = currentChunkIndex.get() + linkPrefetchWindow; // Wait if we're caught up - while (!endOfStreamReached && nextLinkFetchIndex > targetIndex) { + while (!endOfStreamReached && nextFetchPosition.chunkIndex > targetIndex) { if (closed) break; LOGGER.debug( "Prefetch caught up, waiting for consumer. next={}, target={}", - nextLinkFetchIndex, + nextFetchPosition.chunkIndex, targetIndex); consumerAdvanced.await(); targetIndex = currentChunkIndex.get() + linkPrefetchWindow; @@ -396,13 +410,14 @@ private void fetchNextLinkBatch() throws SQLException { return; } + FetchPosition pos = nextFetchPosition; LOGGER.debug( "Fetching links starting from index {}, row offset {} for statement {}", - nextLinkFetchIndex, - nextRowOffsetToFetch, + pos.chunkIndex, + pos.rowOffset, statementId); - ChunkLinkFetchResult result = linkFetcher.fetchLinks(nextLinkFetchIndex, nextRowOffsetToFetch); + ChunkLinkFetchResult result = linkFetcher.fetchLinks(pos.chunkIndex, pos.rowOffset); if (result.isEndOfStream()) { LOGGER.info("End of stream reached for statement {}", statementId); @@ -415,10 +430,9 @@ private void fetchNextLinkBatch() throws SQLException { createChunkFromLink(link); } - // Update next fetch positions + // Update next fetch position atomically if (result.hasMore()) { - nextLinkFetchIndex = result.getNextFetchIndex(); - nextRowOffsetToFetch = result.getNextRowOffset(); + nextFetchPosition = new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); } else { endOfStreamReached = true; LOGGER.info("End of stream reached for statement {} (hasMore=false)", statementId); @@ -450,14 +464,15 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) createChunkFromLink(link); } - // Set next fetch positions using unified API + // Set next fetch position atomically if (initialLinks.hasMore()) { - nextLinkFetchIndex = initialLinks.getNextFetchIndex(); - nextRowOffsetToFetch = initialLinks.getNextRowOffset(); + FetchPosition pos = + new FetchPosition(initialLinks.getNextFetchIndex(), initialLinks.getNextRowOffset()); + nextFetchPosition = pos; LOGGER.debug( "Next fetch position set to chunk index {}, row offset {} from initial links", - nextLinkFetchIndex, - nextRowOffsetToFetch); + pos.chunkIndex, + pos.rowOffset); } else { endOfStreamReached = true; LOGGER.info("End of stream reached from initial links for statement {}", statementId); @@ -471,11 +486,6 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) */ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException { long chunkIndex = link.getChunkIndex(); - if (chunks.containsKey(chunkIndex)) { - LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); - return; - } - long rowCount = link.getRowCount(); long rowOffset = link.getRowOffset(); @@ -488,7 +498,16 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingExce .build(); chunk.setChunkLink(link); - chunks.put(chunkIndex, chunk); + + // Atomic insert — if another thread already created this chunk, skip. + // This is safe because createChunkFromLink can be called concurrently from + // the prefetch thread (fetchNextLinkBatch) and download threads (getRefreshedLink). + ArrowResultChunk existing = chunks.putIfAbsent(chunkIndex, chunk); + if (existing != null) { + LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); + return; + } + highestKnownChunkIndex.updateAndGet(current -> Math.max(current, chunkIndex)); totalRowCount.addAndGet(rowCount); @@ -617,7 +636,7 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti try { createChunkFromLink(link); } catch (Exception e) { - LOGGER.debug( + LOGGER.warn( "Failed to create chunk {} from refresh response: {}", link.getChunkIndex(), e.getMessage()); @@ -625,13 +644,14 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti } } - // Update end-of-stream and prefetch index from refresh response + // Update end-of-stream and prefetch position from refresh response if (!result.hasMore()) { endOfStreamReached = true; - } else if (result.getNextFetchIndex() > nextLinkFetchIndex) { - // Avoid re-fetching chunks that the refresh already discovered - nextLinkFetchIndex = result.getNextFetchIndex(); - nextRowOffsetToFetch = result.getNextRowOffset(); + } else if (result.getNextFetchIndex() > nextFetchPosition.chunkIndex) { + // Avoid re-fetching chunks that the refresh already discovered. + // Atomic update of both fields via immutable holder. + nextFetchPosition = + new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); } // Trigger downloads for any newly-created chunks From 26823e33ba459424d0fde31281f369fa44b4b3ee Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 22:31:03 +0530 Subject: [PATCH 6/7] Stop depending on manifest.total_chunk_count in bounded SEA CloudFetch path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the bounded SEA API contract, drivers must not depend on manifest.{chunks, total_chunk_count, total_row_count}. Pass null for totalChunkCount when converting initial ExternalLinks for StreamingChunkProvider — the provider derives end-of-stream from next_chunk_index on ExternalLink instead. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/arrow/ArrowStreamResult.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index bd8949f2f4..8ef7871658 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -115,10 +115,13 @@ private static ChunkProvider createRemoteChunkProvider( int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); - // Convert ExternalLinks to ChunkLinkFetchResult for the provider + // Convert ExternalLinks to ChunkLinkFetchResult for the provider. + // Bounded SEA API: pass null for totalChunkCount — we must not depend on + // manifest.{chunks, total_chunk_count, total_row_count} per the bounded API contract. + Long totalChunkCount = + connectionContext.isBoundedSeaApiEnabled() ? null : resultManifest.getTotalChunkCount(); ChunkLinkFetchResult initialLinks = - convertToChunkLinkFetchResult( - resultData.getExternalLinks(), resultManifest.getTotalChunkCount()); + convertToChunkLinkFetchResult(resultData.getExternalLinks(), totalChunkCount); return new StreamingChunkProvider( linkFetcher, From 600446699b338aa8aef9a4cf6aff0e3a63bda0bf Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 22:50:12 +0530 Subject: [PATCH 7/7] Stop depending on manifest.total_row_count in bounded SEA API for isLast/isAfterLast When boundedSeaApiEnabled=true, isLast() and isAfterLast() now use hasNext() instead of resultSetMetaData.getTotalRows(). The bounded SEA API contract does not guarantee manifest.total_row_count is populated; the chunk providers derive end-of-stream from next_chunk_index instead. Gated behind isBoundedSeaApiEnabled() + ArrowStreamResult instanceof check so existing Thrift and non-bounded SEA behavior is unchanged. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/DatabricksResultSet.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index 5406994072..5461d77d96 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -77,6 +77,7 @@ enum ResultSetType { private ResultSetType resultSetType = ResultSetType.UNASSIGNED; private boolean complexDatatypeSupport = false; + private boolean boundedSeaApiEnabled = false; // Cached telemetry collector resolved once at construction time to avoid // per-row overhead in next(). The connection-to-collector mapping is stable @@ -128,6 +129,7 @@ public DatabricksResultSet( resultSetMetaData = null; } this.complexDatatypeSupport = session.getConnectionContext().isComplexDatatypeSupportEnabled(); + this.boundedSeaApiEnabled = session.getConnectionContext().isBoundedSeaApiEnabled(); this.statementType = statementType; this.updateCount = null; this.parentStatement = parentStatement; @@ -933,8 +935,15 @@ public boolean isBeforeFirst() throws SQLException { public boolean isAfterLast() throws SQLException { checkIfClosed(); // Account for client-side maxRows truncation - return truncatedByMaxRows - || executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); + if (truncatedByMaxRows) { + return true; + } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + // which derives end-of-stream from next_chunk_index via the chunk provider. + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } + return executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); } @Override @@ -974,6 +983,10 @@ public boolean isLast() throws SQLException { || executionResult instanceof StreamingInlineArrowResult) { return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1; }