Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6527575
Remove the auth config parameter
mtopolnik Jun 18, 2026
75df997
Remove the in_flight_window parameter
mtopolnik Jun 18, 2026
fc16eb6
Remove the gorilla ingress parameter
mtopolnik Jun 18, 2026
7bc3725
Accept transaction key in QWP query client config
mtopolnik Jun 18, 2026
9864440
Reject non-QWP keys on ws:: connect strings
mtopolnik Jun 18, 2026
091c635
Tighten QWP connect-string key validation
mtopolnik Jun 18, 2026
8843a49
Remove the path parameter
mtopolnik Jun 19, 2026
e7147fc
Make QuestDB facade QWP-over-WebSocket only
mtopolnik Jun 23, 2026
ff4d257
Add tests that every config key is honored
mtopolnik Jun 23, 2026
15b3901
Fail fast on malformed config in build()
mtopolnik Jun 23, 2026
f4479a1
Close the query client on a fromConfig failure
mtopolnik Jun 23, 2026
ad9d04a
Reject blank ws credentials and remove dead code
mtopolnik Jun 23, 2026
ad7e1cd
Remove unreachable protocol-already-set guard
mtopolnik Jun 23, 2026
1fcc6cb
Close config test gaps and drop unused field
mtopolnik Jun 24, 2026
1e2462b
Drop the ValueType tag from the key registry
mtopolnik Jun 24, 2026
8a7ff39
Fail fast on sf_durability in config validation
mtopolnik Jun 24, 2026
4d75c79
Align ws auth error messages and close test gaps
mtopolnik Jun 24, 2026
99e2ecd
Prove sender-pool unwind in build-failure test
mtopolnik Jun 24, 2026
cc5f519
Merge remote-tracking branch 'origin/main' into mt_qwp-cleanup-params
bluestreak01 Jun 24, 2026
98269c8
Use Java 8-compatible map init in ConfigView
bluestreak01 Jun 24, 2026
fceb60d
fix(config): accept '_' digit separator in addr port
bluestreak01 Jun 24, 2026
7c61bea
Merge branch 'main' into mt_qwp-cleanup-params
bluestreak01 Jun 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions core/src/main/java/io/questdb/client/QuestDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,15 @@ static QuestDBBuilder builder() {

/**
* Connects with a single configuration string used for both ingest and
* egress. The schema must be {@code http}, {@code https}, {@code ws} or
* {@code wss}; the other half of the deployment is derived by schema
* translation ({@code http}<->{@code ws}, {@code https}<->{@code wss}).
* egress. The schema must be {@code ws} or {@code wss}: QuestDB ingests and
* queries over QWP (the QuestDB WebSocket protocol), so one string
* configures both clients.
* <p>
* Use {@link #connect(CharSequence, CharSequence)} or {@link #builder()}
* for ingest transports other than HTTP/HTTPS, or when ingest and egress
* use different addresses.
* when ingest and egress use different addresses or credentials.
*
* @param configurationString a Sender- or QwpQueryClient-style config
* string (see {@link Sender#fromConfig} or
* @param configurationString a {@code ws}/{@code wss} config string (see
* {@link Sender#fromConfig} or
* {@link io.questdb.client.cutlass.qwp.client.QwpQueryClient#fromConfig})
* @return a connected QuestDB handle
*/
Expand Down
190 changes: 142 additions & 48 deletions core/src/main/java/io/questdb/client/QuestDBBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,25 @@

package io.questdb.client;

import io.questdb.client.impl.ConfigStringTranslator;
import io.questdb.client.cutlass.qwp.client.QwpQueryClient;
import io.questdb.client.impl.ConfigString;
import io.questdb.client.impl.ConfigView;
import io.questdb.client.impl.QuestDBImpl;
import org.jetbrains.annotations.TestOnly;

import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

/**
* Builder for {@link QuestDB}. Most callers use {@link QuestDB#connect(CharSequence)};
* this builder is for pool sizing, idle/lifetime knobs, acquire timeout,
* and the case where ingest and egress configs differ.
* <p>
* Both configs must use the {@code ws} or {@code wss} schema (QWP over
* WebSocket). A pool key (e.g. {@code sender_pool_min}) may be carried in the
* connect string or set with an explicit builder call; an explicit call always
* wins. When both connect strings carry the same pool key with different values,
* {@link #build()} fails.
*/
public final class QuestDBBuilder {

Expand All @@ -41,16 +53,21 @@ public final class QuestDBBuilder {
static final int DEFAULT_POOL_MAX = 4;
static final int DEFAULT_POOL_MIN = 1;

private long acquireTimeoutMillis = DEFAULT_ACQUIRE_TIMEOUT_MILLIS;
private long housekeeperIntervalMillis = DEFAULT_HOUSEKEEPER_INTERVAL_MILLIS;
private long idleTimeoutMillis = DEFAULT_IDLE_TIMEOUT_MILLIS;
// Every valid pool value is >= 0, so -1 unambiguously marks "not set
// explicitly". The public pool setters are the only writers of these
// fields, so field != UNSET is exactly the "set explicitly" bit.
private static final int UNSET = -1;

private long acquireTimeoutMillis = UNSET;
private long housekeeperIntervalMillis = UNSET;
private long idleTimeoutMillis = UNSET;
private String ingestConfig;
private long maxLifetimeMillis = DEFAULT_MAX_LIFETIME_MILLIS;
private long maxLifetimeMillis = UNSET;
private String queryConfig;
private int queryPoolMax = DEFAULT_POOL_MAX;
private int queryPoolMin = DEFAULT_POOL_MIN;
private int senderPoolMax = DEFAULT_POOL_MAX;
private int senderPoolMin = DEFAULT_POOL_MIN;
private int queryPoolMax = UNSET;
private int queryPoolMin = UNSET;
private int senderPoolMax = UNSET;
private int senderPoolMin = UNSET;

QuestDBBuilder() {
}
Expand All @@ -69,7 +86,9 @@ public QuestDBBuilder acquireTimeoutMillis(long millis) {
}

/**
* Builds the {@link QuestDB} handle. Eagerly creates {@code min}
* Builds the {@link QuestDB} handle. Validates both connect strings up
* front -- so a malformed config fails here even when both pools have
* {@code min == 0} and nothing connects -- then eagerly creates {@code min}
* connections in each pool; further slots are allocated lazily up to
* {@code max} when load demands and reaped back to {@code min} when
* idle.
Expand All @@ -88,6 +107,30 @@ public QuestDB build() {
if (queryConfig == null) {
throw new IllegalStateException("query configuration is required; call fromConfig() or queryConfig()");
}
ConfigString ingestCs = ConfigString.parse(ingestConfig);
ConfigString queryCs = ConfigString.parse(queryConfig);
ConfigView ingestView = new ConfigView(ingestCs);
ConfigView queryView = new ConfigView(queryCs);
// Validate both connect strings exactly as the pools will, but without
// connecting. The ingest string runs the full Sender parse plus
// validateParameters -- ingress value keys are registry-STRING, so only
// the real parse validates their values. The egress string runs the
// typed validateConfig. A malformed config therefore fails here even
// when a pool min is 0 and nothing connects.
Sender.LineSenderBuilder.validateWsConfigString(ingestConfig);
QwpQueryClient.validateConfig(queryView, "wss".equals(queryCs.schema()));

// A view carries no side; getInt/getLong read any key, so the ingest
// and query views also serve the POOL reads.
resolvePoolInt(senderPoolMin, "sender_pool_min", ingestView, queryView, DEFAULT_POOL_MIN, this::senderPoolMin);
resolvePoolInt(senderPoolMax, "sender_pool_max", ingestView, queryView, DEFAULT_POOL_MAX, this::senderPoolMax);
resolvePoolInt(queryPoolMin, "query_pool_min", ingestView, queryView, DEFAULT_POOL_MIN, this::queryPoolMin);
resolvePoolInt(queryPoolMax, "query_pool_max", ingestView, queryView, DEFAULT_POOL_MAX, this::queryPoolMax);
resolvePoolLong(acquireTimeoutMillis, "acquire_timeout_ms", ingestView, queryView, DEFAULT_ACQUIRE_TIMEOUT_MILLIS, this::acquireTimeoutMillis);
resolvePoolLong(idleTimeoutMillis, "idle_timeout_ms", ingestView, queryView, DEFAULT_IDLE_TIMEOUT_MILLIS, this::idleTimeoutMillis);
resolvePoolLong(maxLifetimeMillis, "max_lifetime_ms", ingestView, queryView, DEFAULT_MAX_LIFETIME_MILLIS, this::maxLifetimeMillis);
resolvePoolLong(housekeeperIntervalMillis, "housekeeper_interval_ms", ingestView, queryView, DEFAULT_HOUSEKEEPER_INTERVAL_MILLIS, this::housekeeperIntervalMillis);

return new QuestDBImpl(
ingestConfig,
queryConfig,
Expand All @@ -103,42 +146,14 @@ public QuestDB build() {
}

/**
* Sets a single unified configuration string used to derive both the
* ingest and the egress config. Schema must be {@code http}, {@code https},
* {@code ws} or {@code wss}; the other half is derived by schema
* translation.
* Sets a single configuration string used for both ingest and egress. The
* schema must be {@code ws} or {@code wss}.
*/
public QuestDBBuilder fromConfig(CharSequence configurationString) {
ConfigStringTranslator.Bundle bundle = ConfigStringTranslator.deriveBothSides(configurationString);
this.ingestConfig = bundle.ingestConfig;
this.queryConfig = bundle.queryConfig;
ConfigStringTranslator.PoolConfig pc = bundle.poolConfig;
// Apply pool keys carried in the string. Explicit builder calls AFTER
// fromConfig() will overwrite these -- last write wins.
if (pc.senderPoolMin != ConfigStringTranslator.PoolConfig.UNSET) {
senderPoolMin(pc.senderPoolMin);
}
if (pc.senderPoolMax != ConfigStringTranslator.PoolConfig.UNSET) {
senderPoolMax(pc.senderPoolMax);
}
if (pc.queryPoolMin != ConfigStringTranslator.PoolConfig.UNSET) {
queryPoolMin(pc.queryPoolMin);
}
if (pc.queryPoolMax != ConfigStringTranslator.PoolConfig.UNSET) {
queryPoolMax(pc.queryPoolMax);
}
if (pc.acquireTimeoutMillis != ConfigStringTranslator.PoolConfig.UNSET) {
acquireTimeoutMillis(pc.acquireTimeoutMillis);
}
if (pc.idleTimeoutMillis != ConfigStringTranslator.PoolConfig.UNSET) {
idleTimeoutMillis(pc.idleTimeoutMillis);
}
if (pc.maxLifetimeMillis != ConfigStringTranslator.PoolConfig.UNSET) {
maxLifetimeMillis(pc.maxLifetimeMillis);
}
if (pc.housekeeperIntervalMillis != ConfigStringTranslator.PoolConfig.UNSET) {
housekeeperIntervalMillis(pc.housekeeperIntervalMillis);
}
requireWebSocketSchema(configurationString, "connection");
String s = configurationString.toString();
this.ingestConfig = s;
this.queryConfig = s;
return this;
}

Expand Down Expand Up @@ -169,9 +184,11 @@ public QuestDBBuilder idleTimeoutMillis(long millis) {
}

/**
* Sets the ingest-side configuration in {@link Sender#fromConfig} format.
* Sets the ingest-side configuration. The schema must be {@code ws} or
* {@code wss}.
*/
public QuestDBBuilder ingestConfig(CharSequence configurationString) {
requireWebSocketSchema(configurationString, "ingest");
this.ingestConfig = configurationString.toString();
return this;
}
Expand All @@ -190,11 +207,11 @@ public QuestDBBuilder maxLifetimeMillis(long millis) {
}

/**
* Sets the query-side configuration in
* {@link io.questdb.client.cutlass.qwp.client.QwpQueryClient#fromConfig}
* format.
* Sets the query-side configuration. The schema must be {@code ws} or
* {@code wss}.
*/
public QuestDBBuilder queryConfig(CharSequence configurationString) {
requireWebSocketSchema(configurationString, "query");
this.queryConfig = configurationString.toString();
return this;
}
Expand Down Expand Up @@ -272,4 +289,81 @@ public QuestDBBuilder senderPoolSize(int size) {
this.senderPoolMax = size;
return this;
}

/**
* Snapshot of the resolved pool config, keyed by connect-string key name.
* Valid after {@link #build()} has run pool-key resolution. Drives the
* per-key "honored" guard test.
*/
@TestOnly
public java.util.Map<String, Object> poolConfigSnapshotForTest() {
java.util.Map<String, Object> m = new java.util.HashMap<>();
m.put("sender_pool_min", senderPoolMin);
m.put("sender_pool_max", senderPoolMax);
m.put("query_pool_min", queryPoolMin);
m.put("query_pool_max", queryPoolMax);
m.put("acquire_timeout_ms", acquireTimeoutMillis);
m.put("idle_timeout_ms", idleTimeoutMillis);
m.put("max_lifetime_ms", maxLifetimeMillis);
m.put("housekeeper_interval_ms", housekeeperIntervalMillis);
return m;
}

private static void requireWebSocketSchema(CharSequence config, String role) {
String schema = ConfigString.parse(config).schema();
if (!"ws".equals(schema) && !"wss".equals(schema)) {
throw new IllegalArgumentException(
role + " configuration must use the ws or wss schema; got: " + schema);
}
}

private void resolvePoolInt(int current, String key, ConfigView ingest, ConfigView query, int dflt, IntConsumer setter) {
if (current != UNSET) {
return; // explicit builder call wins; skip the conflict check
}
boolean inIngest = ingest.has(key);
boolean inQuery = query.has(key);
int value;
if (inIngest && inQuery) {
int vi = ingest.getInt(key, UNSET);
int vq = query.getInt(key, UNSET);
if (vi != vq) {
throw new IllegalArgumentException(
"conflicting pool config: " + key + " (ingest=" + vi + ", query=" + vq + ")");
}
value = vi;
} else if (inIngest) {
value = ingest.getInt(key, UNSET);
} else if (inQuery) {
value = query.getInt(key, UNSET);
} else {
value = dflt;
}
setter.accept(value);
}

private void resolvePoolLong(long current, String key, ConfigView ingest, ConfigView query, long dflt, LongConsumer setter) {
if (current != UNSET) {
return; // explicit builder call wins; skip the conflict check
}
boolean inIngest = ingest.has(key);
boolean inQuery = query.has(key);
long value;
if (inIngest && inQuery) {
long vi = ingest.getLong(key, UNSET);
long vq = query.getLong(key, UNSET);
if (vi != vq) {
throw new IllegalArgumentException(
"conflicting pool config: " + key + " (ingest=" + vi + ", query=" + vq + ")");
}
value = vi;
} else if (inIngest) {
value = ingest.getLong(key, UNSET);
} else if (inQuery) {
value = query.getLong(key, UNSET);
} else {
value = dflt;
}
setter.accept(value);
}
}
Loading
Loading