Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import io.temporal.internal.common.PluginUtils;
import io.temporal.internal.sync.StubMarker;
import io.temporal.internal.worker.HeartbeatManager;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
Expand Down Expand Up @@ -53,6 +54,8 @@ final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClient
private final Scope metricsScope;
private final WorkflowClientInterceptor[] interceptors;
private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry();
private final String workerGroupingKey = java.util.UUID.randomUUID().toString();
private final @Nullable HeartbeatManager heartbeatManager;

/**
* Creates client that connects to an instance of the Temporal Service. Cannot be used from within
Expand Down Expand Up @@ -112,6 +115,14 @@ public static WorkflowClient newInstance(
options.getNamespace(),
options.getIdentity(),
options.getDataConverter());

java.time.Duration heartbeatInterval = options.getWorkerHeartbeatInterval();
if (!heartbeatInterval.isNegative()) {
this.heartbeatManager =
new HeartbeatManager(workflowServiceStubs, options.getIdentity(), heartbeatInterval);
} else {
this.heartbeatManager = null;
}
}

private WorkflowClientCallsInterceptor initializeClientInvoker() {
Expand Down Expand Up @@ -790,6 +801,17 @@ public void deregisterWorkerFactory(WorkerFactory workerFactory) {
workerFactoryRegistry.deregister(workerFactory);
}

@Override
public String getWorkerGroupingKey() {
return workerGroupingKey;
}

@Override
@Nullable
public HeartbeatManager getHeartbeatManager() {
return heartbeatManager;
}

@Override
public NexusStartWorkflowResponse startNexus(
NexusStartWorkflowRequest request, Functions.Proc workflow) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.temporal.client;

import com.google.common.base.Preconditions;
import io.temporal.api.enums.v1.QueryRejectCondition;
import io.temporal.common.Experimental;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -49,6 +51,7 @@ public static final class Builder {
private List<ContextPropagator> contextPropagators;
private QueryRejectCondition queryRejectCondition;
private WorkflowClientPlugin[] plugins;
private Duration workerHeartbeatInterval;

private Builder() {}

Expand All @@ -64,6 +67,7 @@ private Builder(WorkflowClientOptions options) {
contextPropagators = options.contextPropagators;
queryRejectCondition = options.queryRejectCondition;
plugins = options.plugins;
workerHeartbeatInterval = options.workerHeartbeatInterval;
}

public Builder setNamespace(String namespace) {
Expand Down Expand Up @@ -153,6 +157,19 @@ public Builder setPlugins(WorkflowClientPlugin... plugins) {
return this;
}

/**
* Sets the interval at which workers send heartbeat RPCs to the server. If not set or set to
* zero, defaults to 60 seconds. A negative duration disables heartbeating. Positive values must
* be between 1 and 60 seconds inclusive.
*
* @param workerHeartbeatInterval the heartbeat interval, or a negative duration to disable
*/
@Experimental
public Builder setWorkerHeartbeatInterval(Duration workerHeartbeatInterval) {
this.workerHeartbeatInterval = workerHeartbeatInterval;
return this;
}

public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
Expand All @@ -162,7 +179,8 @@ public WorkflowClientOptions build() {
binaryChecksum,
contextPropagators,
queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
plugins == null ? EMPTY_PLUGINS : plugins,
resolveHeartbeatInterval(workerHeartbeatInterval));
}

/**
Expand All @@ -188,7 +206,22 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
queryRejectCondition == null
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
: queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
plugins == null ? EMPTY_PLUGINS : plugins,
resolveHeartbeatInterval(workerHeartbeatInterval));
}

private static Duration resolveHeartbeatInterval(Duration raw) {
if (raw == null || raw.isZero()) {
return Duration.ofSeconds(60);
}
if (raw.isNegative()) {
return raw;
}
Preconditions.checkArgument(
raw.compareTo(Duration.ofSeconds(1)) >= 0 && raw.compareTo(Duration.ofSeconds(60)) <= 0,
"workerHeartbeatInterval must be between 1s and 60s, got %s",
raw);
return raw;
}
}

Expand All @@ -215,6 +248,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private final WorkflowClientPlugin[] plugins;

private final Duration workerHeartbeatInterval;

private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
Expand All @@ -223,7 +258,8 @@ private WorkflowClientOptions(
String binaryChecksum,
List<ContextPropagator> contextPropagators,
QueryRejectCondition queryRejectCondition,
WorkflowClientPlugin[] plugins) {
WorkflowClientPlugin[] plugins,
Duration workerHeartbeatInterval) {
this.namespace = namespace;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
Expand All @@ -232,6 +268,7 @@ private WorkflowClientOptions(
this.contextPropagators = contextPropagators;
this.queryRejectCondition = queryRejectCondition;
this.plugins = plugins;
this.workerHeartbeatInterval = workerHeartbeatInterval;
}

/**
Expand Down Expand Up @@ -289,6 +326,15 @@ public WorkflowClientPlugin[] getPlugins() {
return plugins;
}

/**
* Returns the worker heartbeat interval. Defaults to 60 seconds if not configured. A negative
* duration means heartbeating is explicitly disabled.
*/
@Experimental
public Duration getWorkerHeartbeatInterval() {
return workerHeartbeatInterval;
}

@Override
public String toString() {
return "WorkflowClientOptions{"
Expand All @@ -311,6 +357,8 @@ public String toString() {
+ queryRejectCondition
+ ", plugins="
+ Arrays.toString(plugins)
+ ", workerHeartbeatInterval="
+ workerHeartbeatInterval
+ '}';
}

Expand All @@ -326,7 +374,9 @@ public boolean equals(Object o) {
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
&& queryRejectCondition == that.queryRejectCondition
&& Arrays.equals(plugins, that.plugins);
&& Arrays.equals(plugins, that.plugins)
&& com.google.common.base.Objects.equal(
workerHeartbeatInterval, that.workerHeartbeatInterval);
}

@Override
Expand All @@ -339,6 +389,7 @@ public int hashCode() {
binaryChecksum,
contextPropagators,
queryRejectCondition,
Arrays.hashCode(plugins));
Arrays.hashCode(plugins),
workerHeartbeatInterval);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.temporal.internal.client;

import io.temporal.client.WorkflowClient;
import io.temporal.internal.worker.HeartbeatManager;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Functions;
import javax.annotation.Nullable;

/**
* From OOP point of view, there is no reason for this interface not to extend {@link
Expand All @@ -18,4 +20,9 @@ public interface WorkflowClientInternal {
void deregisterWorkerFactory(WorkerFactory workerFactory);

NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow);

String getWorkerGroupingKey();

@Nullable
HeartbeatManager getHeartbeatManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.temporal.worker.PollerTypeMetricsTag;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
Expand All @@ -30,7 +29,7 @@ final class ActivityPollTask implements MultiThreadedPoller.PollTask<ActivityTas
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final Scope metricsScope;
private final PollActivityTaskQueueRequest pollRequest;
private final AtomicInteger pollGauge = new AtomicInteger();
private final PollerTracker pollerTracker;

@SuppressWarnings("deprecation")
public ActivityPollTask(
Expand All @@ -42,10 +41,12 @@ public ActivityPollTask(
double activitiesPerSecond,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull Scope metricsScope,
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
@Nonnull PollerTracker pollerTracker) {
this.service = Objects.requireNonNull(service);
this.slotSupplier = slotSupplier;
this.metricsScope = Objects.requireNonNull(metricsScope);
this.pollerTracker = Objects.requireNonNull(pollerTracker);

PollActivityTaskQueueRequest.Builder pollRequest =
PollActivityTaskQueueRequest.newBuilder()
Expand Down Expand Up @@ -100,7 +101,7 @@ public ActivityTask poll() {

MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
.gauge(MetricsType.NUM_POLLERS)
.update(pollGauge.incrementAndGet());
.update(pollerTracker.pollStarted());

try {
response =
Expand All @@ -119,14 +120,15 @@ public ActivityTask poll() {
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
isSuccessful = true;
pollerTracker.pollSucceeded();
return new ActivityTask(
response,
permit,
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
} finally {
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
.gauge(MetricsType.NUM_POLLERS)
.update(pollGauge.decrementAndGet());
.update(pollerTracker.pollCompleted());

if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,6 +50,9 @@ final class ActivityWorker implements SuspendableWorker {
private final GrpcRetryer grpcRetryer;
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final AtomicInteger totalProcessedTasks = new AtomicInteger();
private final AtomicInteger totalFailedTasks = new AtomicInteger();
private final PollerTracker pollerTracker;
private final AtomicBoolean serverSupportsAutoscaling;

public ActivityWorker(
Expand All @@ -75,6 +79,7 @@ public ActivityWorker(
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);

this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
this.pollerTracker = new PollerTracker();
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
}

Expand Down Expand Up @@ -107,7 +112,8 @@ public boolean start() {
taskQueueActivitiesPerSecond,
this.slotSupplier,
workerMetricsScope,
service.getServerCapabilities()),
service.getServerCapabilities(),
pollerTracker),
this.pollTaskExecutor,
pollerOptions,
serverSupportsAutoscaling.get(),
Expand All @@ -126,7 +132,8 @@ public boolean start() {
taskQueueActivitiesPerSecond,
this.slotSupplier,
workerMetricsScope,
service.getServerCapabilities()),
service.getServerCapabilities(),
pollerTracker),
this.pollTaskExecutor,
pollerOptions,
workerMetricsScope);
Expand Down Expand Up @@ -216,6 +223,26 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) {
return pollerOptions;
}

public TrackingSlotSupplier<ActivitySlotInfo> getSlotSupplier() {
return slotSupplier;
}

public AtomicInteger getTotalProcessedTasks() {
return totalProcessedTasks;
}

public AtomicInteger getTotalFailedTasks() {
return totalFailedTasks;
}

public PollerOptions getPollerOptions() {
return pollerOptions;
}

public PollerTracker getPollerTracker() {
return pollerTracker;
}

@Override
public String toString() {
return String.format(
Expand Down Expand Up @@ -261,6 +288,15 @@ public void handle(ActivityTask task) throws Exception {
ActivityTaskHandler.Result result = null;
try {
result = handleActivity(task, metricsScope);
totalProcessedTasks.incrementAndGet();
if (result.getTaskFailed() != null
&& !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure(
result.getTaskFailed().getFailure())) {
totalFailedTasks.incrementAndGet();
}
} catch (Exception e) {
totalFailedTasks.incrementAndGet();
throw e;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failed tasks counted without incrementing total processed

Medium Severity

In ActivityWorker, NexusWorker, and LocalActivityWorker, when a task handler throws an exception, totalFailedTasks is incremented in the catch block but totalProcessedTasks is not (it's only incremented on the success path). This allows totalFailedTasks to exceed totalProcessedTasks, which is semantically incorrect. WorkflowWorker correctly handles this by always incrementing totalProcessedTasks in the finally block.

Additional Locations (2)
Fix in Cursor Fix in Web

} finally {
MDC.remove(LoggerTag.ACTIVITY_ID);
MDC.remove(LoggerTag.ACTIVITY_TYPE);
Expand Down
Loading
Loading