diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java index 1c322bc30..3c5703a93 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java @@ -20,7 +20,11 @@ import io.serverlessworkflow.api.types.func.ContextPredicate; import io.serverlessworkflow.api.types.func.EventDataPredicate; import io.serverlessworkflow.api.types.func.FilterPredicate; +import io.serverlessworkflow.api.types.func.TypedContextPredicate; +import io.serverlessworkflow.api.types.func.TypedFilterPredicate; +import io.serverlessworkflow.api.types.func.TypedPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate; import java.util.function.Predicate; public class FuncEventFilterPropertiesBuilder @@ -50,20 +54,23 @@ public FuncEventFilterPropertiesBuilder data(FilterPredicate pre } public FuncEventFilterPropertiesBuilder envelope(Predicate predicate) { - this.eventProperties.setData( - new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + this.eventProperties.setAdditionalProperty( + DefaultCloudEventPredicate.ENVELOPE_PREDICATE, + new TypedPredicate<>(predicate, CloudEvent.class)); return this; } public FuncEventFilterPropertiesBuilder envelope(ContextPredicate predicate) { - this.eventProperties.setData( - new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + this.eventProperties.setAdditionalProperty( + DefaultCloudEventPredicate.ENVELOPE_PREDICATE, + new TypedContextPredicate<>(predicate, CloudEvent.class)); return this; } public FuncEventFilterPropertiesBuilder envelope(FilterPredicate predicate) { - this.eventProperties.setData( - new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + this.eventProperties.setAdditionalProperty( + DefaultCloudEventPredicate.ENVELOPE_PREDICATE, + new TypedFilterPredicate<>(predicate, CloudEvent.class)); return this; } } diff --git a/experimental/lambda/pom.xml b/experimental/lambda/pom.xml index 50c054dad..22f9f897f 100644 --- a/experimental/lambda/pom.xml +++ b/experimental/lambda/pom.xml @@ -21,7 +21,7 @@ io.serverlessworkflow serverlessworkflow-impl-core - + io.serverlessworkflow serverlessworkflow-impl-jq @@ -50,6 +50,11 @@ assertj-core test + + org.awaitility + awaitility + test + ch.qos.logback logback-classic diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java new file mode 100644 index 000000000..aefaeeb41 --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/EventFilteringTest.java @@ -0,0 +1,161 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; +import static org.awaitility.Awaitility.await; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.EventPublisher; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.Test; + +public class EventFilteringTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // --- Mock Service Methods (replacing Quarkus Agents) --- + public NewsletterDraft writeDraft(NewsletterRequest req) { + return new NewsletterDraft("Draft: " + req.topic(), "Initial body..."); + } + + public NewsletterDraft editDraft(HumanReview review) { + return new NewsletterDraft("Edited Draft", "Fixed based on: " + review.notes()); + } + + public void sendEmail(NewsletterDraft draft) { + // Simulates MailService.send + } + + @Test + public void testIntelligentNewsletterApprovalPath() throws Exception { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + Workflow workflow = + FuncWorkflowBuilder.workflow("intelligent-newsletter") + .tasks( + function("draftAgent", this::writeDraft).exportAsTaskOutput(), + emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), + listen( + "waitHumanReview", + to().one( + consumed("org.acme.newsletter.review.done") + .extensionByInstanceId("instanceid"))) + .outputAs( + (List events) -> { + try { + return MAPPER.readValue(events.get(0).toBytes(), HumanReview.class); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize HumanReview", e); + } + }), + switchWhenOrElse( + h -> HumanReview.NEEDS_REVISION.equals(h.status()), + "humanEditorAgent", + "sendNewsletter", + HumanReview.class), + function("humanEditorAgent", this::editDraft) + .exportAsTaskOutput() + .then("draftReady"), + consume("sendNewsletter", this::sendEmail) + .inputFrom( + (payload, wfc, tfc) -> + payload instanceof HumanReview ? wfc.context() : payload)) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); + CompletableFuture future = instance.start(); + + // Wait for it to hit the listen state + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + + // --- THE NEGATIVE TEST: Fire an event with the WRONG instance id --- + CloudEvent maliciousEvent = + CloudEventBuilder.v1() + .withId("event-wrong") + .withSource(URI.create("test:/human-editor")) + .withType("org.acme.newsletter.review.done") + .withExtension("instanceid", "SOME-OTHER-ID-12345") // Does not match instance.id() + .withData( + "application/json", + "{\"status\":\"APPROVED\", \"notes\":\"Malicious approval\"}" + .getBytes(StandardCharsets.UTF_8)) + .build(); + + publisher.publish(maliciousEvent).toCompletableFuture().join(); + + await() + .pollDelay(Duration.ofMillis(250)) + .atMost(Duration.ofMillis(500)) + .until(() -> instance.status() == WorkflowStatus.WAITING && !future.isDone()); + + // --- THE POSITIVE TEST: Fire the CORRECT event --- + CloudEvent humanReviewEvent = + CloudEventBuilder.v1() + .withId("event-123") + .withSource(URI.create("test:/human-editor")) + .withType("org.acme.newsletter.review.done") + .withExtension("instanceid", instance.id()) // Matches exactly + .withData( + "application/json", + "{\"status\":\"APPROVED\", \"notes\":\"Looks good\"}" + .getBytes(StandardCharsets.UTF_8)) + .build(); + + publisher.publish(humanReviewEvent).toCompletableFuture().join(); + + // Assert successful completion + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.COMPLETED); + } + } + + // --- Mock Domain Models --- + public record NewsletterRequest(String topic) {} + + public record NewsletterDraft(String title, String body) {} + + public record HumanReview(String status, String notes) { + public static final String NEEDS_REVISION = "NEEDS_REVISION"; + public static final String APPROVED = "APPROVED"; + } +} diff --git a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java index 5c52b33ed..410fcab69 100644 --- a/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java +++ b/experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java @@ -15,11 +15,13 @@ */ package io.serverlessworkflow.impl.model.func; +import io.serverlessworkflow.impl.CollectionConversionUtils; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Optional; public class JavaModelCollection implements Collection, WorkflowModelCollection { @@ -78,14 +80,22 @@ public Iterator iterator() { return new ModelIterator(object.iterator()); } + private List toModelList() { + List models = new ArrayList<>(object.size()); + for (Object obj : object) + models.add(obj instanceof WorkflowModel value ? value : nextItem(obj)); + + return models; + } + @Override public Object[] toArray() { - throw new UnsupportedOperationException("toArray is not supported yet"); + return toModelList().toArray(); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException("toArray is not supported yet"); + return toModelList().toArray(a); } @Override @@ -139,8 +149,11 @@ public Class objectClass() { @Override public Optional as(Class clazz) { - return object.getClass().isAssignableFrom(clazz) - ? Optional.of(clazz.cast(object)) - : Optional.empty(); + if (object == null) return Optional.empty(); + + if (clazz.isInstance(this)) return Optional.of(clazz.cast(this)); + if (clazz.isInstance(object)) return Optional.of(clazz.cast(object)); + + return CollectionConversionUtils.as(object, clazz); } } diff --git a/experimental/pom.xml b/experimental/pom.xml index 9882957d1..54dd0e206 100644 --- a/experimental/pom.xml +++ b/experimental/pom.xml @@ -15,7 +15,7 @@ serverlessworkflow-impl-core ${project.version} - + io.serverlessworkflow serverlessworkflow-impl-jq ${project.version} diff --git a/experimental/test/pom.xml b/experimental/test/pom.xml index 3b24140f4..ac91a92d9 100644 --- a/experimental/test/pom.xml +++ b/experimental/test/pom.xml @@ -1,14 +1,15 @@ - - 4.0.0 - - io.serverlessworkflow - serverlessworkflow-experimental - 8.0.0-SNAPSHOT - - serverlessworkflow-experimental-test - Serverless Workflow :: Experimental :: Test - - + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-experimental + 8.0.0-SNAPSHOT + + serverlessworkflow-experimental-test + Serverless Workflow :: Experimental :: Test + + org.junit.jupiter junit-jupiter-engine test @@ -16,28 +17,38 @@ org.mockito mockito-core - test + test - + org.assertj assertj-core - test + test - io.serverlessworkflow - serverlessworkflow-experimental-fluent-func - test + org.awaitility + awaitility + test - - io.serverlessworkflow - serverlessworkflow-experimental-lambda - test + + ch.qos.logback + logback-classic + test - - io.serverlessworkflow - serverlessworkflow-impl-model - ${project.version} - test + + io.serverlessworkflow + serverlessworkflow-experimental-fluent-func + test + + + io.serverlessworkflow + serverlessworkflow-experimental-lambda + test + + + io.serverlessworkflow + serverlessworkflow-impl-model + ${project.version} + test - + \ No newline at end of file diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java index 3058eaa2d..8a4a05ee2 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java @@ -20,22 +20,28 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne; import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; /** * Tests for the Event Filter DSL specification. Verifies that the fluent builder correctly wires - * the payload parsing and contextual lambdas into the final Workflow definitions. + * the payload parsing and contextual lambdas into the final Workflow definitions, and ensures the + * ModelCollection adapters seamlessly convert between types. */ class FuncEventFilterTest { + public record Review(String author, String text, int rating) {} + @Test void testListenToOneCollection() { runIt( @@ -47,7 +53,7 @@ void testListenToOneCollection() { } @Test - void testListenToOneNode() { + void testListenToOneArrayNode() { runIt( FuncWorkflowBuilder.workflow("listenToOneReviewNode") .tasks( @@ -56,6 +62,36 @@ void testListenToOneNode() { .build()); } + @Test + void testListenToOneList() { + runIt( + FuncWorkflowBuilder.workflow("listenToOneReviewList") + .tasks( + listen("waitReview", toOne("org.acme.test.review")) + .outputAs((List list) -> list.get(0))) + .build()); + } + + @Test + void testListenToOneSet() { + runIt( + FuncWorkflowBuilder.workflow("listenToOneReviewSet") + .tasks( + listen("waitReview", toOne("org.acme.test.review")) + .outputAs((Set set) -> set.iterator().next())) + .build()); + } + + @Test + void testListenToOneArray() { + runIt( + FuncWorkflowBuilder.workflow("listenToOneReviewArray") + .tasks( + listen("waitReview", toOne("org.acme.test.review")) + .outputAs((JsonNode[] array) -> array[0])) + .build()); + } + private Workflow reviewEmitter() { return FuncWorkflowBuilder.workflow("emitReview") .tasks(emitJson("draftReady", "org.acme.test.review", Review.class)) @@ -64,10 +100,14 @@ private Workflow reviewEmitter() { private void runIt(Workflow listen) { Review review = new Review("Torrente", "espectacular", 5); + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + CompletableFuture waiting = app.workflowDefinition(listen).instance(Map.of()).start(); + app.workflowDefinition(reviewEmitter()).instance(review).start().join(); + assertThat(waiting.join().as(Review.class).orElseThrow()).isEqualTo(review); } } diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java new file mode 100644 index 000000000..2d01c973c --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.EventPublisher; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.Test; + +public class JacksonEventFilteringTest { + + // --- Mock Domain Models --- + public record NewsletterRequest(String topic) {} + + public record NewsletterDraft(String title, String body) {} + + public record HumanReview(String status, String notes) { + public static final String NEEDS_REVISION = "NEEDS_REVISION"; + public static final String APPROVED = "APPROVED"; + } + + // --- Mock Service Methods --- + public NewsletterDraft writeDraft(NewsletterRequest req) { + return new NewsletterDraft("Draft: " + req.topic(), "Initial body..."); + } + + public NewsletterDraft editDraft(HumanReview review) { + return new NewsletterDraft("Edited Draft", "Fixed based on: " + review.notes()); + } + + public void sendEmail(NewsletterDraft draft) { + // Simulates MailService.send + } + + @Test + public void testJacksonAutomagicalConversion() throws Exception { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + Workflow workflow = + FuncWorkflowBuilder.workflow("intelligent-newsletter") + .tasks( + function("draftAgent", this::writeDraft).exportAsTaskOutput(), + emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), + listen( + "waitHumanReview", + to().one( + consumed("org.acme.newsletter.review.done") + .extensionByInstanceId("instanceid"))) + .outputAs((Collection events) -> events.iterator().next()), + // The engine sees the incoming JsonNode, sees this task expects + // HumanReview.class, + // and natively deserializes it for you before executing the lambda! + switchWhenOrElse( + h -> HumanReview.NEEDS_REVISION.equals(h.status()), + "humanEditorAgent", + "sendNewsletter", + HumanReview.class), + function("humanEditorAgent", this::editDraft) + .exportAsTaskOutput() + .then("draftReady"), + consume("sendNewsletter", this::sendEmail) + // Because we are in Jackson, the payload at this evaluation stage can be a + // Map. + // We simply check for the "status" field to know if it's the review payload. + .inputFrom( + (Map payload, + WorkflowContextData wfc, + TaskContextData tfc) -> + payload.containsKey("status") ? wfc.context() : payload)) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); + CompletableFuture future = instance.start(); + + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + CloudEvent humanReviewEvent = + CloudEventBuilder.v1() + .withId("event-123") + .withSource(URI.create("test:/human-editor")) + .withType("org.acme.newsletter.review.done") + .withExtension("instanceid", instance.id()) + .withData( + "application/json", + "{\"status\":\"APPROVED\", \"notes\":\"Looks good\"}" + .getBytes(StandardCharsets.UTF_8)) + .build(); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + publisher.publish(humanReviewEvent).toCompletableFuture().join(); + + future.join(); + + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/CollectionConversionUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/CollectionConversionUtils.java new file mode 100644 index 000000000..2ebe7a1c8 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/CollectionConversionUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; + +public final class CollectionConversionUtils { + private CollectionConversionUtils() {} + + /** + * Safely converts a base Collection into the requested List, Set, or Array type. + * + * @param elements The base collection of elements. + * @param clazz The target class to convert to. + * @param primitiveConverter Strategy for converting items to primitives if an array is requested. + */ + public static Optional as( + Collection elements, + Class clazz, + BiFunction, Object> primitiveConverter) { + if (clazz.isAssignableFrom(List.class)) + return Optional.of(clazz.cast(new ArrayList<>(elements))); + else if (clazz.isAssignableFrom(Set.class)) + return Optional.of(clazz.cast(new HashSet<>(elements))); + + if (clazz.isArray()) { + Class componentType = clazz.getComponentType(); + + if (!componentType.isPrimitive()) { + Object[] typedArray = (Object[]) Array.newInstance(componentType, 0); + return Optional.of(clazz.cast(elements.toArray(typedArray))); + } + + Object primitiveArray = Array.newInstance(componentType, elements.size()); + + int i = 0; + for (Object item : elements) + Array.set(primitiveArray, i++, primitiveConverter.apply(item, componentType)); + + return Optional.of(clazz.cast(primitiveArray)); + } + + return Optional.empty(); + } + + /** + * @see #as(Collection, Class, BiFunction) + */ + public static Optional as(Collection elements, Class clazz) { + return as(elements, clazz, (item, type) -> item); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java index c58f7de05..dddf8127f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java @@ -36,6 +36,12 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate { + /** + * {@link EventProperties#getAdditionalProperties()} property for custom {@link CloudEvent} {@link + * java.util.function.Predicate} evaluation. + */ + public static final String ENVELOPE_PREDICATE = "envelopePredicate"; + private final CloudEventAttrPredicate idFilter; private final CloudEventAttrPredicate sourceFilter; private final CloudEventAttrPredicate subjectFilter; @@ -44,9 +50,10 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate { private final CloudEventAttrPredicate dataSchemaFilter; private final CloudEventAttrPredicate timeFilter; private final CloudEventAttrPredicate dataFilter; + private final CloudEventAttrPredicate envelopeFilter; private final CloudEventAttrPredicate> additionalFilter; - private static final CloudEventAttrPredicate isTrue() { + private static CloudEventAttrPredicate isTrue() { return (x, w, t) -> true; } @@ -59,9 +66,25 @@ public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplicatio dataSchemaFilter = dataSchemaFilter(properties.getDataschema(), app); timeFilter = offsetTimeFilter(properties.getTime(), app); dataFilter = dataFilter(properties.getData(), app); + envelopeFilter = envelopeFilter(properties, app); additionalFilter = additionalFilter(properties.getAdditionalProperties(), app); } + private CloudEventAttrPredicate envelopeFilter( + EventProperties properties, WorkflowApplication app) { + Object envelopePredObj = null; + if (properties.getAdditionalProperties() != null + && properties.getAdditionalProperties().containsKey(ENVELOPE_PREDICATE)) + envelopePredObj = properties.getAdditionalProperties().remove(ENVELOPE_PREDICATE); + + return envelopePredObj == null + ? isTrue() + : fromCloudEvent( + app.modelFactory(), + app.expressionFactory() + .buildPredicate(new ExpressionDescriptor(null, envelopePredObj))); + } + private CloudEventAttrPredicate> additionalFilter( Map additionalProperties, WorkflowApplication app) { return additionalProperties != null && !additionalProperties.isEmpty() @@ -72,11 +95,16 @@ private CloudEventAttrPredicate> additionalFilter( : isTrue(); } - private CloudEventAttrPredicate fromCloudEvent( + private CloudEventAttrPredicate fromCloudEventData( WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); } + private CloudEventAttrPredicate fromCloudEvent( + WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { + return (e, w, t) -> filter.test(w, t, workflowModelFactory.from(e)); + } + private CloudEventAttrPredicate> fromMap( WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); @@ -85,7 +113,7 @@ private CloudEventAttrPredicate> fromMap( private CloudEventAttrPredicate dataFilter( EventData data, WorkflowApplication app) { return data != null - ? fromCloudEvent( + ? fromCloudEventData( app.modelFactory(), app.expressionFactory() .buildPredicate( @@ -179,6 +207,7 @@ public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task && typeFilter.test(event.getType(), workflow, task) && dataSchemaFilter.test(event.getDataSchema(), workflow, task) && timeFilter.test(event.getTime(), workflow, task) + && envelopeFilter.test(event, workflow, task) && dataFilter.test(event.getData(), workflow, task) && additionalFilter.test(CloudEventUtils.extensions(event), workflow, task); } diff --git a/impl/json-utils/pom.xml b/impl/json-utils/pom.xml index 57627413b..3829a3842 100644 --- a/impl/json-utils/pom.xml +++ b/impl/json-utils/pom.xml @@ -1,24 +1,36 @@ - - 4.0.0 - - io.serverlessworkflow - serverlessworkflow-impl - 8.0.0-SNAPSHOT - - serverlessworkflow-impl-json - Serverless Workflow :: Impl :: Json utils - - - com.fasterxml.jackson.core - jackson-databind - - + + 4.0.0 + io.serverlessworkflow - serverlessworkflow-impl-core - - - io.cloudevents - cloudevents-json-jackson - - + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-json + Serverless Workflow :: Impl :: Json utils + + + com.fasterxml.jackson.core + jackson-databind + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.cloudevents + cloudevents-json-jackson + + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.assertj + assertj-core + test + + \ No newline at end of file diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java index a3591a93c..0ad2409ef 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtils.java @@ -15,12 +15,14 @@ */ package io.serverlessworkflow.impl.jackson; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; +import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.jackson.JsonCloudEventData; +import io.cloudevents.jackson.JsonFormat; import java.io.IOException; import java.io.UncheckedIOException; import java.time.OffsetDateTime; @@ -30,30 +32,17 @@ public class JacksonCloudEventUtils { public static JsonNode toJsonNode(CloudEvent event) { - ObjectNode result = JsonUtils.mapper().createObjectNode(); - if (event.getData() != null) { - result.set("data", toJsonNode(event.getData())); - } - if (event.getSubject() != null) { - result.put("subject", event.getSubject()); - } - if (event.getDataContentType() != null) { - result.put("datacontenttype", event.getDataContentType()); - } - result.put("id", event.getId()); - result.put("source", event.getSource().toString()); - result.put("type", event.getType()); - result.put("specversion", event.getSpecVersion().toString()); - if (event.getDataSchema() != null) { - result.put("dataschema", event.getDataSchema().toString()); + if (event == null) { + return NullNode.instance; } - if (event.getTime() != null) { - result.put("time", event.getTime().toString()); + // Delegate entirely to the official CloudEvents SDK + byte[] serialized = + EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).serialize(event); + try { + return JsonUtils.mapper().readTree(serialized); + } catch (IOException e) { + throw new UncheckedIOException(e); } - event - .getExtensionNames() - .forEach(n -> result.set(n, JsonUtils.fromValue(event.getExtension(n)))); - return result; } public static OffsetDateTime toOffset(Date date) { @@ -73,5 +62,26 @@ public static JsonNode toJsonNode(CloudEventData data) { } } + public static CloudEvent toCloudEvent(JsonNode node) { + if (node == null || node.isNull()) { + return null; + } + try { + byte[] ceBytes = JsonUtils.mapper().writeValueAsBytes(node); + return EventFormatProvider.getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(ceBytes); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Failed to deserialize JsonNode to CloudEvent", e); + } + } + + public static CloudEventData toCloudEventData(JsonNode node) { + if (node == null || node.isNull()) { + return null; + } + return JsonCloudEventData.wrap(node); + } + private JacksonCloudEventUtils() {} } diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java index 8ca1724e2..236178d35 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java @@ -31,6 +31,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ShortNode; import com.fasterxml.jackson.databind.node.TextNode; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.serverlessworkflow.impl.WorkflowModel; import java.io.IOException; import java.math.BigDecimal; @@ -54,14 +56,12 @@ public class JsonUtils { - private static ObjectMapper mapper = ObjectMapperFactoryProvider.instance().get().get(); - public static ObjectMapper mapper() { - return mapper; + return ObjectMapperFactoryProvider.instance().get().get(); } public static Collector arrayNodeCollector() { - return new Collector() { + return new Collector<>() { @Override public BiConsumer accumulator() { return ArrayNode::add; @@ -87,7 +87,7 @@ public Function finisher() { @Override public Supplier supplier() { - return () -> mapper.createArrayNode(); + return () -> mapper().createArrayNode(); } }; } @@ -97,6 +97,7 @@ public Supplier supplier() { * Although we can use directly ObjectMapper.convertValue for implementing fromValue and toJavaValue methods, * the performance gain of avoiding an intermediate buffer is so tempting that we cannot avoid it */ + @SuppressWarnings("unchecked") public static JsonNode fromValue(Object value) { if (value == null) { return NullNode.instance; @@ -128,8 +129,12 @@ public static JsonNode fromValue(Object value) { return mapToNode((Map) value); } else if (value instanceof WorkflowModel model) { return modelToJson(model); + } else if (value instanceof CloudEvent ce) { + return JacksonCloudEventUtils.toJsonNode(ce); + } else if (value instanceof CloudEventData data) { + return JacksonCloudEventUtils.toJsonNode(data); } else { - return mapper.convertValue(value, JsonNode.class); + return mapper().convertValue(value, JsonNode.class); } } @@ -147,7 +152,7 @@ public static JsonNode fromString(String value) { String trimmedValue = value.trim(); if (trimmedValue.startsWith("{") && trimmedValue.endsWith("}")) { try { - return mapper.readTree(trimmedValue); + return mapper().readTree(trimmedValue); } catch (IOException ex) { // ignore and return test node } @@ -163,8 +168,8 @@ private static Object toJavaValue(ObjectNode node) { return result; } - private static Collection toJavaValue(ArrayNode node) { - Collection result = new ArrayList<>(); + private static Collection toJavaValue(ArrayNode node) { + Collection result = new ArrayList<>(); for (JsonNode item : node) { result.add(internalToJavaValue(item, JsonUtils::toJavaValue, JsonUtils::toJavaValue)); } @@ -181,7 +186,7 @@ public static T convertValue(Object obj, Class returnType) { } else if (obj instanceof JsonNode) { return convertValue((JsonNode) obj, returnType); } else { - return mapper.convertValue(obj, returnType); + return mapper().convertValue(obj, returnType); } } @@ -197,8 +202,12 @@ public static T convertValue(JsonNode jsonNode, Class returnType) { obj = jsonNode.asLong(); } else if (String.class.isAssignableFrom(returnType)) { obj = jsonNode.asText(); + } else if (CloudEvent.class.isAssignableFrom(returnType)) { + obj = JacksonCloudEventUtils.toCloudEvent(jsonNode); + } else if (CloudEventData.class.isAssignableFrom(returnType)) { + obj = JacksonCloudEventUtils.toCloudEventData(jsonNode); } else { - obj = mapper.convertValue(jsonNode, returnType); + obj = mapper().convertValue(jsonNode, returnType); } return returnType.cast(obj); } @@ -228,12 +237,12 @@ private static Object internalToJavaValue( } else if (jsonNode.isObject()) { return objectFunction.apply((ObjectNode) jsonNode); } else { - return mapper.convertValue(jsonNode, Object.class); + return mapper().convertValue(jsonNode, Object.class); } } public static String toString(JsonNode node) throws JsonProcessingException { - return mapper.writeValueAsString(node); + return mapper().writeValueAsString(node); } public static void addToNode(String name, Object value, ObjectNode dest) { @@ -241,7 +250,7 @@ public static void addToNode(String name, Object value, ObjectNode dest) { } private static ObjectNode mapToNode(Map value) { - ObjectNode objectNode = mapper.createObjectNode(); + ObjectNode objectNode = mapper().createObjectNode(); for (Map.Entry entry : value.entrySet()) { addToNode(entry.getKey(), entry.getValue(), objectNode); } @@ -249,7 +258,7 @@ private static ObjectNode mapToNode(Map value) { } private static ArrayNode mapToArray(Collection collection) { - return mapToArray(collection, mapper.createArrayNode()); + return mapToArray(collection, mapper().createArrayNode()); } private static ArrayNode mapToArray(Collection collection, ArrayNode arrayNode) { @@ -260,11 +269,11 @@ private static ArrayNode mapToArray(Collection collection, ArrayNode arrayNod } public static ObjectNode object() { - return mapper.createObjectNode(); + return mapper().createObjectNode(); } public static ArrayNode array() { - return mapper.createArrayNode(); + return mapper().createArrayNode(); } public static Optional toDate(JsonNode node) { diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java index 3a12ea3d9..4b4109661 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java @@ -18,6 +18,7 @@ import static io.serverlessworkflow.impl.WorkflowUtils.loadFirst; import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.jackson.JsonFormat; import java.util.Objects; import java.util.function.Supplier; @@ -29,19 +30,42 @@ public static ObjectMapperFactoryProvider instance() { return instance; } - private ObjectMapperFactory objectMapperFactory; + private volatile ObjectMapperFactory objectMapperFactory; private ObjectMapperFactoryProvider() {} - public void setFactory(ObjectMapperFactory objectMapperFactory) { + public synchronized void setFactory(ObjectMapperFactory objectMapperFactory) { this.objectMapperFactory = Objects.requireNonNull(objectMapperFactory); } @Override public ObjectMapperFactory get() { - return objectMapperFactory != null - ? objectMapperFactory - : loadFirst(ObjectMapperFactory.class) - .orElseGet(() -> () -> new ObjectMapper().findAndRegisterModules()); + if (objectMapperFactory == null) { + synchronized (this) { + if (objectMapperFactory == null) { + objectMapperFactory = + loadFirst(ObjectMapperFactory.class).orElseGet(DefaultObjectMapperFactory::new); + } + } + } + return objectMapperFactory; + } + + /** Internal default private factory lazy initialized. */ + private static class DefaultObjectMapperFactory implements ObjectMapperFactory { + + private final ObjectMapper mapper; + + DefaultObjectMapperFactory() { + this.mapper = + new ObjectMapper() + .findAndRegisterModules() + .registerModule(JsonFormat.getCloudEventJacksonModule()); + } + + @Override + public ObjectMapper get() { + return mapper; + } } } diff --git a/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtilsTest.java b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtilsTest.java new file mode 100644 index 000000000..672be8aab --- /dev/null +++ b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JacksonCloudEventUtilsTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.jackson; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.BytesCloudEventData; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; + +public class JacksonCloudEventUtilsTest { + + private CloudEvent createSampleEvent() { + return CloudEventBuilder.v1() + .withId("5dc4698e-5f98-470e-bb76-04218fe2dd0f") + .withSource(URI.create("api:/newsletter")) + .withType("org.acme.newsletter.review.done") + .withDataContentType("application/json") + .withExtension("flowinstanceid", "01KMRBFA19GZYW3XY895Z4SNCK") + .withData("{\"status\":\"NEEDS_REVISION\"}".getBytes(StandardCharsets.UTF_8)) + .build(); + } + + @Test + public void testCloudEventSerialization() { + CloudEvent event = createSampleEvent(); + + JsonNode node = JacksonCloudEventUtils.toJsonNode(event); + + assertNotNull(node); + assertTrue(node.has("specversion"), "Missing mandatory specversion attribute"); + assertEquals("1.0", node.get("specversion").asText()); + + assertFalse(node.has("specVersion"), "Jackson POJO serializer mangled the envelope!"); + + assertEquals("5dc4698e-5f98-470e-bb76-04218fe2dd0f", node.get("id").asText()); + assertEquals("01KMRBFA19GZYW3XY895Z4SNCK", node.get("flowinstanceid").asText()); + + assertTrue(node.has("data")); + assertEquals("NEEDS_REVISION", node.get("data").get("status").asText()); + } + + @Test + public void testCloudEventDeserialization() { + CloudEvent originalEvent = createSampleEvent(); + JsonNode node = JacksonCloudEventUtils.toJsonNode(originalEvent); + + CloudEvent restoredEvent = JacksonCloudEventUtils.toCloudEvent(node); + + assertNotNull(restoredEvent); + assertEquals(originalEvent.getId(), restoredEvent.getId()); + assertEquals(originalEvent.getType(), restoredEvent.getType()); + assertEquals(originalEvent.getSpecVersion(), restoredEvent.getSpecVersion()); + + assertEquals("01KMRBFA19GZYW3XY895Z4SNCK", restoredEvent.getExtension("flowinstanceid")); + } + + @Test + public void testCloudEventDataRoundTrip() { + byte[] rawJson = "{\"draft\":\"Bullish Market Update\"}".getBytes(StandardCharsets.UTF_8); + CloudEventData originalData = BytesCloudEventData.wrap(rawJson); + + JsonNode dataNode = JacksonCloudEventUtils.toJsonNode(originalData); + assertNotNull(dataNode); + assertEquals("Bullish Market Update", dataNode.get("draft").asText()); + + CloudEventData restoredData = JacksonCloudEventUtils.toCloudEventData(dataNode); + assertNotNull(restoredData); + assertEquals(rawJson.length, restoredData.toBytes().length); + } + + @Test + public void testJsonUtilsIntegration() { + CloudEvent originalEvent = createSampleEvent(); + + JsonNode modelNode = JsonUtils.fromValue(originalEvent); + assertNotNull(modelNode); + assertTrue(modelNode.has("specversion")); + + CloudEvent extractedEvent = JsonUtils.convertValue(modelNode, CloudEvent.class); + assertNotNull(extractedEvent); + assertEquals("5dc4698e-5f98-470e-bb76-04218fe2dd0f", extractedEvent.getId()); + } +} diff --git a/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JsonUtilsStaticInitTest.java b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JsonUtilsStaticInitTest.java new file mode 100644 index 000000000..04a953aa9 --- /dev/null +++ b/impl/json-utils/src/test/java/io/serverlessworkflow/impl/jackson/JsonUtilsStaticInitTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.jackson; + +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class JsonUtilsStaticInitTest { + + private ObjectMapperFactory originalFactory; + + @BeforeEach + public void setUp() { + // Capture the original factory (or fallback) to restore it later + // so we don't pollute other tests running in the same JVM. + originalFactory = ObjectMapperFactoryProvider.instance().get(); + } + + @AfterEach + public void tearDown() { + // Restore the provider to its original state + ObjectMapperFactoryProvider.instance().setFactory(originalFactory); + } + + @Test + public void testMapperReflectsDynamicFactoryOverride() { + ObjectMapper initialMapper = JsonUtils.mapper(); + ObjectMapper customMapper = new ObjectMapper(); + + assertNotSame( + initialMapper, + customMapper, + "Initial mapper and custom mapper should be different instances"); + ObjectMapperFactoryProvider.instance().setFactory(() -> customMapper); + ObjectMapper fetchedMapper = JsonUtils.mapper(); + + assertSame( + customMapper, + fetchedMapper, + "JsonUtils.mapper() failed to return the dynamically injected ObjectMapper. Static caching trap is present!"); + } +} diff --git a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java index 4bc3b2d52..aff21cca4 100644 --- a/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java +++ b/impl/model/src/main/java/io/serverlessworkflow/impl/model/jackson/JacksonModelCollection.java @@ -17,11 +17,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import io.serverlessworkflow.impl.CollectionConversionUtils; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Optional; public class JacksonModelCollection implements WorkflowModelCollection { @@ -38,12 +41,15 @@ public class JacksonModelCollection implements WorkflowModelCollection { @Override public Optional as(Class clazz) { - if (clazz.equals(Collection.class)) { - return Optional.of(clazz.cast(this)); - } - return clazz.isAssignableFrom(ArrayNode.class) - ? Optional.of(clazz.cast(node)) - : Optional.empty(); + if (node == null) return Optional.empty(); + + if (clazz.isInstance(node)) return Optional.of(clazz.cast(node)); + if (clazz.isInstance(this)) return Optional.of(clazz.cast(this)); + + List elements = new ArrayList<>(node.size()); + node.forEach(elements::add); + + return CollectionConversionUtils.as(elements, clazz, JsonUtils::convertValue); } @Override @@ -86,14 +92,20 @@ public WorkflowModel next() { } } + private List toModelList() { + List models = new ArrayList<>(node.size()); + node.forEach(n -> models.add(new JacksonModel(n))); + return models; + } + @Override public Object[] toArray() { - throw new UnsupportedOperationException("toArray() is not supported yet"); + return toModelList().toArray(); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException("toArray() is not supported yet"); + return toModelList().toArray(a); } @Override