diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b0ba711..c68898b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,11 @@ ### Features -1. [#352](https://github.com/InfluxCommunity/influxdb3-java/pull/352): Update Apache Flight client dependencies. +1. [#352](https://github.com/InfluxCommunity/influxdb3-java/pull/352): Upgrade Arrow Flight client dependencies. + +### Bug Fixes + +1. [#351](https://github.com/InfluxCommunity/influxdb3-java/pull/351): Enterprise/Core structured errors handling. ### CI diff --git a/pom.xml b/pom.xml index a67ad2cf..e9b2daad 100644 --- a/pom.xml +++ b/pom.xml @@ -317,6 +317,13 @@ test + + org.junit.jupiter + junit-jupiter-params + ${junit-jupiter.version} + test + + org.assertj assertj-core diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 0453e662..6fb0e05b 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -217,22 +217,12 @@ HttpResponse request(@Nonnull final String path, int statusCode = response.statusCode(); if (statusCode < 200 || statusCode >= 300) { - String reason = ""; + String reason; String body = response.body(); - if (!body.isEmpty()) { - try { - final JsonNode root = objectMapper.readTree(body); - final List possibilities = List.of("message", "error_message", "error"); - for (final String field : possibilities) { - final JsonNode node = root.findValue(field); - if (node != null) { - reason = node.asText(); - break; - } - } - } catch (JsonProcessingException e) { - LOG.debug("Can't parse msg from response {}", response); - } + reason = formatErrorMessage(body, response.headers().firstValue("Content-Type").orElse(null)); + + if (reason == null) { + reason = ""; } if (reason.isEmpty()) { @@ -257,6 +247,103 @@ HttpResponse request(@Nonnull final String path, return response; } + @Nullable + private String formatErrorMessage(@Nonnull final String body, @Nullable final String contentType) { + if (body.isEmpty()) { + return null; + } + + if (contentType != null + && !contentType.isEmpty() + && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) { + return null; + } + + try { + final JsonNode root = objectMapper.readTree(body); + if (!root.isObject()) { + return null; + } + + final String rootMessage = errNonEmptyField(root, "message"); + if (rootMessage != null) { + return rootMessage; + } + + final String error = errNonEmptyField(root, "error"); + final JsonNode dataNode = root.get("data"); + + // InfluxDB 3 Core/Enterprise write error format: + // {"error":"...","data":[{"error_message":"...","line_number":2,"original_line":"..."}]} + if (error != null && dataNode != null && dataNode.isArray()) { + final StringBuilder message = new StringBuilder(error); + boolean hasDetails = false; + for (JsonNode item : dataNode) { + final String detail = errFormatDataArrayDetail(item); + if (detail == null) { + continue; + } + if (!hasDetails) { + message.append(':'); + hasDetails = true; + } + message.append("\n\t").append(detail); + } + return message.toString(); + } + + // Core/Enterprise object format: + // {"error":"...","data":{"error_message":"..."}} + final String errorMessage = errNonEmptyField(dataNode, "error_message"); + if (errorMessage != null) { + return errorMessage; + } + + return error; + } catch (JsonProcessingException e) { + LOG.debug("Can't parse msg from response body {}", body, e); + return null; + } + } + + @Nullable + private String errNonEmptyText(@Nullable final JsonNode node) { + if (node == null || node.isNull()) { + return null; + } + final String value = node.asText(); + return value.isEmpty() ? null : value; + } + + @Nullable + private String errNonEmptyField(@Nullable final JsonNode object, @Nonnull final String fieldName) { + if (object == null || !object.isObject()) { + return null; + } + return errNonEmptyText(object.get(fieldName)); + } + + @Nullable + private String errFormatDataArrayDetail(@Nullable final JsonNode item) { + if (!item.isObject()) { + return null; + } + + final String errorMessage = errNonEmptyField(item, "error_message"); + if (errorMessage == null) { + return null; + } + + if (item.hasNonNull("line_number")) { + final String originalLine = errNonEmptyField(item, "original_line"); + if (originalLine != null) { + final String lineNumber = item.get("line_number").asText(); + return "line " + lineNumber + ": " + errorMessage + " (" + originalLine + ")"; + } + } + return errorMessage; + } + private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) { try { KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); diff --git a/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java index 44b05560..75478e07 100644 --- a/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java +++ b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java @@ -65,10 +65,23 @@ protected MockResponse createResponse(final int responseCode, @Nullable final Map headers, @Nullable final String body) { + return createResponse(responseCode, "text/csv; charset=utf-8", headers, body); + } + + @Nonnull + protected MockResponse createResponse(final int responseCode, + @Nullable final String contentType, + @Nullable final Map headers, + @Nullable final String body) { + MockResponse.Builder mrb = new MockResponse.Builder(); mrb.code(responseCode); - Map effectiveHeaders = new HashMap<>(Map.of("Content-Type", "text/csv; charset=utf-8", - "Date", "Tue, 26 Jun 2018 13:15:01 GMT")); + Map effectiveHeaders = new HashMap<>(Map.of( + "Date", "Tue, 26 Jun 2018 13:15:01 GMT" + )); + if (contentType != null) { + effectiveHeaders.put("Content-Type", contentType); + } if (headers != null) { effectiveHeaders.putAll(headers); } @@ -85,9 +98,6 @@ protected MockResponse createResponse(final int responseCode, @Nonnull protected MockResponse createResponse(final int responseCode) { - return createResponse(responseCode, Map.of( - "Content-Type", "text/csv; charset=utf-8", - "Date", "Tue, 26 Jun 2018 13:15:01 GMT" - ), null); + return createResponse(responseCode, "text/csv; charset=utf-8", null, null); } } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 4d3a2cba..cd4242d7 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -517,7 +517,8 @@ void defaultTags() throws InterruptedException { @Test public void retryHandled429Test() { mockServer.enqueue(createResponse(429, - Map.of("retry-after", "42", "content-type", "application/json"), + "application/json", + Map.of("retry-after", "42"), "{ \"message\" : \"Too Many Requests\" }")); Point point = Point.measurement("mem") diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index 10cbd110..b6d4a26b 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; import io.netty.handler.codec.http.HttpMethod; import mockwebserver3.MockResponse; @@ -42,6 +43,9 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import com.influxdb.v3.client.AbstractMockServerTest; import com.influxdb.v3.client.InfluxDBApiException; @@ -387,6 +391,7 @@ public void errorFromHeader() { public void errorFromBody() { mockServer.enqueue(createResponse(401, + "application/json", Map.of("X-Influx-Errpr", "not used"), "{\"message\":\"token does not have sufficient permissions\"}")); @@ -402,9 +407,100 @@ public void errorFromBody() { } @Test - public void errorFromBodyEdgeWithoutMessage() { // OSS/Edge error message + public void errorFromBodyIgnoredForNonJsonContentType() { + mockServer.enqueue(createResponse(400, + "text/plain", + null, + "{\"message\":\"token does not have sufficient permissions\"}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: {\"message\":\"token does not have sufficient permissions\"}"); + } + @Test + public void errorFromBodyInvalidJsonFallsBackToBody() { mockServer.enqueue(createResponse(400, + "application/json", + null, + "{\"message\":\"token does not have sufficient permissions\"")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: {\"message\":\"token does not have sufficient permissions\""); + } + + @Test + public void errorFromBodyNullMessageFallsBackToError() { + mockServer.enqueue(createResponse(400, + "application/json", + null, + "{\"message\":null,\"error\":\"parsing failed\"}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: parsing failed"); + } + + @Test + public void errorFromBodyEmptyMessageFallsBackToError() { + mockServer.enqueue(createResponse(400, + "application/json", + null, + "{\"message\":\"\",\"error\":\"parsing failed\"}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: parsing failed"); + } + + @Test + public void errorFromBodyJsonArrayFallsBackToBody() { + mockServer.enqueue(createResponse(400, + "application/json", + null, + "[]")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: []"); + } + + @Test + public void errorFromBodyV3WithoutMessageAndEmptyContentType() { + + mockServer.enqueue(createResponse(400, + "", null, "{\"error\":\"parsing failed\"}")); @@ -420,9 +516,29 @@ public void errorFromBodyEdgeWithoutMessage() { // OSS/Edge error message } @Test - public void errorFromBodyEdgeWithMessage() { // OSS/Edge specific error message + public void errorFromBodyV3WithoutMessageAndWithoutContentType() { mockServer.enqueue(createResponse(400, + null, + null, + "{\"error\":\"parsing failed\"}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: parsing failed"); + } + + @Test + public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format + + mockServer.enqueue(createResponse(400, + "application/json", null, "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"invalid field value\"}}")); @@ -437,6 +553,170 @@ public void errorFromBodyEdgeWithMessage() { // OSS/Edge specific error message .hasMessage("HTTP status code: 400; Message: invalid field value"); } + @Test + public void errorFromBodyV3WithDataArray() { + mockServer.enqueue(createResponse(400, + "application/json", + null, + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float\",\"line_number\":2," + + "\"original_line\":\"testa6a3ad v=1 17702\"}]}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float (testa6a3ad v=1 17702)"); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("errorFromBodyV3WithDataArrayCases") + public void errorFromBodyV3WithDataArrayCase(final String testName, + final String body, + final String expectedMessage) { + + mockServer.enqueue(createResponse(400, + "application/json", + null, + body)); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage(expectedMessage); + } + + private static Stream errorFromBodyV3WithDataArrayCases() { + return Stream.of( + Arguments.of( + "message-only detail", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"only error message\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message" + ), + Arguments.of( + "non-object item skipped", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[null,{\"error_message\":" + + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)" + ), + Arguments.of( + "no detail fields", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"line_number\":2}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "empty error_message skipped", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"\"}," + + "{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)" + ), + Arguments.of( + "non-object primitive item skipped", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[1,{\"error_message\":" + + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)" + ), + Arguments.of( + "null error_message skipped", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":null}," + + "{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)" + ), + Arguments.of( + "empty original_line uses message-only detail", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"only error message\",\"line_number\":2,\"original_line\":\"\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message" + ), + Arguments.of( + "multiple valid details append without extra colon", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"},{\"error_message\":\"second issue\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)\n" + + "\tsecond issue" + ) + ); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("errorFromBodyV3FallbackCases") + public void errorFromBodyV3FallbackCase(final String testName, + final String body, + final String expectedMessage) { + + mockServer.enqueue(createResponse(400, + "application/json", + null, + body)); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null) + ) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage(expectedMessage); + } + + private static Stream errorFromBodyV3FallbackCases() { + return Stream.of( + Arguments.of( + "missing error with data array falls back to body", + "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: " + + "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}" + ), + Arguments.of( + "empty error with data array falls back to body", + "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + + "\"bad lp\"}]}", + "HTTP status code: 400; Message: " + + "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + + "\"bad lp\"}]}" + ), + Arguments.of( + "data object without error_message falls back to error", + "{\"error\":\"parsing failed\",\"data\":{}}", + "HTTP status code: 400; Message: parsing failed" + ), + Arguments.of( + "data object with empty error_message falls back to error", + "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}", + "HTTP status code: 400; Message: parsing failed" + ), + Arguments.of( + "data string falls back to error", + "{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}", + "HTTP status code: 400; Message: parsing failed" + ), + Arguments.of( + "data number falls back to error", + "{\"error\":\"parsing failed\",\"data\":123}", + "HTTP status code: 400; Message: parsing failed" + ) + ); + } + @Test public void errorFromBodyText() { @@ -474,7 +754,8 @@ public void errorHttpExceptionThrown() { String retryDate = Instant.now().plus(300, ChronoUnit.SECONDS).toString(); mockServer.enqueue(createResponse(503, - Map.of("retry-after", retryDate, "content-type", "application/json"), + "application/json", + Map.of("retry-after", retryDate), "{\"message\":\"temporarily offline\"}")); restClient = new RestClient(new ClientConfig.Builder()