From 90d436738e44cb1058ca365bb014de8671908112 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Mon, 30 Mar 2026 12:25:40 -0400 Subject: [PATCH 1/7] Fix workflow test Signed-off-by: Ricardo Zanini --- .../FuncEventFilterPropertiesBuilder.java | 15 +- experimental/lambda/pom.xml | 2 +- .../executors/func/EventFilteringTest.java | 166 ++++++++++++++++++ .../func/TraceExecutionListener.java | 130 ++++++++++++++ experimental/pom.xml | 2 +- .../events/DefaultCloudEventPredicate.java | 33 +++- impl/json-utils/pom.xml | 56 +++--- .../impl/jackson/JacksonCloudEventUtils.java | 56 +++--- .../impl/jackson/JsonUtils.java | 43 +++-- .../jackson/ObjectMapperFactoryProvider.java | 9 +- .../jackson/JacksonCloudEventUtilsTest.java | 105 +++++++++++ .../impl/jackson/JsonUtilsStaticInitTest.java | 60 +++++++ 12 files changed, 602 insertions(+), 75 deletions(-) create mode 100644 experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java create mode 100644 experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java create mode 100644 impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtilsTest.java create mode 100644 impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JsonUtilsStaticInitTest.java diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java index 1c322bc30..30017d891 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java @@ -20,6 +20,9 @@ import io.serverlessworkflow.api.types.func.ContextPredicate; import io.serverlessworkflow.api.types.func.EventDataPredicate; import io.serverlessworkflow.api.types.func.FilterPredicate; +import io.serverlessworkflow.api.types.func.TypedContextPredicate; +import io.serverlessworkflow.api.types.func.TypedFilterPredicate; +import io.serverlessworkflow.api.types.func.TypedPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; import java.util.function.Predicate; @@ -50,20 +53,20 @@ public FuncEventFilterPropertiesBuilder data(FilterPredicate pre } public FuncEventFilterPropertiesBuilder envelope(Predicate predicate) { - this.eventProperties.setData( - new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + this.eventProperties.setAdditionalProperty( + "envelopePredicate", new TypedPredicate<>(predicate, CloudEvent.class)); return this; } public FuncEventFilterPropertiesBuilder envelope(ContextPredicate predicate) { - this.eventProperties.setData( - new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + this.eventProperties.setAdditionalProperty( + "envelopePredicate", new TypedContextPredicate<>(predicate, CloudEvent.class)); return this; } public FuncEventFilterPropertiesBuilder envelope(FilterPredicate predicate) { - this.eventProperties.setData( - new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + this.eventProperties.setAdditionalProperty( + "envelopePredicate", new TypedFilterPredicate<>(predicate, CloudEvent.class)); return this; } } diff --git a/experimental/lambda/pom.xml b/experimental/lambda/pom.xml index 50c054dad..f298a332a 100644 --- a/experimental/lambda/pom.xml +++ b/experimental/lambda/pom.xml @@ -21,7 +21,7 @@ io.serverlessworkflow serverlessworkflow-impl-core - + io.serverlessworkflow serverlessworkflow-impl-jq diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java new file mode 100644 index 000000000..b447254e8 --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java @@ -0,0 +1,166 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.EventPublisher; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.Test; + +public class EventFilteringTest { + + // --- Mock Domain Models --- + public record NewsletterRequest(String topic) {} + + public record NewsletterDraft(String title, String body) {} + + public record HumanReview(String status, String notes) { + public static final String NEEDS_REVISION = "NEEDS_REVISION"; + public static final String APPROVED = "APPROVED"; + } + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // --- Mock Service Methods (replacing Quarkus Agents) --- + public NewsletterDraft writeDraft(NewsletterRequest req) { + return new NewsletterDraft("Draft: " + req.topic(), "Initial body..."); + } + + public NewsletterDraft editDraft(HumanReview review) { + return new NewsletterDraft("Edited Draft", "Fixed based on: " + review.notes()); + } + + public void sendEmail(NewsletterDraft draft) { + // Simulates MailService.send + } + + @Test + public void testIntelligentNewsletterApprovalPath() throws Exception { + try (WorkflowApplication app = + WorkflowApplication.builder().withListener(new TraceExecutionListener()).build()) { + + // 1. Build the workflow using your exact DSL + Workflow workflow = + FuncWorkflowBuilder.workflow("intelligent-newsletter") + .tasks( + function("draftAgent", this::writeDraft).exportAsTaskOutput(), + emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), + listen( + "waitHumanReview", + to().one( + consumed("org.acme.newsletter.review.done") + .extensionByInstanceId("instanceid"))) + .outputAs( + (ArrayList events) -> { + try { + return MAPPER.readValue( + ((CloudEventData) events.iterator().next()).toBytes(), + HumanReview.class); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize HumanReview", e); + } + }), + switchWhenOrElse( + h -> HumanReview.NEEDS_REVISION.equals(h.status()), + "humanEditorAgent", + "sendNewsletter", + HumanReview.class), + function("humanEditorAgent", this::editDraft) + .exportAsTaskOutput() + .then("draftReady"), + consume("sendNewsletter", this::sendEmail) + .inputFrom( + (payload, wfc, tfc) -> + payload instanceof HumanReview ? wfc.context() : payload)) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); + CompletableFuture future = instance.start(); + + // Wait for it to hit the listen state + Thread.sleep(250); + assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + + // --- THE NEGATIVE TEST: Fire an event with the WRONG instance id --- + CloudEvent maliciousEvent = + CloudEventBuilder.v1() + .withId("event-wrong") + .withSource(URI.create("test:/human-editor")) + .withType("org.acme.newsletter.review.done") + .withExtension("instanceid", "SOME-OTHER-ID-12345") // Does not match instance.id() + .withData( + "application/json", + "{\"status\":\"APPROVED\", \"notes\":\"Malicious approval\"}" + .getBytes(StandardCharsets.UTF_8)) + .build(); + + publisher.publish(maliciousEvent).toCompletableFuture().join(); + + // Give the engine a moment to process and discard the event + Thread.sleep(250); + + // Assert that the workflow completely ignored it and is STILL waiting + assertThat(future.isDone()).isFalse(); + assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + + // --- THE POSITIVE TEST: Fire the CORRECT event --- + CloudEvent humanReviewEvent = + CloudEventBuilder.v1() + .withId("event-123") + .withSource(URI.create("test:/human-editor")) + .withType("org.acme.newsletter.review.done") + .withExtension("instanceid", instance.id()) // Matches exactly + .withData( + "application/json", + "{\"status\":\"APPROVED\", \"notes\":\"Looks good\"}" + .getBytes(StandardCharsets.UTF_8)) + .build(); + + publisher.publish(humanReviewEvent).toCompletableFuture().join(); + + // Wait for the workflow to resume and finish + future.join(); + + // Assert successful completion + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + } + } +} diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java new file mode 100644 index 000000000..af5d2e7cc --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java @@ -0,0 +1,130 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TraceExecutionListener implements WorkflowExecutionListener { + + private static final Logger logger = LoggerFactory.getLogger(TraceExecutionListener.class); + + @Override + public void onWorkflowStarted(WorkflowStartedEvent ev) { + logger.info( + "Workflow definition {} with id {} started at {} with data {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.eventDate(), + ev.workflowContext().instanceData().input()); + } + + @Override + public void onWorkflowResumed(WorkflowResumedEvent ev) { + logger.info( + "Workflow definition {} with id {} resumed at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.eventDate()); + } + + @Override + public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { + logger.info( + "Workflow definition {} with id {} suspended at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.eventDate()); + } + + @Override + public void onWorkflowCompleted(WorkflowCompletedEvent ev) { + logger.info( + "Workflow definition {} with id {} completed at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.eventDate()); + } + + @Override + public void onWorkflowFailed(WorkflowFailedEvent ev) { + logger.info( + "Workflow definition {} with id {} failed at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.eventDate(), + ev.cause()); + } + + @Override + public void onTaskStarted(TaskStartedEvent ev) { + logger.info( + "Task {} started at {}, position {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().position()); + } + + @Override + public void onTaskCompleted(TaskCompletedEvent ev) { + logger.info( + "Task {} completed at {} with output {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().output().asJavaObject()); + } + + @Override + public void onTaskFailed(TaskFailedEvent ev) { + logger.info( + "Task {} failed at {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().output().asJavaObject(), + ev.cause()); + } + + @Override + public void onTaskRetried(TaskRetriedEvent ev) { + logger.info( + "Task {} retried at {}, position {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().position()); + } + + @Override + public void onWorkflowStatusChanged(WorkflowStatusEvent ev) { + logger.info( + "Workflow definition {} with id {} changed status from {} to {} at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.previousStatus(), + ev.status(), + ev.eventDate()); + } +} diff --git a/experimental/pom.xml b/experimental/pom.xml index 9882957d1..54dd0e206 100644 --- a/experimental/pom.xml +++ b/experimental/pom.xml @@ -15,7 +15,7 @@ serverlessworkflow-impl-core ${project.version} - + io.serverlessworkflow serverlessworkflow-impl-jq ${project.version} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java index c58f7de05..569ec64fe 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java @@ -36,6 +36,12 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate { + /** + * {@link EventProperties#getAdditionalProperties()} property for custom {@link CloudEvent} {@link + * java.util.function.Predicate} evaluation. + */ + public static final String ENVELOPE_PREDICATE = "envelopePredicate"; + private final CloudEventAttrPredicate idFilter; private final CloudEventAttrPredicate sourceFilter; private final CloudEventAttrPredicate subjectFilter; @@ -44,9 +50,10 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate { private final CloudEventAttrPredicate dataSchemaFilter; private final CloudEventAttrPredicate timeFilter; private final CloudEventAttrPredicate dataFilter; + private final CloudEventAttrPredicate envelopeFilter; private final CloudEventAttrPredicate> additionalFilter; - private static final CloudEventAttrPredicate isTrue() { + private static CloudEventAttrPredicate isTrue() { return (x, w, t) -> true; } @@ -59,9 +66,23 @@ public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplicatio dataSchemaFilter = dataSchemaFilter(properties.getDataschema(), app); timeFilter = offsetTimeFilter(properties.getTime(), app); dataFilter = dataFilter(properties.getData(), app); + envelopeFilter = envelopeFilter(properties, app); additionalFilter = additionalFilter(properties.getAdditionalProperties(), app); } + private CloudEventAttrPredicate envelopeFilter( + EventProperties properties, WorkflowApplication app) { + Object envelopePredObj = null; + if (properties.getAdditionalProperties() != null) + envelopePredObj = properties.getAdditionalProperties().remove(ENVELOPE_PREDICATE); + return envelopePredObj == null + ? isTrue() + : fromCloudEvent( + app.modelFactory(), + app.expressionFactory() + .buildPredicate(new ExpressionDescriptor(null, envelopePredObj))); + } + private CloudEventAttrPredicate> additionalFilter( Map additionalProperties, WorkflowApplication app) { return additionalProperties != null && !additionalProperties.isEmpty() @@ -72,11 +93,16 @@ private CloudEventAttrPredicate> additionalFilter( : isTrue(); } - private CloudEventAttrPredicate fromCloudEvent( + private CloudEventAttrPredicate fromCloudEventData( WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); } + private CloudEventAttrPredicate fromCloudEvent( + WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { + return (e, w, t) -> filter.test(w, t, workflowModelFactory.from(e)); + } + private CloudEventAttrPredicate> fromMap( WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); @@ -85,7 +111,7 @@ private CloudEventAttrPredicate> fromMap( private CloudEventAttrPredicate dataFilter( EventData data, WorkflowApplication app) { return data != null - ? fromCloudEvent( + ? fromCloudEventData( app.modelFactory(), app.expressionFactory() .buildPredicate( @@ -179,6 +205,7 @@ public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task && typeFilter.test(event.getType(), workflow, task) && dataSchemaFilter.test(event.getDataSchema(), workflow, task) && timeFilter.test(event.getTime(), workflow, task) + && envelopeFilter.test(event, workflow, task) && dataFilter.test(event.getData(), workflow, task) && additionalFilter.test(CloudEventUtils.extensions(event), workflow, task); } diff --git a/impl/json-utils/pom.xml b/impl/json-utils/pom.xml index 57627413b..3829a3842 100644 --- a/impl/json-utils/pom.xml +++ b/impl/json-utils/pom.xml @@ -1,24 +1,36 @@ - - 4.0.0 - - io.serverlessworkflow - serverlessworkflow-impl - 8.0.0-SNAPSHOT - - serverlessworkflow-impl-json - Serverless Workflow :: Impl :: Json utils - - - com.fasterxml.jackson.core - jackson-databind - - + + 4.0.0 + io.serverlessworkflow - serverlessworkflow-impl-core - - - io.cloudevents - cloudevents-json-jackson - - + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-json + Serverless Workflow :: Impl :: Json utils + + + com.fasterxml.jackson.core + jackson-databind + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.cloudevents + cloudevents-json-jackson + + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.assertj + assertj-core + test + + \ No newline at end of file diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java index a3591a93c..0ad2409ef 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java @@ -15,12 +15,14 @@ */ package io.serverlessworkflow.impl.jackson; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; +import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.jackson.JsonCloudEventData; +import io.cloudevents.jackson.JsonFormat; import java.io.IOException; import java.io.UncheckedIOException; import java.time.OffsetDateTime; @@ -30,30 +32,17 @@ public class JacksonCloudEventUtils { public static JsonNode toJsonNode(CloudEvent event) { - ObjectNode result = JsonUtils.mapper().createObjectNode(); - if (event.getData() != null) { - result.set("data", toJsonNode(event.getData())); - } - if (event.getSubject() != null) { - result.put("subject", event.getSubject()); - } - if (event.getDataContentType() != null) { - result.put("datacontenttype", event.getDataContentType()); - } - result.put("id", event.getId()); - result.put("source", event.getSource().toString()); - result.put("type", event.getType()); - result.put("specversion", event.getSpecVersion().toString()); - if (event.getDataSchema() != null) { - result.put("dataschema", event.getDataSchema().toString()); + if (event == null) { + return NullNode.instance; } - if (event.getTime() != null) { - result.put("time", event.getTime().toString()); + // Delegate entirely to the official CloudEvents SDK + byte[] serialized = + EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).serialize(event); + try { + return JsonUtils.mapper().readTree(serialized); + } catch (IOException e) { + throw new UncheckedIOException(e); } - event - .getExtensionNames() - .forEach(n -> result.set(n, JsonUtils.fromValue(event.getExtension(n)))); - return result; } public static OffsetDateTime toOffset(Date date) { @@ -73,5 +62,26 @@ public static JsonNode toJsonNode(CloudEventData data) { } } + public static CloudEvent toCloudEvent(JsonNode node) { + if (node == null || node.isNull()) { + return null; + } + try { + byte[] ceBytes = JsonUtils.mapper().writeValueAsBytes(node); + return EventFormatProvider.getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(ceBytes); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Failed to deserialize JsonNode to CloudEvent", e); + } + } + + public static CloudEventData toCloudEventData(JsonNode node) { + if (node == null || node.isNull()) { + return null; + } + return JsonCloudEventData.wrap(node); + } + private JacksonCloudEventUtils() {} } diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java index 8ca1724e2..236178d35 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java @@ -31,6 +31,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ShortNode; import com.fasterxml.jackson.databind.node.TextNode; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.serverlessworkflow.impl.WorkflowModel; import java.io.IOException; import java.math.BigDecimal; @@ -54,14 +56,12 @@ public class JsonUtils { - private static ObjectMapper mapper = ObjectMapperFactoryProvider.instance().get().get(); - public static ObjectMapper mapper() { - return mapper; + return ObjectMapperFactoryProvider.instance().get().get(); } public static Collector arrayNodeCollector() { - return new Collector() { + return new Collector<>() { @Override public BiConsumer accumulator() { return ArrayNode::add; @@ -87,7 +87,7 @@ public Function finisher() { @Override public Supplier supplier() { - return () -> mapper.createArrayNode(); + return () -> mapper().createArrayNode(); } }; } @@ -97,6 +97,7 @@ public Supplier supplier() { * Although we can use directly ObjectMapper.convertValue for implementing fromValue and toJavaValue methods, * the performance gain of avoiding an intermediate buffer is so tempting that we cannot avoid it */ + @SuppressWarnings("unchecked") public static JsonNode fromValue(Object value) { if (value == null) { return NullNode.instance; @@ -128,8 +129,12 @@ public static JsonNode fromValue(Object value) { return mapToNode((Map) value); } else if (value instanceof WorkflowModel model) { return modelToJson(model); + } else if (value instanceof CloudEvent ce) { + return JacksonCloudEventUtils.toJsonNode(ce); + } else if (value instanceof CloudEventData data) { + return JacksonCloudEventUtils.toJsonNode(data); } else { - return mapper.convertValue(value, JsonNode.class); + return mapper().convertValue(value, JsonNode.class); } } @@ -147,7 +152,7 @@ public static JsonNode fromString(String value) { String trimmedValue = value.trim(); if (trimmedValue.startsWith("{") && trimmedValue.endsWith("}")) { try { - return mapper.readTree(trimmedValue); + return mapper().readTree(trimmedValue); } catch (IOException ex) { // ignore and return test node } @@ -163,8 +168,8 @@ private static Object toJavaValue(ObjectNode node) { return result; } - private static Collection toJavaValue(ArrayNode node) { - Collection result = new ArrayList<>(); + private static Collection toJavaValue(ArrayNode node) { + Collection result = new ArrayList<>(); for (JsonNode item : node) { result.add(internalToJavaValue(item, JsonUtils::toJavaValue, JsonUtils::toJavaValue)); } @@ -181,7 +186,7 @@ public static T convertValue(Object obj, Class returnType) { } else if (obj instanceof JsonNode) { return convertValue((JsonNode) obj, returnType); } else { - return mapper.convertValue(obj, returnType); + return mapper().convertValue(obj, returnType); } } @@ -197,8 +202,12 @@ public static T convertValue(JsonNode jsonNode, Class returnType) { obj = jsonNode.asLong(); } else if (String.class.isAssignableFrom(returnType)) { obj = jsonNode.asText(); + } else if (CloudEvent.class.isAssignableFrom(returnType)) { + obj = JacksonCloudEventUtils.toCloudEvent(jsonNode); + } else if (CloudEventData.class.isAssignableFrom(returnType)) { + obj = JacksonCloudEventUtils.toCloudEventData(jsonNode); } else { - obj = mapper.convertValue(jsonNode, returnType); + obj = mapper().convertValue(jsonNode, returnType); } return returnType.cast(obj); } @@ -228,12 +237,12 @@ private static Object internalToJavaValue( } else if (jsonNode.isObject()) { return objectFunction.apply((ObjectNode) jsonNode); } else { - return mapper.convertValue(jsonNode, Object.class); + return mapper().convertValue(jsonNode, Object.class); } } public static String toString(JsonNode node) throws JsonProcessingException { - return mapper.writeValueAsString(node); + return mapper().writeValueAsString(node); } public static void addToNode(String name, Object value, ObjectNode dest) { @@ -241,7 +250,7 @@ public static void addToNode(String name, Object value, ObjectNode dest) { } private static ObjectNode mapToNode(Map value) { - ObjectNode objectNode = mapper.createObjectNode(); + ObjectNode objectNode = mapper().createObjectNode(); for (Map.Entry entry : value.entrySet()) { addToNode(entry.getKey(), entry.getValue(), objectNode); } @@ -249,7 +258,7 @@ private static ObjectNode mapToNode(Map value) { } private static ArrayNode mapToArray(Collection collection) { - return mapToArray(collection, mapper.createArrayNode()); + return mapToArray(collection, mapper().createArrayNode()); } private static ArrayNode mapToArray(Collection collection, ArrayNode arrayNode) { @@ -260,11 +269,11 @@ private static ArrayNode mapToArray(Collection collection, ArrayNode arrayNod } public static ObjectNode object() { - return mapper.createObjectNode(); + return mapper().createObjectNode(); } public static ArrayNode array() { - return mapper.createArrayNode(); + return mapper().createArrayNode(); } public static Optional toDate(JsonNode node) { diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java index 3a12ea3d9..6a403a3d4 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java @@ -18,6 +18,7 @@ import static io.serverlessworkflow.impl.WorkflowUtils.loadFirst; import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.jackson.JsonFormat; import java.util.Objects; import java.util.function.Supplier; @@ -25,6 +26,11 @@ public class ObjectMapperFactoryProvider implements Supplier () -> new ObjectMapper().findAndRegisterModules()); + : loadFirst(ObjectMapperFactory.class).orElseGet(() -> () -> DEFAULT_MAPPER); } } diff --git a/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtilsTest.java b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtilsTest.java new file mode 100644 index 000000000..672be8aab --- /dev/null +++ b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtilsTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.jackson; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.BytesCloudEventData; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; + +public class JacksonCloudEventUtilsTest { + + private CloudEvent createSampleEvent() { + return CloudEventBuilder.v1() + .withId("5dc4698e-5f98-470e-bb76-04218fe2dd0f") + .withSource(URI.create("api:/newsletter")) + .withType("org.acme.newsletter.review.done") + .withDataContentType("application/json") + .withExtension("flowinstanceid", "01KMRBFA19GZYW3XY895Z4SNCK") + .withData("{\"status\":\"NEEDS_REVISION\"}".getBytes(StandardCharsets.UTF_8)) + .build(); + } + + @Test + public void testCloudEventSerialization() { + CloudEvent event = createSampleEvent(); + + JsonNode node = JacksonCloudEventUtils.toJsonNode(event); + + assertNotNull(node); + assertTrue(node.has("specversion"), "Missing mandatory specversion attribute"); + assertEquals("1.0", node.get("specversion").asText()); + + assertFalse(node.has("specVersion"), "Jackson POJO serializer mangled the envelope!"); + + assertEquals("5dc4698e-5f98-470e-bb76-04218fe2dd0f", node.get("id").asText()); + assertEquals("01KMRBFA19GZYW3XY895Z4SNCK", node.get("flowinstanceid").asText()); + + assertTrue(node.has("data")); + assertEquals("NEEDS_REVISION", node.get("data").get("status").asText()); + } + + @Test + public void testCloudEventDeserialization() { + CloudEvent originalEvent = createSampleEvent(); + JsonNode node = JacksonCloudEventUtils.toJsonNode(originalEvent); + + CloudEvent restoredEvent = JacksonCloudEventUtils.toCloudEvent(node); + + assertNotNull(restoredEvent); + assertEquals(originalEvent.getId(), restoredEvent.getId()); + assertEquals(originalEvent.getType(), restoredEvent.getType()); + assertEquals(originalEvent.getSpecVersion(), restoredEvent.getSpecVersion()); + + assertEquals("01KMRBFA19GZYW3XY895Z4SNCK", restoredEvent.getExtension("flowinstanceid")); + } + + @Test + public void testCloudEventDataRoundTrip() { + byte[] rawJson = "{\"draft\":\"Bullish Market Update\"}".getBytes(StandardCharsets.UTF_8); + CloudEventData originalData = BytesCloudEventData.wrap(rawJson); + + JsonNode dataNode = JacksonCloudEventUtils.toJsonNode(originalData); + assertNotNull(dataNode); + assertEquals("Bullish Market Update", dataNode.get("draft").asText()); + + CloudEventData restoredData = JacksonCloudEventUtils.toCloudEventData(dataNode); + assertNotNull(restoredData); + assertEquals(rawJson.length, restoredData.toBytes().length); + } + + @Test + public void testJsonUtilsIntegration() { + CloudEvent originalEvent = createSampleEvent(); + + JsonNode modelNode = JsonUtils.fromValue(originalEvent); + assertNotNull(modelNode); + assertTrue(modelNode.has("specversion")); + + CloudEvent extractedEvent = JsonUtils.convertValue(modelNode, CloudEvent.class); + assertNotNull(extractedEvent); + assertEquals("5dc4698e-5f98-470e-bb76-04218fe2dd0f", extractedEvent.getId()); + } +} diff --git a/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JsonUtilsStaticInitTest.java b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JsonUtilsStaticInitTest.java new file mode 100644 index 000000000..04a953aa9 --- /dev/null +++ b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JsonUtilsStaticInitTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.jackson; + +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class JsonUtilsStaticInitTest { + + private ObjectMapperFactory originalFactory; + + @BeforeEach + public void setUp() { + // Capture the original factory (or fallback) to restore it later + // so we don't pollute other tests running in the same JVM. + originalFactory = ObjectMapperFactoryProvider.instance().get(); + } + + @AfterEach + public void tearDown() { + // Restore the provider to its original state + ObjectMapperFactoryProvider.instance().setFactory(originalFactory); + } + + @Test + public void testMapperReflectsDynamicFactoryOverride() { + ObjectMapper initialMapper = JsonUtils.mapper(); + ObjectMapper customMapper = new ObjectMapper(); + + assertNotSame( + initialMapper, + customMapper, + "Initial mapper and custom mapper should be different instances"); + ObjectMapperFactoryProvider.instance().setFactory(() -> customMapper); + ObjectMapper fetchedMapper = JsonUtils.mapper(); + + assertSame( + customMapper, + fetchedMapper, + "JsonUtils.mapper() failed to return the dynamically injected ObjectMapper. Static caching trap is present!"); + } +} From 28754a0a242922dee40be9d2bc6dfaf8ed3c8f43 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Mon, 30 Mar 2026 14:47:31 -0400 Subject: [PATCH 2/7] Enable any type of collection cast on *ModelCollection Signed-off-by: Ricardo Zanini --- .../executors/func/EventFilteringTest.java | 9 ++-- .../impl/model/func/JavaModelCollection.java | 30 +++++++++++-- .../fluent/test/FuncEventFilterTest.java | 44 ++++++++++++++++++- .../model/jackson/JacksonModelCollection.java | 38 +++++++++++++--- 4 files changed, 105 insertions(+), 16 deletions(-) diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java index b447254e8..dcaa17585 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java @@ -38,7 +38,7 @@ import io.serverlessworkflow.impl.events.EventPublisher; import java.net.URI; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; @@ -74,7 +74,6 @@ public void testIntelligentNewsletterApprovalPath() throws Exception { try (WorkflowApplication app = WorkflowApplication.builder().withListener(new TraceExecutionListener()).build()) { - // 1. Build the workflow using your exact DSL Workflow workflow = FuncWorkflowBuilder.workflow("intelligent-newsletter") .tasks( @@ -86,11 +85,9 @@ public void testIntelligentNewsletterApprovalPath() throws Exception { consumed("org.acme.newsletter.review.done") .extensionByInstanceId("instanceid"))) .outputAs( - (ArrayList events) -> { + (List events) -> { try { - return MAPPER.readValue( - ((CloudEventData) events.iterator().next()).toBytes(), - HumanReview.class); + return MAPPER.readValue(events.get(0).toBytes(), HumanReview.class); } catch (Exception e) { throw new RuntimeException("Failed to deserialize HumanReview", e); } diff --git a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java index 5c52b33ed..c007e8d16 100644 --- a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java +++ b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java @@ -17,10 +17,14 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Optional; +import java.util.Set; public class JavaModelCollection implements Collection, WorkflowModelCollection { @@ -138,9 +142,29 @@ public Class objectClass() { } @Override + @SuppressWarnings({"rawtypes", "unchecked"}) public Optional as(Class clazz) { - return object.getClass().isAssignableFrom(clazz) - ? Optional.of(clazz.cast(object)) - : Optional.empty(); + if (object == null) return Optional.empty(); + + if (clazz.isInstance(object)) return Optional.of(clazz.cast(object)); + + if (clazz.isAssignableFrom(List.class)) return Optional.of(clazz.cast(new ArrayList<>(object))); + else if (clazz.isAssignableFrom(Set.class)) + return Optional.of(clazz.cast(new HashSet<>(object))); + + if (clazz.isArray()) { + Class componentType = clazz.getComponentType(); + if (!componentType.isPrimitive()) { + Object[] typedArray = (Object[]) Array.newInstance(componentType, 0); + return Optional.of(clazz.cast(object.toArray(typedArray))); + } + + Object primitiveArray = Array.newInstance(componentType, object.size()); + int i = 0; + for (Object item : object) Array.set(primitiveArray, i++, item); + return Optional.of(clazz.cast(primitiveArray)); + } + + return Optional.empty(); } } diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java index 3058eaa2d..8a4a05ee2 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java @@ -20,22 +20,28 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne; import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; /** * Tests for the Event Filter DSL specification. Verifies that the fluent builder correctly wires - * the payload parsing and contextual lambdas into the final Workflow definitions. + * the payload parsing and contextual lambdas into the final Workflow definitions, and ensures the + * ModelCollection adapters seamlessly convert between types. */ class FuncEventFilterTest { + public record Review(String author, String text, int rating) {} + @Test void testListenToOneCollection() { runIt( @@ -47,7 +53,7 @@ void testListenToOneCollection() { } @Test - void testListenToOneNode() { + void testListenToOneArrayNode() { runIt( FuncWorkflowBuilder.workflow("listenToOneReviewNode") .tasks( @@ -56,6 +62,36 @@ void testListenToOneNode() { .build()); } + @Test + void testListenToOneList() { + runIt( + FuncWorkflowBuilder.workflow("listenToOneReviewList") + .tasks( + listen("waitReview", toOne("org.acme.test.review")) + .outputAs((List list) -> list.get(0))) + .build()); + } + + @Test + void testListenToOneSet() { + runIt( + FuncWorkflowBuilder.workflow("listenToOneReviewSet") + .tasks( + listen("waitReview", toOne("org.acme.test.review")) + .outputAs((Set set) -> set.iterator().next())) + .build()); + } + + @Test + void testListenToOneArray() { + runIt( + FuncWorkflowBuilder.workflow("listenToOneReviewArray") + .tasks( + listen("waitReview", toOne("org.acme.test.review")) + .outputAs((JsonNode[] array) -> array[0])) + .build()); + } + private Workflow reviewEmitter() { return FuncWorkflowBuilder.workflow("emitReview") .tasks(emitJson("draftReady", "org.acme.test.review", Review.class)) @@ -64,10 +100,14 @@ private Workflow reviewEmitter() { private void runIt(Workflow listen) { Review review = new Review("Torrente", "espectacular", 5); + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + CompletableFuture waiting = app.workflowDefinition(listen).instance(Map.of()).start(); + app.workflowDefinition(reviewEmitter()).instance(review).start().join(); + assertThat(waiting.join().as(Review.class).orElseThrow()).isEqualTo(review); } } diff --git a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java index 4bc3b2d52..a1523b2ac 100644 --- a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java +++ b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java @@ -20,9 +20,14 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.lang.reflect.Array; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Optional; +import java.util.Set; public class JacksonModelCollection implements WorkflowModelCollection { @@ -38,12 +43,35 @@ public class JacksonModelCollection implements WorkflowModelCollection { @Override public Optional as(Class clazz) { - if (clazz.equals(Collection.class)) { - return Optional.of(clazz.cast(this)); + if (node == null) return Optional.empty(); + + if (clazz.isInstance(node)) return Optional.of(clazz.cast(node)); + + if (clazz.isInstance(this)) return Optional.of(clazz.cast(this)); + + List elements = new ArrayList<>(node.size()); + node.forEach(elements::add); + + if (clazz.isAssignableFrom(List.class)) return Optional.of(clazz.cast(elements)); + else if (clazz.isAssignableFrom(Set.class)) + return Optional.of(clazz.cast(new HashSet<>(elements))); + + if (clazz.isArray()) { + Class componentType = clazz.getComponentType(); + + if (!componentType.isPrimitive()) { + Object[] typedArray = (Object[]) Array.newInstance(componentType, 0); + return Optional.of(clazz.cast(elements.toArray(typedArray))); + } + + Object primitiveArray = Array.newInstance(componentType, elements.size()); + int i = 0; + for (Object item : elements) Array.set(primitiveArray, i++, item); + + return Optional.of(clazz.cast(primitiveArray)); } - return clazz.isAssignableFrom(ArrayNode.class) - ? Optional.of(clazz.cast(node)) - : Optional.empty(); + + return Optional.empty(); } @Override From 4e7a946b3e69899a54825204c3c10cbd911baa14 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Mon, 30 Mar 2026 15:06:13 -0400 Subject: [PATCH 3/7] Add tests for jacksonModelFactory Signed-off-by: Ricardo Zanini --- .../FuncEventFilterPropertiesBuilder.java | 10 +- .../test/JacksonEventFilteringTest.java | 135 ++++++++++++++++++ 2 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java index 30017d891..3c5703a93 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java @@ -24,6 +24,7 @@ import io.serverlessworkflow.api.types.func.TypedFilterPredicate; import io.serverlessworkflow.api.types.func.TypedPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate; import java.util.function.Predicate; public class FuncEventFilterPropertiesBuilder @@ -54,19 +55,22 @@ public FuncEventFilterPropertiesBuilder data(FilterPredicate pre public FuncEventFilterPropertiesBuilder envelope(Predicate predicate) { this.eventProperties.setAdditionalProperty( - "envelopePredicate", new TypedPredicate<>(predicate, CloudEvent.class)); + DefaultCloudEventPredicate.ENVELOPE_PREDICATE, + new TypedPredicate<>(predicate, CloudEvent.class)); return this; } public FuncEventFilterPropertiesBuilder envelope(ContextPredicate predicate) { this.eventProperties.setAdditionalProperty( - "envelopePredicate", new TypedContextPredicate<>(predicate, CloudEvent.class)); + DefaultCloudEventPredicate.ENVELOPE_PREDICATE, + new TypedContextPredicate<>(predicate, CloudEvent.class)); return this; } public FuncEventFilterPropertiesBuilder envelope(FilterPredicate predicate) { this.eventProperties.setAdditionalProperty( - "envelopePredicate", new TypedFilterPredicate<>(predicate, CloudEvent.class)); + DefaultCloudEventPredicate.ENVELOPE_PREDICATE, + new TypedFilterPredicate<>(predicate, CloudEvent.class)); return this; } } diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java new file mode 100644 index 000000000..c04c50b4a --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java @@ -0,0 +1,135 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; +import static org.assertj.core.api.Assertions.assertThat; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.EventPublisher; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.Test; + +public class JacksonEventFilteringTest { + + // --- Mock Domain Models --- + public record NewsletterRequest(String topic) {} + + public record NewsletterDraft(String title, String body) {} + + public record HumanReview(String status, String notes) { + public static final String NEEDS_REVISION = "NEEDS_REVISION"; + public static final String APPROVED = "APPROVED"; + } + + // --- Mock Service Methods --- + public NewsletterDraft writeDraft(NewsletterRequest req) { + return new NewsletterDraft("Draft: " + req.topic(), "Initial body..."); + } + + public NewsletterDraft editDraft(HumanReview review) { + return new NewsletterDraft("Edited Draft", "Fixed based on: " + review.notes()); + } + + public void sendEmail(NewsletterDraft draft) { + // Simulates MailService.send + } + + @Test + public void testJacksonAutomagicalConversion() throws Exception { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + Workflow workflow = + FuncWorkflowBuilder.workflow("intelligent-newsletter") + .tasks( + function("draftAgent", this::writeDraft).exportAsTaskOutput(), + emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), + listen( + "waitHumanReview", + to().one( + consumed("org.acme.newsletter.review.done") + .extensionByInstanceId("instanceid"))) + .outputAs((Collection events) -> events.iterator().next()), + // The engine sees the incoming JsonNode, sees this task expects + // HumanReview.class, + // and natively deserializes it for you before executing the lambda! + switchWhenOrElse( + h -> HumanReview.NEEDS_REVISION.equals(h.status()), + "humanEditorAgent", + "sendNewsletter", + HumanReview.class), + function("humanEditorAgent", this::editDraft) + .exportAsTaskOutput() + .then("draftReady"), + consume("sendNewsletter", this::sendEmail) + // Because we are in Jackson, the payload at this evaluation stage can be a + // Map. + // We simply check for the "status" field to know if it's the review payload. + .inputFrom( + (Map payload, + WorkflowContextData wfc, + TaskContextData tfc) -> + payload.containsKey("status") ? wfc.context() : payload)) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); + CompletableFuture future = instance.start(); + + Thread.sleep(250); + assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + + CloudEvent humanReviewEvent = + CloudEventBuilder.v1() + .withId("event-123") + .withSource(URI.create("test:/human-editor")) + .withType("org.acme.newsletter.review.done") + .withExtension("instanceid", instance.id()) + .withData( + "application/json", + "{\"status\":\"APPROVED\", \"notes\":\"Looks good\"}" + .getBytes(StandardCharsets.UTF_8)) + .build(); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + publisher.publish(humanReviewEvent).toCompletableFuture().join(); + + future.join(); + + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + } + } +} From d73f3d740f5790935441b2df26b600dc2e8a4f1e Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Mon, 30 Mar 2026 19:09:39 -0400 Subject: [PATCH 4/7] Incorporate review comments Signed-off-by: Ricardo Zanini --- experimental/lambda/pom.xml | 5 + .../executors/func/EventFilteringTest.java | 48 ++++--- .../func/TraceExecutionListener.java | 130 ------------------ experimental/test/pom.xml | 65 +++++---- .../test/JacksonEventFilteringTest.java | 7 +- .../events/DefaultCloudEventPredicate.java | 4 +- .../jackson/ObjectMapperFactoryProvider.java | 14 +- .../model/jackson/JacksonModelCollection.java | 3 +- 8 files changed, 86 insertions(+), 190 deletions(-) delete mode 100644 experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java diff --git a/experimental/lambda/pom.xml b/experimental/lambda/pom.xml index f298a332a..22f9f897f 100644 --- a/experimental/lambda/pom.xml +++ b/experimental/lambda/pom.xml @@ -50,6 +50,11 @@ assertj-core test + + org.awaitility + awaitility + test + ch.qos.logback logback-classic diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java index dcaa17585..aefaeeb41 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java @@ -22,7 +22,7 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; -import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import com.fasterxml.jackson.databind.ObjectMapper; import io.cloudevents.CloudEvent; @@ -38,22 +38,13 @@ import io.serverlessworkflow.impl.events.EventPublisher; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; public class EventFilteringTest { - // --- Mock Domain Models --- - public record NewsletterRequest(String topic) {} - - public record NewsletterDraft(String title, String body) {} - - public record HumanReview(String status, String notes) { - public static final String NEEDS_REVISION = "NEEDS_REVISION"; - public static final String APPROVED = "APPROVED"; - } - private static final ObjectMapper MAPPER = new ObjectMapper(); // --- Mock Service Methods (replacing Quarkus Agents) --- @@ -71,8 +62,7 @@ public void sendEmail(NewsletterDraft draft) { @Test public void testIntelligentNewsletterApprovalPath() throws Exception { - try (WorkflowApplication app = - WorkflowApplication.builder().withListener(new TraceExecutionListener()).build()) { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { Workflow workflow = FuncWorkflowBuilder.workflow("intelligent-newsletter") @@ -111,8 +101,9 @@ public void testIntelligentNewsletterApprovalPath() throws Exception { CompletableFuture future = instance.start(); // Wait for it to hit the listen state - Thread.sleep(250); - assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); EventPublisher publisher = app.eventPublishers().iterator().next(); @@ -131,12 +122,10 @@ public void testIntelligentNewsletterApprovalPath() throws Exception { publisher.publish(maliciousEvent).toCompletableFuture().join(); - // Give the engine a moment to process and discard the event - Thread.sleep(250); - - // Assert that the workflow completely ignored it and is STILL waiting - assertThat(future.isDone()).isFalse(); - assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + await() + .pollDelay(Duration.ofMillis(250)) + .atMost(Duration.ofMillis(500)) + .until(() -> instance.status() == WorkflowStatus.WAITING && !future.isDone()); // --- THE POSITIVE TEST: Fire the CORRECT event --- CloudEvent humanReviewEvent = @@ -153,11 +142,20 @@ public void testIntelligentNewsletterApprovalPath() throws Exception { publisher.publish(humanReviewEvent).toCompletableFuture().join(); - // Wait for the workflow to resume and finish - future.join(); - // Assert successful completion - assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.COMPLETED); } } + + // --- Mock Domain Models --- + public record NewsletterRequest(String topic) {} + + public record NewsletterDraft(String title, String body) {} + + public record HumanReview(String status, String notes) { + public static final String NEEDS_REVISION = "NEEDS_REVISION"; + public static final String APPROVED = "APPROVED"; + } } diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java deleted file mode 100644 index af5d2e7cc..000000000 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/TraceExecutionListener.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverless.workflow.impl.executors.func; - -import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; -import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TraceExecutionListener implements WorkflowExecutionListener { - - private static final Logger logger = LoggerFactory.getLogger(TraceExecutionListener.class); - - @Override - public void onWorkflowStarted(WorkflowStartedEvent ev) { - logger.info( - "Workflow definition {} with id {} started at {} with data {}", - ev.workflowContext().definition().workflow().getDocument().getName(), - ev.workflowContext().instanceData().id(), - ev.eventDate(), - ev.workflowContext().instanceData().input()); - } - - @Override - public void onWorkflowResumed(WorkflowResumedEvent ev) { - logger.info( - "Workflow definition {} with id {} resumed at {}", - ev.workflowContext().definition().workflow().getDocument().getName(), - ev.workflowContext().instanceData().id(), - ev.eventDate()); - } - - @Override - public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { - logger.info( - "Workflow definition {} with id {} suspended at {}", - ev.workflowContext().definition().workflow().getDocument().getName(), - ev.workflowContext().instanceData().id(), - ev.eventDate()); - } - - @Override - public void onWorkflowCompleted(WorkflowCompletedEvent ev) { - logger.info( - "Workflow definition {} with id {} completed at {}", - ev.workflowContext().definition().workflow().getDocument().getName(), - ev.workflowContext().instanceData().id(), - ev.eventDate()); - } - - @Override - public void onWorkflowFailed(WorkflowFailedEvent ev) { - logger.info( - "Workflow definition {} with id {} failed at {}", - ev.workflowContext().definition().workflow().getDocument().getName(), - ev.workflowContext().instanceData().id(), - ev.eventDate(), - ev.cause()); - } - - @Override - public void onTaskStarted(TaskStartedEvent ev) { - logger.info( - "Task {} started at {}, position {}", - ev.taskContext().taskName(), - ev.eventDate(), - ev.taskContext().position()); - } - - @Override - public void onTaskCompleted(TaskCompletedEvent ev) { - logger.info( - "Task {} completed at {} with output {}", - ev.taskContext().taskName(), - ev.eventDate(), - ev.taskContext().output().asJavaObject()); - } - - @Override - public void onTaskFailed(TaskFailedEvent ev) { - logger.info( - "Task {} failed at {}", - ev.taskContext().taskName(), - ev.eventDate(), - ev.taskContext().output().asJavaObject(), - ev.cause()); - } - - @Override - public void onTaskRetried(TaskRetriedEvent ev) { - logger.info( - "Task {} retried at {}, position {}", - ev.taskContext().taskName(), - ev.eventDate(), - ev.taskContext().position()); - } - - @Override - public void onWorkflowStatusChanged(WorkflowStatusEvent ev) { - logger.info( - "Workflow definition {} with id {} changed status from {} to {} at {}", - ev.workflowContext().definition().workflow().getDocument().getName(), - ev.workflowContext().instanceData().id(), - ev.previousStatus(), - ev.status(), - ev.eventDate()); - } -} diff --git a/experimental/test/pom.xml b/experimental/test/pom.xml index 3b24140f4..ac91a92d9 100644 --- a/experimental/test/pom.xml +++ b/experimental/test/pom.xml @@ -1,14 +1,15 @@ - - 4.0.0 - - io.serverlessworkflow - serverlessworkflow-experimental - 8.0.0-SNAPSHOT - - serverlessworkflow-experimental-test - Serverless Workflow :: Experimental :: Test - - + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-experimental + 8.0.0-SNAPSHOT + + serverlessworkflow-experimental-test + Serverless Workflow :: Experimental :: Test + + org.junit.jupiter junit-jupiter-engine test @@ -16,28 +17,38 @@ org.mockito mockito-core - test + test - + org.assertj assertj-core - test + test - io.serverlessworkflow - serverlessworkflow-experimental-fluent-func - test + org.awaitility + awaitility + test - - io.serverlessworkflow - serverlessworkflow-experimental-lambda - test + + ch.qos.logback + logback-classic + test - - io.serverlessworkflow - serverlessworkflow-impl-model - ${project.version} - test + + io.serverlessworkflow + serverlessworkflow-experimental-fluent-func + test + + + io.serverlessworkflow + serverlessworkflow-experimental-lambda + test + + + io.serverlessworkflow + serverlessworkflow-impl-model + ${project.version} + test - + \ No newline at end of file diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java index c04c50b4a..2d01c973c 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java @@ -23,6 +23,7 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -38,6 +39,7 @@ import io.serverlessworkflow.impl.events.EventPublisher; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -109,8 +111,9 @@ public void testJacksonAutomagicalConversion() throws Exception { WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); CompletableFuture future = instance.start(); - Thread.sleep(250); - assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); CloudEvent humanReviewEvent = CloudEventBuilder.v1() diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java index 569ec64fe..dddf8127f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java @@ -73,8 +73,10 @@ public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplicatio private CloudEventAttrPredicate envelopeFilter( EventProperties properties, WorkflowApplication app) { Object envelopePredObj = null; - if (properties.getAdditionalProperties() != null) + if (properties.getAdditionalProperties() != null + && properties.getAdditionalProperties().containsKey(ENVELOPE_PREDICATE)) envelopePredObj = properties.getAdditionalProperties().remove(ENVELOPE_PREDICATE); + return envelopePredObj == null ? isTrue() : fromCloudEvent( diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java index 6a403a3d4..ad4f253ed 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java @@ -35,7 +35,7 @@ public static ObjectMapperFactoryProvider instance() { return instance; } - private ObjectMapperFactory objectMapperFactory; + private volatile ObjectMapperFactory objectMapperFactory; private ObjectMapperFactoryProvider() {} @@ -45,8 +45,14 @@ public void setFactory(ObjectMapperFactory objectMapperFactory) { @Override public ObjectMapperFactory get() { - return objectMapperFactory != null - ? objectMapperFactory - : loadFirst(ObjectMapperFactory.class).orElseGet(() -> () -> DEFAULT_MAPPER); + if (objectMapperFactory == null) { + synchronized (this) { + if (objectMapperFactory == null) { + objectMapperFactory = + loadFirst(ObjectMapperFactory.class).orElseGet(() -> () -> DEFAULT_MAPPER); + } + } + } + return objectMapperFactory; } } diff --git a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java index a1523b2ac..83c5f1796 100644 --- a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java +++ b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java @@ -66,7 +66,8 @@ else if (clazz.isAssignableFrom(Set.class)) Object primitiveArray = Array.newInstance(componentType, elements.size()); int i = 0; - for (Object item : elements) Array.set(primitiveArray, i++, item); + for (Object item : elements) + Array.set(primitiveArray, i++, JsonUtils.convertValue(item, componentType)); return Optional.of(clazz.cast(primitiveArray)); } From 281101d888c62eed3f3d491d64015b353cb5a27f Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Mon, 30 Mar 2026 19:26:28 -0400 Subject: [PATCH 5/7] Fix sync block Signed-off-by: Ricardo Zanini --- .../impl/jackson/ObjectMapperFactoryProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java index ad4f253ed..a60ef8348 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java @@ -39,7 +39,7 @@ public static ObjectMapperFactoryProvider instance() { private ObjectMapperFactoryProvider() {} - public void setFactory(ObjectMapperFactory objectMapperFactory) { + public synchronized void setFactory(ObjectMapperFactory objectMapperFactory) { this.objectMapperFactory = Objects.requireNonNull(objectMapperFactory); } From 3388954386d45fdb36f960e71eb9796b21701180 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Tue, 31 Mar 2026 12:00:19 -0400 Subject: [PATCH 6/7] Isolating as method on utils; refactor ObjectMapperFactoryProvider to lazy init mapper Signed-off-by: Ricardo Zanini --- .../impl/model/func/JavaModelCollection.java | 36 ++++------ .../impl/CollectionConversionUtils.java | 72 +++++++++++++++++++ .../jackson/ObjectMapperFactoryProvider.java | 25 +++++-- .../model/jackson/JacksonModelCollection.java | 37 +++------- 4 files changed, 113 insertions(+), 57 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/CollectionConversionUtils.java diff --git a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java index c007e8d16..947c074be 100644 --- a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java +++ b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java @@ -15,16 +15,14 @@ */ package io.serverlessworkflow.impl.model.func; +import io.serverlessworkflow.impl.CollectionConversionUtils; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; -import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Set; public class JavaModelCollection implements Collection, WorkflowModelCollection { @@ -82,14 +80,22 @@ public Iterator iterator() { return new ModelIterator(object.iterator()); } + private List toModelList() { + List models = new ArrayList<>(object.size()); + for (Object obj : object) + models.add(obj instanceof WorkflowModel value ? value : nextItem(obj)); + + return models; + } + @Override public Object[] toArray() { - throw new UnsupportedOperationException("toArray is not supported yet"); + return toModelList().toArray(); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException("toArray is not supported yet"); + return toModelList().toArray(a); } @Override @@ -142,29 +148,11 @@ public Class objectClass() { } @Override - @SuppressWarnings({"rawtypes", "unchecked"}) public Optional as(Class clazz) { if (object == null) return Optional.empty(); if (clazz.isInstance(object)) return Optional.of(clazz.cast(object)); - if (clazz.isAssignableFrom(List.class)) return Optional.of(clazz.cast(new ArrayList<>(object))); - else if (clazz.isAssignableFrom(Set.class)) - return Optional.of(clazz.cast(new HashSet<>(object))); - - if (clazz.isArray()) { - Class componentType = clazz.getComponentType(); - if (!componentType.isPrimitive()) { - Object[] typedArray = (Object[]) Array.newInstance(componentType, 0); - return Optional.of(clazz.cast(object.toArray(typedArray))); - } - - Object primitiveArray = Array.newInstance(componentType, object.size()); - int i = 0; - for (Object item : object) Array.set(primitiveArray, i++, item); - return Optional.of(clazz.cast(primitiveArray)); - } - - return Optional.empty(); + return CollectionConversionUtils.as(object, clazz); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/CollectionConversionUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/CollectionConversionUtils.java new file mode 100644 index 000000000..2ebe7a1c8 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/CollectionConversionUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; + +public final class CollectionConversionUtils { + private CollectionConversionUtils() {} + + /** + * Safely converts a base Collection into the requested List, Set, or Array type. + * + * @param elements The base collection of elements. + * @param clazz The target class to convert to. + * @param primitiveConverter Strategy for converting items to primitives if an array is requested. + */ + public static Optional as( + Collection elements, + Class clazz, + BiFunction, Object> primitiveConverter) { + if (clazz.isAssignableFrom(List.class)) + return Optional.of(clazz.cast(new ArrayList<>(elements))); + else if (clazz.isAssignableFrom(Set.class)) + return Optional.of(clazz.cast(new HashSet<>(elements))); + + if (clazz.isArray()) { + Class componentType = clazz.getComponentType(); + + if (!componentType.isPrimitive()) { + Object[] typedArray = (Object[]) Array.newInstance(componentType, 0); + return Optional.of(clazz.cast(elements.toArray(typedArray))); + } + + Object primitiveArray = Array.newInstance(componentType, elements.size()); + + int i = 0; + for (Object item : elements) + Array.set(primitiveArray, i++, primitiveConverter.apply(item, componentType)); + + return Optional.of(clazz.cast(primitiveArray)); + } + + return Optional.empty(); + } + + /** + * @see #as(Collection, Class, BiFunction) + */ + public static Optional as(Collection elements, Class clazz) { + return as(elements, clazz, (item, type) -> item); + } +} diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java index a60ef8348..4b4109661 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java @@ -26,11 +26,6 @@ public class ObjectMapperFactoryProvider implements Supplier () -> DEFAULT_MAPPER); + loadFirst(ObjectMapperFactory.class).orElseGet(DefaultObjectMapperFactory::new); } } } return objectMapperFactory; } + + /** Internal default private factory lazy initialized. */ + private static class DefaultObjectMapperFactory implements ObjectMapperFactory { + + private final ObjectMapper mapper; + + DefaultObjectMapperFactory() { + this.mapper = + new ObjectMapper() + .findAndRegisterModules() + .registerModule(JsonFormat.getCloudEventJacksonModule()); + } + + @Override + public ObjectMapper get() { + return mapper; + } + } } diff --git a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java index 83c5f1796..aff21cca4 100644 --- a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java +++ b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java @@ -17,17 +17,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import io.serverlessworkflow.impl.CollectionConversionUtils; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.jackson.JsonUtils; -import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Set; public class JacksonModelCollection implements WorkflowModelCollection { @@ -46,33 +44,12 @@ public Optional as(Class clazz) { if (node == null) return Optional.empty(); if (clazz.isInstance(node)) return Optional.of(clazz.cast(node)); - if (clazz.isInstance(this)) return Optional.of(clazz.cast(this)); List elements = new ArrayList<>(node.size()); node.forEach(elements::add); - if (clazz.isAssignableFrom(List.class)) return Optional.of(clazz.cast(elements)); - else if (clazz.isAssignableFrom(Set.class)) - return Optional.of(clazz.cast(new HashSet<>(elements))); - - if (clazz.isArray()) { - Class componentType = clazz.getComponentType(); - - if (!componentType.isPrimitive()) { - Object[] typedArray = (Object[]) Array.newInstance(componentType, 0); - return Optional.of(clazz.cast(elements.toArray(typedArray))); - } - - Object primitiveArray = Array.newInstance(componentType, elements.size()); - int i = 0; - for (Object item : elements) - Array.set(primitiveArray, i++, JsonUtils.convertValue(item, componentType)); - - return Optional.of(clazz.cast(primitiveArray)); - } - - return Optional.empty(); + return CollectionConversionUtils.as(elements, clazz, JsonUtils::convertValue); } @Override @@ -115,14 +92,20 @@ public WorkflowModel next() { } } + private List toModelList() { + List models = new ArrayList<>(node.size()); + node.forEach(n -> models.add(new JacksonModel(n))); + return models; + } + @Override public Object[] toArray() { - throw new UnsupportedOperationException("toArray() is not supported yet"); + return toModelList().toArray(); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException("toArray() is not supported yet"); + return toModelList().toArray(a); } @Override From 57ac21f1a293e7845a8983e8d5a2a515d30a685a Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Tue, 31 Mar 2026 12:43:24 -0400 Subject: [PATCH 7/7] Adding missing check Signed-off-by: Ricardo Zanini --- .../serverlessworkflow/impl/model/func/JavaModelCollection.java | 1 + 1 file changed, 1 insertion(+) diff --git a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java index 947c074be..410fcab69 100644 --- a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java +++ b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java @@ -151,6 +151,7 @@ public Class objectClass() { public Optional as(Class clazz) { if (object == null) return Optional.empty(); + if (clazz.isInstance(this)) return Optional.of(clazz.cast(this)); if (clazz.isInstance(object)) return Optional.of(clazz.cast(object)); return CollectionConversionUtils.as(object, clazz);