Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## 1.9.0 [unreleased]

### BREAKING CHANGES

1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Adds partial writes support and aligns write routing with v3 defaults.
See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more.
For InfluxDB Clustered version, set `useV2Api=true` for writing.

### Features

1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client.
Expand Down
56 changes: 54 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ dependencies {

## Usage

### Create client

To start with the client, import the `com.influxdb.v3.client` package and create a `InfluxDBClient` by:

```java
Expand All @@ -73,6 +75,7 @@ import java.util.List;
import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.write.WriteOptions;
Expand All @@ -90,7 +93,9 @@ public class IOxExample {
}
```

to insert data, you can use code like this:
### Write

To insert data, you can use code like this:

```java
//
Expand All @@ -113,14 +118,61 @@ client.writePoint(
orderedTagWrite
);

//
// Write with partial acceptance (default behavior)
//
WriteOptions partialWrite = new WriteOptions.Builder().build();
try {
client.writeRecords(List.of(
"temperature,region=west value=20.0",
"temperature,region=west value=\"bad\""
), partialWrite);
} catch (InfluxDBPartialWriteException e) {
// Inspect failed line details.
e.lineErrors().forEach(line ->
System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine()));
}

//
// Write via v2 compatibility endpoint (InfluxDB Clustered)
//
WriteOptions useV2 = new WriteOptions.Builder()
.useV2Api(true)
.build();
client.writeRecord("temperature,location=north value=60.0", useV2);

//
// Write by LineProtocol
//
String record = "temperature,location=north value=60.0";
client.writeRecord(record);
```

to query your data, you can use code like this:
#### Accept partial writes and inspect failed lines

Partial writes are enabled by default.
`acceptPartial` can be configured in three ways: client defaults via `WriteOptions`, connection string / environment variable / system property (`writeAcceptPartial` / `INFLUX_WRITE_ACCEPT_PARTIAL` / `influx.writeAcceptPartial`), or per-write `WriteOptions`.

Set `acceptPartial(false)` to disable partial writes.
With InfluxDB Core/Enterprise, when a write request fails due to one or more invalid lines, the error message starts with:

- `partial write of line protocol occurred` when partial writes are enabled.
- `parsing failed for write_lp endpoint` when partial writes are disabled.

When partial writes are disabled, any rejected line causes all lines to be rejected.
InfluxDB Clustered does not return this structured partial-write error format.

#### Compatibility with InfluxDB Clustered

For InfluxDB Clustered, enable `useV2Api` for writes.
Like other write options, this can be configured in client code, connection string / environment variable / system property (`writeUseV2Api` / `INFLUX_WRITE_USE_V2_API` / `influx.writeUseV2Api`), or per-write `WriteOptions`.

If `useV2Api` is set, `acceptPartial` is ignored because this compatibility mode does not support partial-write controls.
Any rejected line causes all lines to be rejected.

### Query

To query your data, you can use code like this:

```java
//
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) {
* <li>precision - timestamp precision when writing data</li>
* <li>gzipThreshold - payload size size for gzipping data</li>
* <li>writeNoSync - skip waiting for WAL persistence on write</li>
* <li>writeAcceptPartial - accept partial writes</li>
* <li>writeUseV2Api - use v2 compatibility write endpoint</li>
* </ul>
*
* @param connectionString connection string
Expand Down Expand Up @@ -590,6 +592,8 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* <li>INFLUX_PRECISION - timestamp precision when writing data</li>
* <li>INFLUX_GZIP_THRESHOLD - payload size size for gzipping data</li>
* <li>INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write</li>
* <li>INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes</li>
* <li>INFLUX_WRITE_USE_V2_API - use v2 compatibility write endpoint</li>
* </ul>
* Supported system properties:
* <ul>
Expand All @@ -601,6 +605,8 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* <li>influx.precision - timestamp precision when writing data</li>
* <li>influx.gzipThreshold - payload size size for gzipping data</li>
* <li>influx.writeNoSync - skip waiting for WAL persistence on write</li>
* <li>influx.writeAcceptPartial - accept partial writes</li>
* <li>influx.writeUseV2Api - use v2 compatibility write endpoint</li>
* </ul>
*
* @return instance of {@link InfluxDBClient}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.influxdb.v3.client;

import java.net.http.HttpHeaders;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* HTTP exception for partial write errors returned by InfluxDB 3 write endpoint.
* Contains parsed line-level write errors so callers can decide how to handle failed lines.
*/
public class InfluxDBPartialWriteException extends InfluxDBApiHttpException {

private final List<LineError> lineErrors;

/**
* Construct a new InfluxDBPartialWriteException.
*
* @param message detail message
* @param headers response headers
* @param statusCode response status code
* @param lineErrors line-level errors parsed from response body
*/
public InfluxDBPartialWriteException(
@Nullable final String message,
@Nullable final HttpHeaders headers,
final int statusCode,
@Nonnull final List<LineError> lineErrors) {
super(message, headers, statusCode);
this.lineErrors = List.copyOf(lineErrors);
}

/**
* Line-level write errors.
*
* @return immutable list of line errors
*/
@Nonnull
public List<LineError> lineErrors() {
return lineErrors;
}

/**
* Represents one failed line from a partial write response.
*/
public static final class LineError {

private final Integer lineNumber;
private final String errorMessage;
private final String originalLine;

/**
* @param lineNumber line number in the write payload; may be null if not provided by server
* @param errorMessage line-level error message
* @param originalLine original line protocol row; may be null if not provided by server
*/
public LineError(@Nullable final Integer lineNumber,
@Nonnull final String errorMessage,
@Nullable final String originalLine) {
this.lineNumber = lineNumber;
this.errorMessage = errorMessage;
this.originalLine = originalLine;
}

/**
* @return line number or null if server didn't provide it
*/
@Nullable
public Integer lineNumber() {
return lineNumber;
}

/**
* @return line-level error message
*/
@Nonnull
public String errorMessage() {
return errorMessage;
}

/**
* @return original line protocol row or null if server didn't provide it
*/
@Nullable
public String originalLine() {
return originalLine;
}
}
}
78 changes: 77 additions & 1 deletion src/main/java/com/influxdb/v3/client/config/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
* <li><code>defaultTags</code> - defaultTags added when writing points to InfluxDB</li>
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
* <li><code>writeAcceptPartial</code> - accept partial writes</li>
* <li><code>writeUseV2Api</code> - use v2 compatibility write endpoint</li>
* <li><code>timeout</code> - <i>deprecated in 1.4.0</i> timeout when connecting to InfluxDB,
* please use more informative properties <code>writeTimeout</code> and <code>queryTimeout</code></li>
* <li><code>writeTimeout</code> - timeout when writing data to InfluxDB</li>
Expand Down Expand Up @@ -107,6 +109,8 @@ public final class ClientConfig {
private final WritePrecision writePrecision;
private final Integer gzipThreshold;
private final Boolean writeNoSync;
private final Boolean writeAcceptPartial;
private final Boolean writeUseV2Api;
private final Map<String, String> defaultTags;
@Deprecated
private final Duration timeout;
Expand Down Expand Up @@ -208,6 +212,26 @@ public Boolean getWriteNoSync() {
return writeNoSync;
}

/**
* Accept partial writes?
*
* @return accept partial writes
*/
@Nonnull
public Boolean getWriteAcceptPartial() {
return writeAcceptPartial;
}

/**
* Use v2 compatibility write endpoint?
*
* @return use v2 compatibility write endpoint
*/
@Nonnull
public Boolean getWriteUseV2Api() {
return writeUseV2Api;
}

/**
* Gets default tags used when writing points.
* @return default tags
Expand Down Expand Up @@ -370,6 +394,8 @@ public boolean equals(final Object o) {
&& writePrecision == that.writePrecision
&& Objects.equals(gzipThreshold, that.gzipThreshold)
&& Objects.equals(writeNoSync, that.writeNoSync)
&& Objects.equals(writeAcceptPartial, that.writeAcceptPartial)
&& Objects.equals(writeUseV2Api, that.writeUseV2Api)
&& Objects.equals(defaultTags, that.defaultTags)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(writeTimeout, that.writeTimeout)
Expand All @@ -388,7 +414,7 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
database, writePrecision, gzipThreshold, writeNoSync,
database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, writeUseV2Api,
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
proxy, proxyUrl, authenticator, headers,
defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors);
Expand All @@ -403,6 +429,8 @@ public String toString() {
.add("writePrecision=" + writePrecision)
.add("gzipThreshold=" + gzipThreshold)
.add("writeNoSync=" + writeNoSync)
.add("writeAcceptPartial=" + writeAcceptPartial)
.add("writeUseV2Api=" + writeUseV2Api)
.add("timeout=" + timeout)
.add("writeTimeout=" + writeTimeout)
.add("queryTimeout=" + queryTimeout)
Expand Down Expand Up @@ -432,6 +460,8 @@ public static final class Builder {
private WritePrecision writePrecision;
private Integer gzipThreshold;
private Boolean writeNoSync;
private Boolean writeAcceptPartial;
private Boolean writeUseV2Api;
private Map<String, String> defaultTags;
@Deprecated
private Duration timeout;
Expand Down Expand Up @@ -554,6 +584,32 @@ public Builder writeNoSync(@Nullable final Boolean writeNoSync) {
return this;
}

/**
* Sets whether to accept partial writes.
*
* @param writeAcceptPartial accept partial writes
* @return this
*/
@Nonnull
public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) {

this.writeAcceptPartial = writeAcceptPartial;
return this;
}

/**
* Sets whether to use v2 compatibility write endpoint.
*
* @param writeUseV2Api use v2 compatibility write endpoint
* @return this
*/
@Nonnull
public Builder writeUseV2Api(@Nullable final Boolean writeUseV2Api) {

this.writeUseV2Api = writeUseV2Api;
return this;
}

/**
* Sets default tags to be written with points.
*
Expand Down Expand Up @@ -800,6 +856,12 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
if (parameters.containsKey("writeNoSync")) {
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
}
if (parameters.containsKey("writeAcceptPartial")) {
this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial")));
}
if (parameters.containsKey("writeUseV2Api")) {
this.writeUseV2Api(Boolean.parseBoolean(parameters.get("writeUseV2Api")));
}
if (parameters.containsKey("disableGRPCCompression")) {
this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression")));
}
Expand Down Expand Up @@ -855,6 +917,14 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
if (writeNoSync != null) {
this.writeNoSync(Boolean.parseBoolean(writeNoSync));
}
final String writeAcceptPartial = get.apply("INFLUX_WRITE_ACCEPT_PARTIAL", "influx.writeAcceptPartial");
if (writeAcceptPartial != null) {
this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial));
}
final String writeUseV2Api = get.apply("INFLUX_WRITE_USE_V2_API", "influx.writeUseV2Api");
if (writeUseV2Api != null) {
this.writeUseV2Api(Boolean.parseBoolean(writeUseV2Api));
}
final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout");
if (writeTimeout != null) {
long to = Long.parseLong(writeTimeout);
Expand Down Expand Up @@ -911,6 +981,12 @@ private ClientConfig(@Nonnull final Builder builder) {
writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION;
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD;
writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC;
writeAcceptPartial = builder.writeAcceptPartial != null
? builder.writeAcceptPartial
: WriteOptions.DEFAULT_ACCEPT_PARTIAL;
writeUseV2Api = builder.writeUseV2Api != null
? builder.writeUseV2Api
: WriteOptions.DEFAULT_USE_V2_API;
defaultTags = builder.defaultTags;
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
writeTimeout = builder.writeTimeout != null
Expand Down
Loading
Loading