From a37cb50b83bb2c12bb0fc1a86780587715daaa4c Mon Sep 17 00:00:00 2001 From: agrawal-siddharth Date: Mon, 1 Jun 2026 22:03:06 +0000 Subject: [PATCH] feat: add timestamps to MaximumRequestCallbackWaitTimeExceededException --- .../bigquery/storage/v1/ConnectionWorker.java | 28 ++++++++-- .../cloud/bigquery/storage/v1/Exceptions.java | 55 ++++++++++++++++++- .../bigquery/storage/v1/StreamWriterTest.java | 17 ++++++ 3 files changed, 94 insertions(+), 6 deletions(-) diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 154361baa21f..215176e7b46f 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -928,6 +928,7 @@ private ApiFuture appendInternal( requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId); ++this.inflightRequests; this.inflightBytes += requestWrapper.messageSize; + requestWrapper.placedInWaitingQueueTime = Instant.now(); waitingRequestQueue.addLast(requestWrapper); healthCheckMetrics.updateWindowedQueuedRequestsMax( waitingRequestQueue.size() + inflightRequestQueue.size(), queuedRetryCount.get()); @@ -1151,10 +1152,11 @@ private void appendLoop() { hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); // Check whether we should error out the current append loop. if (inflightRequestQueue.size() > 0) { - Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp; + AppendRequestAndResponse firstRequest = inflightRequestQueue.getFirst(); + Instant sendInstant = firstRequest.requestSendTimeStamp; if (sendInstant != null) { healthCheckMetrics.updateResponseWait(sendInstant); - throwIfWaitCallbackTooLong(sendInstant); + throwIfWaitCallbackTooLong(firstRequest); } } healthCheckMetrics.periodicHealthCheck(); @@ -1187,6 +1189,7 @@ private void appendLoop() { requestProfilerHook.endOperation( RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId); waitForBackoffIfNecessary(requestWrapper); + requestWrapper.placedInInflightQueueTime = Instant.now(); this.inflightRequestQueue.add(requestWrapper); localQueue.addLast(requestWrapper); healthCheckMetrics.updateRequestsSent(requestWrapper.messageSize); @@ -1339,11 +1342,21 @@ private void cleanupConnectionAndRequests(boolean avoidBlocking) { log.info("Append thread is done. Stream: " + streamName + " id: " + writerId); } - private void throwIfWaitCallbackTooLong(Instant timeToCheck) { + private void throwIfWaitCallbackTooLong(AppendRequestAndResponse requestWrapper) { + Instant timeToCheck = requestWrapper.requestSendTimeStamp; + if (timeToCheck == null) { + return; + } Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now()); if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) { throw new Exceptions.MaximumRequestCallbackWaitTimeExceededException( - milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME); + milliSinceLastCallback, + writerId, + MAXIMUM_REQUEST_CALLBACK_WAIT_TIME, + requestWrapper.requestReceivedTime, + requestWrapper.placedInWaitingQueueTime, + requestWrapper.placedInInflightQueueTime, + requestWrapper.dispatchTimes); } } @@ -1824,6 +1837,11 @@ static final class AppendRequestAndResponse { // If a response is no longer expected this is set back to null. Instant requestSendTimeStamp; + final Instant requestReceivedTime; + Instant placedInWaitingQueueTime; + Instant placedInInflightQueueTime; + final List dispatchTimes = new ArrayList<>(); + AppendRequestAndResponse( AppendRowsRequest message, StreamWriter streamWriter, @@ -1852,10 +1870,12 @@ static final class AppendRequestAndResponse { this.retryAlgorithm = null; } this.recordBatchRowCount = recordBatchRowCount; + this.requestReceivedTime = Instant.now(); } void setRequestSendQueueTime() { requestSendTimeStamp = Instant.now(); + dispatchTimes.add(requestSendTimeStamp); } } diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index ed36c86dca01..45b7ab5708d2 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1; import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; @@ -23,6 +24,8 @@ import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import java.time.Duration; +import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -428,17 +431,46 @@ public static class MaximumRequestCallbackWaitTimeExceededException extends Runt private final Duration callbackWaitTime; private final String writerId; private final Duration callbackWaitTimeLimit; + private final Instant requestReceivedTime; + private final Instant placedInWaitingQueueTime; + private final Instant placedInInflightQueueTime; + private final ImmutableList dispatchTimes; + @Deprecated public MaximumRequestCallbackWaitTimeExceededException( Duration callbackWaitTime, String writerId, Duration callbackWaitTimeLimit) { + this(callbackWaitTime, writerId, callbackWaitTimeLimit, null, null, null, ImmutableList.of()); + } + + public MaximumRequestCallbackWaitTimeExceededException( + Duration callbackWaitTime, + String writerId, + Duration callbackWaitTimeLimit, + @Nullable Instant requestReceivedTime, + @Nullable Instant placedInWaitingQueueTime, + @Nullable Instant placedInInflightQueueTime, + @Nullable List dispatchTimes) { super( String.format( "Request has waited in inflight queue for %sms for writer %s, " - + "which is over maximum wait time %s", - callbackWaitTime, writerId, callbackWaitTimeLimit.toString())); + + "which is over maximum wait time %s. " + + "requestReceivedTime: %s, placedInWaitingQueueTime: %s, " + + "placedInInflightQueueTime: %s, dispatchTimes: %s", + callbackWaitTime, + writerId, + callbackWaitTimeLimit.toString(), + requestReceivedTime, + placedInWaitingQueueTime, + placedInInflightQueueTime, + dispatchTimes)); this.callbackWaitTime = callbackWaitTime; this.writerId = writerId; this.callbackWaitTimeLimit = callbackWaitTimeLimit; + this.requestReceivedTime = requestReceivedTime; + this.placedInWaitingQueueTime = placedInWaitingQueueTime; + this.placedInInflightQueueTime = placedInInflightQueueTime; + this.dispatchTimes = + dispatchTimes == null ? ImmutableList.of() : ImmutableList.copyOf(dispatchTimes); } public Duration getCallbackWaitTime() { @@ -452,6 +484,25 @@ public String getWriterId() { public Duration getCallbackWaitTimeLimit() { return callbackWaitTimeLimit; } + + @Nullable + public Instant getRequestReceivedTime() { + return requestReceivedTime; + } + + @Nullable + public Instant getPlacedInWaitingQueueTime() { + return placedInWaitingQueueTime; + } + + @Nullable + public Instant getPlacedInInflightQueueTime() { + return placedInInflightQueueTime; + } + + public ImmutableList getDispatchTimes() { + return dispatchTimes; + } } private Exceptions() {} diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 70912102ab80..194b55805d4b 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1440,8 +1440,25 @@ void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exceptio () -> futures.get(finalI).get().getAppendResult().getOffset().getValue()); if (i == 0) { assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + assertThat(ex.getCause()).hasMessageThat().contains("requestReceivedTime:"); + assertThat(ex.getCause()).hasMessageThat().contains("placedInWaitingQueueTime:"); + assertThat(ex.getCause()).hasMessageThat().contains("placedInInflightQueueTime:"); + assertThat(ex.getCause()).hasMessageThat().contains("dispatchTimes:"); assertThat(ex.getCause()) .isInstanceOf(Exceptions.MaximumRequestCallbackWaitTimeExceededException.class); + Exceptions.MaximumRequestCallbackWaitTimeExceededException mace = + (Exceptions.MaximumRequestCallbackWaitTimeExceededException) ex.getCause(); + assertThat(mace.getRequestReceivedTime()).isNotNull(); + assertThat(mace.getPlacedInWaitingQueueTime()).isNotNull(); + assertThat(mace.getPlacedInInflightQueueTime()).isNotNull(); + assertThat(mace.getDispatchTimes()).isNotNull(); + assertThat(mace.getDispatchTimes()).isNotEmpty(); + assertThat(mace.getRequestReceivedTime().isAfter(mace.getPlacedInWaitingQueueTime())) + .isFalse(); + assertThat(mace.getPlacedInWaitingQueueTime().isAfter(mace.getPlacedInInflightQueueTime())) + .isFalse(); + assertThat(mace.getPlacedInInflightQueueTime().isAfter(mace.getDispatchTimes().get(0))) + .isFalse(); } else { assertThat(ex.getCause()) .hasMessageThat()