Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ public WorkflowTaskResult handleWorkflowTask(
// task,
// we always can rework it to graceful invalidation of the cache entity and a full replay
// from the server
throw new IllegalStateException(
"Server history for the workflow is below the progress of the workflow on the worker, the progress needs to be discarded");
throw StaleWorkflowHistoryException.newCacheProgressMismatch();
}

handleWorkflowTaskImpl(workflowTask, historyIterator);
Expand Down Expand Up @@ -299,10 +298,7 @@ private void applyServerHistory(long lastEventId, WorkflowHistoryIterator histor
// a stale node.
// Verifies that replay consumed the server history all the way up to the expected
// last event. If the stream ended early, the history read was stale/truncated;
// surface it via StaleWorkflowHistoryException.newPrematureEndOfStream so retry
// logic can distinguish it from a genuine replay failure.
// NOTE: the pasted diff left both the old IllegalStateException throw and its
// replacement in place (the second throw was unreachable); only the replacement
// is kept here.
private void verifyAllEventsProcessed(long lastEventId, long processedEventId) {
  // lastEventId == Long.MAX_VALUE means "no expectation"; <= 0 means unset.
  if (lastEventId != Long.MAX_VALUE && lastEventId > 0 && processedEventId < lastEventId) {
    throw StaleWorkflowHistoryException.newPrematureEndOfStream(lastEventId, processedEventId);
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.uber.m3.tally.Scope;
import com.uber.m3.util.ImmutableMap;
import io.grpc.Status;
import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.MeteringMetadata;
Expand All @@ -21,6 +22,7 @@
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.BackoffThrottler;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.internal.worker.*;
Expand All @@ -34,6 +36,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand All @@ -42,6 +45,12 @@
public final class ReplayWorkflowTaskHandler implements WorkflowTaskHandler {

private static final Logger log = LoggerFactory.getLogger(ReplayWorkflowTaskHandler.class);
private static final Duration MAX_STALE_HISTORY_RETRY_WINDOW = Duration.ofSeconds(10);
private static final Duration STALE_HISTORY_RETRY_COMPLETION_BUFFER = Duration.ofSeconds(1);
private static final Duration STALE_HISTORY_RETRY_INITIAL_INTERVAL = Duration.ofMillis(200);
private static final Duration STALE_HISTORY_RETRY_MAX_INTERVAL = Duration.ofSeconds(4);
private static final double STALE_HISTORY_RETRY_BACKOFF_COEFFICIENT = 2.0;
private static final double STALE_HISTORY_RETRY_MAX_JITTER_COEFFICIENT = 0.2;

private final ReplayWorkflowFactory workflowFactory;
private final String namespace;
Expand Down Expand Up @@ -82,90 +91,146 @@ public WorkflowTaskHandler.Result handleWorkflowTask(PollWorkflowTaskQueueRespon

private Result handleWorkflowTaskWithQuery(
PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) throws Exception {
boolean directQuery = workflowTask.hasQuery();
AtomicBoolean createdNew = new AtomicBoolean();
WorkflowExecution execution = workflowTask.getWorkflowExecution();
WorkflowRunTaskHandler workflowRunTaskHandler = null;
PollWorkflowTaskQueueResponse originalWorkflowTask = workflowTask.build();
boolean directQuery = originalWorkflowTask.hasQuery();
WorkflowExecution execution = originalWorkflowTask.getWorkflowExecution();
boolean useCache = stickyTaskQueue != null;
BackoffThrottler staleHistoryRetryBackoff =
new BackoffThrottler(
STALE_HISTORY_RETRY_INITIAL_INTERVAL,
STALE_HISTORY_RETRY_INITIAL_INTERVAL,
STALE_HISTORY_RETRY_MAX_INTERVAL,
STALE_HISTORY_RETRY_BACKOFF_COEFFICIENT,
STALE_HISTORY_RETRY_MAX_JITTER_COEFFICIENT);
int staleHistoryRetries = 0;
long staleHistoryRetryDeadlineNanos = Long.MIN_VALUE;

try {
workflowRunTaskHandler =
getOrCreateWorkflowExecutor(useCache, workflowTask, metricsScope, createdNew);
logWorkflowTaskToBeProcessed(workflowTask, createdNew);
while (true) {
PollWorkflowTaskQueueResponse.Builder attemptWorkflowTask = originalWorkflowTask.toBuilder();
AtomicBoolean createdNew = new AtomicBoolean();
WorkflowRunTaskHandler workflowRunTaskHandler = null;

ServiceWorkflowHistoryIterator historyIterator =
new ServiceWorkflowHistoryIterator(service, namespace, workflowTask, metricsScope);
boolean finalCommand;
Result result;
try {
workflowRunTaskHandler =
getOrCreateWorkflowExecutor(useCache, attemptWorkflowTask, metricsScope, createdNew);
logWorkflowTaskToBeProcessed(attemptWorkflowTask, createdNew);

if (directQuery) {
// Direct query happens when there is no reason (events) to produce a real persisted
// workflow task.
// But Server needs to notify the workflow about the query and get back the query result.
// Server creates a fake non-persisted a PollWorkflowTaskResponse with just the query.
// This WFT has no new events in the history to process
// and the worker response on such a WFT can't contain any new commands either.
QueryResult queryResult =
workflowRunTaskHandler.handleDirectQueryWorkflowTask(workflowTask, historyIterator);
finalCommand = queryResult.isWorkflowMethodCompleted();
result = createDirectQueryResult(workflowTask, queryResult, null);
} else {
// main code path, handle workflow task that can have an embedded query
WorkflowTaskResult wftResult =
workflowRunTaskHandler.handleWorkflowTask(workflowTask, historyIterator);
finalCommand = wftResult.isFinalCommand();
result =
createCompletedWFTRequest(
workflowTask.getWorkflowType().getName(),
workflowTask,
wftResult,
workflowRunTaskHandler::resetStartedEventId);
}
ServiceWorkflowHistoryIterator historyIterator =
new ServiceWorkflowHistoryIterator(
service, namespace, attemptWorkflowTask, metricsScope);
boolean finalCommand;
Result result;

if (useCache) {
if (finalCommand) {
// don't invalidate execution from the cache if we were not using cached value here
cache.invalidate(execution, metricsScope, "FinalCommand", null);
} else if (createdNew.get()) {
cache.addToCache(execution, workflowRunTaskHandler);
if (directQuery) {
// Direct query happens when there is no reason (events) to produce a real persisted
// workflow task.
// But Server needs to notify the workflow about the query and get back the query result.
// Server creates a fake non-persisted a PollWorkflowTaskResponse with just the query.
// This WFT has no new events in the history to process
// and the worker response on such a WFT can't contain any new commands either.
QueryResult queryResult =
workflowRunTaskHandler.handleDirectQueryWorkflowTask(
attemptWorkflowTask, historyIterator);
finalCommand = queryResult.isWorkflowMethodCompleted();
result = createDirectQueryResult(attemptWorkflowTask, queryResult, null);
} else {
// main code path, handle workflow task that can have an embedded query
WorkflowTaskResult wftResult =
workflowRunTaskHandler.handleWorkflowTask(attemptWorkflowTask, historyIterator);
finalCommand = wftResult.isFinalCommand();
result =
createCompletedWFTRequest(
attemptWorkflowTask.getWorkflowType().getName(),
attemptWorkflowTask,
wftResult,
workflowRunTaskHandler::resetStartedEventId);
}
}

return result;
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
// Note here that the executor might not be in the cache, even when the caching is on. In that
// case we need to close the executor explicitly. For items in the cache, invalidation
// callback will try to close again, which should be ok.
if (workflowRunTaskHandler != null) {
workflowRunTaskHandler.close();
}
if (useCache) {
if (finalCommand) {
// don't invalidate execution from the cache if we were not using cached value here
cache.invalidate(execution, metricsScope, "FinalCommand", null);
} else if (createdNew.get()) {
cache.addToCache(execution, workflowRunTaskHandler);
}
}

if (useCache) {
cache.invalidate(execution, metricsScope, "Exception", e);
// If history is full and exception occurred then sticky session hasn't been established
// yet, and we can avoid doing a reset.
if (!isFullHistory(workflowTask)) {
resetStickyTaskQueue(execution);
return result;
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
boolean staleHistoryFailure =
StaleWorkflowHistoryException.isStaleWorkflowHistoryFailure(e);
long staleHistoryRetrySleepMillis = 0;
if (staleHistoryFailure) {
if (staleHistoryRetryDeadlineNanos == Long.MIN_VALUE) {
staleHistoryRetryDeadlineNanos =
System.nanoTime() + getStaleHistoryRetryWindow(attemptWorkflowTask).toNanos();
}
staleHistoryRetryBackoff.failure(Status.Code.UNKNOWN);
staleHistoryRetrySleepMillis = staleHistoryRetryBackoff.getSleepTime();
}
// Note here that the executor might not be in the cache, even when the caching is on. In
// that case we need to close the executor explicitly. For items in the cache, invalidation
// callback will try to close again, which should be ok.
if (workflowRunTaskHandler != null) {
workflowRunTaskHandler.close();
workflowRunTaskHandler = null;
}
}

if (directQuery) {
return createDirectQueryResult(workflowTask, null, e);
} else {
// this call rethrows an exception in some scenarios
DataConverter dataConverterWithWorkflowContext =
options
.getDataConverter()
.withContext(
new WorkflowSerializationContext(namespace, execution.getWorkflowId()));
return failureToWFTResult(workflowTask, e, dataConverterWithWorkflowContext);
}
} finally {
if (!useCache && workflowRunTaskHandler != null) {
// we close the execution in finally only if we don't use cache, otherwise it stays open
workflowRunTaskHandler.close();
if (staleHistoryFailure
&& shouldRetryStaleHistory(
staleHistoryRetryDeadlineNanos, staleHistoryRetrySleepMillis)) {
staleHistoryRetries++;
if (useCache) {
cache.invalidate(execution, metricsScope, "StaleHistory", e);
}
long remainingRetryMillis =
Math.max(0, (staleHistoryRetryDeadlineNanos - System.nanoTime()) / 1_000_000);
log.info(
"Retrying workflow task after stale history response. startedEventId={}, WorkflowId={}, RunId={}, retryAttempt={}, remainingRetryMillis={}, reason={}",
attemptWorkflowTask.getStartedEventId(),
execution.getWorkflowId(),
execution.getRunId(),
staleHistoryRetries,
remainingRetryMillis,
e.getMessage());
if (staleHistoryRetrySleepMillis > 0) {
Thread.sleep(staleHistoryRetrySleepMillis);
}
continue;
}

if (useCache) {
cache.invalidate(
execution, metricsScope, staleHistoryFailure ? "StaleHistory" : "Exception", e);
// If history is full and exception occurred then sticky session hasn't been established
// yet, and we can avoid doing a reset.
if (!isFullHistory(attemptWorkflowTask)) {
resetStickyTaskQueue(execution);
}
}

if (directQuery) {
return createDirectQueryResult(attemptWorkflowTask, null, e);
} else if (staleHistoryFailure) {
// Stale history should not become a WORKFLOW_TASK_FAILED. Leave the task uncompleted and
// rely on a later attempt to replay against a fresher history read.
return createNoCompletionResult(attemptWorkflowTask);
} else {
// this call rethrows an exception in some scenarios
DataConverter dataConverterWithWorkflowContext =
options
.getDataConverter()
.withContext(
new WorkflowSerializationContext(namespace, execution.getWorkflowId()));
return failureToWFTResult(attemptWorkflowTask, e, dataConverterWithWorkflowContext);
}
} finally {
if (!useCache && workflowRunTaskHandler != null) {
// we close the execution in finally only if we don't use cache, otherwise it stays open
workflowRunTaskHandler.close();
}
}
}
}
Expand Down Expand Up @@ -352,6 +417,11 @@ private Result createDirectQueryResult(
null);
}

// Builds a Result that reports neither a completion nor a failure to the server,
// leaving the workflow task to be retried by the service after its timeout.
private Result createNoCompletionResult(PollWorkflowTaskQueueResponseOrBuilder workflowTask) {
  String workflowTypeName = workflowTask.getWorkflowType().getName();
  return new Result(workflowTypeName, null, null, null, null, false, null, null);
}

@Override
public boolean isAnyTypeSupported() {
return workflowFactory.isAnyTypeSupported();
Expand Down Expand Up @@ -399,6 +469,7 @@ private WorkflowRunTaskHandler createStatefulHandler(
.setHistory(getHistoryResponse.getHistory())
.setNextPageToken(getHistoryResponse.getNextPageToken());
}
validateWorkflowTaskHistory(workflowTask);
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType, workflowExecution);
return new ReplayWorkflowRunTaskHandler(
namespace,
Expand All @@ -410,6 +481,54 @@ private WorkflowRunTaskHandler createStatefulHandler(
service.getServerCapabilities().get());
}

// Sanity-checks the history attached to a workflow task before a new stateful handler
// is created. Any violation is surfaced as a stale-history failure (wrapped
// IllegalStateException) so the caller's retry path can replay against a fresher read:
//  - empty history: nothing to replay;
//  - first event id > 1: a full-history replay must begin at eventID=1;
//  - last page ends before startedEventId: the read is behind the task's progress marker.
private void validateWorkflowTaskHistory(PollWorkflowTaskQueueResponseOrBuilder workflowTask) {
  List<HistoryEvent> historyEvents = workflowTask.getHistory().getEventsList();
  if (historyEvents.isEmpty()) {
    throw StaleWorkflowHistoryException.newPrematureEndOfStream(
        workflowTask.getStartedEventId(), 0);
  }

  long firstEventId = historyEvents.get(0).getEventId();
  if (firstEventId > 1) {
    throw StaleWorkflowHistoryException.newIncompleteFullHistory(firstEventId);
  }

  long startedEventId = workflowTask.getStartedEventId();
  // startedEventId == Long.MAX_VALUE means "no expectation"; <= 0 means unset.
  boolean startedEventIdMeaningful = startedEventId > 0 && startedEventId != Long.MAX_VALUE;
  boolean lastPage = workflowTask.getNextPageToken().isEmpty();
  long highestEventId = historyEvents.get(historyEvents.size() - 1).getEventId();
  if (startedEventIdMeaningful && lastPage && highestEventId < startedEventId) {
    throw StaleWorkflowHistoryException.newPrematureEndOfStream(startedEventId, highestEventId);
  }
}

/**
 * Decides whether a stale-history failure should be retried locally.
 *
 * <p>Retries only while the retry deadline has not elapsed AND the planned backoff sleep still
 * fits inside the remaining window, so the retry attempt has a chance to run before the
 * deadline.
 *
 * @param staleHistoryRetryDeadlineNanos {@link System#nanoTime()}-based deadline;
 *     {@code Long.MIN_VALUE} means no retry window was started
 * @param staleHistoryRetrySleepMillis planned backoff sleep before the next attempt
 */
private boolean shouldRetryStaleHistory(
    long staleHistoryRetryDeadlineNanos, long staleHistoryRetrySleepMillis) {
  if (staleHistoryRetryDeadlineNanos == Long.MIN_VALUE) {
    // no retry window was ever started
    return false;
  }
  long nowNanos = System.nanoTime();
  // Per the System.nanoTime() javadoc, nanotime values must be compared via subtraction
  // (t1 - t0 < 0), not direct relational operators, to stay correct across numerical
  // overflow of the counter.
  if (nowNanos - staleHistoryRetryDeadlineNanos >= 0) {
    // deadline already passed
    return false;
  }
  long sleepNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(0, staleHistoryRetrySleepMillis));
  return (nowNanos + sleepNanos) - staleHistoryRetryDeadlineNanos < 0;
}

// Computes how long the worker may keep retrying a stale-history failure for this task.
// The window is capped at MAX_STALE_HISTORY_RETRY_WINDOW, tightened to the workflow task
// timeout when it is visible in the first history event, and then shrunk by
// STALE_HISTORY_RETRY_COMPLETION_BUFFER so that a successful retry still has time to
// respond before the task times out on the server side.
private Duration getStaleHistoryRetryWindow(PollWorkflowTaskQueueResponseOrBuilder workflowTask) {
  List<HistoryEvent> historyEvents = workflowTask.getHistory().getEventsList();
  Duration window = MAX_STALE_HISTORY_RETRY_WINDOW;
  if (!historyEvents.isEmpty()) {
    HistoryEvent firstEvent = historyEvents.get(0);
    if (firstEvent.hasWorkflowExecutionStartedEventAttributes()) {
      Duration taskTimeout =
          ProtobufTimeUtils.toJavaDuration(
              firstEvent.getWorkflowExecutionStartedEventAttributes().getWorkflowTaskTimeout());
      // Zero means the timeout was not set; only a set, smaller timeout tightens the window.
      if (!taskTimeout.isZero() && taskTimeout.compareTo(window) < 0) {
        window = taskTimeout;
      }
    }
  }
  return window.compareTo(STALE_HISTORY_RETRY_COMPLETION_BUFFER) <= 0
      ? Duration.ZERO
      : window.minus(STALE_HISTORY_RETRY_COMPLETION_BUFFER);
}

private void resetStickyTaskQueue(WorkflowExecution execution) {
service
.futureStub()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.temporal.internal.replay;

final class StaleWorkflowHistoryException extends RuntimeException {

private static final String CACHE_PROGRESS_MISMATCH_MESSAGE =
"Server history for the workflow is below the progress of the workflow on the worker, the progress needs to be discarded";

private StaleWorkflowHistoryException(String message) {
super(message);
}

static IllegalStateException newCacheProgressMismatch() {
return new IllegalStateException(
CACHE_PROGRESS_MISMATCH_MESSAGE,
new StaleWorkflowHistoryException(CACHE_PROGRESS_MISMATCH_MESSAGE));
}

static IllegalStateException newPrematureEndOfStream(
long expectedLastEventId, long processedEventId) {
String message =
String.format(
"Premature end of stream, expectedLastEventID=%d but no more events after eventID=%d",
expectedLastEventId, processedEventId);
return new IllegalStateException(message, new StaleWorkflowHistoryException(message));
}

static IllegalStateException newIncompleteFullHistory(long firstEventId) {
String message =
String.format(
"Incomplete history for workflow task replay, expected history to start at eventID=1 but got eventID=%d",
firstEventId);
return new IllegalStateException(message, new StaleWorkflowHistoryException(message));
}

static boolean isStaleWorkflowHistoryFailure(Throwable e) {
while (e != null) {
if (e instanceof StaleWorkflowHistoryException) {
return true;
}
e = e.getCause();
}
return false;
}
}
Loading
Loading