Description
Environment details
- OS type and version: GKE 1.26.14-gke.1044000 (x86-64) running amazoncorretto:11-al2023-headless
- Java version: 11
- version(s): 26.17.0 (com.google.cloud:libraries-bom:26.17.0)
Steps to reproduce
- Create a StreamWriter with .setEnableConnectionPool(true).
- Write to a BigQuery table in a way that fails, e.g. because the table doesn't exist or the schema is wrong. (Why? Because sometimes the schema is out of sync. Nevertheless, such a condition should not cause a memory leak.) Keep sending this failing traffic persistently.
- Observe that com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool is continually creating new ConnectionWorkers (expected), but old ConnectionWorkers for which isConnectionInUnrecoverableState() is true are retained in the field connectionToWriteStream (bug) and are never closed (bug).
Code example
Problem is caused by https://github.com/googleapis/java-bigquerystorage/blob/d8d5278ca54317a599e7c8b7c661eedd075f6a74/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java#L338: the cleanup operation neither removes the ConnectionWorker from the other field that tracks the ConnectionWorker object, connectionToWriteStream, nor closes it.
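To make the retention pattern concrete, here is a minimal, self-contained sketch. It is a simplified model and not the library's code: the classes Worker and Pool, the methods leakyCleanup and fullCleanup, and the field types are all hypothetical; only the field names connectionWorkerPool and connectionToWriteStream are taken from this report. It contrasts a cleanup that prunes one collection with one that also prunes the second collection and closes the worker.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of the pool described above; NOT the library's code. */
class PoolLeakSketch {

  /** Stand-in for ConnectionWorker: tracks only whether it is broken or closed. */
  static final class Worker implements AutoCloseable {
    boolean unrecoverable;
    boolean closed;

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Stand-in for the pool, with two structures that both reference each worker. */
  static final class Pool {
    // Field names follow the report; the element types are assumptions.
    final Set<Worker> connectionWorkerPool = new HashSet<>();
    final Map<Worker, Set<String>> connectionToWriteStream = new HashMap<>();

    Worker createWorker(String streamName) {
      Worker worker = new Worker();
      connectionWorkerPool.add(worker);
      connectionToWriteStream.computeIfAbsent(worker, k -> new HashSet<>()).add(streamName);
      return worker;
    }

    /** Behaviour described in the report: only one structure is pruned. */
    void leakyCleanup() {
      connectionWorkerPool.removeIf(w -> w.unrecoverable);
    }

    /** One possible fix: prune both structures and close the worker. */
    void fullCleanup() {
      Iterator<Worker> it = connectionWorkerPool.iterator();
      while (it.hasNext()) {
        Worker worker = it.next();
        if (worker.unrecoverable) {
          it.remove();
          connectionToWriteStream.remove(worker);
          worker.close();
        }
      }
    }
  }

  public static void main(String[] args) {
    Pool leaky = new Pool();
    Worker a = leaky.createWorker("stream-a");
    a.unrecoverable = true;
    leaky.leakyCleanup();
    System.out.println("leaky: retained in map = "
        + leaky.connectionToWriteStream.containsKey(a) + ", closed = " + a.closed);

    Pool fixed = new Pool();
    Worker b = fixed.createWorker("stream-b");
    b.unrecoverable = true;
    fixed.fullCleanup();
    System.out.println("fixed: retained in map = "
        + fixed.connectionToWriteStream.containsKey(b) + ", closed = " + b.closed);
  }
}
```

In this model the leaky variant leaves the broken worker reachable through the map and never closes it, which is the shape of the leak reported here; whether the real fix should look like fullCleanup depends on locking and stream-ownership details that this sketch ignores.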
Stack trace
None captured
External references such as API reference guides
Any additional information below
The following is the diff between 26.17.0 and the current main branch version:
@@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.retrying.RetrySettings;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
@@ -41,6 +42,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/** Pool of connections to accept appends and distirbute to different connections. */
@@ -65,6 +67,11 @@
private final java.time.Duration maxRetryDuration;
/*
+ * Retry settings for in-stream retries.
+ */
+ private RetrySettings retrySettings;
+
+ /*
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
*/
private final FlowController.LimitExceededBehavior limitExceededBehavior;
@@ -91,6 +98,10 @@
* TraceId for debugging purpose.
*/
private final String traceId;
+ /*
+ * Sets the compression to use for the calls
+ */
+ private String compressorName;
/** Used for test on the number of times createWorker is called. */
private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
@@ -199,14 +210,18 @@
java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
- BigQueryWriteSettings clientSettings) {
+ @Nullable String comperssorName,
+ BigQueryWriteSettings clientSettings,
+ RetrySettings retrySettings) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
+ this.compressorName = comperssorName;
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
+ this.retrySettings = retrySettings;
}
/**
@@ -379,7 +394,9 @@
maxRetryDuration,
limitExceededBehavior,
traceId,
- clientSettings);
+ compressorName,
+ clientSettings,
+ retrySettings);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
None of these changes touch the cleanup logic, so I don't think this issue is fixed by upgrading.