From 1b98b92550b641b36743a30e0d692f62d1196f62 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 17:16:56 -0700 Subject: [PATCH 1/5] Prevent task loss on shutdown when server is capable --- .../internal/worker/ActivityPollTask.java | 2 + .../internal/worker/ActivityWorker.java | 14 +- .../worker/AsyncActivityPollTask.java | 2 + .../internal/worker/AsyncNexusPollTask.java | 3 + .../temporal/internal/worker/AsyncPoller.java | 26 ++-- .../worker/AsyncWorkflowPollTask.java | 3 + .../temporal/internal/worker/BasePoller.java | 33 +++-- .../internal/worker/MultiThreadedPoller.java | 5 +- .../worker/NamespaceCapabilities.java | 29 ++++ .../internal/worker/NexusPollTask.java | 2 + .../temporal/internal/worker/NexusWorker.java | 14 +- .../internal/worker/SingleWorkerOptions.java | 19 ++- .../internal/worker/SyncActivityWorker.java | 5 +- .../internal/worker/SyncNexusWorker.java | 5 +- .../internal/worker/SyncWorkflowWorker.java | 5 +- .../internal/worker/WorkflowPollTask.java | 2 + .../internal/worker/WorkflowWorker.java | 68 ++++------ .../main/java/io/temporal/worker/Worker.java | 128 +++++++++++++----- .../io/temporal/worker/WorkerFactory.java | 16 ++- .../internal/worker/AsyncPollerTest.java | 2 +- .../internal/worker/SlotSupplierTest.java | 2 + .../worker/StickyQueueBacklogTest.java | 1 + .../internal/worker/WorkflowWorkerTest.java | 7 +- 23 files changed, 260 insertions(+), 133 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index 7072d163ee..80dc113e96 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -38,6 +38,7 @@ public ActivityPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, double activitiesPerSecond, @Nonnull TrackingSlotSupplier slotSupplier, @@ -52,6 +53,7 @@ public ActivityPollTask( .setNamespace(namespace) .setIdentity(identity) .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); if (activitiesPerSecond > 0) { pollRequest.setTaskQueueMetadata( TaskQueueMetadata.newBuilder() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java index 3387ee7607..d4c2c18459 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java @@ -26,7 +26,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +48,7 @@ final class ActivityWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; - private final AtomicBoolean serverSupportsAutoscaling; + private final NamespaceCapabilities namespaceCapabilities; public ActivityWorker( @Nonnull WorkflowServiceStubs service, @@ -59,7 +58,7 @@ public ActivityWorker( @Nonnull SingleWorkerOptions options, @Nonnull ActivityTaskHandler handler, @Nonnull SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -75,7 +74,7 @@ public ActivityWorker( DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null); this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; + this.namespaceCapabilities = namespaceCapabilities; } @Override @@ -103,6 +102,7 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), taskQueueActivitiesPerSecond, this.slotSupplier, @@ -110,7 +110,7 @@ public boolean start() { service.getServerCapabilities()), this.pollTaskExecutor, pollerOptions, - serverSupportsAutoscaling.get(), + namespaceCapabilities, workerMetricsScope); } else { @@ -122,6 +122,7 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), taskQueueActivitiesPerSecond, this.slotSupplier, @@ -129,7 +130,8 @@ public boolean start() { service.getServerCapabilities()), this.pollTaskExecutor, pollerOptions, - workerMetricsScope); + workerMetricsScope, + namespaceCapabilities); } poller.start(); workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java index 7f5573e243..0d376e34bb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java @@ -43,6 +43,7 @@ public AsyncActivityPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, double activitiesPerSecond, @Nonnull TrackingSlotSupplier slotSupplier, @@ -57,6 +58,7 @@ public AsyncActivityPollTask( .setNamespace(namespace) .setIdentity(identity) .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); if (activitiesPerSecond > 0) { pollRequest.setTaskQueueMetadata( TaskQueueMetadata.newBuilder() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java index 10be3b588b..3333322066 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java @@ -42,6 +42,7 @@ public AsyncNexusPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull Scope metricsScope, @Nonnull Supplier serverCapabilities, @@ -56,6 +57,8 @@ public AsyncNexusPollTask( .setIdentity(identity) .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); + if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequest.setDeploymentOptions( WorkerVersioningProtoUtils.deploymentOptionsToProto( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java index 5106343792..7859484bbe 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java @@ -29,7 +29,6 @@ final class AsyncPoller extends BasePoller { private final List> asyncTaskPollers; private final PollerOptions pollerOptions; private final PollerBehaviorAutoscaling pollerBehavior; - private final boolean serverSupportsAutoscaling; private final Scope workerMetricsScope; private Throttler pollRateThrottler; private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = @@ -43,7 +42,7 @@ final class AsyncPoller extends BasePoller { PollTaskAsync asyncTaskPoller, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, - boolean serverSupportsAutoscaling, + NamespaceCapabilities namespaceCapabilities, Scope workerMetricsScope) { this( slotSupplier, @@ -51,7 +50,7 @@ final class AsyncPoller extends BasePoller { Collections.singletonList(asyncTaskPoller), taskExecutor, pollerOptions, - serverSupportsAutoscaling, + namespaceCapabilities, workerMetricsScope); } @@ -61,9 +60,9 @@ final class AsyncPoller extends BasePoller { List> asyncTaskPollers, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, - boolean serverSupportsAutoscaling, + NamespaceCapabilities namespaceCapabilities, Scope workerMetricsScope) { - super(taskExecutor); + super(taskExecutor, namespaceCapabilities); Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null"); Objects.requireNonNull(slotReservationData, "slotReservation data should not be null"); Objects.requireNonNull(asyncTaskPollers, "asyncTaskPollers should not be null"); @@ -82,7 +81,6 @@ final class AsyncPoller extends BasePoller { + " is not supported for AsyncPoller. Only PollerBehaviorAutoscaling is supported."); } this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior(); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; this.pollerOptions = pollerOptions; this.workerMetricsScope = workerMetricsScope; } @@ -114,7 +112,7 @@ public boolean start() { pollerBehavior.getMinConcurrentTaskPollers(), pollerBehavior.getMaxConcurrentTaskPollers(), pollerBehavior.getInitialConcurrentTaskPollers(), - serverSupportsAutoscaling, + namespaceCapabilities.isPollerAutoscaling(), (newTarget) -> { log.debug( "Updating maximum number of pollers for {} to: {}", @@ -136,12 +134,14 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean return super.shutdown(shutdownManager, interruptTasks) .thenApply( (f) -> { - for (PollTaskAsync asyncTaskPoller : asyncTaskPollers) { - try { - log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel()); - asyncTaskPoller.cancel(new RuntimeException("Shutting down poller")); - } catch (Throwable e) { - log.error("Error while cancelling poll task", e); + if (!namespaceCapabilities.isGracefulPollShutdown()) { + for (PollTaskAsync asyncTaskPoller : asyncTaskPollers) { + try { + log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel()); + asyncTaskPoller.cancel(new RuntimeException("Shutting down poller")); + } catch (Throwable e) { + log.error("Error while cancelling poll task", e); + } } } return null; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java index 4a7f793cc0..b20143f990 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java @@ -53,6 +53,7 @@ public AsyncWorkflowPollTask( @Nonnull String taskQueue, @Nullable String stickyTaskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, @@ -66,6 +67,8 @@ public AsyncWorkflowPollTask( .setNamespace(Objects.requireNonNull(namespace)) .setIdentity(Objects.requireNonNull(identity)); + pollRequestBuilder.setWorkerInstanceKey(workerInstanceKey); + if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequestBuilder.setDeploymentOptions( WorkerVersioningProtoUtils.deploymentOptionsToProto( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java index 9b8141fc02..9e685bbcae 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java @@ -27,9 +27,14 @@ abstract class BasePoller implements SuspendableWorker { protected ExecutorService pollExecutor; - protected BasePoller(ShutdownableTaskExecutor taskExecutor) { + protected final NamespaceCapabilities namespaceCapabilities; + + protected BasePoller( + ShutdownableTaskExecutor taskExecutor, NamespaceCapabilities namespaceCapabilities) { Objects.requireNonNull(taskExecutor, "taskExecutor should not be null"); this.taskExecutor = taskExecutor; + this.namespaceCapabilities = + Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null"); } @Override @@ -55,15 +60,23 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean return CompletableFuture.completedFuture(null); } - return shutdownManager - // it's ok to forcefully shutdown pollers, because they are stuck in a long poll call - // so we don't risk loosing any progress doing that. - .shutdownExecutorNow(pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1)) - .exceptionally( - e -> { - log.error("Unexpected exception during shutdown", e); - return null; - }); + CompletableFuture pollExecutorShutdown; + if (namespaceCapabilities.isGracefulPollShutdown()) { + // When graceful poll shutdown is enabled, the server will complete outstanding polls with + // empty responses after ShutdownWorker is called. We simply wait for polls to return. + pollExecutorShutdown = + shutdownManager.shutdownExecutorUntimed(pollExecutor, this + "#pollExecutor"); + } else { + // Old behaviour forcibly stops outstanding polls. + pollExecutorShutdown = + shutdownManager.shutdownExecutorNow( + pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1)); + } + return pollExecutorShutdown.exceptionally( + e -> { + log.error("Unexpected exception during shutdown", e); + return null; + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java index 8dcaa6f33a..7fe0335b15 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java @@ -52,8 +52,9 @@ public MultiThreadedPoller( PollTask pollTask, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, - Scope workerMetricsScope) { - super(taskExecutor); + Scope workerMetricsScope, + NamespaceCapabilities namespaceCapabilities) { + super(taskExecutor, namespaceCapabilities); Objects.requireNonNull(identity, "identity cannot be null"); Objects.requireNonNull(pollTask, "poll service should not be null"); Objects.requireNonNull(pollerOptions, "pollerOptions should not be null"); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java new file mode 100644 index 0000000000..5212bc9eb7 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java @@ -0,0 +1,29 @@ +package io.temporal.internal.worker; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Holds namespace-level capabilities discovered from the server's DescribeNamespace response. A + * single instance is shared across all workers in a WorkerFactory and is populated at startup. Uses + * AtomicBooleans so capabilities can be set after construction. + */ +public final class NamespaceCapabilities { + private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false); + private final AtomicBoolean gracefulPollShutdown = new AtomicBoolean(false); + + public boolean isPollerAutoscaling() { + return pollerAutoscaling.get(); + } + + public void setPollerAutoscaling(boolean value) { + pollerAutoscaling.set(value); + } + + public boolean isGracefulPollShutdown() { + return gracefulPollShutdown.get(); + } + + public void setGracefulPollShutdown(boolean value) { + gracefulPollShutdown.set(value); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java index 13e88690e6..25e3178f88 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java @@ -35,6 +35,7 @@ public NexusPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, @@ -48,6 +49,7 @@ public NexusPollTask( .setNamespace(namespace) .setIdentity(identity) .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequest.setDeploymentOptions( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index c1b999cdbb..f1ea2363f0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +52,7 @@ final class NexusWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; - private final AtomicBoolean serverSupportsAutoscaling; + private final NamespaceCapabilities namespaceCapabilities; private final boolean forceOldFailureFormat; public NexusWorker( @@ -64,7 +63,7 @@ public NexusWorker( @Nonnull NexusTaskHandler handler, @Nonnull DataConverter dataConverter, @Nonnull SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -80,7 +79,7 @@ public NexusWorker( DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null); this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; + this.namespaceCapabilities = namespaceCapabilities; // Allow tests to force old format for backward compatibility testing String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat"); this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat); @@ -110,13 +109,14 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), workerMetricsScope, service.getServerCapabilities(), this.slotSupplier), this.pollTaskExecutor, pollerOptions, - serverSupportsAutoscaling.get(), + namespaceCapabilities, workerMetricsScope); } else { poller = @@ -127,13 +127,15 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), this.slotSupplier, workerMetricsScope, service.getServerCapabilities()), this.pollTaskExecutor, pollerOptions, - workerMetricsScope); + workerMetricsScope, + namespaceCapabilities); } poller.start(); workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java index f8baba01db..5593707720 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java @@ -40,6 +40,7 @@ public static final class Builder { private Duration drainStickyTaskQueueTimeout; private boolean usingVirtualThreads; private WorkerDeploymentOptions deploymentOptions; + private String workerInstanceKey; private Builder() {} @@ -64,6 +65,7 @@ private Builder(SingleWorkerOptions options) { this.drainStickyTaskQueueTimeout = options.getDrainStickyTaskQueueTimeout(); this.usingVirtualThreads = options.isUsingVirtualThreads(); this.deploymentOptions = options.getDeploymentOptions(); + this.workerInstanceKey = options.getWorkerInstanceKey(); } public Builder setIdentity(String identity) { @@ -155,6 +157,11 @@ public Builder setDeploymentOptions(WorkerDeploymentOptions deploymentOptions) { return this; } + public Builder setWorkerInstanceKey(String workerInstanceKey) { + this.workerInstanceKey = workerInstanceKey; + return this; + } + public SingleWorkerOptions build() { PollerOptions pollerOptions = this.pollerOptions; if (pollerOptions == null) { @@ -193,7 +200,8 @@ public SingleWorkerOptions build() { this.defaultHeartbeatThrottleInterval, drainStickyTaskQueueTimeout, usingVirtualThreads, - this.deploymentOptions); + this.deploymentOptions, + this.workerInstanceKey); } } @@ -214,6 +222,7 @@ public SingleWorkerOptions build() { private final Duration drainStickyTaskQueueTimeout; private final boolean usingVirtualThreads; private final WorkerDeploymentOptions deploymentOptions; + private final String workerInstanceKey; private SingleWorkerOptions( String identity, @@ -232,7 +241,8 @@ private SingleWorkerOptions( Duration defaultHeartbeatThrottleInterval, Duration drainStickyTaskQueueTimeout, boolean usingVirtualThreads, - WorkerDeploymentOptions deploymentOptions) { + WorkerDeploymentOptions deploymentOptions, + String workerInstanceKey) { this.identity = identity; this.binaryChecksum = binaryChecksum; this.buildId = buildId; @@ -250,6 +260,7 @@ private SingleWorkerOptions( this.drainStickyTaskQueueTimeout = drainStickyTaskQueueTimeout; this.usingVirtualThreads = usingVirtualThreads; this.deploymentOptions = deploymentOptions; + this.workerInstanceKey = workerInstanceKey; } public String getIdentity() { @@ -331,6 +342,10 @@ public WorkerDeploymentOptions getDeploymentOptions() { return deploymentOptions; } + public String getWorkerInstanceKey() { + return workerInstanceKey; + } + public WorkerVersioningOptions getWorkerVersioningOptions() { return new WorkerVersioningOptions( this.getBuildId(), this.isUsingBuildIdForVersioning(), this.getDeploymentOptions()); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java index 87ddc7c4a0..bb0b72e361 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java @@ -11,7 +11,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +34,7 @@ public SyncActivityWorker( double taskQueueActivitiesPerSecond, SingleWorkerOptions options, SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.identity = options.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -76,7 +75,7 @@ public SyncActivityWorker( options, taskHandler, slotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } public void registerActivityImplementations(Object... activitiesImplementation) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java index 23471f86e3..a749697e34 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java @@ -6,7 +6,6 @@ import io.temporal.worker.tuning.SlotSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +25,7 @@ public SyncNexusWorker( String taskQueue, SingleWorkerOptions options, SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.identity = options.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -47,7 +46,7 @@ public SyncNexusWorker( taskHandler, options.getDataConverter(), slotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java index 2b94effb6e..a4e16aa25a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java @@ -25,7 +25,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -70,7 +69,7 @@ public SyncWorkflowWorker( @Nonnull EagerActivityDispatcher eagerActivityDispatcher, @Nonnull SlotSupplier slotSupplier, @Nonnull SlotSupplier laSlotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.identity = singleWorkerOptions.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -125,7 +124,7 @@ public SyncWorkflowWorker( taskHandler, eagerActivityDispatcher, slotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); // Exists to support Worker#replayWorkflowExecution functionality. // This handler has to be non-sticky to avoid evicting actual executions from the cache diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java index fa5e3cc796..11c120c544 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java @@ -48,6 +48,7 @@ public WorkflowPollTask( @Nonnull String taskQueue, @Nullable String stickyTaskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull StickyQueueBalancer stickyQueueBalancer, @@ -70,6 +71,7 @@ public WorkflowPollTask( PollWorkflowTaskQueueRequest.newBuilder() .setNamespace(Objects.requireNonNull(namespace)) .setIdentity(Objects.requireNonNull(identity)); + pollRequestBuilder.setWorkerInstanceKey(workerInstanceKey); if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequestBuilder.setDeploymentOptions( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 4986808c49..26d6387739 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -38,7 +37,6 @@ import org.slf4j.MDC; final class WorkflowWorker implements SuspendableWorker { - private static final String GRACEFUL_SHUTDOWN_MESSAGE = "graceful shutdown"; private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class); private final WorkflowRunLockManager runLocks; @@ -55,7 +53,7 @@ final class WorkflowWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final EagerActivityDispatcher eagerActivityDispatcher; private final TrackingSlotSupplier slotSupplier; - private final AtomicBoolean serverSupportsAutoscaling; + private final NamespaceCapabilities namespaceCapabilities; private PollTaskExecutor pollTaskExecutor; @@ -76,7 +74,7 @@ public WorkflowWorker( @Nonnull WorkflowTaskHandler handler, @Nonnull EagerActivityDispatcher eagerActivityDispatcher, @Nonnull SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -91,7 +89,7 @@ public WorkflowWorker( this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities()); this.eagerActivityDispatcher = eagerActivityDispatcher; this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; + this.namespaceCapabilities = namespaceCapabilities; } @Override @@ -119,6 +117,7 @@ public boolean start() { taskQueue, null, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, @@ -131,6 +130,7 @@ public boolean start() { taskQueue, stickyTaskQueueName, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, @@ -146,6 +146,7 @@ public boolean start() { taskQueue, null, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, @@ -158,7 +159,7 @@ public boolean start() { pollers, this.pollTaskExecutor, pollerOptions, - serverSupportsAutoscaling.get(), + namespaceCapabilities, workerMetricsScope); } else { PollerBehaviorSimpleMaximum pollerBehavior = @@ -176,6 +177,7 @@ public boolean start() { taskQueue, stickyTaskQueueName, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, stickyQueueBalancer, @@ -183,7 +185,8 @@ public boolean start() { service.getServerCapabilities()), pollTaskExecutor, pollerOptions, - workerMetricsScope); + workerMetricsScope, + namespaceCapabilities); } poller.start(); workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); @@ -213,40 +216,23 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout()) : CompletableFuture.completedFuture(null)) .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks)); - return CompletableFuture.allOf( - pollerShutdown.thenCompose( - ignore -> { - if (!interruptTasks && stickyTaskQueueName != null) { - return shutdownManager.waitOnWorkerShutdownRequest( - service - .futureStub() - .shutdownWorker( - ShutdownWorkerRequest.newBuilder() - .setIdentity(options.getIdentity()) - .setNamespace(namespace) - .setStickyTaskQueue(stickyTaskQueueName) - .setReason(GRACEFUL_SHUTDOWN_MESSAGE) - .build())); - } - return CompletableFuture.completedFuture(null); - }), - pollerShutdown - .thenCompose( - ignore -> - !interruptTasks - ? shutdownManager.waitForSupplierPermitsReleasedUnlimited( - slotSupplier, supplierName) - : CompletableFuture.completedFuture(null)) - .thenCompose( - ignore -> - pollTaskExecutor != null - ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks) - : CompletableFuture.completedFuture(null)) - .exceptionally( - e -> { - log.error("Unexpected exception during shutdown", e); - return null; - })); + return pollerShutdown + .thenCompose( + ignore -> + !interruptTasks + ? shutdownManager.waitForSupplierPermitsReleasedUnlimited( + slotSupplier, supplierName) + : CompletableFuture.completedFuture(null)) + .thenCompose( + ignore -> + pollTaskExecutor != null + ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks) + : CompletableFuture.completedFuture(null)) + .exceptionally( + e -> { + log.error("Unexpected exception during shutdown", e); + return null; + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index 5ffd300e9e..ee3a30f8e1 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -5,6 +5,8 @@ import com.google.common.base.Strings; import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; +import io.temporal.api.enums.v1.TaskQueueType; +import io.temporal.api.workflowservice.v1.ShutdownWorkerRequest; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.Experimental; @@ -17,6 +19,7 @@ import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.worker.*; import io.temporal.serviceclient.MetricsTag; +import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.tuning.*; import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; @@ -42,7 +45,13 @@ public final class Worker { private static final Logger log = LoggerFactory.getLogger(Worker.class); private final WorkerOptions options; private final String taskQueue; + private final String workerInstanceKey = UUID.randomUUID().toString(); private final List plugins; + private final WorkflowServiceStubs service; + private final String namespace; + private final String identity; + private final String stickyTaskQueueName; + private final NamespaceCapabilities namespaceCapabilities; final SyncWorkflowWorker workflowWorker; final SyncActivityWorker activityWorker; final SyncNexusWorker nexusWorker; @@ -70,23 +79,32 @@ public final class Worker { WorkflowThreadExecutor workflowThreadExecutor, List contextPropagators, @Nonnull List plugins, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { Objects.requireNonNull(client, "client should not be null"); + this.namespaceCapabilities = + Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null"); this.plugins = Objects.requireNonNull(plugins, "plugins should not be null"); Preconditions.checkArgument( !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string"); this.taskQueue = taskQueue; + this.service = client.getWorkflowServiceStubs(); this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults(); factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults(); WorkflowClientOptions clientOptions = client.getOptions(); String namespace = clientOptions.getNamespace(); + this.namespace = namespace; Map tags = new ImmutableMap.Builder(1).put(MetricsTag.TASK_QUEUE, taskQueue).build(); Scope taggedScope = metricsScope.tagged(tags); SingleWorkerOptions activityOptions = toActivityOptions( - factoryOptions, this.options, clientOptions, contextPropagators, taggedScope); + factoryOptions, + this.options, + clientOptions, + contextPropagators, + taggedScope, + workerInstanceKey); if (this.options.isLocalActivityWorkerOnly()) { activityWorker = null; } else { @@ -104,7 +122,7 @@ public final class Worker { this.options.getMaxTaskQueueActivitiesPerSecond(), activityOptions, activitySlotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } EagerActivityDispatcher eagerActivityDispatcher = @@ -114,7 +132,12 @@ public final class Worker { SingleWorkerOptions nexusOptions = toNexusOptions( - factoryOptions, this.options, clientOptions, contextPropagators, taggedScope); + factoryOptions, + this.options, + clientOptions, + contextPropagators, + taggedScope, + workerInstanceKey); SlotSupplier nexusSlotSupplier = this.options.getWorkerTuner() == null ? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentNexusExecutionSize()) @@ -123,12 +146,7 @@ public final class Worker { nexusWorker = new SyncNexusWorker( - client, - namespace, - taskQueue, - nexusOptions, - nexusSlotSupplier, - serverSupportsAutoscaling); + client, namespace, taskQueue, nexusOptions, nexusSlotSupplier, namespaceCapabilities); SingleWorkerOptions singleWorkerOptions = toWorkflowWorkerOptions( @@ -137,10 +155,16 @@ public final class Worker { clientOptions, taskQueue, contextPropagators, - taggedScope); + taggedScope, + workerInstanceKey); SingleWorkerOptions localActivityOptions = toLocalActivityOptions( - factoryOptions, this.options, clientOptions, contextPropagators, taggedScope); + factoryOptions, + this.options, + clientOptions, + contextPropagators, + taggedScope, + workerInstanceKey); SlotSupplier workflowSlotSupplier = this.options.getWorkerTuner() == null @@ -153,6 +177,10 @@ public final class Worker { : this.options.getWorkerTuner().getLocalActivitySlotSupplier(); attachMetricsToResourceController(taggedScope, localActivitySlotSupplier); + this.identity = singleWorkerOptions.getIdentity(); + this.stickyTaskQueueName = + useStickyTaskQueue ? getStickyTaskQueueName(client.getOptions().getIdentity()) : null; + workflowWorker = new SyncWorkflowWorker( client, @@ -162,12 +190,12 @@ public final class Worker { localActivityOptions, runLocks, cache, - useStickyTaskQueue ? getStickyTaskQueueName(client.getOptions().getIdentity()) : null, + stickyTaskQueueName, workflowThreadExecutor, eagerActivityDispatcher, workflowSlotSupplier, localActivitySlotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } /** @@ -425,18 +453,40 @@ void start() { } CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptUserTasks) { - CompletableFuture workflowWorkerShutdownFuture = - workflowWorker.shutdown(shutdownManager, interruptUserTasks); - CompletableFuture nexusWorkerShutdownFuture = - nexusWorker.shutdown(shutdownManager, interruptUserTasks); + ShutdownWorkerRequest.Builder requestBuilder = + ShutdownWorkerRequest.newBuilder() + .setNamespace(namespace) + .setIdentity(identity) + .setWorkerInstanceKey(workerInstanceKey) + .setTaskQueue(taskQueue) + .setReason("graceful shutdown") + .addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) + .addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_NEXUS); if (activityWorker != null) { - return CompletableFuture.allOf( - activityWorker.shutdown(shutdownManager, interruptUserTasks), - workflowWorkerShutdownFuture, - nexusWorkerShutdownFuture); - } else { - return CompletableFuture.allOf(workflowWorkerShutdownFuture, nexusWorkerShutdownFuture); + requestBuilder.addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY); + } + if (stickyTaskQueueName != null) { + requestBuilder.setStickyTaskQueue(stickyTaskQueueName); } + CompletableFuture shutdownWorkerRpc = + shutdownManager.waitOnWorkerShutdownRequest( + service.futureStub().shutdownWorker(requestBuilder.build())); + + return shutdownWorkerRpc.thenCompose( + ignore -> { + CompletableFuture workflowWorkerShutdownFuture = + workflowWorker.shutdown(shutdownManager, interruptUserTasks); + CompletableFuture nexusWorkerShutdownFuture = + nexusWorker.shutdown(shutdownManager, interruptUserTasks); + if (activityWorker != null) { + return CompletableFuture.allOf( + activityWorker.shutdown(shutdownManager, interruptUserTasks), + workflowWorkerShutdownFuture, + nexusWorkerShutdownFuture); + } else { + return CompletableFuture.allOf(workflowWorkerShutdownFuture, nexusWorkerShutdownFuture); + } + }); } boolean isTerminated() { @@ -602,8 +652,10 @@ private static SingleWorkerOptions toActivityOptions( WorkerOptions options, WorkflowClientOptions clientOptions, List contextPropagators, - Scope metricsScope) { - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + Scope metricsScope, + String workerInstanceKey) { + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setUsingVirtualThreads(options.isUsingVirtualThreadsOnActivityWorker()) .setPollerOptions( PollerOptions.newBuilder() @@ -624,8 +676,10 @@ private static SingleWorkerOptions toNexusOptions( WorkerOptions options, WorkflowClientOptions clientOptions, List contextPropagators, - Scope metricsScope) { - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + Scope metricsScope, + String workerInstanceKey) { + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior( @@ -646,7 +700,8 @@ private static SingleWorkerOptions toWorkflowWorkerOptions( WorkflowClientOptions clientOptions, String taskQueue, List contextPropagators, - Scope metricsScope) { + Scope metricsScope, + String workerInstanceKey) { Map tags = new ImmutableMap.Builder(1).put(MetricsTag.TASK_QUEUE, taskQueue).build(); @@ -675,7 +730,8 @@ private static SingleWorkerOptions toWorkflowWorkerOptions( } } - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior( @@ -697,8 +753,10 @@ private static SingleWorkerOptions toLocalActivityOptions( WorkerOptions options, WorkflowClientOptions clientOptions, List contextPropagators, - Scope metricsScope) { - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + Scope metricsScope, + String workerInstanceKey) { + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) @@ -715,7 +773,8 @@ private static SingleWorkerOptions.Builder toSingleWorkerOptions( WorkerFactoryOptions factoryOptions, WorkerOptions options, WorkflowClientOptions clientOptions, - List contextPropagators) { + List contextPropagators, + String workerInstanceKey) { String buildId = null; if (options.getBuildId() != null) { buildId = options.getBuildId(); @@ -738,7 +797,8 @@ private static SingleWorkerOptions.Builder toSingleWorkerOptions( .setWorkerInterceptors(factoryOptions.getWorkerInterceptors()) .setMaxHeartbeatThrottleInterval(options.getMaxHeartbeatThrottleInterval()) .setDefaultHeartbeatThrottleInterval(options.getDefaultHeartbeatThrottleInterval()) - .setDeploymentOptions(options.getDeploymentOptions()); + .setDeploymentOptions(options.getDeploymentOptions()) + .setWorkerInstanceKey(workerInstanceKey); } /** diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 3a22b7351b..1da17cc95f 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -13,6 +13,7 @@ import io.temporal.internal.common.PluginUtils; import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.task.VirtualThreadDelegate; +import io.temporal.internal.worker.NamespaceCapabilities; import io.temporal.internal.worker.ShutdownManager; import io.temporal.internal.worker.WorkflowExecutorCache; import io.temporal.internal.worker.WorkflowRunLockManager; @@ -29,7 +30,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -58,8 +58,8 @@ public final class WorkerFactory { /** Plugins propagated from the client and applied to this factory. */ private final List plugins; - /** Set during start() if the namespace has the poller_autoscaling capability. */ - private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false); + /** Namespace capabilities populated during start() from DescribeNamespace response. */ + private final NamespaceCapabilities namespaceCapabilities = new NamespaceCapabilities(); private State state = State.Initial; @@ -198,7 +198,7 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) { workflowThreadExecutor, workflowClient.getOptions().getContextPropagators(), plugins, - pollerAutoscaling); + namespaceCapabilities); workers.put(taskQueue, worker); // Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows, @@ -266,7 +266,13 @@ public synchronized void start() { .setNamespace(workflowClient.getOptions().getNamespace()) .build()); if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) { - pollerAutoscaling.set(true); + namespaceCapabilities.setPollerAutoscaling(true); + } + if (describeNamespaceResponse + .getNamespaceInfo() + .getCapabilities() + .getWorkerPollCompleteOnShutdown()) { + namespaceCapabilities.setGracefulPollShutdown(true); } // Build plugin execution chain (reverse order for proper nesting) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java index 2ade977627..5faa34ca7c 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java @@ -133,7 +133,7 @@ private AsyncPoller newPoller( pollTask, taskExecutor, options, - false, + new NamespaceCapabilities(), new NoopScope()); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java index ee25255470..cab62bb9f7 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java @@ -80,6 +80,7 @@ public void supplierIsCalledAppropriately() { TASK_QUEUE, "stickytaskqueue", "", + "test-instance-key", new WorkerVersioningOptions("", false, null), trackingSS, stickyQueueBalancer, @@ -170,6 +171,7 @@ public void asyncPollerSupplierIsCalledAppropriately() throws Exception { TASK_QUEUE, null, "", + "test-instance-key", new WorkerVersioningOptions("", false, null), trackingSS, metricsScope, diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java index 0a080ec63f..78e7b57d0a 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java @@ -68,6 +68,7 @@ public void stickyQueueBacklogResetTest() { "taskqueue", "stickytaskqueue", "", + "test-instance-key", new WorkerVersioningOptions("", false, null), slotSupplier, stickyQueueBalancer, diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index cbe194091a..4a65a2486b 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -31,7 +31,6 @@ import java.time.Duration; import java.util.UUID; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -85,7 +84,7 @@ public void concurrentPollRequestLockTest() throws Exception { taskHandler, eagerActivityDispatcher, slotSupplier, - new AtomicBoolean(false)); + new NamespaceCapabilities()); WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); @@ -255,7 +254,7 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { taskHandler, eagerActivityDispatcher, slotSupplier, - new AtomicBoolean(false)); + new NamespaceCapabilities()); WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); @@ -398,7 +397,7 @@ public boolean isAnyTypeSupported() { taskHandler, eagerActivityDispatcher, slotSupplier, - new AtomicBoolean(false)); + new NamespaceCapabilities()); WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); From c4740e03d6fba6e56143f28a02c3a7158d188322 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 17:45:15 -0700 Subject: [PATCH 2/5] Don't block on shutdown RPC in interrupt mode --- .gitignore | 3 ++- temporal-sdk/src/main/java/io/temporal/worker/Worker.java | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index ef1d5524bd..9f4365d048 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,5 @@ src/main/idls/* .settings .vscode/ */bin -/.claude \ No newline at end of file +/.claude +**/.factorypath diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index ee3a30f8e1..293b17c5f8 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -472,7 +472,13 @@ CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interr shutdownManager.waitOnWorkerShutdownRequest( service.futureStub().shutdownWorker(requestBuilder.build())); - return shutdownWorkerRpc.thenCompose( + // When interrupting tasks (shutdownNow), fire the RPC but don't block on it — proceed to + // shut down pollers immediately. For graceful shutdown, wait for the RPC so the server can + // complete outstanding polls with empty responses before we start waiting on them. + CompletableFuture preShutdown = + interruptUserTasks ? CompletableFuture.completedFuture(null) : shutdownWorkerRpc; + + return preShutdown.thenCompose( ignore -> { CompletableFuture workflowWorkerShutdownFuture = workflowWorker.shutdown(shutdownManager, interruptUserTasks); From 33e318a3d783360021d752395b5d67ab51b76b17 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 17:54:51 -0700 Subject: [PATCH 3/5] Add test --- .../worker/GracefulPollShutdownTest.java | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java new file mode 100644 index 0000000000..edd8968695 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java @@ -0,0 +1,149 @@ +package io.temporal.internal.worker; + +import com.uber.m3.tally.NoopScope; +import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +/** + * Tests that an in-flight poll survives shutdown when graceful poll shutdown is enabled, and is + * killed promptly when it is not. + */ +@RunWith(Parameterized.class) +public class GracefulPollShutdownTest { + + @Parameterized.Parameter public boolean graceful; + + @Parameterized.Parameters(name = "graceful={0}") + public static Object[] data() { + return new Object[] {true, false}; + } + + @Test(timeout = 10_000) + public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception { + NamespaceCapabilities capabilities = new NamespaceCapabilities(); + capabilities.setGracefulPollShutdown(graceful); + + AtomicReference processedTask = new AtomicReference<>(); + CountDownLatch taskProcessedLatch = new CountDownLatch(1); + ShutdownableTaskExecutor taskExecutor = + new ShutdownableTaskExecutor() { + @Override + public void process(@NonNull String task) { + processedTask.set(task); + taskProcessedLatch.countDown(); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public CompletableFuture shutdown( + ShutdownManager shutdownManager, boolean interruptTasks) { + return CompletableFuture.completedFuture(null); + } + + @Override + public void awaitTermination(long timeout, TimeUnit unit) {} + }; + + // -- poll task: first call returns immediately, second blocks until released -- + CountDownLatch secondPollStarted = new CountDownLatch(1); + CountDownLatch releasePoll = new CountDownLatch(1); + + MultiThreadedPoller.PollTask pollTask = + new MultiThreadedPoller.PollTask() { + private int callCount = 0; + + @Override + public synchronized String poll() { + callCount++; + if (callCount == 1) { + return "task-1"; + } else if (callCount == 2) { + secondPollStarted.countDown(); + try { + releasePoll.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + return "task-2"; + } + // Subsequent calls just block until interrupted (simulates long poll) + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + } + }; + + // -- create poller with 1 thread so polls are sequential -- + MultiThreadedPoller poller = + new MultiThreadedPoller<>( + "test-identity", + pollTask, + taskExecutor, + PollerOptions.newBuilder() + .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) + .setPollThreadNamePrefix("test-poller") + .build(), + new NoopScope(), + capabilities); + + poller.start(); + + // Wait for the first task to be processed (proves poller is running) + assertTrue("first task should be processed", taskProcessedLatch.await(5, TimeUnit.SECONDS)); + assertEquals("task-1", processedTask.get()); + + // Wait for the second poll to be in-flight + assertTrue("second poll should start", secondPollStarted.await(5, TimeUnit.SECONDS)); + + // Trigger shutdown (don't interrupt tasks) + ShutdownManager shutdownManager = new ShutdownManager(); + CompletableFuture shutdownFuture = poller.shutdown(shutdownManager, false); + + if (graceful) { + // In graceful mode the poller waits for the in-flight poll to complete. + // The shutdown should NOT have completed yet since the poll is still blocked. + assertFalse("shutdown should not complete while poll is in-flight", shutdownFuture.isDone()); + + // Simulate the server returning the poll response (as it would after ShutdownWorker RPC) + releasePoll.countDown(); + + // Wait for shutdown to complete - the poll should return "task-2" and be processed + shutdownFuture.get(5, TimeUnit.SECONDS); + + assertEquals("task-2", processedTask.get()); + } else { + // In legacy mode the poller forcefully interrupts in-flight polls. + // Shutdown should complete quickly without releasing the blocked poll. + shutdownFuture.get(5, TimeUnit.SECONDS); + + // The second task should NOT have been processed since the poll was killed. + assertNotEquals( + "task-2 should not be processed in legacy mode", "task-2", processedTask.get()); + } + + shutdownManager.close(); + } +} From 97a247dc02e17f15a53ddf7709247c37f1b635b3 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 18:18:11 -0700 Subject: [PATCH 4/5] Format fix --- .../internal/worker/GracefulPollShutdownTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java index edd8968695..45c2d1f7f7 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java @@ -1,18 +1,17 @@ package io.temporal.internal.worker; +import static org.junit.Assert.*; + import com.uber.m3.tally.NoopScope; import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.*; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Tests that an in-flight poll survives shutdown when graceful poll shutdown is enabled, and is From 646981c115933b1a17fd3adc56683cb659c41e94 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Sat, 28 Mar 2026 10:24:14 -0700 Subject: [PATCH 5/5] Test fixes --- .../io/temporal/internal/worker/StickyQueueBacklogTest.java | 1 + .../java/io/temporal/internal/worker/WorkflowWorkerTest.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java index 78e7b57d0a..ddd6ddaf9b 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java @@ -96,6 +96,7 @@ public void stickyQueueBacklogResetTest() { .setKind(TaskQueueKind.TASK_QUEUE_KIND_STICKY) .build()) .setNamespace("default") + .setWorkerInstanceKey("test-instance-key") .build()))) .thenReturn(pollResponse); if (throwOnPoll) { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index 4a65a2486b..de6e1708ad 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -73,6 +73,7 @@ public void concurrentPollRequestLockTest() throws Exception { SingleWorkerOptions.newBuilder() .setIdentity("test_identity") .setBuildId(UUID.randomUUID().toString()) + .setWorkerInstanceKey(UUID.randomUUID().toString()) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new PollerBehaviorSimpleMaximum(3)) @@ -243,6 +244,7 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { SingleWorkerOptions.newBuilder() .setIdentity("test_identity") .setBuildId(UUID.randomUUID().toString()) + .setWorkerInstanceKey(UUID.randomUUID().toString()) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) @@ -386,6 +388,7 @@ public boolean isAnyTypeSupported() { SingleWorkerOptions.newBuilder() .setIdentity("test_identity") .setBuildId(UUID.randomUUID().toString()) + .setWorkerInstanceKey(UUID.randomUUID().toString()) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new PollerBehaviorSimpleMaximum(1))