diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index d27a25e26..708fbe50c 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -3,6 +3,11 @@ ## [Unreleased] ### Added +- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running). +- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends, + ensuring consistent behavior for SQL warehouses regardless of underlying + protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`. +- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support. ### Updated diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java index 956695ab5..6a793a123 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java @@ -1440,6 +1440,11 @@ public boolean isCloudFetchEnabled() { return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1"); } + @Override + public boolean isBoundedSeaApiEnabled() { + return getParameter(DatabricksJdbcUrlParams.USE_BOUNDED_SEA_API).equals("1"); + } + @Override public int getThriftMaxBatchesInMemory() { try { diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index 540699407..5461d77d9 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -77,6 +77,7 @@ enum ResultSetType { private ResultSetType resultSetType = ResultSetType.UNASSIGNED; private boolean complexDatatypeSupport = false; + private boolean boundedSeaApiEnabled = false; // Cached telemetry collector resolved once at construction time to avoid // per-row overhead in next(). The connection-to-collector mapping is stable @@ -128,6 +129,7 @@ public DatabricksResultSet( resultSetMetaData = null; } this.complexDatatypeSupport = session.getConnectionContext().isComplexDatatypeSupportEnabled(); + this.boundedSeaApiEnabled = session.getConnectionContext().isBoundedSeaApiEnabled(); this.statementType = statementType; this.updateCount = null; this.parentStatement = parentStatement; @@ -933,8 +935,15 @@ public boolean isBeforeFirst() throws SQLException { public boolean isAfterLast() throws SQLException { checkIfClosed(); // Account for client-side maxRows truncation - return truncatedByMaxRows - || executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); + if (truncatedByMaxRows) { + return true; + } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + // which derives end-of-stream from next_chunk_index via the chunk provider. + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } + return executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); } @Override @@ -974,6 +983,10 @@ public boolean isLast() throws SQLException { || executionResult instanceof StreamingInlineArrowResult) { return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1; } diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index 1569b7efb..8ef787165 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -102,7 +102,9 @@ private static ChunkProvider createRemoteChunkProvider( IDatabricksConnectionContext connectionContext = session.getConnectionContext(); - if (connectionContext.isStreamingChunkProviderEnabled()) { + // Bounded SEA API forces StreamingChunkProvider — it doesn't rely on total_chunk_count + if (connectionContext.isStreamingChunkProviderEnabled() + || connectionContext.isBoundedSeaApiEnabled()) { LOGGER.info( "Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId()); @@ -113,10 +115,13 @@ private static ChunkProvider createRemoteChunkProvider( int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); - // Convert ExternalLinks to ChunkLinkFetchResult for the provider + // Convert ExternalLinks to ChunkLinkFetchResult for the provider. + // Bounded SEA API: pass null for totalChunkCount — we must not depend on + // manifest.{chunks, total_chunk_count, total_row_count} per the bounded API contract. + Long totalChunkCount = + connectionContext.isBoundedSeaApiEnabled() ? null : resultManifest.getTotalChunkCount(); ChunkLinkFetchResult initialLinks = - convertToChunkLinkFetchResult( - resultData.getExternalLinks(), resultManifest.getTotalChunkCount()); + convertToChunkLinkFetchResult(resultData.getExternalLinks(), totalChunkCount); return new StreamingChunkProvider( linkFetcher, diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java index 9be8dccf1..7df1586cc 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java @@ -32,7 +32,7 @@ public SeaChunkLinkFetcher(IDatabricksSession session, StatementId statementId) @Override public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset) throws SQLException { - // SEA uses startChunkIndex; startRowOffset is ignored + // SEA uses startChunkIndex; startRowOffset is passed through for bounded SEA API LOGGER.debug( "Fetching links starting from chunk index {} for statement {}", startChunkIndex, @@ -45,7 +45,7 @@ public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset @Override public ExternalLink refetchLink(long chunkIndex, long rowOffset) throws SQLException { - // SEA uses chunkIndex; rowOffset is ignored + // SEA uses chunkIndex; rowOffset is passed through for bounded SEA API LOGGER.info("Refetching expired link for chunk {} of statement {}", chunkIndex, statementId); ChunkLinkFetchResult result = diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java index efc664b24..9d9c183ee 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java @@ -1,6 +1,8 @@ package com.databricks.jdbc.api.impl.arrow; +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; import com.databricks.jdbc.common.CompressionCodec; +import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; import com.databricks.jdbc.dbclient.IDatabricksHttpClient; import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.log.JdbcLogger; @@ -29,6 +31,10 @@ public class StreamingChunkDownloadTask implements Callable { private final LinkRefresher linkRefresher; private final double cloudFetchSpeedThreshold; + // Capture caller's thread context for telemetry/logging on the download thread + private final IDatabricksConnectionContext connectionContext; + private final String statementId; + public StreamingChunkDownloadTask( ArrowResultChunk chunk, IDatabricksHttpClient httpClient, @@ -40,13 +46,21 @@ public StreamingChunkDownloadTask( this.compressionCodec = compressionCodec; this.linkRefresher = linkRefresher; this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold; + this.connectionContext = DatabricksThreadContextHolder.getConnectionContext(); + this.statementId = DatabricksThreadContextHolder.getStatementId(); } @Override public Void call() throws DatabricksSQLException { int retries = 0; boolean downloadSuccessful = false; + Throwable uncaughtException = null; + + // Propagate caller's thread context for telemetry/logging + DatabricksThreadContextHolder.setConnectionContext(this.connectionContext); + DatabricksThreadContextHolder.setStatementId(this.statementId); + long taskStartTime = System.nanoTime(); try { while (!downloadSuccessful) { try { @@ -62,7 +76,12 @@ public Void call() throws DatabricksSQLException { chunk.downloadData(httpClient, compressionCodec, cloudFetchSpeedThreshold); downloadSuccessful = true; - LOGGER.debug("Successfully downloaded chunk {}", chunk.getChunkIndex()); + long taskTotalMs = (System.nanoTime() - taskStartTime) / 1_000_000; + LOGGER.debug( + "Chunk download complete: chunkIndex={}, totalMs={}, retries={}", + chunk.getChunkIndex(), + taskTotalMs, + retries); } catch (IOException | SQLException e) { retries++; @@ -72,7 +91,7 @@ public Void call() throws DatabricksSQLException { chunk.getChunkIndex(), MAX_RETRIES, e.getMessage()); - // Status will be set to DOWNLOAD_FAILED in the finally block + // Status set to DOWNLOAD_FAILED in the finally block throw new DatabricksSQLException( String.format( "Failed to download chunk %d after %d attempts", @@ -95,18 +114,28 @@ public Void call() throws DatabricksSQLException { } } } + } catch (Throwable t) { + uncaughtException = t; + throw t; } finally { if (downloadSuccessful) { chunk.getChunkReadyFuture().complete(null); } else { + LOGGER.info( + "Download failed for chunk {}: {}", + chunk.getChunkIndex(), + uncaughtException != null ? uncaughtException.getMessage() : "unknown"); chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); chunk .getChunkReadyFuture() .completeExceptionally( new DatabricksSQLException( "Download failed for chunk " + chunk.getChunkIndex(), + uncaughtException, DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR)); } + + DatabricksThreadContextHolder.clearAllContext(); } return null; diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index dbb1bbe78..a4d12e88b 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -47,6 +47,17 @@ */ public class StreamingChunkProvider implements ChunkProvider { + /** Immutable holder for the next fetch position — ensures atomic reads of (index, rowOffset). */ + private static final class FetchPosition { + final long chunkIndex; + final long rowOffset; + + FetchPosition(long chunkIndex, long rowOffset) { + this.chunkIndex = chunkIndex; + this.rowOffset = rowOffset; + } + } + private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(StreamingChunkProvider.class); private static final String DOWNLOAD_THREAD_PREFIX = "databricks-jdbc-streaming-downloader-"; @@ -75,8 +86,11 @@ public class StreamingChunkProvider implements ChunkProvider { // - nextDownloadIndex: written only under downloadLock, but AtomicLong for consistency private final AtomicLong currentChunkIndex = new AtomicLong(-1); private final AtomicLong highestKnownChunkIndex = new AtomicLong(-1); - private volatile long nextLinkFetchIndex = 0; - private volatile long nextRowOffsetToFetch = 0; + // Bundled as an immutable pair for atomic reads/writes across threads. + // The prefetch thread reads this (fetchNextLinkBatch) while the download thread + // may update it (getRefreshedLink reconciliation). A volatile reference to an + // immutable holder ensures both fields are always read consistently. + private volatile FetchPosition nextFetchPosition = new FetchPosition(0, 0); private final AtomicLong nextDownloadIndex = new AtomicLong(0); // State flags @@ -347,11 +361,11 @@ private void linkPrefetchLoop() { long targetIndex = currentChunkIndex.get() + linkPrefetchWindow; // Wait if we're caught up - while (!endOfStreamReached && nextLinkFetchIndex > targetIndex) { + while (!endOfStreamReached && nextFetchPosition.chunkIndex > targetIndex) { if (closed) break; LOGGER.debug( "Prefetch caught up, waiting for consumer. next={}, target={}", - nextLinkFetchIndex, + nextFetchPosition.chunkIndex, targetIndex); consumerAdvanced.await(); targetIndex = currentChunkIndex.get() + linkPrefetchWindow; @@ -396,13 +410,14 @@ private void fetchNextLinkBatch() throws SQLException { return; } + FetchPosition pos = nextFetchPosition; LOGGER.debug( "Fetching links starting from index {}, row offset {} for statement {}", - nextLinkFetchIndex, - nextRowOffsetToFetch, + pos.chunkIndex, + pos.rowOffset, statementId); - ChunkLinkFetchResult result = linkFetcher.fetchLinks(nextLinkFetchIndex, nextRowOffsetToFetch); + ChunkLinkFetchResult result = linkFetcher.fetchLinks(pos.chunkIndex, pos.rowOffset); if (result.isEndOfStream()) { LOGGER.info("End of stream reached for statement {}", statementId); @@ -415,10 +430,9 @@ private void fetchNextLinkBatch() throws SQLException { createChunkFromLink(link); } - // Update next fetch positions + // Update next fetch position atomically if (result.hasMore()) { - nextLinkFetchIndex = result.getNextFetchIndex(); - nextRowOffsetToFetch = result.getNextRowOffset(); + nextFetchPosition = new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); } else { endOfStreamReached = true; LOGGER.info("End of stream reached for statement {} (hasMore=false)", statementId); @@ -450,14 +464,15 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) createChunkFromLink(link); } - // Set next fetch positions using unified API + // Set next fetch position atomically if (initialLinks.hasMore()) { - nextLinkFetchIndex = initialLinks.getNextFetchIndex(); - nextRowOffsetToFetch = initialLinks.getNextRowOffset(); + FetchPosition pos = + new FetchPosition(initialLinks.getNextFetchIndex(), initialLinks.getNextRowOffset()); + nextFetchPosition = pos; LOGGER.debug( "Next fetch position set to chunk index {}, row offset {} from initial links", - nextLinkFetchIndex, - nextRowOffsetToFetch); + pos.chunkIndex, + pos.rowOffset); } else { endOfStreamReached = true; LOGGER.info("End of stream reached from initial links for statement {}", statementId); @@ -471,11 +486,6 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) */ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException { long chunkIndex = link.getChunkIndex(); - if (chunks.containsKey(chunkIndex)) { - LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); - return; - } - long rowCount = link.getRowCount(); long rowOffset = link.getRowOffset(); @@ -488,7 +498,16 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingExce .build(); chunk.setChunkLink(link); - chunks.put(chunkIndex, chunk); + + // Atomic insert — if another thread already created this chunk, skip. + // This is safe because createChunkFromLink can be called concurrently from + // the prefetch thread (fetchNextLinkBatch) and download threads (getRefreshedLink). + ArrowResultChunk existing = chunks.putIfAbsent(chunkIndex, chunk); + if (existing != null) { + LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); + return; + } + highestKnownChunkIndex.updateAndGet(current -> Math.max(current, chunkIndex)); totalRowCount.addAndGet(rowCount); @@ -596,12 +615,13 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti // Single batch FetchResults RPC from the lowest expired offset ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset); - // Update ALL pre-download chunks that received fresh links. - // Always overwrite even if the current link hasn't expired yet, since the - // server-provided link has a later expiry and prevents near-expiry races. + // Reconcile ALL links from the refresh response with local chunk state. for (ExternalLink link : result.getChunkLinks()) { ArrowResultChunk c = chunks.get(link.getChunkIndex()); if (c != null) { + // Existing chunk: update link only for pre-download states. + // DOWNLOADING stays as-is (download task owns the state machine). + // DOWNLOADED/RELEASED/etc. stay as-is (bytes already in memory). ChunkStatus status = c.getStatus(); if (status == ChunkStatus.PENDING || status == ChunkStatus.URL_FETCHED @@ -609,9 +629,34 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti || status == ChunkStatus.DOWNLOAD_RETRY) { c.setChunkLink(link); } + } else { + // Server returned a chunk not yet in our map — create it. + // Handles cases where refresh response includes chunks beyond + // our current highestKnownChunkIndex. + try { + createChunkFromLink(link); + } catch (Exception e) { + LOGGER.warn( + "Failed to create chunk {} from refresh response: {}", + link.getChunkIndex(), + e.getMessage()); + } } } + // Update end-of-stream and prefetch position from refresh response + if (!result.hasMore()) { + endOfStreamReached = true; + } else if (result.getNextFetchIndex() > nextFetchPosition.chunkIndex) { + // Avoid re-fetching chunks that the refresh already discovered. + // Atomic update of both fields via immutable holder. + nextFetchPosition = + new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); + } + + // Trigger downloads for any newly-created chunks + triggerDownloads(); + // Check if our target chunk was refreshed by the batch targetChunk = chunks.get(chunkIndex); if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) { diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java index 8ca5ae0cd..5754ec0da 100644 --- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java @@ -461,6 +461,9 @@ default int getHeartbeatIntervalSeconds() { */ boolean isCloudFetchEnabled(); + /** Returns whether bounded SEA API mode is enabled for CloudFetch. */ + boolean isBoundedSeaApiEnabled(); + /** * Returns the maximum number of batches to keep in memory for Thrift streaming. * diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index 639288248..b9a70f1a8 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -204,6 +204,10 @@ public enum DatabricksJdbcUrlParams { "EnableSeaSyncMetadata", "Enable x-databricks-sea-can-run-fully-sync header for synchronous metadata requests in SEA mode", "1"), + USE_BOUNDED_SEA_API( + "UseBoundedSeaApi", + "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count. Requires server support.", + "0"), DISABLE_OAUTH_REFRESH_TOKEN( "DisableOauthRefreshToken", "Disable requesting OAuth refresh tokens (omit offline_access unless explicitly provided)", diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index 4b7398244..b714f2a38 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -529,6 +529,10 @@ public ChunkLinkFetchResult getResultChunks( GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); + // Bounded SEA API: always send row_offset (even 0 for chunk 0) + if (connectionContext.isBoundedSeaApiEnabled()) { + path = path + "?row_offset=" + chunkStartRowOffset; + } try { Request req = new Request(Request.GET, path, apiClient.serialize(request)); req.withHeaders(getHeaders("getStatementResultN"));