diff --git a/CHANGELOG.md b/CHANGELOG.md index 085cf89b9..1185b041d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,19 @@ +## [Unreleased] + +### New Features +- [client-v2] Async HTTP support using Apache HttpClient 5 NIO API (https://github.com/ClickHouse/clickhouse-java/issues/1831) + - True non-blocking I/O for queries and inserts + - Streaming responses with constant memory usage (~512KB regardless of result size) + - Streaming request compression (HTTP framed LZ4 and ClickHouse native LZ4) + - Eliminates blocking threads waiting for I/O under high concurrency, significantly reducing thread usage compared to the synchronous client + - Opt-in via `useAsyncHttp(true)` builder option + - Full backward compatibility (async disabled by default) + - Added test coverage (integration tests) + +### Known Limitations +- [client-v2] Async HTTP: SOCKS proxy not supported (Apache HttpClient async limitation) +- [client-v2] Async HTTP: Multipart requests use sync fallback + ## 0.9.6 Release is aimed to address potential security risk in one of the dependencies (see below). We strongly recommend to upgrade. diff --git a/README.md b/README.md index 21c2088ef..cf3fe6d05 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ Historically, there are two versions of both components. 
The previous version of | Name | Client V2 | Client V1 | Comments |----------------------------------------------|:---------:|:---------:|:---------:| | Http Connection |✔ |✔ | | +| Async HTTP (NIO) |✔ |✗ | Non-blocking I/O for high concurrency | | Http Compression (LZ4) |✔ |✔ | | | Server Response Compression - LZ4 |✔ |✔ | | | Client Request Compression - LZ4 |✔ |✔ | | @@ -89,8 +90,36 @@ Nightly Builds: https://s01.oss.sonatype.org/content/repositories/snapshots/com/ [Begin-with Usage Examples](../../tree/main/examples/client-v2) -[Spring Demo Service](https://github.com/ClickHouse/clickhouse-java/tree/main/examples/demo-service) +[Spring Demo Service](https://github.com/ClickHouse/clickhouse-java/tree/main/examples/demo-service) +### Async HTTP Support + +Client V2 supports true async HTTP using Apache HttpClient 5 NIO API for high-concurrency workloads. + +**Features:** +- Non-blocking I/O - no thread-per-request blocking +- Streaming responses with constant memory usage +- Streaming request compression (HTTP and native LZ4) +- Substantial reduction in thread usage under high concurrency + +**Usage:** +```java +Client client = new Client.Builder() + .addEndpoint("http://localhost:8123") + .setUsername("default") + .useAsyncHttp(true) // Enable async HTTP + .build(); + +// Queries and inserts work the same way +CompletableFuture future = client.query("SELECT * FROM table"); +``` + +**When to use:** +- High concurrency (100+ concurrent requests) +- Large result sets or inserts (GB+ data) +- Memory-constrained environments + +Async HTTP is opt-in and disabled by default. Existing code works without changes. 
## JDBC Driver diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 9d983decb..ac608892f 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -18,6 +18,7 @@ import com.clickhouse.client.api.internal.ClientStatisticsHolder; import com.clickhouse.client.api.internal.HttpAPIClientHelper; import com.clickhouse.client.api.internal.MapUtils; +import com.clickhouse.client.api.internal.StreamingAsyncEntityProducer; import com.clickhouse.client.api.internal.TableSchemaParser; import com.clickhouse.client.api.internal.ValidationUtils; import com.clickhouse.client.api.metadata.ColumnToMethodMatchingStrategy; @@ -113,8 +114,64 @@ public class Client implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(Client.class); + /** + * Shared scheduler for async operation timeouts (Java 8 compatible alternative to orTimeout). + * Uses daemon threads as a fallback so it won't prevent JVM shutdown if not properly released. + * + *
+     * <p>Lifecycle management: The scheduler is lazily created when the first async client is created
+     * and shut down when the last async client is closed. This ensures graceful resource cleanup.</p>
+ */ + private static final Object TIMEOUT_SCHEDULER_LOCK = new Object(); + private static final java.util.concurrent.atomic.AtomicInteger TIMEOUT_SCHEDULER_REF_COUNT = + new java.util.concurrent.atomic.AtomicInteger(0); + private static volatile java.util.concurrent.ScheduledExecutorService timeoutScheduler = null; + + /** + * Thread-safety: Synchronizes on TIMEOUT_SCHEDULER_LOCK, ensuring mutual exclusion + * with releaseTimeoutScheduler(). No race condition exists because threads cannot + * concurrently execute acquire and release. + */ + private static void acquireTimeoutScheduler() { + synchronized (TIMEOUT_SCHEDULER_LOCK) { + if (TIMEOUT_SCHEDULER_REF_COUNT.getAndIncrement() == 0) { + timeoutScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "ch-async-timeout"); + t.setDaemon(true); + return t; + }); + LOG.debug("Created async timeout scheduler"); + } + } + } + + /** + * Thread-safety: Synchronizes on TIMEOUT_SCHEDULER_LOCK, ensuring mutual exclusion + * with acquireTimeoutScheduler(). The synchronized block guarantees that between + * checking the ref count and shutting down, no other thread can acquire a new reference. 
+ */ + private static void releaseTimeoutScheduler() { + synchronized (TIMEOUT_SCHEDULER_LOCK) { + if (TIMEOUT_SCHEDULER_REF_COUNT.decrementAndGet() == 0 && timeoutScheduler != null) { + LOG.debug("Shutting down async timeout scheduler"); + timeoutScheduler.shutdown(); + try { + if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + timeoutScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + timeoutScheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + timeoutScheduler = null; + } + } + } + private HttpAPIClientHelper httpClientHelper = null; + /** Tracks whether this client instance uses async HTTP (for proper executor cleanup) */ + private final boolean usesAsyncHttp; + private final List endpoints; private final Map configuration; @@ -195,6 +252,29 @@ private Client(Collection endpoints, Map configuration, this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown"); this.dbUser = configuration.getOrDefault(ClientConfigProperties.USER.getKey(), ClientConfigProperties.USER.getDefObjVal()); this.typeHintMapping = (Map>) this.configuration.get(ClientConfigProperties.TYPE_HINT_MAPPING.getKey()); + + // Acquire shared async resources if async HTTP is enabled + this.usesAsyncHttp = httpClientHelper.isAsyncEnabled(); + if (this.usesAsyncHttp) { + boolean timeoutSchedulerAcquired = false; + try { + acquireTimeoutScheduler(); + timeoutSchedulerAcquired = true; + StreamingAsyncEntityProducer.acquireExecutor(); + } catch (Exception e) { + // Release any acquired resources on failure to prevent leaks + if (timeoutSchedulerAcquired) { + releaseTimeoutScheduler(); + } + // Ensure HTTP client helper is also closed on initialization failure + try { + httpClientHelper.close(); + } catch (Exception closeEx) { + // Ignore to avoid masking the original initialization failure + } + throw e; + } + } } /** @@ -232,6 +312,10 @@ public String getDefaultDatabase() { * Frees the resources associated with 
the client. *
     * <ul>
     * <li>Shuts down the shared operation executor by calling {@code shutdownNow()}</li>
     * <li>Closes the HTTP client helper</li>
+    * <li>Releases shared async resources (compression executor, timeout scheduler) if this
+    * client was using async HTTP. When the last async client is closed, these shared
+    * resources are gracefully shut down.</li>
     * </ul>
*/ @Override @@ -251,6 +335,12 @@ public void close() { if (httpClientHelper != null) { httpClientHelper.close(); } + + // Release shared async resources if this client was using them + if (usesAsyncHttp) { + StreamingAsyncEntityProducer.releaseExecutor(); + releaseTimeoutScheduler(); + } } @@ -824,6 +914,32 @@ public Builder useAsyncRequests(boolean async) { return this; } + /** + * Enables true async HTTP transport using Apache HttpClient 5 async API. + * When enabled, HTTP requests use NIO-based non-blocking I/O instead of + * blocking thread-per-request model. This provides better scalability + * under high concurrency without requiring thread pools. + * + *
+     * <p>Features:</p>
+     * <ul>
+     * <li>Response bodies are streamed through a pipe, avoiding memory buffering</li>
+     * <li>LZ4 request compression is supported ({@code compressClientRequest})</li>
+     * <li>Suitable for large result sets and high-concurrency workloads</li>
+     * </ul>
+     *
+     * <p>Current Limitations:</p>
+     * <ul>
+     * <li>MULTIPART: Multipart requests use sync fallback path.</li>
+     * </ul>
+ * + * @param useAsyncHttp - true to enable async HTTP transport + * @return this builder + */ + public Builder useAsyncHttp(boolean useAsyncHttp) { + this.configuration.put(ClientConfigProperties.USE_ASYNC_HTTP.getKey(), String.valueOf(useAsyncHttp)); + return this; + } + /** * Sets an executor for running operations. If async operations are enabled and no executor is specified * client will create a default executor. @@ -1424,6 +1540,12 @@ public CompletableFuture insert(String tableName, throw new IllegalArgumentException("Buffer size must be greater than 0"); } + // Use async path for InputStream-based inserts when async is enabled + if (httpClientHelper.isAsyncEnabled()) { + CompletableFuture future = executeInsertAsync(tableName, columnNames, data, format, settings); + return applyAsyncTimeout(future, settings.getNetworkTimeout()); + } + return insert(tableName, columnNames, new DataStreamWriter() { @Override public void onOutput(OutputStream out) throws IOException { @@ -1443,6 +1565,85 @@ public void onRetry() throws IOException { format, settings); } + /** + * Executes an async insert operation using the async HTTP client. + * + *
+     * <p><b>IMPORTANT:</b> Unlike the synchronous insert path, this async implementation
+     * does NOT support automatic retry on 503 Service Unavailable responses. The synchronous
+     * path retries on 503 and retryable failures (invoking onRetry()/data.reset()), but
+     * async inserts with InputStreams cannot reliably support retry because streams are
+     * not always resettable.</p>
+     *
+     * <p>If you require retry semantics for insert operations, use the synchronous client
+     * (set useAsyncHttp(false)) or implement retry logic in your application code with
+     * a resettable data source.</p>
+ */ + private CompletableFuture executeInsertAsync(String tableName, + List columnNames, + InputStream data, + ClickHouseFormat format, + InsertSettings settings) { + final InsertSettings requestSettings = new InsertSettings(buildRequestSettings(settings.getAllSettings())); + requestSettings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format); + + String operationId = requestSettings.getOperationId(); + ClientStatisticsHolder clientStats = operationId != null ? globalClientStats.remove(operationId) : null; + if (clientStats == null) { + clientStats = new ClientStatisticsHolder(); + } + clientStats.start(ClientMetrics.OP_DURATION); + final ClientStatisticsHolder finalClientStats = clientStats; + + // Build INSERT statement + StringBuilder sqlStmt = new StringBuilder("INSERT INTO ").append(tableName); + if (columnNames != null && !columnNames.isEmpty()) { + sqlStmt.append(" ("); + for (String columnName : columnNames) { + sqlStmt.append(columnName).append(", "); + } + sqlStmt.setLength(sqlStmt.length() - 2); + sqlStmt.append(")"); + } + sqlStmt.append(" FORMAT ").append(format.name()); + requestSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt.toString()); + + if (requestSettings.getQueryId() == null && queryIdGenerator != null) { + requestSettings.setQueryId(queryIdGenerator.get()); + } + + final Endpoint selectedEndpoint = getNextAliveNode(); + + return httpClientHelper.executeInsertAsyncStreaming(selectedEndpoint, requestSettings.getAllSettings(), data) + .thenApply(response -> { + try { + // Check for 503 Service Unavailable - async inserts don't support retry + if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + throw new java.util.concurrent.CompletionException( + new ServerException(ServerException.CODE_UNKNOWN, + "Service Unavailable - async inserts do not support automatic retry", + HttpStatus.SC_SERVICE_UNAVAILABLE, requestSettings.getQueryId())); + } + + OperationMetrics metrics = new 
OperationMetrics(finalClientStats); + String summary = HttpAPIClientHelper.getHeaderVal( + response.getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}"); + ProcessParser.parseSummary(summary, metrics); + String queryId = HttpAPIClientHelper.getHeaderVal( + response.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), requestSettings.getQueryId()); + metrics.setQueryId(queryId); + metrics.operationComplete(); + + return new InsertResponse(metrics); + } finally { + try { + response.close(); + } catch (IOException e) { + LOG.debug("Error closing insert response", e); + } + } + }); + } + /** * Does an insert request to a server. Data is pushed when a {@link DataStreamWriter#onOutput(OutputStream)} is called. * @@ -1508,7 +1709,7 @@ public CompletableFuture insert(String tableName, for (String columnName : columnNames) { sqlStmt.append(columnName).append(", "); } - sqlStmt.deleteCharAt(sqlStmt.length() - 2); + sqlStmt.setLength(sqlStmt.length() - 2); sqlStmt.append(")"); } sqlStmt.append(" FORMAT ").append(format.name()); @@ -1645,6 +1846,14 @@ public CompletableFuture query(String sqlQuery, Map future = executeQueryAsync(sqlQuery, requestSettings, clientStats, 0); + return applyAsyncTimeout(future, requestSettings.getNetworkTimeout()); + } + Supplier responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node @@ -1653,7 +1862,6 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, Map executeQueryAsync(String sqlQuery, + QuerySettings requestSettings, + ClientStatisticsHolder clientStats, + int attempt) { + final long startTime = System.nanoTime(); + final Endpoint selectedEndpoint = getNextAliveNode(); + + return httpClientHelper.executeRequestAsyncStreaming(selectedEndpoint, requestSettings.getAllSettings(), sqlQuery) + .handle((response, ex) -> { + if (ex != null) { + Throwable cause = ex instanceof java.util.concurrent.CompletionException ? 
ex.getCause() : ex; + String msg = requestExMsg("Query", (attempt + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); + RuntimeException wrappedException = (cause instanceof Exception) + ? httpClientHelper.wrapException(msg, (Exception) cause, requestSettings.getQueryId()) + : new RuntimeException(msg, cause); + + if (httpClientHelper.shouldRetry(cause, requestSettings.getAllSettings()) && attempt < retries) { + LOG.warn("Async query failed, retrying (attempt {}): {}", attempt + 1, cause.getMessage()); + return new AsyncRetryMarker(attempt + 1); + } + throw new java.util.concurrent.CompletionException(wrappedException); + } + + if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + if (attempt < retries) { + LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", + response.getCode(), durationSince(startTime)); + // Close the streaming response before retrying to avoid resource leaks + try { + response.close(); + } catch (Exception closeEx) { + LOG.debug("Failed to close streaming response before retry", closeEx); + } + return new AsyncRetryMarker(attempt + 1); + } else { + String msg = requestExMsg("Query", (attempt + 1), + durationSince(startTime).toMillis(), requestSettings.getQueryId()); + IOException cause = new IOException("Failed to get response. 
Server returned HTTP 503 (Service Unavailable)."); + RuntimeException wrappedException = httpClientHelper.wrapException( + msg, cause, requestSettings.getQueryId()); + // Close the streaming response before completing exceptionally + try { + response.close(); + } catch (Exception closeEx) { + LOG.debug("Failed to close streaming response after retries exhausted", closeEx); + } + throw new java.util.concurrent.CompletionException(wrappedException); + } + } + + OperationMetrics metrics = new OperationMetrics(clientStats); + String summary = HttpAPIClientHelper.getHeaderVal( + response.getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}"); + ProcessParser.parseSummary(summary, metrics); + String queryId = HttpAPIClientHelper.getHeaderVal( + response.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), requestSettings.getQueryId()); + metrics.setQueryId(queryId); + metrics.operationComplete(); + + Header formatHeader = response.getFirstHeader(ClickHouseHttpProto.HEADER_FORMAT); + ClickHouseFormat responseFormat = requestSettings.getFormat(); + if (formatHeader != null) { + responseFormat = ClickHouseFormat.valueOf(formatHeader.getValue()); + } + + return new QueryResponse(response, responseFormat, requestSettings, metrics); + }) + .thenCompose(result -> { + if (result instanceof AsyncRetryMarker) { + return executeQueryAsync(sqlQuery, requestSettings, clientStats, ((AsyncRetryMarker) result).nextAttempt); + } + return CompletableFuture.completedFuture((QueryResponse) result); + }); + } + + /** Marker to signal async retry without blocking .join() calls */ + private static class AsyncRetryMarker { + final int nextAttempt; + AsyncRetryMarker(int nextAttempt) { + this.nextAttempt = nextAttempt; + } + } + + /** + * Applies a timeout to an async operation if configured. + * If timeout is 0 or negative, returns the original future unchanged. + * Java 8 compatible implementation (orTimeout requires Java 9+). 
+ * + * @param future the future to wrap with timeout + * @param timeoutMs timeout in milliseconds (0 or negative means no timeout) + * @return future with timeout applied, or original future if no timeout + */ + private CompletableFuture applyAsyncTimeout(CompletableFuture future, long timeoutMs) { + if (timeoutMs <= 0) { + return future; + } + // Java 8 compatible timeout implementation using shared scheduler + java.util.concurrent.ScheduledExecutorService scheduler = timeoutScheduler; + if (scheduler == null || scheduler.isShutdown()) { + LOG.warn("Timeout scheduler not available - timeout will not be applied"); + return future; + } + + // Wrapper future that enforces timeout and propagates cancellation + CompletableFuture resultFuture = new CompletableFuture<>(); + + java.util.concurrent.ScheduledFuture scheduled = scheduler.schedule(() -> { + if (resultFuture.isDone()) { + return; + } + // Complete the wrapper with a timeout and cancel the underlying operation + TimeoutException timeoutException = + new TimeoutException("Async operation timed out after " + timeoutMs + "ms"); + resultFuture.completeExceptionally(timeoutException); + future.cancel(true); + }, timeoutMs, TimeUnit.MILLISECONDS); + + // When the underlying future completes first, propagate its result and cancel the timeout task + future.whenComplete((value, ex) -> { + if (resultFuture.isDone()) { + return; + } + scheduled.cancel(false); + if (ex != null) { + resultFuture.completeExceptionally(ex); + } else { + resultFuture.complete(value); + } + }); + + // If callers cancel the wrapper, propagate cancellation to the underlying future + resultFuture.whenComplete((v, ex) -> { + if (resultFuture.isCancelled() && !future.isDone()) { + future.cancel(true); + } + }); + + return resultFuture; + } + public CompletableFuture query(String sqlQuery, Map queryParams) { return query(sqlQuery, queryParams, null); } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java 
b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java index 5a23f91ba..7e38db611 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java @@ -57,6 +57,14 @@ public enum ClientConfigProperties { ASYNC_OPERATIONS("async", Boolean.class, "false"), + /** + * Enables true async HTTP transport using Apache HttpClient 5 async API. + * When enabled, HTTP requests use NIO-based non-blocking I/O instead of + * blocking thread-per-request model. This provides better scalability + * under high concurrency. + */ + USE_ASYNC_HTTP("use_async_http", Boolean.class, "false"), + CONNECTION_TTL("connection_ttl", Long.class, "-1"), CONNECTION_TIMEOUT("connection_timeout", Long.class), diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 31cdeff3f..6cab1524e 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -18,23 +18,32 @@ import net.jpountz.lz4.LZ4Factory; import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.hc.client5.http.ConnectTimeoutException; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; import org.apache.hc.client5.http.entity.mime.MultipartPartBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; 
+import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.client5.http.io.ManagedHttpClientConnection; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.socket.ConnectionSocketFactory; import org.apache.hc.client5.http.socket.LayeredConnectionSocketFactory; import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ConnectionRequestTimeoutException; @@ -45,22 +54,32 @@ import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.message.BasicHttpRequest; import org.apache.hc.core5.http.NoHttpResponseException; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.config.Http1Config; import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory; import org.apache.hc.core5.http.io.SocketConfig; +import 
org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.io.entity.ByteArrayEntity; import org.apache.hc.core5.http.io.entity.EntityTemplate; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.ssl.TLS; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.IOCallback; import org.apache.hc.core5.net.URIBuilder; import org.apache.hc.core5.pool.ConnPoolControl; import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,8 +115,11 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Function; @@ -116,7 +138,11 @@ public class HttpAPIClientHelper { private static final Pattern PATTERN_HEADER_VALUE_ASCII = Pattern.compile( "\\p{Graph}+(?:[ ]\\p{Graph}+)*"); + private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8"); + private final CloseableHttpClient httpClient; + private final CloseableHttpAsyncClient httpAsyncClient; + private final AtomicBoolean asyncClientClosed = new AtomicBoolean(false); private String proxyAuthHeaderValue; @@ -147,6 
+173,26 @@ public HttpAPIClientHelper(Map configuration, Object metricsRegi } this.defaultUserAgent = buildDefaultUserAgent(); + + // Initialize async client + boolean useAsyncHttp = ClientConfigProperties.USE_ASYNC_HTTP.getOrDefault(configuration); + if (useAsyncHttp) { + try { + CloseableHttpAsyncClient asyncClient = createHttpAsyncClient(initSslContext, configuration); + asyncClient.start(); + this.httpAsyncClient = asyncClient; + LOG.info("Async HTTP client initialized and started"); + } catch (RuntimeException | Error e) { + try { + this.httpClient.close(); + } catch (IOException closeEx) { + e.addSuppressed(closeEx); + } + throw e; + } + } else { + this.httpAsyncClient = null; + } } /** @@ -339,6 +385,84 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map configuration) { + HttpAsyncClientBuilder asyncBuilder = HttpAsyncClients.custom(); + + SSLContext sslContext = initSslContext ? createSSLContext(configuration) : null; + + IOReactorConfig.Builder ioReactorBuilder = IOReactorConfig.custom(); + ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.applyIfSet(configuration, + (t) -> ioReactorBuilder.setSoTimeout(Timeout.ofMilliseconds(t))); + ClientConfigProperties.SOCKET_RCVBUF_OPT.applyIfSet(configuration, + ioReactorBuilder::setRcvBufSize); + ClientConfigProperties.SOCKET_SNDBUF_OPT.applyIfSet(configuration, + ioReactorBuilder::setSndBufSize); + ClientConfigProperties.SOCKET_LINGER_OPT.applyIfSet(configuration, + (v) -> ioReactorBuilder.setSoLinger(TimeValue.ofSeconds(v))); + ClientConfigProperties.SOCKET_TCP_NO_DELAY_OPT.applyIfSet(configuration, + ioReactorBuilder::setTcpNoDelay); + asyncBuilder.setIOReactorConfig(ioReactorBuilder.build()); + + PoolingAsyncClientConnectionManagerBuilder connMgrBuilder = PoolingAsyncClientConnectionManagerBuilder.create(); + connMgrBuilder.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX); + + ConnectionReuseStrategy connectionReuseStrategy = 
ClientConfigProperties.CONNECTION_REUSE_STRATEGY.getOrDefault(configuration); + switch (connectionReuseStrategy) { + case LIFO: + connMgrBuilder.setConnPoolPolicy(PoolReusePolicy.LIFO); + break; + case FIFO: + connMgrBuilder.setConnPoolPolicy(PoolReusePolicy.FIFO); + break; + default: + throw new ClientMisconfigurationException("Unknown connection reuse strategy: " + connectionReuseStrategy); + } + + connMgrBuilder.setMaxConnTotal(Integer.MAX_VALUE); + ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.applyIfSet(configuration, connMgrBuilder::setMaxConnPerRoute); + connMgrBuilder.setDefaultConnectionConfig(createConnectionConfig(configuration)); + + if (sslContext != null) { + TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(sslContext) + .setTlsVersions(TLS.V_1_2, TLS.V_1_3) + .build(); + connMgrBuilder.setTlsStrategy(tlsStrategy); + } + + PoolingAsyncClientConnectionManager asyncConnMgr = connMgrBuilder.build(); + asyncBuilder.setConnectionManager(asyncConnMgr); + + // Register metrics for async connection pool + if (metricsRegistry != null) { + try { + String mGroupName = ClientConfigProperties.METRICS_GROUP_NAME.getOrDefault(configuration); + Class micrometerLoader = getClass().getClassLoader().loadClass("com.clickhouse.client.api.metrics.MicrometerLoader"); + Method applyMethod = micrometerLoader.getDeclaredMethod("applyAsyncPoolingMetricsBinder", + Object.class, String.class, PoolingAsyncClientConnectionManager.class); + applyMethod.invoke(micrometerLoader, metricsRegistry, mGroupName, asyncConnMgr); + } catch (Exception e) { + LOG.error("Failed to register async connection pool metrics", e); + } + } + + String proxyHost = (String) configuration.get(ClientConfigProperties.PROXY_HOST.getKey()); + Integer proxyPort = (Integer) configuration.get(ClientConfigProperties.PROXY_PORT.getKey()); + String proxyTypeVal = (String) configuration.get(ClientConfigProperties.PROXY_TYPE.getKey()); + ProxyType proxyType = proxyTypeVal == null ? 
null : ProxyType.valueOf(proxyTypeVal); + + if (proxyType == ProxyType.HTTP && proxyHost != null && proxyPort != null) { + asyncBuilder.setProxy(new HttpHost(proxyHost, proxyPort)); + } + + boolean disableCookies = !((Boolean) ClientConfigProperties.HTTP_SAVE_COOKIES.getOrDefault(configuration)); + if (disableCookies) { + asyncBuilder.disableCookieManagement(); + } + + return asyncBuilder.build(); + } + // private static final String ERROR_CODE_PREFIX_PATTERN = "Code: %d. DB::Exception:"; private static final String ERROR_CODE_PREFIX_PATTERN = "%d. DB::Exception:"; @@ -480,6 +604,478 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r return doPostRequest(requestConfig, req); } + /** + * Executes an HTTP request asynchronously. Buffers entire response body in memory. + * For large result sets, use the streaming sync API instead. + */ + public CompletableFuture executeRequestAsync(Endpoint server, + Map requestConfig, + String body) { + if (httpAsyncClient == null) { + throw new ClientException("Async HTTP client is not enabled. Set USE_ASYNC_HTTP to true."); + } + + final URI uri = createRequestURI(server, requestConfig, true); + final SimpleHttpRequest request = createSimpleHttpRequest(uri, requestConfig, body); + + CompletableFuture future = new CompletableFuture<>(); + + Future httpFuture = httpAsyncClient.execute(request, new FutureCallback() { + @Override + public void completed(SimpleHttpResponse response) { + try { + if (response.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) { + future.completeExceptionally(new ClientMisconfigurationException( + "Proxy authentication required. Please check your proxy settings.")); + return; + } else if (response.getCode() == HttpStatus.SC_BAD_GATEWAY) { + future.completeExceptionally(new ClientException( + "Server returned '502 Bad gateway'. 
Check network and proxy settings.")); + return; + } else if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + // Return 503 normally - let caller handle retry logic + future.complete(response); + return; + } else if (response.getCode() >= HttpStatus.SC_BAD_REQUEST || + response.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { + future.completeExceptionally(readErrorFromAsyncResponse(response)); + return; + } + future.complete(response); + } catch (Exception e) { + future.completeExceptionally(e); + } + } + + @Override + public void failed(Exception ex) { + LOG.debug("Async request failed to '{}': {}", uri, ex.getMessage(), ex); + future.completeExceptionally(ex); + } + + @Override + public void cancelled() { + future.cancel(true); + } + }); + + // Propagate cancellation to the underlying HTTP request + future.whenComplete((result, ex) -> { + if (future.isCancelled()) { + httpFuture.cancel(true); + } + }); + + return future; + } + + /** + * Executes an HTTP request asynchronously with streaming response. + * Response body is streamed through a PipedInputStream, avoiding memory buffering. + * Suitable for large result sets. + * + *

IMPORTANT: The returned future completes as soon as HTTP headers are received, + * NOT when all data has been transferred. This allows the caller to start reading + * from the stream immediately, preventing deadlock.

+ */ + public CompletableFuture executeRequestAsyncStreaming( + Endpoint server, + Map requestConfig, + String body) { + if (httpAsyncClient == null) { + throw new ClientException("Async HTTP client is not enabled. Set USE_ASYNC_HTTP to true."); + } + + final URI uri = createRequestURI(server, requestConfig, true); + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + + // Apply compression if configured (acceptable for queries which are small payloads) + boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); + + if (clientCompression && !appCompressedData) { + int bufferSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig); + bodyBytes = compressLZ4(bodyBytes, useHttpCompression, bufferSize); + LOG.debug("Async request compressed: {} -> {} bytes", body.length(), bodyBytes.length); + } + + BasicHttpRequest request = new BasicHttpRequest("POST", uri); + addHeadersToRequest(request, requestConfig); + + AsyncEntityProducer entityProducer = new BasicAsyncEntityProducer(bodyBytes, CONTENT_TYPE); + AsyncRequestProducer requestProducer = new BasicRequestProducer(request, entityProducer); + + StreamingAsyncResponseConsumer responseConsumer = new StreamingAsyncResponseConsumer(); + + CompletableFuture future = new CompletableFuture<>(); + + // Complete future when headers arrive (via headersFuture), NOT when stream ends. + // This prevents deadlock: user can start reading while NIO thread writes. 
+ responseConsumer.getHeadersFuture().whenComplete((response, headerEx) -> { + if (headerEx != null) { + future.completeExceptionally(headerEx); + return; + } + + try { + if (response.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) { + closeStreamingResponse(response); + future.completeExceptionally(new ClientMisconfigurationException( + "Proxy authentication required. Please check your proxy settings.")); + } else if (response.getCode() == HttpStatus.SC_BAD_GATEWAY) { + closeStreamingResponse(response); + future.completeExceptionally(new ClientException( + "Server returned '502 Bad gateway'. Check network and proxy settings.")); + } else if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + // Return 503 normally - let caller handle retry logic + future.complete(response); + } else if (response.getCode() >= HttpStatus.SC_BAD_REQUEST || + response.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { + // readErrorFromStreamingResponse closes the response + future.completeExceptionally(readErrorFromStreamingResponse(response)); + } else { + future.complete(response); + } + } catch (Exception e) { + closeStreamingResponse(response); + future.completeExceptionally(e); + } + }); + + Future httpFuture = httpAsyncClient.execute( + requestProducer, responseConsumer, new FutureCallback() { + @Override + public void completed(StreamingAsyncResponseConsumer.StreamingResponse response) { + // Stream has ended. Future should already be completed via headersFuture. 
+ LOG.debug("Async streaming request completed for '{}'", uri); + } + + @Override + public void failed(Exception ex) { + LOG.debug("Async streaming request failed to '{}': {}", uri, ex.getMessage(), ex); + // Complete future exceptionally if not already done + future.completeExceptionally(ex); + } + + @Override + public void cancelled() { + future.cancel(true); + } + }); + + // Propagate cancellation to the underlying HTTP request + future.whenComplete((result, ex) -> { + if (future.isCancelled()) { + httpFuture.cancel(true); + } + }); + + return future; + } + + private Exception readErrorFromStreamingResponse(StreamingAsyncResponseConsumer.StreamingResponse response) { + try { + InputStream is = response.getInputStream(); + byte[] errorBytes = new byte[ERROR_BODY_BUFFER_SIZE]; + int bytesRead = is.read(errorBytes, 0, ERROR_BODY_BUFFER_SIZE); + String errorBody = bytesRead > 0 ? new String(errorBytes, 0, bytesRead, StandardCharsets.UTF_8) : ""; + + int errorCode = getHeaderVal(response.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), + 0, Integer::parseInt); + + Header queryIdHeader = response.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); + String queryId = queryIdHeader != null ? queryIdHeader.getValue() : null; + + return new ServerException(errorCode, errorBody, response.getCode(), queryId); + } catch (Exception e) { + return new ClientException("Failed to read error response", e); + } finally { + try { + response.close(); + } catch (IOException e) { + LOG.debug("Failed to close streaming response after reading error", e); + } + } + } + + private void closeStreamingResponse(StreamingAsyncResponseConsumer.StreamingResponse response) { + try { + response.close(); + } catch (IOException e) { + LOG.debug("Failed to close streaming response", e); + } + } + + /** + * Compresses data using LZ4 compression. 
+ * + * @param data the uncompressed data + * @param useHttpCompression if true, uses framed LZ4 (HTTP Content-Encoding compatible); + * if false, uses ClickHouse native LZ4 format + * @param bufferSize buffer size for compression + * @return compressed data + */ + private byte[] compressLZ4(byte[] data, boolean useHttpCompression, int bufferSize) { + try { + java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); + OutputStream compressingStream; + + if (useHttpCompression) { + compressingStream = new org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream(baos); + } else { + compressingStream = new ClickHouseLZ4OutputStream(baos, lz4Factory.fastCompressor(), bufferSize); + } + + try { + compressingStream.write(data); + } finally { + compressingStream.close(); + } + + return baos.toByteArray(); + } catch (IOException e) { + throw new ClientException("Failed to compress request data", e); + } + } + + /** + * Executes an async insert request with streaming body and optional compression. + * Data is streamed from the InputStream with on-the-fly compression, avoiding + * buffering the entire payload in memory. + * + * @param server target endpoint + * @param requestConfig request configuration + * @param dataStream input stream containing data to insert + * @return future that completes when headers are received (streaming continues in background) + */ + public CompletableFuture executeInsertAsyncStreaming( + Endpoint server, + Map requestConfig, + InputStream dataStream) { + if (httpAsyncClient == null) { + throw new ClientException("Async HTTP client is not enabled. 
Set USE_ASYNC_HTTP to true."); + } + + final URI uri = createRequestURI(server, requestConfig, true); + + boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); + int compressionBufferSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig); + + boolean shouldCompress = clientCompression && !appCompressedData; + + BasicHttpRequest request = new BasicHttpRequest("POST", uri); + addHeadersToRequest(request, requestConfig); + + StreamingAsyncEntityProducer entityProducer = new StreamingAsyncEntityProducer( + dataStream, CONTENT_TYPE, + shouldCompress, useHttpCompression, + compressionBufferSize, lz4Factory); + + AsyncRequestProducer requestProducer = new BasicRequestProducer(request, entityProducer); + StreamingAsyncResponseConsumer responseConsumer = new StreamingAsyncResponseConsumer(); + + CompletableFuture future = new CompletableFuture<>(); + + responseConsumer.getHeadersFuture().whenComplete((response, headerEx) -> { + if (headerEx != null) { + future.completeExceptionally(headerEx); + return; + } + + try { + if (response.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) { + closeStreamingResponse(response); + future.completeExceptionally(new ClientMisconfigurationException( + "Proxy authentication required. Please check your proxy settings.")); + } else if (response.getCode() == HttpStatus.SC_BAD_GATEWAY) { + closeStreamingResponse(response); + future.completeExceptionally(new ClientException( + "Server returned '502 Bad gateway'. 
Check network and proxy settings.")); + } else if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + // Return 503 normally - let caller handle retry logic + future.complete(response); + } else if (response.getCode() >= HttpStatus.SC_BAD_REQUEST || + response.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { + // readErrorFromStreamingResponse closes the response + future.completeExceptionally(readErrorFromStreamingResponse(response)); + } else { + future.complete(response); + } + } catch (Exception e) { + closeStreamingResponse(response); + future.completeExceptionally(e); + } + }); + + Future httpFuture = httpAsyncClient.execute( + requestProducer, responseConsumer, new FutureCallback() { + @Override + public void completed(StreamingAsyncResponseConsumer.StreamingResponse response) { + LOG.debug("Async insert request completed for '{}'", uri); + } + + @Override + public void failed(Exception ex) { + LOG.debug("Async insert request failed to '{}': {}", uri, ex.getMessage(), ex); + future.completeExceptionally(ex); + } + + @Override + public void cancelled() { + future.cancel(true); + } + }); + + // Propagate cancellation to the underlying HTTP request + future.whenComplete((result, ex) -> { + if (future.isCancelled()) { + httpFuture.cancel(true); + } + }); + + return future; + } + + private SimpleHttpRequest createSimpleHttpRequest(URI uri, Map requestConfig, String body) { + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + + // Apply compression if configured + boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); + + if (clientCompression && !appCompressedData) { + int bufferSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig); + 
bodyBytes = compressLZ4(bodyBytes, useHttpCompression, bufferSize); + LOG.debug("Async simple request compressed: {} -> {} bytes", body.length(), bodyBytes.length); + } + + SimpleRequestBuilder builder = SimpleRequestBuilder.post(uri) + .setBody(bodyBytes, CONTENT_TYPE); + addHeadersToSimpleRequest(builder, requestConfig); + return builder.build(); + } + + private void addHeadersToSimpleRequest(SimpleRequestBuilder builder, Map requestConfig) { + builder.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType()); + + if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) { + builder.setHeader(ClickHouseHttpProto.HEADER_FORMAT, + ((ClickHouseFormat) requestConfig.get(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())).name()); + } + if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { + builder.setHeader(ClickHouseHttpProto.HEADER_QUERY_ID, + (String) requestConfig.get(ClientConfigProperties.QUERY_ID.getKey())); + } + builder.setHeader(ClickHouseHttpProto.HEADER_DATABASE, + ClientConfigProperties.DATABASE.getOrDefault(requestConfig)); + + // Check if custom Authorization header is set + String customAuthHeaderKey = ClientConfigProperties.HTTP_HEADER_PREFIX + HttpHeaders.AUTHORIZATION; + boolean hasCustomAuth = requestConfig.containsKey(customAuthHeaderKey) && + requestConfig.get(customAuthHeaderKey) != null; + + if (ClientConfigProperties.SSL_AUTH.getOrDefault(requestConfig).booleanValue()) { + if (!hasCustomAuth) { + builder.setHeader(ClickHouseHttpProto.HEADER_DB_USER, + ClientConfigProperties.USER.getOrDefault(requestConfig)); + } + builder.setHeader(ClickHouseHttpProto.HEADER_SSL_CERT_AUTH, "on"); + } else if (ClientConfigProperties.HTTP_USE_BASIC_AUTH.getOrDefault(requestConfig).booleanValue()) { + String user = ClientConfigProperties.USER.getOrDefault(requestConfig); + String password = ClientConfigProperties.PASSWORD.getOrDefault(requestConfig); + builder.addHeader(HttpHeaders.AUTHORIZATION, + "Basic " 
+ Base64.getEncoder().encodeToString( + (user + ":" + password).getBytes(StandardCharsets.UTF_8))); + } else if (!hasCustomAuth) { + // Only set CH auth headers if no custom Authorization header is provided + builder.setHeader(ClickHouseHttpProto.HEADER_DB_USER, + ClientConfigProperties.USER.getOrDefault(requestConfig)); + String password = ClientConfigProperties.PASSWORD.getOrDefault(requestConfig); + if (password != null && !password.isEmpty()) { + builder.setHeader(ClickHouseHttpProto.HEADER_DB_PASSWORD, password); + } + } + + if (proxyAuthHeaderValue != null) { + builder.addHeader(HttpHeaders.PROXY_AUTHORIZATION, proxyAuthHeaderValue); + } + + boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); + boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); + if (useHttpCompression) { + if (serverCompression) { + builder.setHeader(HttpHeaders.ACCEPT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO); + } + if (clientCompression && !appCompressedData) { + builder.setHeader(HttpHeaders.CONTENT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO); + } + } + + for (String key : requestConfig.keySet()) { + if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) { + Object val = requestConfig.get(key); + if (val != null) { + builder.setHeader(key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), + String.valueOf(val)); + } + } + } + + String clientName = ClientConfigProperties.CLIENT_NAME.getOrDefault(requestConfig); + String userAgentValue = defaultUserAgent; + if (clientName != null && !clientName.isEmpty()) { + userAgentValue = clientName + " " + defaultUserAgent; + } + builder.setHeader(HttpHeaders.USER_AGENT, userAgentValue); + } + + private Exception 
readErrorFromAsyncResponse(SimpleHttpResponse response) { + Header qIdHeader = response.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); + final String queryId = qIdHeader == null ? "" : qIdHeader.getValue(); + Header codeHeader = response.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE); + int serverCode = 0; + if (codeHeader != null) { + try { + serverCode = Integer.parseInt(codeHeader.getValue()); + } catch (NumberFormatException nfe) { + LOG.warn("Failed to parse exception code header value '{}' as integer; using 0 instead", + codeHeader.getValue()); + serverCode = 0; + } + } + + String msg; + try { + byte[] bodyBytes = response.getBodyBytes(); + if (bodyBytes != null && bodyBytes.length > 0) { + String bodyText = new String(bodyBytes, StandardCharsets.UTF_8); + msg = bodyText.replaceAll("\\s+", " ").replaceAll("\\\\n", " ").replaceAll("\\\\/", "/"); + if (msg.trim().isEmpty()) { + msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + + " (transport error: " + response.getCode() + ")"; + } + } else { + msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + + " (transport error: " + response.getCode() + ")"; + } + } catch (Exception e) { + LOG.error("Failed to read error message from async response", e); + msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + + " (transport error: " + response.getCode() + ")"; + } + return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", response.getCode(), queryId); + } + + public boolean isAsyncEnabled() { + return httpAsyncClient != null; + } + public ClassicHttpResponse executeMultiPartRequest(Endpoint server, Map requestConfig, String sqlQuery) throws Exception { requestConfig.put(ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), false); @@ -566,8 +1162,6 @@ public static void closeQuietly(ClassicHttpResponse httpResponse) { } } - private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8"); - private 
void addHeaders(HttpPost req, Map requestConfig) { setHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType()); if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) { @@ -661,6 +1255,69 @@ private void addHeaders(HttpPost req, Map requestConfig) { correctUserAgentHeader(req, requestConfig); } + private void addHeadersToRequest(HttpRequest req, Map requestConfig) { + setHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType()); + if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) { + setHeader(req, ClickHouseHttpProto.HEADER_FORMAT, + ((ClickHouseFormat) requestConfig.get(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())).name()); + } + if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { + setHeader(req, ClickHouseHttpProto.HEADER_QUERY_ID, + (String) requestConfig.get(ClientConfigProperties.QUERY_ID.getKey())); + } + setHeader(req, ClickHouseHttpProto.HEADER_DATABASE, ClientConfigProperties.DATABASE.getOrDefault(requestConfig)); + + if (ClientConfigProperties.SSL_AUTH.getOrDefault(requestConfig).booleanValue()) { + setHeader(req, ClickHouseHttpProto.HEADER_DB_USER, ClientConfigProperties.USER.getOrDefault(requestConfig)); + setHeader(req, ClickHouseHttpProto.HEADER_SSL_CERT_AUTH, "on"); + } else if (ClientConfigProperties.HTTP_USE_BASIC_AUTH.getOrDefault(requestConfig).booleanValue()) { + String user = ClientConfigProperties.USER.getOrDefault(requestConfig); + String password = ClientConfigProperties.PASSWORD.getOrDefault(requestConfig); + req.addHeader(HttpHeaders.AUTHORIZATION, + "Basic " + Base64.getEncoder().encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8))); + } else { + setHeader(req, ClickHouseHttpProto.HEADER_DB_USER, ClientConfigProperties.USER.getOrDefault(requestConfig)); + setHeader(req, ClickHouseHttpProto.HEADER_DB_PASSWORD, ClientConfigProperties.PASSWORD.getOrDefault(requestConfig)); + } + + if (proxyAuthHeaderValue != 
null) { + req.addHeader(HttpHeaders.PROXY_AUTHORIZATION, proxyAuthHeaderValue); + } + + boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); + boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); + + if (useHttpCompression && serverCompression) { + setHeader(req, HttpHeaders.ACCEPT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO); + } + if (useHttpCompression && clientCompression && !appCompressedData) { + setHeader(req, HttpHeaders.CONTENT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO); + } + + for (String key : requestConfig.keySet()) { + if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) { + Object val = requestConfig.get(key); + if (val != null) { + setHeader(req, key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), String.valueOf(val)); + } + } + } + + // Special cases - match sync addHeaders behavior + if (req.containsHeader(HttpHeaders.AUTHORIZATION) + && (req.containsHeader(ClickHouseHttpProto.HEADER_DB_USER) || + req.containsHeader(ClickHouseHttpProto.HEADER_DB_PASSWORD))) + { + // user has set auth header for purpose, lets remove ours + req.removeHeaders(ClickHouseHttpProto.HEADER_DB_USER); + req.removeHeaders(ClickHouseHttpProto.HEADER_DB_PASSWORD); + } + + correctUserAgentHeader(req, requestConfig); + } + private void addRequestParams(Map requestConfig, BiConsumer consumer) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { consumer.accept(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); @@ -761,7 +1418,12 @@ public static T getHeaderVal(Header header, T defaultValue, Function requestSettings) { @@ -890,6 +1552,21 @@ private String 
buildDefaultUserAgent() { public void close() { httpClient.close(CloseMode.IMMEDIATE); + + // Close async client with graceful shutdown if it was initialized + if (httpAsyncClient != null && asyncClientClosed.compareAndSet(false, true)) { + try { + httpAsyncClient.close(CloseMode.GRACEFUL); + LOG.debug("Async HTTP client closed gracefully"); + } catch (Exception e) { + LOG.warn("Failed to close async HTTP client gracefully, forcing immediate close", e); + try { + httpAsyncClient.close(CloseMode.IMMEDIATE); + } catch (Exception e2) { + LOG.error("Failed to close async HTTP client", e2); + } + } + } } private static void setHeader(HttpRequest req, String headerName, diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java new file mode 100644 index 000000000..99cd85915 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java @@ -0,0 +1,315 @@ +package com.clickhouse.client.api.internal; + +import net.jpountz.lz4.LZ4Factory; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.ByteBuffer; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + 
+/** + * Async entity producer that streams data from an InputStream with optional LZ4 compression. + * Supports on-the-fly compression without buffering the entire payload in memory. + * + *

For compression, data flows: User InputStream → Compression → PipedStream → NIO output

+ */ +public class StreamingAsyncEntityProducer implements AsyncEntityProducer { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingAsyncEntityProducer.class); + private static final int DEFAULT_BUFFER_SIZE = 8 * 1024; // 8KB read buffer + private static final int PIPE_BUFFER_SIZE = 512 * 1024; // 512KB pipe buffer + private static final AtomicLong THREAD_COUNTER = new AtomicLong(0); // For unique thread names + + /** + * Shared thread pool for compression tasks - bounded to prevent thread explosion under high concurrency. + * Uses daemon threads so it won't prevent JVM shutdown if shutdown is not called. + * + *

Lifecycle management: Call {@link #acquireExecutor()} when creating an async client and + * {@link #releaseExecutor()} when closing it. The pool is lazily created on first acquire and + * shut down when the last client releases it.

+ */ + private static final int COMPRESSION_POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors()); + private static final Object EXECUTOR_LOCK = new Object(); + private static final AtomicInteger EXECUTOR_REF_COUNT = new AtomicInteger(0); + private static volatile ExecutorService compressionExecutor = null; + + /** + * Acquires a reference to the shared compression executor. + * Call this when creating an async HTTP client that may use compression. + * Must be paired with a call to {@link #releaseExecutor()} when the client is closed. + * + *

Thread-safety: This method synchronizes on EXECUTOR_LOCK, ensuring mutual exclusion + * with releaseExecutor(). No race condition exists because a thread cannot enter + * releaseExecutor() while another is in acquireExecutor() (and vice versa).

+ */ + public static void acquireExecutor() { + synchronized (EXECUTOR_LOCK) { + if (EXECUTOR_REF_COUNT.getAndIncrement() == 0) { + compressionExecutor = createCompressionExecutor(); + LOG.debug("Created compression executor pool"); + } + } + } + + /** + * Releases a reference to the shared compression executor. + * When the last reference is released, the executor is gracefully shut down. + * + *

Thread-safety: This method synchronizes on EXECUTOR_LOCK, ensuring mutual exclusion + * with acquireExecutor(). The synchronized block guarantees that between checking the + * ref count and shutting down, no other thread can acquire a new reference.

+ */ + public static void releaseExecutor() { + synchronized (EXECUTOR_LOCK) { + if (EXECUTOR_REF_COUNT.decrementAndGet() == 0 && compressionExecutor != null) { + LOG.debug("Shutting down compression executor pool"); + compressionExecutor.shutdown(); + try { + if (!compressionExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + compressionExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + compressionExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + compressionExecutor = null; + } + } + } + + private static ExecutorService createCompressionExecutor() { + return new ThreadPoolExecutor( + COMPRESSION_POOL_SIZE, + COMPRESSION_POOL_SIZE, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1000), // Bounded queue to provide backpressure + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "ch-async-compress-" + THREAD_COUNTER.incrementAndGet()); + t.setDaemon(true); + return t; + } + }, + new ThreadPoolExecutor.CallerRunsPolicy() // If queue full, run in caller thread (backpressure) + ); + } + + private static ExecutorService getExecutor() { + ExecutorService exec = compressionExecutor; + if (exec == null || exec.isShutdown()) { + // Fallback: if executor not acquired properly, create inline (caller's thread) + // This handles edge cases but logs a warning + LOG.warn("Compression executor not acquired - compression will run in caller thread"); + return null; + } + return exec; + } + + private final ContentType contentType; + private final InputStream sourceStream; + private final boolean compressData; + private final boolean useHttpCompression; + private final int compressionBufferSize; + private final LZ4Factory lz4Factory; + + private final ByteBuffer readBuffer; + private final AtomicBoolean completed = new AtomicBoolean(false); + private final AtomicReference error = new AtomicReference<>(); + + // For compression: compress in thread pool, read compressed data here + private 
PipedInputStream compressedInputStream; + private InputStream activeInputStream; + + public StreamingAsyncEntityProducer(InputStream sourceStream, ContentType contentType) { + this(sourceStream, contentType, false, false, 0, null); + } + + public StreamingAsyncEntityProducer(InputStream sourceStream, ContentType contentType, + boolean compressData, boolean useHttpCompression, + int compressionBufferSize, LZ4Factory lz4Factory) { + this.sourceStream = sourceStream; + this.contentType = contentType; + this.compressData = compressData; + this.useHttpCompression = useHttpCompression; + this.compressionBufferSize = compressionBufferSize > 0 ? compressionBufferSize : ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE; + this.lz4Factory = lz4Factory; + this.readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + this.readBuffer.flip(); // Start empty + } + + private void initializeStreams() throws IOException { + if (activeInputStream != null) { + return; // Already initialized + } + + if (compressData && lz4Factory != null) { + // Setup compression pipeline: sourceStream → compress → pipedStream → NIO + PipedOutputStream compressedOutputStream = new PipedOutputStream(); + compressedInputStream = new PipedInputStream(compressedOutputStream, PIPE_BUFFER_SIZE); + activeInputStream = compressedInputStream; + + // Submit compression task to shared thread pool (or run inline if not available) + ExecutorService executor = getExecutor(); + Runnable compressionTask = () -> { + try { + OutputStream compressingStream; + if (useHttpCompression) { + compressingStream = new org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream(compressedOutputStream); + } else { + compressingStream = new ClickHouseLZ4OutputStream(compressedOutputStream, lz4Factory.fastCompressor(), compressionBufferSize); + } + + try { + byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; + int bytesRead; + while ((bytesRead = sourceStream.read(buffer)) != -1) { + compressingStream.write(buffer, 0, 
bytesRead); + } + } finally { + compressingStream.close(); + compressedOutputStream.close(); + } + } catch (IOException e) { + error.set(e); + try { + compressedOutputStream.close(); + } catch (IOException ignored) { + } + try { + sourceStream.close(); + } catch (IOException ignored) { + } + } + }; + + if (executor != null) { + executor.submit(compressionTask); + } else { + // Fallback: run compression in a new thread (less efficient but functional) + Thread t = new Thread(compressionTask, "ch-compress-fallback-" + THREAD_COUNTER.incrementAndGet()); + t.setDaemon(true); + t.start(); + } + } else { + // No compression - read directly from source + activeInputStream = sourceStream; + } + } + + @Override + public boolean isRepeatable() { + return false; // Streaming is not repeatable + } + + @Override + public String getContentType() { + return contentType != null ? contentType.toString() : null; + } + + @Override + public long getContentLength() { + return -1; // Unknown length for streaming + } + + @Override + public int available() { + try { + initializeStreams(); + if (readBuffer.hasRemaining()) { + return readBuffer.remaining(); + } + return activeInputStream.available(); + } catch (IOException e) { + return 0; + } + } + + @Override + public String getContentEncoding() { + return null; // Content-Encoding header is set separately + } + + @Override + public boolean isChunked() { + return true; // Always chunked for streaming + } + + @Override + public Set getTrailerNames() { + return null; + } + + @Override + public void produce(DataStreamChannel channel) throws IOException { + initializeStreams(); + + // Check for compression errors + Exception compressionError = error.get(); + if (compressionError != null) { + throw new IOException("Compression failed", compressionError); + } + + // If buffer has data, write it first + if (readBuffer.hasRemaining()) { + channel.write(readBuffer); + if (readBuffer.hasRemaining()) { + return; // Channel couldn't accept all data, 
will be called again + } + } + + // Read more data from stream + readBuffer.clear(); + byte[] array = readBuffer.array(); + int bytesRead = activeInputStream.read(array, 0, array.length); + + if (bytesRead == -1) { + // End of stream + completed.set(true); + channel.endStream(); + } else if (bytesRead > 0) { + readBuffer.limit(bytesRead); + channel.write(readBuffer); + } + } + + @Override + public void failed(Exception cause) { + LOG.debug("Streaming entity producer failed", cause); + error.set(cause); + releaseResources(); + } + + @Override + public void releaseResources() { + completed.set(true); + // Closing streams will cause any running compression task to fail and exit + try { + if (activeInputStream != null) { + activeInputStream.close(); + } + if (sourceStream != activeInputStream) { + sourceStream.close(); + } + } catch (IOException e) { + LOG.debug("Error closing streams", e); + } + } +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java new file mode 100644 index 000000000..c43bd2207 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java @@ -0,0 +1,220 @@ +package com.clickhouse.client.api.internal; + +import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Async response consumer that streams response body through a PipedInputStream. 
+ * Data is written to PipedOutputStream as it arrives from the NIO thread, + * and can be read from the connected PipedInputStream in the user's thread. + * + *

IMPORTANT: The {@link #getHeadersFuture()} completes as soon as headers arrive, + * allowing the caller to start reading from the stream immediately. This prevents + * deadlock - the NIO thread can write while the user thread reads concurrently.

+ */ +public class StreamingAsyncResponseConsumer extends AbstractBinResponseConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingAsyncResponseConsumer.class); + private static final int DEFAULT_PIPE_SIZE = 512 * 1024; // 512KB pipe buffer + private static final int CAPACITY_INCREMENT = 8 * 1024; // 8KB chunks + + private final PipedInputStream pipedInputStream; + private final PipedOutputStream pipedOutputStream; + private final CompletableFuture headersFuture; + private final CompletableFuture streamCompleteFuture; + private final AtomicBoolean outputClosed = new AtomicBoolean(false); + + private HttpResponse response; + private ContentType contentType; + private volatile Exception streamError; + private volatile long totalBytesWritten = 0; + + // Reusable buffer to avoid GC pressure from per-chunk allocations + private final byte[] reusableBuffer = new byte[CAPACITY_INCREMENT]; + + public StreamingAsyncResponseConsumer() { + this(DEFAULT_PIPE_SIZE); + } + + public StreamingAsyncResponseConsumer(int pipeSize) { + this.pipedInputStream = new PipedInputStream(pipeSize); + this.headersFuture = new CompletableFuture<>(); + this.streamCompleteFuture = new CompletableFuture<>(); + try { + this.pipedOutputStream = new PipedOutputStream(pipedInputStream); + } catch (IOException e) { + throw new RuntimeException("Failed to create piped streams", e); + } + } + + /** + * Returns a future that completes when HTTP headers are received. + * Use this to get the response early and start reading from the stream + * before all data has arrived. This prevents deadlock. 
+ */ + public CompletableFuture getHeadersFuture() { + return headersFuture; + } + + @Override + protected void start(HttpResponse response, ContentType contentType) throws IOException { + this.response = response; + this.contentType = contentType; + LOG.debug("Streaming response started: status={}, contentType={}", + response.getCode(), contentType); + + // Complete headers future immediately so caller can start reading + StreamingResponse streamingResponse = new StreamingResponse( + response, contentType, pipedInputStream, streamCompleteFuture); + headersFuture.complete(streamingResponse); + } + + @Override + protected int capacityIncrement() { + return CAPACITY_INCREMENT; + } + + @Override + protected void data(ByteBuffer src, boolean endOfStream) throws IOException { + if (streamError != null) { + LOG.debug("data() called but streamError already set: {}", streamError.getMessage()); + return; + } + + try { + long bytesWrittenThisCall = 0; + while (src.hasRemaining()) { + int bytesToRead = Math.min(src.remaining(), reusableBuffer.length); + src.get(reusableBuffer, 0, bytesToRead); + pipedOutputStream.write(reusableBuffer, 0, bytesToRead); + bytesWrittenThisCall += bytesToRead; + } + if (bytesWrittenThisCall > 0) { + totalBytesWritten += bytesWrittenThisCall; + pipedOutputStream.flush(); // Ensure data is immediately available to reader + LOG.debug("data() wrote {} bytes (total: {}), endOfStream={}", bytesWrittenThisCall, totalBytesWritten, endOfStream); + } + + if (endOfStream) { + LOG.debug("data() endOfStream=true, closing output. Total bytes: {}", totalBytesWritten); + closeOutputStream(); + } + } catch (IOException e) { + LOG.debug("data() IOException: {} (total bytes written: {})", e.getMessage(), totalBytesWritten); + streamError = e; + closeOutputStream(); + throw e; + } + } + + @Override + protected StreamingResponse buildResult() { + // Note: This creates a new instance each time, but all instances share the same + // underlying streams and futures. 
The headersFuture provides the primary response. + return new StreamingResponse(response, contentType, pipedInputStream, streamCompleteFuture); + } + + @Override + public void releaseResources() { + closeOutputStream(); + } + + @Override + public void failed(Exception cause) { + LOG.debug("Streaming response failed", cause); + streamError = cause; + closeOutputStream(); + + // Complete both futures exceptionally if not already completed + headersFuture.completeExceptionally(cause); + streamCompleteFuture.completeExceptionally(cause); + } + + private void closeOutputStream() { + if (outputClosed.compareAndSet(false, true)) { + LOG.debug("closeOutputStream() called, total bytes written: {}", totalBytesWritten); + Exception closeException = null; + try { + pipedOutputStream.close(); + } catch (IOException e) { + LOG.debug("Error closing piped output stream: {}", e.getMessage()); + closeException = e; + } + // Always complete the future, even if close() threw an exception + if (streamError != null) { + streamCompleteFuture.completeExceptionally(streamError); + LOG.debug("closeOutputStream() completed with stream error"); + } else if (closeException != null) { + streamCompleteFuture.completeExceptionally(closeException); + LOG.debug("closeOutputStream() completed with close error"); + } else { + streamCompleteFuture.complete(null); + LOG.debug("closeOutputStream() completed successfully"); + } + } else { + LOG.debug("closeOutputStream() already closed, skipping"); + } + } + + /** + * Result object containing HTTP response metadata and the streaming InputStream. 
+ */ + public static class StreamingResponse { + private final HttpResponse httpResponse; + private final ContentType contentType; + private final InputStream inputStream; + private final CompletableFuture completeFuture; + + StreamingResponse(HttpResponse httpResponse, ContentType contentType, + InputStream inputStream, CompletableFuture completeFuture) { + this.httpResponse = httpResponse; + this.contentType = contentType; + this.inputStream = inputStream; + this.completeFuture = completeFuture; + } + + public HttpResponse getHttpResponse() { + return httpResponse; + } + + public int getCode() { + return httpResponse.getCode(); + } + + public Header getFirstHeader(String name) { + return httpResponse.getFirstHeader(name); + } + + public boolean containsHeader(String name) { + return httpResponse.containsHeader(name); + } + + public ContentType getContentType() { + return contentType; + } + + public InputStream getInputStream() { + return inputStream; + } + + public CompletableFuture getCompleteFuture() { + return completeFuture; + } + + public void close() throws IOException { + inputStream.close(); + } + } +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/metrics/MicrometerLoader.java b/client-v2/src/main/java/com/clickhouse/client/api/metrics/MicrometerLoader.java index 00f6c0344..b77bc92e5 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/metrics/MicrometerLoader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/metrics/MicrometerLoader.java @@ -7,6 +7,7 @@ import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.core5.pool.ConnPoolControl; public class MicrometerLoader { @@ -63,4 +64,48 @@ public static void applyConnectionMetricsBinder(Object registry, String metricsG } } + /** + * Registers Micrometer metrics for 
the async HTTP client connection pool. + */ + public static void applyAsyncPoolingMetricsBinder(Object registry, String metricsGroupName, PoolingAsyncClientConnectionManager pacm) { + if (registry instanceof MeterRegistry) { + Iterable tags = Tags.of("httpclient", metricsGroupName + "-async"); + Gauge + .builder("httpcomponents.httpclient.pool.total.max", pacm, + (connPoolControl) -> connPoolControl.getTotalStats().getMax()) + .description("The configured maximum number of allowed persistent connections for all routes (async).") + .tags(tags) + .register((MeterRegistry) registry); + Gauge + .builder("httpcomponents.httpclient.pool.total.connections", pacm, + (connPoolControl) -> connPoolControl.getTotalStats().getAvailable()) + .description("The number of persistent and available connections for all routes (async).") + .tags(tags) + .tag("state", "available") + .register((MeterRegistry) registry); + Gauge + .builder("httpcomponents.httpclient.pool.total.connections", pacm, + (connPoolControl) -> connPoolControl.getTotalStats().getLeased()) + .description("The number of persistent and leased connections for all routes (async).") + .tags(tags) + .tag("state", "leased") + .register((MeterRegistry) registry); + Gauge + .builder("httpcomponents.httpclient.pool.total.pending", pacm, + (connPoolControl) -> connPoolControl.getTotalStats().getPending()) + .description("The number of connection requests being blocked awaiting a free connection for all routes (async).") + .tags(tags) + .register((MeterRegistry) registry); + Gauge + .builder("httpcomponents.httpclient.pool.route.max.default", pacm, + ConnPoolControl::getDefaultMaxPerRoute) + .description("The configured default maximum number of allowed persistent connections per route (async).") + .tags(tags) + .register((MeterRegistry) registry); + + } else { + throw new ClientMisconfigurationException("Unsupported registry type." 
+ registry.getClass()); + } + } + } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java index 735fe6f58..167a87194 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java @@ -3,12 +3,16 @@ import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.ClientException; import com.clickhouse.client.api.http.ClickHouseHttpProto; +import com.clickhouse.client.api.internal.StreamingAsyncResponseConsumer; import com.clickhouse.client.api.metrics.OperationMetrics; import com.clickhouse.client.api.metrics.ServerMetrics; import com.clickhouse.data.ClickHouseFormat; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.Header; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.util.TimeZone; @@ -35,14 +39,56 @@ public class QueryResponse implements AutoCloseable { private ClassicHttpResponse httpResponse; + private byte[] bufferedResponseBody; + + private InputStream streamingInputStream; + public QueryResponse(ClassicHttpResponse response, ClickHouseFormat format, QuerySettings settings, OperationMetrics operationMetrics) { this.httpResponse = response; + this.bufferedResponseBody = null; this.format = format; this.operationMetrics = operationMetrics; this.settings = settings; - Header tzHeader = response.getFirstHeader(ClickHouseHttpProto.HEADER_TIMEZONE); + parseServerTimezone(response.getFirstHeader(ClickHouseHttpProto.HEADER_TIMEZONE)); + } + + /** + * Constructor for async responses. Buffers entire response body in memory. + * For large result sets, use the streaming async API or sync API instead. 
+ */ + public QueryResponse(SimpleHttpResponse response, ClickHouseFormat format, QuerySettings settings, + OperationMetrics operationMetrics) { + this.httpResponse = null; + // getBodyBytes() can return null for empty responses + byte[] bodyBytes = response.getBodyBytes(); + this.bufferedResponseBody = bodyBytes != null ? bodyBytes : new byte[0]; + this.streamingInputStream = null; + this.format = format; + this.operationMetrics = operationMetrics; + this.settings = settings; + + parseServerTimezone(response.getFirstHeader(ClickHouseHttpProto.HEADER_TIMEZONE)); + } + + /** + * Constructor for streaming async responses. Response body is streamed through a pipe, + * avoiding memory buffering. Suitable for large result sets. + */ + public QueryResponse(StreamingAsyncResponseConsumer.StreamingResponse response, ClickHouseFormat format, + QuerySettings settings, OperationMetrics operationMetrics) { + this.httpResponse = null; + this.bufferedResponseBody = null; + this.streamingInputStream = response.getInputStream(); + this.format = format; + this.operationMetrics = operationMetrics; + this.settings = settings; + + parseServerTimezone(response.getFirstHeader(ClickHouseHttpProto.HEADER_TIMEZONE)); + } + + private void parseServerTimezone(Header tzHeader) { if (tzHeader != null) { try { this.settings.setOption(ClientConfigProperties.SERVER_TIMEZONE.getKey(), @@ -55,20 +101,29 @@ public QueryResponse(ClassicHttpResponse response, ClickHouseFormat format, Quer public InputStream getInputStream() { try { + if (streamingInputStream != null) { + return streamingInputStream; + } + if (bufferedResponseBody != null) { + return new ByteArrayInputStream(bufferedResponseBody); + } return httpResponse.getEntity().getContent(); } catch (Exception e) { throw new ClientException("Failed to construct input stream", e); } } + /** + * Closes this response and releases associated resources. + * IOExceptions are propagated as-is to maintain AutoCloseable contract consistency. 
+ */ @Override public void close() throws Exception { - if (httpResponse != null ) { - try { - httpResponse.close(); - } catch (Exception e) { - throw new ClientException("Failed to close response", e); - } + if (streamingInputStream != null) { + streamingInputStream.close(); + } + if (httpResponse != null) { + httpResponse.close(); } } diff --git a/client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java b/client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java new file mode 100644 index 000000000..1fef5e170 --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java @@ -0,0 +1,994 @@ +package com.clickhouse.client; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ServerException; +import com.clickhouse.client.api.enums.Protocol; +import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.QueryResponse; +import com.clickhouse.client.api.query.QuerySettings; +import com.clickhouse.data.ClickHouseFormat; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.common.ConsoleNotifier; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import org.apache.hc.core5.http.HttpStatus; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; + +/** + * Tests for async HTTP transport using Apache HttpClient 5 async API. 
+ */ +@Test(groups = {"integration"}) +public class AsyncHttpClientTests extends BaseIntegrationTest { + + /** + * Test basic async query execution with real ClickHouse server. + */ + @Test(groups = {"integration"}) + public void testAsyncQueryBasic() { + if (isCloud()) { + return; // Skip for cloud tests + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + List records = client.queryAll("SELECT timezone()"); + Assert.assertTrue(records.size() > 0); + Assert.assertNotNull(records.get(0).getString(1)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test that async query returns the same results as sync query. + */ + @Test(groups = {"integration"}) + public void testAsyncQueryResultsMatchSync() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String query = "SELECT number, number * 2 as doubled FROM numbers(10)"; + + // First, get results using sync client + List syncResults; + try (Client syncClient = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(false) + .build()) { + + syncResults = syncClient.queryAll(query); + } + + // Then, get results using async client + List asyncResults; + try (Client asyncClient = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + asyncResults = asyncClient.queryAll(query); + } + + // Compare results + Assert.assertEquals(asyncResults.size(), syncResults.size()); + for (int i = 0; i < syncResults.size(); i++) { + Assert.assertEquals(asyncResults.get(i).getLong(1), syncResults.get(i).getLong(1)); + Assert.assertEquals(asyncResults.get(i).getLong(2), 
syncResults.get(i).getLong(2)); + } + } + + /** + * Test async query with CompletableFuture composition. + */ + @Test(groups = {"integration"}) + public void testAsyncQueryWithFutureComposition() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + // Chain multiple async operations + CompletableFuture resultFuture = client.query("SELECT count() FROM numbers(1000)") + .thenApply(response -> { + try { + // Read the result row count from response + return response.getResultRows(); + } finally { + try { + response.close(); + } catch (Exception e) { + // ignore + } + } + }); + + Long count = resultFuture.get(30, TimeUnit.SECONDS); + Assert.assertEquals(count.longValue(), 1L); // Query returns 1 row (count result) + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async client properly handles server errors. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncQueryServerError() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + try { + // This should fail with a syntax error + client.query("SELECT invalid;statement").get(10, TimeUnit.SECONDS); + Assert.fail("Expected ServerException"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ServerException, + "Expected ServerException but got: " + e.getCause().getClass().getName()); + ServerException se = (ServerException) e.getCause(); + Assert.assertNotEquals(se.getCode(), 0, "Expected non-zero error code for syntax error"); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async query retry on 503 Service Unavailable using WireMock. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncQueryRetryOn503() { + if (isCloud()) { + return; + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + int serverPort = mockServer.port(); + + // First request returns 503 (Service Unavailable) + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("Retry503") + .whenScenarioStateIs(STARTED) + .willSetStateTo("Retried") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE)) + .build()); + + // Second request succeeds + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("Retry503") + .whenScenarioStateIs("Retried") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")) + .build()); + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", serverPort, false) + .setUsername("default") + .setPassword("") + .useAsyncHttp(true) + .setMaxRetries(3) + .compressServerResponse(false) + .build()) { + + QueryResponse response = client.query("SELECT 1").get(10, TimeUnit.SECONDS); + Assert.assertEquals(response.getReadRows(), 1); + response.close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + mockServer.stop(); + } + } + + /** + * Test that async client is not enabled when USE_ASYNC_HTTP is false (default). 
+ */ + @Test(groups = {"integration"}) + public void testAsyncHttpDisabledByDefault() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + // Create client without useAsyncHttp(true) - should use sync client + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .build()) { + + // Query should still work but uses sync path + List records = client.queryAll("SELECT 1"); + Assert.assertEquals(records.size(), 1); + Assert.assertEquals(records.get(0).getString(1), "1"); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test concurrent async queries. + */ + @Test(groups = {"integration"}) + public void testConcurrentAsyncQueries() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .setMaxConnections(20) + .build()) { + + int numQueries = 10; + @SuppressWarnings("unchecked") + CompletableFuture[] futures = new CompletableFuture[numQueries]; + + // Launch all queries concurrently + for (int i = 0; i < numQueries; i++) { + final int queryNum = i; + futures[i] = client.query("SELECT " + queryNum + " as num, sleep(0.1)"); + } + + // Wait for all to complete + CompletableFuture.allOf(futures).get(60, TimeUnit.SECONDS); + + // Verify all completed successfully + for (int i = 0; i < numQueries; i++) { + QueryResponse response = futures[i].get(); + Assert.assertTrue(response.getReadRows() > 0 || response.getResultRows() > 0); + response.close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async client graceful shutdown. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncClientGracefulShutdown() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + // Create and close the client multiple times to verify no resource leaks + for (int i = 0; i < 3; i++) { + Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build(); + + try { + List records = client.queryAll("SELECT 1"); + Assert.assertEquals(records.size(), 1); + } catch (Exception e) { + Assert.fail("Query failed on iteration " + i, e); + } finally { + client.close(); + } + } + } + + /** + * Test that cancellation of CompletableFuture works. + */ + @Test(groups = {"integration"}) + public void testAsyncQueryCancellation() { + if (isCloud()) { + return; + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + int serverPort = mockServer.port(); + + // Setup a delayed response + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withFixedDelay(10000) // 10 second delay + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")) + .build()); + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", serverPort, false) + .setUsername("default") + .setPassword("") + .useAsyncHttp(true) + .compressServerResponse(false) + .build()) { + + CompletableFuture future = client.query("SELECT 1"); + + // Cancel after a short delay + Thread.sleep(100); + boolean cancelled = future.cancel(true); + + // The future should be cancelled + Assert.assertTrue(cancelled, "Cancellation should return true"); + Assert.assertTrue(future.isCancelled() || future.isDone()); + + } + } catch (Exception e) { + e.printStackTrace(); + 
Assert.fail(e.getMessage()); + } finally { + mockServer.stop(); + } + } + + /** + * Test async query response metrics. + */ + @Test(groups = {"integration"}) + public void testAsyncQueryMetrics() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + try (QueryResponse response = client.query("SELECT number FROM numbers(100)").get(30, TimeUnit.SECONDS)) { + // Verify metrics are populated + Assert.assertTrue(response.getReadRows() > 0, "Expected read_rows > 0"); + Assert.assertTrue(response.getReadBytes() > 0, "Expected read_bytes > 0"); + Assert.assertNotNull(response.getQueryId(), "Expected query_id to be set"); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async query with custom query ID. + */ + @Test(groups = {"integration"}) + public void testAsyncQueryWithCustomQueryId() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String customQueryId = "test-async-query-" + System.currentTimeMillis(); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + com.clickhouse.client.api.query.QuerySettings settings = + new com.clickhouse.client.api.query.QuerySettings().setQueryId(customQueryId); + + try (QueryResponse response = client.query("SELECT 1", settings).get(30, TimeUnit.SECONDS)) { + Assert.assertEquals(response.getQueryId(), customQueryId); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async streaming with larger result set to verify streaming works. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncStreamingLargeResult() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + // Query that returns ~1MB of data (100K rows * ~10 bytes each) + // Use TabSeparated format so we can count lines with BufferedReader + QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.TabSeparated); + try (QueryResponse response = client.query("SELECT number, toString(number) FROM numbers(100000)", settings) + .get(60, TimeUnit.SECONDS)) { + + Assert.assertTrue(response.getReadRows() > 0, "Expected read_rows > 0"); + + // Read and count lines from the streaming response + try (java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(response.getInputStream()))) { + long lineCount = 0; + while (reader.readLine() != null) { + lineCount++; + } + + Assert.assertEquals(lineCount, 100000, "Expected 100000 rows"); + } + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async streaming response can be read incrementally. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncStreamingIncrementalRead() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + // Use TabSeparated format for text-based streaming + QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.TabSeparated); + try (QueryResponse response = client.query("SELECT number FROM numbers(1000)", settings) + .get(30, TimeUnit.SECONDS)) { + + java.io.InputStream is = response.getInputStream(); + byte[] buffer = new byte[100]; + int totalBytesRead = 0; + int bytesRead; + + // Read incrementally + while ((bytesRead = is.read(buffer)) != -1) { + totalBytesRead += bytesRead; + } + + Assert.assertTrue(totalBytesRead > 0, "Expected to read data from stream"); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test that streaming async does NOT deadlock when reading is delayed. + * This tests the fix for the critical deadlock issue where: + * - NIO thread blocks on pipe write (buffer full) + * - User thread waits on future.get() (waiting for stream end) + * - Neither can proceed = deadlock + * + * The fix: future completes when headers arrive, not when stream ends. 
+ */ + @Test(groups = {"integration"}, timeOut = 30000) // 30 second timeout catches deadlock + public void testAsyncStreamingNoDeadlockOnDelayedRead() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + // Query that returns data larger than pipe buffer (512KB) + // This would deadlock with the old implementation if user delays reading + // Use TabSeparated format so we can count lines with BufferedReader + QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.TabSeparated); + CompletableFuture future = client.query( + "SELECT number, repeat('x', 100) FROM numbers(10000)", settings); // ~1MB response + + // Simulate delayed reading - OLD code would deadlock here + Thread.sleep(500); + + // Get response - should complete immediately since headers arrived + try (QueryResponse response = future.get(5, TimeUnit.SECONDS)) { + // Now read the stream - NIO thread continues writing while we read + try (java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(response.getInputStream()))) { + long lineCount = 0; + while (reader.readLine() != null) { + lineCount++; + } + + Assert.assertEquals(lineCount, 10000, "Expected 10000 rows"); + } + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async query with LZ4 compression enabled. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncQueryWithCompression() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .compressClientRequest(true) + .useHttpCompression(true) + .build()) { + + // Execute a query with compression enabled + List records = client.queryAll("SELECT number FROM numbers(100)"); + Assert.assertEquals(records.size(), 100); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async query with ClickHouse native LZ4 compression. + */ + @Test(groups = {"integration"}) + public void testAsyncQueryWithNativeLZ4Compression() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .compressClientRequest(true) + .useHttpCompression(false) // Use native ClickHouse LZ4 + .build()) { + + // Execute a query with native LZ4 compression + List records = client.queryAll("SELECT number, toString(number) FROM numbers(50)"); + Assert.assertEquals(records.size(), 50); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + // ========== Async Insert Tests ========== + + /** + * Test basic async insert with InputStream. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncInsertBasic() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String tableName = "async_insert_test_" + System.currentTimeMillis(); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + // Create test table + client.query("CREATE TABLE " + tableName + " (id UInt64, name String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + try { + // Insert data using InputStream + String csvData = "1,Alice\n2,Bob\n3,Charlie\n"; + ByteArrayInputStream dataStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + + InsertResponse insertResponse = client.insert(tableName, dataStream, ClickHouseFormat.CSV) + .get(10, TimeUnit.SECONDS); + + Assert.assertTrue(insertResponse.getWrittenRows() > 0 || insertResponse.getMetrics() != null); + + // Verify data was inserted + List records = client.queryAll("SELECT * FROM " + tableName + " ORDER BY id"); + Assert.assertEquals(records.size(), 3); + Assert.assertEquals(records.get(0).getString("name"), "Alice"); + Assert.assertEquals(records.get(1).getString("name"), "Bob"); + Assert.assertEquals(records.get(2).getString("name"), "Charlie"); + + } finally { + // Cleanup + client.query("DROP TABLE IF EXISTS " + tableName).get(10, TimeUnit.SECONDS).close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async insert with larger data set to verify streaming works. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncInsertLargeData() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String tableName = "async_insert_large_test_" + System.currentTimeMillis(); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + // Create test table + client.query("CREATE TABLE " + tableName + " (id UInt64, data String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + try { + // Generate ~500KB of CSV data (10000 rows * ~50 bytes) to exercise streaming + StringBuilder csvBuilder = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + csvBuilder.append(i).append(",").append("data_row_" + i + "_padding_to_make_it_longer_").append("\n"); + } + String csvData = csvBuilder.toString(); + ByteArrayInputStream dataStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + + InsertResponse insertResponse = client.insert(tableName, dataStream, ClickHouseFormat.CSV) + .get(60, TimeUnit.SECONDS); + + // Verify data was inserted + List records = client.queryAll("SELECT count() FROM " + tableName); + Assert.assertEquals(records.get(0).getLong(1), 10000L); + + } finally { + // Cleanup + client.query("DROP TABLE IF EXISTS " + tableName).get(10, TimeUnit.SECONDS).close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async insert with compression enabled. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncInsertWithCompression() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String tableName = "async_insert_compress_test_" + System.currentTimeMillis(); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .compressClientRequest(true) + .useHttpCompression(true) + .build()) { + + // Create test table + client.query("CREATE TABLE " + tableName + " (id UInt64, value String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + try { + // Insert data with compression enabled + StringBuilder csvBuilder = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + csvBuilder.append(i).append(",value_").append(i).append("\n"); + } + String csvData = csvBuilder.toString(); + ByteArrayInputStream dataStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + + InsertResponse insertResponse = client.insert(tableName, dataStream, ClickHouseFormat.CSV) + .get(30, TimeUnit.SECONDS); + + // Verify data was inserted + List records = client.queryAll("SELECT count() FROM " + tableName); + Assert.assertEquals(records.get(0).getLong(1), 1000L); + + } finally { + // Cleanup + client.query("DROP TABLE IF EXISTS " + tableName).get(10, TimeUnit.SECONDS).close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test async insert with ClickHouse native LZ4 compression. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncInsertWithNativeLZ4Compression() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String tableName = "async_insert_lz4_test_" + System.currentTimeMillis(); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .compressClientRequest(true) + .useHttpCompression(false) // Use native ClickHouse LZ4 + .build()) { + + // Create test table + client.query("CREATE TABLE " + tableName + " (id UInt64, value String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + try { + // Insert data with native LZ4 compression + StringBuilder csvBuilder = new StringBuilder(); + for (int i = 0; i < 500; i++) { + csvBuilder.append(i).append(",native_lz4_value_").append(i).append("\n"); + } + String csvData = csvBuilder.toString(); + ByteArrayInputStream dataStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + + InsertResponse insertResponse = client.insert(tableName, dataStream, ClickHouseFormat.CSV) + .get(30, TimeUnit.SECONDS); + + // Verify data was inserted + List records = client.queryAll("SELECT count() FROM " + tableName); + Assert.assertEquals(records.get(0).getLong(1), 500L); + + } finally { + // Cleanup + client.query("DROP TABLE IF EXISTS " + tableName).get(10, TimeUnit.SECONDS).close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test that async and sync inserts produce the same results. 
+ */ + @Test(groups = {"integration"}) + public void testAsyncInsertResultsMatchSync() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String asyncTable = "async_insert_compare_async_" + System.currentTimeMillis(); + String syncTable = "async_insert_compare_sync_" + System.currentTimeMillis(); + + String csvData = "1,test1\n2,test2\n3,test3\n4,test4\n5,test5\n"; + + try { + // Insert using async client + try (Client asyncClient = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + asyncClient.query("CREATE TABLE " + asyncTable + " (id UInt64, value String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + ByteArrayInputStream dataStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + asyncClient.insert(asyncTable, dataStream, ClickHouseFormat.CSV).get(10, TimeUnit.SECONDS); + } + + // Insert using sync client + try (Client syncClient = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(false) + .build()) { + + syncClient.query("CREATE TABLE " + syncTable + " (id UInt64, value String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + ByteArrayInputStream dataStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + syncClient.insert(syncTable, dataStream, ClickHouseFormat.CSV).get(10, TimeUnit.SECONDS); + } + + // Compare results + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .build()) { + + List asyncRecords = client.queryAll("SELECT * FROM " + asyncTable + " ORDER BY id"); + List syncRecords = client.queryAll("SELECT * FROM " + syncTable + " ORDER BY id"); + + Assert.assertEquals(asyncRecords.size(), syncRecords.size()); + for (int i = 0; i < asyncRecords.size(); i++) { + 
Assert.assertEquals(asyncRecords.get(i).getLong("id"), syncRecords.get(i).getLong("id")); + Assert.assertEquals(asyncRecords.get(i).getString("value"), syncRecords.get(i).getString("value")); + } + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + // Cleanup tables regardless of test outcome + try (Client cleanupClient = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .build()) { + cleanupClient.query("DROP TABLE IF EXISTS " + asyncTable).get(10, TimeUnit.SECONDS).close(); + cleanupClient.query("DROP TABLE IF EXISTS " + syncTable).get(10, TimeUnit.SECONDS).close(); + } catch (Exception ignored) { + // Cleanup errors shouldn't fail the test + } + } + } + + /** + * Test async insert with column names specified. + */ + @Test(groups = {"integration"}) + public void testAsyncInsertWithColumnNames() { + if (isCloud()) { + return; + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String tableName = "async_insert_columns_test_" + System.currentTimeMillis(); + + try (Client client = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useAsyncHttp(true) + .build()) { + + // Create test table with default value + client.query("CREATE TABLE " + tableName + " (id UInt64, name String, status String DEFAULT 'active') ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + try { + // Insert only id and name columns (status should get default) + String csvData = "1,Alice\n2,Bob\n"; + ByteArrayInputStream dataStream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + + client.insert(tableName, Arrays.asList("id", "name"), dataStream, ClickHouseFormat.CSV) + .get(10, TimeUnit.SECONDS); + + // Verify data was inserted with default status + List records = client.queryAll("SELECT * FROM " + tableName + " ORDER BY id"); + Assert.assertEquals(records.size(), 2); + 
Assert.assertEquals(records.get(0).getString("status"), "active"); + Assert.assertEquals(records.get(1).getString("status"), "active"); + + } finally { + // Cleanup + client.query("DROP TABLE IF EXISTS " + tableName).get(10, TimeUnit.SECONDS).close(); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/AsyncHttpManualValidation.java b/client-v2/src/test/java/com/clickhouse/client/AsyncHttpManualValidation.java new file mode 100644 index 000000000..8e86de966 --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/AsyncHttpManualValidation.java @@ -0,0 +1,368 @@ +package com.clickhouse.client; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.QueryResponse; +import com.clickhouse.client.api.query.QuerySettings; +import com.clickhouse.data.ClickHouseFormat; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Manual validation script for Async HTTP features. 
+ * + * Run with: + * mvn exec:java -pl client-v2 \ + * -Dexec.mainClass="com.clickhouse.client.AsyncHttpManualValidation" \ + * -Dexec.classpathScope=test \ + * -Dexec.args="http://localhost:8123 default password" + * + * Or for ClickHouse Cloud: + * mvn exec:java -pl client-v2 \ + * -Dexec.mainClass="com.clickhouse.client.AsyncHttpManualValidation" \ + * -Dexec.classpathScope=test \ + * -Dexec.args="https://your-host.clickhouse.cloud:8443 default your-password" + */ +public class AsyncHttpManualValidation { + + private static int passed = 0; + private static int failed = 0; + + public static void main(String[] args) { + String endpoint = args.length > 0 ? args[0] : "http://localhost:8123"; + String username = args.length > 1 ? args[1] : "default"; + String password = args.length > 2 ? args[2] : ""; + + System.out.println("============================================================"); + System.out.println("Async HTTP Manual Validation"); + System.out.println("============================================================"); + System.out.println("Endpoint: " + endpoint); + System.out.println("Username: " + username); + System.out.println(); + + // Test with SYNC client first (baseline) + System.out.println("--- SYNC CLIENT (Baseline) ---"); + try (Client syncClient = new Client.Builder() + .addEndpoint(endpoint) + .setUsername(username) + .setPassword(password) + .useAsyncHttp(false) + .build()) { + + testBasicQuery(syncClient, "SYNC"); + } catch (Exception e) { + System.out.println("SYNC baseline failed: " + e.getMessage()); + } + + System.out.println(); + System.out.println("--- ASYNC CLIENT ---"); + + // Test with ASYNC client + try (Client asyncClient = new Client.Builder() + .addEndpoint(endpoint) + .setUsername(username) + .setPassword(password) + .useAsyncHttp(true) + .build()) { + + // Phase 1: Basic async queries + testBasicQuery(asyncClient, "ASYNC"); + testQueryMetrics(asyncClient); + testConcurrentQueries(asyncClient); + + // Phase 2: Streaming responses 
+ testStreamingResponse(asyncClient); + testLargeResultStreaming(asyncClient); + + // Phase 3: Request compression + testQueryWithHttpCompression(asyncClient, endpoint, username, password); + testQueryWithNativeLZ4(asyncClient, endpoint, username, password); + + // Phase 4: Async inserts + testBasicInsert(asyncClient); + testLargeInsert(asyncClient); + testInsertWithCompression(asyncClient, endpoint, username, password); + + } catch (Exception e) { + e.printStackTrace(); + failed++; + } + + System.out.println(); + System.out.println("============================================================"); + System.out.println("RESULTS: " + passed + " passed, " + failed + " failed"); + System.out.println("============================================================"); + + System.exit(failed > 0 ? 1 : 0); + } + + private static void testBasicQuery(Client client, String mode) { + String testName = mode + " Basic Query"; + try { + List records = client.queryAll("SELECT 1 as num, 'hello' as greeting"); + if (records.size() == 1 && records.get(0).getLong("num") == 1) { + pass(testName); + } else { + fail(testName, "Unexpected result: " + records); + } + } catch (Exception e) { + fail(testName, e); + } + } + + private static void testQueryMetrics(Client client) { + String testName = "Query Metrics"; + try { + QueryResponse response = client.query("SELECT number FROM numbers(100)").get(30, TimeUnit.SECONDS); + if (response.getReadRows() > 0 && response.getQueryId() != null) { + pass(testName + " (rows=" + response.getReadRows() + ", queryId=" + response.getQueryId() + ")"); + } else { + fail(testName, "Missing metrics"); + } + response.close(); + } catch (Exception e) { + fail(testName, e); + } + } + + private static void testConcurrentQueries(Client client) { + String testName = "Concurrent Queries (10)"; + try { + @SuppressWarnings("unchecked") + CompletableFuture[] futures = new CompletableFuture[10]; + for (int i = 0; i < 10; i++) { + futures[i] = client.query("SELECT " + i + " 
as num, sleep(0.05)"); + } + CompletableFuture.allOf(futures).get(60, TimeUnit.SECONDS); + + int successCount = 0; + for (CompletableFuture f : futures) { + if (f.get().getReadRows() > 0) successCount++; + f.get().close(); + } + + if (successCount == 10) { + pass(testName); + } else { + fail(testName, "Only " + successCount + "/10 succeeded"); + } + } catch (Exception e) { + fail(testName, e); + } + } + + private static void testStreamingResponse(Client client) { + String testName = "Streaming Response"; + try { + // Use TabSeparated format so we can count lines with BufferedReader + QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.TabSeparated); + QueryResponse response = client.query("SELECT number FROM numbers(1000)", settings).get(30, TimeUnit.SECONDS); + BufferedReader reader = new BufferedReader(new InputStreamReader(response.getInputStream())); + int lineCount = 0; + while (reader.readLine() != null) lineCount++; + response.close(); + + if (lineCount == 1000) { + pass(testName + " (1000 rows streamed)"); + } else { + fail(testName, "Expected 1000 rows, got " + lineCount); + } + } catch (Exception e) { + fail(testName, e); + } + } + + private static void testLargeResultStreaming(Client client) { + String testName = "Large Result Streaming (100K rows)"; + try { + long startMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + // Use TabSeparated format so we can count lines with BufferedReader + QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.TabSeparated); + QueryResponse response = client.query("SELECT number, toString(number) FROM numbers(100000)", settings) + .get(60, TimeUnit.SECONDS); + BufferedReader reader = new BufferedReader(new InputStreamReader(response.getInputStream())); + int lineCount = 0; + while (reader.readLine() != null) lineCount++; + response.close(); + + long endMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + long memUsed = (endMem 
- startMem) / 1024 / 1024; + + if (lineCount == 100000) { + pass(testName + " (mem delta ~" + memUsed + "MB)"); + } else { + fail(testName, "Expected 100000 rows, got " + lineCount); + } + } catch (Exception e) { + fail(testName, e); + } + } + + private static void testQueryWithHttpCompression(Client client, String endpoint, String user, String pass) { + String testName = "Query with HTTP LZ4 Compression"; + try (Client compressClient = new Client.Builder() + .addEndpoint(endpoint) + .setUsername(user) + .setPassword(pass) + .useAsyncHttp(true) + .compressClientRequest(true) + .useHttpCompression(true) + .build()) { + + List records = compressClient.queryAll("SELECT number FROM numbers(100)"); + if (records.size() == 100) { + pass(testName); + } else { + fail(testName, "Expected 100 rows, got " + records.size()); + } + } catch (Exception e) { + fail(testName, e); + } + } + + private static void testQueryWithNativeLZ4(Client client, String endpoint, String user, String pass) { + String testName = "Query with Native LZ4 Compression"; + try (Client compressClient = new Client.Builder() + .addEndpoint(endpoint) + .setUsername(user) + .setPassword(pass) + .useAsyncHttp(true) + .compressClientRequest(true) + .useHttpCompression(false) + .build()) { + + List records = compressClient.queryAll("SELECT number FROM numbers(50)"); + if (records.size() == 50) { + pass(testName); + } else { + fail(testName, "Expected 50 rows, got " + records.size()); + } + } catch (Exception e) { + fail(testName, e); + } + } + + private static void testBasicInsert(Client client) { + String testName = "Basic Async Insert"; + String tableName = "async_test_basic_" + System.currentTimeMillis(); + try { + client.query("CREATE TABLE " + tableName + " (id UInt64, name String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + String csvData = "1,Alice\n2,Bob\n3,Charlie\n"; + ByteArrayInputStream stream = new ByteArrayInputStream(csvData.getBytes(StandardCharsets.UTF_8)); + InsertResponse 
response = client.insert(tableName, stream, ClickHouseFormat.CSV) + .get(10, TimeUnit.SECONDS); + + List records = client.queryAll("SELECT count() FROM " + tableName); + long count = records.get(0).getLong(1); + + if (count == 3) { + pass(testName + " (3 rows inserted)"); + } else { + fail(testName, "Expected 3 rows, got " + count); + } + } catch (Exception e) { + fail(testName, e); + } finally { + try { client.query("DROP TABLE IF EXISTS " + tableName).get(5, TimeUnit.SECONDS).close(); } catch (Exception ignored) {} + } + } + + private static void testLargeInsert(Client client) { + String testName = "Large Async Insert (10K rows)"; + String tableName = "async_test_large_" + System.currentTimeMillis(); + try { + client.query("CREATE TABLE " + tableName + " (id UInt64, data String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + StringBuilder csv = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + csv.append(i).append(",data_row_").append(i).append("\n"); + } + ByteArrayInputStream stream = new ByteArrayInputStream(csv.toString().getBytes(StandardCharsets.UTF_8)); + + long start = System.currentTimeMillis(); + client.insert(tableName, stream, ClickHouseFormat.CSV).get(60, TimeUnit.SECONDS); + long elapsed = System.currentTimeMillis() - start; + + List records = client.queryAll("SELECT count() FROM " + tableName); + long count = records.get(0).getLong(1); + + if (count == 10000) { + pass(testName + " (" + elapsed + "ms)"); + } else { + fail(testName, "Expected 10000 rows, got " + count); + } + } catch (Exception e) { + fail(testName, e); + } finally { + try { client.query("DROP TABLE IF EXISTS " + tableName).get(5, TimeUnit.SECONDS).close(); } catch (Exception ignored) {} + } + } + + private static void testInsertWithCompression(Client client, String endpoint, String user, String pass) { + String testName = "Insert with Compression"; + String tableName = "async_test_compress_" + System.currentTimeMillis(); + Client compressClient = null; + try { 
+ compressClient = new Client.Builder() + .addEndpoint(endpoint) + .setUsername(user) + .setPassword(pass) + .useAsyncHttp(true) + .compressClientRequest(true) + .useHttpCompression(true) + .build(); + + compressClient.query("CREATE TABLE " + tableName + " (id UInt64, value String) ENGINE = Memory") + .get(10, TimeUnit.SECONDS).close(); + + StringBuilder csv = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + csv.append(i).append(",value_").append(i).append("\n"); + } + ByteArrayInputStream stream = new ByteArrayInputStream(csv.toString().getBytes(StandardCharsets.UTF_8)); + compressClient.insert(tableName, stream, ClickHouseFormat.CSV).get(30, TimeUnit.SECONDS); + + List records = compressClient.queryAll("SELECT count() FROM " + tableName); + long count = records.get(0).getLong(1); + + if (count == 1000) { + pass(testName + " (1000 rows)"); + } else { + fail(testName, "Expected 1000 rows, got " + count); + } + } catch (Exception e) { + fail(testName, e); + } finally { + if (compressClient != null) { + try { compressClient.query("DROP TABLE IF EXISTS " + tableName).get(5, TimeUnit.SECONDS).close(); } catch (Exception ignored) {} + try { compressClient.close(); } catch (Exception ignored) {} + } + } + } + + private static void pass(String testName) { + System.out.println("[PASS] " + testName); + passed++; + } + + private static void fail(String testName, String reason) { + System.out.println("[FAIL] " + testName + " - " + reason); + failed++; + } + + private static void fail(String testName, Exception e) { + System.out.println("[FAIL] " + testName + " - " + e.getClass().getSimpleName() + ": " + e.getMessage()); + failed++; + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index 52815237b..8a2e5e010 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -260,7 +260,7 @@ 
public void testDefaultSettings() { Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match"); } } - Assert.assertEquals(config.size(), 34); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 35); // to check everything is set. Increment when new added. } try (Client client = new Client.Builder() @@ -360,7 +360,7 @@ public void testWithOldDefaults() { Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match"); } } - Assert.assertEquals(config.size(), 34); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 35); // to check everything is set. Increment when new added. } } diff --git a/client-v2/src/test/resources/logback-test.xml b/client-v2/src/test/resources/logback-test.xml new file mode 100644 index 000000000..9c722f002 --- /dev/null +++ b/client-v2/src/test/resources/logback-test.xml @@ -0,0 +1,16 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + +