diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2b0ba711..c68898b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,11 @@
### Features
-1. [#352](https://github.com/InfluxCommunity/influxdb3-java/pull/352): Update Apache Flight client dependencies.
+1. [#352](https://github.com/InfluxCommunity/influxdb3-java/pull/352): Upgrade Arrow Flight client dependencies.
+
+### Bug Fixes
+
+1. [#351](https://github.com/InfluxCommunity/influxdb3-java/pull/351): Enterprise/Core structured errors handling.
### CI
diff --git a/pom.xml b/pom.xml
index a67ad2cf..e9b2daad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,6 +317,13 @@
test
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${junit-jupiter.version}
+ test
+
+
org.assertj
assertj-core
diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java
index 0453e662..6fb0e05b 100644
--- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java
+++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java
@@ -217,22 +217,12 @@ HttpResponse request(@Nonnull final String path,
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
- String reason = "";
+ String reason;
String body = response.body();
- if (!body.isEmpty()) {
- try {
- final JsonNode root = objectMapper.readTree(body);
- final List possibilities = List.of("message", "error_message", "error");
- for (final String field : possibilities) {
- final JsonNode node = root.findValue(field);
- if (node != null) {
- reason = node.asText();
- break;
- }
- }
- } catch (JsonProcessingException e) {
- LOG.debug("Can't parse msg from response {}", response);
- }
+ reason = formatErrorMessage(body, response.headers().firstValue("Content-Type").orElse(null));
+
+ if (reason == null) {
+ reason = "";
}
if (reason.isEmpty()) {
@@ -257,6 +247,103 @@ HttpResponse request(@Nonnull final String path,
return response;
}
+ @Nullable
+ private String formatErrorMessage(@Nonnull final String body, @Nullable final String contentType) {
+ if (body.isEmpty()) {
+ return null;
+ }
+
+ if (contentType != null
+ && !contentType.isEmpty()
+ && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) {
+ return null;
+ }
+
+ try {
+ final JsonNode root = objectMapper.readTree(body);
+ if (!root.isObject()) {
+ return null;
+ }
+
+ final String rootMessage = errNonEmptyField(root, "message");
+ if (rootMessage != null) {
+ return rootMessage;
+ }
+
+ final String error = errNonEmptyField(root, "error");
+ final JsonNode dataNode = root.get("data");
+
+ // InfluxDB 3 Core/Enterprise write error format:
+ // {"error":"...","data":[{"error_message":"...","line_number":2,"original_line":"..."}]}
+ if (error != null && dataNode != null && dataNode.isArray()) {
+ final StringBuilder message = new StringBuilder(error);
+ boolean hasDetails = false;
+ for (JsonNode item : dataNode) {
+ final String detail = errFormatDataArrayDetail(item);
+ if (detail == null) {
+ continue;
+ }
+ if (!hasDetails) {
+ message.append(':');
+ hasDetails = true;
+ }
+ message.append("\n\t").append(detail);
+ }
+ return message.toString();
+ }
+
+ // Core/Enterprise object format:
+ // {"error":"...","data":{"error_message":"..."}}
+ final String errorMessage = errNonEmptyField(dataNode, "error_message");
+ if (errorMessage != null) {
+ return errorMessage;
+ }
+
+ return error;
+ } catch (JsonProcessingException e) {
+ LOG.debug("Can't parse msg from response body {}", body, e);
+ return null;
+ }
+ }
+
+ @Nullable
+ private String errNonEmptyText(@Nullable final JsonNode node) {
+ if (node == null || node.isNull()) {
+ return null;
+ }
+ final String value = node.asText();
+ return value.isEmpty() ? null : value;
+ }
+
+ @Nullable
+ private String errNonEmptyField(@Nullable final JsonNode object, @Nonnull final String fieldName) {
+ if (object == null || !object.isObject()) {
+ return null;
+ }
+ return errNonEmptyText(object.get(fieldName));
+ }
+
+ @Nullable
+ private String errFormatDataArrayDetail(@Nullable final JsonNode item) {
+ if (!item.isObject()) {
+ return null;
+ }
+
+ final String errorMessage = errNonEmptyField(item, "error_message");
+ if (errorMessage == null) {
+ return null;
+ }
+
+ if (item.hasNonNull("line_number")) {
+ final String originalLine = errNonEmptyField(item, "original_line");
+ if (originalLine != null) {
+ final String lineNumber = item.get("line_number").asText();
+ return "line " + lineNumber + ": " + errorMessage + " (" + originalLine + ")";
+ }
+ }
+ return errorMessage;
+ }
+
private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) {
try {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
diff --git a/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java
index 44b05560..75478e07 100644
--- a/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java
+++ b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java
@@ -65,10 +65,23 @@ protected MockResponse createResponse(final int responseCode,
@Nullable final Map headers,
@Nullable final String body) {
+ return createResponse(responseCode, "text/csv; charset=utf-8", headers, body);
+ }
+
+ @Nonnull
+ protected MockResponse createResponse(final int responseCode,
+ @Nullable final String contentType,
+ @Nullable final Map headers,
+ @Nullable final String body) {
+
MockResponse.Builder mrb = new MockResponse.Builder();
mrb.code(responseCode);
- Map effectiveHeaders = new HashMap<>(Map.of("Content-Type", "text/csv; charset=utf-8",
- "Date", "Tue, 26 Jun 2018 13:15:01 GMT"));
+ Map effectiveHeaders = new HashMap<>(Map.of(
+ "Date", "Tue, 26 Jun 2018 13:15:01 GMT"
+ ));
+ if (contentType != null) {
+ effectiveHeaders.put("Content-Type", contentType);
+ }
if (headers != null) {
effectiveHeaders.putAll(headers);
}
@@ -85,9 +98,6 @@ protected MockResponse createResponse(final int responseCode,
@Nonnull
protected MockResponse createResponse(final int responseCode) {
- return createResponse(responseCode, Map.of(
- "Content-Type", "text/csv; charset=utf-8",
- "Date", "Tue, 26 Jun 2018 13:15:01 GMT"
- ), null);
+ return createResponse(responseCode, "text/csv; charset=utf-8", null, null);
}
}
diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
index 4d3a2cba..cd4242d7 100644
--- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
+++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
@@ -517,7 +517,8 @@ void defaultTags() throws InterruptedException {
@Test
public void retryHandled429Test() {
mockServer.enqueue(createResponse(429,
- Map.of("retry-after", "42", "content-type", "application/json"),
+ "application/json",
+ Map.of("retry-after", "42"),
"{ \"message\" : \"Too Many Requests\" }"));
Point point = Point.measurement("mem")
diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
index 10cbd110..b6d4a26b 100644
--- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
+++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Stream;
import io.netty.handler.codec.http.HttpMethod;
import mockwebserver3.MockResponse;
@@ -42,6 +43,9 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import com.influxdb.v3.client.AbstractMockServerTest;
import com.influxdb.v3.client.InfluxDBApiException;
@@ -387,6 +391,7 @@ public void errorFromHeader() {
public void errorFromBody() {
mockServer.enqueue(createResponse(401,
+ "application/json",
Map.of("X-Influx-Errpr", "not used"),
"{\"message\":\"token does not have sufficient permissions\"}"));
@@ -402,9 +407,100 @@ public void errorFromBody() {
}
@Test
- public void errorFromBodyEdgeWithoutMessage() { // OSS/Edge error message
+ public void errorFromBodyIgnoredForNonJsonContentType() {
+ mockServer.enqueue(createResponse(400,
+ "text/plain",
+ null,
+ "{\"message\":\"token does not have sufficient permissions\"}"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage("HTTP status code: 400; Message: {\"message\":\"token does not have sufficient permissions\"}");
+ }
+ @Test
+ public void errorFromBodyInvalidJsonFallsBackToBody() {
mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ "{\"message\":\"token does not have sufficient permissions\""));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage("HTTP status code: 400; Message: {\"message\":\"token does not have sufficient permissions\"");
+ }
+
+ @Test
+ public void errorFromBodyNullMessageFallsBackToError() {
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ "{\"message\":null,\"error\":\"parsing failed\"}"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage("HTTP status code: 400; Message: parsing failed");
+ }
+
+ @Test
+ public void errorFromBodyEmptyMessageFallsBackToError() {
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ "{\"message\":\"\",\"error\":\"parsing failed\"}"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage("HTTP status code: 400; Message: parsing failed");
+ }
+
+ @Test
+ public void errorFromBodyJsonArrayFallsBackToBody() {
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ "[]"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage("HTTP status code: 400; Message: []");
+ }
+
+ @Test
+ public void errorFromBodyV3WithoutMessageAndEmptyContentType() {
+
+ mockServer.enqueue(createResponse(400,
+ "",
null,
"{\"error\":\"parsing failed\"}"));
@@ -420,9 +516,29 @@ public void errorFromBodyEdgeWithoutMessage() { // OSS/Edge error message
}
@Test
- public void errorFromBodyEdgeWithMessage() { // OSS/Edge specific error message
+ public void errorFromBodyV3WithoutMessageAndWithoutContentType() {
mockServer.enqueue(createResponse(400,
+ null,
+ null,
+ "{\"error\":\"parsing failed\"}"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage("HTTP status code: 400; Message: parsing failed");
+ }
+
+ @Test
+ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format
+
+ mockServer.enqueue(createResponse(400,
+ "application/json",
null,
"{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"invalid field value\"}}"));
@@ -437,6 +553,170 @@ public void errorFromBodyEdgeWithMessage() { // OSS/Edge specific error message
.hasMessage("HTTP status code: 400; Message: invalid field value");
}
+ @Test
+ public void errorFromBodyV3WithDataArray() {
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"invalid column type for column 'v', expected iox::column_type::field::integer,"
+ + " got iox::column_type::field::float\",\"line_number\":2,"
+ + "\"original_line\":\"testa6a3ad v=1 17702\"}]}"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer,"
+ + " got iox::column_type::field::float (testa6a3ad v=1 17702)");
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("errorFromBodyV3WithDataArrayCases")
+ public void errorFromBodyV3WithDataArrayCase(final String testName,
+ final String body,
+ final String expectedMessage) {
+
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ body));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage(expectedMessage);
+ }
+
+ private static Stream errorFromBodyV3WithDataArrayCases() {
+ return Stream.of(
+ Arguments.of(
+ "message-only detail",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"only error message\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message"
+ ),
+ Arguments.of(
+ "non-object item skipped",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[null,{\"error_message\":"
+ + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: bad line (bad lp)"
+ ),
+ Arguments.of(
+ "no detail fields",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"line_number\":2}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred"
+ ),
+ Arguments.of(
+ "empty error_message skipped",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"\"},"
+ + "{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: bad line (bad lp)"
+ ),
+ Arguments.of(
+ "non-object primitive item skipped",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[1,{\"error_message\":"
+ + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: bad line (bad lp)"
+ ),
+ Arguments.of(
+ "null error_message skipped",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":null},"
+ + "{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: bad line (bad lp)"
+ ),
+ Arguments.of(
+ "empty original_line uses message-only detail",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"only error message\",\"line_number\":2,\"original_line\":\"\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message"
+ ),
+ Arguments.of(
+ "multiple valid details append without extra colon",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"},{\"error_message\":\"second issue\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: bad line (bad lp)\n"
+ + "\tsecond issue"
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("errorFromBodyV3FallbackCases")
+ public void errorFromBodyV3FallbackCase(final String testName,
+ final String body,
+ final String expectedMessage) {
+
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ body));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Assertions.assertThatThrownBy(
+ () -> restClient.request("ping", HttpMethod.GET, null, null, null)
+ )
+ .isInstanceOf(InfluxDBApiException.class)
+ .hasMessage(expectedMessage);
+ }
+
+ private static Stream errorFromBodyV3FallbackCases() {
+ return Stream.of(
+ Arguments.of(
+ "missing error with data array falls back to body",
+ "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: "
+ + "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}"
+ ),
+ Arguments.of(
+ "empty error with data array falls back to body",
+ "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":"
+ + "\"bad lp\"}]}",
+ "HTTP status code: 400; Message: "
+ + "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":"
+ + "\"bad lp\"}]}"
+ ),
+ Arguments.of(
+ "data object without error_message falls back to error",
+ "{\"error\":\"parsing failed\",\"data\":{}}",
+ "HTTP status code: 400; Message: parsing failed"
+ ),
+ Arguments.of(
+ "data object with empty error_message falls back to error",
+ "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}",
+ "HTTP status code: 400; Message: parsing failed"
+ ),
+ Arguments.of(
+ "data string falls back to error",
+ "{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}",
+ "HTTP status code: 400; Message: parsing failed"
+ ),
+ Arguments.of(
+ "data number falls back to error",
+ "{\"error\":\"parsing failed\",\"data\":123}",
+ "HTTP status code: 400; Message: parsing failed"
+ )
+ );
+ }
+
@Test
public void errorFromBodyText() {
@@ -474,7 +754,8 @@ public void errorHttpExceptionThrown() {
String retryDate = Instant.now().plus(300, ChronoUnit.SECONDS).toString();
mockServer.enqueue(createResponse(503,
- Map.of("retry-after", retryDate, "content-type", "application/json"),
+ "application/json",
+ Map.of("retry-after", retryDate),
"{\"message\":\"temporarily offline\"}"));
restClient = new RestClient(new ClientConfig.Builder()