details = new ArrayList<>();
+ for (JsonNode item : dataNode) {
+ final String raw = errNonEmptyText(item);
+ if (raw != null) {
+ details.add(raw);
+ }
+ }
+ return details;
+ }
+
@Nullable
- private String errFormatDataArrayDetail(@Nullable final JsonNode item) {
- if (!item.isObject()) {
+ private ErrDataArrayItem[] errReadDataArray(@Nonnull final JsonNode dataNode) {
+ try {
+ return objectMapper.treeToValue(dataNode, ErrDataArrayItem[].class);
+ } catch (JsonProcessingException e) {
return null;
}
+ }
- final String errorMessage = errNonEmptyField(item, "error_message");
- if (errorMessage == null) {
+ @Nullable
+ private InfluxDBPartialWriteException.LineError errToLineError(@Nullable final ErrDataArrayItem item) {
+ if (item == null || item.errorMessage == null || item.errorMessage.isEmpty()) {
return null;
}
- if (item.hasNonNull("line_number")) {
- final String originalLine = errNonEmptyField(item, "original_line");
- if (originalLine != null) {
- final String lineNumber = item.get("line_number").asText();
- return "line " + lineNumber + ": " + errorMessage + " (" + originalLine + ")";
- }
- }
- return errorMessage;
+ final String originalLine =
+ (item.originalLine == null || item.originalLine.isEmpty()) ? null : item.originalLine;
+ return new InfluxDBPartialWriteException.LineError(item.lineNumber, item.errorMessage, originalLine);
+ }
+
+ private static final class ErrDataArrayItem {
+ @JsonProperty("error_message")
+ private String errorMessage;
+
+ @JsonProperty("line_number")
+ private Integer lineNumber;
+
+ @JsonProperty("original_line")
+ private String originalLine;
}
private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) {
diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java
index e9b4ca1b..00618ec1 100644
--- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java
+++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java
@@ -43,6 +43,9 @@
* precision - specifies the precision to use for the timestamp of points
* defaultTags - specifies tags to be added by default to all write operations using points.
* tagOrder - specifies preferred tag order for point serialization.
+ * noSync - skip waiting for WAL persistence on write
+ * acceptPartial - accept partial writes
+ * useV2Api - use v2 compatibility write endpoint
* headers - specifies the headers to be added to write request
*
*
@@ -68,6 +71,14 @@ public final class WriteOptions {
* Default NoSync.
*/
public static final boolean DEFAULT_NO_SYNC = false;
+ /**
+ * Default AcceptPartial.
+ */
+ public static final boolean DEFAULT_ACCEPT_PARTIAL = true;
+ /**
+ * Default UseV2Api.
+ */
+ public static final boolean DEFAULT_USE_V2_API = false;
/**
* Default timeout for writes in seconds. Set to {@value}
@@ -81,12 +92,15 @@ public final class WriteOptions {
@Deprecated(forRemoval = true)
public static final WriteOptions DEFAULTS = new WriteOptions(
- null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null, null);
+ null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, DEFAULT_ACCEPT_PARTIAL,
+ DEFAULT_USE_V2_API, null, null, null);
private final String database;
private final WritePrecision precision;
private final Integer gzipThreshold;
private final Boolean noSync;
+ private final Boolean acceptPartial;
+ private final Boolean useV2Api;
private final Map defaultTags;
private final List tagOrder;
private final Map headers;
@@ -99,6 +113,7 @@ public final class WriteOptions {
*/
public static WriteOptions defaultWriteOptions() {
return new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC,
+ DEFAULT_ACCEPT_PARTIAL, DEFAULT_USE_V2_API,
null, null, null);
}
@@ -152,7 +167,7 @@ public WriteOptions(@Nullable final String database,
@Nullable final WritePrecision precision,
@Nullable final Integer gzipThreshold,
@Nullable final Boolean noSync) {
- this(database, precision, gzipThreshold, noSync, null, null);
+ this(database, precision, gzipThreshold, noSync, null, null, null);
}
/**
@@ -184,7 +199,7 @@ public WriteOptions(@Nullable final String database,
@Nullable final Integer gzipThreshold,
@Nullable final Map defaultTags,
@Nullable final Map headers) {
- this(database, precision, gzipThreshold, null, defaultTags, headers);
+ this(database, precision, gzipThreshold, null, null, defaultTags, headers, null);
}
/**
@@ -209,7 +224,38 @@ public WriteOptions(@Nullable final String database,
@Nullable final Boolean noSync,
@Nullable final Map defaultTags,
@Nullable final Map headers) {
- this(database, precision, gzipThreshold, noSync, defaultTags, headers, null);
+ this(database, precision, gzipThreshold, noSync, null, null, defaultTags, headers, null);
+ }
+
+ /**
+ * Construct WriteAPI options.
+ *
+ * @param database The database to be used for InfluxDB operations.
+ * If it is not specified then use {@link ClientConfig#getDatabase()}.
+ * @param precision The precision to use for the timestamp of points.
+ * If it is not specified then use {@link ClientConfig#getWritePrecision()}.
+ * @param gzipThreshold The threshold for compressing request body.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}.
+ * @param noSync Skip waiting for WAL persistence on write.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}.
+ * @param acceptPartial Request partial write acceptance.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}.
+ * @param defaultTags Default tags to be added when writing points.
+ * @param headers The headers to be added to write request.
+ * The headers specified here are preferred over the headers
+ * specified in the client configuration.
+ * @param tagOrder Preferred order of tags in line protocol serialization.
+ * Null or empty tag names are ignored.
+ */
+ public WriteOptions(@Nullable final String database,
+ @Nullable final WritePrecision precision,
+ @Nullable final Integer gzipThreshold,
+ @Nullable final Boolean noSync,
+ @Nullable final Boolean acceptPartial,
+ @Nullable final Map defaultTags,
+ @Nullable final Map headers,
+ @Nullable final List tagOrder) {
+ this(database, precision, gzipThreshold, noSync, acceptPartial, null, defaultTags, headers, tagOrder);
}
/**
@@ -223,6 +269,10 @@ public WriteOptions(@Nullable final String database,
* If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}.
* @param noSync Skip waiting for WAL persistence on write.
* If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}.
+ * @param acceptPartial Request partial write acceptance.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}.
+ * @param useV2Api Use v2 compatibility write endpoint.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_USE_V2_API}.
* @param defaultTags Default tags to be added when writing points.
* @param headers The headers to be added to write request.
* The headers specified here are preferred over the headers
@@ -234,6 +284,8 @@ public WriteOptions(@Nullable final String database,
@Nullable final WritePrecision precision,
@Nullable final Integer gzipThreshold,
@Nullable final Boolean noSync,
+ @Nullable final Boolean acceptPartial,
+ @Nullable final Boolean useV2Api,
@Nullable final Map defaultTags,
@Nullable final Map headers,
@Nullable final List tagOrder) {
@@ -241,11 +293,41 @@ public WriteOptions(@Nullable final String database,
this.precision = precision;
this.gzipThreshold = gzipThreshold;
this.noSync = noSync;
+ this.acceptPartial = acceptPartial;
+ this.useV2Api = useV2Api;
this.defaultTags = defaultTags == null ? Map.of() : defaultTags;
this.tagOrder = sanitizeTagOrder(tagOrder);
this.headers = headers == null ? Map.of() : headers;
}
+ /**
+ * Construct WriteAPI options.
+ *
+ * @param database The database to be used for InfluxDB operations.
+ * If it is not specified then use {@link ClientConfig#getDatabase()}.
+ * @param precision The precision to use for the timestamp of points.
+ * If it is not specified then use {@link ClientConfig#getWritePrecision()}.
+ * @param gzipThreshold The threshold for compressing request body.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}.
+ * @param noSync Skip waiting for WAL persistence on write.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}.
+ * @param defaultTags Default tags to be added when writing points.
+ * @param headers The headers to be added to write request.
+ * The headers specified here are preferred over the headers
+ * specified in the client configuration.
+ * @param tagOrder Preferred order of tags in line protocol serialization.
+ * Null or empty tag names are ignored.
+ */
+ public WriteOptions(@Nullable final String database,
+ @Nullable final WritePrecision precision,
+ @Nullable final Integer gzipThreshold,
+ @Nullable final Boolean noSync,
+ @Nullable final Map defaultTags,
+ @Nullable final Map headers,
+ @Nullable final List tagOrder) {
+ this(database, precision, gzipThreshold, noSync, null, null, defaultTags, headers, tagOrder);
+ }
+
/**
* @param config with default value
* @return The destination database for writes.
@@ -299,8 +381,37 @@ public Integer gzipThresholdSafe(@Nonnull final ClientConfig config) {
*/
public boolean noSyncSafe(@Nonnull final ClientConfig config) {
Arguments.checkNotNull(config, "config");
- return noSync != null ? noSync
- : (config.getWriteNoSync() != null ? config.getWriteNoSync() : DEFAULT_NO_SYNC);
+ return noSync != null ? noSync : config.getWriteNoSync();
+ }
+
+ /**
+ * @param config with default value
+ * @return Accept partial write.
+ */
+ public boolean acceptPartialSafe(@Nonnull final ClientConfig config) {
+ Arguments.checkNotNull(config, "config");
+ return acceptPartial != null ? acceptPartial : config.getWriteAcceptPartial();
+ }
+
+ /**
+ * @param config with default value
+ * @return Use v2 compatibility write endpoint.
+ */
+ public boolean useV2ApiSafe(@Nonnull final ClientConfig config) {
+ Arguments.checkNotNull(config, "config");
+ return useV2Api != null ? useV2Api : config.getWriteUseV2Api();
+ }
+
+ /**
+ * Validate write option combinations.
+ *
+ * @param config with default values
+ */
+ public void validate(@Nonnull final ClientConfig config) {
+ Arguments.checkNotNull(config, "config");
+ if (useV2ApiSafe(config) && noSyncSafe(config)) {
+ throw new IllegalArgumentException("invalid write options: NoSync cannot be used in V2 API");
+ }
}
/**
@@ -332,6 +443,8 @@ public boolean equals(final Object o) {
&& precision == that.precision
&& Objects.equals(gzipThreshold, that.gzipThreshold)
&& Objects.equals(noSync, that.noSync)
+ && Objects.equals(acceptPartial, that.acceptPartial)
+ && Objects.equals(useV2Api, that.useV2Api)
&& defaultTags.equals(that.defaultTags)
&& tagOrder.equals(that.tagOrder)
&& headers.equals(that.headers);
@@ -339,7 +452,8 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
- return Objects.hash(database, precision, gzipThreshold, noSync, defaultTags, tagOrder, headers);
+ return Objects.hash(database, precision, gzipThreshold, noSync, acceptPartial, useV2Api, defaultTags, tagOrder,
+ headers);
}
private boolean isNotDefined(final String option) {
@@ -368,6 +482,8 @@ public static final class Builder {
private WritePrecision precision;
private Integer gzipThreshold;
private Boolean noSync;
+ private Boolean acceptPartial;
+ private Boolean useV2Api;
private Map defaultTags = new HashMap<>();
private List tagOrder = List.of();
private Map headers = new HashMap<>();
@@ -424,6 +540,32 @@ public Builder noSync(@Nonnull final Boolean noSync) {
return this;
}
+ /**
+ * Sets whether to request partial write acceptance.
+ *
+ * @param acceptPartial request partial write acceptance
+ * @return this
+ */
+ @Nonnull
+ public Builder acceptPartial(@Nonnull final Boolean acceptPartial) {
+
+ this.acceptPartial = acceptPartial;
+ return this;
+ }
+
+ /**
+ * Sets whether to use v2 compatibility write endpoint.
+ *
+ * @param useV2Api use v2 compatibility write endpoint
+ * @return this
+ */
+ @Nonnull
+ public Builder useV2Api(@Nonnull final Boolean useV2Api) {
+
+ this.useV2Api = useV2Api;
+ return this;
+ }
+
/**
* Sets defaultTags.
*
@@ -473,7 +615,8 @@ public WriteOptions build() {
}
private WriteOptions(@Nonnull final Builder builder) {
- this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags,
- builder.headers, builder.tagOrder);
+ this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.acceptPartial,
+ builder.useV2Api,
+ builder.defaultTags, builder.headers, builder.tagOrder);
}
}
diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
index a7d31240..4382b646 100644
--- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
+++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
@@ -27,7 +27,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import io.netty.handler.codec.http.HttpResponseStatus;
import mockwebserver3.RecordedRequest;
@@ -36,6 +39,9 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import com.influxdb.v3.client.config.ClientConfig;
import com.influxdb.v3.client.write.WriteOptions;
@@ -106,7 +112,7 @@ void databaseParameter() throws InterruptedException {
RecordedRequest request = mockServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getUrl()).isNotNull();
- assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("my-database");
+ assertThat(request.getUrl().queryParameter("db")).isEqualTo("my-database");
}
@Test
@@ -119,7 +125,7 @@ void databaseParameterSpecified() throws InterruptedException {
RecordedRequest request = mockServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getUrl()).isNotNull();
- assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("my-database-2");
+ assertThat(request.getUrl().queryParameter("db")).isEqualTo("my-database-2");
}
@Test
@@ -147,7 +153,7 @@ void precisionParameter() throws InterruptedException {
RecordedRequest request = mockServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getUrl()).isNotNull();
- assertThat(request.getUrl().queryParameter("precision")).isEqualTo("ns");
+ assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond");
}
@Test
@@ -160,7 +166,7 @@ void precisionParameterSpecified() throws InterruptedException {
RecordedRequest request = mockServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getUrl()).isNotNull();
- assertThat(request.getUrl().queryParameter("precision")).isEqualTo("s");
+ assertThat(request.getUrl().queryParameter("precision")).isEqualTo("second");
}
@Test
@@ -191,46 +197,64 @@ void gzipParameterSpecified() throws InterruptedException {
assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip");
}
- @Test
- void writeNoSyncFalseUsesV2API() throws InterruptedException {
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("writeRoutingCases")
+ void writeRoutingCase(final String name,
+ final Consumer configure,
+ final String expectedPath,
+ final String expectedDbParamName,
+ final String expectedPrecision,
+ @Nullable final String expectedNoSync,
+ @Nullable final String expectedAcceptPartial) throws InterruptedException {
mockServer.enqueue(createResponse(200));
+ WriteOptions.Builder optionsBuilder = new WriteOptions.Builder().precision(WritePrecision.NS);
+ configure.accept(optionsBuilder);
client.writeRecord("mem,tag=one value=1.0",
- new WriteOptions.Builder().precision(WritePrecision.NS).noSync(false).build());
+ optionsBuilder.build());
assertThat(mockServer.getRequestCount()).isEqualTo(1);
RecordedRequest request = mockServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getUrl()).isNotNull();
- assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v2/write");
- assertThat(request.getUrl().queryParameter("no_sync")).isNull();
- assertThat(request.getUrl().queryParameter("precision")).isEqualTo("ns");
-
- }
-
- @Test
- void writeNoSyncTrueUsesV3API() throws InterruptedException {
- mockServer.enqueue(createResponse(200));
-
- client.writeRecord("mem,tag=one value=1.0",
- new WriteOptions.Builder().precision(WritePrecision.NS).noSync(true).build());
-
- assertThat(mockServer.getRequestCount()).isEqualTo(1);
- RecordedRequest request = mockServer.takeRequest();
- assertThat(request).isNotNull();
- assertThat(request.getUrl()).isNotNull();
- assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp");
- assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo("true");
- assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond");
+ assertThat(request.getUrl().encodedPath()).isEqualTo(expectedPath);
+ assertThat(request.getUrl().queryParameter(expectedDbParamName)).isEqualTo("my-database");
+ assertThat(request.getUrl().queryParameter("precision")).isEqualTo(expectedPrecision);
+ assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo(expectedNoSync);
+ assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial);
+ }
+
+ private static Stream writeRoutingCases() {
+ return Stream.of(
+ Arguments.of("v3 noSync=false", (Consumer) b -> b.noSync(false),
+ "/api/v3/write_lp", "db", "nanosecond", null, null),
+ Arguments.of("v3 noSync=true", (Consumer) b -> b.noSync(true),
+ "/api/v3/write_lp", "db", "nanosecond", "true", null),
+ Arguments.of("v3 acceptPartial=true", (Consumer) b -> b.acceptPartial(true),
+ "/api/v3/write_lp", "db", "nanosecond", null, null),
+ Arguments.of("v3 acceptPartial=false", (Consumer) b -> b.acceptPartial(false),
+ "/api/v3/write_lp", "db", "nanosecond", null, "false"),
+ Arguments.of("v2 useV2Api=true", (Consumer) b -> b.useV2Api(true),
+ "/api/v2/write", "bucket", "ns", null, null),
+ Arguments.of("v2 useV2Api=true, acceptPartial=false",
+ (Consumer) b -> b.useV2Api(true).acceptPartial(false),
+ "/api/v2/write", "bucket", "ns", null, null)
+ );
}
- @Test
- void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("writeV3MethodNotAllowedCases")
+ void writeV3MethodNotAllowedMappedError(final String name,
+ final Consumer configure,
+ @Nullable final String expectedNoSync,
+ @Nullable final String expectedAcceptPartial) throws InterruptedException {
mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code()));
+ WriteOptions.Builder optionsBuilder = new WriteOptions.Builder().precision(WritePrecision.MS);
+ configure.accept(optionsBuilder);
+
InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class,
- () -> client.writeRecord("mem,tag=one value=1.0",
- new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build())
+ () -> client.writeRecord("mem,tag=one value=1.0", optionsBuilder.build())
);
assertThat(mockServer.getRequestCount()).isEqualTo(1);
@@ -238,12 +262,36 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
assertThat(request).isNotNull();
assertThat(request.getUrl()).isNotNull();
assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp");
- assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo("true");
+ assertThat(request.getUrl().queryParameter("no_sync")).isEqualTo(expectedNoSync);
+ assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial);
assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond");
assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code());
- assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true"
- + " (supported by InfluxDB 3 Core/Enterprise servers only).");
+ assertThat(ae.getMessage()).contains("Server doesn't support v3 write API. "
+ + "Use WriteOptions.Builder.useV2Api(true) for v2 compatibility endpoint.");
+ }
+
+ private static Stream writeV3MethodNotAllowedCases() {
+ return Stream.of(
+ Arguments.of("noSync=true", (Consumer) b -> b.noSync(true), "true", null),
+ Arguments.of(
+ "acceptPartial=true",
+ (Consumer) b -> b.acceptPartial(true),
+ null,
+ null
+ )
+ );
+ }
+
+ @Test
+ void writeUseV2ApiNoSyncValidation() {
+ Throwable thrown = catchThrowable(() -> client.writeRecord("mem,tag=one value=1.0",
+ new WriteOptions.Builder().useV2Api(true).noSync(true).build()));
+
+ Assertions.assertThat(thrown)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("invalid write options: NoSync cannot be used in V2 API");
+ assertThat(mockServer.getRequestCount()).isEqualTo(0);
}
@Test
@@ -256,7 +304,7 @@ void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writeRecord("mem,tag=one value=1.0");
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false);
}
@Test
@@ -272,7 +320,35 @@ void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writeRecord("mem,tag=one value=1.0");
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true);
+ }
+
+ @Test
+ void writeRecordWithDefaultWriteOptionsAcceptPartialConfig() throws Exception {
+ mockServer.enqueue(createResponse(200));
+
+ ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
+ .writeAcceptPartial(true)
+ .build();
+ try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
+ client.writeRecord("mem,tag=one value=1.0");
+ }
+
+ checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false);
+ }
+
+ @Test
+ void writeRecordWithDefaultWriteOptionsUseV2Config() throws Exception {
+ mockServer.enqueue(createResponse(200));
+
+ ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
+ .writeUseV2Api(true)
+ .build();
+ try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
+ client.writeRecord("mem,tag=one value=1.0");
+ }
+
+ checkWriteCalled("/api/v2/write", "DB", "ns", false, null, null, false);
}
@Test
@@ -285,7 +361,7 @@ void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writeRecords(List.of("mem,tag=one value=1.0"));
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false);
}
@Test
@@ -301,7 +377,7 @@ void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writeRecords(List.of("mem,tag=one value=1.0"));
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true);
}
@Test
@@ -317,7 +393,7 @@ void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writePoint(point);
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false);
}
@Test
@@ -336,7 +412,7 @@ void writePointWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writePoint(point);
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true);
}
@Test
@@ -352,7 +428,7 @@ void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writePoints(List.of(point));
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", true, null, null, false);
}
@Test
@@ -371,27 +447,26 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writePoints(List.of(point));
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, "true", null, true);
}
private void checkWriteCalled(final String expectedPath, final String expectedDB,
- final String expectedPrecision, final boolean expectedNoSync,
+ final String expectedPrecision, final boolean expectedV3,
+ @Nullable final String expectedNoSync,
+ @Nullable final String expectedAcceptPartial,
final boolean expectedGzip) throws InterruptedException {
RecordedRequest request = assertThatServerRequested();
HttpUrl requestUrl = request.getUrl();
assertThat(requestUrl).isNotNull();
assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath);
- if (expectedNoSync) {
+ if (expectedV3) {
assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB);
} else {
assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB);
}
assertThat(requestUrl.queryParameter("precision")).isEqualTo(expectedPrecision);
- if (expectedNoSync) {
- assertThat(requestUrl.queryParameter("no_sync")).isEqualTo("true");
- } else {
- assertThat(requestUrl.queryParameter("no_sync")).isNull();
- }
+ assertThat(requestUrl.queryParameter("no_sync")).isEqualTo(expectedNoSync);
+ assertThat(requestUrl.queryParameter("accept_partial")).isEqualTo(expectedAcceptPartial);
if (expectedGzip) {
assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip");
} else {
@@ -420,8 +495,8 @@ void allParameterSpecified() throws InterruptedException {
assertThat(request.getUrl()).isNotNull();
assertThat(request.getHeaders().get("Content-Type")).isEqualTo("text/plain; charset=utf-8");
assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip");
- assertThat(request.getUrl().queryParameter("precision")).isEqualTo("s");
- assertThat(request.getUrl().queryParameter("bucket")).isEqualTo("your-database");
+ assertThat(request.getUrl().queryParameter("precision")).isEqualTo("second");
+ assertThat(request.getUrl().queryParameter("db")).isEqualTo("your-database");
}
@Test
diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java
index f5ce2e8f..272c3e4e 100644
--- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java
+++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java
@@ -56,6 +56,21 @@ void equalConfig() {
Assertions.assertThat(config).isEqualTo(config);
Assertions.assertThat(config).isEqualTo(configBuilder.build());
Assertions.assertThat(config).isNotEqualTo(configBuilder);
+ Assertions.assertThat(config).isNotEqualTo(new ClientConfig.Builder()
+ .host("http://localhost:9999")
+ .token("my-token".toCharArray())
+ .organization("my-org")
+ .database("my-db")
+ .writePrecision(WritePrecision.NS)
+ .writeAcceptPartial(false)
+ .timeout(Duration.ofSeconds(30))
+ .writeTimeout(Duration.ofSeconds(35))
+ .queryTimeout(Duration.ofSeconds(120))
+ .allowHttpRedirects(true)
+ .disableServerCertificateValidation(true)
+ .headers(Map.of("X-device", "ab-01"))
+ .disableGRPCCompression(true)
+ .build());
Assertions.assertThat(config).isNotEqualTo(configBuilder.database("database").build());
}
@@ -79,6 +94,8 @@ void toStringConfig() {
Assertions.assertThat(configString.contains("database='my-db'")).isEqualTo(true);
Assertions.assertThat(configString.contains("gzipThreshold=1000")).isEqualTo(true);
Assertions.assertThat(configString).contains("writeNoSync=false");
+ Assertions.assertThat(configString).contains("writeAcceptPartial=true");
+ Assertions.assertThat(configString).contains("writeUseV2Api=false");
Assertions.assertThat(configString).contains("timeout=PT30S");
Assertions.assertThat(configString).contains("writeTimeout=PT35S");
Assertions.assertThat(configString).contains("queryTimeout=PT2M");
@@ -90,7 +107,8 @@ void toStringConfig() {
void fromConnectionString() throws MalformedURLException {
ClientConfig cfg = new ClientConfig.Builder()
.build("http://localhost:9999/"
- + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128&writeNoSync=true");
+ + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128"
+ + "&writeNoSync=true&writeAcceptPartial=true&writeUseV2Api=true");
Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/");
Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray());
Assertions.assertThat(cfg.getOrganization()).isEqualTo("my-org");
@@ -98,6 +116,8 @@ void fromConnectionString() throws MalformedURLException {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); // default
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(128);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true);
cfg = new ClientConfig.Builder()
.build("http://localhost:9999/"
@@ -109,6 +129,8 @@ void fromConnectionString() throws MalformedURLException {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
+ Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API);
cfg = new ClientConfig.Builder()
.build("http://localhost:9999/"
@@ -120,6 +142,7 @@ void fromConnectionString() throws MalformedURLException {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC);
+ Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API);
cfg = new ClientConfig.Builder()
.build("http://localhost:9999/"
@@ -208,6 +231,8 @@ void fromEnv() {
"INFLUX_PRECISION", "ms",
"INFLUX_GZIP_THRESHOLD", "64",
"INFLUX_WRITE_NO_SYNC", "true",
+ "INFLUX_WRITE_ACCEPT_PARTIAL", "true",
+ "INFLUX_WRITE_USE_V2_API", "true",
"INFLUX_DISABLE_GRPC_COMPRESSION", "true"
);
@@ -220,6 +245,8 @@ void fromEnv() {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true);
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
}
@@ -287,6 +314,8 @@ void fromSystemProperties() {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
+ Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(WriteOptions.DEFAULT_USE_V2_API);
// basic
properties = new Properties();
@@ -324,6 +353,8 @@ void fromSystemProperties() {
properties.put("influx.precision", "ms");
properties.put("influx.gzipThreshold", "64");
properties.put("influx.writeNoSync", "true");
+ properties.put("influx.writeAcceptPartial", "true");
+ properties.put("influx.writeUseV2Api", "true");
properties.put("influx.disableGRPCCompression", "true");
cfg = new ClientConfig.Builder()
.build(new HashMap<>(), properties);
@@ -334,6 +365,8 @@ void fromSystemProperties() {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteUseV2Api()).isEqualTo(true);
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
}
diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
index 6ec5ee72..dc02f7da 100644
--- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
+++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
@@ -41,7 +41,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import com.influxdb.v3.client.InfluxDBApiHttpException;
import com.influxdb.v3.client.InfluxDBClient;
+import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;
import com.influxdb.v3.client.config.ClientConfig;
@@ -186,6 +188,116 @@ public void testQuery() throws Exception {
}
}
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
+ @Test
+ public void testAcceptPartialWriteError() throws Exception {
+ try (InfluxDBClient client = InfluxDBClient.getInstance(
+ System.getenv("TESTING_INFLUXDB_URL"),
+ System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
+ System.getenv("TESTING_INFLUXDB_DATABASE"),
+ null)) {
+
+ String points = "home,room=Sunroom temp=96 1735545600\n"
+ + "home,room=Sunroom temp=\"hi\" 1735545610\n"
+ + "home,room=Sunroom temp=88i 1735545620";
+
+ WriteOptions options = new WriteOptions.Builder()
+ .acceptPartial(true)
+ .build();
+
+ Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options));
+ Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class);
+
+ String expectedMessage = "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: invalid column type for column 'temp', expected iox::column_type::field::float, "
+ + "got iox::column_type::field::string (home,room=Sunroom te)\n"
+ + "\tline 3: invalid column type for column 'temp', expected iox::column_type::field::float, "
+ + "got iox::column_type::field::integer (home,room=Sunroom te)";
+ Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage);
+
+ InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown;
+ Assertions.assertThat(partialError.lineErrors()).hasSize(2);
+ Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2);
+ Assertions.assertThat(partialError.lineErrors().get(0).errorMessage())
+ .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, "
+ + "got iox::column_type::field::string");
+ Assertions.assertThat(partialError.lineErrors().get(0).originalLine())
+ .isEqualTo("home,room=Sunroom te");
+
+ Assertions.assertThat(partialError.lineErrors().get(1).lineNumber()).isEqualTo(3);
+ Assertions.assertThat(partialError.lineErrors().get(1).errorMessage())
+ .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, "
+ + "got iox::column_type::field::integer");
+ Assertions.assertThat(partialError.lineErrors().get(1).originalLine())
+ .isEqualTo("home,room=Sunroom te");
+
+ Assertions.assertThat(partialError).isInstanceOf(InfluxDBApiHttpException.class);
+ Assertions.assertThat(partialError.statusCode()).isEqualTo(400);
+ }
+ }
+
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
+ @Test
+ public void testWriteErrorWithoutAcceptPartial() throws Exception {
+ try (InfluxDBClient client = InfluxDBClient.getInstance(
+ System.getenv("TESTING_INFLUXDB_URL"),
+ System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
+ System.getenv("TESTING_INFLUXDB_DATABASE"),
+ null)) {
+
+ String points = "home,room=Sunroom temp=96 1735545600\n"
+ + "home,room=Sunroom temp=\"hi\" 1735545610\n"
+ + "home,room=Sunroom temp=88i 1735545620";
+
+ WriteOptions options = new WriteOptions.Builder()
+ .acceptPartial(false)
+ .build();
+ Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options));
+ Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class);
+ Assertions.assertThat(thrown.getMessage())
+ .contains("parsing failed for write_lp endpoint");
+
+ InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown;
+ Assertions.assertThat(partialError.lineErrors()).hasSize(1);
+ Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2);
+ Assertions.assertThat(partialError.lineErrors().get(0).errorMessage())
+ .isEqualTo("invalid column type for column 'temp', expected iox::column_type::field::float, "
+ + "got iox::column_type::field::string");
+ Assertions.assertThat(partialError.lineErrors().get(0).originalLine())
+ .isEqualTo("home,room=Sunroom te");
+ }
+ }
+
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
+ @Test
+ public void testWriteErrorWithUseV2Api() throws Exception {
+ try (InfluxDBClient client = InfluxDBClient.getInstance(
+ System.getenv("TESTING_INFLUXDB_URL"),
+ System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
+ System.getenv("TESTING_INFLUXDB_DATABASE"),
+ null)) {
+
+ String points = "home,room=Sunroom temp=96 1735545600\n"
+ + "home,room=Sunroom temp=\"hi\" 1735545610\n"
+ + "home,room=Sunroom temp=88i 1735545620";
+
+ WriteOptions options = new WriteOptions.Builder()
+ .useV2Api(true)
+ .build();
+ Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options));
+ Assertions.assertThat(thrown).isInstanceOf(InfluxDBApiHttpException.class);
+ Assertions.assertThat(thrown).isNotInstanceOf(InfluxDBPartialWriteException.class);
+ Assertions.assertThat(thrown.getMessage())
+ .contains("write buffer error: line protocol parse failed");
+ }
+ }
+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
index b6d4a26b..25f18744 100644
--- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
+++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
@@ -51,6 +51,7 @@
import com.influxdb.v3.client.InfluxDBApiException;
import com.influxdb.v3.client.InfluxDBApiHttpException;
import com.influxdb.v3.client.InfluxDBClient;
+import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.config.ClientConfig;
import com.influxdb.v3.client.write.WriteOptions;
@@ -540,17 +541,26 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format
mockServer.enqueue(createResponse(400,
"application/json",
null,
- "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"invalid field value\"}}"));
+ "{\"error\":\"parsing failed for write_lp endpoint\",\"data\":{\"error_message\":\"invalid field value\"}}"));
restClient = new RestClient(new ClientConfig.Builder()
.host(baseURL)
.build());
- Assertions.assertThatThrownBy(
- () -> restClient.request("ping", HttpMethod.GET, null, null, null)
- )
- .isInstanceOf(InfluxDBApiException.class)
- .hasMessage("HTTP status code: 400; Message: invalid field value");
+ Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(InfluxDBPartialWriteException.class)
+ .isInstanceOf(InfluxDBApiHttpException.class)
+ .hasMessage("HTTP status code: 400; Message: parsing failed for write_lp endpoint:\n"
+ + "\tinvalid field value");
+
+ InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown;
+ Assertions.assertThat(partialWriteException.statusCode()).isEqualTo(400);
+ Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1);
+ InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0);
+ Assertions.assertThat(lineError.lineNumber()).isNull();
+ Assertions.assertThat(lineError.errorMessage()).isEqualTo("invalid field value");
+ Assertions.assertThat(lineError.originalLine()).isNull();
}
@Test
@@ -567,13 +577,43 @@ public void errorFromBodyV3WithDataArray() {
.host(baseURL)
.build());
- Assertions.assertThatThrownBy(
- () -> restClient.request("ping", HttpMethod.GET, null, null, null)
- )
- .isInstanceOf(InfluxDBApiException.class)
+ Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(InfluxDBPartialWriteException.class)
+ .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer,"
+ + " got iox::column_type::field::float (testa6a3ad v=1 17702)");
+
+ InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown;
+ Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1);
+ InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0);
+ Assertions.assertThat(lineError.lineNumber()).isEqualTo(2);
+ Assertions.assertThat(lineError.errorMessage())
+ .isEqualTo("invalid column type for column 'v', expected iox::column_type::field::integer,"
+ + " got iox::column_type::field::float");
+ Assertions.assertThat(lineError.originalLine()).isEqualTo("testa6a3ad v=1 17702");
+ }
+
+ @Test
+ public void errorFromBodyV3WithDataArrayAnyInvalidItemFallsBackToHttpException() {
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"},"
+ + "{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}]}"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(InfluxDBApiHttpException.class)
+ .isNotInstanceOf(InfluxDBPartialWriteException.class)
.hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n"
- + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer,"
- + " got iox::column_type::field::float (testa6a3ad v=1 17702)");
+ + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n"
+ + "\t{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}");
}
@ParameterizedTest(name = "{0}")
@@ -630,7 +670,8 @@ private static Stream errorFromBodyV3WithDataArrayCases() {
"{\"error\":\"partial write of line protocol occurred\",\"data\":[1,{\"error_message\":"
+ "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
"HTTP status code: 400; Message: partial write of line protocol occurred:\n"
- + "\tline 2: bad line (bad lp)"
+ + "\t1\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}"
),
Arguments.of(
"null error_message skipped",
@@ -652,6 +693,47 @@ private static Stream errorFromBodyV3WithDataArrayCases() {
"HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ "\tline 2: bad line (bad lp)\n"
+ "\tsecond issue"
+ ),
+ Arguments.of(
+ "array of strings fallback",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[\"bad line 1\",\"bad line 2\"]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tbad line 1\n"
+ + "\tbad line 2"
+ ),
+ Arguments.of(
+ "textual numeric line_number",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":\"2\",\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: bad line (bad lp)"
+ ),
+ Arguments.of(
+ "textual non-numeric line_number",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}"
+ ),
+ Arguments.of(
+ "empty textual line_number with empty original_line",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"only error message\",\"line_number\":\"\",\"original_line\":\"\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message"
+ ),
+ Arguments.of(
+ "non-textual line_number",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}"
+ ),
+ Arguments.of(
+ "object line_number preserved as text",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}"
)
);
}
@@ -659,11 +741,14 @@ private static Stream errorFromBodyV3WithDataArrayCases() {
@ParameterizedTest(name = "{0}")
@MethodSource("errorFromBodyV3FallbackCases")
public void errorFromBodyV3FallbackCase(final String testName,
+ final String requestPath,
+ final String contentType,
final String body,
+ final Class extends InfluxDBApiException> expectedClass,
final String expectedMessage) {
mockServer.enqueue(createResponse(400,
- "application/json",
+ contentType,
null,
body));
@@ -671,48 +756,125 @@ public void errorFromBodyV3FallbackCase(final String testName,
.host(baseURL)
.build());
- Assertions.assertThatThrownBy(
- () -> restClient.request("ping", HttpMethod.GET, null, null, null)
- )
- .isInstanceOf(InfluxDBApiException.class)
- .hasMessage(expectedMessage);
+ Throwable thrown = catchThrowable(() -> restClient.request(requestPath, HttpMethod.GET, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(expectedClass)
+ .hasMessage(expectedMessage);
}
private static Stream errorFromBodyV3FallbackCases() {
return Stream.of(
Arguments.of(
"missing error with data array falls back to body",
+ "ping",
+ "application/json",
"{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: "
+ "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}"
),
Arguments.of(
"empty error with data array falls back to body",
+ "ping",
+ "application/json",
"{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":"
+ "\"bad lp\"}]}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: "
+ "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":"
+ "\"bad lp\"}]}"
),
Arguments.of(
"data object without error_message falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":{}}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
),
Arguments.of(
"data object with empty error_message falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
),
Arguments.of(
"data string falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
),
Arguments.of(
"data number falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":123}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
+ ),
+ Arguments.of(
+ "partial-write with invalid data string falls back to error",
+ "ping",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":\"invalid\"}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: partial write of line protocol occurred"
+ ),
+ Arguments.of(
+ "partial-write with empty data object falls back to error",
+ "ping",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":{}}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: partial write of line protocol occurred"
+ ),
+ Arguments.of(
+ "write endpoint ignores line-error parsing for non-json content type",
+ "api/v3/write_lp",
+ "text/plain",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\","
+ + "\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: "
+ + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\","
+ + "\"line_number\":2,\"original_line\":\"bad lp\"}]}"
+ ),
+ Arguments.of(
+ "write endpoint with non-object root falls back to body",
+ "api/v3/write_lp",
+ "application/json",
+ "[]",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: []"
+ ),
+ Arguments.of(
+ "write endpoint with invalid line-error object type falls back to http exception",
+ "api/v3/write_lp",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":{\"error_message\":\"bad line\","
+ + "\"line_number\":{\"x\":2},\"original_line\":\"bad lp\"}}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tbad line"
+ ),
+ Arguments.of(
+ "write endpoint with scalar data falls back to error",
+ "api/v3/write_lp",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":123}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: partial write of line protocol occurred"
+ ),
+ Arguments.of(
+ "write endpoint invalid json body falls back to raw body",
+ "api/v3/write_lp",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\"",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: {\"error\":\"partial write of line protocol occurred\""
)
);
}
diff --git a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java
index a1b7974c..fdf95b09 100644
--- a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java
+++ b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java
@@ -57,9 +57,10 @@ void optionsBasics() {
@Test
void optionsEqualAll() {
- WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true);
+ WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true, true, null, null, null);
WriteOptions optionsViaBuilder = new WriteOptions.Builder()
- .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true).build();
+ .database("my-database").precision(WritePrecision.S).gzipThreshold(512)
+ .noSync(true).acceptPartial(true).build();
Assertions.assertThat(options).isEqualTo(optionsViaBuilder);
@@ -67,6 +68,12 @@ void optionsEqualAll() {
.database("my-database").precision(WritePrecision.S).gzipThreshold(1024).noSync(true).build();
WriteOptions noSyncMismatch = new WriteOptions.Builder()
.database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(false).build();
+ WriteOptions acceptPartialMismatch = new WriteOptions.Builder()
+ .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true)
+ .acceptPartial(false).build();
+ WriteOptions useV2ApiMismatch = new WriteOptions.Builder()
+ .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true)
+ .useV2Api(true).build();
WriteOptions defaultTagsMismatch = new WriteOptions.Builder()
.database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true)
.defaultTags(Map.of("region", "west")).build();
@@ -79,6 +86,8 @@ void optionsEqualAll() {
Assertions.assertThat(options).isNotEqualTo(gzipMismatch);
Assertions.assertThat(options).isNotEqualTo(noSyncMismatch);
+ Assertions.assertThat(options).isNotEqualTo(acceptPartialMismatch);
+ Assertions.assertThat(options).isNotEqualTo(useV2ApiMismatch);
Assertions.assertThat(options).isNotEqualTo(defaultTagsMismatch);
Assertions.assertThat(options).isNotEqualTo(tagOrderMismatch);
Assertions.assertThat(options).isNotEqualTo(headersMismatch);
@@ -147,12 +156,16 @@ void optionsEmpty() {
Assertions.assertThat(options.databaseSafe(config)).isEqualTo("my-database");
Assertions.assertThat(options.precisionSafe(config)).isEqualTo(WriteOptions.DEFAULT_WRITE_PRECISION);
Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(WriteOptions.DEFAULT_GZIP_THRESHOLD);
+ Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
+ Assertions.assertThat(options.useV2ApiSafe(config)).isEqualTo(WriteOptions.DEFAULT_USE_V2_API);
Assertions.assertThat(options.tagOrderSafe()).isEmpty();
WriteOptions builderOptions = new WriteOptions.Builder().build();
Assertions.assertThat(builderOptions.databaseSafe(config)).isEqualTo("my-database");
Assertions.assertThat(builderOptions.precisionSafe(config)).isEqualTo(WritePrecision.S);
Assertions.assertThat(builderOptions.gzipThresholdSafe(config)).isEqualTo(512);
+ Assertions.assertThat(builderOptions.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
+ Assertions.assertThat(builderOptions.useV2ApiSafe(config)).isEqualTo(WriteOptions.DEFAULT_USE_V2_API);
}
@Test
@@ -234,6 +247,46 @@ void optionsOverrideWriteNoSync() {
Assertions.assertThat(options.noSyncSafe(config)).isEqualTo(false);
}
+ @Test
+ void optionsOverrideWriteAcceptPartial() {
+ ClientConfig config = configBuilder
+ .database("my-database")
+ .organization("my-org")
+ .writeAcceptPartial(false)
+ .build();
+
+ WriteOptions options = new WriteOptions.Builder().acceptPartial(true).build();
+
+ Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(true);
+ }
+
+ @Test
+ void optionsOverrideWriteUseV2Api() {
+ ClientConfig config = configBuilder
+ .database("my-database")
+ .organization("my-org")
+ .writeUseV2Api(false)
+ .build();
+
+ WriteOptions options = new WriteOptions.Builder().useV2Api(true).build();
+
+ Assertions.assertThat(options.useV2ApiSafe(config)).isEqualTo(true);
+ }
+
+ @Test
+ void optionsValidateUseV2ApiAndNoSync() {
+ ClientConfig config = configBuilder.build();
+
+ WriteOptions options = new WriteOptions.Builder()
+ .useV2Api(true)
+ .noSync(true)
+ .build();
+
+ Assertions.assertThatThrownBy(() -> options.validate(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("invalid write options: NoSync cannot be used in V2 API");
+ }
+
@Test
void optionsOverridesDefaultTags() {
Map defaultTagsBase = new HashMap<>() {{
@@ -303,6 +356,8 @@ void optionsHashCode() {
.isNotEqualTo(builder.database("my-database").build().hashCode());
Assertions.assertThat(baseOptions.hashCode())
.isNotEqualTo(builder.defaultTags(defaultTags).build().hashCode());
+ Assertions.assertThat(baseOptions.hashCode())
+ .isNotEqualTo(builder.acceptPartial(true).build().hashCode());
Assertions.assertThat(baseOptions.hashCode())
.isNotEqualTo(builder.tagOrder(List.of("region", "host")).build().hashCode());
}