diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index 4c5ec49b12..0886802de9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -22,14 +22,11 @@ import io.temporal.internal.nexus.OperationTokenUtil; import java.util.*; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Utility functions shared by the implementation code. */ public final class InternalUtils { public static String TEMPORAL_RESERVED_PREFIX = "__temporal_"; - private static final Logger log = LoggerFactory.getLogger(InternalUtils.class); private static String QUERY_TYPE_STACK_TRACE = "__stack_trace"; private static String ENHANCED_QUERY_TYPE_STACK_TRACE = "__enhanced_stack_trace"; @@ -93,21 +90,12 @@ public static NexusWorkflowStarter createNexusBoundStub( ? null : request.getLinks().stream() .map( - (link) -> { - if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor() - .getFullName() - .equals(link.getType())) { - io.temporal.api.nexus.v1.Link nexusLink = + (link) -> + LinkConverter.nexusLinkToLink( io.temporal.api.nexus.v1.Link.newBuilder() .setType(link.getType()) .setUrl(link.getUri().toString()) - .build(); - return LinkConverter.nexusLinkToWorkflowEvent(nexusLink); - } else { - log.warn("ignoring unsupported link data type: {}", link.getType()); - return null; - } - }) + .build())) .filter(Objects::nonNull) .collect(Collectors.toList()); WorkflowOptions.Builder nexusWorkflowOptions = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java b/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java index 6d270eec63..1eef63af25 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java @@ -20,6 +20,8 @@ public class LinkConverter { private static final Logger log = LoggerFactory.getLogger(LinkConverter.class); private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history"; + private static final String nexusOperationLinkPathFormat = + "temporal:///namespaces/%s/nexus-operations/%s/%s/details"; private static final String linkReferenceTypeKey = "referenceType"; private static final String linkEventIDKey = "eventID"; private static final String linkEventTypeKey = "eventType"; @@ -29,6 +31,10 @@ public class LinkConverter { Link.WorkflowEvent.EventReference.getDescriptor().getName(); private static final String requestIDReferenceType = Link.WorkflowEvent.RequestIdReference.getDescriptor().getName(); + private static final String workflowEventLinkType = + Link.WorkflowEvent.getDescriptor().getFullName(); + private static final String nexusOperationLinkType = + Link.NexusOperation.getDescriptor().getFullName(); public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) { try { @@ -160,6 +166,107 @@ public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusL return link.build(); } + /** + * Dispatches on the oneof variant of {@code commonLink} and converts to the matching {@link + * io.temporal.api.nexus.v1.Link}. Returns {@code null} if no variant is set or encoding fails. + */ + public static io.temporal.api.nexus.v1.Link linkToNexusLink(Link commonLink) { + if (commonLink.hasWorkflowEvent()) { + return workflowEventToNexusLink(commonLink.getWorkflowEvent()); + } + if (commonLink.hasNexusOperation()) { + return nexusOperationToNexusLink(commonLink.getNexusOperation()); + } + return null; + } + + /** + * Dispatches on {@link io.temporal.api.nexus.v1.Link#getType()} and converts to the matching + * {@link Link} variant. Returns {@code null} for unknown or unparseable types. + */ + public static Link nexusLinkToLink(io.temporal.api.nexus.v1.Link nexusLink) { + String type = nexusLink.getType(); + if (workflowEventLinkType.equals(type)) { + return nexusLinkToWorkflowEvent(nexusLink); + } + if (nexusOperationLinkType.equals(type)) { + return nexusLinkToNexusOperation(nexusLink); + } + log.warn("ignoring unsupported nexus link type: {}", type); + return null; + } + + public static io.temporal.api.nexus.v1.Link nexusOperationToNexusLink(Link.NexusOperation no) { + try { + String url = + String.format( + nexusOperationLinkPathFormat, + URLEncoder.encode(no.getNamespace(), StandardCharsets.UTF_8.toString()), + // See the WorkflowId comment in workflowEventToNexusLink for why '+' is rewritten to + // '%20'. OperationId is user-supplied and can legally contain spaces. + URLEncoder.encode(no.getOperationId(), StandardCharsets.UTF_8.toString()) + .replace("+", "%20"), + URLEncoder.encode(no.getRunId(), StandardCharsets.UTF_8.toString())); + return io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl(url) + .setType(nexusOperationLinkType) + .build(); + } catch (Exception e) { + log.error("Failed to encode Nexus operation link URL", e); + } + return null; + } + + public static Link nexusLinkToNexusOperation(io.temporal.api.nexus.v1.Link nexusLink) { + if (!nexusOperationLinkType.equals(nexusLink.getType())) { + log.error( + "Failed to parse Nexus link URL: cannot parse link type {} to {}", + nexusLink.getType(), + nexusOperationLinkType); + return null; + } + Link.Builder link = Link.newBuilder(); + try { + URI uri = new URI(nexusLink.getUrl()); + + if (!"temporal".equals(uri.getScheme())) { + log.error("Failed to parse Nexus link URL: invalid scheme: {}", uri.getScheme()); + return null; + } + + StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/"); + if (!st.hasMoreTokens() || !st.nextToken().equals("namespaces")) { + log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.hasMoreTokens() || !st.nextToken().equals("nexus-operations")) { + log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String operationId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.hasMoreTokens()) { + log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String runId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.hasMoreTokens() || !st.nextToken().equals("details")) { + log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + + link.setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace(namespace) + .setOperationId(operationId) + .setRunId(runId)); + } catch (Exception e) { + log.error("Failed to parse Nexus link URL", e); + return null; + } + return link.build(); + } + private static Map parseQueryParams(URI uri) throws UnsupportedEncodingException { final String query = uri.getQuery(); if (query == null || query.isEmpty()) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java index 5cd24018f9..de1da6c7a0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java @@ -1,5 +1,6 @@ package io.temporal.internal.nexus; +import static io.temporal.internal.common.LinkConverter.linkToNexusLink; import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; @@ -50,12 +51,10 @@ public static NexusStartWorkflowResponse startWorkflowAndAttachLinks( // If the start workflow response returned a link use it, otherwise // create the link information about the new workflow and return to the caller. - Link.WorkflowEvent workflowEventLink = - nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent() - ? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent() - : null; - if (workflowEventLink == null) { - workflowEventLink = + io.temporal.api.nexus.v1.Link nexusLink = + linkToNexusLink(nexusCtx.getStartWorkflowResponseLink()); + if (nexusLink == null) { + Link.WorkflowEvent synthesized = Link.WorkflowEvent.newBuilder() .setNamespace(nexusCtx.getNamespace()) .setWorkflowId(workflowExec.getWorkflowId()) @@ -64,8 +63,8 @@ public static NexusStartWorkflowResponse startWorkflowAndAttachLinks( Link.WorkflowEvent.EventReference.newBuilder() .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) .build(); + nexusLink = workflowEventToNexusLink(synthesized); } - io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); if (nexusLink != null) { try { ctx.addLinks(nexusProtoLinkToLink(nexusLink)); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 0fac5263a9..adaecca330 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -301,20 +301,13 @@ private StartOperationResponse handleStartOperation( "Invalid link URL: " + link.getUrl(), e); } - // LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of - // other shapes (e.g. non-temporal URLs) come back null and are intentionally not - // forwarded onto the RPCs the handler issues, which require the WorkflowEvent - // variant. Log so a debugging session can see what was dropped. - io.temporal.api.common.v1.Link commonLink = - LinkConverter.nexusLinkToWorkflowEvent(link); + // Convert inbound Nexus links into common.v1.Link so RPCs issued by the handler + // (e.g. signal, signalWithStart) can attach them as request links. Both + // WorkflowEvent (caller workflow → nexus op scheduled) and NexusOperation (SANO + // record) variants flow through; other shapes are dropped. + io.temporal.api.common.v1.Link commonLink = LinkConverter.nexusLinkToLink(link); if (commonLink != null) { inboundCommonLinks.add(commonLink); - } else { - log.warn( - "Dropping inbound Nexus link from outbound link propagation: type='{}'," - + " url='{}' (not a parseable temporal WorkflowEvent link)", - link.getType(), - link.getUrl()); } }); CurrentNexusOperationContext.get().setRequestLinks(inboundCommonLinks); @@ -335,11 +328,7 @@ private StartOperationResponse handleStartOperation( List responseLinks = new ArrayList<>(); for (io.temporal.api.common.v1.Link responseLink : CurrentNexusOperationContext.get().getResponseLinks()) { - if (!responseLink.hasWorkflowEvent()) { - continue; - } - io.temporal.api.nexus.v1.Link converted = - LinkConverter.workflowEventToNexusLink(responseLink.getWorkflowEvent()); + io.temporal.api.nexus.v1.Link converted = LinkConverter.linkToNexusLink(responseLink); if (converted != null) { responseLinks.add(converted); } diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusClientCancelTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusBackingWorkflowTest.java similarity index 50% rename from temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusClientCancelTest.java rename to temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusBackingWorkflowTest.java index ec95b8a47c..53992f387c 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusClientCancelTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusBackingWorkflowTest.java @@ -7,7 +7,14 @@ import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.common.v1.Callback; +import io.temporal.api.common.v1.Link; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.client.NexusOperationExecutionDescription; import io.temporal.client.StartNexusOperationOptions; import io.temporal.client.UntypedNexusOperationHandle; import io.temporal.client.UntypedNexusServiceClient; @@ -30,13 +37,12 @@ import org.junit.Test; /** - * Verifies that {@link UntypedNexusOperationHandle#cancel()} from a standalone client propagates - * through the server to the handler workflow backing a Nexus operation. Mirrors {@code - * sdk-go/test/nexus_test.go TestNexusWorkflowRunOperation}: start a Nexus operation backed by a - * workflow that awaits forever, cancel via the standalone client handle, then assert the backing - * workflow ends with {@link CanceledFailure}. + * Behavior tests for standalone Nexus operations whose handler is {@link WorkflowRunOperation}, + * i.e. each SANO is backed by a workflow. Shares one fixture (a workflow that awaits forever) so + * individual tests can exercise cancel propagation, bidirectional link plumbing, and any other + * behavior that depends on the SANO ↔ backing-workflow relationship. */ -public class StandaloneNexusClientCancelTest { +public class StandaloneNexusBackingWorkflowTest { static final AtomicReference capturedWorkflowId = new AtomicReference<>(); @@ -90,6 +96,95 @@ public void cancelPropagatesToBackingWorkflow() throws Exception { } } + /** + * Verifies bidirectional linking between the standalone Nexus operation and the backing workflow + * it starts. + * + *
    + *
  • Forward: a {@code Link.NexusOperation} pointing at the SANO record is attached to the + * backing workflow's WorkflowExecutionStarted completion callback (visible on {@code + * attrs.getCompletionCallbacks(i).getLinks(j)}). + *
  • Backward: a {@code Link.WorkflowEvent} pointing at the backing workflow's + * WorkflowExecutionStarted event is stored on the SANO record's {@code + * NexusOperationExecutionInfo.links} (visible via {@code handle.describe()}). + *
+ */ + @Test + public void linkForwardedToBackingWorkflowCallback() throws Exception { + Endpoint endpoint = testWorkflowRule.getNexusEndpoint(); + UntypedNexusServiceClient svc = + testWorkflowRule + .getNexusClient() + .newUntypedNexusServiceClient( + endpoint.getSpec().getName(), CancelTargetNexusService.class.getSimpleName()); + + String operationId = UUID.randomUUID().toString(); + UntypedNexusOperationHandle handle = + svc.start( + "operation", + StartNexusOperationOptions.newBuilder() + .setId(operationId) + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .build(), + "ignored"); + String operationRunId = handle.getNexusOperationRunId(); + Assert.assertNotNull("expected SANO run id to be populated by start", operationRunId); + + String workflowId = waitForWorkflowIdCaptured(Duration.ofSeconds(8)); + + try { + History history = testWorkflowRule.getWorkflowClient().fetchHistory(workflowId).getHistory(); + HistoryEvent startedEvent = history.getEventsList().get(0); + WorkflowExecutionStartedEventAttributes attrs = + startedEvent.getWorkflowExecutionStartedEventAttributes(); + + Link.NexusOperation found = null; + for (Callback cb : attrs.getCompletionCallbacksList()) { + for (Link link : cb.getLinksList()) { + if (link.hasNexusOperation()) { + found = link.getNexusOperation(); + break; + } + } + if (found != null) { + break; + } + } + Assert.assertNotNull( + "expected Link.NexusOperation on a completion callback of the backing workflow", found); + Assert.assertEquals( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace(), found.getNamespace()); + Assert.assertEquals(operationId, found.getOperationId()); + Assert.assertEquals(operationRunId, found.getRunId()); + + // Backward direction: SANO record's info.links should carry a Link.WorkflowEvent referencing + // the backing workflow's WorkflowExecutionStarted event. + NexusOperationExecutionDescription desc = handle.describe(); + Link.WorkflowEvent backLink = null; + for (Link link : desc.getRawInfo().getLinksList()) { + if (link.hasWorkflowEvent() && workflowId.equals(link.getWorkflowEvent().getWorkflowId())) { + backLink = link.getWorkflowEvent(); + break; + } + } + Assert.assertNotNull( + "expected Link.WorkflowEvent on the SANO record's info.links pointing at the backing" + + " workflow", + backLink); + Assert.assertEquals( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace(), + backLink.getNamespace()); + EventType backLinkEventType = + backLink.hasRequestIdRef() + ? backLink.getRequestIdRef().getEventType() + : backLink.getEventRef().getEventType(); + Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, backLinkEventType); + } finally { + // Workflow awaits forever; cancel so the test rule shuts down cleanly. + handle.cancel("link-test-cleanup"); + } + } + private static String waitForWorkflowIdCaptured(Duration budget) throws InterruptedException { long deadlineNanos = System.nanoTime() + budget.toNanos(); while (capturedWorkflowId.get() == null && System.nanoTime() < deadlineNanos) { diff --git a/temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusSignalLinkingTest.java b/temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusSignalLinkingTest.java new file mode 100644 index 0000000000..2864ed2ca7 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/nexus/StandaloneNexusSignalLinkingTest.java @@ -0,0 +1,210 @@ +package io.temporal.client.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.common.v1.Link; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.client.BatchRequest; +import io.temporal.client.NexusOperationExecutionDescription; +import io.temporal.client.StartNexusOperationOptions; +import io.temporal.client.UntypedNexusOperationHandle; +import io.temporal.client.UntypedNexusServiceClient; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verifies bidirectional link propagation when a Nexus operation handler signal-with-starts a + * workflow and the operation itself was kicked off through a standalone Nexus client (SANO). + * + *
    + *
  • Forward: the inbound Nexus task carries a {@code Link.NexusOperation} pointing at the SANO + * record. The handler forwards it onto the signal-with-start request so the callee's {@code + * WorkflowExecutionSignaled} (and {@code WorkflowExecutionStarted}) events carry that + * backlink. + *
  • Backward: the server returns a {@code signal_link} on {@code + * SignalWithStartWorkflowExecutionResponse} pointing at the callee's signal event. The + * handler drains it onto the {@code StartOperationResponse.links}, and the server stores it + * on the SANO record's {@code NexusOperationExecutionInfo.links}. + *
+ * + *

Requires a real server: standalone Nexus operations and {@code EnableCHASMSignalBacklinks} are + * not implemented by the in-memory test server. Test skips locally and runs in CI. + */ +public class StandaloneNexusSignalLinkingTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(SanoSignalCalleeWorkflowImpl.class) + .setNexusServiceImplementation(new SanoSignalingNexusServiceImpl()) + .build(); + + @Before + public void requireStandaloneNexusSupport() { + assumeTrue( + "server does not support standalone Nexus operations and signal backlinks", + testWorkflowRule.isUseExternalService()); + } + + @Test + public void linksFlowBothDirectionsForSanoTriggeredSignal() throws Exception { + Endpoint endpoint = testWorkflowRule.getNexusEndpoint(); + UntypedNexusServiceClient svc = + testWorkflowRule + .getNexusClient() + .newUntypedNexusServiceClient( + endpoint.getSpec().getName(), SanoSignalingNexusService.class.getSimpleName()); + + String operationId = UUID.randomUUID().toString(); + String calleeWorkflowId = "sano-signal-callee-" + UUID.randomUUID(); + UntypedNexusOperationHandle handle = + svc.start( + "operation", + StartNexusOperationOptions.newBuilder() + .setId(operationId) + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .build(), + calleeWorkflowId); + String operationRunId = handle.getNexusOperationRunId(); + Assert.assertNotNull("expected SANO run id to be populated by start", operationRunId); + + String result = handle.getResult(30, TimeUnit.SECONDS, String.class); + Assert.assertEquals("signaled", result); + + // The callee was signal-with-started by the handler and exits once it sees one signal. + String calleeResult = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(calleeWorkflowId) + .getResult(String.class); + Assert.assertEquals("from-sano", calleeResult); + + // Forward direction: callee's WorkflowExecutionSignaled event carries a Link.NexusOperation + // pointing at the SANO record. The same backlink also lands on WorkflowExecutionStarted via + // the SWS request's links field; assert on the signal event because that's the one the SANO + // backlink is principally for. + History calleeHistory = + testWorkflowRule.getWorkflowClient().fetchHistory(calleeWorkflowId).getHistory(); + HistoryEvent signaledEvent = + findEventOfType(calleeHistory, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + Assert.assertNotNull("expected a WorkflowExecutionSignaled event on the callee", signaledEvent); + Link.NexusOperation forwardLink = null; + for (Link link : signaledEvent.getLinksList()) { + if (link.hasNexusOperation()) { + forwardLink = link.getNexusOperation(); + break; + } + } + Assert.assertNotNull( + "expected Link.NexusOperation on callee's WorkflowExecutionSignaled event", forwardLink); + Assert.assertEquals( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace(), + forwardLink.getNamespace()); + Assert.assertEquals(operationId, forwardLink.getOperationId()); + Assert.assertEquals(operationRunId, forwardLink.getRunId()); + + // Backward direction: the signal_link returned on the SWS response is drained onto the + // StartOperationResponse.links by the handler, and the server stores it on the SANO record. + // Read via describe → NexusOperationExecutionInfo.links. + NexusOperationExecutionDescription desc = handle.describe(); + Link backwardLink = null; + for (Link link : desc.getRawInfo().getLinksList()) { + if (link.hasWorkflowEvent() + && calleeWorkflowId.equals(link.getWorkflowEvent().getWorkflowId())) { + backwardLink = link; + break; + } + } + Assert.assertNotNull( + "expected the signal response link on the SANO record's info.links", backwardLink); + EventType backwardLinkEventType = + backwardLink.getWorkflowEvent().hasRequestIdRef() + ? backwardLink.getWorkflowEvent().getRequestIdRef().getEventType() + : backwardLink.getWorkflowEvent().getEventRef().getEventType(); + Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, backwardLinkEventType); + } + + private static HistoryEvent findEventOfType(History history, EventType type) { + for (HistoryEvent e : history.getEventsList()) { + if (e.getEventType() == type) { + return e; + } + } + return null; + } + + @Service + public interface SanoSignalingNexusService { + @Operation + String operation(String calleeWorkflowId); + } + + @ServiceImpl(service = SanoSignalingNexusService.class) + public static class SanoSignalingNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (OperationContext ctx, + io.nexusrpc.handler.OperationStartDetails details, + String input) -> { + WorkflowClient client = Nexus.getOperationContext().getWorkflowClient(); + String tq = Nexus.getOperationContext().getInfo().getTaskQueue(); + SanoSignalCalleeWorkflow startStub = + client.newWorkflowStub( + SanoSignalCalleeWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId(input).setTaskQueue(tq).build()); + BatchRequest batch = client.newSignalWithStartRequest(); + batch.add(startStub::execute); + batch.add(startStub::ping, "from-sano"); + client.signalWithStart(batch); + return "signaled"; + }); + } + } + + @WorkflowInterface + public interface SanoSignalCalleeWorkflow { + @WorkflowMethod + String execute(); + + @SignalMethod + void ping(String msg); + } + + public static class SanoSignalCalleeWorkflowImpl implements SanoSignalCalleeWorkflow { + private String received; + + @Override + public String execute() { + Workflow.await(() -> received != null); + return received; + } + + @Override + public void ping(String msg) { + received = msg; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java index 60b67b1b81..9024cbff6f 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java @@ -1,6 +1,10 @@ package io.temporal.internal.common; +import static io.temporal.internal.common.LinkConverter.linkToNexusLink; +import static io.temporal.internal.common.LinkConverter.nexusLinkToLink; +import static io.temporal.internal.common.LinkConverter.nexusLinkToNexusOperation; import static io.temporal.internal.common.LinkConverter.nexusLinkToWorkflowEvent; +import static io.temporal.internal.common.LinkConverter.nexusOperationToNexusLink; import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; import static org.junit.Assert.*; @@ -352,4 +356,202 @@ public void testConvertNexusToWorkflowEvent_InvalidEventType() { assertNull(nexusLinkToWorkflowEvent(input)); } + + @Test + public void testConvertNexusOperationToNexus_Valid() { + Link.NexusOperation input = + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id") + .build(); + + io.temporal.api.nexus.v1.Link expected = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertEquals(expected, nexusOperationToNexusLink(input)); + } + + @Test + public void testConvertNexusOperationToNexus_ValidSlash() { + Link.NexusOperation input = + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op/id") + .setRunId("run-id") + .build(); + + io.temporal.api.nexus.v1.Link expected = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op%2Fid/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertEquals(expected, nexusOperationToNexusLink(input)); + } + + @Test + public void testConvertNexusToNexusOperation_Valid() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + Link expected = + Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id")) + .build(); + + assertEquals(expected, nexusLinkToNexusOperation(input)); + } + + @Test + public void testConvertNexusToNexusOperation_ValidSlash() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op%2Fid/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + Link expected = + Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op/id") + .setRunId("run-id")) + .build(); + + assertEquals(expected, nexusLinkToNexusOperation(input)); + } + + @Test + public void testConvertNexusToNexusOperation_WrongType() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + assertNull(nexusLinkToNexusOperation(input)); + } + + @Test + public void testConvertNexusToNexusOperation_InvalidScheme() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("random:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertNull(nexusLinkToNexusOperation(input)); + } + + @Test + public void testConvertNexusToNexusOperation_InvalidPathMissingDetails() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertNull(nexusLinkToNexusOperation(input)); + } + + @Test + public void testNexusLinkToLink_WorkflowEventRoundTrip() { + Link.WorkflowEvent we = + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + + io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(we); + assertEquals("temporal.api.common.v1.Link.WorkflowEvent", nexusLink.getType()); + + Link converted = nexusLinkToLink(nexusLink); + assertNotNull(converted); + assertEquals(Link.newBuilder().setWorkflowEvent(we).build(), converted); + } + + @Test + public void testNexusLinkToLink_NexusOperation() { + io.temporal.api.nexus.v1.Link nexusLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + Link expected = + Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id")) + .build(); + + assertEquals(expected, nexusLinkToLink(nexusLink)); + } + + @Test + public void testNexusLinkToLink_UnknownType() { + io.temporal.api.nexus.v1.Link nexusLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/workflows/wf-id/run-id/history") + .setType("unknown.type") + .build(); + + assertNull(nexusLinkToLink(nexusLink)); + } + + @Test + public void testLinkToNexusLink_WorkflowEvent() { + Link.WorkflowEvent we = + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + + io.temporal.api.nexus.v1.Link actual = + linkToNexusLink(Link.newBuilder().setWorkflowEvent(we).build()); + assertEquals(workflowEventToNexusLink(we), actual); + } + + @Test + public void testLinkToNexusLink_NexusOperation() { + Link.NexusOperation no = + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id") + .build(); + + io.temporal.api.nexus.v1.Link actual = + linkToNexusLink(Link.newBuilder().setNexusOperation(no).build()); + assertEquals(nexusOperationToNexusLink(no), actual); + } + + @Test + public void testLinkToNexusLink_Empty() { + assertNull(linkToNexusLink(Link.newBuilder().build())); + } }