diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java
index 5cf21a52e..672f357f4 100644
--- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java
+++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java
@@ -17,6 +17,7 @@
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
+import com.clickhouse.client.api.internal.ClientUtils;
import com.clickhouse.client.api.internal.CredentialsManager;
import com.clickhouse.client.api.internal.HttpAPIClientHelper;
import com.clickhouse.client.api.internal.MapUtils;
@@ -37,6 +38,8 @@
import com.clickhouse.client.api.serde.POJOSerDe;
import com.clickhouse.client.api.transport.Endpoint;
import com.clickhouse.client.api.transport.HttpEndpoint;
+import com.clickhouse.client.api.transport.internal.TransportRequest;
+import com.clickhouse.client.api.transport.internal.TransportResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;
@@ -44,9 +47,6 @@
import com.google.common.collect.ImmutableList;
import net.jpountz.lz4.LZ4Factory;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
-import org.apache.hc.core5.http.ClassicHttpResponse;
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1378,43 +1378,30 @@ public CompletableFuture insert(String tableName, List> data,
RuntimeException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
// Execute request
- try (ClassicHttpResponse httpResponse =
- httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(),
- out -> {
- out.write("INSERT INTO ".getBytes());
- out.write(tableName.getBytes());
- out.write(" \n FORMAT ".getBytes());
- out.write(format.name().getBytes());
- out.write(" \n".getBytes());
- for (Object obj : data) {
-
- for (POJOFieldSerializer serializer : serializersForTable) {
- try {
- serializer.serialize(obj, out);
- } catch (InvocationTargetException | IllegalAccessException | IOException e) {
- throw new DataSerializationException(obj, serializer, e);
- }
- }
+ TransportRequest transportRequest = httpClientHelper.createRequest(selectedEndpoint, requestSettings.getAllSettings(),
+ out -> {
+ out.write("INSERT INTO ".getBytes());
+ out.write(tableName.getBytes());
+ out.write(" \n FORMAT ".getBytes());
+ out.write(format.name().getBytes());
+ out.write(" \n".getBytes());
+ for (Object obj : data) {
+
+ for (POJOFieldSerializer serializer : serializersForTable) {
+ try {
+ serializer.serialize(obj, out);
+ } catch (InvocationTargetException | IllegalAccessException | IOException e) {
+ throw new DataSerializationException(obj, serializer, e);
}
- out.close();
- })) {
-
-
- // Check response
- if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) {
- LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime));
- selectedEndpoint = getNextAliveNode();
- continue;
- }
-
+ }
+ }
+ out.close();
+ });
+ try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) {
ClientStatisticsHolder clientStats = globalClientStats.remove(operationId);
- OperationMetrics metrics = new OperationMetrics(clientStats);
- String summary = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}");
- ProcessParser.parseSummary(summary, metrics);
- String queryId = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), requestSettings.getQueryId(), String::valueOf);
- metrics.operationComplete();
- metrics.setQueryId(queryId);
- return new InsertResponse(metrics, HttpAPIClientHelper.collectResponseHeaders(httpResponse));
+ OperationMetrics metrics = completeOperation(transportResponse, clientStats, requestSettings.getQueryId());
+
+ return new InsertResponse(transportResponse, metrics);
} catch (Exception e) {
String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId());
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
@@ -1429,10 +1416,10 @@ public CompletableFuture insert(String tableName, List> data,
String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId());
LOG.warn(errMsg);
- throw (lastException == null ? new ClientException(errMsg) : lastException); };
+ throw (lastException == null ? new ClientException(errMsg) : lastException);
+ };
return runAsyncOperation(supplier, requestSettings.getAllSettings());
-
}
/**
@@ -1568,7 +1555,7 @@ public CompletableFuture insert(String tableName,
clientStats.start(ClientMetrics.OP_DURATION);
final ClientStatisticsHolder finalClientStats = clientStats;
- Supplier responseSupplier;
+
final int writeBufferSize = requestSettings.getInputStreamCopyBufferSize() <= 0 ?
(int) configuration.get(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey()) :
@@ -1592,7 +1579,8 @@ public CompletableFuture insert(String tableName,
if (requestSettings.getQueryId() == null && queryIdGenerator != null) {
requestSettings.setQueryId(queryIdGenerator.get());
}
- responseSupplier = () -> {
+
+ Supplier responseSupplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Endpoint selectedEndpoint = getNextAliveNode();
@@ -1600,28 +1588,15 @@ public CompletableFuture insert(String tableName,
RuntimeException lastException = null;
for (int i = 0; i <= retries; i++) {
// Execute request
- try (ClassicHttpResponse httpResponse =
- httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(),
- out -> {
- writer.onOutput(out);
- out.close();
- })) {
-
-
- // Check response
- if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) {
- LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime));
- selectedEndpoint = getNextAliveNode();
- continue;
- }
-
- OperationMetrics metrics = new OperationMetrics(finalClientStats);
- String summary = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}");
- ProcessParser.parseSummary(summary, metrics);
- String queryId = HttpAPIClientHelper.getHeaderVal(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), requestSettings.getQueryId(), String::valueOf);
- metrics.operationComplete();
- metrics.setQueryId(queryId);
- return new InsertResponse(metrics, HttpAPIClientHelper.collectResponseHeaders(httpResponse));
+ TransportRequest transportRequest = httpClientHelper.createRequest(selectedEndpoint, requestSettings.getAllSettings(),
+ out -> {
+ writer.onOutput(out);
+ out.close();
+ });
+
+ try (TransportResponse transportResponse = httpClientHelper.executeRequest(transportRequest)) {
+ OperationMetrics metrics = completeOperation(transportResponse, finalClientStats, requestSettings.getQueryId());
+ return new InsertResponse(transportResponse, metrics);
} catch (Exception e) {
String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId());
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
@@ -1728,38 +1703,20 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, MapQueries data in one of descriptive format and creates a reader out of the response stream.
* Format is selected internally so is ignored when passed in settings. If query contains format
diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java
index 2fc09f312..c83161725 100644
--- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java
+++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java
@@ -3,21 +3,17 @@
import com.clickhouse.client.api.http.ClickHouseHttpProto;
import com.clickhouse.client.api.metrics.OperationMetrics;
import com.clickhouse.client.api.metrics.ServerMetrics;
+import com.clickhouse.client.api.transport.internal.TransportResponse;
-import java.util.Collections;
import java.util.Map;
public class InsertResponse implements AutoCloseable {
private OperationMetrics operationMetrics;
private final Map responseHeaders;
- public InsertResponse(OperationMetrics metrics) {
- this(metrics, Collections.emptyMap());
- }
-
- public InsertResponse(OperationMetrics metrics, Map responseHeaders) {
+ public InsertResponse(TransportResponse transportResponse, OperationMetrics metrics) {
this.operationMetrics = metrics;
- this.responseHeaders = responseHeaders;
+ this.responseHeaders = transportResponse.getHeaders();
}
@Override
diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java
index 235d55cad..9635fc4d7 100644
--- a/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java
+++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/ClientUtils.java
@@ -1,5 +1,9 @@
package com.clickhouse.client.api.internal;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+
/**
* Class containing utility methods used across the client.
*/
@@ -14,4 +18,14 @@ public static boolean isNotBlank(String str) {
public static boolean isBlank(String str) {
return str == null || str.trim().isEmpty();
}
+
+ public static void quietClose(Closeable closeable, Logger log) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ log.warn("Failed to close object " + closeable, e);
+ }
+ }
+ }
}
diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
index ab4b0153c..46d5e3fa1 100644
--- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
+++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
@@ -15,6 +15,8 @@
import com.clickhouse.client.api.enums.SSLMode;
import com.clickhouse.client.api.http.ClickHouseHttpProto;
import com.clickhouse.client.api.transport.Endpoint;
+import com.clickhouse.client.api.transport.internal.TransportRequest;
+import com.clickhouse.client.api.transport.internal.TransportResponse;
import com.clickhouse.data.ClickHouseFormat;
import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
@@ -43,8 +45,10 @@
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.NoHttpResponseException;
+import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
@@ -365,10 +369,7 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map 0 ? readClickHouseError(httpResponse.getEntity(), serverCode, queryId, httpResponse.getCode()) :
@@ -529,9 +530,37 @@ private HttpPost createPostRequest(URI uri, Map requestConfig) {
return req;
}
- public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig,
- String body) throws Exception {
+ private static final class TransportRequestImpl implements TransportRequest {
+ private final HttpPost delegate;
+ private final Map config;
+
+ TransportRequestImpl(HttpPost delegate, Map config) {
+ this.delegate = delegate;
+ this.config = config;
+ }
+
+ @Override
+ public boolean cancel() throws Exception {
+ if (delegate.isCancelled()) {
+ return true;
+ }
+ return delegate.cancel();
+ }
+
+ @Override
+ public Map getConfig() {
+ return config;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T getDelegate() {
+ return (T) delegate;
+ }
+ }
+ public TransportRequest createRequest(Endpoint server, Map requestConfig,
+ String body) {
boolean useMultipart = ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getOrDefault(requestConfig) &&
requestConfig.containsKey(HttpAPIClientHelper.KEY_STATEMENT_PARAMS);
@@ -554,27 +583,89 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r
req.setEntity(wrapRequestEntity(httpEntity, requestConfig));
} else {
- final String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null;
-
- HttpEntity httpEntity = new ByteArrayEntity(body.getBytes(StandardCharsets.UTF_8.name()), CONTENT_TYPE, contentEncoding);
+ final HttpEntity httpEntity;
+ try {
+ final String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null;
+ httpEntity = new ByteArrayEntity(body.getBytes(StandardCharsets.UTF_8.name()), CONTENT_TYPE, contentEncoding);
+ } catch (UnsupportedEncodingException | ProtocolException e) {
+ throw new ClientException("failed to create request body entity", e);
+ }
req.setEntity(wrapRequestEntity(httpEntity, requestConfig));
}
- // execute
- return doPostRequest(requestConfig, req);
+ return new TransportRequestImpl(req, requestConfig);
+ }
+
+
+ private static final class TransportResponseImpl implements TransportResponse {
+
+ private final ClassicHttpResponse delegate;
+
+ TransportResponseImpl(ClassicHttpResponse delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ClickHouseFormat getDataFormat() {
+ Header formatHeader = delegate.getFirstHeader(ClickHouseHttpProto.HEADER_FORMAT);
+ return formatHeader == null ? null : ClickHouseFormat.valueOf(formatHeader.getValue());
+ }
+
+ @Override
+ public String getSummaryJson() {
+ return HttpAPIClientHelper.getHeaderVal(delegate
+ .getFirstHeader(ClickHouseHttpProto.HEADER_SRV_SUMMARY), "{}");
+ }
+
+ @Override
+ public String getQueryId() {
+ return HttpAPIClientHelper.getHeaderVal(delegate
+ .getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID), null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T getDelegate() {
+ return (T) delegate;
+ }
+
+ @Override
+ public Map getHeaders() {
+ return HttpAPIClientHelper.collectResponseHeaders(delegate);
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public InputStream createDataInputStream() {
+ try {
+ return delegate.getEntity().getContent();
+ } catch (Exception e) {
+ throw new ClientException("Failed to construct input stream", e);
+ }
+ }
}
- public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig,
- IOCallback writeCallback) throws Exception {
+ public TransportResponse executeRequest(TransportRequest transportRequest) throws Exception {
+ return new TransportResponseImpl(doPostRequest(transportRequest.getConfig(), transportRequest.getDelegate()));
+ }
+ public TransportRequest createRequest(Endpoint server, Map requestConfig, IOCallback writeCallback) {
final URI uri = createRequestURI(server, requestConfig, true);
final HttpPost req = createPostRequest(uri, requestConfig);
- String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null;
- req.setEntity(wrapRequestEntity(
- new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback),
- requestConfig));
+ try {
+ String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null;
+ req.setEntity(wrapRequestEntity(
+ new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback),
+ requestConfig));
+ } catch (ProtocolException e) {
+ throw new ClientException("failed to create request body entity", e);
+ }
- return doPostRequest(requestConfig, req);
+ return new TransportRequestImpl(req, requestConfig);
}
private ClassicHttpResponse doPostRequest(Map requestConfig, HttpPost req) throws Exception {
@@ -582,6 +673,7 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt
doPoolVent();
ClassicHttpResponse httpResponse = null;
+ boolean closeResponse = true;
HttpContext context = createRequestHttpContext(requestConfig);
try {
httpResponse = httpClient.executeOpen(null, req, context);
@@ -590,43 +682,56 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt
httpResponse.getCode(),
requestConfig));
- if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) {
- throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
- } else if (httpResponse.getCode() == HttpStatus.SC_BAD_GATEWAY) {
- httpResponse.close();
- throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings.");
- } else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) {
- try {
- throw readError(req, httpResponse);
- } finally {
- httpResponse.close();
- }
+ if (httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) {
+ throw readError(req, httpResponse);
}
- return httpResponse;
+ int statusCode = httpResponse.getCode();
+ switch (statusCode) {
+ case HttpStatus.SC_OK:
+ closeResponse = false;
+ return httpResponse;
+ case HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED:
+ throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
+ case HttpStatus.SC_BAD_GATEWAY:
+ throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings.");
+ case HttpStatus.SC_SERVICE_UNAVAILABLE:
+ throw new ServerException(0, "Server returned '503 Service Unavailable'. Check network settings.",
+ HttpStatus.SC_SERVICE_UNAVAILABLE, getQueryId(httpResponse, req));
+ case HttpStatus.SC_BAD_REQUEST:
+ case HttpStatus.SC_UNAUTHORIZED:
+ case HttpStatus.SC_FORBIDDEN:
+ case HttpStatus.SC_SERVER_ERROR:
+ case HttpStatus.SC_NOT_FOUND:
+ // ClickHouse usually uses SC_BAD_REQUEST and SC_SERVER_ERROR to return error.
+ // SC_UNAUTHORIZED, SC_FORBIDDEN is for authentication
+ // SC_NOT_FOUND can be returned by ClickHouse when path doesn't match database, but also by proxy
+ // others we cannot handle properly
+ throw readError(req, httpResponse);
+ default:
+ throw new ClientException("Unexpected result status " + statusCode);
+ }
} catch (UnknownHostException e) {
- closeQuietly(httpResponse);
LOG.warn("Host '{}' unknown", req.getAuthority());
throw e;
} catch (ConnectException | NoRouteToHostException e) {
- closeQuietly(httpResponse);
LOG.warn("Failed to connect to '{}': {}", req.getAuthority(), e.getMessage());
throw e;
} catch (Exception e) {
- closeQuietly(httpResponse);
LOG.debug("Failed to execute request to '{}': {}", req.getAuthority(), e.getMessage(), e);
throw e;
+ } finally {
+ if (closeResponse) {
+ ClientUtils.quietClose(httpResponse, LOG);
+ }
}
}
- public static void closeQuietly(ClassicHttpResponse httpResponse) {
- if (httpResponse != null) {
- try {
- httpResponse.close();
- } catch (IOException e) {
- LOG.warn("Failed to close response");
- }
- }
+ private String getQueryId(HttpResponse httpResponse, HttpPost httpRequest) {
+ final Header serverQueryIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
+ final Header clientQueryIdHeader = httpRequest.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
+ final Header queryHeader = Stream.of(serverQueryIdHeader, clientQueryIdHeader).filter(Objects::nonNull).findFirst().orElse(null);
+ return queryHeader == null ? "" : queryHeader.getValue();
}
private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8");
@@ -820,7 +925,10 @@ public static int getHeaderInt(Header header, int defaultValue) {
ClickHouseHttpProto.HEADER_SRV_SUMMARY,
ClickHouseHttpProto.HEADER_SRV_DISPLAY_NAME,
ClickHouseHttpProto.HEADER_DATABASE,
- ClickHouseHttpProto.HEADER_DB_USER
+ ClickHouseHttpProto.HEADER_DB_USER,
+ ClickHouseHttpProto.HEADER_TIMEZONE,
+ ClickHouseHttpProto.HEADER_FORMAT,
+ ClickHouseHttpProto.HEADER_PROGRESS
));
/**
diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java
index 6e237a18d..fbece9511 100644
--- a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java
+++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java
@@ -5,13 +5,12 @@
import com.clickhouse.client.api.http.ClickHouseHttpProto;
import com.clickhouse.client.api.metrics.OperationMetrics;
import com.clickhouse.client.api.metrics.ServerMetrics;
+import com.clickhouse.client.api.transport.internal.TransportResponse;
import com.clickhouse.data.ClickHouseFormat;
-import org.apache.hc.core5.http.ClassicHttpResponse;
-import org.apache.hc.core5.http.Header;
import java.io.InputStream;
-import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.TimeZone;
/**
@@ -31,54 +30,45 @@ public class QueryResponse implements AutoCloseable {
private final ClickHouseFormat format;
- private QuerySettings settings;
+ private final QuerySettings settings;
- private OperationMetrics operationMetrics;
+ private final OperationMetrics operationMetrics;
- private ClassicHttpResponse httpResponse;
+ private final TransportResponse transportResponse;
private final Map responseHeaders;
- public QueryResponse(ClassicHttpResponse response, ClickHouseFormat format, QuerySettings settings,
- OperationMetrics operationMetrics) {
- this(response, format, settings, operationMetrics, Collections.emptyMap());
- }
-
- public QueryResponse(ClassicHttpResponse response, ClickHouseFormat format, QuerySettings settings,
- OperationMetrics operationMetrics, Map responseHeaders) {
- this.httpResponse = response;
+ public QueryResponse(TransportResponse response, ClickHouseFormat format, QuerySettings settings, OperationMetrics operationMetrics) {
+ Objects.requireNonNull(response, "response is null");
+ this.transportResponse = response;
this.format = format;
this.operationMetrics = operationMetrics;
this.settings = settings;
- this.responseHeaders = responseHeaders;
+ this.responseHeaders = response.getHeaders();
- Header tzHeader = response.getFirstHeader(ClickHouseHttpProto.HEADER_TIMEZONE);
- if (tzHeader != null) {
+ String timeZoneHeader = responseHeaders.get(ClickHouseHttpProto.HEADER_TIMEZONE);
+ if (timeZoneHeader != null) {
+ TimeZone serverTz;
try {
- this.settings.setOption(ClientConfigProperties.SERVER_TIMEZONE.getKey(),
- TimeZone.getTimeZone(tzHeader.getValue()));
+ serverTz = TimeZone.getTimeZone(timeZoneHeader);
} catch (Exception e) {
throw new ClientException("Failed to parse server timezone", e);
}
+ this.settings.setOption(ClientConfigProperties.SERVER_TIMEZONE.getKey(),
+ serverTz);
}
}
public InputStream getInputStream() {
- try {
- return httpResponse.getEntity().getContent();
- } catch (Exception e) {
- throw new ClientException("Failed to construct input stream", e);
- }
+ return transportResponse.createDataInputStream();
}
@Override
public void close() throws Exception {
- if (httpResponse != null ) {
- try {
- httpResponse.close();
- } catch (Exception e) {
- throw new ClientException("Failed to close response", e);
- }
+ try {
+ transportResponse.close();
+ } catch (Exception e) {
+ throw new ClientException("Failed to close response", e);
}
}
diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java
new file mode 100644
index 000000000..6a1a66401
--- /dev/null
+++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportRequest.java
@@ -0,0 +1,36 @@
+package com.clickhouse.client.api.transport.internal;
+
+import java.util.Map;
+
+public interface TransportRequest {
+
+
+ /**
+ * Gives access to transport delegate. Used strictly only by transport.
+ * @return internal transport request object
+ * @param - Type of delegate
+ */
+ T getDelegate();
+
+ /**
+ * Returns reference to request configuration. Implementation should
+ * store only copy because configuration map is created for each request separately
+ * @return request configuration map
+ */
+ Map getConfig();
+
+ /**
+ * Cancels request associated with the object. Implementation should
+ * treat this method like close() and release all resource.
+ * When request is canceled it cannot be reused. All reusable objects
+ * should be saved elsewhere.
+ * In many cases cancellation of IO operation is problematic and result
+ * of this method should not be used in core logic.
+ * Operation is idempotent and can be called on canceled request multiple
+ * times.
+ *
+ * @return result of operation. True if request was canceled for sure. False when result cannot be known.
+ * @throws Exception - when something extraordinary happens while canceling the request.
+ */
+ boolean cancel() throws Exception;
+}
diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java
new file mode 100644
index 000000000..beecea34d
--- /dev/null
+++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/internal/TransportResponse.java
@@ -0,0 +1,44 @@
+package com.clickhouse.client.api.transport.internal;
+
+import com.clickhouse.data.ClickHouseFormat;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.Map;
+
+public interface TransportResponse extends Closeable {
+
+ /**
+ * Data format returned by server or calculated other way
+ * @return data format
+ */
+ ClickHouseFormat getDataFormat();
+
+ String getSummaryJson();
+
+ String getQueryId();
+
+ /**
+ * Gives access to transport delegate. Used strictly only by transport.
+ * @return internal transport response object
+ * @param - Type of delegate
+ */
+ T getDelegate();
+
+
+ /**
+ * Server headers.
+ * @return response headers
+ */
+ Map getHeaders();
+
+
+ /**
+ * Creates a new stream to read data. It is applicable only for
+ * blocking transports. In real life this should be called once.
+ * It is important to mention that each time new input stream is created.
+ *
+ * @return new data stream
+ */
+ InputStream createDataInputStream();
+}
diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java
index 6d1a27010..2d8a8fb72 100644
--- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java
+++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java
@@ -21,13 +21,19 @@
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.internal.DataTypeConverter;
+import com.clickhouse.client.api.internal.HttpAPIClientHelper;
import com.clickhouse.client.api.internal.ServerSettings;
import com.clickhouse.client.api.internal.ValidationUtils;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
+import com.clickhouse.client.api.transport.Endpoint;
+import com.clickhouse.client.api.transport.HttpEndpoint;
+import com.clickhouse.client.api.transport.internal.TransportRequest;
+import com.clickhouse.client.api.transport.internal.TransportResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.data.ClickHouseFormat;
+import net.jpountz.lz4.LZ4Factory;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
@@ -64,6 +70,7 @@
import javax.net.ssl.SSLHandshakeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
import java.io.StringWriter;
import java.math.BigInteger;
import java.net.InetAddress;
@@ -102,11 +109,16 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Test(groups = {"integration"})
@@ -2102,6 +2114,102 @@ public static Object[][] testHttpStatusCompressedBodyDataProvider() {
};
}
+ @Test(groups = {"integration"})
+ public void test503ServiceUnavailableSurfacesAsServerException() throws Exception {
+ if (isCloud()) {
+ return; // mocked server
+ }
+
+ WireMockServer mockServer = new WireMockServer(WireMockConfiguration
+ .options().dynamicPort().notifier(new ConsoleNotifier(false)));
+ mockServer.start();
+
+ try {
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE)
+ .withBody("Service Unavailable"))
+ .build());
+
+ try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
+ .setUsername("default")
+ .setPassword(ClickHouseServerForTest.getPassword())
+ .compressServerResponse(false)
+ .retryOnFailures(ClientFaultCause.None)
+ .build()) {
+
+ Throwable thrown = Assert.expectThrows(Throwable.class,
+ () -> client.query("SELECT 1").get(10, TimeUnit.SECONDS));
+
+ ServerException serverException = findServerException(thrown);
+ Assert.assertNotNull(serverException,
+ "Expected 503 to be reported as a ServerException, but was: " + thrown);
+ Assert.assertEquals(serverException.getTransportProtocolCode(), HttpStatus.SC_SERVICE_UNAVAILABLE,
+ "Expected transport protocol code 503, but was: " + serverException.getTransportProtocolCode());
+
+ Assert.assertTrue(containsMessageInCauseChain(thrown, "503 Service Unavailable"),
+ "Expected '503 Service Unavailable' in failure message chain, but was: " + thrown);
+
+ Assert.assertNull(findCause(thrown, ConnectionInitiationException.class),
+ "503 should not be reported as a ConnectionInitiationException, but was: " + thrown);
+ }
+ } finally {
+ mockServer.stop();
+ }
+ }
+
+ @Test(groups = {"integration"})
+ public void test503ServiceUnavailableIsRetried() throws Exception {
+ if (isCloud()) {
+ return; // mocked server
+ }
+
+ WireMockServer mockServer = new WireMockServer(WireMockConfiguration
+ .options().dynamicPort().notifier(new ConsoleNotifier(false)));
+ mockServer.start();
+
+ try {
+ // First request fails with a retryable 503 server error
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .inScenario("ServiceUnavailable")
+ .withRequestBody(WireMock.containing("SELECT 1"))
+ .whenScenarioStateIs(STARTED)
+ .willSetStateTo("Recovered")
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE)
+ .withHeader("X-ClickHouse-Exception-Code", "202")
+ .withBody("Code: 202. DB::Exception: Too many simultaneous queries. (TOO_MANY_SIMULTANEOUS_QUERIES)"))
+ .build());
+
+ // Second request (retry) succeeds
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .inScenario("ServiceUnavailable")
+ .withRequestBody(WireMock.containing("SELECT 1"))
+ .whenScenarioStateIs("Recovered")
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_OK)
+ .withHeader("X-ClickHouse-Summary", "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}"))
+ .build());
+
+ try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
+ .setUsername("default")
+ .setPassword(ClickHouseServerForTest.getPassword())
+ .compressServerResponse(false)
+ .setMaxRetries(1)
+ .retryOnFailures(ClientFaultCause.ServerRetryable)
+ .build()) {
+
+ try (QueryResponse response = client.query("SELECT 1").get(10, TimeUnit.SECONDS)) {
+ Assert.assertNotNull(response);
+ }
+ }
+
+ mockServer.verify(2, WireMock.postRequestedFor(WireMock.anyUrl()));
+ } finally {
+ mockServer.stop();
+ }
+ }
+
private byte[] fetchBinaryPayload(String query, int networkBufferSize, int timeoutSec) throws Exception {
QuerySettings querySettings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
try (Client client = newClient()
@@ -2532,4 +2640,173 @@ private static String toPem(X509Certificate certificate) throws Exception {
}
return stringWriter.toString();
}
+
+ /**
+ * Exercises {@code HttpAPIClientHelper.TransportRequestImpl#cancel()}, which is not reachable through the
+ * high-level {@link Client} API. A {@link HttpAPIClientHelper} is created directly and used to start a request
+ * for an effectively endless stream (reading from {@code system.numbers}) to emulate a long-running query.
+ * The test verifies the whole life cycle against the server:
+ *
+ * - the query is observed running on the server ({@code system.processes});
+ * - the in-flight request is cancelled from another thread and the reader stops with an error;
+ * - the query is no longer running on the server;
+ * - {@code system.query_log} contains a record proving the query was interrupted (not finished).
+ *
+ */
+ @Test(groups = {"integration"})
+ @SuppressWarnings("java:S2925")
+ public void testTransportRequestCancel() throws Exception {
+ if (isCloud()) {
+ return; // relies on direct transport access and local system tables (processes / query_log)
+ }
+
+ ClickHouseNode server = getServer(ClickHouseProtocol.HTTP);
+ String queryId = "transport-cancel-" + UUID.randomUUID();
+
+ Map configuration = new HashMap<>();
+ configuration.put(ClientConfigProperties.USER.getKey(), "default");
+ configuration.put(ClientConfigProperties.PASSWORD.getKey(), ClickHouseServerForTest.getPassword());
+ configuration.put(ClientConfigProperties.DATABASE.getKey(), ClickHouseServerForTest.getDatabase());
+ configuration.put(ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey(), Boolean.FALSE);
+ configuration.put(ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), Boolean.FALSE);
+
+ HttpAPIClientHelper helper = new HttpAPIClientHelper(new HashMap<>(configuration), null, false,
+ LZ4Factory.fastestInstance());
+
+ try (Client verifyClient = new Client.Builder()
+ .addEndpoint(Protocol.HTTP, server.getHost(), server.getPort(), false)
+ .setUsername("default")
+ .setPassword(ClickHouseServerForTest.getPassword())
+ .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+ .compressClientRequest(false)
+ .compressServerResponse(false)
+ .build()) {
+
+ Endpoint endpoint = new HttpEndpoint(server.getHost(), server.getPort(), false, "/");
+
+ Map requestConfig = new HashMap<>(configuration);
+ requestConfig.put(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), ClickHouseFormat.TSV);
+ requestConfig.put(ClientConfigProperties.QUERY_ID.getKey(), queryId);
+
+ // Endless result set so the query stays active on the server until the request is cancelled.
+ TransportRequest request = helper.createRequest(endpoint, requestConfig,
+ "SELECT number FROM system.numbers");
+
+ AtomicBoolean readStarted = new AtomicBoolean(false);
+ AtomicLong bytesRead = new AtomicLong(0);
+ AtomicReference readError = new AtomicReference<>();
+
+ Thread reader = new Thread(() -> {
+ long start = System.currentTimeMillis();
+ try (TransportResponse response = helper.executeRequest(request);
+ InputStream in = response.createDataInputStream()) {
+ byte[] buffer = new byte[8192];
+ int read;
+ while ((read = in.read(buffer)) != -1) {
+ readStarted.set(true);
+ bytesRead.addAndGet(read);
+ // Safety valve so the test can never hang on the endless stream if cancel() fails.
+ if (System.currentTimeMillis() - start > 30_000) {
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ readStarted.set(true);
+ readError.set(t);
+ }
+ }, "transport-request-reader");
+ reader.start();
+
+ // 1. The query must actually be running on the server.
+ Assert.assertTrue(waitForCondition(() -> isQueryRunning(verifyClient, queryId), 20_000),
+ "query was not observed running on the server (query_id=" + queryId + ")");
+ Assert.assertNull(readError.get(), "reading should still be in progress while the query runs");
+
+ // 2. Cancel the in-flight request from a separate thread.
+ AtomicBoolean cancelled = new AtomicBoolean(false);
+ AtomicReference cancelError = new AtomicReference<>();
+ Thread canceller = new Thread(() -> {
+ try {
+ cancelled.set(request.cancel());
+ } catch (Throwable t) {
+ cancelError.set(t);
+ }
+ }, "transport-request-canceller");
+ canceller.start();
+ canceller.join(5_000);
+
+ Assert.assertNull(cancelError.get(), "cancel() must not throw");
+ Assert.assertTrue(cancelled.get(), "cancel() should report the request as cancelled");
+
+ // The reader must stop with an error once the underlying connection is aborted.
+ reader.join(15_000);
+ Assert.assertFalse(reader.isAlive(), "reading must stop after the request was cancelled");
+ Assert.assertNotNull(readError.get(),
+ "reading a cancelled request must fail, but read " + bytesRead.get() + " bytes");
+
+ // 3. The server must stop running the query shortly after the client disconnects.
+ Assert.assertTrue(waitForCondition(() -> !isQueryRunning(verifyClient, queryId), 20_000),
+ "query is still running on the server after cancellation (query_id=" + queryId + ")");
+
+ // 4. The query_log must show the query was interrupted instead of finishing successfully.
+ assertQueryWasInterrupted(verifyClient, queryId);
+ assertTrue(request.cancel());
+ } finally {
+ helper.close();
+ }
+ }
+
+ private static boolean isQueryRunning(Client client, String queryId) {
+ List rows = client.queryAll(
+ "SELECT count() AS c FROM system.processes WHERE query_id = '" + queryId + "'");
+ return !rows.isEmpty() && rows.get(0).getLong("c") > 0;
+ }
+
+ private static boolean waitForCondition(Supplier condition, long timeoutMillis)
+ throws InterruptedException {
+ long deadline = System.currentTimeMillis() + timeoutMillis;
+ while (System.currentTimeMillis() < deadline) {
+ try {
+ if (Boolean.TRUE.equals(condition.get())) {
+ return true;
+ }
+ } catch (Exception ignore) {
+ // transient query failures (e.g. server busy) are retried until the timeout elapses
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
+
+ private static void assertQueryWasInterrupted(Client client, String queryId) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + 30_000;
+ String seenTypes = "";
+ while (System.currentTimeMillis() < deadline) {
+ client.queryAll("SYSTEM FLUSH LOGS");
+ List rows = client.queryAll(
+ "SELECT toString(type) AS type, exception_code FROM system.query_log " +
+ "WHERE query_id = '" + queryId + "' AND event_date >= today() - 1");
+
+ StringBuilder types = new StringBuilder();
+ for (GenericRecord row : rows) {
+ String type = row.getString("type");
+ int exceptionCode = row.getInteger("exception_code");
+ types.append(type).append('(').append(exceptionCode).append(") ");
+
+ Assert.assertNotEquals(type, "QueryFinish",
+ "query unexpectedly finished successfully instead of being interrupted (query_id="
+ + queryId + ")");
+ if ("ExceptionWhileProcessing".equals(type)) {
+ Assert.assertNotEquals(exceptionCode, 0,
+ "an interrupted query must be logged with a non-zero exception code (query_id="
+ + queryId + ")");
+ return; // found the proof the query was interrupted on the server
+ }
+ }
+ seenTypes = types.toString();
+ Thread.sleep(250);
+ }
+ Assert.fail("no query_log record proving the query was interrupted (query_id=" + queryId
+ + ", seen types: [" + seenTypes + "])");
+ }
}
diff --git a/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java b/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java
new file mode 100644
index 000000000..88c5d8526
--- /dev/null
+++ b/client-v2/src/test/java/com/clickhouse/client/api/internal/ClientUtilsTest.java
@@ -0,0 +1,48 @@
+package com.clickhouse.client.api.internal;
+
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class ClientUtilsTest {
+
+ @Test(groups = {"unit"})
+ public void testQuietCloseSwallowsExceptionAndLogs() throws IOException {
+ Logger log = Mockito.mock(Logger.class);
+ IOException failure = new IOException("close failed");
+ Closeable closeable = Mockito.mock(Closeable.class);
+ Mockito.doThrow(failure).when(closeable).close();
+
+ // Should not propagate the exception thrown by close()
+ ClientUtils.quietClose(closeable, log);
+
+ Mockito.verify(closeable).close();
+ Mockito.verify(log).warn(Mockito.contains("Failed to close object"), Mockito.eq(failure));
+ }
+
+ @Test(groups = {"unit"})
+ public void testQuietCloseClosesSuccessfully() throws IOException {
+ Logger log = Mockito.mock(Logger.class);
+ Closeable closeable = Mockito.mock(Closeable.class);
+
+ ClientUtils.quietClose(closeable, log);
+
+ Mockito.verify(closeable).close();
+ Mockito.verifyNoInteractions(log);
+ }
+
+ @Test(groups = {"unit"})
+ public void testQuietCloseWithNull() {
+ Logger log = Mockito.mock(Logger.class);
+
+ // Should be a no-op and not throw on a null closeable
+ ClientUtils.quietClose(null, log);
+
+ Mockito.verifyNoInteractions(log);
+ Assert.assertTrue(true);
+ }
+}
diff --git a/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java
new file mode 100644
index 000000000..113304e6c
--- /dev/null
+++ b/client-v2/src/test/java/com/clickhouse/client/api/transport/TransportBaseTests.java
@@ -0,0 +1,326 @@
+package com.clickhouse.client.api.transport;
+
+
+import com.clickhouse.client.BaseIntegrationTest;
+import com.clickhouse.client.ClickHouseServerForTest;
+import com.clickhouse.client.api.Client;
+import com.clickhouse.client.api.ClientFaultCause;
+import com.clickhouse.client.api.ServerException;
+import com.clickhouse.client.api.enums.Protocol;
+import com.clickhouse.client.api.insert.InsertResponse;
+import com.clickhouse.client.api.metadata.TableSchema;
+import com.clickhouse.client.api.query.QueryResponse;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.clickhouse.data.ClickHouseColumn;
+import com.clickhouse.data.ClickHouseFormat;
+import org.apache.hc.core5.http.HttpStatus;
+import org.testcontainers.utility.ThrowingFunction;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
+
+@Test(groups = {"integration"})
+public class TransportBaseTests extends BaseIntegrationTest {
+
+ private static final String RETRYABLE_BODY =
+ "Code: 202. DB::Exception: Too many simultaneous queries. (TOO_MANY_SIMULTANEOUS_QUERIES)";
+ private static final int RETRYABLE_CODE = 202;
+
+ private static final String NON_RETRYABLE_BODY =
+ "Code: 62. DB::Exception: Syntax error. (SYNTAX_ERROR)";
+ private static final int NON_RETRYABLE_CODE = 62;
+
+ private WireMockServer startMockServer() {
+ WireMockServer mockServer = new WireMockServer(WireMockConfiguration
+ .options().dynamicPort().notifier(new ConsoleNotifier(false)));
+ mockServer.start();
+ return mockServer;
+ }
+
+ private Client mockServerClient(WireMockServer mockServer, int maxRetries, ClientFaultCause... retryOn) {
+ Client.Builder builder = new Client.Builder()
+ .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
+ .setUsername("default")
+ .setPassword(ClickHouseServerForTest.getPassword())
+ .compressClientRequest(false)
+ .compressServerResponse(false)
+ .setMaxRetries(maxRetries);
+ if (retryOn.length > 0) {
+ builder.retryOnFailures(retryOn);
+ }
+ return builder.build();
+ }
+
+ /**
+ * A retryable server error on the first attempt should be retried and the following successful
+ * response should be returned. Covers the retry branch of the operation loop
+ * ({@code shouldRetry(...)} == true) and recovery on the next node.
+ */
+ @Test(groups = {"integration"}, dataProvider = "operationProvider")
+ public void testRetriesAndSucceedsAfterRetryableServerError(String operation,
+ ThrowingFunction function) {
+ if (isCloud()) {
+ return; // mocked server
+ }
+
+ WireMockServer mockServer = startMockServer();
+
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .inScenario("Retry")
+ .whenScenarioStateIs(STARTED)
+ .willSetStateTo("Recovered")
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE)
+ .withHeader("X-ClickHouse-Exception-Code", String.valueOf(RETRYABLE_CODE))
+ .withBody(RETRYABLE_BODY)).build());
+
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .inScenario("Retry")
+ .whenScenarioStateIs("Recovered")
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_OK)
+ .withHeader("X-ClickHouse-Summary",
+ "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build());
+
+ try (Client client = mockServerClient(mockServer, 1)) {
+ function.apply(client);
+ } catch (Exception e) {
+ Assert.fail("[" + operation + "] should have recovered after a retry", e);
+ } finally {
+ mockServer.stop();
+ }
+
+ Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), 2,
+ "[" + operation + "] expected one failed attempt followed by a successful retry");
+ }
+
+ /**
+ * When the server keeps returning a retryable error the operation should be retried
+ * {@code maxRetries} times and then fail by re-throwing the last captured exception.
+ * Covers the final throw of the operation loop ({@code throw lastException}).
+ */
+ @Test(groups = {"integration"}, dataProvider = "operationProvider")
+ public void testThrowsAfterExhaustingRetries(String operation,
+ ThrowingFunction function) {
+ if (isCloud()) {
+ return; // mocked server
+ }
+
+ int maxRetries = 2;
+ WireMockServer mockServer = startMockServer();
+
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE)
+ .withHeader("X-ClickHouse-Exception-Code", String.valueOf(RETRYABLE_CODE))
+ .withBody(RETRYABLE_BODY)).build());
+
+ try (Client client = mockServerClient(mockServer, maxRetries)) {
+ function.apply(client);
+ Assert.fail("[" + operation + "] expected exception after exhausting retries");
+ } catch (ServerException e) {
+ Assert.assertEquals(e.getCode(), RETRYABLE_CODE);
+ } catch (Exception e) {
+ Assert.fail("[" + operation + "] unexpected exception type", e);
+ } finally {
+ mockServer.stop();
+ }
+
+ Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), maxRetries + 1,
+ "[" + operation + "] expected initial attempt plus " + maxRetries + " retries");
+ }
+
+ /**
+ * A non-retryable server error should be re-thrown immediately without consuming any of the
+ * configured retries. Covers the {@code else { throw lastException; }} branch.
+ */
+ @Test(groups = {"integration"}, dataProvider = "operationProvider")
+ public void testThrowsImmediatelyWhenNotRetryable(String operation,
+ ThrowingFunction function) {
+ if (isCloud()) {
+ return; // mocked server
+ }
+
+ WireMockServer mockServer = startMockServer();
+
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_BAD_REQUEST)
+ .withHeader("X-ClickHouse-Exception-Code", String.valueOf(NON_RETRYABLE_CODE))
+ .withBody(NON_RETRYABLE_BODY)).build());
+
+ try (Client client = mockServerClient(mockServer, 3)) {
+ function.apply(client);
+ Assert.fail("[" + operation + "] expected exception for non-retryable error");
+ } catch (ServerException e) {
+ Assert.assertEquals(e.getCode(), NON_RETRYABLE_CODE);
+ } catch (Exception e) {
+ Assert.fail("[" + operation + "] unexpected exception type", e);
+ } finally {
+ mockServer.stop();
+ }
+
+ Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), 1,
+ "[" + operation + "] a non-retryable error must not be retried");
+ }
+
+ /**
+ * When retries are disabled ({@link ClientFaultCause#None}) even an otherwise retryable error
+ * must be re-thrown on the first attempt. Covers the {@code else { throw lastException; }}
+ * branch when {@code shouldRetry(...)} returns {@code false} because of the configuration.
+ */
+ @Test(groups = {"integration"}, dataProvider = "operationProvider")
+ public void testRetriesDisabledThrowsImmediately(String operation,
+ ThrowingFunction function) {
+ if (isCloud()) {
+ return; // mocked server
+ }
+
+ WireMockServer mockServer = startMockServer();
+
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE)
+ .withHeader("X-ClickHouse-Exception-Code", String.valueOf(RETRYABLE_CODE))
+ .withBody(RETRYABLE_BODY)).build());
+
+ try (Client client = mockServerClient(mockServer, 3, ClientFaultCause.None)) {
+ function.apply(client);
+ Assert.fail("[" + operation + "] expected exception when retries are disabled");
+ } catch (ServerException e) {
+ Assert.assertEquals(e.getCode(), RETRYABLE_CODE);
+ } catch (Exception e) {
+ Assert.fail("[" + operation + "] unexpected exception type", e);
+ } finally {
+ mockServer.stop();
+ }
+
+ Assert.assertEquals(mockServer.findAll(WireMock.postRequestedFor(WireMock.anyUrl())).size(), 1,
+ "[" + operation + "] retries are disabled, no retry expected");
+ }
+
+ /**
+ * When no query id is supplied by the caller but a generator is configured, the operation must
+ * use the generated id. Covers the
+ * {@code if (requestSettings.getQueryId() == null && queryIdGenerator != null)} block that is
+ * present in every operation path.
+ */
+ @Test(groups = {"integration"}, dataProvider = "operationProvider")
+ public void testGeneratedQueryIdIsUsedWhenNotSet(String operation,
+ ThrowingFunction function) {
+ if (isCloud()) {
+ return; // mocked server
+ }
+
+ WireMockServer mockServer = startMockServer();
+ mockServer.addStubMapping(WireMock.post(WireMock.anyUrl())
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_OK)
+ .withHeader("X-ClickHouse-Summary",
+ "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build());
+
+ String generatedId = "generated-" + UUID.randomUUID();
+ AtomicInteger generatorCalls = new AtomicInteger();
+ Supplier idGenerator = () -> {
+ generatorCalls.incrementAndGet();
+ return generatedId;
+ };
+
+ try (Client client = new Client.Builder()
+ .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
+ .setUsername("default")
+ .setPassword(ClickHouseServerForTest.getPassword())
+ .compressClientRequest(false)
+ .compressServerResponse(false)
+ .setQueryIdGenerator(idGenerator)
+ .build()) {
+ function.apply(client);
+ } catch (Exception e) {
+ mockServer.stop();
+ Assert.fail("[" + operation + "] operation should succeed", e);
+ return;
+ }
+
+ try {
+ Assert.assertTrue(generatorCalls.get() >= 1,
+ "[" + operation + "] query id generator should have been invoked");
+ mockServer.verify(WireMock.postRequestedFor(WireMock.anyUrl())
+ .withQueryParam("query_id", WireMock.equalTo(generatedId)));
+ } finally {
+ mockServer.stop();
+ }
+ }
+
+ /**
+ * Provides one {@link ThrowingFunction} per retry-loop code path in {@link Client}, so every
+ * failure-handling test exercises all of them:
+ *
+ * - {@code query} - {@link Client#query(String)}
+ * - {@code insert-stream} - {@link Client#insert(String, java.io.InputStream, ClickHouseFormat)}
+ * - {@code insert-pojo} - {@link Client#insert(String, java.util.List)}
+ *
+ */
+ @DataProvider(name = "operationProvider")
+ public static Object[][] operationProvider() {
+ ThrowingFunction queryFunction = (client) -> {
+ try (QueryResponse response = client.query("SELECT timezone()").get(30, TimeUnit.SECONDS)) {
+ return null;
+ }
+ };
+
+ ThrowingFunction streamInsertFunction = (client) -> {
+ try (InsertResponse response = client.insert("table01",
+ new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV)
+ .get(30, TimeUnit.SECONDS)) {
+ return null;
+ }
+ };
+
+ ThrowingFunction pojoInsertFunction = (client) -> {
+ client.register(InsertablePojo.class, new TableSchema("table01", null, "default",
+ Collections.singletonList(ClickHouseColumn.of("id", "Int32"))));
+ try (InsertResponse response = client.insert("table01",
+ Collections.singletonList(new InsertablePojo(1)))
+ .get(30, TimeUnit.SECONDS)) {
+ return null;
+ }
+ };
+
+ return new Object[][]{
+ {"query", queryFunction},
+ {"insert-stream", streamInsertFunction},
+ {"insert-pojo", pojoInsertFunction}
+ };
+ }
+
+ public static class InsertablePojo {
+ private int id;
+
+ public InsertablePojo() {
+ }
+
+ public InsertablePojo(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+ }
+}
diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java
index 6ffdfea5e..cf9eb057b 100644
--- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java
+++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java
@@ -13,6 +13,7 @@
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
+import com.clickhouse.client.api.internal.ClientUtils;
import com.clickhouse.client.api.internal.ServerSettings;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.metrics.ClientMetrics;
@@ -140,6 +141,36 @@ public void insertSimplePOJOs() throws Exception {
assertEquals(response.getQueryId(), uuid);
}
+ @Test(groups = { "integration" }, enabled = true)
+ public void testInsertResponseStats() throws Exception {
+ String tableName = "insert_response_stats_table";
+ String createSQL = SamplePOJO.generateTableCreateSQL(tableName);
+ String uuid = UUID.randomUUID().toString();
+
+ initTable(tableName, createSQL);
+
+ client.register(SamplePOJO.class, client.getTableSchema(tableName));
+ List