diff --git a/CHANGELOG.md b/CHANGELOG.md index df16256c..d70e2b9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ ## 1.9.0 [unreleased] +### BREAKING CHANGES + +1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Adds partial writes support and aligns write routing with v3 defaults. + See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. + For InfluxDB Clustered version, set `useV2Api=true` for writing. + ### Features 1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client. diff --git a/README.md b/README.md index 7c7c6c1e..4bf6c39b 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ dependencies { ## Usage +### Create client + To start with the client, import the `com.influxdb.v3.client` package and create a `InfluxDBClient` by: ```java @@ -73,6 +75,7 @@ import java.util.List; import java.util.stream.Stream; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.write.WriteOptions; @@ -90,7 +93,9 @@ public class IOxExample { } ``` -to insert data, you can use code like this: +### Write + +To insert data, you can use code like this: ```java // @@ -113,6 +118,29 @@ client.writePoint( orderedTagWrite ); +// +// Write with partial acceptance (default behavior) +// +WriteOptions partialWrite = new WriteOptions.Builder().build(); +try { + client.writeRecords(List.of( + "temperature,region=west value=20.0", + "temperature,region=west value=\"bad\"" + ), partialWrite); +} catch (InfluxDBPartialWriteException e) { + // Inspect failed line details. + e.lineErrors().forEach(line -> + System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine())); +} + +// +// Write via v2 compatibility endpoint (InfluxDB Clustered) +// +WriteOptions useV2 = new WriteOptions.Builder() + .useV2Api(true) + .build(); +client.writeRecord("temperature,location=north value=60.0", useV2); + // // Write by LineProtocol // @@ -120,7 +148,31 @@ String record = "temperature,location=north value=60.0"; client.writeRecord(record); ``` -to query your data, you can use code like this: +#### Accept partial writes and inspect failed lines + +Partial writes are enabled by default. +`acceptPartial` can be configured in three ways: client defaults via `WriteOptions`, connection string / environment variable / system property (`writeAcceptPartial` / `INFLUX_WRITE_ACCEPT_PARTIAL` / `influx.writeAcceptPartial`), or per-write `WriteOptions`. + +Set `acceptPartial(false)` to disable partial writes. +With InfluxDB Core/Enterprise, when a write request fails due to one or more invalid lines, the error message starts with: + +- `partial write of line protocol occurred` when partial writes are enabled. +- `parsing failed for write_lp endpoint` when partial writes are disabled. + +When partial writes are disabled, any rejected line causes all lines to be rejected. +InfluxDB Clustered does not return this structured partial-write error format. + +#### Compatibility with InfluxDB Clustered + +For InfluxDB Clustered, enable `useV2Api` for writes. +Like other write options, this can be configured in client code, connection string / environment variable / system property (`writeUseV2Api` / `INFLUX_WRITE_USE_V2_API` / `influx.writeUseV2Api`), or per-write `WriteOptions`. + +If `useV2Api` is set, `acceptPartial` is ignored because this compatibility mode does not support partial-write controls. +Any rejected line causes all lines to be rejected. + +### Query + +To query your data, you can use code like this: ```java // diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index ec7208cd..181b914c 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -557,6 +557,8 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) { *
  • precision - timestamp precision when writing data
  • *
  • gzipThreshold - payload size size for gzipping data
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • + *
  • writeAcceptPartial - accept partial writes
  • + *
  • writeUseV2Api - use v2 compatibility write endpoint
  • * * * @param connectionString connection string @@ -590,6 +592,8 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) { *
  • INFLUX_PRECISION - timestamp precision when writing data
  • *
  • INFLUX_GZIP_THRESHOLD - payload size size for gzipping data
  • *
  • INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write
  • + *
  • INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes
  • + *
  • INFLUX_WRITE_USE_V2_API - use v2 compatibility write endpoint
  • * * Supported system properties: * * * @return instance of {@link InfluxDBClient} diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java new file mode 100644 index 00000000..9e7a2494 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java @@ -0,0 +1,110 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.net.http.HttpHeaders; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * HTTP exception for partial write errors returned by InfluxDB 3 write endpoint. + * Contains parsed line-level write errors so callers can decide how to handle failed lines. + */ +public class InfluxDBPartialWriteException extends InfluxDBApiHttpException { + + private final List lineErrors; + + /** + * Construct a new InfluxDBPartialWriteException. + * + * @param message detail message + * @param headers response headers + * @param statusCode response status code + * @param lineErrors line-level errors parsed from response body + */ + public InfluxDBPartialWriteException( + @Nullable final String message, + @Nullable final HttpHeaders headers, + final int statusCode, + @Nonnull final List lineErrors) { + super(message, headers, statusCode); + this.lineErrors = List.copyOf(lineErrors); + } + + /** + * Line-level write errors. + * + * @return immutable list of line errors + */ + @Nonnull + public List lineErrors() { + return lineErrors; + } + + /** + * Represents one failed line from a partial write response. + */ + public static final class LineError { + + private final Integer lineNumber; + private final String errorMessage; + private final String originalLine; + + /** + * @param lineNumber line number in the write payload; may be null if not provided by server + * @param errorMessage line-level error message + * @param originalLine original line protocol row; may be null if not provided by server + */ + public LineError(@Nullable final Integer lineNumber, + @Nonnull final String errorMessage, + @Nullable final String originalLine) { + this.lineNumber = lineNumber; + this.errorMessage = errorMessage; + this.originalLine = originalLine; + } + + /** + * @return line number or null if server didn't provide it + */ + @Nullable + public Integer lineNumber() { + return lineNumber; + } + + /** + * @return line-level error message + */ + @Nonnull + public String errorMessage() { + return errorMessage; + } + + /** + * @return original line protocol row or null if server didn't provide it + */ + @Nullable + public String originalLine() { + return originalLine; + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index 4326f3a5..11021d6d 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -57,6 +57,8 @@ *
  • defaultTags - defaultTags added when writing points to InfluxDB
  • *
  • gzipThreshold - threshold when gzip compression is used for writing points to InfluxDB
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • + *
  • writeAcceptPartial - accept partial writes
  • + *
  • writeUseV2Api - use v2 compatibility write endpoint
  • *
  • timeout - deprecated in 1.4.0 timeout when connecting to InfluxDB, * please use more informative properties writeTimeout and queryTimeout
  • *
  • writeTimeout - timeout when writing data to InfluxDB
  • @@ -107,6 +109,8 @@ public final class ClientConfig { private final WritePrecision writePrecision; private final Integer gzipThreshold; private final Boolean writeNoSync; + private final Boolean writeAcceptPartial; + private final Boolean writeUseV2Api; private final Map defaultTags; @Deprecated private final Duration timeout; @@ -208,6 +212,26 @@ public Boolean getWriteNoSync() { return writeNoSync; } + /** + * Accept partial writes? + * + * @return accept partial writes + */ + @Nonnull + public Boolean getWriteAcceptPartial() { + return writeAcceptPartial; + } + + /** + * Use v2 compatibility write endpoint? + * + * @return use v2 compatibility write endpoint + */ + @Nonnull + public Boolean getWriteUseV2Api() { + return writeUseV2Api; + } + /** * Gets default tags used when writing points. * @return default tags @@ -370,6 +394,8 @@ public boolean equals(final Object o) { && writePrecision == that.writePrecision && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(writeNoSync, that.writeNoSync) + && Objects.equals(writeAcceptPartial, that.writeAcceptPartial) + && Objects.equals(writeUseV2Api, that.writeUseV2Api) && Objects.equals(defaultTags, that.defaultTags) && Objects.equals(timeout, that.timeout) && Objects.equals(writeTimeout, that.writeTimeout) @@ -388,7 +414,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { return Objects.hash(host, Arrays.hashCode(token), authScheme, organization, - database, writePrecision, gzipThreshold, writeNoSync, + database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, writeUseV2Api, timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors); @@ -403,6 +429,8 @@ public String toString() { .add("writePrecision=" + writePrecision) .add("gzipThreshold=" + gzipThreshold) .add("writeNoSync=" + writeNoSync) + .add("writeAcceptPartial=" + writeAcceptPartial) + .add("writeUseV2Api=" + writeUseV2Api) .add("timeout=" + timeout) .add("writeTimeout=" + writeTimeout) .add("queryTimeout=" + queryTimeout) @@ -432,6 +460,8 @@ public static final class Builder { private WritePrecision writePrecision; private Integer gzipThreshold; private Boolean writeNoSync; + private Boolean writeAcceptPartial; + private Boolean writeUseV2Api; private Map defaultTags; @Deprecated private Duration timeout; @@ -554,6 +584,32 @@ public Builder writeNoSync(@Nullable final Boolean writeNoSync) { return this; } + /** + * Sets whether to accept partial writes. + * + * @param writeAcceptPartial accept partial writes + * @return this + */ + @Nonnull + public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) { + + this.writeAcceptPartial = writeAcceptPartial; + return this; + } + + /** + * Sets whether to use v2 compatibility write endpoint. + * + * @param writeUseV2Api use v2 compatibility write endpoint + * @return this + */ + @Nonnull + public Builder writeUseV2Api(@Nullable final Boolean writeUseV2Api) { + + this.writeUseV2Api = writeUseV2Api; + return this; + } + /** * Sets default tags to be written with points. * @@ -800,6 +856,12 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform if (parameters.containsKey("writeNoSync")) { this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync"))); } + if (parameters.containsKey("writeAcceptPartial")) { + this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial"))); + } + if (parameters.containsKey("writeUseV2Api")) { + this.writeUseV2Api(Boolean.parseBoolean(parameters.get("writeUseV2Api"))); + } if (parameters.containsKey("disableGRPCCompression")) { this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression"))); } @@ -855,6 +917,14 @@ public ClientConfig build(@Nonnull final Map env, final Properti if (writeNoSync != null) { this.writeNoSync(Boolean.parseBoolean(writeNoSync)); } + final String writeAcceptPartial = get.apply("INFLUX_WRITE_ACCEPT_PARTIAL", "influx.writeAcceptPartial"); + if (writeAcceptPartial != null) { + this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial)); + } + final String writeUseV2Api = get.apply("INFLUX_WRITE_USE_V2_API", "influx.writeUseV2Api"); + if (writeUseV2Api != null) { + this.writeUseV2Api(Boolean.parseBoolean(writeUseV2Api)); + } final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout"); if (writeTimeout != null) { long to = Long.parseLong(writeTimeout); @@ -911,6 +981,12 @@ private ClientConfig(@Nonnull final Builder builder) { writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION; gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD; writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC; + writeAcceptPartial = builder.writeAcceptPartial != null + ? builder.writeAcceptPartial + : WriteOptions.DEFAULT_ACCEPT_PARTIAL; + writeUseV2Api = builder.writeUseV2Api != null + ? builder.writeUseV2Api + : WriteOptions.DEFAULT_USE_V2_API; defaultTags = builder.defaultTags; timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT); writeTimeout = builder.writeTimeout != null diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 8c2ce466..3dc05155 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -305,27 +305,33 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti } WritePrecision precision = options.precisionSafe(config); + options.validate(config); String path; Map queryParams; boolean noSync = options.noSyncSafe(config); - if (noSync) { - // Setting no_sync=true is supported only in the v3 API. - path = "api/v3/write_lp"; + boolean acceptPartial = options.acceptPartialSafe(config); + boolean useV2Api = options.useV2ApiSafe(config); + if (useV2Api) { + path = "api/v2/write"; queryParams = new HashMap<>() {{ put("org", config.getOrganization()); - put("db", database); - put("precision", WritePrecisionConverter.toV3ApiString(precision)); - put("no_sync", "true"); + put("bucket", database); + put("precision", WritePrecisionConverter.toV2ApiString(precision)); }}; } else { - // By default, use the v2 API. - path = "api/v2/write"; + path = "api/v3/write_lp"; queryParams = new HashMap<>() {{ put("org", config.getOrganization()); - put("bucket", database); - put("precision", WritePrecisionConverter.toV2ApiString(precision)); + put("db", database); + put("precision", WritePrecisionConverter.toV3ApiString(precision)); }}; + if (noSync) { + queryParams.put("no_sync", "true"); + } + if (!acceptPartial) { + queryParams.put("accept_partial", "false"); + } } Map defaultTags = options.defaultTagsSafe(config); @@ -373,10 +379,11 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti try { restClient.request(path, HttpMethod.POST, body, queryParams, headers); } catch (InfluxDBApiHttpException e) { - if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { - // Server does not support the v3 write API, can't use the NoSync option. - throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " - + "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); + if (!useV2Api && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { + throw new InfluxDBApiHttpException("Server doesn't support v3 write API. " + + "Use WriteOptions.Builder.useV2Api(true) for v2 compatibility endpoint.", + e.headers(), + e.statusCode()); } throw e; } diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 6fb0e05b..de2aa116 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -36,6 +36,7 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.stream.Stream; @@ -46,6 +47,7 @@ import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -57,6 +59,7 @@ import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBApiHttpException; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.config.ClientConfig; final class RestClient implements AutoCloseable { @@ -219,7 +222,8 @@ HttpResponse request(@Nonnull final String path, if (statusCode < 200 || statusCode >= 300) { String reason; String body = response.body(); - reason = formatErrorMessage(body, response.headers().firstValue("Content-Type").orElse(null)); + String contentType = response.headers().firstValue("Content-Type").orElse(null); + reason = formatErrorMessage(body, contentType); if (reason == null) { reason = ""; @@ -241,6 +245,11 @@ HttpResponse request(@Nonnull final String path, } String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); + List lineErrors = + parsePartialWriteLineErrors(body, contentType); + if (!lineErrors.isEmpty()) { + throw new InfluxDBPartialWriteException(message, response.headers(), response.statusCode(), lineErrors); + } throw new InfluxDBApiHttpException(message, response.headers(), response.statusCode()); } @@ -253,9 +262,7 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St return null; } - if (contentType != null - && !contentType.isEmpty() - && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) { + if (!errIsJsonLikeContentType(contentType)) { return null; } @@ -278,11 +285,7 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St if (error != null && dataNode != null && dataNode.isArray()) { final StringBuilder message = new StringBuilder(error); boolean hasDetails = false; - for (JsonNode item : dataNode) { - final String detail = errFormatDataArrayDetail(item); - if (detail == null) { - continue; - } + for (String detail : errFormatDataArrayDetails(dataNode)) { if (!hasDetails) { message.append(':'); hasDetails = true; @@ -294,9 +297,11 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St // Core/Enterprise object format: // {"error":"...","data":{"error_message":"..."}} - final String errorMessage = errNonEmptyField(dataNode, "error_message"); - if (errorMessage != null) { - return errorMessage; + if (isV3PartialWriteError(error) && dataNode != null && dataNode.isObject()) { + final String errorMessage = errNonEmptyField(dataNode, "error_message"); + return errorMessage == null + ? error + : error + ":\n\t" + errorMessage; } return error; @@ -306,12 +311,93 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St } } + @Nonnull + private List parsePartialWriteLineErrors( + @Nonnull final String body, + @Nullable final String contentType) { + if (body.isEmpty()) { + return List.of(); + } + + if (!errIsJsonLikeContentType(contentType)) { + return List.of(); + } + + try { + final JsonNode root = objectMapper.readTree(body); + if (!root.isObject()) { + return List.of(); + } + + final String error = errNonEmptyField(root, "error"); + final JsonNode dataNode = root.get("data"); + if (!isV3PartialWriteError(error) || dataNode == null) { + return List.of(); + } + + if (dataNode.isArray()) { + final ErrDataArrayItem[] parsed = errReadDataArray(dataNode); + if (parsed == null) { + return List.of(); + } + + final List lineErrors = new ArrayList<>(); + for (ErrDataArrayItem item : parsed) { + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + if (lineError != null) { + lineErrors.add(lineError); + } + } + return lineErrors; + } + + if (dataNode.isObject()) { + try { + final ErrDataArrayItem item = objectMapper.treeToValue(dataNode, ErrDataArrayItem.class); + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + return lineError == null ? List.of() : List.of(lineError); + } catch (JsonProcessingException e) { + return List.of(); + } + } + + return List.of(); + } catch (JsonProcessingException e) { + LOG.debug("Can't parse line errors from response body {}", body, e); + return List.of(); + } + } + + private boolean isV3PartialWriteError(@Nullable final String errorMessage) { + if (errorMessage == null || errorMessage.isEmpty()) { + return false; + } + String normalized = errorMessage.toLowerCase(Locale.ROOT); + return normalized.contains("partial write of line protocol occurred") + || normalized.contains("parsing failed for write_lp endpoint"); + } + + private boolean errIsJsonLikeContentType(@Nullable final String contentType) { + return contentType == null + || contentType.isEmpty() + || contentType.regionMatches(true, 0, "application/json", 0, "application/json".length()); + } + @Nullable private String errNonEmptyText(@Nullable final JsonNode node) { if (node == null || node.isNull()) { return null; } - final String value = node.asText(); + + final String value; + if (node.isTextual()) { + value = node.asText(); + } else if (node.isNumber() || node.isBoolean()) { + value = node.asText(); + } else { + value = node.toString(); + } + return value.isEmpty() ? null : value; } @@ -323,25 +409,68 @@ private String errNonEmptyField(@Nullable final JsonNode object, @Nonnull final return errNonEmptyText(object.get(fieldName)); } + @Nonnull + private List errFormatDataArrayDetails(@Nonnull final JsonNode dataNode) { + final ErrDataArrayItem[] parsed = errReadDataArray(dataNode); + if (parsed != null) { + final List details = new ArrayList<>(); + for (ErrDataArrayItem item : parsed) { + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + if (lineError == null) { + continue; + } + + if (lineError.lineNumber() != null + && lineError.originalLine() != null + && !lineError.originalLine().isEmpty()) { + details.add("line " + lineError.lineNumber() + ": " + + lineError.errorMessage() + " (" + lineError.originalLine() + ")"); + } else { + details.add(lineError.errorMessage()); + } + } + return details; + } + + final List details = new ArrayList<>(); + for (JsonNode item : dataNode) { + final String raw = errNonEmptyText(item); + if (raw != null) { + details.add(raw); + } + } + return details; + } + @Nullable - private String errFormatDataArrayDetail(@Nullable final JsonNode item) { - if (!item.isObject()) { + private ErrDataArrayItem[] errReadDataArray(@Nonnull final JsonNode dataNode) { + try { + return objectMapper.treeToValue(dataNode, ErrDataArrayItem[].class); + } catch (JsonProcessingException e) { return null; } + } - final String errorMessage = errNonEmptyField(item, "error_message"); - if (errorMessage == null) { + @Nullable + private InfluxDBPartialWriteException.LineError errToLineError(@Nullable final ErrDataArrayItem item) { + if (item == null || item.errorMessage == null || item.errorMessage.isEmpty()) { return null; } - if (item.hasNonNull("line_number")) { - final String originalLine = errNonEmptyField(item, "original_line"); - if (originalLine != null) { - final String lineNumber = item.get("line_number").asText(); - return "line " + lineNumber + ": " + errorMessage + " (" + originalLine + ")"; - } - } - return errorMessage; + final String originalLine = + (item.originalLine == null || item.originalLine.isEmpty()) ? null : item.originalLine; + return new InfluxDBPartialWriteException.LineError(item.lineNumber, item.errorMessage, originalLine); + } + + private static final class ErrDataArrayItem { + @JsonProperty("error_message") + private String errorMessage; + + @JsonProperty("line_number") + private Integer lineNumber; + + @JsonProperty("original_line") + private String originalLine; } private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) { diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index e9b4ca1b..00618ec1 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -43,6 +43,9 @@ *
  • precision - specifies the precision to use for the timestamp of points
  • *
  • defaultTags - specifies tags to be added by default to all write operations using points.
  • *
  • tagOrder - specifies preferred tag order for point serialization.
  • + *
  • noSync - skip waiting for WAL persistence on write
  • + *
  • acceptPartial - accept partial writes
  • + *
  • useV2Api - use v2 compatibility write endpoint
  • *
  • headers - specifies the headers to be added to write request
  • * *

    @@ -68,6 +71,14 @@ public final class WriteOptions { * Default NoSync. */ public static final boolean DEFAULT_NO_SYNC = false; + /** + * Default AcceptPartial. + */ + public static final boolean DEFAULT_ACCEPT_PARTIAL = true; + /** + * Default UseV2Api. + */ + public static final boolean DEFAULT_USE_V2_API = false; /** * Default timeout for writes in seconds. Set to {@value} @@ -81,12 +92,15 @@ public final class WriteOptions { @Deprecated(forRemoval = true) public static final WriteOptions DEFAULTS = new WriteOptions( - null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null, null); + null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, DEFAULT_ACCEPT_PARTIAL, + DEFAULT_USE_V2_API, null, null, null); private final String database; private final WritePrecision precision; private final Integer gzipThreshold; private final Boolean noSync; + private final Boolean acceptPartial; + private final Boolean useV2Api; private final Map defaultTags; private final List tagOrder; private final Map headers; @@ -99,6 +113,7 @@ public final class WriteOptions { */ public static WriteOptions defaultWriteOptions() { return new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, + DEFAULT_ACCEPT_PARTIAL, DEFAULT_USE_V2_API, null, null, null); } @@ -152,7 +167,7 @@ public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold, @Nullable final Boolean noSync) { - this(database, precision, gzipThreshold, noSync, null, null); + this(database, precision, gzipThreshold, noSync, null, null, null); } /** @@ -184,7 +199,7 @@ public WriteOptions(@Nullable final String database, @Nullable final Integer gzipThreshold, @Nullable final Map defaultTags, @Nullable final Map headers) { - this(database, precision, gzipThreshold, null, defaultTags, headers); + this(database, precision, gzipThreshold, null, null, defaultTags, headers, null); } /** @@ -209,7 +224,38 @@ public WriteOptions(@Nullable final String database, @Nullable final Boolean noSync, @Nullable final Map defaultTags, @Nullable final Map headers) { - this(database, precision, gzipThreshold, noSync, defaultTags, headers, null); + this(database, precision, gzipThreshold, noSync, null, null, defaultTags, headers, null); + } + + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param noSync Skip waiting for WAL persistence on write. + * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param acceptPartial Request partial write acceptance. + * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}. + * @param defaultTags Default tags to be added when writing points. + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers + * specified in the client configuration. + * @param tagOrder Preferred order of tags in line protocol serialization. + * Null or empty tag names are ignored. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Boolean noSync, + @Nullable final Boolean acceptPartial, + @Nullable final Map defaultTags, + @Nullable final Map headers, + @Nullable final List tagOrder) { + this(database, precision, gzipThreshold, noSync, acceptPartial, null, defaultTags, headers, tagOrder); } /** @@ -223,6 +269,10 @@ public WriteOptions(@Nullable final String database, * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. * @param noSync Skip waiting for WAL persistence on write. * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param acceptPartial Request partial write acceptance. + * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}. + * @param useV2Api Use v2 compatibility write endpoint. + * If it is not specified then use {@link WriteOptions#DEFAULT_USE_V2_API}. * @param defaultTags Default tags to be added when writing points. * @param headers The headers to be added to write request. * The headers specified here are preferred over the headers @@ -234,6 +284,8 @@ public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold, @Nullable final Boolean noSync, + @Nullable final Boolean acceptPartial, + @Nullable final Boolean useV2Api, @Nullable final Map defaultTags, @Nullable final Map headers, @Nullable final List tagOrder) { @@ -241,11 +293,41 @@ public WriteOptions(@Nullable final String database, this.precision = precision; this.gzipThreshold = gzipThreshold; this.noSync = noSync; + this.acceptPartial = acceptPartial; + this.useV2Api = useV2Api; this.defaultTags = defaultTags == null ? Map.of() : defaultTags; this.tagOrder = sanitizeTagOrder(tagOrder); this.headers = headers == null ? Map.of() : headers; } + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param noSync Skip waiting for WAL persistence on write. + * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param defaultTags Default tags to be added when writing points. + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers + * specified in the client configuration. + * @param tagOrder Preferred order of tags in line protocol serialization. + * Null or empty tag names are ignored. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Boolean noSync, + @Nullable final Map defaultTags, + @Nullable final Map headers, + @Nullable final List tagOrder) { + this(database, precision, gzipThreshold, noSync, null, null, defaultTags, headers, tagOrder); + } + /** * @param config with default value * @return The destination database for writes. @@ -299,8 +381,37 @@ public Integer gzipThresholdSafe(@Nonnull final ClientConfig config) { */ public boolean noSyncSafe(@Nonnull final ClientConfig config) { Arguments.checkNotNull(config, "config"); - return noSync != null ? noSync - : (config.getWriteNoSync() != null ? config.getWriteNoSync() : DEFAULT_NO_SYNC); + return noSync != null ? noSync : config.getWriteNoSync(); + } + + /** + * @param config with default value + * @return Accept partial write. + */ + public boolean acceptPartialSafe(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + return acceptPartial != null ? acceptPartial : config.getWriteAcceptPartial(); + } + + /** + * @param config with default value + * @return Use v2 compatibility write endpoint. + */ + public boolean useV2ApiSafe(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + return useV2Api != null ? useV2Api : config.getWriteUseV2Api(); + } + + /** + * Validate write option combinations. + * + * @param config with default values + */ + public void validate(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + if (useV2ApiSafe(config) && noSyncSafe(config)) { + throw new IllegalArgumentException("invalid write options: NoSync cannot be used in V2 API"); + } } /** @@ -332,6 +443,8 @@ public boolean equals(final Object o) { && precision == that.precision && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(noSync, that.noSync) + && Objects.equals(acceptPartial, that.acceptPartial) + && Objects.equals(useV2Api, that.useV2Api) && defaultTags.equals(that.defaultTags) && tagOrder.equals(that.tagOrder) && headers.equals(that.headers); @@ -339,7 +452,8 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(database, precision, gzipThreshold, noSync, defaultTags, tagOrder, headers); + return Objects.hash(database, precision, gzipThreshold, noSync, acceptPartial, useV2Api, defaultTags, tagOrder, + headers); } private boolean isNotDefined(final String option) { @@ -368,6 +482,8 @@ public static final class Builder { private WritePrecision precision; private Integer gzipThreshold; private Boolean noSync; + private Boolean acceptPartial; + private Boolean useV2Api; private Map defaultTags = new HashMap<>(); private List tagOrder = List.of(); private Map headers = new HashMap<>(); @@ -424,6 +540,32 @@ public Builder noSync(@Nonnull final Boolean noSync) { return this; } + /** + * Sets whether to request partial write acceptance. + * + * @param acceptPartial request partial write acceptance + * @return this + */ + @Nonnull + public Builder acceptPartial(@Nonnull final Boolean acceptPartial) { + + this.acceptPartial = acceptPartial; + return this; + } + + /** + * Sets whether to use v2 compatibility write endpoint. + * + * @param useV2Api use v2 compatibility write endpoint + * @return this + */ + @Nonnull + public Builder useV2Api(@Nonnull final Boolean useV2Api) { + + this.useV2Api = useV2Api; + return this; + } + /** * Sets defaultTags. * @@ -473,7 +615,8 @@ public WriteOptions build() { } private WriteOptions(@Nonnull final Builder builder) { - this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags, - builder.headers, builder.tagOrder); + this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.acceptPartial, + builder.useV2Api, + builder.defaultTags, builder.headers, builder.tagOrder); } } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index a7d31240..4382b646 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -27,7 +27,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import io.netty.handler.codec.http.HttpResponseStatus; import mockwebserver3.RecordedRequest; @@ -36,6 +39,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.write.WriteOptions; @@ -106,7 +112,7 @@ void databaseParameter() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("my-database"); + assertThat(request.getUrl().queryParameter("db")).isEqualTo("my-database"); } @Test @@ -119,7 +125,7 @@ void databaseParameterSpecified() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("my-database-2"); + assertThat(request.getUrl().queryParameter("db")).isEqualTo("my-database-2"); } @Test @@ -147,7 +153,7 @@ void precisionParameter() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("ns"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); } @Test @@ -160,7 +166,7 @@ void precisionParameterSpecified() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("s"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("second"); } @Test @@ -191,46 +197,64 @@ void gzipParameterSpecified() throws InterruptedException { assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); } - @Test - void writeNoSyncFalseUsesV2API() throws InterruptedException { + @ParameterizedTest(name = "{0}") + @MethodSource("writeRoutingCases") + void writeRoutingCase(final String name, + final Consumer configure, + final String expectedPath, + final String expectedDbParamName, + final String expectedPrecision, + @Nullable final String expectedNoSync, + @Nullable final String expectedAcceptPartial) throws InterruptedException { mockServer.enqueue(createResponse(200)); + WriteOptions.Builder optionsBuilder = new WriteOptions.Builder().precision(WritePrecision.NS); + configure.accept(optionsBuilder); client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.NS).noSync(false).build()); + optionsBuilder.build()); assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v2/write"); - assertThat(request.getUrl().queryParameter("no_sync")).isNull(); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("ns"); - - } - - @Test - void writeNoSyncTrueUsesV3API() throws InterruptedException { - mockServer.enqueue(createResponse(200)); - - client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.NS).noSync(true).build()); - - assertThat(mockServer.getRequestCount()).isEqualTo(1); - RecordedRequest request = mockServer.takeRequest(); - assertThat(request).isNotNull(); - assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo("true"); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); + assertThat(request.getUrl().encodedPath()).isEqualTo(expectedPath); + assertThat(request.getUrl().queryParameter(expectedDbParamName)).isEqualTo("my-database"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo(expectedPrecision); + assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo(expectedNoSync); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial); + } + + private static Stream writeRoutingCases() { + return Stream.of( + Arguments.of("v3 noSync=false", (Consumer) b -> b.noSync(false), + "/api/v3/write_lp", "db", "nanosecond", null, null), + Arguments.of("v3 noSync=true", (Consumer) b -> b.noSync(true), + "/api/v3/write_lp", "db", "nanosecond", "true", null), + Arguments.of("v3 acceptPartial=true", (Consumer) b -> b.acceptPartial(true), + "/api/v3/write_lp", "db", "nanosecond", null, null), + Arguments.of("v3 acceptPartial=false", (Consumer) b -> b.acceptPartial(false), + "/api/v3/write_lp", "db", "nanosecond", null, "false"), + Arguments.of("v2 useV2Api=true", (Consumer) b -> b.useV2Api(true), + "/api/v2/write", "bucket", "ns", null, null), + Arguments.of("v2 useV2Api=true, acceptPartial=false", + (Consumer) b -> b.useV2Api(true).acceptPartial(false), + "/api/v2/write", "bucket", "ns", null, null) + ); } - @Test - void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { + @ParameterizedTest(name = "{0}") + @MethodSource("writeV3MethodNotAllowedCases") + void writeV3MethodNotAllowedMappedError(final String name, + final Consumer configure, + @Nullable final String expectedNoSync, + @Nullable final String expectedAcceptPartial) throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); + WriteOptions.Builder optionsBuilder = new WriteOptions.Builder().precision(WritePrecision.MS); + configure.accept(optionsBuilder); + InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, - () -> client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()) + () -> client.writeRecord("mem,tag=one value=1.0", optionsBuilder.build()) ); assertThat(mockServer.getRequestCount()).isEqualTo(1); @@ -238,12 +262,36 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo("true"); + assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo(expectedNoSync); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial); assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond"); assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); - assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true" - + " (supported by InfluxDB 3 Core/Enterprise servers only)."); + assertThat(ae.getMessage()).contains("Server doesn't support v3 write API. " + + "Use WriteOptions.Builder.useV2Api(true) for v2 compatibility endpoint."); + } + + private static Stream writeV3MethodNotAllowedCases() { + return Stream.of( + Arguments.of("noSync=true", (Consumer) b -> b.noSync(true), "true", null), + Arguments.of( + "acceptPartial=true", + (Consumer) b -> b.acceptPartial(true), + null, + null + ) + ); + } + + @Test + void writeUseV2ApiNoSyncValidation() { + Throwable thrown = catchThrowable(() -> client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().useV2Api(true).noSync(true).build())); + + Assertions.assertThat(thrown) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid write options: NoSync cannot be used in V2 API"); + assertThat(mockServer.getRequestCount()).isEqualTo(0); } @Test @@ -256,7 +304,7 @@ void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -272,7 +320,35 @@ void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); + } + + @Test + void writeRecordWithDefaultWriteOptionsAcceptPartialConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writeAcceptPartial(true) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); + } + + @Test + void writeRecordWithDefaultWriteOptionsUseV2Config() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writeUseV2Api(true) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, null, null, false); } @Test @@ -285,7 +361,7 @@ void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -301,7 +377,7 @@ void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); } @Test @@ -317,7 +393,7 @@ void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -336,7 +412,7 @@ void writePointWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); } @Test @@ -352,7 +428,7 @@ void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -371,27 +447,26 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); } private void checkWriteCalled(final String expectedPath, final String expectedDB, - final String expectedPrecision, final boolean expectedNoSync, + final String expectedPrecision, final boolean expectedV3, + @Nullable final String expectedNoSync, + @Nullable final String expectedAcceptPartial, final boolean expectedGzip) throws InterruptedException { RecordedRequest request = assertThatServerRequested(); HttpUrl requestUrl = request.getUrl(); assertThat(requestUrl).isNotNull(); assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath); - if (expectedNoSync) { + if (expectedV3) { assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB); } else { assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB); } assertThat(requestUrl.queryParameter("precision")).isEqualTo(expectedPrecision); - if (expectedNoSync) { - assertThat(requestUrl.queryParameter("no_sync")).isEqualTo("true"); - } else { - assertThat(requestUrl.queryParameter("no_sync")).isNull(); - } + assertThat(requestUrl.queryParameter("no_sync")).isEqualTo(expectedNoSync); + assertThat(requestUrl.queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial); if (expectedGzip) { assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); } else { @@ -420,8 +495,8 @@ void allParameterSpecified() throws InterruptedException { assertThat(request.getUrl()).isNotNull(); assertThat(request.getHeaders().get("Content-Type")).isEqualTo("text/plain; charset=utf-8"); assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("s"); - assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("your-database"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("second"); + assertThat(request.getUrl().queryParameter("db")).isEqualTo("your-database"); } @Test diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index f5ce2e8f..272c3e4e 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -56,6 +56,21 @@ void equalConfig() { Assertions.assertThat(config).isEqualTo(config); Assertions.assertThat(config).isEqualTo(configBuilder.build()); Assertions.assertThat(config).isNotEqualTo(configBuilder); + Assertions.assertThat(config).isNotEqualTo(new ClientConfig.Builder() + .host("http://localhost:9999") + .token("my-token".toCharArray()) + .organization("my-org") + .database("my-db") + .writePrecision(WritePrecision.NS) + .writeAcceptPartial(false) + .timeout(Duration.ofSeconds(30)) + .writeTimeout(Duration.ofSeconds(35)) + .queryTimeout(Duration.ofSeconds(120)) + .allowHttpRedirects(true) + .disableServerCertificateValidation(true) + .headers(Map.of("X-device", "ab-01")) + .disableGRPCCompression(true) + .build()); Assertions.assertThat(config).isNotEqualTo(configBuilder.database("database").build()); } @@ -79,6 +94,8 @@ void toStringConfig() { Assertions.assertThat(configString.contains("database='my-db'")).isEqualTo(true); Assertions.assertThat(configString.contains("gzipThreshold=1000")).isEqualTo(true); Assertions.assertThat(configString).contains("writeNoSync=false"); + Assertions.assertThat(configString).contains("writeAcceptPartial=true"); + Assertions.assertThat(configString).contains("writeUseV2Api=false"); Assertions.assertThat(configString).contains("timeout=PT30S"); Assertions.assertThat(configString).contains("writeTimeout=PT35S"); Assertions.assertThat(configString).contains("queryTimeout=PT2M"); @@ -90,7 +107,8 @@ void toStringConfig() { void fromConnectionString() throws MalformedURLException { ClientConfig cfg = new ClientConfig.Builder() .build("http://localhost:9999/" - + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128&writeNoSync=true"); + + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128" + + "&writeNoSync=true&writeAcceptPartial=true&writeUseV2Api=true"); Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/"); Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray()); Assertions.assertThat(cfg.getOrganization()).isEqualTo("my-org"); @@ -98,6 +116,8 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); // default Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(128); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -109,6 +129,8 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -120,6 +142,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -208,6 +231,8 @@ void fromEnv() { "INFLUX_PRECISION", "ms", "INFLUX_GZIP_THRESHOLD", "64", "INFLUX_WRITE_NO_SYNC", "true", + "INFLUX_WRITE_ACCEPT_PARTIAL", "true", + "INFLUX_WRITE_USE_V2_API", "true", "INFLUX_DISABLE_GRPC_COMPRESSION", "true" ); @@ -220,6 +245,8 @@ void fromEnv() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } @@ -287,6 +314,8 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); // basic properties = new Properties(); @@ -324,6 +353,8 @@ void fromSystemProperties() { properties.put("influx.precision", "ms"); properties.put("influx.gzipThreshold", "64"); properties.put("influx.writeNoSync", "true"); + properties.put("influx.writeAcceptPartial", "true"); + properties.put("influx.writeUseV2Api", "true"); properties.put("influx.disableGRPCCompression", "true"); cfg = new ClientConfig.Builder() .build(new HashMap<>(), properties); @@ -334,6 +365,8 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 6ec5ee72..dc02f7da 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -41,7 +41,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import com.influxdb.v3.client.InfluxDBApiHttpException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; @@ -186,6 +188,116 @@ public void testQuery() throws Exception { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testAcceptPartialWriteError() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "home,room=Sunroom temp=96 1735545600\n" + + "home,room=Sunroom temp=\"hi\" 1735545610\n" + + "home,room=Sunroom temp=88i 1735545620"; + + WriteOptions options = new WriteOptions.Builder() + .acceptPartial(true) + .build(); + + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); + Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class); + + String expectedMessage = "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::string (home,room=Sunroom te)\n" + + "\tline 3: invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::integer (home,room=Sunroom te)"; + Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage); + + InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialError.lineErrors()).hasSize(2); + Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2); + Assertions.assertThat(partialError.lineErrors().get(0).errorMessage()) + .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::string"); + Assertions.assertThat(partialError.lineErrors().get(0).originalLine()) + .isEqualTo("home,room=Sunroom te"); + + Assertions.assertThat(partialError.lineErrors().get(1).lineNumber()).isEqualTo(3); + Assertions.assertThat(partialError.lineErrors().get(1).errorMessage()) + .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::integer"); + Assertions.assertThat(partialError.lineErrors().get(1).originalLine()) + .isEqualTo("home,room=Sunroom te"); + + Assertions.assertThat(partialError).isInstanceOf(InfluxDBApiHttpException.class); + Assertions.assertThat(partialError.statusCode()).isEqualTo(400); + } + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testWriteErrorWithoutAcceptPartial() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "home,room=Sunroom temp=96 1735545600\n" + + "home,room=Sunroom temp=\"hi\" 1735545610\n" + + "home,room=Sunroom temp=88i 1735545620"; + + WriteOptions options = new WriteOptions.Builder() + .acceptPartial(false) + .build(); + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); + Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class); + Assertions.assertThat(thrown.getMessage()) + .contains("parsing failed for write_lp endpoint"); + + InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialError.lineErrors()).hasSize(1); + Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2); + Assertions.assertThat(partialError.lineErrors().get(0).errorMessage()) + .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::string"); + Assertions.assertThat(partialError.lineErrors().get(0).originalLine()) + .isEqualTo("home,room=Sunroom te"); + } + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testWriteErrorWithUseV2Api() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "home,room=Sunroom temp=96 1735545600\n" + + "home,room=Sunroom temp=\"hi\" 1735545610\n" + + "home,room=Sunroom temp=88i 1735545620"; + + WriteOptions options = new WriteOptions.Builder() + .useV2Api(true) + .build(); + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); + Assertions.assertThat(thrown).isInstanceOf(InfluxDBApiHttpException.class); + Assertions.assertThat(thrown).isNotInstanceOf(InfluxDBPartialWriteException.class); + Assertions.assertThat(thrown.getMessage()) + .contains("write buffer error: line protocol parse failed"); + } + } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index b6d4a26b..25f18744 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -51,6 +51,7 @@ import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBApiHttpException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.write.WriteOptions; @@ -540,17 +541,26 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format mockServer.enqueue(createResponse(400, "application/json", null, - "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"invalid field value\"}}")); + "{\"error\":\"parsing failed for write_lp endpoint\",\"data\":{\"error_message\":\"invalid field value\"}}")); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage("HTTP status code: 400; Message: invalid field value"); + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBPartialWriteException.class) + .isInstanceOf(InfluxDBApiHttpException.class) + .hasMessage("HTTP status code: 400; Message: parsing failed for write_lp endpoint:\n" + + "\tinvalid field value"); + + InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialWriteException.statusCode()).isEqualTo(400); + Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1); + InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0); + Assertions.assertThat(lineError.lineNumber()).isNull(); + Assertions.assertThat(lineError.errorMessage()).isEqualTo("invalid field value"); + Assertions.assertThat(lineError.originalLine()).isNull(); } @Test @@ -567,13 +577,43 @@ public void errorFromBodyV3WithDataArray() { .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBPartialWriteException.class) + .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float (testa6a3ad v=1 17702)"); + + InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1); + InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0); + Assertions.assertThat(lineError.lineNumber()).isEqualTo(2); + Assertions.assertThat(lineError.errorMessage()) + .isEqualTo("invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float"); + Assertions.assertThat(lineError.originalLine()).isEqualTo("testa6a3ad v=1 17702"); + } + + @Test + public void errorFromBodyV3WithDataArrayAnyInvalidItemFallsBackToHttpException() { + mockServer.enqueue(createResponse(400, + "application/json", + null, + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}," + + "{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}]}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBApiHttpException.class) + .isNotInstanceOf(InfluxDBPartialWriteException.class) .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer," - + " got iox::column_type::field::float (testa6a3ad v=1 17702)"); + + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n" + + "\t{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}"); } @ParameterizedTest(name = "{0}") @@ -630,7 +670,8 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "{\"error\":\"partial write of line protocol occurred\",\"data\":[1,{\"error_message\":" + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", "HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline 2: bad line (bad lp)" + + "\t1\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}" ), Arguments.of( "null error_message skipped", @@ -652,6 +693,47 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + "\tline 2: bad line (bad lp)\n" + "\tsecond issue" + ), + Arguments.of( + "array of strings fallback", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[\"bad line 1\",\"bad line 2\"]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tbad line 1\n" + + "\tbad line 2" + ), + Arguments.of( + "textual numeric line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":\"2\",\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)" + ), + Arguments.of( + "textual non-numeric line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}" + ), + Arguments.of( + "empty textual line_number with empty original_line", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"only error message\",\"line_number\":\"\",\"original_line\":\"\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message" + ), + Arguments.of( + "non-textual line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}" + ), + Arguments.of( + "object line_number preserved as text", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}" ) ); } @@ -659,11 +741,14 @@ private static Stream errorFromBodyV3WithDataArrayCases() { @ParameterizedTest(name = "{0}") @MethodSource("errorFromBodyV3FallbackCases") public void errorFromBodyV3FallbackCase(final String testName, + final String requestPath, + final String contentType, final String body, + final Class expectedClass, final String expectedMessage) { mockServer.enqueue(createResponse(400, - "application/json", + contentType, null, body)); @@ -671,48 +756,125 @@ public void errorFromBodyV3FallbackCase(final String testName, .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage(expectedMessage); + Throwable thrown = catchThrowable(() -> restClient.request(requestPath, HttpMethod.GET, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(expectedClass) + .hasMessage(expectedMessage); } private static Stream errorFromBodyV3FallbackCases() { return Stream.of( Arguments.of( "missing error with data array falls back to body", + "ping", + "application/json", "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: " + "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}" ), Arguments.of( "empty error with data array falls back to body", + "ping", + "application/json", "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + "\"bad lp\"}]}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: " + "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + "\"bad lp\"}]}" ), Arguments.of( "data object without error_message falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":{}}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data object with empty error_message falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data string falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data number falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":123}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" + ), + Arguments.of( + "partial-write with invalid data string falls back to error", + "ping", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":\"invalid\"}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "partial-write with empty data object falls back to error", + "ping", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":{}}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "write endpoint ignores line-error parsing for non-json content type", + "api/v3/write_lp", + "text/plain", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\"," + + "\"line_number\":2,\"original_line\":\"bad lp\"}]}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: " + + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\"," + + "\"line_number\":2,\"original_line\":\"bad lp\"}]}" + ), + Arguments.of( + "write endpoint with non-object root falls back to body", + "api/v3/write_lp", + "application/json", + "[]", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: []" + ), + Arguments.of( + "write endpoint with invalid line-error object type falls back to http exception", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":{\"error_message\":\"bad line\"," + + "\"line_number\":{\"x\":2},\"original_line\":\"bad lp\"}}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tbad line" + ), + Arguments.of( + "write endpoint with scalar data falls back to error", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":123}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "write endpoint invalid json body falls back to raw body", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\"", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: {\"error\":\"partial write of line protocol occurred\"" ) ); } diff --git a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java index a1b7974c..fdf95b09 100644 --- a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java @@ -57,9 +57,10 @@ void optionsBasics() { @Test void optionsEqualAll() { - WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true); + WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true, true, null, null, null); WriteOptions optionsViaBuilder = new WriteOptions.Builder() - .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true).build(); + .database("my-database").precision(WritePrecision.S).gzipThreshold(512) + .noSync(true).acceptPartial(true).build(); Assertions.assertThat(options).isEqualTo(optionsViaBuilder); @@ -67,6 +68,12 @@ void optionsEqualAll() { .database("my-database").precision(WritePrecision.S).gzipThreshold(1024).noSync(true).build(); WriteOptions noSyncMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(false).build(); + WriteOptions acceptPartialMismatch = new WriteOptions.Builder() + .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) + .acceptPartial(false).build(); + WriteOptions useV2ApiMismatch = new WriteOptions.Builder() + .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) + .useV2Api(true).build(); WriteOptions defaultTagsMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) .defaultTags(Map.of("region", "west")).build(); @@ -79,6 +86,8 @@ void optionsEqualAll() { Assertions.assertThat(options).isNotEqualTo(gzipMismatch); Assertions.assertThat(options).isNotEqualTo(noSyncMismatch); + Assertions.assertThat(options).isNotEqualTo(acceptPartialMismatch); + Assertions.assertThat(options).isNotEqualTo(useV2ApiMismatch); Assertions.assertThat(options).isNotEqualTo(defaultTagsMismatch); Assertions.assertThat(options).isNotEqualTo(tagOrderMismatch); Assertions.assertThat(options).isNotEqualTo(headersMismatch); @@ -147,12 +156,16 @@ void optionsEmpty() { Assertions.assertThat(options.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(options.precisionSafe(config)).isEqualTo(WriteOptions.DEFAULT_WRITE_PRECISION); Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(WriteOptions.DEFAULT_GZIP_THRESHOLD); + Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(options.useV2ApiSafe(config)).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); Assertions.assertThat(options.tagOrderSafe()).isEmpty(); WriteOptions builderOptions = new WriteOptions.Builder().build(); Assertions.assertThat(builderOptions.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(builderOptions.precisionSafe(config)).isEqualTo(WritePrecision.S); Assertions.assertThat(builderOptions.gzipThresholdSafe(config)).isEqualTo(512); + Assertions.assertThat(builderOptions.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(builderOptions.useV2ApiSafe(config)).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); } @Test @@ -234,6 +247,46 @@ void optionsOverrideWriteNoSync() { Assertions.assertThat(options.noSyncSafe(config)).isEqualTo(false); } + @Test + void optionsOverrideWriteAcceptPartial() { + ClientConfig config = configBuilder + .database("my-database") + .organization("my-org") + .writeAcceptPartial(false) + .build(); + + WriteOptions options = new WriteOptions.Builder().acceptPartial(true).build(); + + Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(true); + } + + @Test + void optionsOverrideWriteUseV2Api() { + ClientConfig config = configBuilder + .database("my-database") + .organization("my-org") + .writeUseV2Api(false) + .build(); + + WriteOptions options = new WriteOptions.Builder().useV2Api(true).build(); + + Assertions.assertThat(options.useV2ApiSafe(config)).isEqualTo(true); + } + + @Test + void optionsValidateUseV2ApiAndNoSync() { + ClientConfig config = configBuilder.build(); + + WriteOptions options = new WriteOptions.Builder() + .useV2Api(true) + .noSync(true) + .build(); + + Assertions.assertThatThrownBy(() -> options.validate(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid write options: NoSync cannot be used in V2 API"); + } + @Test void optionsOverridesDefaultTags() { Map defaultTagsBase = new HashMap<>() {{ @@ -303,6 +356,8 @@ void optionsHashCode() { .isNotEqualTo(builder.database("my-database").build().hashCode()); Assertions.assertThat(baseOptions.hashCode()) .isNotEqualTo(builder.defaultTags(defaultTags).build().hashCode()); + Assertions.assertThat(baseOptions.hashCode()) + .isNotEqualTo(builder.acceptPartial(true).build().hashCode()); Assertions.assertThat(baseOptions.hashCode()) .isNotEqualTo(builder.tagOrder(List.of("region", "host")).build().hashCode()); }