Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:

- name: Start containerized server and dependencies
env:
TEMPORAL_CLI_VERSION: 1.4.1-cloud-v1-29-0-139-2.0
TEMPORAL_CLI_VERSION: 1.6.1-server-1.31.0-151.0
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ plugins {

allprojects {
repositories {
mavenLocal()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally committed local Maven repository configuration

High Severity

mavenLocal() has been added to allprojects.repositories and takes priority over mavenCentral(). This is typically used during local development to resolve SNAPSHOT dependencies (in this case nexusVersion = '0.5.0-SNAPSHOT'). Having mavenLocal() in the main build file causes non-reproducible builds since different machines may have different artifacts in their local Maven cache. CI builds may fail or produce inconsistent results.

Fix in Cursor Fix in Web

mavenCentral()
}
}
Expand All @@ -30,7 +31,7 @@ ext {
// Platforms
grpcVersion = '1.75.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.15.4' // [2.9.0,)
nexusVersion = '0.4.0-alpha'
nexusVersion = '0.5.0-SNAPSHOT'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Use a published nexus-sdk version

nexusVersion is set to 0.5.0-SNAPSHOT, but the only configured repositories are mavenLocal() and mavenCentral(). In a clean environment (including CI runners with an empty local Maven cache), this snapshot cannot be resolved from Maven Central, so Gradle fails before tests/build can start; this effectively makes the project non-buildable unless contributors manually publish the dependency to local Maven first.

Useful? React with 👍 / 👎.

// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.temporal.common.converter.FailureConverter;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.internal.common.FailureUtils;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
import io.temporal.serviceclient.CheckedExceptionWrapper;
Expand Down Expand Up @@ -192,7 +193,18 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d
retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE;
break;
}
return new HandlerException(info.getType(), cause, retryBehavior);
if (failure
.getMessage()
.startsWith(String.format("handler error (%s)", info.getType()))) {
return new HandlerException(info.getType(), cause, retryBehavior);
} else {
return new HandlerException(
info.getType(),
failure.getMessage(),
cause,
retryBehavior,
NexusUtil.temporalFailureToNexusFailureInfo(failure));
}
}
case FAILUREINFO_NOT_SET:
default:
Expand Down Expand Up @@ -324,6 +336,9 @@ private Failure exceptionToFailure(Throwable throwable) {
failure.setNexusOperationExecutionFailureInfo(op);
} else if (throwable instanceof HandlerException) {
HandlerException he = (HandlerException) throwable;
if (he.getOriginalFailure() != null) {
return NexusUtil.nexusFailureToAPIFailure(he.getOriginalFailure(), true);
}
NexusHandlerErrorRetryBehavior retryBehavior =
NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
switch (he.getRetryBehavior()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package io.temporal.internal.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.nexusrpc.FailureInfo;
import io.nexusrpc.Link;
import io.nexusrpc.handler.HandlerException;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
import io.temporal.api.nexus.v1.Failure;
import io.temporal.api.nexus.v1.HandlerError;
import io.temporal.common.converter.DataConverter;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -13,7 +21,8 @@
import java.util.Map;

public class NexusUtil {
private static final JsonFormat.Printer JSON_PRINTER =
private static final ObjectWriter JSON_OBJECT_WRITER = new ObjectMapper().writer();
private static final JsonFormat.Printer PROTO_JSON_PRINTER =
JsonFormat.printer().omittingInsignificantWhitespace();
private static final String TEMPORAL_FAILURE_TYPE_STRING =
io.temporal.api.failure.v1.Failure.getDescriptor().getFullName();
Expand Down Expand Up @@ -47,23 +56,134 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink)
.build();
}

public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
public static Failure temporalFailureToNexusFailure(
io.temporal.api.failure.v1.Failure temporalFailure) {
String details;
try {
details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build());
details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build());
} catch (InvalidProtocolBufferException e) {
return Failure.newBuilder()
.setMessage("Failed to serialize failure details")
.setDetails(ByteString.copyFromUtf8(e.getMessage()))
.build();
}
return Failure.newBuilder()
.setMessage(failure.getMessage())
.setDetails(ByteString.copyFromUtf8(details))
.putAllMetadata(NEXUS_FAILURE_METADATA)
Failure.Builder failureBuilder =
Failure.newBuilder()
.setMessage(temporalFailure.getMessage())
.setDetails(ByteString.copyFromUtf8(details))
.putAllMetadata(NEXUS_FAILURE_METADATA);
if (!temporalFailure.getStackTrace().isEmpty()) {
failureBuilder.setStackTrace(temporalFailure.getStackTrace());
}
return failureBuilder.build();
}

public static io.temporal.api.failure.v1.Failure nexusFailureToAPIFailure(
FailureInfo failureInfo, boolean retryable) {
io.temporal.api.failure.v1.Failure.Builder apiFailure =
io.temporal.api.failure.v1.Failure.newBuilder();

if (failureInfo.getMetadata().containsKey("type")
&& failureInfo.getMetadata().get("type").equals(TEMPORAL_FAILURE_TYPE_STRING)) {
// Details contains a JSON-serialized Temporal failure
try {
JsonFormat.parser().ignoringUnknownFields().merge(failureInfo.getDetailsJson(), apiFailure);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
} else {
// Create an ApplicationFailure with the Nexus failure data
io.temporal.api.common.v1.Payloads payloads = nexusFailureMetadataToPayloads(failureInfo);
io.temporal.api.failure.v1.ApplicationFailureInfo.Builder appFailureInfo =
io.temporal.api.failure.v1.ApplicationFailureInfo.newBuilder()
.setType("NexusFailure")
.setNonRetryable(!retryable);
if (payloads != null) {
appFailureInfo.setDetails(payloads);
}
apiFailure.setApplicationFailureInfo(appFailureInfo.build());
}

// Ensure these always get written
apiFailure.setMessage(failureInfo.getMessage());

return apiFailure.build();
}

private static io.temporal.api.common.v1.Payloads nexusFailureMetadataToPayloads(
FailureInfo failureInfo) {
if (failureInfo.getMetadata().isEmpty() && failureInfo.getDetailsJson().isEmpty()) {
return null;
}

// Create a copy without the message before serializing
FailureInfo failureCopy = FailureInfo.newBuilder(failureInfo).setMessage("").build();
String json = null;
try {
json = JSON_OBJECT_WRITER.writeValueAsString(failureCopy);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

return io.temporal.api.common.v1.Payloads.newBuilder()
.addPayloads(
Payload.newBuilder()
.putMetadata("encoding", ByteString.copyFromUtf8("json/plain"))
.setData(ByteString.copyFromUtf8(json))
.build())
.build();
}

public static FailureInfo temporalFailureToNexusFailureInfo(
io.temporal.api.failure.v1.Failure temporalFailure) {
String details;
try {
details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build());
} catch (InvalidProtocolBufferException e) {
return FailureInfo.newBuilder()
.setMessage("Failed to serialize failure details")
.setDetailsJson(e.getMessage())
.build();
}
return FailureInfo.newBuilder()
.setMessage(temporalFailure.getMessage())
.setDetailsJson(details)
.putMetadata("type", TEMPORAL_FAILURE_TYPE_STRING)
.build();
}

public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
return temporalFailureToNexusFailure(failure);
}

public static HandlerError handlerErrorToNexusError(
HandlerException e, DataConverter dataConverter) {
HandlerError.Builder handlerError =
HandlerError.newBuilder()
.setErrorType(e.getErrorType().toString())
.setRetryBehavior(mapRetryBehavior(e.getRetryBehavior()));
// TODO: check if this works on old server
if (e.getCause() != null) {
handlerError.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter));
} else if (e.getMessage() != null && !e.getMessage().isEmpty()) {
// Include message even when there's no cause
handlerError.setFailure(Failure.newBuilder().setMessage(e.getMessage()).build());
}
return handlerError.build();
}

private static NexusHandlerErrorRetryBehavior mapRetryBehavior(
HandlerException.RetryBehavior retryBehavior) {
switch (retryBehavior) {
case RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
case NON_RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
default:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
}
}

private NexusUtil() {}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package io.temporal.internal.nexus;

import static io.temporal.internal.common.NexusUtil.exceptionToNexusFailure;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import com.uber.m3.tally.Scope;
import io.grpc.StatusRuntimeException;
import io.nexusrpc.Header;
import io.nexusrpc.OperationException;
import io.nexusrpc.OperationState;
import io.nexusrpc.handler.*;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
import io.temporal.api.nexus.v1.*;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowNotFoundException;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.interceptors.WorkerInterceptor;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
Expand Down Expand Up @@ -78,9 +80,6 @@ public boolean start() {
public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException {
Request request = task.getResponse().getRequest();
Map<String, String> headers = request.getHeaderMap();
if (headers == null) {
headers = Collections.emptyMap();
}

OperationContext.Builder ctx = OperationContext.newBuilder();
headers.forEach(ctx::putHeader);
Expand Down Expand Up @@ -129,18 +128,9 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
new RuntimeException("Unknown request type: " + request.getVariantCase()));
}
} catch (HandlerException e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(e.getErrorType().toString())
.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter))
.setRetryBehavior(mapRetryBehavior(e.getRetryBehavior()))
.build());
return new Result(e);
} catch (Throwable e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(HandlerException.ErrorType.INTERNAL.toString())
.setFailure(exceptionToNexusFailure(e, dataConverter))
.build());
return new Result(new HandlerException(HandlerException.ErrorType.INTERNAL, e));
} finally {
// If the task timed out, we should not send a response back to the server
if (timedOut.get()) {
Expand All @@ -154,18 +144,6 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}
}

private NexusHandlerErrorRetryBehavior mapRetryBehavior(
HandlerException.RetryBehavior retryBehavior) {
switch (retryBehavior) {
case RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
case NON_RETRYABLE:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
default:
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
}
}

private void cancelOperation(OperationContext context, OperationCancelDetails details) {
try {
serviceHandler.cancelOperation(context, details);
Expand Down Expand Up @@ -215,6 +193,7 @@ private void convertKnownFailures(Throwable e) {
if (((ApplicationFailure) failure).isNonRetryable()) {
throw new HandlerException(
HandlerException.ErrorType.INTERNAL,
"Handler failed with non-retryable application error",
failure,
HandlerException.RetryBehavior.NON_RETRYABLE);
}
Expand Down Expand Up @@ -346,11 +325,21 @@ private StartOperationResponse handleStartOperation(
convertKnownFailures(failure);
}
} catch (OperationException e) {
startResponseBuilder.setOperationError(
UnsuccessfulOperationError.newBuilder()
.setOperationState(e.getState().toString().toLowerCase())
.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter))
.build());
TemporalFailure temporalFailure;
if (e.getState() == OperationState.FAILED) {
temporalFailure =
ApplicationFailure.newFailureWithCause(e.getMessage(), "OperationError", e.getCause());
temporalFailure.setStackTrace(e.getStackTrace());
} else if (e.getState() == OperationState.CANCELED) {
temporalFailure =
new CanceledFailure(e.getMessage(), new EncodedValues(null), e.getCause());
temporalFailure.setStackTrace(e.getStackTrace());
} else {
throw new HandlerException(
HandlerException.ErrorType.INTERNAL,
new RuntimeException("Unknown operation state: " + e.getState()));
}
startResponseBuilder.setFailure(dataConverter.exceptionToFailure(temporalFailure));
}
return startResponseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.temporal.internal.nexus;

import io.nexusrpc.OperationException;
import io.nexusrpc.OperationInfo;
import io.nexusrpc.handler.*;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkerInterceptor;
Expand Down Expand Up @@ -51,20 +50,6 @@ public OperationStartResult<Object> start(
.getResult();
}

@Override
public Object fetchResult(
OperationContext operationContext, OperationFetchResultDetails operationFetchResultDetails)
throws OperationException {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public OperationInfo fetchInfo(
OperationContext operationContext, OperationFetchInfoDetails operationFetchInfoDetails)
throws HandlerException {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public void cancel(
OperationContext operationContext, OperationCancelDetails operationCancelDetails) {
Expand Down
Loading