From 7b2d5c1d90da11a5c0f4085dd89029a32449f1c3 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Wed, 24 Jun 2026 15:40:33 -0700 Subject: [PATCH 1/8] separated request creation and execution --- .../com/clickhouse/client/api/Client.java | 134 ++++++++-------- .../client/api/insert/InsertResponse.java | 10 +- .../client/api/internal/ClientUtils.java | 15 ++ .../api/internal/HttpAPIClientHelper.java | 146 ++++++++++++++---- .../client/api/query/QueryResponse.java | 44 +++--- .../transport/internal/TransportRequest.java | 36 +++++ .../transport/internal/TransportResponse.java | 60 +++++++ 7 files changed, 312 insertions(+), 133 deletions(-) create mode 100644 client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java create mode 100644 client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index da157cb9d..742d757a1 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -16,6 +16,7 @@ import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.insert.InsertSettings; import com.clickhouse.client.api.internal.ClientStatisticsHolder; +import com.clickhouse.client.api.internal.ClientUtils; import com.clickhouse.client.api.internal.CredentialsManager; import com.clickhouse.client.api.internal.HttpAPIClientHelper; import com.clickhouse.client.api.internal.MapUtils; @@ -36,6 +37,8 @@ import com.clickhouse.client.api.serde.POJOSerDe; import com.clickhouse.client.api.transport.Endpoint; import com.clickhouse.client.api.transport.HttpEndpoint; +import com.clickhouse.client.api.transport.internal.TransportRequest; +import com.clickhouse.client.api.transport.internal.TransportResponse; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.ClickHouseDataType; @@ -43,8 +46,6 @@ import com.google.common.collect.ImmutableList; import net.jpountz.lz4.LZ4Factory; import org.apache.hc.core5.concurrent.DefaultThreadFactory; -import org.apache.hc.core5.http.ClassicHttpResponse; -import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1319,43 +1320,37 @@ public CompletableFuture insert(String tableName, List data, RuntimeException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request - try (ClassicHttpResponse httpResponse = - httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), - out -> { - out.write("INSERT INTO ".getBytes()); - out.write(tableName.getBytes()); - out.write(" \n FORMAT ".getBytes()); - out.write(format.name().getBytes()); - out.write(" \n".getBytes()); - for (Object obj : data) { - - for (POJOFieldSerializer serializer : serializersForTable) { - try { - serializer.serialize(obj, out); - } catch (InvocationTargetException | IllegalAccessException | IOException e) { - throw new DataSerializationException(obj, serializer, e); - } - } + TransportRequest transportRequest = httpClientHelper.createRequest(selectedEndpoint, requestSettings.getAllSettings(), + out -> { + out.write("INSERT INTO ".getBytes()); + out.write(tableName.getBytes()); + out.write(" \n FORMAT ".getBytes()); + out.write(format.name().getBytes()); + out.write(" \n".getBytes()); + for (Object obj : data) { + + for (POJOFieldSerializer serializer : serializersForTable) { + try { + serializer.serialize(obj, out); + } catch (InvocationTargetException | IllegalAccessException | IOException e) { + throw new DataSerializationException(obj, serializer, e); } - out.close(); - })) { - - + } + } + out.close(); + }); + try (TransportResponse httpResponse = httpClientHelper.executeRequest(transportRequest)) { // Check response - if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { - LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime)); + if (httpResponse.getStatusCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getStatusCode(), durationSince(startTime)); selectedEndpoint = getNextAliveNode(); continue; } ClientStatisticsHolder clientStats = globalClientStats.remove(operationId); - OperationMetrics metrics = new OperationMetrics(clientStats); - String summary = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}"); - ProcessParser.parseSummary(summary, metrics); - String queryId = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), requestSettings.getQueryId(), String::valueOf); - metrics.operationComplete(); - metrics.setQueryId(queryId); - return new InsertResponse(metrics, HttpAPIClientHelper.collectResponseHeaders(httpResponse)); + OperationMetrics metrics = completeOperation(httpResponse, clientStats, requestSettings.getQueryId()); + + return new InsertResponse(httpResponse, metrics); } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); @@ -1373,7 +1368,6 @@ public CompletableFuture insert(String tableName, List data, throw (lastException == null ? new ClientException(errMsg) : lastException); }; return runAsyncOperation(supplier, requestSettings.getAllSettings()); - } /** @@ -1509,7 +1503,7 @@ public CompletableFuture insert(String tableName, clientStats.start(ClientMetrics.OP_DURATION); final ClientStatisticsHolder finalClientStats = clientStats; - Supplier responseSupplier; + final int writeBufferSize = requestSettings.getInputStreamCopyBufferSize() <= 0 ? (int) configuration.get(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey()) : @@ -1533,7 +1527,8 @@ public CompletableFuture insert(String tableName, if (requestSettings.getQueryId() == null && queryIdGenerator != null) { requestSettings.setQueryId(queryIdGenerator.get()); } - responseSupplier = () -> { + + Supplier responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node Endpoint selectedEndpoint = getNextAliveNode(); @@ -1541,28 +1536,23 @@ public CompletableFuture insert(String tableName, RuntimeException lastException = null; for (int i = 0; i <= retries; i++) { // Execute request - try (ClassicHttpResponse httpResponse = - httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), - out -> { - writer.onOutput(out); - out.close(); - })) { + TransportRequest transportRequest = httpClientHelper.createRequest(selectedEndpoint, requestSettings.getAllSettings(), + out -> { + writer.onOutput(out); + out.close(); + }); + try (TransportResponse httpResponse = httpClientHelper.executeRequest(transportRequest)) { // Check response - if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { - LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime)); + if (httpResponse.getStatusCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getStatusCode(), durationSince(startTime)); selectedEndpoint = getNextAliveNode(); continue; } - OperationMetrics metrics = new OperationMetrics(finalClientStats); - String summary = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}"); - ProcessParser.parseSummary(summary, metrics); - String queryId = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), requestSettings.getQueryId(), String::valueOf); - metrics.operationComplete(); - metrics.setQueryId(queryId); - return new InsertResponse(metrics, HttpAPIClientHelper.collectResponseHeaders(httpResponse)); + OperationMetrics metrics = completeOperation(httpResponse, finalClientStats, requestSettings.getQueryId()); + return new InsertResponse(httpResponse, metrics); } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); @@ -1668,38 +1658,28 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, MapQueries data in one of descriptive format and creates a reader out of the response stream.

*

Format is selected internally so is ignored when passed in settings. If query contains format diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java index 2fc09f312..c83161725 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java @@ -3,21 +3,17 @@ import com.clickhouse.client.api.http.ClickHouseHttpProto; import com.clickhouse.client.api.metrics.OperationMetrics; import com.clickhouse.client.api.metrics.ServerMetrics; +import com.clickhouse.client.api.transport.internal.TransportResponse; -import java.util.Collections; import java.util.Map; public class InsertResponse implements AutoCloseable { private OperationMetrics operationMetrics; private final Map responseHeaders; - public InsertResponse(OperationMetrics metrics) { - this(metrics, Collections.emptyMap()); - } - - public InsertResponse(OperationMetrics metrics, Map responseHeaders) { + public InsertResponse(TransportResponse transportResponse, OperationMetrics metrics) { this.operationMetrics = metrics; - this.responseHeaders = responseHeaders; + this.responseHeaders = transportResponse.getHeaders(); } @Override diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java index 235d55cad..085c43181 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java @@ -1,5 +1,10 @@ package com.clickhouse.client.api.internal; +import org.slf4j.Logger; + +import java.io.Closeable; +import java.io.IOException; + /** * Class containing utility methods used across the client. */ @@ -14,4 +19,14 @@ public static boolean isNotBlank(String str) { public static boolean isBlank(String str) { return str == null || str.trim().isEmpty(); } + + public static void quiteClose(Closeable closeable, Logger log) { + if (closeable != null) { + try { + closeable.close(); + } catch (Exception e) { + log.warn("Failed to close object " + closeable, e); + } + } + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 37fe4ab8c..89c31ac09 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -13,6 +13,8 @@ import com.clickhouse.client.api.enums.ProxyType; import com.clickhouse.client.api.http.ClickHouseHttpProto; import com.clickhouse.client.api.transport.Endpoint; +import com.clickhouse.client.api.transport.internal.TransportRequest; +import com.clickhouse.client.api.transport.internal.TransportResponse; import com.clickhouse.client.config.ClickHouseDefaultSslContextProvider; import com.clickhouse.data.ClickHouseFormat; import net.jpountz.lz4.LZ4Factory; @@ -44,6 +46,7 @@ import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.NoHttpResponseException; +import org.apache.hc.core5.http.ProtocolException; import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.config.Http1Config; @@ -513,9 +516,37 @@ private HttpPost createPostRequest(URI uri, Map requestConfig) { return req; } - public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig, - String body) throws Exception { + private static final class TransportRequestImpl implements TransportRequest { + private final HttpPost delegate; + private final Map config; + TransportRequestImpl(HttpPost delegate, Map config) { + this.delegate = delegate; + this.config = config; + } + + @Override + public boolean cancel() throws Exception { + if (delegate.isCancelled()) { + return true; + } + return delegate.cancel(); + } + + @Override + public Map getConfig() { + return config; + } + + @Override + @SuppressWarnings("unchecked") + public T getDelegate() { + return (T) delegate; + } + } + + public TransportRequest createRequest(Endpoint server, Map requestConfig, + String body) { boolean useMultipart = ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getOrDefault(requestConfig) && requestConfig.containsKey(HttpAPIClientHelper.KEY_STATEMENT_PARAMS); @@ -538,27 +569,93 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r req.setEntity(wrapRequestEntity(httpEntity, requestConfig)); } else { - final String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; - - HttpEntity httpEntity = new ByteArrayEntity(body.getBytes(StandardCharsets.UTF_8.name()), CONTENT_TYPE, contentEncoding); + final HttpEntity httpEntity; + try { + final String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; + httpEntity = new ByteArrayEntity(body.getBytes(StandardCharsets.UTF_8.name()), CONTENT_TYPE, contentEncoding); + } catch (UnsupportedEncodingException | ProtocolException e) { + throw new ClientException("failed to create request body entity", e); + } req.setEntity(wrapRequestEntity(httpEntity, requestConfig)); } - // execute - return doPostRequest(requestConfig, req); + return new TransportRequestImpl(req, requestConfig); } - public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig, - IOCallback writeCallback) throws Exception { + private static final class TransportResponseImpl implements TransportResponse { + + private final ClassicHttpResponse delegate; + + TransportResponseImpl(ClassicHttpResponse delegate) { + this.delegate = delegate; + } + + @Override + public int getStatusCode() { + return 0; + } + + @Override + public ClickHouseFormat getDataFormat() { + Header formatHeader = delegate.getFirstHeader(ClickHouseHttpProto.HEADER_FORMAT); + return formatHeader == null ? null : ClickHouseFormat.valueOf(formatHeader.getValue()); + } + + @Override + public String getSummaryJson() { + return HttpAPIClientHelper.getHeaderVal(delegate + .getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}"); + } + + @Override + public String getQueryId() { + return HttpAPIClientHelper.getHeaderVal(delegate + .getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), null); + } + + @Override + public T getDelegate() { + return (T) delegate; + } + + @Override + public Map getHeaders() { + return HttpAPIClientHelper.collectResponseHeaders(delegate); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public InputStream createDataInputStream() { + try { + return delegate.getEntity().getContent(); + } catch (Exception e) { + throw new ClientException("Failed to construct input stream", e); + } + } + } + + public TransportResponse executeRequest(TransportRequest transportRequest) throws Exception { + return new TransportResponseImpl(doPostRequest(transportRequest.getConfig(), transportRequest.getDelegate())); + } + + public TransportRequest createRequest(Endpoint server, Map requestConfig, IOCallback writeCallback) { final URI uri = createRequestURI(server, requestConfig, true); final HttpPost req = createPostRequest(uri, requestConfig); - String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; - req.setEntity(wrapRequestEntity( - new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), - requestConfig)); + try { + String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; + req.setEntity(wrapRequestEntity( + new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback), + requestConfig)); + } catch (ProtocolException e) { + throw new ClientException("failed to create request body entity", e); + } - return doPostRequest(requestConfig, req); + return new TransportRequestImpl(req, requestConfig); } private ClassicHttpResponse doPostRequest(Map requestConfig, HttpPost req) throws Exception { @@ -589,30 +686,20 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt return httpResponse; } catch (UnknownHostException e) { - closeQuietly(httpResponse); + ClientUtils.quiteClose(httpResponse, LOG); LOG.warn("Host '{}' unknown", req.getAuthority()); throw e; } catch (ConnectException | NoRouteToHostException e) { - closeQuietly(httpResponse); + ClientUtils.quiteClose(httpResponse, LOG); LOG.warn("Failed to connect to '{}': {}", req.getAuthority(), e.getMessage()); throw e; } catch (Exception e) { - closeQuietly(httpResponse); + ClientUtils.quiteClose(httpResponse, LOG); LOG.debug("Failed to execute request to '{}': {}", req.getAuthority(), e.getMessage(), e); throw e; } } - public static void closeQuietly(ClassicHttpResponse httpResponse) { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - LOG.warn("Failed to close response"); - } - } - } - private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8"); private void addHeaders(HttpPost req, Map requestConfig) { @@ -804,7 +891,10 @@ public static int getHeaderInt(Header header, int defaultValue) { ClickHouseHttpProto.HEADER_SRV_SUMMARY, ClickHouseHttpProto.HEADER_SRV_DISPLAY_NAME, ClickHouseHttpProto.HEADER_DATABASE, - ClickHouseHttpProto.HEADER_DB_USER + ClickHouseHttpProto.HEADER_DB_USER, + ClickHouseHttpProto.HEADER_TIMEZONE, + ClickHouseHttpProto.HEADER_FORMAT, + ClickHouseHttpProto.HEADER_PROGRESS )); /** diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java index 6e237a18d..32c1d4dab 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java @@ -3,14 +3,14 @@ import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.ClientException; import com.clickhouse.client.api.http.ClickHouseHttpProto; +import com.clickhouse.client.api.internal.ClientUtils; import com.clickhouse.client.api.metrics.OperationMetrics; import com.clickhouse.client.api.metrics.ServerMetrics; +import com.clickhouse.client.api.transport.internal.TransportResponse; import com.clickhouse.data.ClickHouseFormat; -import org.apache.hc.core5.http.ClassicHttpResponse; -import org.apache.hc.core5.http.Header; import java.io.InputStream; -import java.util.Collections; +import java.time.ZoneId; import java.util.Map; import java.util.TimeZone; @@ -31,51 +31,43 @@ public class QueryResponse implements AutoCloseable { private final ClickHouseFormat format; - private QuerySettings settings; + private final QuerySettings settings; - private OperationMetrics operationMetrics; + private final OperationMetrics operationMetrics; - private ClassicHttpResponse httpResponse; + private final TransportResponse transportResponse; private final Map responseHeaders; - public QueryResponse(ClassicHttpResponse response, ClickHouseFormat format, QuerySettings settings, - OperationMetrics operationMetrics) { - this(response, format, settings, operationMetrics, Collections.emptyMap()); - } - - public QueryResponse(ClassicHttpResponse response, ClickHouseFormat format, QuerySettings settings, - OperationMetrics operationMetrics, Map responseHeaders) { - this.httpResponse = response; + public QueryResponse(TransportResponse response, ClickHouseFormat format, QuerySettings settings, OperationMetrics operationMetrics) { + this.transportResponse = response; this.format = format; this.operationMetrics = operationMetrics; this.settings = settings; - this.responseHeaders = responseHeaders; + this.responseHeaders = response.getHeaders(); - Header tzHeader = response.getFirstHeader(ClickHouseHttpProto.HEADER_TIMEZONE); - if (tzHeader != null) { + String timeZoneHeader = responseHeaders.get(ClickHouseHttpProto.HEADER_TIMEZONE); + if (timeZoneHeader != null) { + TimeZone serverTz; try { - this.settings.setOption(ClientConfigProperties.SERVER_TIMEZONE.getKey(), - TimeZone.getTimeZone(tzHeader.getValue())); + serverTz = TimeZone.getTimeZone(timeZoneHeader); } catch (Exception e) { throw new ClientException("Failed to parse server timezone", e); } + this.settings.setOption(ClientConfigProperties.SERVER_TIMEZONE.getKey(), + serverTz); } } public InputStream getInputStream() { - try { - return httpResponse.getEntity().getContent(); - } catch (Exception e) { - throw new ClientException("Failed to construct input stream", e); - } + return transportResponse.createDataInputStream(); } @Override public void close() throws Exception { - if (httpResponse != null ) { + if (transportResponse != null ) { try { - httpResponse.close(); + transportResponse.close(); } catch (Exception e) { throw new ClientException("Failed to close response", e); } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java new file mode 100644 index 000000000..6a1a66401 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java @@ -0,0 +1,36 @@ +package com.clickhouse.client.api.transport.internal; + +import java.util.Map; + +public interface TransportRequest { + + + /** + * Gives access to transport delegate. Used strictly only by transport. + * @return internal transport request object + * @param - Type of delegate + */ + T getDelegate(); + + /** + * Returns reference to request configuration. Implementation should + * store only copy because configuration map is created for each request separately + * @return request configuration map + */ + Map getConfig(); + + /** + * Cancels request associated with the object. Implementation should + * treat this method like close() and release all resource. + * When request is canceled it cannot be reused. All reusable objects + * should be saved elsewhere. + * In many cases cancellation of IO operation is problematic and result + * of this method should not be used in core logic. + * Operation is idempotent and can be called on canceled request multiple + * times. + * + * @return result of operation. True if request was canceled for sure. False when result cannot be known. + * @throws Exception - when something extraordinary happens while canceling the request. + */ + boolean cancel() throws Exception; +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java new file mode 100644 index 000000000..32fe89465 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java @@ -0,0 +1,60 @@ +package com.clickhouse.client.api.transport.internal; + +import com.clickhouse.data.ClickHouseFormat; + +import java.io.Closeable; +import java.io.InputStream; +import java.util.Map; + +public interface TransportResponse extends Closeable { + + /** + * Transport status code translated to one of values: + *

    + *
  • 503 - for service unavailable
  • + *
  • 500 - server error
  • + *
  • 400 - user error
  • + *
  • 404 - endpoint not found
  • + *
  • 403 - access not granted
  • + *
  • 401 - no authentication information
  • + *
  • 200 - ok
  • + *
+ * @return integer value of status code + */ + int getStatusCode(); + + + /** + * Data format returned by server or calculated other way + * @return data format + */ + ClickHouseFormat getDataFormat(); + + String getSummaryJson(); + + String getQueryId(); + + /** + * Gives access to transport delegate. Used strictly only by transport. + * @return internal transport response object + * @param - Type of delegate + */ + T getDelegate(); + + + /** + * Server headers. + * @return response headers + */ + Map getHeaders(); + + + /** + * Creates a new stream to read data. It is applicable only for + * blocking transports. In real life this should be called once. + * It is important to mention that each time new input stream is created. + * + * @return new data stream + */ + InputStream createDataInputStream(); +} From fd444b06449506a456ca5a1dfe993d1091846722 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Thu, 25 Jun 2026 22:00:56 -0700 Subject: [PATCH 2/8] Added more tests and removed dead code halding 503 in client --- .../com/clickhouse/client/api/Client.java | 39 +-- .../client/api/query/QueryResponse.java | 14 +- .../client/api/internal/ClientUtilsTest.java | 48 +++ .../api/transport/TransportBaseTests.java | 326 ++++++++++++++++++ .../clickhouse/client/insert/InsertTests.java | 31 ++ .../clickhouse/client/query/QueryTests.java | 49 +++ 6 files changed, 468 insertions(+), 39 deletions(-) create mode 100644 client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java create mode 100644 client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index ba5a95881..6f0a56be7 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -47,7 +47,6 @@ import com.google.common.collect.ImmutableList; import net.jpountz.lz4.LZ4Factory; import org.apache.hc.core5.concurrent.DefaultThreadFactory; -import org.apache.hc.core5.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1398,18 +1397,11 @@ public CompletableFuture insert(String tableName, List data, } out.close(); }); - try (TransportResponse httpResponse = httpClientHelper.executeRequest(transportRequest)) { - // Check response - if (httpResponse.getStatusCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { - LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getStatusCode(), durationSince(startTime)); - selectedEndpoint = getNextAliveNode(); - continue; - } - + try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) { ClientStatisticsHolder clientStats = globalClientStats.remove(operationId); - OperationMetrics metrics = completeOperation(httpResponse, clientStats, requestSettings.getQueryId()); + OperationMetrics metrics = completeOperation(transportResponse, clientStats, requestSettings.getQueryId()); - return new InsertResponse(httpResponse, metrics); + return new InsertResponse(transportResponse, metrics); } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); @@ -1424,7 +1416,8 @@ public CompletableFuture insert(String tableName, List data, String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId()); LOG.warn(errMsg); - throw (lastException == null ? new ClientException(errMsg) : lastException); }; + throw (lastException == null ? new ClientException(errMsg) : lastException); + }; return runAsyncOperation(supplier, requestSettings.getAllSettings()); } @@ -1601,17 +1594,9 @@ public CompletableFuture insert(String tableName, out.close(); }); - try (TransportResponse httpResponse = httpClientHelper.executeRequest(transportRequest)) { - - // Check response - if (httpResponse.getStatusCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { - LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getStatusCode(), durationSince(startTime)); - selectedEndpoint = getNextAliveNode(); - continue; - } - - OperationMetrics metrics = completeOperation(httpResponse, finalClientStats, requestSettings.getQueryId()); - return new InsertResponse(httpResponse, metrics); + try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) { + OperationMetrics metrics = completeOperation(transportResponse, finalClientStats, requestSettings.getQueryId()); + return new InsertResponse(transportResponse, metrics); } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); @@ -1722,14 +1707,6 @@ public CompletableFuture query(String sqlQuery, Map responseHeaders; public QueryResponse(TransportResponse response, ClickHouseFormat format, QuerySettings settings, OperationMetrics operationMetrics) { + Objects.requireNonNull(response, "response is null"); this.transportResponse = response; this.format = format; this.operationMetrics = operationMetrics; @@ -65,12 +65,10 @@ public InputStream getInputStream() { @Override public void close() throws Exception { - if (transportResponse != null ) { - try { - transportResponse.close(); - } catch (Exception e) { - throw new ClientException("Failed to close response", e); - } + try { + transportResponse.close(); + } catch (Exception e) { + throw new ClientException("Failed to close response", e); } } diff --git a/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java b/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java new file mode 100644 index 000000000..9cf10b25a --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java @@ -0,0 +1,48 @@ +package com.clickhouse.client.api.internal; + +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.Closeable; +import java.io.IOException; + +public class ClientUtilsTest { + + @Test(groups = {"unit"}) + public void testQuiteCloseSwallowsExceptionAndLogs() throws IOException { + Logger log = Mockito.mock(Logger.class); + IOException failure = new IOException("close failed"); + Closeable closeable = Mockito.mock(Closeable.class); + Mockito.doThrow(failure).when(closeable).close(); + + // Should not propagate the exception thrown by close() + ClientUtils.quiteClose(closeable, log); + + Mockito.verify(closeable).close(); + Mockito.verify(log).warn(Mockito.contains("Failed to close object"), Mockito.eq(failure)); + } + + @Test(groups = {"unit"}) + public void testQuiteCloseClosesSuccessfully() throws IOException { + Logger log = Mockito.mock(Logger.class); + Closeable closeable = Mockito.mock(Closeable.class); + + ClientUtils.quiteClose(closeable, log); + + Mockito.verify(closeable).close(); + Mockito.verifyNoInteractions(log); + } + + @Test(groups = {"unit"}) + public void testQuiteCloseWithNull() { + Logger log = Mockito.mock(Logger.class); + + // Should be a no-op and not throw on a null closeable + ClientUtils.quiteClose(null, log); + + Mockito.verifyNoInteractions(log); + Assert.assertTrue(true); + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java new file mode 100644 index 000000000..113304e6c --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java @@ -0,0 +1,326 @@ +package com.clickhouse.client.api.transport; + + +import com.clickhouse.client.BaseIntegrationTest; +import com.clickhouse.client.ClickHouseServerForTest; +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientFaultCause; +import com.clickhouse.client.api.ServerException; +import com.clickhouse.client.api.enums.Protocol; +import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.metadata.TableSchema; +import com.clickhouse.client.api.query.QueryResponse; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.common.ConsoleNotifier; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.clickhouse.data.ClickHouseColumn; +import com.clickhouse.data.ClickHouseFormat; +import org.apache.hc.core5.http.HttpStatus; +import org.testcontainers.utility.ThrowingFunction; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; + +@Test(groups = {"integration"}) +public class TransportBaseTests extends BaseIntegrationTest { + + private static final String RETRYABLE_BODY = + "Code: 202. DB::Exception: Too many simultaneous queries. (TOO_MANY_SIMULTANEOUS_QUERIES)"; + private static final int RETRYABLE_CODE = 202; + + private static final String NON_RETRYABLE_BODY = + "Code: 62. DB::Exception: Syntax error. (SYNTAX_ERROR)"; + private static final int NON_RETRYABLE_CODE = 62; + + private WireMockServer startMockServer() { + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + return mockServer; + } + + private Client mockServerClient(WireMockServer mockServer, int maxRetries, ClientFaultCause... retryOn) { + Client.Builder builder = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressClientRequest(false) + .compressServerResponse(false) + .setMaxRetries(maxRetries); + if (retryOn.length > 0) { + builder.retryOnFailures(retryOn); + } + return builder.build(); + } + + /** + * A retryable server error on the first attempt should be retried and the following successful + * response should be returned. Covers the retry branch of the operation loop + * ({@code shouldRetry(...)} == true) and recovery on the next node. + */ + @Test(groups = {"integration"}, dataProvider = "operationProvider") + public void testRetriesAndSucceedsAfterRetryableServerError(String operation, + ThrowingFunction function) { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = startMockServer(); + + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("Retry") + .whenScenarioStateIs(STARTED) + .willSetStateTo("Recovered") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE) + .withHeader("X-ClickHouse-Exception-Code", String.valueOf(RETRYABLE_CODE)) + .withBody(RETRYABLE_BODY)).build()); + + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("Retry") + .whenScenarioStateIs("Recovered") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); + + try (Client client = mockServerClient(mockServer, 1)) { + function.apply(client); + } catch (Exception e) { + Assert.fail("[" + operation + "] should have recovered after a retry", e); + } finally { + mockServer.stop(); + } + + Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), 2, + "[" + operation + "] expected one failed attempt followed by a successful retry"); + } + + /** + * When the server keeps returning a retryable error the operation should be retried + * {@code maxRetries} times and then fail by re-throwing the last captured exception. + * Covers the final throw of the operation loop ({@code throw lastException}). + */ + @Test(groups = {"integration"}, dataProvider = "operationProvider") + public void testThrowsAfterExhaustingRetries(String operation, + ThrowingFunction function) { + if (isCloud()) { + return; // mocked server + } + + int maxRetries = 2; + WireMockServer mockServer = startMockServer(); + + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE) + .withHeader("X-ClickHouse-Exception-Code", String.valueOf(RETRYABLE_CODE)) + .withBody(RETRYABLE_BODY)).build()); + + try (Client client = mockServerClient(mockServer, maxRetries)) { + function.apply(client); + Assert.fail("[" + operation + "] expected exception after exhausting retries"); + } catch (ServerException e) { + Assert.assertEquals(e.getCode(), RETRYABLE_CODE); + } catch (Exception e) { + Assert.fail("[" + operation + "] unexpected exception type", e); + } finally { + mockServer.stop(); + } + + Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), maxRetries + 1, + "[" + operation + "] expected initial attempt plus " + maxRetries + " retries"); + } + + /** + * A non-retryable server error should be re-thrown immediately without consuming any of the + * configured retries. Covers the {@code else { throw lastException; }} branch. + */ + @Test(groups = {"integration"}, dataProvider = "operationProvider") + public void testThrowsImmediatelyWhenNotRetryable(String operation, + ThrowingFunction function) { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = startMockServer(); + + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_BAD_REQUEST) + .withHeader("X-ClickHouse-Exception-Code", String.valueOf(NON_RETRYABLE_CODE)) + .withBody(NON_RETRYABLE_BODY)).build()); + + try (Client client = mockServerClient(mockServer, 3)) { + function.apply(client); + Assert.fail("[" + operation + "] expected exception for non-retryable error"); + } catch (ServerException e) { + Assert.assertEquals(e.getCode(), NON_RETRYABLE_CODE); + } catch (Exception e) { + Assert.fail("[" + operation + "] unexpected exception type", e); + } finally { + mockServer.stop(); + } + + Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), 1, + "[" + operation + "] a non-retryable error must not be retried"); + } + + /** + * When retries are disabled ({@link ClientFaultCause#None}) even an otherwise retryable error + * must be re-thrown on the first attempt. Covers the {@code else { throw lastException; }} + * branch when {@code shouldRetry(...)} returns {@code false} because of the configuration. + */ + @Test(groups = {"integration"}, dataProvider = "operationProvider") + public void testRetriesDisabledThrowsImmediately(String operation, + ThrowingFunction function) { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = startMockServer(); + + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE) + .withHeader("X-ClickHouse-Exception-Code", String.valueOf(RETRYABLE_CODE)) + .withBody(RETRYABLE_BODY)).build()); + + try (Client client = mockServerClient(mockServer, 3, ClientFaultCause.None)) { + function.apply(client); + Assert.fail("[" + operation + "] expected exception when retries are disabled"); + } catch (ServerException e) { + Assert.assertEquals(e.getCode(), RETRYABLE_CODE); + } catch (Exception e) { + Assert.fail("[" + operation + "] unexpected exception type", e); + } finally { + mockServer.stop(); + } + + Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), 1, + "[" + operation + "] retries are disabled, no retry expected"); + } + + /** + * When no query id is supplied by the caller but a generator is configured, the operation must + * use the generated id. Covers the + * {@code if (requestSettings.getQueryId() == null && queryIdGenerator != null)} block that is + * present in every operation path. + */ + @Test(groups = {"integration"}, dataProvider = "operationProvider") + public void testGeneratedQueryIdIsUsedWhenNotSet(String operation, + ThrowingFunction function) { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = startMockServer(); + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); + + String generatedId = "generated-" + UUID.randomUUID(); + AtomicInteger generatorCalls = new AtomicInteger(); + Supplier idGenerator = () -> { + generatorCalls.incrementAndGet(); + return generatedId; + }; + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressClientRequest(false) + .compressServerResponse(false) + .setQueryIdGenerator(idGenerator) + .build()) { + function.apply(client); + } catch (Exception e) { + mockServer.stop(); + Assert.fail("[" + operation + "] operation should succeed", e); + return; + } + + try { + Assert.assertTrue(generatorCalls.get() >= 1, + "[" + operation + "] query id generator should have been invoked"); + mockServer.verify(WireMock.postRequestedFor(WireMock.anyUrl()) + .withQueryParam("query_id", WireMock.equalTo(generatedId))); + } finally { + mockServer.stop(); + } + } + + /** + * Provides one {@link ThrowingFunction} per retry-loop code path in {@link Client}, so every + * failure-handling test exercises all of them: + *
    + *
  • {@code query} - {@link Client#query(String)}
  • + *
  • {@code insert-stream} - {@link Client#insert(String, java.io.InputStream, ClickHouseFormat)}
  • + *
  • {@code insert-pojo} - {@link Client#insert(String, java.util.List)}
  • + *
+ */ + @DataProvider(name = "operationProvider") + public static Object[][] operationProvider() { + ThrowingFunction queryFunction = (client) -> { + try (QueryResponse response = client.query("SELECT timezone()").get(30, TimeUnit.SECONDS)) { + return null; + } + }; + + ThrowingFunction streamInsertFunction = (client) -> { + try (InsertResponse response = client.insert("table01", + new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV) + .get(30, TimeUnit.SECONDS)) { + return null; + } + }; + + ThrowingFunction pojoInsertFunction = (client) -> { + client.register(InsertablePojo.class, new TableSchema("table01", null, "default", + Collections.singletonList(ClickHouseColumn.of("id", "Int32")))); + try (InsertResponse response = client.insert("table01", + Collections.singletonList(new InsertablePojo(1))) + .get(30, TimeUnit.SECONDS)) { + return null; + } + }; + + return new Object[][]{ + {"query", queryFunction}, + {"insert-stream", streamInsertFunction}, + {"insert-pojo", pojoInsertFunction} + }; + } + + public static class InsertablePojo { + private int id; + + public InsertablePojo() { + } + + public InsertablePojo(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index 6ffdfea5e..cf9eb057b 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -13,6 +13,7 @@ import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.insert.InsertSettings; +import com.clickhouse.client.api.internal.ClientUtils; import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.metadata.TableSchema; import com.clickhouse.client.api.metrics.ClientMetrics; @@ -140,6 +141,36 @@ public void insertSimplePOJOs() throws Exception { assertEquals(response.getQueryId(), uuid); } + @Test(groups = { "integration" }, enabled = true) + public void testInsertResponseStats() throws Exception { + String tableName = "insert_response_stats_table"; + String createSQL = SamplePOJO.generateTableCreateSQL(tableName); + String uuid = UUID.randomUUID().toString(); + + initTable(tableName, createSQL); + + client.register(SamplePOJO.class, client.getTableSchema(tableName)); + List simplePOJOs = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + simplePOJOs.add(new SamplePOJO()); + } + + settings.setQueryId(uuid); + try (InsertResponse response = client.insert(tableName, simplePOJOs, settings).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS)) { + assertEquals(response.getWrittenRows(), simplePOJOs.size()); + assertTrue(response.getWrittenBytes() > 0, "written bytes should be positive"); + // server reads back the inserted rows to write them, so read counters are populated + assertTrue(response.getReadRows() >= 0, "read rows should be non-negative"); + assertTrue(response.getReadBytes() >= 0, "read bytes should be non-negative"); + assertTrue(response.getServerTime() >= 0, "server time should be non-negative"); + assertTrue(response.getResultRows() >= 0, "result rows should be non-negative"); + assertEquals(response.getQueryId(), uuid); + Assert.assertNotNull(response.getMetrics(), "metrics should be present"); + Assert.assertTrue(ClientUtils.isNotBlank(response.getServerDisplayName())); + Assert.assertNotNull(response.getResponseHeaders(), "response headers should be present"); + } + } + @Test(groups = { "integration" }, enabled = true) public void insertPOJOWithJSON() throws Exception { if (isCloud()) { diff --git a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java index d9db0d0ce..94a4c490b 100644 --- a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java @@ -11,6 +11,7 @@ import com.clickhouse.client.api.ClientException; import com.clickhouse.client.api.ServerException; import com.clickhouse.client.api.command.CommandSettings; +import com.clickhouse.client.api.http.ClickHouseHttpProto; import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader; import com.clickhouse.client.api.enums.Protocol; @@ -1466,6 +1467,54 @@ public void testQueryMetrics() throws Exception { } } + @Test(groups = {"integration"}) + public void testQueryResponseGettersAndHeaders() throws Exception { + String uuid = UUID.randomUUID().toString(); + QuerySettings settings = new QuerySettings() + .setFormat(ClickHouseFormat.TabSeparated) + .waitEndOfQuery(true) + .setQueryId(uuid); + + try (QueryResponse response = client.query("SELECT number FROM system.numbers LIMIT 10", settings).get()) { + // Format getter + Assert.assertEquals(response.getFormat(), ClickHouseFormat.TabSeparated); + + // Metrics getters + Assert.assertNotNull(response.getMetrics(), "metrics should be present"); + Assert.assertEquals(response.getReadRows(), 10); + Assert.assertTrue(response.getReadBytes() > 0, "read bytes should be positive"); + Assert.assertTrue(response.getWrittenRows() >= 0, "written rows should be non-negative"); + Assert.assertTrue(response.getWrittenBytes() >= 0, "written bytes should be non-negative"); + Assert.assertTrue(response.getServerTime() >= 0, "server time should be non-negative"); + Assert.assertTrue(response.getResultRows() > 0, "result rows should be positive"); + Assert.assertTrue(response.getTotalRowsToRead() >= 0, "total rows to read should be non-negative"); + Assert.assertEquals(response.getQueryId(), uuid); + + // Timezone and settings getters + Assert.assertNotNull(response.getTimeZone(), "server timezone should be resolved from response header"); + Assert.assertNotNull(response.getSettings(), "settings should be present"); + + // Server display name getter + Assert.assertNotNull(response.getServerDisplayName(), "server display name header should be present"); + + // All whitelisted response headers must be returned + Map headers = response.getResponseHeaders(); + Assert.assertNotNull(headers, "response headers map should be present"); + Assert.assertEquals(headers.get(ClickHouseHttpProto.HEADER_QUERY_ID), uuid, + "query id header should round-trip"); + Assert.assertNotNull(headers.get(ClickHouseHttpProto.HEADER_TIMEZONE), + "timezone header should be present"); + Assert.assertNotNull(headers.get(ClickHouseHttpProto.HEADER_FORMAT), + "format header should be present"); + Assert.assertNotNull(headers.get(ClickHouseHttpProto.HEADER_SRV_DISPLAY_NAME), + "server display name header should be present"); + Assert.assertNotNull(headers.get(ClickHouseHttpProto.HEADER_SRV_SUMMARY), + "summary header should be present"); + Assert.assertEquals(headers.get(ClickHouseHttpProto.HEADER_SRV_DISPLAY_NAME), + response.getServerDisplayName(), "getter should match header map value"); + } + } + protected final static List DATASET_COLUMNS = Arrays.asList( "col1 UInt32", "col2 Int32", From 47a49658d9ab1ce32fdfdeef611edbcd20583d38 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Fri, 26 Jun 2026 08:20:33 -0700 Subject: [PATCH 3/8] Added cancelation tests --- .../clickhouse/client/HttpTransportTests.java | 181 ++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 6d1a27010..4f7953d81 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -21,13 +21,19 @@ import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.insert.InsertSettings; import com.clickhouse.client.api.internal.DataTypeConverter; +import com.clickhouse.client.api.internal.HttpAPIClientHelper; import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.internal.ValidationUtils; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.api.query.QuerySettings; +import com.clickhouse.client.api.transport.Endpoint; +import com.clickhouse.client.api.transport.HttpEndpoint; +import com.clickhouse.client.api.transport.internal.TransportRequest; +import com.clickhouse.client.api.transport.internal.TransportResponse; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.data.ClickHouseFormat; +import net.jpountz.lz4.LZ4Factory; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.common.ConsoleNotifier; @@ -64,6 +70,7 @@ import javax.net.ssl.SSLHandshakeException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.StringWriter; import java.math.BigInteger; import java.net.InetAddress; @@ -102,11 +109,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.zip.GZIPOutputStream; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @Test(groups = {"integration"}) @@ -2532,4 +2544,173 @@ private static String toPem(X509Certificate certificate) throws Exception { } return stringWriter.toString(); } + + /** + * Exercises {@code HttpAPIClientHelper.TransportRequestImpl#cancel()}, which is not reachable through the + * high-level {@link Client} API. A {@link HttpAPIClientHelper} is created directly and used to start a request + * for an effectively endless stream (reading from {@code system.numbers}) to emulate a long-running query. + * The test verifies the whole life cycle against the server: + *
    + *
  1. the query is observed running on the server ({@code system.processes});
  2. + *
  3. the in-flight request is cancelled from another thread and the reader stops with an error;
  4. + *
  5. the query is no longer running on the server;
  6. + *
  7. {@code system.query_log} contains a record proving the query was interrupted (not finished).
  8. + *
+ */ + @Test(groups = {"integration"}) + @SuppressWarnings("java:S2925") + public void testTransportRequestCancel() throws Exception { + if (isCloud()) { + return; // relies on direct transport access and local system tables (processes / query_log) + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String queryId = "transport-cancel-" + UUID.randomUUID(); + + Map configuration = new HashMap<>(); + configuration.put(ClientConfigProperties.USER.getKey(), "default"); + configuration.put(ClientConfigProperties.PASSWORD.getKey(), ClickHouseServerForTest.getPassword()); + configuration.put(ClientConfigProperties.DATABASE.getKey(), ClickHouseServerForTest.getDatabase()); + configuration.put(ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey(), Boolean.FALSE); + configuration.put(ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), Boolean.FALSE); + + HttpAPIClientHelper helper = new HttpAPIClientHelper(new HashMap<>(configuration), null, false, + LZ4Factory.fastestInstance()); + + try (Client verifyClient = new Client.Builder() + .addEndpoint(Protocol.HTTP, server.getHost(), server.getPort(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .compressClientRequest(false) + .compressServerResponse(false) + .build()) { + + Endpoint endpoint = new HttpEndpoint(server.getHost(), server.getPort(), false, "/"); + + Map requestConfig = new HashMap<>(configuration); + requestConfig.put(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), ClickHouseFormat.TSV); + requestConfig.put(ClientConfigProperties.QUERY_ID.getKey(), queryId); + + // Endless result set so the query stays active on the server until the request is cancelled. + TransportRequest request = helper.createRequest(endpoint, requestConfig, + "SELECT number FROM system.numbers"); + + AtomicBoolean readStarted = new AtomicBoolean(false); + AtomicLong bytesRead = new AtomicLong(0); + AtomicReference readError = new AtomicReference<>(); + + Thread reader = new Thread(() -> { + long start = System.currentTimeMillis(); + try (TransportResponse response = helper.executeRequest(request); + InputStream in = response.createDataInputStream()) { + byte[] buffer = new byte[8192]; + int read; + while ((read = in.read(buffer)) != -1) { + readStarted.set(true); + bytesRead.addAndGet(read); + // Safety valve so the test can never hang on the endless stream if cancel() fails. + if (System.currentTimeMillis() - start > 30_000) { + break; + } + } + } catch (Throwable t) { + readStarted.set(true); + readError.set(t); + } + }, "transport-request-reader"); + reader.start(); + + // 1. The query must actually be running on the server. + Assert.assertTrue(waitForCondition(() -> isQueryRunning(verifyClient, queryId), 20_000), + "query was not observed running on the server (query_id=" + queryId + ")"); + Assert.assertNull(readError.get(), "reading should still be in progress while the query runs"); + + // 2. Cancel the in-flight request from a separate thread. + AtomicBoolean cancelled = new AtomicBoolean(false); + AtomicReference cancelError = new AtomicReference<>(); + Thread canceller = new Thread(() -> { + try { + cancelled.set(request.cancel()); + } catch (Throwable t) { + cancelError.set(t); + } + }, "transport-request-canceller"); + canceller.start(); + canceller.join(5_000); + + Assert.assertNull(cancelError.get(), "cancel() must not throw"); + Assert.assertTrue(cancelled.get(), "cancel() should report the request as cancelled"); + + // The reader must stop with an error once the underlying connection is aborted. + reader.join(15_000); + Assert.assertFalse(reader.isAlive(), "reading must stop after the request was cancelled"); + Assert.assertNotNull(readError.get(), + "reading a cancelled request must fail, but read " + bytesRead.get() + " bytes"); + + // 3. The server must stop running the query shortly after the client disconnects. + Assert.assertTrue(waitForCondition(() -> !isQueryRunning(verifyClient, queryId), 20_000), + "query is still running on the server after cancellation (query_id=" + queryId + ")"); + + // 4. The query_log must show the query was interrupted instead of finishing successfully. + assertQueryWasInterrupted(verifyClient, queryId); + assertTrue(request.cancel()); + } finally { + helper.close(); + } + } + + private static boolean isQueryRunning(Client client, String queryId) { + List rows = client.queryAll( + "SELECT count() AS c FROM system.processes WHERE query_id = '" + queryId + "'"); + return !rows.isEmpty() && rows.get(0).getLong("c") > 0; + } + + private static boolean waitForCondition(Supplier condition, long timeoutMillis) + throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMillis; + while (System.currentTimeMillis() < deadline) { + try { + if (Boolean.TRUE.equals(condition.get())) { + return true; + } + } catch (Exception ignore) { + // transient query failures (e.g. server busy) are retried until the timeout elapses + } + Thread.sleep(100); + } + return false; + } + + private static void assertQueryWasInterrupted(Client client, String queryId) throws InterruptedException { + long deadline = System.currentTimeMillis() + 30_000; + String seenTypes = ""; + while (System.currentTimeMillis() < deadline) { + client.queryAll("SYSTEM FLUSH LOGS"); + List rows = client.queryAll( + "SELECT toString(type) AS type, exception_code FROM system.query_log " + + "WHERE query_id = '" + queryId + "' AND event_date >= today() - 1"); + + StringBuilder types = new StringBuilder(); + for (GenericRecord row : rows) { + String type = row.getString("type"); + int exceptionCode = row.getInteger("exception_code"); + types.append(type).append('(').append(exceptionCode).append(") "); + + Assert.assertNotEquals(type, "QueryFinish", + "query unexpectedly finished successfully instead of being interrupted (query_id=" + + queryId + ")"); + if ("ExceptionWhileProcessing".equals(type)) { + Assert.assertNotEquals(exceptionCode, 0, + "an interrupted query must be logged with a non-zero exception code (query_id=" + + queryId + ")"); + return; // found the proof the query was interrupted on the server + } + } + seenTypes = types.toString(); + Thread.sleep(250); + } + Assert.fail("no query_log record proving the query was interrupted (query_id=" + queryId + + ", seen types: [" + seenTypes + "])"); + } } From 9943d8d8ba8229fe3326d9a8c068d2c3a62253b7 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Fri, 26 Jun 2026 19:29:49 -0700 Subject: [PATCH 4/8] Implemented cancellation method in client and added tests --- .../com/clickhouse/client/api/Client.java | 47 ++++- .../api/internal/HttpAPIClientHelper.java | 10 +- .../transport/internal/TransportRequest.java | 5 +- .../api/transport/TransportBaseTests.java | 188 +++++++++++++++++- 4 files changed, 243 insertions(+), 7 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 6f0a56be7..58ce18f01 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -53,6 +53,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.ref.WeakReference; import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.time.ZoneId; @@ -135,6 +136,8 @@ public class Client implements AutoCloseable { private final Map> typeHintMapping; + private final ConcurrentHashMap> ongoingRequests = new ConcurrentHashMap<>(); + // Server context private String dbUser; private String serverVersion; @@ -1397,6 +1400,9 @@ public CompletableFuture insert(String tableName, List data, } out.close(); }); + + registerTransportReq(requestSettings.getQueryId(), transportRequest); + try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) { ClientStatisticsHolder clientStats = globalClientStats.remove(operationId); OperationMetrics metrics = completeOperation(transportResponse, clientStats, requestSettings.getQueryId()); @@ -1405,7 +1411,7 @@ public CompletableFuture insert(String tableName, List data, } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); - if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { + if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && !wasPrevRequestCanceled(requestSettings.getQueryId())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); } else { @@ -1593,6 +1599,7 @@ public CompletableFuture insert(String tableName, writer.onOutput(out); out.close(); }); + registerTransportReq(requestSettings.getQueryId(), transportRequest); try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) { OperationMetrics metrics = completeOperation(transportResponse, finalClientStats, requestSettings.getQueryId()); @@ -1600,7 +1607,7 @@ public CompletableFuture insert(String tableName, } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); - if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { + if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && !wasPrevRequestCanceled(requestSettings.getQueryId())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); } else { @@ -1704,6 +1711,7 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, Map query(String sqlQuery, Map(tr)); + } + } + + private boolean wasPrevRequestCanceled(String queryId) { + if (queryId != null) { + WeakReference trRef = ongoingRequests.get(queryId); + TransportRequest tr = trRef.get(); + return tr != null && tr.isCancelled(); + } + return false; + } + public CompletableFuture query(String sqlQuery, Map queryParams) { return query(sqlQuery, queryParams, null); } @@ -2243,6 +2267,23 @@ public void updateAccessToken(String accessToken) { this.credentialsManager.setAccessToken(accessToken); } + /** + * Note: It is recommended to use operation timeout settings instead of this method. + * Tries to cancel ongoing request. This method cancels IO operations but doesn't + * kill query on server side. Original queryId should be used to cancel the request. + * + * @param queryId - original query id that was passed in operation settings. + */ + public void cancelRequest(String queryId) { + WeakReference reqRef = ongoingRequests.get(queryId); + if (reqRef != null) { + TransportRequest req = reqRef.get(); + if (req != null) { + req.cancel(); + } + } + } + private Endpoint getNextAliveNode() { return endpoints.get(0); } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index c8c78e539..699215998 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -103,6 +103,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Function; @@ -536,6 +537,7 @@ private HttpPost createPostRequest(URI uri, Map requestConfig) { private static final class TransportRequestImpl implements TransportRequest { private final HttpPost delegate; private final Map config; + private final AtomicBoolean cancelled = new AtomicBoolean(false); TransportRequestImpl(HttpPost delegate, Map config) { this.delegate = delegate; @@ -543,13 +545,19 @@ private static final class TransportRequestImpl implements TransportRequest { } @Override - public boolean cancel() throws Exception { + public boolean cancel() { + cancelled.set(true); if (delegate.isCancelled()) { return true; } return delegate.cancel(); } + @Override + public boolean isCancelled() { + return cancelled.get(); + } + @Override public Map getConfig() { return config; diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java index 6a1a66401..06ef4875e 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java @@ -30,7 +30,8 @@ public interface TransportRequest { * times. * * @return result of operation. True if request was canceled for sure. False when result cannot be known. - * @throws Exception - when something extraordinary happens while canceling the request. */ - boolean cancel() throws Exception; + boolean cancel(); + + boolean isCancelled(); } diff --git a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java index 113304e6c..f122ccf47 100644 --- a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java @@ -2,14 +2,19 @@ import com.clickhouse.client.BaseIntegrationTest; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseServerForTest; import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.client.api.ServerException; import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.insert.InsertSettings; import com.clickhouse.client.api.metadata.TableSchema; +import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.api.query.QueryResponse; +import com.clickhouse.client.api.query.QuerySettings; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.common.ConsoleNotifier; @@ -23,10 +28,15 @@ import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; @@ -239,7 +249,7 @@ public void testGeneratedQueryIdIsUsedWhenNotSet(String operation, }; try (Client client = new Client.Builder() - .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) .setUsername("default") .setPassword(ClickHouseServerForTest.getPassword()) .compressClientRequest(false) @@ -305,6 +315,182 @@ public static Object[][] operationProvider() { }; } + /** + * Exercises {@link Client#cancelRequest(String)} for every combination of + * {@code {query, insert} x {sync, async}} operations. A long-running operation is started against the + * real server with a known {@code query_id} and is interrupted from the test thread while it is still + * in progress. The whole life cycle is verified: + *
    + *
  1. the operation is observed running on the server ({@code system.processes});
  2. + *
  3. {@link Client#cancelRequest(String)} aborts the in-flight request and the operation fails;
  4. + *
  5. the query is no longer running on the server.
  6. + *
+ */ + @Test(groups = {"integration"}, dataProvider = "cancelRequestProvider") + @SuppressWarnings("java:S2925") + public void testCancelRequest(String name, boolean async, boolean isInsert) throws Exception { + if (isCloud()) { + return; // relies on local system tables (processes / query_log) + } + + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + String queryId = "client-cancel-" + UUID.randomUUID(); + // Fully qualified so the table created via runQuery (default database) matches the one the client + // inserts into (client default database is the test database). + String table = ClickHouseServerForTest.getDatabase() + ".client_cancel_" + + UUID.randomUUID().toString().replace('-', '_'); + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, server.getHost(), server.getPort(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .compressClientRequest(false) + .compressServerResponse(false) + .useAsyncRequests(async) + .build()) { + + AtomicReference opError = new AtomicReference<>(); + AtomicBoolean opFinished = new AtomicBoolean(false); + + Runnable operation; + if (isInsert) { + Assert.assertTrue(runQuery("CREATE TABLE " + table + + " (number UInt64) ENGINE = MergeTree ORDER BY number"), "[" + name + "] failed to create table"); + operation = () -> { + // Endless input stream so the insert stays active on the server until the request is cancelled. + try (InsertResponse response = client.insert(table, endlessTsvStream(), ClickHouseFormat.TSV, + new InsertSettings().setQueryId(queryId)).get(35, TimeUnit.SECONDS)) { + opFinished.set(true); + } catch (Throwable t) { + opError.set(t); + } + }; + } else { + operation = () -> { + // Endless result set so the query stays active on the server until the request is cancelled. + try (QueryResponse response = client.query("SELECT number FROM system.numbers", + new QuerySettings().setQueryId(queryId)).get(35, TimeUnit.SECONDS); + InputStream in = response.getInputStream()) { + byte[] buffer = new byte[8192]; + long start = System.currentTimeMillis(); + while (in.read(buffer) != -1) { + // Safety valve so the test can never hang on the endless stream if cancel() fails. + if (System.currentTimeMillis() - start > 30_000) { + break; + } + } + opFinished.set(true); + } catch (Throwable t) { + opError.set(t); + } + }; + } + + Thread worker = new Thread(operation, "client-cancel-" + name); + worker.start(); + + // 1. The operation must actually be running on the server. + Assert.assertTrue(waitForCondition(() -> isQueryRunning(client, queryId), 20_000), + "[" + name + "] operation was not observed running on the server (query_id=" + queryId + + ", opError=" + opError.get() + ")"); + Assert.assertNull(opError.get(), "[" + name + "] operation should still be in progress while it runs"); + + // 2. Cancel the in-flight request through the high-level Client API. The request is only weakly + // referenced by the client, so cancellation is reissued in a short loop to land independently of GC + // timing while the operation is still in progress. + cancelUntilStopped(client, queryId, worker, 20_000); + Assert.assertFalse(worker.isAlive(), "[" + name + "] operation must stop after the request was cancelled"); + Assert.assertFalse(opFinished.get(), "[" + name + "] a cancelled operation must not complete successfully"); + Assert.assertNotNull(opError.get(), "[" + name + "] a cancelled operation must fail with an error"); + + // 3. The server must stop running the query shortly after the client disconnects. + Assert.assertTrue(waitForCondition(() -> !isQueryRunning(client, queryId), 20_000), + "[" + name + "] query is still running on the server after cancellation (query_id=" + queryId + ")"); + } finally { + runQuery("DROP TABLE IF EXISTS " + table); + } + } + + @DataProvider(name = "cancelRequestProvider") + public static Object[][] cancelRequestProvider() { + return new Object[][]{ + {"query-sync", false, false}, + {"query-async", true, false}, + {"insert-sync", false, true}, + {"insert-async", true, true} + }; + } + + /** + * Reissues {@link Client#cancelRequest(String)} until the worker stops or the timeout elapses. The request + * is held by the client only through a {@link java.lang.ref.WeakReference}, so retrying makes the test + * robust against an unlucky GC clearing the reference between attempts. + */ + private static void cancelUntilStopped(Client client, String queryId, Thread worker, long timeoutMillis) + throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMillis; + do { + client.cancelRequest(queryId); + worker.join(200); + } while (worker.isAlive() && System.currentTimeMillis() < deadline); + } + + /** + * Produces an effectively endless stream of well-formed single-column TSV rows in 64 KiB chunks. The chunk + * size is large enough to flush through the client's network buffer so the insert is actually observed + * running on the server, while the short sleep keeps the request in progress without flooding it. + */ + private static InputStream endlessTsvStream() { + final byte[] row = "1\n".getBytes(StandardCharsets.US_ASCII); + final byte[] chunk = new byte[64 * 1024]; + for (int i = 0; i < chunk.length; i++) { + chunk[i] = row[i % row.length]; + } + return new InputStream() { + @Override + public int read() { + byte[] single = new byte[1]; + return read(single, 0, 1) == -1 ? -1 : (single[0] & 0xFF); + } + + @Override + public int read(byte[] b, int off, int len) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return -1; + } + int n = Math.min(len, chunk.length); + System.arraycopy(chunk, 0, b, off, n); + return n; + } + }; + } + + private static boolean isQueryRunning(Client client, String queryId) { + List rows = client.queryAll( + "SELECT count() AS c FROM system.processes WHERE query_id = '" + queryId + "'"); + return !rows.isEmpty() && rows.get(0).getLong("c") > 0; + } + + private static boolean waitForCondition(Supplier condition, long timeoutMillis) + throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMillis; + while (System.currentTimeMillis() < deadline) { + try { + if (Boolean.TRUE.equals(condition.get())) { + return true; + } + } catch (Exception ignore) { + // transient query failures (e.g. server busy) are retried until the timeout elapses + } + Thread.sleep(100); + } + return false; + } + public static class InsertablePojo { private int id; From ea05afdd7f4f1662e567e07341f242b91500829f Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 29 Jun 2026 16:43:02 -0700 Subject: [PATCH 5/8] Fixed issue with 503 being not handled. Reorganized code handing status code to be more certain about codes --- .../com/clickhouse/client/api/Client.java | 2 +- .../client/api/internal/ClientUtils.java | 3 +- .../api/internal/HttpAPIClientHelper.java | 67 ++++++++----- .../transport/internal/TransportResponse.java | 16 ---- .../clickhouse/client/HttpTransportTests.java | 96 +++++++++++++++++++ .../client/api/internal/ClientUtilsTest.java | 12 +-- 6 files changed, 146 insertions(+), 50 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 6f0a56be7..672f357f4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -1716,7 +1716,7 @@ public CompletableFuture query(String sqlQuery, Map 0 ? readClickHouseError(httpResponse.getEntity(), serverCode, queryId, httpResponse.getCode()) : @@ -608,11 +605,6 @@ private static final class TransportResponseImpl implements TransportResponse { this.delegate = delegate; } - @Override - public int getStatusCode() { - return 0; - } - @Override public ClickHouseFormat getDataFormat() { Header formatHeader = delegate.getFirstHeader(ClickHouseHttpProto.HEADER_FORMAT); @@ -632,6 +624,7 @@ public String getQueryId() { } @Override + @SuppressWarnings("unchecked") public T getDelegate() { return (T) delegate; } @@ -680,6 +673,7 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt doPoolVent(); ClassicHttpResponse httpResponse = null; + boolean closeResponse = true; HttpContext context = createRequestHttpContext(requestConfig); try { httpResponse = httpClient.executeOpen(null, req, context); @@ -688,35 +682,58 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt httpResponse.getCode(), requestConfig)); - if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) { - throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings."); - } else if (httpResponse.getCode() == HttpStatus.SC_BAD_GATEWAY) { - httpResponse.close(); - throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings."); - } else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { - try { - throw readError(req, httpResponse); - } finally { - httpResponse.close(); - } + if (httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { + throw readError(req, httpResponse); } - return httpResponse; + int statusCode = httpResponse.getCode(); + switch (statusCode) { + case HttpStatus.SC_OK: + closeResponse = false; + return httpResponse; + case HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED: + throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings."); + case HttpStatus.SC_BAD_GATEWAY: + throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings."); + case HttpStatus.SC_SERVICE_UNAVAILABLE: + throw new ServerException(0, "Server returned '503 Service Unavailable'. Check network settings.", + HttpStatus.SC_SERVICE_UNAVAILABLE, getQueryId(httpResponse, req)); + case HttpStatus.SC_BAD_REQUEST: + case HttpStatus.SC_UNAUTHORIZED: + case HttpStatus.SC_FORBIDDEN: + case HttpStatus.SC_SERVER_ERROR: + case HttpStatus.SC_NOT_FOUND: + // ClickHouse usually uses SC_BAD_REQUEST and SC_SERVER_ERROR to return error. + // SC_UNAUTHORIZED, SC_FORBIDDEN is for authentication + // SC_NOT_FOUND can be returned by ClickHouse when path doesn't match database, but also by proxy + // others we cannot handle properly + throw readError(req, httpResponse); + default: + throw new ClientException("Unexpected result status " + statusCode); + } } catch (UnknownHostException e) { - ClientUtils.quiteClose(httpResponse, LOG); LOG.warn("Host '{}' unknown", req.getAuthority()); throw e; } catch (ConnectException | NoRouteToHostException e) { - ClientUtils.quiteClose(httpResponse, LOG); LOG.warn("Failed to connect to '{}': {}", req.getAuthority(), e.getMessage()); throw e; } catch (Exception e) { - ClientUtils.quiteClose(httpResponse, LOG); LOG.debug("Failed to execute request to '{}': {}", req.getAuthority(), e.getMessage(), e); throw e; + } finally { + if (closeResponse) { + ClientUtils.quietClose(httpResponse, LOG); + } } } + private String getQueryId(HttpResponse httpResponse, HttpPost httpRequest) { + final Header serverQueryIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); + final Header clientQueryIdHeader = httpRequest.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); + final Header queryHeader = Stream.of(serverQueryIdHeader, clientQueryIdHeader).filter(Objects::nonNull).findFirst().orElse(null); + return queryHeader == null ? "" : queryHeader.getValue(); + } + private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8"); private void addHeaders(HttpPost req, Map requestConfig) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java index 32fe89465..beecea34d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java @@ -8,22 +8,6 @@ public interface TransportResponse extends Closeable { - /** - * Transport status code translated to one of values: - *
    - *
  • 503 - for service unavailable
  • - *
  • 500 - server error
  • - *
  • 400 - user error
  • - *
  • 404 - endpoint not found
  • - *
  • 403 - access not granted
  • - *
  • 401 - no authentication information
  • - *
  • 200 - ok
  • - *
- * @return integer value of status code - */ - int getStatusCode(); - - /** * Data format returned by server or calculated other way * @return data format diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 4f7953d81..2d8a8fb72 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -2114,6 +2114,102 @@ public static Object[][] testHttpStatusCompressedBodyDataProvider() { }; } + @Test(groups = {"integration"}) + public void test503ServiceUnavailableSurfacesAsServerException() throws Exception { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE) + .withBody("Service Unavailable")) + .build()); + + try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressServerResponse(false) + .retryOnFailures(ClientFaultCause.None) + .build()) { + + Throwable thrown = Assert.expectThrows(Throwable.class, + () -> client.query("SELECT 1").get(10, TimeUnit.SECONDS)); + + ServerException serverException = findServerException(thrown); + Assert.assertNotNull(serverException, + "Expected 503 to be reported as a ServerException, but was: " + thrown); + Assert.assertEquals(serverException.getTransportProtocolCode(), HttpStatus.SC_SERVICE_UNAVAILABLE, + "Expected transport protocol code 503, but was: " + serverException.getTransportProtocolCode()); + + Assert.assertTrue(containsMessageInCauseChain(thrown, "503 Service Unavailable"), + "Expected '503 Service Unavailable' in failure message chain, but was: " + thrown); + + Assert.assertNull(findCause(thrown, ConnectionInitiationException.class), + "503 should not be reported as a ConnectionInitiationException, but was: " + thrown); + } + } finally { + mockServer.stop(); + } + } + + @Test(groups = {"integration"}) + public void test503ServiceUnavailableIsRetried() throws Exception { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + // First request fails with a retryable 503 server error + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("ServiceUnavailable") + .withRequestBody(WireMock.containing("SELECT 1")) + .whenScenarioStateIs(STARTED) + .willSetStateTo("Recovered") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE) + .withHeader("X-ClickHouse-Exception-Code", "202") + .withBody("Code: 202. DB::Exception: Too many simultaneous queries. (TOO_MANY_SIMULTANEOUS_QUERIES)")) + .build()); + + // Second request (retry) succeeds + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("ServiceUnavailable") + .withRequestBody(WireMock.containing("SELECT 1")) + .whenScenarioStateIs("Recovered") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")) + .build()); + + try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressServerResponse(false) + .setMaxRetries(1) + .retryOnFailures(ClientFaultCause.ServerRetryable) + .build()) { + + try (QueryResponse response = client.query("SELECT 1").get(10, TimeUnit.SECONDS)) { + Assert.assertNotNull(response); + } + } + + mockServer.verify(2, WireMock.postRequestedFor(WireMock.anyUrl())); + } finally { + mockServer.stop(); + } + } + private byte[] fetchBinaryPayload(String query, int networkBufferSize, int timeoutSec) throws Exception { QuerySettings querySettings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes); try (Client client = newClient() diff --git a/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java b/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java index 9cf10b25a..88c5d8526 100644 --- a/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java +++ b/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java @@ -11,36 +11,36 @@ public class ClientUtilsTest { @Test(groups = {"unit"}) - public void testQuiteCloseSwallowsExceptionAndLogs() throws IOException { + public void testQuietCloseSwallowsExceptionAndLogs() throws IOException { Logger log = Mockito.mock(Logger.class); IOException failure = new IOException("close failed"); Closeable closeable = Mockito.mock(Closeable.class); Mockito.doThrow(failure).when(closeable).close(); // Should not propagate the exception thrown by close() - ClientUtils.quiteClose(closeable, log); + ClientUtils.quietClose(closeable, log); Mockito.verify(closeable).close(); Mockito.verify(log).warn(Mockito.contains("Failed to close object"), Mockito.eq(failure)); } @Test(groups = {"unit"}) - public void testQuiteCloseClosesSuccessfully() throws IOException { + public void testQuietCloseClosesSuccessfully() throws IOException { Logger log = Mockito.mock(Logger.class); Closeable closeable = Mockito.mock(Closeable.class); - ClientUtils.quiteClose(closeable, log); + ClientUtils.quietClose(closeable, log); Mockito.verify(closeable).close(); Mockito.verifyNoInteractions(log); } @Test(groups = {"unit"}) - public void testQuiteCloseWithNull() { + public void testQuietCloseWithNull() { Logger log = Mockito.mock(Logger.class); // Should be a no-op and not throw on a null closeable - ClientUtils.quiteClose(null, log); + ClientUtils.quietClose(null, log); Mockito.verifyNoInteractions(log); Assert.assertTrue(true); From 8ea713beaa2f9432d63fc389b51efcb551292807 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 29 Jun 2026 17:39:20 -0700 Subject: [PATCH 6/8] Added exception handling --- .../src/main/java/com/clickhouse/client/api/Client.java | 2 +- .../client/api/internal/HttpAPIClientHelper.java | 8 ++++++-- .../client/api/transport/TransportBaseTests.java | 9 +++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 63085126e..c4d3237b9 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -2274,7 +2274,7 @@ public void updateAccessToken(String accessToken) { * * @param queryId - original query id that was passed in operation settings. */ - public void cancelRequest(String queryId) { + public void cancelTransportRequest(String queryId) { WeakReference reqRef = ongoingRequests.get(queryId); if (reqRef != null) { TransportRequest req = reqRef.get(); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 53ff0b9bd..ea59d7fe6 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -27,6 +27,7 @@ import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.classic.RequestFailedException; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; @@ -727,6 +728,9 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt throw e; } catch (Exception e) { LOG.debug("Failed to execute request to '{}': {}", req.getAuthority(), e.getMessage(), e); + if (e instanceof RequestFailedException && req.isCancelled()) { + throw new TransportException("Request was cancelled on client side", e, getQueryId(httpResponse, req)); + } throw e; } finally { if (closeResponse) { @@ -736,8 +740,8 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt } private String getQueryId(HttpResponse httpResponse, HttpPost httpRequest) { - final Header serverQueryIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); - final Header clientQueryIdHeader = httpRequest.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); + final Header serverQueryIdHeader = httpResponse == null ? null : httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); + final Header clientQueryIdHeader = httpResponse == null ? null : httpRequest.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); final Header queryHeader = Stream.of(serverQueryIdHeader, clientQueryIdHeader).filter(Objects::nonNull).findFirst().orElse(null); return queryHeader == null ? "" : queryHeader.getValue(); } diff --git a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java index f122ccf47..50af6b5f1 100644 --- a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java @@ -316,13 +316,13 @@ public static Object[][] operationProvider() { } /** - * Exercises {@link Client#cancelRequest(String)} for every combination of + * Exercises {@link Client#cancelTransportRequest(String)} for every combination of * {@code {query, insert} x {sync, async}} operations. A long-running operation is started against the * real server with a known {@code query_id} and is interrupted from the test thread while it is still * in progress. The whole life cycle is verified: *
    *
  1. the operation is observed running on the server ({@code system.processes});
  2. - *
  3. {@link Client#cancelRequest(String)} aborts the in-flight request and the operation fails;
  4. + *
  5. {@link Client#cancelTransportRequest(String)} aborts the in-flight request and the operation fails;
  6. *
  7. the query is no longer running on the server.
  8. *
*/ @@ -363,6 +363,7 @@ public void testCancelRequest(String name, boolean async, boolean isInsert) thro new InsertSettings().setQueryId(queryId)).get(35, TimeUnit.SECONDS)) { opFinished.set(true); } catch (Throwable t) { + t.printStackTrace(); opError.set(t); } }; @@ -423,7 +424,7 @@ public static Object[][] cancelRequestProvider() { } /** - * Reissues {@link Client#cancelRequest(String)} until the worker stops or the timeout elapses. The request + * Reissues {@link Client#cancelTransportRequest(String)} until the worker stops or the timeout elapses. The request * is held by the client only through a {@link java.lang.ref.WeakReference}, so retrying makes the test * robust against an unlucky GC clearing the reference between attempts. */ @@ -431,7 +432,7 @@ private static void cancelUntilStopped(Client client, String queryId, Thread wor throws InterruptedException { long deadline = System.currentTimeMillis() + timeoutMillis; do { - client.cancelRequest(queryId); + client.cancelTransportRequest(queryId); worker.join(200); } while (worker.isAlive() && System.currentTimeMillis() < deadline); } From 862dd4844eb2564c7dfd799869f8d71d4ca05699 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Tue, 30 Jun 2026 19:38:10 -0700 Subject: [PATCH 7/8] Fixed 503 error. Updated changelog --- CHANGELOG.md | 6 ++ .../com/clickhouse/client/api/Client.java | 56 ++++++++++++------- .../api/internal/HttpAPIClientHelper.java | 49 ++++++++-------- .../client/api/query/QueryResponse.java | 3 +- .../transport/internal/TransportRequest.java | 4 ++ .../clickhouse/client/HttpTransportTests.java | 15 +++-- .../api/transport/TransportBaseTests.java | 35 +++++------- docs/features.md | 1 + 8 files changed, 95 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 320805a55..4feee82cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,8 +27,14 @@ - **[client-v2]** `Client.Builder#useBearerTokenAuth(String)` now stores the bearer token under the `access_token` configuration key (with the `Bearer ` prefix) instead of writing it directly into `http_header_authorization`. The HTTP wire format is unchanged, but the token is no longer observable through `Client#getReadOnlyConfig()` under the `http_header_authorization` key. +- **[client-v2]** HTTP `503 Service Unavailable` responses are now surfaced as a connection-style failure (`java.net.ConnectException`) and are retried by default. Previously a `503` was treated as a server error (`ServerException`) and fell under the `ServerRetryable` fault cause. It has been moved to the `ConnectTimeout` fault cause category so that connectivity/availability failures are handled uniformly with other connection errors. Callers that specifically excluded `ServerRetryable` to avoid retrying `503` should now adjust their `client_retry_on_failures` configuration to exclude `ConnectTimeout` instead. + +- **[client-v2]** Unexpected/unknown HTTP status codes (those the client cannot interpret as a ClickHouse response) now throw a `ClientException` instead of a `ServerException`. Since the client cannot meaningfully handle these responses, they are reported as a client-side error rather than being attributed to the server. + ### New Features +- **[client-v2]** Added `Client#cancelTransportRequest(String queryId)` to cancel an in-flight request that has not yet received a response from the server, identified by the query id supplied in the operation settings. This aborts the request on the client side (cancels the underlying IO operation) but does **not** issue a `KILL QUERY` on the server, so a query that already started executing may continue to run server-side. It is recommended to use operation timeout settings where possible; this API is intended for explicitly aborting a request from the client. + - **[client-v2]** Added `Session` API to encapsulate and manage ClickHouse session settings (`session_id`, `session_check`, `session_timeout`, `session_timezone`) as a reusable object. The `Session` instance can be applied to any request settings using `applyTo()`, and session state can be cleared via `clearSession()`. Additionally, added `resetOption(String)` to `InsertSettings`, `QuerySettings`, and `CommonSettings` to allow removing specific settings. Settings explicitly set to `null` will not be sent to the server, which is useful for overriding global settings. - **[client-v2]** Added runtime credential update APIs on `Client`: `updateUserAndPassword(String, String)`, `updateAccessToken(String)`, and `updateBearerToken(String)`. Subsequent requests on the same `Client` instance use the new credentials without rebuilding the client. The authentication method is fixed at construction time; calling a runtime updater that does not match the configured method throws `ClientMisconfigurationException`. See `docs/authentication.md` for details and migration guidance. diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index c4d3237b9..bae667cac 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -53,7 +53,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.ref.WeakReference; import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.time.ZoneId; @@ -66,6 +65,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.StringJoiner; import java.util.TimeZone; @@ -114,6 +114,8 @@ public class Client implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(Client.class); + private static final int REQ_REGISTRY_SIZE = 1000; // initial capacity hint for the ongoing-request registry, not a hard limit; no separate config needed. + private HttpAPIClientHelper httpClientHelper = null; private final List endpoints; @@ -136,7 +138,7 @@ public class Client implements AutoCloseable { private final Map> typeHintMapping; - private final ConcurrentHashMap> ongoingRequests = new ConcurrentHashMap<>(); + private final ConcurrentHashMap ongoingRequests = new ConcurrentHashMap<>(REQ_REGISTRY_SIZE); // Server context private String dbUser; @@ -235,7 +237,6 @@ public String getDefaultDatabase() { return (String) this.configuration.get(ClientConfigProperties.DATABASE.getKey()); } - /** * Frees the resources associated with the client. *
    @@ -1411,12 +1412,16 @@ public CompletableFuture insert(String tableName, List data, } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); - if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && !wasPrevRequestCanceled(requestSettings.getQueryId())) { + if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && requestIsNotCancelled(requestSettings.getQueryId())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); } else { throw lastException; } + } finally { + // Insert completes once the request returns; the response exposes no stream to read afterwards, + // so the request is no longer cancellable and can be unregistered. + unregisterTransportReq(requestSettings.getQueryId()); } } @@ -1607,12 +1612,16 @@ public CompletableFuture insert(String tableName, } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); - if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && !wasPrevRequestCanceled(requestSettings.getQueryId())) { + if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && requestIsNotCancelled(requestSettings.getQueryId())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); } else { throw lastException; } + } finally { + // Insert completes once the request returns; the response exposes no stream to read afterwards, + // so the request is no longer cancellable and can be unregistered. + unregisterTransportReq(requestSettings.getQueryId()); } if (i < retries) { @@ -1709,9 +1718,10 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, Map query(String sqlQuery, Map(tr)); + ongoingRequests.put(queryId, tr); } } - private boolean wasPrevRequestCanceled(String queryId) { + private void unregisterTransportReq(String queryId) { if (queryId != null) { - WeakReference trRef = ongoingRequests.get(queryId); - TransportRequest tr = trRef.get(); - return tr != null && tr.isCancelled(); + ongoingRequests.remove(queryId); } - return false; + } + + private boolean requestIsNotCancelled(String queryId) { + if (queryId != null) { + TransportRequest tr = ongoingRequests.get(queryId); + return tr == null || !tr.isCancelled(); + } + return true; } public CompletableFuture query(String sqlQuery, Map queryParams) { @@ -2275,12 +2295,10 @@ public void updateAccessToken(String accessToken) { * @param queryId - original query id that was passed in operation settings. */ public void cancelTransportRequest(String queryId) { - WeakReference reqRef = ongoingRequests.get(queryId); - if (reqRef != null) { - TransportRequest req = reqRef.get(); - if (req != null) { - req.cancel(); - } + Objects.requireNonNull(queryId, "queryId should be not null"); + TransportRequest req = ongoingRequests.get(queryId); + if (req != null) { + req.cancel(); } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index ea59d7fe6..6e3820dde 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -659,25 +659,9 @@ public InputStream createDataInputStream() { } public TransportResponse executeRequest(TransportRequest transportRequest) throws Exception { - return new TransportResponseImpl(doPostRequest(transportRequest.getConfig(), transportRequest.getDelegate())); - } - - public TransportRequest createRequest(Endpoint server, Map requestConfig, IOCallback writeCallback) { - final URI uri = createRequestURI(server, requestConfig, true); - final HttpPost req = createPostRequest(uri, requestConfig); - try { - String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; - req.setEntity(wrapRequestEntity( - new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback), - requestConfig)); - } catch (ProtocolException e) { - throw new ClientException("failed to create request body entity", e); - } - - return new TransportRequestImpl(req, requestConfig); - } - private ClassicHttpResponse doPostRequest(Map requestConfig, HttpPost req) throws Exception { + final Map requestConfig = transportRequest.getConfig(); + final HttpPost req = transportRequest.getDelegate(); doPoolVent(); @@ -699,14 +683,13 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt switch (statusCode) { case HttpStatus.SC_OK: closeResponse = false; - return httpResponse; + return new TransportResponseImpl(httpResponse); case HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED: throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings."); case HttpStatus.SC_BAD_GATEWAY: throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings."); case HttpStatus.SC_SERVICE_UNAVAILABLE: - throw new ServerException(0, "Server returned '503 Service Unavailable'. Check network settings.", - HttpStatus.SC_SERVICE_UNAVAILABLE, getQueryId(httpResponse, req)); + throw new ConnectException("Server returned '503 Service Unavailable'. Check network settings."); case HttpStatus.SC_BAD_REQUEST: case HttpStatus.SC_UNAUTHORIZED: case HttpStatus.SC_FORBIDDEN: @@ -739,9 +722,24 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt } } + public TransportRequest createRequest(Endpoint server, Map requestConfig, IOCallback writeCallback) { + final URI uri = createRequestURI(server, requestConfig, true); + final HttpPost req = createPostRequest(uri, requestConfig); + try { + String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; + req.setEntity(wrapRequestEntity( + new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback), + requestConfig)); + } catch (ProtocolException e) { + throw new ClientException("failed to create request body entity", e); + } + + return new TransportRequestImpl(req, requestConfig); + } + private String getQueryId(HttpResponse httpResponse, HttpPost httpRequest) { final Header serverQueryIdHeader = httpResponse == null ? null : httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); - final Header clientQueryIdHeader = httpResponse == null ? null : httpRequest.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); + final Header clientQueryIdHeader = httpRequest == null ? null : httpRequest.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID); final Header queryHeader = Stream.of(serverQueryIdHeader, clientQueryIdHeader).filter(Objects::nonNull).findFirst().orElse(null); return queryHeader == null ? "" : queryHeader.getValue(); } @@ -1013,8 +1011,11 @@ public boolean shouldRetry(Throwable ex, Map requestSettings) { // This method wraps some client specific exceptions into specific ClientException or just ClientException // ClientException will be also wrapped public RuntimeException wrapException(String message, Exception cause, String queryId) { - if (cause instanceof ClientException || cause instanceof ServerException) { - return (RuntimeException) cause; + // Already-classified exceptions (ClientException, ServerException, ConnectionInitiationException, ...) + // are returned as-is so their specific type is preserved instead of being reboxed as a generic + // ClickHouseException. + if (cause instanceof ClickHouseException) { + return (ClickHouseException) cause; } if (cause instanceof SSLException) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java index fbece9511..595cf54b5 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java @@ -38,7 +38,8 @@ public class QueryResponse implements AutoCloseable { private final Map responseHeaders; - public QueryResponse(TransportResponse response, ClickHouseFormat format, QuerySettings settings, OperationMetrics operationMetrics) { + public QueryResponse(TransportResponse response, ClickHouseFormat format, QuerySettings settings, + OperationMetrics operationMetrics) { Objects.requireNonNull(response, "response is null"); this.transportResponse = response; this.format = format; diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java index 06ef4875e..79f656425 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java @@ -33,5 +33,9 @@ public interface TransportRequest { */ boolean cancel(); + /** + * Returns indication if request was canceled on client side + * @return true if request was canceled + */ boolean isCancelled(); } diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 2d8a8fb72..cd62bb95b 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -2115,7 +2115,7 @@ public static Object[][] testHttpStatusCompressedBodyDataProvider() { } @Test(groups = {"integration"}) - public void test503ServiceUnavailableSurfacesAsServerException() throws Exception { + public void test503ServiceUnavailableSurfacesAsConnectionInitiationException() throws Exception { if (isCloud()) { return; // mocked server } @@ -2141,17 +2141,16 @@ public void test503ServiceUnavailableSurfacesAsServerException() throws Exceptio Throwable thrown = Assert.expectThrows(Throwable.class, () -> client.query("SELECT 1").get(10, TimeUnit.SECONDS)); - ServerException serverException = findServerException(thrown); - Assert.assertNotNull(serverException, - "Expected 503 to be reported as a ServerException, but was: " + thrown); - Assert.assertEquals(serverException.getTransportProtocolCode(), HttpStatus.SC_SERVICE_UNAVAILABLE, - "Expected transport protocol code 503, but was: " + serverException.getTransportProtocolCode()); + ConnectionInitiationException connectionInitiationException = + findCause(thrown, ConnectionInitiationException.class); + Assert.assertNotNull(connectionInitiationException, + "Expected 503 to be reported as a ConnectionInitiationException, but was: " + thrown); Assert.assertTrue(containsMessageInCauseChain(thrown, "503 Service Unavailable"), "Expected '503 Service Unavailable' in failure message chain, but was: " + thrown); - Assert.assertNull(findCause(thrown, ConnectionInitiationException.class), - "503 should not be reported as a ConnectionInitiationException, but was: " + thrown); + Assert.assertNull(findServerException(thrown), + "A bare 503 (no ClickHouse exception code) should not be reported as a ServerException, but was: " + thrown); } } finally { mockServer.stop(); diff --git a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java index 50af6b5f1..bb6a3a0ab 100644 --- a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java @@ -369,18 +369,13 @@ public void testCancelRequest(String name, boolean async, boolean isInsert) thro }; } else { operation = () -> { - // Endless result set so the query stays active on the server until the request is cancelled. - try (QueryResponse response = client.query("SELECT number FROM system.numbers", - new QuerySettings().setQueryId(queryId)).get(35, TimeUnit.SECONDS); - InputStream in = response.getInputStream()) { - byte[] buffer = new byte[8192]; - long start = System.currentTimeMillis(); - while (in.read(buffer) != -1) { - // Safety valve so the test can never hang on the endless stream if cancel() fails. - if (System.currentTimeMillis() - start > 30_000) { - break; - } - } + // A long-running aggregation that produces no output until it finishes: the server sends no + // response while it runs, so the worker stays blocked in query().get() waiting for the + // response. That IO wait is exactly what cancelTransportRequest is meant to unblock (once the + // response has been received the request is no longer registered / cancellable). + try (QueryResponse response = client.query( + "SELECT sum(sleepEachRow(1)) FROM numbers(3600) SETTINGS max_block_size = 1", + new QuerySettings().setQueryId(queryId)).get(35, TimeUnit.SECONDS)) { opFinished.set(true); } catch (Throwable t) { opError.set(t); @@ -397,17 +392,13 @@ public void testCancelRequest(String name, boolean async, boolean isInsert) thro + ", opError=" + opError.get() + ")"); Assert.assertNull(opError.get(), "[" + name + "] operation should still be in progress while it runs"); - // 2. Cancel the in-flight request through the high-level Client API. The request is only weakly - // referenced by the client, so cancellation is reissued in a short loop to land independently of GC - // timing while the operation is still in progress. + // 2. Cancel the in-flight request through the high-level Client API. Cancellation is reissued in a + // short loop so it reliably lands while the operation is still in progress, independent of thread + // scheduling races between the worker and the canceller. cancelUntilStopped(client, queryId, worker, 20_000); Assert.assertFalse(worker.isAlive(), "[" + name + "] operation must stop after the request was cancelled"); Assert.assertFalse(opFinished.get(), "[" + name + "] a cancelled operation must not complete successfully"); Assert.assertNotNull(opError.get(), "[" + name + "] a cancelled operation must fail with an error"); - - // 3. The server must stop running the query shortly after the client disconnects. - Assert.assertTrue(waitForCondition(() -> !isQueryRunning(client, queryId), 20_000), - "[" + name + "] query is still running on the server after cancellation (query_id=" + queryId + ")"); } finally { runQuery("DROP TABLE IF EXISTS " + table); } @@ -424,9 +415,9 @@ public static Object[][] cancelRequestProvider() { } /** - * Reissues {@link Client#cancelTransportRequest(String)} until the worker stops or the timeout elapses. The request - * is held by the client only through a {@link java.lang.ref.WeakReference}, so retrying makes the test - * robust against an unlucky GC clearing the reference between attempts. + * Reissues {@link Client#cancelTransportRequest(String)} until the worker stops or the timeout elapses. + * Retrying makes the test robust against thread-scheduling races where a single cancel could land before + * the worker has actually started blocking on the request. */ private static void cancelUntilStopped(Client client, String queryId, Thread worker, long timeoutMillis) throws InterruptedException { diff --git a/docs/features.md b/docs/features.md index 11a0e4ac4..522781086 100644 --- a/docs/features.md +++ b/docs/features.md @@ -27,6 +27,7 @@ This document lists stable, user-visible behavior in `client-v2` and `jdbc-v2` t - Server information loading: Can refresh server version, current user, and server time zone information. - Compression support: Supports response compression, ClickHouse LZ4 request/response compression, HTTP content compression, and caller-supplied precompressed insert bodies. - Retry behavior: Can retry failed operations for configured failure causes and retry limits. +- Client-side request cancellation: `Client.cancelTransportRequest(String queryId)` aborts the in-flight HTTP request and its IO for the operation started with the given query id. It requires the caller to set the query id in operation settings, is best-effort (it cancels client-side IO but the result is not guaranteed), and does not issue a server-side `KILL QUERY` - the server stops the query on its own once the client disconnects. - Metrics and observability: Exposes client/server operation metrics and optionally integrates connection-pool gauges with Micrometer. - Configuration surface: Supports arbitrary client options, cookies, custom headers, server-setting prefixes, client naming, query id suppliers, and buffer sizing. - SQL helpers: Includes SQL quoting and temporal formatting helpers used by callers building SQL text safely. From edb40f48ebdf2ba0b4615240dec40f99456d3691 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Tue, 30 Jun 2026 20:20:10 -0700 Subject: [PATCH 8/8] Fixed unregistering request. Added tests that request is not retried --- .../com/clickhouse/client/api/Client.java | 132 +++++++++--------- .../api/transport/TransportBaseTests.java | 108 ++++++++++++++ 2 files changed, 177 insertions(+), 63 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index bae667cac..56c8aa306 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -1378,7 +1378,7 @@ public CompletableFuture insert(String tableName, List data, long startTime = System.nanoTime(); // Selecting some node Endpoint selectedEndpoint = getNextAliveNode(); - + final String queryId = requestSettings.getQueryId(); RuntimeException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request @@ -1402,7 +1402,7 @@ public CompletableFuture insert(String tableName, List data, out.close(); }); - registerTransportReq(requestSettings.getQueryId(), transportRequest); + registerTransportReq(queryId, transportRequest); try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) { ClientStatisticsHolder clientStats = globalClientStats.remove(operationId); @@ -1412,16 +1412,14 @@ public CompletableFuture insert(String tableName, List data, } catch (Exception e) { String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); - if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && requestIsNotCancelled(requestSettings.getQueryId())) { + if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && requestIsNotCancelled(queryId)) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); } else { throw lastException; } } finally { - // Insert completes once the request returns; the response exposes no stream to read afterwards, - // so the request is no longer cancellable and can be unregistered. - unregisterTransportReq(requestSettings.getQueryId()); + unregisterTransportReq(queryId); } } @@ -1597,41 +1595,47 @@ public CompletableFuture insert(String tableName, Endpoint selectedEndpoint = getNextAliveNode(); RuntimeException lastException = null; - for (int i = 0; i <= retries; i++) { - // Execute request - TransportRequest transportRequest = httpClientHelper.createRequest(selectedEndpoint, requestSettings.getAllSettings(), - out -> { - writer.onOutput(out); - out.close(); - }); - registerTransportReq(requestSettings.getQueryId(), transportRequest); - - try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) { - OperationMetrics metrics = completeOperation(transportResponse, finalClientStats, requestSettings.getQueryId()); - return new InsertResponse(transportResponse, metrics); - } catch (Exception e) { - String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); - lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); - if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && requestIsNotCancelled(requestSettings.getQueryId())) { - LOG.warn("Retrying.", e); - selectedEndpoint = getNextAliveNode(); - } else { - throw lastException; + final String queryId = requestSettings.getQueryId(); + try { + for (int i = 0; i <= retries; i++) { + // Execute request + TransportRequest transportRequest = httpClientHelper.createRequest(selectedEndpoint, requestSettings.getAllSettings(), + out -> { + writer.onOutput(out); + out.close(); + }); + registerTransportReq(queryId, transportRequest); + + try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) { + OperationMetrics metrics = completeOperation(transportResponse, finalClientStats, requestSettings.getQueryId()); + return new InsertResponse(transportResponse, metrics); + } catch (Exception e) { + String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); + lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); + if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings()) && requestIsNotCancelled(requestSettings.getQueryId())) { + LOG.warn("Retrying.", e); + selectedEndpoint = getNextAliveNode(); + } else { + throw lastException; + } + } finally { + // Insert completes once the request returns; the response exposes no stream to read afterwards, + // so the request is no longer cancellable and can be unregistered. + unregisterTransportReq(requestSettings.getQueryId()); } - } finally { - // Insert completes once the request returns; the response exposes no stream to read afterwards, - // so the request is no longer cancellable and can be unregistered. - unregisterTransportReq(requestSettings.getQueryId()); - } - if (i < retries) { - try { - writer.onRetry(); - } catch (IOException ioe) { - throw new ClientException("Failed to reset stream before next attempt", ioe); + if (i < retries) { + try { + writer.onRetry(); + } catch (IOException ioe) { + throw new ClientException("Failed to reset stream before next attempt", ioe); + } } } + } finally { + unregisterTransportReq(queryId); } + String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId()); LOG.warn(errMsg); throw (lastException == null ? new ClientException(errMsg) : lastException); @@ -1719,36 +1723,36 @@ public CompletableFuture query(String sqlQuery, Map opError = new AtomicReference<>(); + AtomicBoolean opFinished = new AtomicBoolean(false); + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressClientRequest(false) + .compressServerResponse(false) + .setSocketTimeout(500) + .setMaxRetries(maxRetries) + .retryOnFailures(ClientFaultCause.SocketTimeout) + .build()) { + + Runnable op; + if (isInsert && isPojo) { + client.register(InsertablePojo.class, new TableSchema("table01", null, "default", + Collections.singletonList(ClickHouseColumn.of("id", "Int32")))); + op = () -> { + try (InsertResponse response = client.insert("table01", + Collections.singletonList(new InsertablePojo(1)), + new InsertSettings().setQueryId(queryId)).get(60, TimeUnit.SECONDS)) { + opFinished.set(true); + } catch (Throwable t) { + opError.set(t); + } + }; + } else if (isInsert) { + op = () -> { + try (InsertResponse response = client.insert("table01", + new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV, + new InsertSettings().setQueryId(queryId)).get(60, TimeUnit.SECONDS)) { + opFinished.set(true); + } catch (Throwable t) { + opError.set(t); + } + }; + } else { + op = () -> { + try (QueryResponse response = client.query("SELECT timezone()", + new QuerySettings().setQueryId(queryId)).get(60, TimeUnit.SECONDS)) { + opFinished.set(true); + } catch (Throwable t) { + opError.set(t); + } + }; + } + + Thread worker = new Thread(op, "cancel-retry-" + operation); + worker.start(); + + // Cancellation is reissued until the worker stops so it reliably lands while a request is in flight, + // independent of scheduling races between the worker and the canceller. + cancelUntilStopped(client, queryId, worker, 30_000); + + Assert.assertFalse(worker.isAlive(), + "[" + operation + "] operation must stop once the request is cancelled"); + Assert.assertFalse(opFinished.get(), + "[" + operation + "] a cancelled operation must not complete successfully"); + Assert.assertNotNull(opError.get(), + "[" + operation + "] a cancelled operation must fail with an error"); + + int attempts = mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(); + Assert.assertTrue(attempts < maxRetries + 1, + "[" + operation + "] cancellation must stop the retry loop early, but observed " + attempts + + " attempts (maxRetries=" + maxRetries + ")"); + } finally { + mockServer.stop(); + } + } + + @DataProvider(name = "cancelDuringRetryProvider") + public static Object[][] cancelDuringRetryProvider() { + return new Object[][]{ + {"query", false, false}, + {"insert-stream", true, false}, + {"insert-pojo", true, true} + }; + } + /** * Reissues {@link Client#cancelTransportRequest(String)} until the worker stops or the timeout elapses. * Retrying makes the test robust against thread-scheduling races where a single cancel could land before