From b8f650b7622d5e776046ee0939f1da9933b36ed4 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 08:30:49 +0100 Subject: [PATCH 01/21] feat: support partial writes --- README.md | 18 ++++ .../influxdb/v3/client/InfluxDBClient.java | 3 + .../v3/client/config/ClientConfig.java | 40 ++++++++- .../client/internal/InfluxDBClientImpl.java | 21 +++-- .../v3/client/internal/RestClient.java | 90 ++++++++++++++++++- .../v3/client/write/WriteOptions.java | 81 +++++++++++++++-- .../v3/client/InfluxDBClientWriteTest.java | 80 ++++++++++++++--- .../v3/client/config/ClientConfigTest.java | 11 ++- .../v3/client/internal/RestClientTest.java | 23 +++-- .../v3/client/write/WriteOptionsTest.java | 26 +++++- 10 files changed, 358 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 7c7c6c1e..ae66ad49 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ import java.util.List; import java.util.stream.Stream; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.write.WriteOptions; @@ -113,6 +114,23 @@ client.writePoint( orderedTagWrite ); +// +// Write with partial acceptance +// +WriteOptions partialWrite = new WriteOptions.Builder() + .acceptPartial(true) + .build(); +try { + client.writeRecords(List.of( + "temperature,region=west value=20.0", + "temperature,region=west value=\"bad\"" + ), partialWrite); +} catch (InfluxDBPartialWriteException e) { + // Inspect failed line details. + e.lineErrors().forEach(line -> + System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine())); +} + // // Write by LineProtocol // diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index ec7208cd..e5efb64b 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -557,6 +557,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) { *
  • precision - timestamp precision when writing data
  • *
  • gzipThreshold - payload size size for gzipping data
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • + *
  • writeAcceptPartial - accept partial writes
  • * * * @param connectionString connection string @@ -590,6 +591,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) { *
  • INFLUX_PRECISION - timestamp precision when writing data
  • *
  • INFLUX_GZIP_THRESHOLD - payload size size for gzipping data
  • *
  • INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write
  • + *
  • INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes
  • * * Supported system properties: * * * @return instance of {@link InfluxDBClient} diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index 4326f3a5..df413fb3 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -57,6 +57,7 @@ *
  • defaultTags - defaultTags added when writing points to InfluxDB
  • *
  • gzipThreshold - threshold when gzip compression is used for writing points to InfluxDB
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • + *
  • writeAcceptPartial - accept partial writes
  • *
  • timeout - deprecated in 1.4.0 timeout when connecting to InfluxDB, * please use more informative properties writeTimeout and queryTimeout
  • *
  • writeTimeout - timeout when writing data to InfluxDB
  • @@ -107,6 +108,7 @@ public final class ClientConfig { private final WritePrecision writePrecision; private final Integer gzipThreshold; private final Boolean writeNoSync; + private final Boolean writeAcceptPartial; private final Map defaultTags; @Deprecated private final Duration timeout; @@ -208,6 +210,16 @@ public Boolean getWriteNoSync() { return writeNoSync; } + /** + * Accept partial writes? + * + * @return accept partial writes + */ + @Nonnull + public Boolean getWriteAcceptPartial() { + return writeAcceptPartial; + } + /** * Gets default tags used when writing points. * @return default tags @@ -370,6 +382,7 @@ public boolean equals(final Object o) { && writePrecision == that.writePrecision && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(writeNoSync, that.writeNoSync) + && Objects.equals(writeAcceptPartial, that.writeAcceptPartial) && Objects.equals(defaultTags, that.defaultTags) && Objects.equals(timeout, that.timeout) && Objects.equals(writeTimeout, that.writeTimeout) @@ -388,7 +401,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { return Objects.hash(host, Arrays.hashCode(token), authScheme, organization, - database, writePrecision, gzipThreshold, writeNoSync, + database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors); @@ -403,6 +416,7 @@ public String toString() { .add("writePrecision=" + writePrecision) .add("gzipThreshold=" + gzipThreshold) .add("writeNoSync=" + writeNoSync) + .add("writeAcceptPartial=" + writeAcceptPartial) .add("timeout=" + timeout) .add("writeTimeout=" + writeTimeout) .add("queryTimeout=" + queryTimeout) @@ -432,6 +446,7 @@ public static final class Builder { private WritePrecision writePrecision; private Integer gzipThreshold; private Boolean writeNoSync; + private Boolean writeAcceptPartial; private Map defaultTags; @Deprecated private Duration timeout; @@ -554,6 +569,19 @@ public Builder writeNoSync(@Nullable final Boolean writeNoSync) { return this; } + /** + * Sets whether to accept partial writes. + * + * @param writeAcceptPartial accept partial writes + * @return this + */ + @Nonnull + public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) { + + this.writeAcceptPartial = writeAcceptPartial; + return this; + } + /** * Sets default tags to be written with points. * @@ -800,6 +828,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform if (parameters.containsKey("writeNoSync")) { this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync"))); } + if (parameters.containsKey("writeAcceptPartial")) { + this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial"))); + } if (parameters.containsKey("disableGRPCCompression")) { this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression"))); } @@ -855,6 +886,10 @@ public ClientConfig build(@Nonnull final Map env, final Properti if (writeNoSync != null) { this.writeNoSync(Boolean.parseBoolean(writeNoSync)); } + final String writeAcceptPartial = get.apply("INFLUX_WRITE_ACCEPT_PARTIAL", "influx.writeAcceptPartial"); + if (writeAcceptPartial != null) { + this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial)); + } final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout"); if (writeTimeout != null) { long to = Long.parseLong(writeTimeout); @@ -911,6 +946,9 @@ private ClientConfig(@Nonnull final Builder builder) { writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION; gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD; writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC; + writeAcceptPartial = builder.writeAcceptPartial != null + ? builder.writeAcceptPartial + : WriteOptions.DEFAULT_ACCEPT_PARTIAL; defaultTags = builder.defaultTags; timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT); writeTimeout = builder.writeTimeout != null diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 8c2ce466..0e5e8dd0 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -309,15 +309,22 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti String path; Map queryParams; boolean noSync = options.noSyncSafe(config); - if (noSync) { - // Setting no_sync=true is supported only in the v3 API. + boolean acceptPartial = options.acceptPartialSafe(config); + boolean useV3Write = noSync || acceptPartial; + if (useV3Write) { + // no_sync=true and accept_partial=true are supported only in the v3 API. path = "api/v3/write_lp"; queryParams = new HashMap<>() {{ put("org", config.getOrganization()); put("db", database); put("precision", WritePrecisionConverter.toV3ApiString(precision)); - put("no_sync", "true"); }}; + if (noSync) { + queryParams.put("no_sync", "true"); + } + if (acceptPartial) { + queryParams.put("accept_partial", "true"); + } } else { // By default, use the v2 API. path = "api/v2/write"; @@ -373,10 +380,12 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti try { restClient.request(path, HttpMethod.POST, body, queryParams, headers); } catch (InfluxDBApiHttpException e) { - if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { - // Server does not support the v3 write API, can't use the NoSync option. + if (useV3Write && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { + // Server does not support the v3 write API, can't use v3-only write options. throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " - + "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); + + "or AcceptPartial=true (supported by InfluxDB 3 Core/Enterprise servers only).", + e.headers(), + e.statusCode()); } throw e; } diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 6fb0e05b..4f7659d2 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -57,6 +57,7 @@ import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBApiHttpException; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.config.ClientConfig; final class RestClient implements AutoCloseable { @@ -219,7 +220,8 @@ HttpResponse request(@Nonnull final String path, if (statusCode < 200 || statusCode >= 300) { String reason; String body = response.body(); - reason = formatErrorMessage(body, response.headers().firstValue("Content-Type").orElse(null)); + String contentType = response.headers().firstValue("Content-Type").orElse(null); + reason = formatErrorMessage(body, contentType); if (reason == null) { reason = ""; @@ -241,6 +243,10 @@ HttpResponse request(@Nonnull final String path, } String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); + List lineErrors = parsePartialWriteLineErrors(body, contentType); + if (!lineErrors.isEmpty()) { + throw new InfluxDBPartialWriteException(message, response.headers(), response.statusCode(), lineErrors); + } throw new InfluxDBApiHttpException(message, response.headers(), response.statusCode()); } @@ -306,6 +312,47 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St } } + @Nonnull + private List parsePartialWriteLineErrors( + @Nonnull final String body, + @Nullable final String contentType) { + + if (body.isEmpty()) { + return List.of(); + } + + if (contentType != null + && !contentType.isEmpty() + && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) { + return List.of(); + } + + try { + final JsonNode root = objectMapper.readTree(body); + if (!root.isObject()) { + return List.of(); + } + + final String error = errNonEmptyField(root, "error"); + final JsonNode dataNode = root.get("data"); + if (error == null || dataNode == null || !dataNode.isArray()) { + return List.of(); + } + + final List lineErrors = new ArrayList<>(); + for (JsonNode item : dataNode) { + InfluxDBPartialWriteException.LineError lineError = errParseDataArrayLineError(item); + if (lineError != null) { + lineErrors.add(lineError); + } + } + return lineErrors; + } catch (JsonProcessingException e) { + LOG.debug("Can't parse line errors from response body {}", body, e); + return List.of(); + } + } + @Nullable private String errNonEmptyText(@Nullable final JsonNode node) { if (node == null || node.isNull()) { @@ -344,6 +391,47 @@ private String errFormatDataArrayDetail(@Nullable final JsonNode item) { return errorMessage; } + @Nullable + private InfluxDBPartialWriteException.LineError errParseDataArrayLineError(@Nullable final JsonNode item) { + if (item == null || !item.isObject()) { + return null; + } + + final String errorMessage = errNonEmptyField(item, "error_message"); + if (errorMessage == null) { + return null; + } + + final Integer lineNumber = errParseLineNumber(item); + final String originalLine = errNonEmptyField(item, "original_line"); + + return new InfluxDBPartialWriteException.LineError(lineNumber, errorMessage, originalLine); + } + + @Nullable + private Integer errParseLineNumber(@Nonnull final JsonNode item) { + if (!item.hasNonNull("line_number")) { + return null; + } + + final JsonNode lineNumber = item.get("line_number"); + if (lineNumber.isIntegralNumber()) { + return lineNumber.intValue(); + } + if (lineNumber.isTextual()) { + final String value = lineNumber.asText(); + if (value.isEmpty()) { + return null; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return null; + } + } + return null; + } + private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) { try { KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index e9b4ca1b..4761c134 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -68,6 +68,10 @@ public final class WriteOptions { * Default NoSync. */ public static final boolean DEFAULT_NO_SYNC = false; + /** + * Default AcceptPartial. + */ + public static final boolean DEFAULT_ACCEPT_PARTIAL = false; /** * Default timeout for writes in seconds. Set to {@value} @@ -81,12 +85,14 @@ public final class WriteOptions { @Deprecated(forRemoval = true) public static final WriteOptions DEFAULTS = new WriteOptions( - null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null, null); + null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, DEFAULT_ACCEPT_PARTIAL, + null, null, null); private final String database; private final WritePrecision precision; private final Integer gzipThreshold; private final Boolean noSync; + private final Boolean acceptPartial; private final Map defaultTags; private final List tagOrder; private final Map headers; @@ -99,6 +105,7 @@ public final class WriteOptions { */ public static WriteOptions defaultWriteOptions() { return new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, + DEFAULT_ACCEPT_PARTIAL, null, null, null); } @@ -152,7 +159,7 @@ public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold, @Nullable final Boolean noSync) { - this(database, precision, gzipThreshold, noSync, null, null); + this(database, precision, gzipThreshold, noSync, null, null, null); } /** @@ -184,7 +191,7 @@ public WriteOptions(@Nullable final String database, @Nullable final Integer gzipThreshold, @Nullable final Map defaultTags, @Nullable final Map headers) { - this(database, precision, gzipThreshold, null, defaultTags, headers); + this(database, precision, gzipThreshold, null, null, defaultTags, headers, null); } /** @@ -209,7 +216,7 @@ public WriteOptions(@Nullable final String database, @Nullable final Boolean noSync, @Nullable final Map defaultTags, @Nullable final Map headers) { - this(database, precision, gzipThreshold, noSync, defaultTags, headers, null); + this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, null); } /** @@ -223,6 +230,8 @@ public WriteOptions(@Nullable final String database, * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. * @param noSync Skip waiting for WAL persistence on write. * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param acceptPartial Request partial write acceptance. + * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}. * @param defaultTags Default tags to be added when writing points. * @param headers The headers to be added to write request. * The headers specified here are preferred over the headers @@ -234,6 +243,7 @@ public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold, @Nullable final Boolean noSync, + @Nullable final Boolean acceptPartial, @Nullable final Map defaultTags, @Nullable final Map headers, @Nullable final List tagOrder) { @@ -241,11 +251,40 @@ public WriteOptions(@Nullable final String database, this.precision = precision; this.gzipThreshold = gzipThreshold; this.noSync = noSync; + this.acceptPartial = acceptPartial; this.defaultTags = defaultTags == null ? Map.of() : defaultTags; this.tagOrder = sanitizeTagOrder(tagOrder); this.headers = headers == null ? Map.of() : headers; } + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param noSync Skip waiting for WAL persistence on write. + * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param defaultTags Default tags to be added when writing points. + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers + * specified in the client configuration. + * @param tagOrder Preferred order of tags in line protocol serialization. + * Null or empty tag names are ignored. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Boolean noSync, + @Nullable final Map defaultTags, + @Nullable final Map headers, + @Nullable final List tagOrder) { + this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, tagOrder); + } + /** * @param config with default value * @return The destination database for writes. @@ -303,6 +342,18 @@ public boolean noSyncSafe(@Nonnull final ClientConfig config) { : (config.getWriteNoSync() != null ? config.getWriteNoSync() : DEFAULT_NO_SYNC); } + /** + * @param config with default value + * @return Accept partial write. + */ + public boolean acceptPartialSafe(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + return acceptPartial != null ? acceptPartial + : (config.getWriteAcceptPartial() != null + ? config.getWriteAcceptPartial() + : DEFAULT_ACCEPT_PARTIAL); + } + /** * @return The headers to be added to write request. */ @@ -332,6 +383,7 @@ public boolean equals(final Object o) { && precision == that.precision && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(noSync, that.noSync) + && Objects.equals(acceptPartial, that.acceptPartial) && defaultTags.equals(that.defaultTags) && tagOrder.equals(that.tagOrder) && headers.equals(that.headers); @@ -339,7 +391,8 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(database, precision, gzipThreshold, noSync, defaultTags, tagOrder, headers); + return Objects.hash(database, precision, gzipThreshold, noSync, acceptPartial, defaultTags, tagOrder, + headers); } private boolean isNotDefined(final String option) { @@ -368,6 +421,7 @@ public static final class Builder { private WritePrecision precision; private Integer gzipThreshold; private Boolean noSync; + private Boolean acceptPartial; private Map defaultTags = new HashMap<>(); private List tagOrder = List.of(); private Map headers = new HashMap<>(); @@ -424,6 +478,19 @@ public Builder noSync(@Nonnull final Boolean noSync) { return this; } + /** + * Sets whether to request partial write acceptance. + * + * @param acceptPartial request partial write acceptance + * @return this + */ + @Nonnull + public Builder acceptPartial(@Nonnull final Boolean acceptPartial) { + + this.acceptPartial = acceptPartial; + return this; + } + /** * Sets defaultTags. * @@ -473,7 +540,7 @@ public WriteOptions build() { } private WriteOptions(@Nonnull final Builder builder) { - this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags, - builder.headers, builder.tagOrder); + this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.acceptPartial, + builder.defaultTags, builder.headers, builder.tagOrder); } } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index a7d31240..23569283 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -224,6 +224,23 @@ void writeNoSyncTrueUsesV3API() throws InterruptedException { assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); } + @Test + void writeAcceptPartialTrueUsesV3API() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.NS).acceptPartial(true).build()); + + assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + assertThat(request).isNotNull(); + assertThat(request.getUrl()).isNotNull(); + assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + assertThat(request.getUrl().queryParameter("no_sync")).isNull(); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); + } + @Test void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); @@ -242,7 +259,29 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond"); assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); - assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true" + assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true" + + " (supported by InfluxDB 3 Core/Enterprise servers only)."); + } + + @Test + void writeAcceptPartialTrueOnV2ServerThrowsException() throws InterruptedException { + mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); + + InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, + () -> client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.MS).acceptPartial(true).build()) + ); + + assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + assertThat(request).isNotNull(); + assertThat(request.getUrl()).isNotNull(); + assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond"); + + assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); + assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true" + " (supported by InfluxDB 3 Core/Enterprise servers only)."); } @@ -256,7 +295,7 @@ void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -272,7 +311,21 @@ void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); + } + + @Test + void writeRecordWithDefaultWriteOptionsAcceptPartialConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writeAcceptPartial(true) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", false, true, false); } @Test @@ -285,7 +338,7 @@ void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -301,7 +354,7 @@ void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); } @Test @@ -317,7 +370,7 @@ void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -336,7 +389,7 @@ void writePointWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); } @Test @@ -352,7 +405,7 @@ void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -371,17 +424,19 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); } private void checkWriteCalled(final String expectedPath, final String expectedDB, final String expectedPrecision, final boolean expectedNoSync, + final boolean expectedAcceptPartial, final boolean expectedGzip) throws InterruptedException { RecordedRequest request = assertThatServerRequested(); HttpUrl requestUrl = request.getUrl(); assertThat(requestUrl).isNotNull(); assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath); - if (expectedNoSync) { + boolean expectedV3 = expectedNoSync || expectedAcceptPartial; + if (expectedV3) { assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB); } else { assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB); @@ -392,6 +447,11 @@ private void checkWriteCalled(final String expectedPath, final String expectedDB } else { assertThat(requestUrl.queryParameter("no_sync")).isNull(); } + if (expectedAcceptPartial) { + assertThat(requestUrl.queryParameter("accept_partial")).isEqualTo("true"); + } else { + assertThat(requestUrl.queryParameter("accept_partial")).isNull(); + } if (expectedGzip) { assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); } else { diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index f5ce2e8f..56eaa020 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -79,6 +79,7 @@ void toStringConfig() { Assertions.assertThat(configString.contains("database='my-db'")).isEqualTo(true); Assertions.assertThat(configString.contains("gzipThreshold=1000")).isEqualTo(true); Assertions.assertThat(configString).contains("writeNoSync=false"); + Assertions.assertThat(configString).contains("writeAcceptPartial=false"); Assertions.assertThat(configString).contains("timeout=PT30S"); Assertions.assertThat(configString).contains("writeTimeout=PT35S"); Assertions.assertThat(configString).contains("queryTimeout=PT2M"); @@ -90,7 +91,8 @@ void toStringConfig() { void fromConnectionString() throws MalformedURLException { ClientConfig cfg = new ClientConfig.Builder() .build("http://localhost:9999/" - + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128&writeNoSync=true"); + + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128" + + "&writeNoSync=true&writeAcceptPartial=true"); Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/"); Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray()); Assertions.assertThat(cfg.getOrganization()).isEqualTo("my-org"); @@ -98,6 +100,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); // default Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(128); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -109,6 +112,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -208,6 +212,7 @@ void fromEnv() { "INFLUX_PRECISION", "ms", "INFLUX_GZIP_THRESHOLD", "64", "INFLUX_WRITE_NO_SYNC", "true", + "INFLUX_WRITE_ACCEPT_PARTIAL", "true", "INFLUX_DISABLE_GRPC_COMPRESSION", "true" ); @@ -220,6 +225,7 @@ void fromEnv() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } @@ -287,6 +293,7 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); // basic properties = new Properties(); @@ -324,6 +331,7 @@ void fromSystemProperties() { properties.put("influx.precision", "ms"); properties.put("influx.gzipThreshold", "64"); properties.put("influx.writeNoSync", "true"); + properties.put("influx.writeAcceptPartial", "true"); properties.put("influx.disableGRPCCompression", "true"); cfg = new ClientConfig.Builder() .build(new HashMap<>(), properties); @@ -334,6 +342,7 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index b6d4a26b..a724e647 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -51,6 +51,7 @@ import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBApiHttpException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.write.WriteOptions; @@ -567,13 +568,21 @@ public void errorFromBodyV3WithDataArray() { .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer," - + " got iox::column_type::field::float (testa6a3ad v=1 17702)"); + Throwable thrown = catchThrowable(() -> restClient.request("ping", HttpMethod.GET, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBPartialWriteException.class) + .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float (testa6a3ad v=1 17702)"); + + InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1); + InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0); + Assertions.assertThat(lineError.lineNumber()).isEqualTo(2); + Assertions.assertThat(lineError.errorMessage()) + .isEqualTo("invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float"); + Assertions.assertThat(lineError.originalLine()).isEqualTo("testa6a3ad v=1 17702"); } @ParameterizedTest(name = "{0}") diff --git a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java index a1b7974c..a2432da8 100644 --- a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java @@ -57,9 +57,10 @@ void optionsBasics() { @Test void optionsEqualAll() { - WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true); + WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true, true, null, null, null); WriteOptions optionsViaBuilder = new WriteOptions.Builder() - .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true).build(); + .database("my-database").precision(WritePrecision.S).gzipThreshold(512) + .noSync(true).acceptPartial(true).build(); Assertions.assertThat(options).isEqualTo(optionsViaBuilder); @@ -67,6 +68,9 @@ void optionsEqualAll() { .database("my-database").precision(WritePrecision.S).gzipThreshold(1024).noSync(true).build(); WriteOptions noSyncMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(false).build(); + WriteOptions acceptPartialMismatch = new WriteOptions.Builder() + .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) + .acceptPartial(false).build(); WriteOptions defaultTagsMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) .defaultTags(Map.of("region", "west")).build(); @@ -79,6 +83,7 @@ void optionsEqualAll() { Assertions.assertThat(options).isNotEqualTo(gzipMismatch); Assertions.assertThat(options).isNotEqualTo(noSyncMismatch); + Assertions.assertThat(options).isNotEqualTo(acceptPartialMismatch); Assertions.assertThat(options).isNotEqualTo(defaultTagsMismatch); Assertions.assertThat(options).isNotEqualTo(tagOrderMismatch); Assertions.assertThat(options).isNotEqualTo(headersMismatch); @@ -147,12 +152,14 @@ void optionsEmpty() { Assertions.assertThat(options.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(options.precisionSafe(config)).isEqualTo(WriteOptions.DEFAULT_WRITE_PRECISION); Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(WriteOptions.DEFAULT_GZIP_THRESHOLD); + Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); Assertions.assertThat(options.tagOrderSafe()).isEmpty(); WriteOptions builderOptions = new WriteOptions.Builder().build(); Assertions.assertThat(builderOptions.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(builderOptions.precisionSafe(config)).isEqualTo(WritePrecision.S); Assertions.assertThat(builderOptions.gzipThresholdSafe(config)).isEqualTo(512); + Assertions.assertThat(builderOptions.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); } @Test @@ -234,6 +241,19 @@ void optionsOverrideWriteNoSync() { Assertions.assertThat(options.noSyncSafe(config)).isEqualTo(false); } + @Test + void optionsOverrideWriteAcceptPartial() { + ClientConfig config = configBuilder + .database("my-database") + .organization("my-org") + .writeAcceptPartial(true) + .build(); + + WriteOptions options = new WriteOptions.Builder().acceptPartial(false).build(); + + Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(false); + } + @Test void optionsOverridesDefaultTags() { Map defaultTagsBase = new HashMap<>() {{ @@ -303,6 +323,8 @@ void optionsHashCode() { .isNotEqualTo(builder.database("my-database").build().hashCode()); Assertions.assertThat(baseOptions.hashCode()) .isNotEqualTo(builder.defaultTags(defaultTags).build().hashCode()); + Assertions.assertThat(baseOptions.hashCode()) + .isNotEqualTo(builder.acceptPartial(true).build().hashCode()); Assertions.assertThat(baseOptions.hashCode()) .isNotEqualTo(builder.tagOrder(List.of("region", "host")).build().hashCode()); } From 87e86c253b3c26af8946309daa25af2cec03bf02 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 09:56:51 +0100 Subject: [PATCH 02/21] fix: harden error parser --- .../v3/client/internal/RestClient.java | 23 ++++-- .../v3/client/integration/E2ETest.java | 74 +++++++++++++++++++ .../v3/client/internal/RestClientTest.java | 44 ++++++++--- 3 files changed, 125 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 4f7659d2..1091065e 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -335,18 +335,27 @@ private List parsePartialWriteLineError final String error = errNonEmptyField(root, "error"); final JsonNode dataNode = root.get("data"); - if (error == null || dataNode == null || !dataNode.isArray()) { + if (error == null || dataNode == null) { return List.of(); } - final List lineErrors = new ArrayList<>(); - for (JsonNode item : dataNode) { - InfluxDBPartialWriteException.LineError lineError = errParseDataArrayLineError(item); - if (lineError != null) { - lineErrors.add(lineError); + if (dataNode.isArray()) { + final List lineErrors = new ArrayList<>(); + for (JsonNode item : dataNode) { + InfluxDBPartialWriteException.LineError lineError = errParseDataArrayLineError(item); + if (lineError != null) { + lineErrors.add(lineError); + } } + return lineErrors; + } + + if (dataNode.isObject()) { + InfluxDBPartialWriteException.LineError lineError = errParseDataArrayLineError(dataNode); + return lineError == null ? List.of() : List.of(lineError); } - return lineErrors; + + return List.of(); } catch (JsonProcessingException e) { LOG.debug("Can't parse line errors from response body {}", body, e); return List.of(); diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 6ec5ee72..1df052b0 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -41,7 +41,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import com.influxdb.v3.client.InfluxDBApiHttpException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; @@ -186,6 +188,78 @@ public void testQuery() throws Exception { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testAcceptPartialWriteError() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "temperature,room=room1 value=18.94647\n" + + "temperatureroom=room2value=20.268019\n" + + "temperature,room=room3 value=24.064857\n" + + "temperature,room=room4 value=43i"; + + WriteOptions options = new WriteOptions.Builder() + .acceptPartial(true) + .build(); + + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); + Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class); + + String expectedMessage = "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: Expected at least one space character, got end of input (temperatureroom=room)\n" + + "\tline 4: invalid column type for column 'value', expected iox::column_type::field::float, " + + "got iox::column_type::field::integer (temperature,room=roo)"; + Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage); + + InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialError.lineErrors()).hasSize(2); + Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2); + Assertions.assertThat(partialError.lineErrors().get(0).errorMessage()) + .isEqualTo("Expected at least one space character, got end of input"); + Assertions.assertThat(partialError.lineErrors().get(0).originalLine()) + .isEqualTo("temperatureroom=room"); + + Assertions.assertThat(partialError.lineErrors().get(1).lineNumber()).isEqualTo(4); + Assertions.assertThat(partialError.lineErrors().get(1).errorMessage()) + .isEqualTo("invalid column type for column 'value', expected iox::column_type::field::float, " + + "got iox::column_type::field::integer"); + Assertions.assertThat(partialError.lineErrors().get(1).originalLine()) + .isEqualTo("temperature,room=roo"); + + Assertions.assertThat(partialError).isInstanceOf(InfluxDBApiHttpException.class); + Assertions.assertThat(partialError.statusCode()).isEqualTo(400); + } + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testWriteErrorWithoutAcceptPartial() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "temperature,room=room1 value=18.94647\n" + + "temperatureroom=room2value=20.268019\n" + + "temperature,room=room3 value=24.064857\n" + + "temperature,room=room4 value=43i"; + + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points)); + Assertions.assertThat(thrown.getMessage()) + .isEqualTo("HTTP status code: 400; Message: write buffer error: " + + "line protocol parse failed: Expected at least one space character, got end of input"); + } + } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index a724e647..22551e62 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -547,11 +547,19 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) + Throwable thrown = catchThrowable(() -> restClient.request("ping", HttpMethod.GET, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBPartialWriteException.class) + .isInstanceOf(InfluxDBApiHttpException.class) .hasMessage("HTTP status code: 400; Message: invalid field value"); + + InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialWriteException.statusCode()).isEqualTo(400); + Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1); + InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0); + Assertions.assertThat(lineError.lineNumber()).isNull(); + Assertions.assertThat(lineError.errorMessage()).isEqualTo("invalid field value"); + Assertions.assertThat(lineError.originalLine()).isNull(); } @Test @@ -669,6 +677,7 @@ private static Stream errorFromBodyV3WithDataArrayCases() { @MethodSource("errorFromBodyV3FallbackCases") public void errorFromBodyV3FallbackCase(final String testName, final String body, + final Class expectedClass, final String expectedMessage) { mockServer.enqueue(createResponse(400, @@ -680,11 +689,10 @@ public void errorFromBodyV3FallbackCase(final String testName, .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage(expectedMessage); + Throwable thrown = catchThrowable(() -> restClient.request("ping", HttpMethod.GET, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(expectedClass) + .hasMessage(expectedMessage); } private static Stream errorFromBodyV3FallbackCases() { @@ -692,6 +700,7 @@ private static Stream errorFromBodyV3FallbackCases() { Arguments.of( "missing error with data array falls back to body", "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: " + "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}" ), @@ -699,6 +708,7 @@ private static Stream errorFromBodyV3FallbackCases() { "empty error with data array falls back to body", "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + "\"bad lp\"}]}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: " + "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + "\"bad lp\"}]}" @@ -706,22 +716,38 @@ private static Stream errorFromBodyV3FallbackCases() { Arguments.of( "data object without error_message falls back to error", "{\"error\":\"parsing failed\",\"data\":{}}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data object with empty error_message falls back to error", "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data string falls back to error", "{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data number falls back to error", "{\"error\":\"parsing failed\",\"data\":123}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" + ), + Arguments.of( + "partial-write with invalid data string falls back to error", + "{\"error\":\"partial write of line protocol occurred\",\"data\":\"invalid\"}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "partial-write with empty data object falls back to error", + "{\"error\":\"partial write of line protocol occurred\",\"data\":{}}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" ) ); } From 24c38d3db85f906eb39b2453b3d8afc41a14d2a0 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 10:04:34 +0100 Subject: [PATCH 03/21] feat: add typed exception --- .../client/InfluxDBPartialWriteException.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java new file mode 100644 index 00000000..9e7a2494 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java @@ -0,0 +1,110 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.net.http.HttpHeaders; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * HTTP exception for partial write errors returned by InfluxDB 3 write endpoint. + * Contains parsed line-level write errors so callers can decide how to handle failed lines. + */ +public class InfluxDBPartialWriteException extends InfluxDBApiHttpException { + + private final List lineErrors; + + /** + * Construct a new InfluxDBPartialWriteException. + * + * @param message detail message + * @param headers response headers + * @param statusCode response status code + * @param lineErrors line-level errors parsed from response body + */ + public InfluxDBPartialWriteException( + @Nullable final String message, + @Nullable final HttpHeaders headers, + final int statusCode, + @Nonnull final List lineErrors) { + super(message, headers, statusCode); + this.lineErrors = List.copyOf(lineErrors); + } + + /** + * Line-level write errors. + * + * @return immutable list of line errors + */ + @Nonnull + public List lineErrors() { + return lineErrors; + } + + /** + * Represents one failed line from a partial write response. + */ + public static final class LineError { + + private final Integer lineNumber; + private final String errorMessage; + private final String originalLine; + + /** + * @param lineNumber line number in the write payload; may be null if not provided by server + * @param errorMessage line-level error message + * @param originalLine original line protocol row; may be null if not provided by server + */ + public LineError(@Nullable final Integer lineNumber, + @Nonnull final String errorMessage, + @Nullable final String originalLine) { + this.lineNumber = lineNumber; + this.errorMessage = errorMessage; + this.originalLine = originalLine; + } + + /** + * @return line number or null if server didn't provide it + */ + @Nullable + public Integer lineNumber() { + return lineNumber; + } + + /** + * @return line-level error message + */ + @Nonnull + public String errorMessage() { + return errorMessage; + } + + /** + * @return original line protocol row or null if server didn't provide it + */ + @Nullable + public String originalLine() { + return originalLine; + } + } +} From da10b1970ecaeb8e764a1defe83755ed12030565 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 10:07:48 +0100 Subject: [PATCH 04/21] docs: update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index df16256c..8e224172 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ 1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client. 1. [#363](https://github.com/InfluxCommunity/influxdb3-java/pull/363): Support custom tag order via `tagOrder` write option. See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. +1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Support partial writes via `acceptPartial` write option. + See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. ## 1.8.0 [2026-02-19] From 624c567d8f972b700523ec1c76a9c5da9676b051 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 10:12:58 +0100 Subject: [PATCH 05/21] test: more coverage --- .../v3/client/write/WriteOptions.java | 5 +--- .../v3/client/internal/RestClientTest.java | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index 4761c134..0e7df2af 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -348,10 +348,7 @@ public boolean noSyncSafe(@Nonnull final ClientConfig config) { */ public boolean acceptPartialSafe(@Nonnull final ClientConfig config) { Arguments.checkNotNull(config, "config"); - return acceptPartial != null ? acceptPartial - : (config.getWriteAcceptPartial() != null - ? config.getWriteAcceptPartial() - : DEFAULT_ACCEPT_PARTIAL); + return acceptPartial != null ? acceptPartial : config.getWriteAcceptPartial(); } /** diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index 22551e62..25f0db8b 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -669,6 +669,33 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + "\tline 2: bad line (bad lp)\n" + "\tsecond issue" + ), + Arguments.of( + "textual numeric line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":\"2\",\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)" + ), + Arguments.of( + "textual non-numeric line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline x: bad line (bad lp)" + ), + Arguments.of( + "empty textual line_number with empty original_line", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"only error message\",\"line_number\":\"\",\"original_line\":\"\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message" + ), + Arguments.of( + "non-textual line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline true: bad line (bad lp)" ) ); } From 1d7c70351d2b54c8eef6b181dc03b133f39d7e5d Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 10:43:48 +0100 Subject: [PATCH 06/21] test: more coverage --- src/main/java/com/influxdb/v3/client/internal/RestClient.java | 4 ++-- src/main/java/com/influxdb/v3/client/write/WriteOptions.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 1091065e..e84116eb 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -401,8 +401,8 @@ private String errFormatDataArrayDetail(@Nullable final JsonNode item) { } @Nullable - private InfluxDBPartialWriteException.LineError errParseDataArrayLineError(@Nullable final JsonNode item) { - if (item == null || !item.isObject()) { + private InfluxDBPartialWriteException.LineError errParseDataArrayLineError(@Nonnull final JsonNode item) { + if (!item.isObject()) { return null; } diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index 0e7df2af..8fddccd6 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -338,8 +338,7 @@ public Integer gzipThresholdSafe(@Nonnull final ClientConfig config) { */ public boolean noSyncSafe(@Nonnull final ClientConfig config) { Arguments.checkNotNull(config, "config"); - return noSync != null ? noSync - : (config.getWriteNoSync() != null ? config.getWriteNoSync() : DEFAULT_NO_SYNC); + return noSync != null ? noSync : config.getWriteNoSync(); } /** From 8c3a1cb0748aec1a3173d03baa7bc8f9d5a9f318 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 10:54:56 +0100 Subject: [PATCH 07/21] test: more coverage --- .../v3/client/config/ClientConfigTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index 56eaa020..f940afd7 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -56,6 +56,21 @@ void equalConfig() { Assertions.assertThat(config).isEqualTo(config); Assertions.assertThat(config).isEqualTo(configBuilder.build()); Assertions.assertThat(config).isNotEqualTo(configBuilder); + Assertions.assertThat(config).isNotEqualTo(new ClientConfig.Builder() + .host("http://localhost:9999") + .token("my-token".toCharArray()) + .organization("my-org") + .database("my-db") + .writePrecision(WritePrecision.NS) + .writeAcceptPartial(true) + .timeout(Duration.ofSeconds(30)) + .writeTimeout(Duration.ofSeconds(35)) + .queryTimeout(Duration.ofSeconds(120)) + .allowHttpRedirects(true) + .disableServerCertificateValidation(true) + .headers(Map.of("X-device", "ab-01")) + .disableGRPCCompression(true) + .build()); Assertions.assertThat(config).isNotEqualTo(configBuilder.database("database").build()); } From 47984cf26632e8249ba1dde9338fa2ffc4cd6a51 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 13:00:06 +0100 Subject: [PATCH 08/21] fix: strict partial error prasing with simple fallback --- .../v3/client/internal/RestClient.java | 132 +++++++++++------- .../v3/client/internal/RestClientTest.java | 47 ++++++- 2 files changed, 121 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index e84116eb..47c6ad82 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -46,6 +46,7 @@ import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -243,7 +244,8 @@ HttpResponse request(@Nonnull final String path, } String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); - List lineErrors = parsePartialWriteLineErrors(body, contentType); + List lineErrors = + parsePartialWriteLineErrors(path, body, contentType); if (!lineErrors.isEmpty()) { throw new InfluxDBPartialWriteException(message, response.headers(), response.statusCode(), lineErrors); } @@ -284,11 +286,7 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St if (error != null && dataNode != null && dataNode.isArray()) { final StringBuilder message = new StringBuilder(error); boolean hasDetails = false; - for (JsonNode item : dataNode) { - final String detail = errFormatDataArrayDetail(item); - if (detail == null) { - continue; - } + for (String detail : errFormatDataArrayDetails(dataNode)) { if (!hasDetails) { message.append(':'); hasDetails = true; @@ -314,8 +312,12 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St @Nonnull private List parsePartialWriteLineErrors( + @Nonnull final String path, @Nonnull final String body, @Nullable final String contentType) { + if (!isWriteEndpoint(path)) { + return List.of(); + } if (body.isEmpty()) { return List.of(); @@ -340,9 +342,14 @@ private List parsePartialWriteLineError } if (dataNode.isArray()) { + final ErrDataArrayItem[] parsed = errReadDataArray(dataNode); + if (parsed == null) { + return List.of(); + } + final List lineErrors = new ArrayList<>(); - for (JsonNode item : dataNode) { - InfluxDBPartialWriteException.LineError lineError = errParseDataArrayLineError(item); + for (ErrDataArrayItem item : parsed) { + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); if (lineError != null) { lineErrors.add(lineError); } @@ -351,8 +358,13 @@ private List parsePartialWriteLineError } if (dataNode.isObject()) { - InfluxDBPartialWriteException.LineError lineError = errParseDataArrayLineError(dataNode); - return lineError == null ? List.of() : List.of(lineError); + try { + final ErrDataArrayItem item = objectMapper.treeToValue(dataNode, ErrDataArrayItem.class); + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + return lineError == null ? List.of() : List.of(lineError); + } catch (JsonProcessingException e) { + return List.of(); + } } return List.of(); @@ -362,12 +374,25 @@ private List parsePartialWriteLineError } } + private boolean isWriteEndpoint(@Nonnull final String path) { + return "api/v2/write".equals(path) || "api/v3/write_lp".equals(path); + } + @Nullable private String errNonEmptyText(@Nullable final JsonNode node) { if (node == null || node.isNull()) { return null; } - final String value = node.asText(); + + final String value; + if (node.isTextual()) { + value = node.asText(); + } else if (node.isNumber() || node.isBoolean()) { + value = node.asText(); + } else { + value = node.toString(); + } + return value.isEmpty() ? null : value; } @@ -380,65 +405,66 @@ private String errNonEmptyField(@Nullable final JsonNode object, @Nonnull final } @Nullable - private String errFormatDataArrayDetail(@Nullable final JsonNode item) { - if (!item.isObject()) { - return null; - } + private List errFormatDataArrayDetails(@Nonnull final JsonNode dataNode) { + final ErrDataArrayItem[] parsed = errReadDataArray(dataNode); + if (parsed != null) { + final List details = new ArrayList<>(); + for (ErrDataArrayItem item : parsed) { + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + if (lineError == null) { + continue; + } - final String errorMessage = errNonEmptyField(item, "error_message"); - if (errorMessage == null) { - return null; + if (lineError.lineNumber() != null + && lineError.originalLine() != null + && !lineError.originalLine().isEmpty()) { + details.add("line " + lineError.lineNumber() + ": " + + lineError.errorMessage() + " (" + lineError.originalLine() + ")"); + } else { + details.add(lineError.errorMessage()); + } + } + return details; } - if (item.hasNonNull("line_number")) { - final String originalLine = errNonEmptyField(item, "original_line"); - if (originalLine != null) { - final String lineNumber = item.get("line_number").asText(); - return "line " + lineNumber + ": " + errorMessage + " (" + originalLine + ")"; + final List details = new ArrayList<>(); + for (JsonNode item : dataNode) { + final String raw = errNonEmptyText(item); + if (raw != null) { + details.add(raw); } } - return errorMessage; + return details; } @Nullable - private InfluxDBPartialWriteException.LineError errParseDataArrayLineError(@Nonnull final JsonNode item) { - if (!item.isObject()) { + private ErrDataArrayItem[] errReadDataArray(@Nonnull final JsonNode dataNode) { + try { + return objectMapper.treeToValue(dataNode, ErrDataArrayItem[].class); + } catch (JsonProcessingException e) { return null; } + } - final String errorMessage = errNonEmptyField(item, "error_message"); - if (errorMessage == null) { + @Nullable + private InfluxDBPartialWriteException.LineError errToLineError(@Nullable final ErrDataArrayItem item) { + if (item == null || item.errorMessage == null || item.errorMessage.isEmpty()) { return null; } - final Integer lineNumber = errParseLineNumber(item); - final String originalLine = errNonEmptyField(item, "original_line"); - - return new InfluxDBPartialWriteException.LineError(lineNumber, errorMessage, originalLine); + final String originalLine = (item.originalLine == null || item.originalLine.isEmpty()) ? null : item.originalLine; + return new InfluxDBPartialWriteException.LineError(item.lineNumber, item.errorMessage, originalLine); } - @Nullable - private Integer errParseLineNumber(@Nonnull final JsonNode item) { - if (!item.hasNonNull("line_number")) { - return null; - } + private static final class ErrDataArrayItem { + @JsonProperty("error_message") + private String errorMessage; - final JsonNode lineNumber = item.get("line_number"); - if (lineNumber.isIntegralNumber()) { - return lineNumber.intValue(); - } - if (lineNumber.isTextual()) { - final String value = lineNumber.asText(); - if (value.isEmpty()) { - return null; - } - try { - return Integer.parseInt(value); - } catch (NumberFormatException e) { - return null; - } - } - return null; + @JsonProperty("line_number") + private Integer lineNumber; + + @JsonProperty("original_line") + private String originalLine; } private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) { diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index 25f0db8b..c0370640 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -547,7 +547,7 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format .host(baseURL) .build()); - Throwable thrown = catchThrowable(() -> restClient.request("ping", HttpMethod.GET, null, null, null)); + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); Assertions.assertThat(thrown) .isInstanceOf(InfluxDBPartialWriteException.class) .isInstanceOf(InfluxDBApiHttpException.class) @@ -576,7 +576,7 @@ public void errorFromBodyV3WithDataArray() { .host(baseURL) .build()); - Throwable thrown = catchThrowable(() -> restClient.request("ping", HttpMethod.GET, null, null, null)); + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); Assertions.assertThat(thrown) .isInstanceOf(InfluxDBPartialWriteException.class) .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" @@ -593,6 +593,28 @@ public void errorFromBodyV3WithDataArray() { Assertions.assertThat(lineError.originalLine()).isEqualTo("testa6a3ad v=1 17702"); } + @Test + public void errorFromBodyV3WithDataArrayAnyInvalidItemFallsBackToHttpException() { + mockServer.enqueue(createResponse(400, + "application/json", + null, + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}," + + "{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}]}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBApiHttpException.class) + .isNotInstanceOf(InfluxDBPartialWriteException.class) + .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n" + + "\t{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}"); + } + @ParameterizedTest(name = "{0}") @MethodSource("errorFromBodyV3WithDataArrayCases") public void errorFromBodyV3WithDataArrayCase(final String testName, @@ -647,7 +669,8 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "{\"error\":\"partial write of line protocol occurred\",\"data\":[1,{\"error_message\":" + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", "HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline 2: bad line (bad lp)" + + "\t1\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}" ), Arguments.of( "null error_message skipped", @@ -670,6 +693,13 @@ private static Stream errorFromBodyV3WithDataArrayCases() { + "\tline 2: bad line (bad lp)\n" + "\tsecond issue" ), + Arguments.of( + "array of strings fallback", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[\"bad line 1\",\"bad line 2\"]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tbad line 1\n" + + "\tbad line 2" + ), Arguments.of( "textual numeric line_number", "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" @@ -682,7 +712,7 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + "\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}]}", "HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline x: bad line (bad lp)" + + "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}" ), Arguments.of( "empty textual line_number with empty original_line", @@ -695,7 +725,14 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + "\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}]}", "HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline true: bad line (bad lp)" + + "\t{\"error_message\":\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}" + ), + Arguments.of( + "object line_number preserved as text", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}" ) ); } From ccde72901517c9b105eea9348f41197e82f0de1a Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 13:05:38 +0100 Subject: [PATCH 09/21] fix: line length linter complaint --- src/main/java/com/influxdb/v3/client/internal/RestClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 47c6ad82..943b954b 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -452,7 +452,8 @@ private InfluxDBPartialWriteException.LineError errToLineError(@Nullable final E return null; } - final String originalLine = (item.originalLine == null || item.originalLine.isEmpty()) ? null : item.originalLine; + final String originalLine = + (item.originalLine == null || item.originalLine.isEmpty()) ? null : item.originalLine; return new InfluxDBPartialWriteException.LineError(item.lineNumber, item.errorMessage, originalLine); } From 266c8e8d3c6a5ac0a5c7dee6708d6078a7a0d8a4 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 13:12:22 +0100 Subject: [PATCH 10/21] test: more coverage --- .../v3/client/internal/RestClientTest.java | 66 ++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index c0370640..fb481423 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -740,12 +740,14 @@ private static Stream errorFromBodyV3WithDataArrayCases() { @ParameterizedTest(name = "{0}") @MethodSource("errorFromBodyV3FallbackCases") public void errorFromBodyV3FallbackCase(final String testName, + final String requestPath, + final String contentType, final String body, final Class expectedClass, final String expectedMessage) { mockServer.enqueue(createResponse(400, - "application/json", + contentType, null, body)); @@ -753,7 +755,7 @@ public void errorFromBodyV3FallbackCase(final String testName, .host(baseURL) .build()); - Throwable thrown = catchThrowable(() -> restClient.request("ping", HttpMethod.GET, null, null, null)); + Throwable thrown = catchThrowable(() -> restClient.request(requestPath, HttpMethod.GET, null, null, null)); Assertions.assertThat(thrown) .isInstanceOf(expectedClass) .hasMessage(expectedMessage); @@ -763,6 +765,8 @@ private static Stream errorFromBodyV3FallbackCases() { return Stream.of( Arguments.of( "missing error with data array falls back to body", + "ping", + "application/json", "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", InfluxDBApiHttpException.class, "HTTP status code: 400; Message: " @@ -770,6 +774,8 @@ private static Stream errorFromBodyV3FallbackCases() { ), Arguments.of( "empty error with data array falls back to body", + "ping", + "application/json", "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + "\"bad lp\"}]}", InfluxDBApiHttpException.class, @@ -779,39 +785,95 @@ private static Stream errorFromBodyV3FallbackCases() { ), Arguments.of( "data object without error_message falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":{}}", InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data object with empty error_message falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}", InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data string falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}", InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data number falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":123}", InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "partial-write with invalid data string falls back to error", + "ping", + "application/json", "{\"error\":\"partial write of line protocol occurred\",\"data\":\"invalid\"}", InfluxDBApiHttpException.class, "HTTP status code: 400; Message: partial write of line protocol occurred" ), Arguments.of( "partial-write with empty data object falls back to error", + "ping", + "application/json", "{\"error\":\"partial write of line protocol occurred\",\"data\":{}}", InfluxDBApiHttpException.class, "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "write endpoint ignores line-error parsing for non-json content type", + "api/v3/write_lp", + "text/plain", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\"," + + "\"line_number\":2,\"original_line\":\"bad lp\"}]}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: " + + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\"," + + "\"line_number\":2,\"original_line\":\"bad lp\"}]}" + ), + Arguments.of( + "write endpoint with non-object root falls back to body", + "api/v3/write_lp", + "application/json", + "[]", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: []" + ), + Arguments.of( + "write endpoint with invalid line-error object type falls back to http exception", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":{\"error_message\":\"bad line\"," + + "\"line_number\":{\"x\":2},\"original_line\":\"bad lp\"}}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: bad line" + ), + Arguments.of( + "write endpoint with scalar data falls back to error", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":123}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "write endpoint invalid json body falls back to raw body", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\"", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: {\"error\":\"partial write of line protocol occurred\"" ) ); } From 7f4aa1f06e059132af4fdb4ffd9e5d539eb2737d Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 10 Mar 2026 13:29:55 +0100 Subject: [PATCH 11/21] fix: annotation --- src/main/java/com/influxdb/v3/client/internal/RestClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 943b954b..a9afbfa3 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -404,7 +404,7 @@ private String errNonEmptyField(@Nullable final JsonNode object, @Nonnull final return errNonEmptyText(object.get(fieldName)); } - @Nullable + @Nonnull private List errFormatDataArrayDetails(@Nonnull final JsonNode dataNode) { final ErrDataArrayItem[] parsed = errReadDataArray(dataNode); if (parsed != null) { From f946fdb1037df01c6a080beac069bd9427afd65b Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 14:44:44 +0200 Subject: [PATCH 12/21] feat: default to v3 write endpoint and add UseV2Api compatibility mode --- README.md | 14 +- .../influxdb/v3/client/InfluxDBClient.java | 3 + .../v3/client/config/ClientConfig.java | 40 +++- .../client/internal/InfluxDBClientImpl.java | 32 ++- .../v3/client/internal/RestClient.java | 19 +- .../v3/client/write/WriteOptions.java | 92 ++++++++- .../v3/client/InfluxDBClientWriteTest.java | 194 +++++++++--------- .../v3/client/config/ClientConfigTest.java | 15 +- .../v3/client/integration/E2ETest.java | 36 +++- .../v3/client/internal/RestClientTest.java | 2 +- .../v3/client/write/WriteOptionsTest.java | 39 +++- 11 files changed, 347 insertions(+), 139 deletions(-) diff --git a/README.md b/README.md index ae66ad49..542528c9 100644 --- a/README.md +++ b/README.md @@ -115,11 +115,9 @@ client.writePoint( ); // -// Write with partial acceptance +// Write with partial acceptance (default behavior) // -WriteOptions partialWrite = new WriteOptions.Builder() - .acceptPartial(true) - .build(); +WriteOptions partialWrite = new WriteOptions.Builder().build(); try { client.writeRecords(List.of( "temperature,region=west value=20.0", @@ -131,6 +129,14 @@ try { System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine())); } +// +// Write via v2 compatibility endpoint (InfluxDB Clustered) +// +WriteOptions useV2 = new WriteOptions.Builder() + .useV2Api(true) + .build(); +client.writeRecord("temperature,location=north value=60.0", useV2); + // // Write by LineProtocol // diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index e5efb64b..181b914c 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -558,6 +558,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) { *
  • gzipThreshold - payload size size for gzipping data
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • *
  • writeAcceptPartial - accept partial writes
  • + *
  • writeUseV2Api - use v2 compatibility write endpoint
  • * * * @param connectionString connection string @@ -592,6 +593,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) { *
  • INFLUX_GZIP_THRESHOLD - payload size size for gzipping data
  • *
  • INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write
  • *
  • INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes
  • + *
  • INFLUX_WRITE_USE_V2_API - use v2 compatibility write endpoint
  • * * Supported system properties: *
      @@ -604,6 +606,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) { *
    • influx.gzipThreshold - payload size size for gzipping data
    • *
    • influx.writeNoSync - skip waiting for WAL persistence on write
    • *
    • influx.writeAcceptPartial - accept partial writes
    • + *
    • influx.writeUseV2Api - use v2 compatibility write endpoint
    • *
    * * @return instance of {@link InfluxDBClient} diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index df413fb3..11021d6d 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -58,6 +58,7 @@ *
  • gzipThreshold - threshold when gzip compression is used for writing points to InfluxDB
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • *
  • writeAcceptPartial - accept partial writes
  • + *
  • writeUseV2Api - use v2 compatibility write endpoint
  • *
  • timeout - deprecated in 1.4.0 timeout when connecting to InfluxDB, * please use more informative properties writeTimeout and queryTimeout
  • *
  • writeTimeout - timeout when writing data to InfluxDB
  • @@ -109,6 +110,7 @@ public final class ClientConfig { private final Integer gzipThreshold; private final Boolean writeNoSync; private final Boolean writeAcceptPartial; + private final Boolean writeUseV2Api; private final Map defaultTags; @Deprecated private final Duration timeout; @@ -220,6 +222,16 @@ public Boolean getWriteAcceptPartial() { return writeAcceptPartial; } + /** + * Use v2 compatibility write endpoint? + * + * @return use v2 compatibility write endpoint + */ + @Nonnull + public Boolean getWriteUseV2Api() { + return writeUseV2Api; + } + /** * Gets default tags used when writing points. * @return default tags @@ -383,6 +395,7 @@ public boolean equals(final Object o) { && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(writeNoSync, that.writeNoSync) && Objects.equals(writeAcceptPartial, that.writeAcceptPartial) + && Objects.equals(writeUseV2Api, that.writeUseV2Api) && Objects.equals(defaultTags, that.defaultTags) && Objects.equals(timeout, that.timeout) && Objects.equals(writeTimeout, that.writeTimeout) @@ -401,7 +414,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { return Objects.hash(host, Arrays.hashCode(token), authScheme, organization, - database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, + database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, writeUseV2Api, timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors); @@ -417,6 +430,7 @@ public String toString() { .add("gzipThreshold=" + gzipThreshold) .add("writeNoSync=" + writeNoSync) .add("writeAcceptPartial=" + writeAcceptPartial) + .add("writeUseV2Api=" + writeUseV2Api) .add("timeout=" + timeout) .add("writeTimeout=" + writeTimeout) .add("queryTimeout=" + queryTimeout) @@ -447,6 +461,7 @@ public static final class Builder { private Integer gzipThreshold; private Boolean writeNoSync; private Boolean writeAcceptPartial; + private Boolean writeUseV2Api; private Map defaultTags; @Deprecated private Duration timeout; @@ -582,6 +597,19 @@ public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) { return this; } + /** + * Sets whether to use v2 compatibility write endpoint. + * + * @param writeUseV2Api use v2 compatibility write endpoint + * @return this + */ + @Nonnull + public Builder writeUseV2Api(@Nullable final Boolean writeUseV2Api) { + + this.writeUseV2Api = writeUseV2Api; + return this; + } + /** * Sets default tags to be written with points. * @@ -831,6 +859,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform if (parameters.containsKey("writeAcceptPartial")) { this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial"))); } + if (parameters.containsKey("writeUseV2Api")) { + this.writeUseV2Api(Boolean.parseBoolean(parameters.get("writeUseV2Api"))); + } if (parameters.containsKey("disableGRPCCompression")) { this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression"))); } @@ -890,6 +921,10 @@ public ClientConfig build(@Nonnull final Map env, final Properti if (writeAcceptPartial != null) { this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial)); } + final String writeUseV2Api = get.apply("INFLUX_WRITE_USE_V2_API", "influx.writeUseV2Api"); + if (writeUseV2Api != null) { + this.writeUseV2Api(Boolean.parseBoolean(writeUseV2Api)); + } final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout"); if (writeTimeout != null) { long to = Long.parseLong(writeTimeout); @@ -949,6 +984,9 @@ private ClientConfig(@Nonnull final Builder builder) { writeAcceptPartial = builder.writeAcceptPartial != null ? builder.writeAcceptPartial : WriteOptions.DEFAULT_ACCEPT_PARTIAL; + writeUseV2Api = builder.writeUseV2Api != null + ? builder.writeUseV2Api + : WriteOptions.DEFAULT_USE_V2_API; defaultTags = builder.defaultTags; timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT); writeTimeout = builder.writeTimeout != null diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 0e5e8dd0..3dc05155 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -305,14 +305,21 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti } WritePrecision precision = options.precisionSafe(config); + options.validate(config); String path; Map queryParams; boolean noSync = options.noSyncSafe(config); boolean acceptPartial = options.acceptPartialSafe(config); - boolean useV3Write = noSync || acceptPartial; - if (useV3Write) { - // no_sync=true and accept_partial=true are supported only in the v3 API. + boolean useV2Api = options.useV2ApiSafe(config); + if (useV2Api) { + path = "api/v2/write"; + queryParams = new HashMap<>() {{ + put("org", config.getOrganization()); + put("bucket", database); + put("precision", WritePrecisionConverter.toV2ApiString(precision)); + }}; + } else { path = "api/v3/write_lp"; queryParams = new HashMap<>() {{ put("org", config.getOrganization()); @@ -322,17 +329,9 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti if (noSync) { queryParams.put("no_sync", "true"); } - if (acceptPartial) { - queryParams.put("accept_partial", "true"); + if (!acceptPartial) { + queryParams.put("accept_partial", "false"); } - } else { - // By default, use the v2 API. - path = "api/v2/write"; - queryParams = new HashMap<>() {{ - put("org", config.getOrganization()); - put("bucket", database); - put("precision", WritePrecisionConverter.toV2ApiString(precision)); - }}; } Map defaultTags = options.defaultTagsSafe(config); @@ -380,10 +379,9 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti try { restClient.request(path, HttpMethod.POST, body, queryParams, headers); } catch (InfluxDBApiHttpException e) { - if (useV3Write && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { - // Server does not support the v3 write API, can't use v3-only write options. - throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " - + "or AcceptPartial=true (supported by InfluxDB 3 Core/Enterprise servers only).", + if (!useV2Api && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { + throw new InfluxDBApiHttpException("Server doesn't support v3 write API. " + + "Use WriteOptions.Builder.useV2Api(true) for v2 compatibility endpoint.", e.headers(), e.statusCode()); } diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index a9afbfa3..32906e2a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -36,6 +36,7 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.stream.Stream; @@ -245,7 +246,7 @@ HttpResponse request(@Nonnull final String path, String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); List lineErrors = - parsePartialWriteLineErrors(path, body, contentType); + parsePartialWriteLineErrors(body, contentType); if (!lineErrors.isEmpty()) { throw new InfluxDBPartialWriteException(message, response.headers(), response.statusCode(), lineErrors); } @@ -312,13 +313,8 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St @Nonnull private List parsePartialWriteLineErrors( - @Nonnull final String path, @Nonnull final String body, @Nullable final String contentType) { - if (!isWriteEndpoint(path)) { - return List.of(); - } - if (body.isEmpty()) { return List.of(); } @@ -337,7 +333,7 @@ private List parsePartialWriteLineError final String error = errNonEmptyField(root, "error"); final JsonNode dataNode = root.get("data"); - if (error == null || dataNode == null) { + if (!isV3PartialWriteError(error) || dataNode == null) { return List.of(); } @@ -374,8 +370,13 @@ private List parsePartialWriteLineError } } - private boolean isWriteEndpoint(@Nonnull final String path) { - return "api/v2/write".equals(path) || "api/v3/write_lp".equals(path); + private boolean isV3PartialWriteError(@Nullable final String errorMessage) { + if (errorMessage == null || errorMessage.isEmpty()) { + return false; + } + String normalized = errorMessage.toLowerCase(Locale.ROOT); + return normalized.contains("partial write of line protocol occurred") + || normalized.contains("parsing failed for write_lp endpoint"); } @Nullable diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index 8fddccd6..00618ec1 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -43,6 +43,9 @@ *
  • precision - specifies the precision to use for the timestamp of points
  • *
  • defaultTags - specifies tags to be added by default to all write operations using points.
  • *
  • tagOrder - specifies preferred tag order for point serialization.
  • + *
  • noSync - skip waiting for WAL persistence on write
  • + *
  • acceptPartial - accept partial writes
  • + *
  • useV2Api - use v2 compatibility write endpoint
  • *
  • headers - specifies the headers to be added to write request
  • * *

    @@ -71,7 +74,11 @@ public final class WriteOptions { /** * Default AcceptPartial. */ - public static final boolean DEFAULT_ACCEPT_PARTIAL = false; + public static final boolean DEFAULT_ACCEPT_PARTIAL = true; + /** + * Default UseV2Api. + */ + public static final boolean DEFAULT_USE_V2_API = false; /** * Default timeout for writes in seconds. Set to {@value} @@ -86,13 +93,14 @@ public final class WriteOptions { @Deprecated(forRemoval = true) public static final WriteOptions DEFAULTS = new WriteOptions( null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, DEFAULT_ACCEPT_PARTIAL, - null, null, null); + DEFAULT_USE_V2_API, null, null, null); private final String database; private final WritePrecision precision; private final Integer gzipThreshold; private final Boolean noSync; private final Boolean acceptPartial; + private final Boolean useV2Api; private final Map defaultTags; private final List tagOrder; private final Map headers; @@ -105,7 +113,7 @@ public final class WriteOptions { */ public static WriteOptions defaultWriteOptions() { return new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, - DEFAULT_ACCEPT_PARTIAL, + DEFAULT_ACCEPT_PARTIAL, DEFAULT_USE_V2_API, null, null, null); } @@ -216,7 +224,38 @@ public WriteOptions(@Nullable final String database, @Nullable final Boolean noSync, @Nullable final Map defaultTags, @Nullable final Map headers) { - this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, null); + this(database, precision, gzipThreshold, noSync, null, null, defaultTags, headers, null); + } + + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param noSync Skip waiting for WAL persistence on write. + * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param acceptPartial Request partial write acceptance. + * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}. + * @param defaultTags Default tags to be added when writing points. + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers + * specified in the client configuration. + * @param tagOrder Preferred order of tags in line protocol serialization. + * Null or empty tag names are ignored. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Boolean noSync, + @Nullable final Boolean acceptPartial, + @Nullable final Map defaultTags, + @Nullable final Map headers, + @Nullable final List tagOrder) { + this(database, precision, gzipThreshold, noSync, acceptPartial, null, defaultTags, headers, tagOrder); } /** @@ -232,6 +271,8 @@ public WriteOptions(@Nullable final String database, * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. * @param acceptPartial Request partial write acceptance. * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}. + * @param useV2Api Use v2 compatibility write endpoint. + * If it is not specified then use {@link WriteOptions#DEFAULT_USE_V2_API}. * @param defaultTags Default tags to be added when writing points. * @param headers The headers to be added to write request. * The headers specified here are preferred over the headers @@ -244,6 +285,7 @@ public WriteOptions(@Nullable final String database, @Nullable final Integer gzipThreshold, @Nullable final Boolean noSync, @Nullable final Boolean acceptPartial, + @Nullable final Boolean useV2Api, @Nullable final Map defaultTags, @Nullable final Map headers, @Nullable final List tagOrder) { @@ -252,6 +294,7 @@ public WriteOptions(@Nullable final String database, this.gzipThreshold = gzipThreshold; this.noSync = noSync; this.acceptPartial = acceptPartial; + this.useV2Api = useV2Api; this.defaultTags = defaultTags == null ? Map.of() : defaultTags; this.tagOrder = sanitizeTagOrder(tagOrder); this.headers = headers == null ? Map.of() : headers; @@ -282,7 +325,7 @@ public WriteOptions(@Nullable final String database, @Nullable final Map defaultTags, @Nullable final Map headers, @Nullable final List tagOrder) { - this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, tagOrder); + this(database, precision, gzipThreshold, noSync, null, null, defaultTags, headers, tagOrder); } /** @@ -350,6 +393,27 @@ public boolean acceptPartialSafe(@Nonnull final ClientConfig config) { return acceptPartial != null ? acceptPartial : config.getWriteAcceptPartial(); } + /** + * @param config with default value + * @return Use v2 compatibility write endpoint. + */ + public boolean useV2ApiSafe(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + return useV2Api != null ? useV2Api : config.getWriteUseV2Api(); + } + + /** + * Validate write option combinations. + * + * @param config with default values + */ + public void validate(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + if (useV2ApiSafe(config) && noSyncSafe(config)) { + throw new IllegalArgumentException("invalid write options: NoSync cannot be used in V2 API"); + } + } + /** * @return The headers to be added to write request. */ @@ -380,6 +444,7 @@ public boolean equals(final Object o) { && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(noSync, that.noSync) && Objects.equals(acceptPartial, that.acceptPartial) + && Objects.equals(useV2Api, that.useV2Api) && defaultTags.equals(that.defaultTags) && tagOrder.equals(that.tagOrder) && headers.equals(that.headers); @@ -387,7 +452,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(database, precision, gzipThreshold, noSync, acceptPartial, defaultTags, tagOrder, + return Objects.hash(database, precision, gzipThreshold, noSync, acceptPartial, useV2Api, defaultTags, tagOrder, headers); } @@ -418,6 +483,7 @@ public static final class Builder { private Integer gzipThreshold; private Boolean noSync; private Boolean acceptPartial; + private Boolean useV2Api; private Map defaultTags = new HashMap<>(); private List tagOrder = List.of(); private Map headers = new HashMap<>(); @@ -487,6 +553,19 @@ public Builder acceptPartial(@Nonnull final Boolean acceptPartial) { return this; } + /** + * Sets whether to use v2 compatibility write endpoint. + * + * @param useV2Api use v2 compatibility write endpoint + * @return this + */ + @Nonnull + public Builder useV2Api(@Nonnull final Boolean useV2Api) { + + this.useV2Api = useV2Api; + return this; + } + /** * Sets defaultTags. * @@ -537,6 +616,7 @@ public WriteOptions build() { private WriteOptions(@Nonnull final Builder builder) { this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.acceptPartial, + builder.useV2Api, builder.defaultTags, builder.headers, builder.tagOrder); } } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 23569283..c46896ba 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -27,7 +27,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import io.netty.handler.codec.http.HttpResponseStatus; import mockwebserver3.RecordedRequest; @@ -36,6 +39,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.write.WriteOptions; @@ -106,7 +112,7 @@ void databaseParameter() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("my-database"); + assertThat(request.getUrl().queryParameter("db")).isEqualTo("my-database"); } @Test @@ -119,7 +125,7 @@ void databaseParameterSpecified() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("my-database-2"); + assertThat(request.getUrl().queryParameter("db")).isEqualTo("my-database-2"); } @Test @@ -147,7 +153,7 @@ void precisionParameter() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("ns"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); } @Test @@ -160,7 +166,7 @@ void precisionParameterSpecified() throws InterruptedException { RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("s"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("second"); } @Test @@ -191,63 +197,64 @@ void gzipParameterSpecified() throws InterruptedException { assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); } - @Test - void writeNoSyncFalseUsesV2API() throws InterruptedException { - mockServer.enqueue(createResponse(200)); - - client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.NS).noSync(false).build()); - - assertThat(mockServer.getRequestCount()).isEqualTo(1); - RecordedRequest request = mockServer.takeRequest(); - assertThat(request).isNotNull(); - assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v2/write"); - assertThat(request.getUrl().queryParameter("no_sync")).isNull(); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("ns"); - - } - - @Test - void writeNoSyncTrueUsesV3API() throws InterruptedException { + @ParameterizedTest(name = "{0}") + @MethodSource("writeRoutingCases") + void writeRoutingCase(final String name, + final Consumer configure, + final String expectedPath, + final String expectedDbParamName, + final String expectedPrecision, + @Nullable final String expectedNoSync, + @Nullable final String expectedAcceptPartial) throws InterruptedException { mockServer.enqueue(createResponse(200)); + WriteOptions.Builder optionsBuilder = new WriteOptions.Builder().precision(WritePrecision.NS); + configure.accept(optionsBuilder); client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.NS).noSync(true).build()); + optionsBuilder.build()); assertThat(mockServer.getRequestCount()).isEqualTo(1); RecordedRequest request = mockServer.takeRequest(); assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo("true"); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); - } - - @Test - void writeAcceptPartialTrueUsesV3API() throws InterruptedException { - mockServer.enqueue(createResponse(200)); - - client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.NS).acceptPartial(true).build()); - - assertThat(mockServer.getRequestCount()).isEqualTo(1); - RecordedRequest request = mockServer.takeRequest(); - assertThat(request).isNotNull(); - assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - assertThat(request.getUrl().queryParameter("no_sync")).isNull(); - assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true"); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); + assertThat(request.getUrl().encodedPath()).isEqualTo(expectedPath); + assertThat(request.getUrl().queryParameter(expectedDbParamName)).isEqualTo("my-database"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo(expectedPrecision); + assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo(expectedNoSync); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial); + } + + private static Stream writeRoutingCases() { + return Stream.of( + Arguments.of("v3 noSync=false", (Consumer) b -> b.noSync(false), + "/api/v3/write_lp", "db", "nanosecond", null, null), + Arguments.of("v3 noSync=true", (Consumer) b -> b.noSync(true), + "/api/v3/write_lp", "db", "nanosecond", "true", null), + Arguments.of("v3 acceptPartial=true", (Consumer) b -> b.acceptPartial(true), + "/api/v3/write_lp", "db", "nanosecond", null, null), + Arguments.of("v3 acceptPartial=false", (Consumer) b -> b.acceptPartial(false), + "/api/v3/write_lp", "db", "nanosecond", null, "false"), + Arguments.of("v2 useV2Api=true", (Consumer) b -> b.useV2Api(true), + "/api/v2/write", "bucket", "ns", null, null), + Arguments.of("v2 useV2Api=true, acceptPartial=false", + (Consumer) b -> b.useV2Api(true).acceptPartial(false), + "/api/v2/write", "bucket", "ns", null, null) + ); } - @Test - void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { + @ParameterizedTest(name = "{0}") + @MethodSource("writeV3MethodNotAllowedCases") + void writeV3MethodNotAllowedMappedError(final String name, + final Consumer configure, + @Nullable final String expectedNoSync, + @Nullable final String expectedAcceptPartial) throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); + WriteOptions.Builder optionsBuilder = new WriteOptions.Builder().precision(WritePrecision.MS); + configure.accept(optionsBuilder); + InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, - () -> client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build()) + () -> client.writeRecord("mem,tag=one value=1.0", optionsBuilder.build()) ); assertThat(mockServer.getRequestCount()).isEqualTo(1); @@ -255,34 +262,31 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { assertThat(request).isNotNull(); assertThat(request.getUrl()).isNotNull(); assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo("true"); + assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo(expectedNoSync); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial); assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond"); assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); - assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true" - + " (supported by InfluxDB 3 Core/Enterprise servers only)."); + assertThat(ae.getMessage()).contains("Server doesn't support v3 write API. " + + "Use WriteOptions.Builder.useV2Api(true) for v2 compatibility endpoint."); } - @Test - void writeAcceptPartialTrueOnV2ServerThrowsException() throws InterruptedException { - mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); - - InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, - () -> client.writeRecord("mem,tag=one value=1.0", - new WriteOptions.Builder().precision(WritePrecision.MS).acceptPartial(true).build()) + private static Stream writeV3MethodNotAllowedCases() { + return Stream.of( + Arguments.of("noSync=true", (Consumer) b -> b.noSync(true), "true", null), + Arguments.of("acceptPartial=true", (Consumer) b -> b.acceptPartial(true), null, null) ); + } - assertThat(mockServer.getRequestCount()).isEqualTo(1); - RecordedRequest request = mockServer.takeRequest(); - assertThat(request).isNotNull(); - assertThat(request.getUrl()).isNotNull(); - assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); - assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true"); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond"); + @Test + void writeUseV2ApiNoSyncValidation() { + Throwable thrown = catchThrowable(() -> client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().useV2Api(true).noSync(true).build())); - assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); - assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true" - + " (supported by InfluxDB 3 Core/Enterprise servers only)."); + Assertions.assertThat(thrown) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid write options: NoSync cannot be used in V2 API"); + assertThat(mockServer.getRequestCount()).isEqualTo(0); } @Test @@ -295,7 +299,7 @@ void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -311,7 +315,7 @@ void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); } @Test @@ -325,7 +329,21 @@ void writeRecordWithDefaultWriteOptionsAcceptPartialConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", false, true, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); + } + + @Test + void writeRecordWithDefaultWriteOptionsUseV2Config() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writeUseV2Api(true) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v2/write", "DB", "ns", false, null, null, false); } @Test @@ -338,7 +356,7 @@ void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -354,7 +372,7 @@ void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); } @Test @@ -370,7 +388,7 @@ void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -389,7 +407,7 @@ void writePointWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); } @Test @@ -405,7 +423,7 @@ void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false); } @Test @@ -424,34 +442,26 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true); } private void checkWriteCalled(final String expectedPath, final String expectedDB, - final String expectedPrecision, final boolean expectedNoSync, - final boolean expectedAcceptPartial, + final String expectedPrecision, final boolean expectedV3, + @Nullable final String expectedNoSync, + @Nullable final String expectedAcceptPartial, final boolean expectedGzip) throws InterruptedException { RecordedRequest request = assertThatServerRequested(); HttpUrl requestUrl = request.getUrl(); assertThat(requestUrl).isNotNull(); assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath); - boolean expectedV3 = expectedNoSync || expectedAcceptPartial; if (expectedV3) { assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB); } else { assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB); } assertThat(requestUrl.queryParameter("precision")).isEqualTo(expectedPrecision); - if (expectedNoSync) { - assertThat(requestUrl.queryParameter("no_sync")).isEqualTo("true"); - } else { - assertThat(requestUrl.queryParameter("no_sync")).isNull(); - } - if (expectedAcceptPartial) { - assertThat(requestUrl.queryParameter("accept_partial")).isEqualTo("true"); - } else { - assertThat(requestUrl.queryParameter("accept_partial")).isNull(); - } + assertThat(requestUrl.queryParameter("no_sync")).isEqualTo(expectedNoSync); + assertThat(requestUrl.queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial); if (expectedGzip) { assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); } else { @@ -480,8 +490,8 @@ void allParameterSpecified() throws InterruptedException { assertThat(request.getUrl()).isNotNull(); assertThat(request.getHeaders().get("Content-Type")).isEqualTo("text/plain; charset=utf-8"); assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); - assertThat(request.getUrl().queryParameter("precision")).isEqualTo("s"); - assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("your-database"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("second"); + assertThat(request.getUrl().queryParameter("db")).isEqualTo("your-database"); } @Test diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index f940afd7..272c3e4e 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -62,7 +62,7 @@ void equalConfig() { .organization("my-org") .database("my-db") .writePrecision(WritePrecision.NS) - .writeAcceptPartial(true) + .writeAcceptPartial(false) .timeout(Duration.ofSeconds(30)) .writeTimeout(Duration.ofSeconds(35)) .queryTimeout(Duration.ofSeconds(120)) @@ -94,7 +94,8 @@ void toStringConfig() { Assertions.assertThat(configString.contains("database='my-db'")).isEqualTo(true); Assertions.assertThat(configString.contains("gzipThreshold=1000")).isEqualTo(true); Assertions.assertThat(configString).contains("writeNoSync=false"); - Assertions.assertThat(configString).contains("writeAcceptPartial=false"); + Assertions.assertThat(configString).contains("writeAcceptPartial=true"); + Assertions.assertThat(configString).contains("writeUseV2Api=false"); Assertions.assertThat(configString).contains("timeout=PT30S"); Assertions.assertThat(configString).contains("writeTimeout=PT35S"); Assertions.assertThat(configString).contains("queryTimeout=PT2M"); @@ -107,7 +108,7 @@ void fromConnectionString() throws MalformedURLException { ClientConfig cfg = new ClientConfig.Builder() .build("http://localhost:9999/" + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128" - + "&writeNoSync=true&writeAcceptPartial=true"); + + "&writeNoSync=true&writeAcceptPartial=true&writeUseV2Api=true"); Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/"); Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray()); Assertions.assertThat(cfg.getOrganization()).isEqualTo("my-org"); @@ -116,6 +117,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(128); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -128,6 +130,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -139,6 +142,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -228,6 +232,7 @@ void fromEnv() { "INFLUX_GZIP_THRESHOLD", "64", "INFLUX_WRITE_NO_SYNC", "true", "INFLUX_WRITE_ACCEPT_PARTIAL", "true", + "INFLUX_WRITE_USE_V2_API", "true", "INFLUX_DISABLE_GRPC_COMPRESSION", "true" ); @@ -241,6 +246,7 @@ void fromEnv() { Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } @@ -309,6 +315,7 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); // basic properties = new Properties(); @@ -347,6 +354,7 @@ void fromSystemProperties() { properties.put("influx.gzipThreshold", "64"); properties.put("influx.writeNoSync", "true"); properties.put("influx.writeAcceptPartial", "true"); + properties.put("influx.writeUseV2Api", "true"); properties.put("influx.disableGRPCCompression", "true"); cfg = new ClientConfig.Builder() .build(new HashMap<>(), properties); @@ -358,6 +366,7 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 1df052b0..fc09cd10 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -253,10 +253,40 @@ public void testWriteErrorWithoutAcceptPartial() throws Exception { + "temperature,room=room3 value=24.064857\n" + "temperature,room=room4 value=43i"; - Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points)); + WriteOptions options = new WriteOptions.Builder() + .acceptPartial(false) + .build(); + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); + Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class); + Assertions.assertThat(thrown.getMessage()) + .startsWith("HTTP status code: 400; Message: parsing failed for write_lp endpoint"); + } + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testWriteErrorWithUseV2Api() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "temperature,room=room1 value=18.94647\n" + + "temperatureroom=room2value=20.268019\n" + + "temperature,room=room3 value=24.064857\n" + + "temperature,room=room4 value=43i"; + + WriteOptions options = new WriteOptions.Builder() + .useV2Api(true) + .build(); + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); + Assertions.assertThat(thrown).isInstanceOf(InfluxDBApiHttpException.class); + Assertions.assertThat(thrown).isNotInstanceOf(InfluxDBPartialWriteException.class); Assertions.assertThat(thrown.getMessage()) - .isEqualTo("HTTP status code: 400; Message: write buffer error: " - + "line protocol parse failed: Expected at least one space character, got end of input"); + .contains("write buffer error: line protocol parse failed"); } } diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index fb481423..b12bbb5b 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -541,7 +541,7 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format mockServer.enqueue(createResponse(400, "application/json", null, - "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"invalid field value\"}}")); + "{\"error\":\"parsing failed for write_lp endpoint\",\"data\":{\"error_message\":\"invalid field value\"}}")); restClient = new RestClient(new ClientConfig.Builder() .host(baseURL) diff --git a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java index a2432da8..fdf95b09 100644 --- a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java @@ -71,6 +71,9 @@ void optionsEqualAll() { WriteOptions acceptPartialMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) .acceptPartial(false).build(); + WriteOptions useV2ApiMismatch = new WriteOptions.Builder() + .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) + .useV2Api(true).build(); WriteOptions defaultTagsMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) .defaultTags(Map.of("region", "west")).build(); @@ -84,6 +87,7 @@ void optionsEqualAll() { Assertions.assertThat(options).isNotEqualTo(gzipMismatch); Assertions.assertThat(options).isNotEqualTo(noSyncMismatch); Assertions.assertThat(options).isNotEqualTo(acceptPartialMismatch); + Assertions.assertThat(options).isNotEqualTo(useV2ApiMismatch); Assertions.assertThat(options).isNotEqualTo(defaultTagsMismatch); Assertions.assertThat(options).isNotEqualTo(tagOrderMismatch); Assertions.assertThat(options).isNotEqualTo(headersMismatch); @@ -153,6 +157,7 @@ void optionsEmpty() { Assertions.assertThat(options.precisionSafe(config)).isEqualTo(WriteOptions.DEFAULT_WRITE_PRECISION); Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(WriteOptions.DEFAULT_GZIP_THRESHOLD); Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(options.useV2ApiSafe(config)).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); Assertions.assertThat(options.tagOrderSafe()).isEmpty(); WriteOptions builderOptions = new WriteOptions.Builder().build(); @@ -160,6 +165,7 @@ void optionsEmpty() { Assertions.assertThat(builderOptions.precisionSafe(config)).isEqualTo(WritePrecision.S); Assertions.assertThat(builderOptions.gzipThresholdSafe(config)).isEqualTo(512); Assertions.assertThat(builderOptions.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); + Assertions.assertThat(builderOptions.useV2ApiSafe(config)).isEqualTo(WriteOptions.DEFAULT_USE_V2_API); } @Test @@ -246,12 +252,39 @@ void optionsOverrideWriteAcceptPartial() { ClientConfig config = configBuilder .database("my-database") .organization("my-org") - .writeAcceptPartial(true) + .writeAcceptPartial(false) .build(); - WriteOptions options = new WriteOptions.Builder().acceptPartial(false).build(); + WriteOptions options = new WriteOptions.Builder().acceptPartial(true).build(); + + Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(true); + } + + @Test + void optionsOverrideWriteUseV2Api() { + ClientConfig config = configBuilder + .database("my-database") + .organization("my-org") + .writeUseV2Api(false) + .build(); + + WriteOptions options = new WriteOptions.Builder().useV2Api(true).build(); + + Assertions.assertThat(options.useV2ApiSafe(config)).isEqualTo(true); + } + + @Test + void optionsValidateUseV2ApiAndNoSync() { + ClientConfig config = configBuilder.build(); + + WriteOptions options = new WriteOptions.Builder() + .useV2Api(true) + .noSync(true) + .build(); - Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(false); + Assertions.assertThatThrownBy(() -> options.validate(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid write options: NoSync cannot be used in V2 API"); } @Test From 7318e224eb334456c1c05cbd69fd45c8a1cf6d63 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 15:00:27 +0200 Subject: [PATCH 13/21] docs: update CHANGELOG --- CHANGELOG.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e224172..d70e2b9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,16 @@ ## 1.9.0 [unreleased] +### BREAKING CHANGES + +1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Adds partial writes support and aligns write routing with v3 defaults. + See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. + For InfluxDB Clustered version, set `useV2Api=true` for writing. + ### Features 1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client. 1. [#363](https://github.com/InfluxCommunity/influxdb3-java/pull/363): Support custom tag order via `tagOrder` write option. See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. -1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Support partial writes via `acceptPartial` write option. - See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. ## 1.8.0 [2026-02-19] From b9ecfd8524417739279ae1cca90d5628eb09bc5e Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 15:01:24 +0200 Subject: [PATCH 14/21] test: fix line length --- .../com/influxdb/v3/client/InfluxDBClientWriteTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index c46896ba..4382b646 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -274,7 +274,12 @@ void writeV3MethodNotAllowedMappedError(final String name, private static Stream writeV3MethodNotAllowedCases() { return Stream.of( Arguments.of("noSync=true", (Consumer) b -> b.noSync(true), "true", null), - Arguments.of("acceptPartial=true", (Consumer) b -> b.acceptPartial(true), null, null) + Arguments.of( + "acceptPartial=true", + (Consumer) b -> b.acceptPartial(true), + null, + null + ) ); } From 1615b0d2c9ee5c5296308314f5570959aeedda5f Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 15:19:58 +0200 Subject: [PATCH 15/21] test: align test with referential Go implementation --- .../v3/client/integration/E2ETest.java | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index fc09cd10..dc02f7da 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -199,10 +199,9 @@ public void testAcceptPartialWriteError() throws Exception { System.getenv("TESTING_INFLUXDB_DATABASE"), null)) { - String points = "temperature,room=room1 value=18.94647\n" - + "temperatureroom=room2value=20.268019\n" - + "temperature,room=room3 value=24.064857\n" - + "temperature,room=room4 value=43i"; + String points = "home,room=Sunroom temp=96 1735545600\n" + + "home,room=Sunroom temp=\"hi\" 1735545610\n" + + "home,room=Sunroom temp=88i 1735545620"; WriteOptions options = new WriteOptions.Builder() .acceptPartial(true) @@ -212,25 +211,27 @@ public void testAcceptPartialWriteError() throws Exception { Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class); String expectedMessage = "HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline 2: Expected at least one space character, got end of input (temperatureroom=room)\n" - + "\tline 4: invalid column type for column 'value', expected iox::column_type::field::float, " - + "got iox::column_type::field::integer (temperature,room=roo)"; + + "\tline 2: invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::string (home,room=Sunroom te)\n" + + "\tline 3: invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::integer (home,room=Sunroom te)"; Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage); InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown; Assertions.assertThat(partialError.lineErrors()).hasSize(2); Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2); Assertions.assertThat(partialError.lineErrors().get(0).errorMessage()) - .isEqualTo("Expected at least one space character, got end of input"); + .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::string"); Assertions.assertThat(partialError.lineErrors().get(0).originalLine()) - .isEqualTo("temperatureroom=room"); + .isEqualTo("home,room=Sunroom te"); - Assertions.assertThat(partialError.lineErrors().get(1).lineNumber()).isEqualTo(4); + Assertions.assertThat(partialError.lineErrors().get(1).lineNumber()).isEqualTo(3); Assertions.assertThat(partialError.lineErrors().get(1).errorMessage()) - .isEqualTo("invalid column type for column 'value', expected iox::column_type::field::float, " + .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, " + "got iox::column_type::field::integer"); Assertions.assertThat(partialError.lineErrors().get(1).originalLine()) - .isEqualTo("temperature,room=roo"); + .isEqualTo("home,room=Sunroom te"); Assertions.assertThat(partialError).isInstanceOf(InfluxDBApiHttpException.class); Assertions.assertThat(partialError.statusCode()).isEqualTo(400); @@ -248,10 +249,9 @@ public void testWriteErrorWithoutAcceptPartial() throws Exception { System.getenv("TESTING_INFLUXDB_DATABASE"), null)) { - String points = "temperature,room=room1 value=18.94647\n" - + "temperatureroom=room2value=20.268019\n" - + "temperature,room=room3 value=24.064857\n" - + "temperature,room=room4 value=43i"; + String points = "home,room=Sunroom temp=96 1735545600\n" + + "home,room=Sunroom temp=\"hi\" 1735545610\n" + + "home,room=Sunroom temp=88i 1735545620"; WriteOptions options = new WriteOptions.Builder() .acceptPartial(false) @@ -259,7 +259,16 @@ public void testWriteErrorWithoutAcceptPartial() throws Exception { Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class); Assertions.assertThat(thrown.getMessage()) - .startsWith("HTTP status code: 400; Message: parsing failed for write_lp endpoint"); + .contains("parsing failed for write_lp endpoint"); + + InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialError.lineErrors()).hasSize(1); + Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2); + Assertions.assertThat(partialError.lineErrors().get(0).errorMessage()) + .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, " + + "got iox::column_type::field::string"); + Assertions.assertThat(partialError.lineErrors().get(0).originalLine()) + .isEqualTo("home,room=Sunroom te"); } } @@ -274,10 +283,9 @@ public void testWriteErrorWithUseV2Api() throws Exception { System.getenv("TESTING_INFLUXDB_DATABASE"), null)) { - String points = "temperature,room=room1 value=18.94647\n" - + "temperatureroom=room2value=20.268019\n" - + "temperature,room=room3 value=24.064857\n" - + "temperature,room=room4 value=43i"; + String points = "home,room=Sunroom temp=96 1735545600\n" + + "home,room=Sunroom temp=\"hi\" 1735545610\n" + + "home,room=Sunroom temp=88i 1735545620"; WriteOptions options = new WriteOptions.Builder() .useV2Api(true) From 895d3f0ee17726ad29dff68b3de701850668b085 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 15:49:24 +0200 Subject: [PATCH 16/21] fix: align error parsing --- .../com/influxdb/v3/client/internal/RestClient.java | 10 +++++++--- .../influxdb/v3/client/internal/RestClientTest.java | 3 ++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 32906e2a..f45dd932 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -299,9 +299,13 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St // Core/Enterprise object format: // {"error":"...","data":{"error_message":"..."}} - final String errorMessage = errNonEmptyField(dataNode, "error_message"); - if (errorMessage != null) { - return errorMessage; + if (isV3PartialWriteError(error) && dataNode != null && dataNode.isObject()) { + final StringBuilder message = new StringBuilder(error); + final String errorMessage = errNonEmptyField(dataNode, "error_message"); + if (errorMessage != null) { + message.append(":\n\t").append(errorMessage); + } + return message.toString(); } return error; diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index b12bbb5b..3dbae035 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -551,7 +551,8 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format Assertions.assertThat(thrown) .isInstanceOf(InfluxDBPartialWriteException.class) .isInstanceOf(InfluxDBApiHttpException.class) - .hasMessage("HTTP status code: 400; Message: invalid field value"); + .hasMessage("HTTP status code: 400; Message: parsing failed for write_lp endpoint:\n" + + "\tinvalid field value"); InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown; Assertions.assertThat(partialWriteException.statusCode()).isEqualTo(400); From 02fe2df4dd751bd61778248a9c9bd5f5b46e0740 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 15:59:19 +0200 Subject: [PATCH 17/21] test: align expected error --- .../java/com/influxdb/v3/client/internal/RestClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index 3dbae035..25f18744 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -858,7 +858,7 @@ private static Stream errorFromBodyV3FallbackCases() { "{\"error\":\"partial write of line protocol occurred\",\"data\":{\"error_message\":\"bad line\"," + "\"line_number\":{\"x\":2},\"original_line\":\"bad lp\"}}", InfluxDBApiHttpException.class, - "HTTP status code: 400; Message: bad line" + "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tbad line" ), Arguments.of( "write endpoint with scalar data falls back to error", From 88f678b15f34940dbdb0d900620cf22cb2a9d939 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 16:06:14 +0200 Subject: [PATCH 18/21] refactor: minor simplification and reuse --- .../v3/client/internal/RestClient.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index f45dd932..de2aa116 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -262,9 +262,7 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St return null; } - if (contentType != null - && !contentType.isEmpty() - && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) { + if (!errIsJsonLikeContentType(contentType)) { return null; } @@ -300,12 +298,10 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St // Core/Enterprise object format: // {"error":"...","data":{"error_message":"..."}} if (isV3PartialWriteError(error) && dataNode != null && dataNode.isObject()) { - final StringBuilder message = new StringBuilder(error); final String errorMessage = errNonEmptyField(dataNode, "error_message"); - if (errorMessage != null) { - message.append(":\n\t").append(errorMessage); - } - return message.toString(); + return errorMessage == null + ? error + : error + ":\n\t" + errorMessage; } return error; @@ -323,9 +319,7 @@ private List parsePartialWriteLineError return List.of(); } - if (contentType != null - && !contentType.isEmpty() - && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) { + if (!errIsJsonLikeContentType(contentType)) { return List.of(); } @@ -383,6 +377,12 @@ private boolean isV3PartialWriteError(@Nullable final String errorMessage) { || normalized.contains("parsing failed for write_lp endpoint"); } + private boolean errIsJsonLikeContentType(@Nullable final String contentType) { + return contentType == null + || contentType.isEmpty() + || contentType.regionMatches(true, 0, "application/json", 0, "application/json".length()); + } + @Nullable private String errNonEmptyText(@Nullable final JsonNode node) { if (node == null || node.isNull()) { From 4a77bd97c60fd8e08be3d12354812ada478cd4b9 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 17:30:47 +0200 Subject: [PATCH 19/21] docs: update README --- README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/README.md b/README.md index 542528c9..8ae6179d 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,28 @@ String record = "temperature,location=north value=60.0"; client.writeRecord(record); ``` +#### Accept partial writes and inspect failed lines + +Partial writes are enabled by default. +`acceptPartial` can be configured in three ways: client defaults via `WriteOptions`, connection string / environment variable / system property (`writeAcceptPartial` / `INFLUX_WRITE_ACCEPT_PARTIAL` / `influx.writeAcceptPartial`), or per-write `WriteOptions`. + +Set `acceptPartial(false)` to disable partial writes. +With InfluxDB Core/Enterprise, when a write request fails due to one or more invalid lines, the error message starts with: + +- `partial write of line protocol occurred` when partial writes are enabled. +- `parsing failed for write_lp endpoint` when partial writes are disabled. + +When partial writes are disabled, any rejected line causes all lines to be rejected. +InfluxDB Clustered does not return this structured partial-write error format. + +#### Compatibility with InfluxDB Clustered + +For InfluxDB Clustered, enable `useV2Api` for writes. +Like other write options, this can be configured in client code, connection string / environment variable / system property (`writeUseV2Api` / `INFLUX_WRITE_USE_V2_API` / `influx.writeUseV2Api`), or per-write `WriteOptions`. + +If `useV2Api` is set, `acceptPartial` is ignored because this compatibility mode does not support partial-write controls. +Any rejected line causes all lines to be rejected. + to query your data, you can use code like this: ```java From 470c481096bc388773db212a02b45a630456dbce Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 17:37:10 +0200 Subject: [PATCH 20/21] docs: fix heading levels --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8ae6179d..e2b7ab5a 100644 --- a/README.md +++ b/README.md @@ -144,7 +144,7 @@ String record = "temperature,location=north value=60.0"; client.writeRecord(record); ``` -#### Accept partial writes and inspect failed lines +### Accept partial writes and inspect failed lines Partial writes are enabled by default. `acceptPartial` can be configured in three ways: client defaults via `WriteOptions`, connection string / environment variable / system property (`writeAcceptPartial` / `INFLUX_WRITE_ACCEPT_PARTIAL` / `influx.writeAcceptPartial`), or per-write `WriteOptions`. @@ -158,7 +158,7 @@ With InfluxDB Core/Enterprise, when a write request fails due to one or more inv When partial writes are disabled, any rejected line causes all lines to be rejected. InfluxDB Clustered does not return this structured partial-write error format. -#### Compatibility with InfluxDB Clustered +### Compatibility with InfluxDB Clustered For InfluxDB Clustered, enable `useV2Api` for writes. Like other write options, this can be configured in client code, connection string / environment variable / system property (`writeUseV2Api` / `INFLUX_WRITE_USE_V2_API` / `influx.writeUseV2Api`), or per-write `WriteOptions`. From e2635b71f9df54b71bc3b168d463eadd82ab6466 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 20 Apr 2026 17:46:43 +0200 Subject: [PATCH 21/21] docs: add headings --- README.md | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e2b7ab5a..4bf6c39b 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ dependencies { ## Usage +### Create client + To start with the client, import the `com.influxdb.v3.client` package and create a `InfluxDBClient` by: ```java @@ -91,7 +93,9 @@ public class IOxExample { } ``` -to insert data, you can use code like this: +### Write + +To insert data, you can use code like this: ```java // @@ -144,7 +148,7 @@ String record = "temperature,location=north value=60.0"; client.writeRecord(record); ``` -### Accept partial writes and inspect failed lines +#### Accept partial writes and inspect failed lines Partial writes are enabled by default. `acceptPartial` can be configured in three ways: client defaults via `WriteOptions`, connection string / environment variable / system property (`writeAcceptPartial` / `INFLUX_WRITE_ACCEPT_PARTIAL` / `influx.writeAcceptPartial`), or per-write `WriteOptions`. @@ -158,7 +162,7 @@ With InfluxDB Core/Enterprise, when a write request fails due to one or more inv When partial writes are disabled, any rejected line causes all lines to be rejected. InfluxDB Clustered does not return this structured partial-write error format. -### Compatibility with InfluxDB Clustered +#### Compatibility with InfluxDB Clustered For InfluxDB Clustered, enable `useV2Api` for writes. Like other write options, this can be configured in client code, connection string / environment variable / system property (`writeUseV2Api` / `INFLUX_WRITE_USE_V2_API` / `influx.writeUseV2Api`), or per-write `WriteOptions`. @@ -166,7 +170,9 @@ Like other write options, this can be configured in client code, connection stri If `useV2Api` is set, `acceptPartial` is ignored because this compatibility mode does not support partial-write controls. Any rejected line causes all lines to be rejected. -to query your data, you can use code like this: +### Query + +To query your data, you can use code like this: ```java //