diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index a39b7920b6..372dfb857b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -53,7 +53,7 @@ enum HandleEventStatus { /** Initial set of SDK flags that will be set on all new workflow executions. */ @VisibleForTesting public static List initialFlags = - Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)); + Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION); /** * Keep track of the change versions that have been seen by the SDK. This is used to generate the diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 1c90397e04..747d3a09d8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1174,7 +1174,7 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { * Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic * scheduling if the getVersion call was removed. * */ - if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) { + if (replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) { // This can happen if we are replaying a workflow and encounter a getVersion call that did not // exist on the original execution and the range does not include the default version. if (versionToUse == null) { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/failure/WorkflowFailureGetVersionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/failure/WorkflowFailureGetVersionTest.java new file mode 100644 index 0000000000..d240c5d86f --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/failure/WorkflowFailureGetVersionTest.java @@ -0,0 +1,81 @@ +package io.temporal.workflow.failure; + +import static io.temporal.testUtils.Eventually.assertEventually; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.failure.v1.Failure; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowException; +import io.temporal.client.WorkflowStub; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import java.time.Duration; +import java.util.List; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class WorkflowFailureGetVersionTest { + + @Rule public TestName testName = new TestName(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflowGetVersionAndException.class) + .build(); + + @Test + public void getVersionAndException() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute, testName.getMethodName()); + WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); + + try { + HistoryEvent workflowTaskFailed = + assertEventually( + Duration.ofSeconds(5), + () -> { + List failedEvents = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED); + Assert.assertFalse("No workflow task failure recorded", failedEvents.isEmpty()); + return failedEvents.get(0); + }); + + Failure failure = + getDeepestFailure(workflowTaskFailed.getWorkflowTaskFailedEventAttributes().getFailure()); + Assert.assertEquals("Any error", failure.getMessage()); + Assert.assertTrue(failure.hasApplicationFailureInfo()); + Assert.assertEquals( + RuntimeException.class.getName(), failure.getApplicationFailureInfo().getType()); + } finally { + try { + workflowStub.terminate("terminate test workflow"); + } catch (WorkflowException ignored) { + } + } + } + + private static Failure getDeepestFailure(Failure failure) { + while (failure.hasCause()) { + failure = failure.getCause(); + } + return failure; + } + + public static class TestWorkflowGetVersionAndException implements TestWorkflow1 { + + @Override + public String execute(String unused) { + String changeId = "change-id"; + Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1); + Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1); + throw new RuntimeException("Any error"); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultithreadingRemoveTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultithreadingRemoveTest.java index cc49e88002..13f9753321 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultithreadingRemoveTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultithreadingRemoveTest.java @@ -1,12 +1,13 @@ package io.temporal.workflow.versionTests; -import static org.junit.Assert.*; -import static org.junit.Assume.assumeTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import io.temporal.internal.Issue; import io.temporal.testing.WorkflowReplayer; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.testing.internal.TracingWorkerInterceptor; import io.temporal.worker.WorkerOptions; import io.temporal.workflow.Async; import io.temporal.workflow.Workflow; @@ -14,19 +15,19 @@ import io.temporal.workflow.shared.TestWorkflows; import io.temporal.workflow.unsafe.WorkflowUnsafe; import java.time.Duration; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @Issue("https://github.com/temporalio/sdk-java/issues/2307") -public class GetVersionMultithreadingRemoveTest extends BaseVersionTest { +public class GetVersionMultithreadingRemoveTest { private static boolean hasReplayed; @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes( - getDefaultWorkflowImplementationOptions(), TestGetVersionWorkflowImpl.class) + .setWorkflowTypes(TestGetVersionWorkflowImpl.class) .setActivityImplementations(new TestActivities.TestActivitiesImpl()) // Forcing a replay. Full history arrived from a normal queue causing a replay. .setWorkerOptions( @@ -35,18 +36,30 @@ public class GetVersionMultithreadingRemoveTest extends BaseVersionTest { .build()) .build(); - public GetVersionMultithreadingRemoveTest(boolean setVersioningFlag, boolean upsertVersioningSA) { - super(setVersioningFlag, upsertVersioningSA); + @Before + public void setUp() { + hasReplayed = false; } @Test public void testGetVersionMultithreadingRemoval() { - assumeTrue("This test only passes if SKIP_YIELD_ON_VERSION is enabled", setVersioningFlag); TestWorkflows.TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + assertTrue(hasReplayed); assertEquals("activity1", result); + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "newThread workflow-method", + "newThread null", + "getVersion", + "executeActivity customActivity1", + "sleep PT1S", + "activity customActivity1"); } @Test @@ -76,9 +89,8 @@ public String execute(String taskQueue) { } else { hasReplayed = true; } - String result = - "activity" + testActivities.activity1(1); // This is executed in non-replay mode. - return result; + + return "activity" + testActivities.activity1(1); } } }