Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@
import io.temporal.internal.nexus.OperationTokenUtil;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility functions shared by the implementation code. */
public final class InternalUtils {
public static String TEMPORAL_RESERVED_PREFIX = "__temporal_";

private static final Logger log = LoggerFactory.getLogger(InternalUtils.class);
private static String QUERY_TYPE_STACK_TRACE = "__stack_trace";
private static String ENHANCED_QUERY_TYPE_STACK_TRACE = "__enhanced_stack_trace";

Expand Down Expand Up @@ -93,21 +90,12 @@ public static NexusWorkflowStarter createNexusBoundStub(
? null
: request.getLinks().stream()
.map(
(link) -> {
if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor()
.getFullName()
.equals(link.getType())) {
io.temporal.api.nexus.v1.Link nexusLink =
(link) ->
LinkConverter.nexusLinkToLink(
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build();
return LinkConverter.nexusLinkToWorkflowEvent(nexusLink);
} else {
log.warn("ignoring unsupported link data type: {}", link.getType());
return null;
}
})
.build()))
.filter(Objects::nonNull)
.collect(Collectors.toList());
WorkflowOptions.Builder nexusWorkflowOptions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class LinkConverter {
private static final Logger log = LoggerFactory.getLogger(LinkConverter.class);

private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history";
private static final String nexusOperationLinkPathFormat =
"temporal:///namespaces/%s/nexus-operations/%s/%s/details";
private static final String linkReferenceTypeKey = "referenceType";
private static final String linkEventIDKey = "eventID";
private static final String linkEventTypeKey = "eventType";
Expand All @@ -29,6 +31,10 @@ public class LinkConverter {
Link.WorkflowEvent.EventReference.getDescriptor().getName();
private static final String requestIDReferenceType =
Link.WorkflowEvent.RequestIdReference.getDescriptor().getName();
private static final String workflowEventLinkType =
Link.WorkflowEvent.getDescriptor().getFullName();
private static final String nexusOperationLinkType =
Link.NexusOperation.getDescriptor().getFullName();

public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) {
try {
Expand Down Expand Up @@ -160,6 +166,107 @@ public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusL
return link.build();
}

/**
* Dispatches on the oneof variant of {@code commonLink} and converts to the matching {@link
* io.temporal.api.nexus.v1.Link}. Returns {@code null} if no variant is set or encoding fails.
*/
public static io.temporal.api.nexus.v1.Link linkToNexusLink(Link commonLink) {
if (commonLink.hasWorkflowEvent()) {
return workflowEventToNexusLink(commonLink.getWorkflowEvent());
}
if (commonLink.hasNexusOperation()) {
return nexusOperationToNexusLink(commonLink.getNexusOperation());
}
return null;
}

/**
* Dispatches on {@link io.temporal.api.nexus.v1.Link#getType()} and converts to the matching
* {@link Link} variant. Returns {@code null} for unknown or unparseable types.
*/
public static Link nexusLinkToLink(io.temporal.api.nexus.v1.Link nexusLink) {
String type = nexusLink.getType();
if (workflowEventLinkType.equals(type)) {
return nexusLinkToWorkflowEvent(nexusLink);
}
if (nexusOperationLinkType.equals(type)) {
return nexusLinkToNexusOperation(nexusLink);
}
log.warn("ignoring unsupported nexus link type: {}", type);
return null;
Comment thread
cursor[bot] marked this conversation as resolved.
}

public static io.temporal.api.nexus.v1.Link nexusOperationToNexusLink(Link.NexusOperation no) {
try {
String url =
String.format(
nexusOperationLinkPathFormat,
URLEncoder.encode(no.getNamespace(), StandardCharsets.UTF_8.toString()),
// See the WorkflowId comment in workflowEventToNexusLink for why '+' is rewritten to
// '%20'. OperationId is user-supplied and can legally contain spaces.
URLEncoder.encode(no.getOperationId(), StandardCharsets.UTF_8.toString())
.replace("+", "%20"),
URLEncoder.encode(no.getRunId(), StandardCharsets.UTF_8.toString()));
return io.temporal.api.nexus.v1.Link.newBuilder()
.setUrl(url)
.setType(nexusOperationLinkType)
.build();
} catch (Exception e) {
log.error("Failed to encode Nexus operation link URL", e);
}
return null;
}

public static Link nexusLinkToNexusOperation(io.temporal.api.nexus.v1.Link nexusLink) {
if (!nexusOperationLinkType.equals(nexusLink.getType())) {
log.error(
"Failed to parse Nexus link URL: cannot parse link type {} to {}",
nexusLink.getType(),
nexusOperationLinkType);
return null;
}
Link.Builder link = Link.newBuilder();
try {
URI uri = new URI(nexusLink.getUrl());

if (!"temporal".equals(uri.getScheme())) {
log.error("Failed to parse Nexus link URL: invalid scheme: {}", uri.getScheme());
return null;
}

StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/");
if (!st.hasMoreTokens() || !st.nextToken().equals("namespaces")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens() || !st.nextToken().equals("nexus-operations")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String operationId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens()) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String runId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens() || !st.nextToken().equals("details")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}

link.setNexusOperation(
Link.NexusOperation.newBuilder()
.setNamespace(namespace)
.setOperationId(operationId)
.setRunId(runId));
} catch (Exception e) {
log.error("Failed to parse Nexus link URL", e);
return null;
}
return link.build();
}

private static Map<String, String> parseQueryParams(URI uri) throws UnsupportedEncodingException {
final String query = uri.getQuery();
if (query == null || query.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.temporal.internal.nexus;

import static io.temporal.internal.common.LinkConverter.linkToNexusLink;
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

Expand Down Expand Up @@ -50,12 +51,10 @@ public static NexusStartWorkflowResponse startWorkflowAndAttachLinks(

// If the start workflow response returned a link use it, otherwise
// create the link information about the new workflow and return to the caller.
Link.WorkflowEvent workflowEventLink =
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
: null;
if (workflowEventLink == null) {
workflowEventLink =
io.temporal.api.nexus.v1.Link nexusLink =
linkToNexusLink(nexusCtx.getStartWorkflowResponseLink());
if (nexusLink == null) {
Link.WorkflowEvent synthesized =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
Expand All @@ -64,8 +63,8 @@ public static NexusStartWorkflowResponse startWorkflowAndAttachLinks(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
nexusLink = workflowEventToNexusLink(synthesized);
}
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
if (nexusLink != null) {
try {
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,20 +301,13 @@ private StartOperationResponse handleStartOperation(
"Invalid link URL: " + link.getUrl(),
e);
}
// LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of
// other shapes (e.g. non-temporal URLs) come back null and are intentionally not
// forwarded onto the RPCs the handler issues, which require the WorkflowEvent
// variant. Log so a debugging session can see what was dropped.
io.temporal.api.common.v1.Link commonLink =
LinkConverter.nexusLinkToWorkflowEvent(link);
// Convert inbound Nexus links into common.v1.Link so RPCs issued by the handler
// (e.g. signal, signalWithStart) can attach them as request links. Both
// WorkflowEvent (caller workflow → nexus op scheduled) and NexusOperation (SANO
// record) variants flow through; other shapes are dropped.
io.temporal.api.common.v1.Link commonLink = LinkConverter.nexusLinkToLink(link);
if (commonLink != null) {
inboundCommonLinks.add(commonLink);
} else {
log.warn(
"Dropping inbound Nexus link from outbound link propagation: type='{}',"
+ " url='{}' (not a parseable temporal WorkflowEvent link)",
link.getType(),
link.getUrl());
}
});
CurrentNexusOperationContext.get().setRequestLinks(inboundCommonLinks);
Expand All @@ -335,11 +328,7 @@ private StartOperationResponse handleStartOperation(
List<io.temporal.api.nexus.v1.Link> responseLinks = new ArrayList<>();
for (io.temporal.api.common.v1.Link responseLink :
CurrentNexusOperationContext.get().getResponseLinks()) {
if (!responseLink.hasWorkflowEvent()) {
continue;
}
io.temporal.api.nexus.v1.Link converted =
LinkConverter.workflowEventToNexusLink(responseLink.getWorkflowEvent());
io.temporal.api.nexus.v1.Link converted = LinkConverter.linkToNexusLink(responseLink);
if (converted != null) {
responseLinks.add(converted);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.api.common.v1.Callback;
import io.temporal.api.common.v1.Link;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.History;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.client.NexusOperationExecutionDescription;
import io.temporal.client.StartNexusOperationOptions;
import io.temporal.client.UntypedNexusOperationHandle;
import io.temporal.client.UntypedNexusServiceClient;
Expand All @@ -30,13 +37,12 @@
import org.junit.Test;

/**
* Verifies that {@link UntypedNexusOperationHandle#cancel()} from a standalone client propagates
* through the server to the handler workflow backing a Nexus operation. Mirrors {@code
* sdk-go/test/nexus_test.go TestNexusWorkflowRunOperation}: start a Nexus operation backed by a
* workflow that awaits forever, cancel via the standalone client handle, then assert the backing
* workflow ends with {@link CanceledFailure}.
* Behavior tests for standalone Nexus operations whose handler is {@link WorkflowRunOperation},
* i.e. each SANO is backed by a workflow. Shares one fixture (a workflow that awaits forever) so
* individual tests can exercise cancel propagation, bidirectional link plumbing, and any other
* behavior that depends on the SANO ↔ backing-workflow relationship.
*/
public class StandaloneNexusClientCancelTest {
public class StandaloneNexusBackingWorkflowTest {

static final AtomicReference<String> capturedWorkflowId = new AtomicReference<>();

Expand Down Expand Up @@ -90,6 +96,95 @@ public void cancelPropagatesToBackingWorkflow() throws Exception {
}
}

/**
* Verifies bidirectional linking between the standalone Nexus operation and the backing workflow
* it starts.
*
* <ul>
* <li>Forward: a {@code Link.NexusOperation} pointing at the SANO record is attached to the
* backing workflow's WorkflowExecutionStarted completion callback (visible on {@code
* attrs.getCompletionCallbacks(i).getLinks(j)}).
* <li>Backward: a {@code Link.WorkflowEvent} pointing at the backing workflow's
* WorkflowExecutionStarted event is stored on the SANO record's {@code
* NexusOperationExecutionInfo.links} (visible via {@code handle.describe()}).
* </ul>
*/
@Test
public void linkForwardedToBackingWorkflowCallback() throws Exception {
Endpoint endpoint = testWorkflowRule.getNexusEndpoint();
UntypedNexusServiceClient svc =
testWorkflowRule
.getNexusClient()
.newUntypedNexusServiceClient(
endpoint.getSpec().getName(), CancelTargetNexusService.class.getSimpleName());

String operationId = UUID.randomUUID().toString();
UntypedNexusOperationHandle handle =
svc.start(
"operation",
StartNexusOperationOptions.newBuilder()
.setId(operationId)
.setScheduleToCloseTimeout(Duration.ofSeconds(30))
.build(),
"ignored");
String operationRunId = handle.getNexusOperationRunId();
Assert.assertNotNull("expected SANO run id to be populated by start", operationRunId);

String workflowId = waitForWorkflowIdCaptured(Duration.ofSeconds(8));

try {
History history = testWorkflowRule.getWorkflowClient().fetchHistory(workflowId).getHistory();
HistoryEvent startedEvent = history.getEventsList().get(0);
WorkflowExecutionStartedEventAttributes attrs =
startedEvent.getWorkflowExecutionStartedEventAttributes();

Link.NexusOperation found = null;
for (Callback cb : attrs.getCompletionCallbacksList()) {
for (Link link : cb.getLinksList()) {
if (link.hasNexusOperation()) {
found = link.getNexusOperation();
break;
}
}
if (found != null) {
break;
}
}
Assert.assertNotNull(
"expected Link.NexusOperation on a completion callback of the backing workflow", found);
Assert.assertEquals(
testWorkflowRule.getWorkflowClient().getOptions().getNamespace(), found.getNamespace());
Assert.assertEquals(operationId, found.getOperationId());
Assert.assertEquals(operationRunId, found.getRunId());

// Backward direction: SANO record's info.links should carry a Link.WorkflowEvent referencing
// the backing workflow's WorkflowExecutionStarted event.
NexusOperationExecutionDescription desc = handle.describe();
Link.WorkflowEvent backLink = null;
for (Link link : desc.getRawInfo().getLinksList()) {
if (link.hasWorkflowEvent() && workflowId.equals(link.getWorkflowEvent().getWorkflowId())) {
backLink = link.getWorkflowEvent();
break;
}
}
Assert.assertNotNull(
"expected Link.WorkflowEvent on the SANO record's info.links pointing at the backing"
+ " workflow",
backLink);
Assert.assertEquals(
testWorkflowRule.getWorkflowClient().getOptions().getNamespace(),
backLink.getNamespace());
EventType backLinkEventType =
backLink.hasRequestIdRef()
? backLink.getRequestIdRef().getEventType()
: backLink.getEventRef().getEventType();
Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, backLinkEventType);
} finally {
// Workflow awaits forever; cancel so the test rule shuts down cleanly.
handle.cancel("link-test-cleanup");
}
}

private static String waitForWorkflowIdCaptured(Duration budget) throws InterruptedException {
long deadlineNanos = System.nanoTime() + budget.toNanos();
while (capturedWorkflowId.get() == null && System.nanoTime() < deadlineNanos) {
Expand Down
Loading
Loading