diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java index 63bef84b..31bf37b4 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java @@ -46,7 +46,12 @@ public FuncCallTaskBuilder function(Function function) { } public FuncCallTaskBuilder function(Function function, Class argClass) { - this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass)); + return function(function, argClass, null); + } + + public FuncCallTaskBuilder function( + Function function, Class argClass, Class returnClass) { + this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass)); super.setTask(this.callTaskJava.getCallJava()); return this; } @@ -56,7 +61,12 @@ public FuncCallTaskBuilder function(ContextFunction function) { } public FuncCallTaskBuilder function(ContextFunction function, Class argClass) { - this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass)); + return function(function, argClass, null); + } + + public FuncCallTaskBuilder function( + ContextFunction function, Class argClass, Class returnClass) { + this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass)); super.setTask(this.callTaskJava.getCallJava()); return this; } @@ -66,7 +76,12 @@ public FuncCallTaskBuilder function(FilterFunction function) { } public FuncCallTaskBuilder function(FilterFunction function, Class argClass) { - this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass)); + return function(function, argClass, null); + } + + public FuncCallTaskBuilder function( + FilterFunction function, Class argClass, Class outputClass) { + this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, outputClass)); super.setTask(this.callTaskJava.getCallJava()); return this; } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java index 5c46bfc7..625b224a 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java @@ -16,10 +16,10 @@ package io.serverlessworkflow.fluent.func; import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.reflection.func.SerializableFunction; import io.serverlessworkflow.api.types.func.ContextFunction; import io.serverlessworkflow.api.types.func.EventDataFunction; import io.serverlessworkflow.api.types.func.FilterFunction; -import io.serverlessworkflow.fluent.func.dsl.SerializableFunction; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; import java.util.function.Function; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java index 7ea014d7..c57edbae 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java @@ -56,13 +56,20 @@ public FuncForkTaskBuilder branch(String name, Function function) { public FuncForkTaskBuilder branch( String name, Function function, Class argParam) { + return branch(name, function, argParam, null); + } + + public FuncForkTaskBuilder branch( + String name, Function function, Class argParam, Class returnClass) { if (name == null || name.isBlank()) { name = "branch-" + this.items.size(); } this.items.add( new TaskItem( name, - new Task().withCallTask(new CallTaskJava(CallJava.function(function, argParam))))); + new Task() + .withCallTask( + new CallTaskJava(CallJava.function(function, argParam, returnClass))))); return this; } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java index 6da5d4be..202813c7 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java @@ -15,6 +15,9 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import io.serverlessworkflow.api.reflection.func.SerializableFunction; +import io.serverlessworkflow.api.reflection.func.SerializablePredicate; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java index eb21cad9..036534b8 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java @@ -29,47 +29,52 @@ public final class FuncCallStep extends Step, FuncCallT private final ContextFunction ctxFn; private final FilterFunction filterFn; private final Class argClass; + private final Class returnClass; /** Function variant (unnamed). */ - FuncCallStep(Function fn, Class argClass) { - this(null, fn, argClass); + FuncCallStep(Function fn, Class argClass, Class returnClass) { + this(null, fn, argClass, returnClass); } /** Function variant (named). */ - FuncCallStep(String name, Function fn, Class argClass) { + FuncCallStep(String name, Function fn, Class argClass, Class returnClass) { this.name = name; this.fn = fn; this.ctxFn = null; this.filterFn = null; this.argClass = argClass; + this.returnClass = returnClass; } /** ContextFunction variant (unnamed). */ - FuncCallStep(ContextFunction ctxFn, Class argClass) { - this(null, ctxFn, argClass); + FuncCallStep(ContextFunction ctxFn, Class argClass, Class returnClass) { + this(null, ctxFn, argClass, returnClass); } /** ContextFunction variant (named). */ - FuncCallStep(String name, ContextFunction ctxFn, Class argClass) { + FuncCallStep(String name, ContextFunction ctxFn, Class argClass, Class returnClass) { this.name = name; this.fn = null; this.ctxFn = ctxFn; this.filterFn = null; this.argClass = argClass; + this.returnClass = returnClass; } /** FilterFunction variant (unnamed). */ - FuncCallStep(FilterFunction filterFn, Class argClass) { - this(null, filterFn, argClass); + FuncCallStep(FilterFunction filterFn, Class argClass, Class returnClass) { + this(null, filterFn, argClass, returnClass); } /** FilterFunction variant (named). */ - FuncCallStep(String name, FilterFunction filterFn, Class argClass) { + FuncCallStep( + String name, FilterFunction filterFn, Class argClass, Class returnClass) { this.name = name; this.fn = null; this.ctxFn = null; this.filterFn = filterFn; this.argClass = argClass; + this.returnClass = returnClass; } @Override @@ -77,11 +82,11 @@ protected void configure(FuncTaskItemListBuilder list, Consumer apply = cb -> { if (ctxFn != null) { - cb.function(ctxFn, argClass); + cb.function(ctxFn, argClass, returnClass); } else if (filterFn != null) { - cb.function(filterFn, argClass); + cb.function(filterFn, argClass, returnClass); } else { - cb.function(fn, argClass); + cb.function(fn, argClass, returnClass); } post.accept(cb); }; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index d5d14ef8..746d0ce1 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -16,6 +16,12 @@ package io.serverlessworkflow.fluent.func.dsl; import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.reflection.func.InstanceIdFunction; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import io.serverlessworkflow.api.reflection.func.SerializableConsumer; +import io.serverlessworkflow.api.reflection.func.SerializableFunction; +import io.serverlessworkflow.api.reflection.func.SerializablePredicate; +import io.serverlessworkflow.api.reflection.func.UniqueIdBiFunction; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.OAuth2AuthenticationData; import io.serverlessworkflow.api.types.func.ContextFunction; @@ -103,7 +109,11 @@ public static Consumer fn( * @return a consumer that configures a {@code FuncCallTaskBuilder} */ public static Consumer fn(SerializableFunction function) { - return f -> f.function(function, ReflectionUtils.inferInputType(function)); + return f -> + f.function( + function, + ReflectionUtils.inferInputType(function), + ReflectionUtils.inferResultType(function)); } /** @@ -348,11 +358,16 @@ public static FuncCallStep withContext(ContextFunction fn) { */ public static FuncCallStep withContext( String name, ContextFunction fn, Class in) { - return new FuncCallStep<>(name, fn, in); + return withContext(name, fn, in, ReflectionUtils.inferResultType(fn)); + } + + public static FuncCallStep withContext( + String name, ContextFunction fn, Class in, Class out) { + return new FuncCallStep<>(name, fn, in, out); } public static FuncCallStep withContext(String name, ContextFunction fn) { - return new FuncCallStep<>(name, fn, ReflectionUtils.inferInputType(fn)); + return withContext(name, fn, ReflectionUtils.inferInputType(fn)); } /** @@ -384,7 +399,12 @@ public static FuncCallStep withFilter(FilterFunction fn, Clas */ public static FuncCallStep withFilter( String name, FilterFunction fn, Class in) { - return new FuncCallStep<>(name, fn, in); + return withFilter(name, fn, in, ReflectionUtils.inferResultType(fn)); + } + + public static FuncCallStep withFilter( + String name, FilterFunction fn, Class in, Class out) { + return new FuncCallStep<>(name, fn, in, out); } public static FuncCallStep withFilter(FilterFunction fn) { @@ -407,8 +427,13 @@ public static FuncCallStep withFilter(String name, FilterFunction FuncCallStep withInstanceId( String name, InstanceIdFunction fn, Class in) { + return withInstanceId(name, fn, in, ReflectionUtils.inferResultType(fn)); + } + + public static FuncCallStep withInstanceId( + String name, InstanceIdFunction fn, Class in, Class out) { ContextFunction jcf = (payload, wctx) -> fn.apply(wctx.instanceData().id(), payload); - return new FuncCallStep<>(name, jcf, in); + return new FuncCallStep<>(name, jcf, in, out); } /** @@ -463,9 +488,14 @@ static String defaultUniqueId(WorkflowContextData wctx, TaskContextData tctx) { */ public static FuncCallStep withUniqueId( String name, UniqueIdBiFunction fn, Class in) { + return withUniqueId(name, fn, in, ReflectionUtils.inferResultType(fn)); + } + + public static FuncCallStep withUniqueId( + String name, UniqueIdBiFunction fn, Class in, Class out) { FilterFunction jff = (payload, wctx, tctx) -> fn.apply(defaultUniqueId(wctx, tctx), payload); - return new FuncCallStep<>(name, jff, in); + return new FuncCallStep<>(name, jff, in, out); } public static FuncCallStep withUniqueId(String name, UniqueIdBiFunction fn) { @@ -577,7 +607,23 @@ public static FuncCallStep agent(String name, UniqueIdBiFunction FuncCallStep function(Function fn, Class inputClass) { - return new FuncCallStep<>(fn, inputClass); + return function(fn, inputClass, null); + } + + /** + * Create a {@link FuncCallStep} that calls a simple Java {@link Function} with explicit input + * type. + * + * @param fn the function to execute at runtime + * @param inputClass expected input class for model conversion + * @param outputClass expected outputClass class for model conversion + * @param input type + * @param result type + * @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)}) + */ + public static FuncCallStep function( + Function fn, Class inputClass, Class outputClass) { + return new FuncCallStep<>(fn, inputClass, outputClass); } /** @@ -590,8 +636,8 @@ public static FuncCallStep function(Function fn, Class inp * @return a call step */ public static FuncCallStep function(SerializableFunction fn) { - Class inputClass = ReflectionUtils.inferInputType(fn); - return new FuncCallStep<>(fn, inputClass); + return new FuncCallStep<>( + fn, ReflectionUtils.inferInputType(fn), ReflectionUtils.inferResultType(fn)); } /** @@ -604,8 +650,8 @@ public static FuncCallStep function(SerializableFunction fn) * @return a named call step */ public static FuncCallStep function(String name, SerializableFunction fn) { - Class inputClass = ReflectionUtils.inferInputType(fn); - return new FuncCallStep<>(name, fn, inputClass); + return new FuncCallStep<>( + name, fn, ReflectionUtils.inferInputType(fn), ReflectionUtils.inferResultType(fn)); } /** @@ -620,7 +666,23 @@ public static FuncCallStep function(String name, SerializableFuncti */ public static FuncCallStep function( String name, Function fn, Class inputClass) { - return new FuncCallStep<>(name, fn, inputClass); + return new FuncCallStep<>(name, fn, inputClass, null); + } + + /** + * Named variant of {@link #function(Function, Class)} with explicit input type. + * + * @param name task name + * @param fn the function to execute + * @param inputClass expected input class + * @param outputClass expected output class + * @param input type + * @param output type + * @return a named call step + */ + public static FuncCallStep function( + String name, Function fn, Class inputClass, Class outputClass) { + return new FuncCallStep<>(name, fn, inputClass, outputClass); } // ------------------ tasks ---------------- // diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java index 03c11933..7b7f9bee 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java @@ -18,6 +18,8 @@ import io.cloudevents.CloudEventData; import io.cloudevents.core.data.BytesCloudEventData; import io.cloudevents.core.data.PojoCloudEventData; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import io.serverlessworkflow.api.reflection.func.SerializableFunction; import io.serverlessworkflow.api.types.func.ContextFunction; import io.serverlessworkflow.api.types.func.EventDataFunction; import io.serverlessworkflow.fluent.func.FuncEmitEventPropertiesBuilder; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java index b20fc72b..3b8c0823 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java @@ -21,6 +21,7 @@ import io.cloudevents.core.CloudEventUtils; import io.cloudevents.core.data.PojoCloudEventData; import io.cloudevents.jackson.PojoCloudEventDataMapper; +import io.serverlessworkflow.api.reflection.func.SerializablePredicate; import io.serverlessworkflow.api.types.func.ContextPredicate; import io.serverlessworkflow.api.types.func.FilterPredicate; import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java index d1261d43..2f5055b0 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java @@ -15,6 +15,9 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import io.serverlessworkflow.api.reflection.func.SerializableFunction; +import io.serverlessworkflow.api.reflection.func.SerializablePredicate; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.func.ContextFunction; import io.serverlessworkflow.api.types.func.FilterFunction; diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java index 9b3b5136..b3ae573c 100644 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java @@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.serverlessworkflow.api.reflection.func.UniqueIdBiFunction; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.api.types.func.FilterFunction; -import io.serverlessworkflow.fluent.func.dsl.UniqueIdBiFunction; import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; import io.serverlessworkflow.impl.WorkflowInstanceData; diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/AbstractJavaCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/AbstractJavaCallExecutor.java index fe890139..466a9c27 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/AbstractJavaCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/AbstractJavaCallExecutor.java @@ -23,39 +23,70 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -public abstract class AbstractJavaCallExecutor implements CallableTask { +public abstract class AbstractJavaCallExecutor implements CallableTask { protected final Optional> inputClass; + private final Optional> outputClass; + private final Optional typeConverter; + private final boolean directCompletable; + private final boolean convertedCompletable; protected AbstractJavaCallExecutor() { - this(Optional.empty()); + this(Optional.empty(), Optional.empty()); } - protected AbstractJavaCallExecutor(Optional> inputClass) { + protected AbstractJavaCallExecutor( + Optional> inputClass, Optional> outputClass) { this.inputClass = inputClass; + this.outputClass = outputClass; + this.typeConverter = outputClass.flatMap(DataTypeConverterRegistry.get()::find); + this.directCompletable = outputClass.filter(c -> c.equals(CompletableFuture.class)).isPresent(); + this.convertedCompletable = + typeConverter.filter(c -> c.targetType().equals(CompletableFuture.class)).isPresent(); } @Override public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - Object result = - convertResponse(callJavaFunction(workflowContext, taskContext, model2Input(input))); WorkflowModelFactory modelFactory = workflowContext.definition().application().modelFactory(); - return result instanceof CompletableFuture future - ? future.thenApply(v -> output2Model(modelFactory, input, convertResponse(v))) - : CompletableFuture.completedFuture(output2Model(modelFactory, input, result)); + + if (directCompletable) { + return ((CompletableFuture) + callJavaFunction(workflowContext, taskContext, model2Input(input))) + .thenApply(v -> output2Model(modelFactory, input, convertResponse(v))); + } else if (convertedCompletable) { + return ((CompletableFuture) + convertTypedResponse( + callJavaFunction(workflowContext, taskContext, model2Input(input)))) + .thenApply(v -> output2Model(modelFactory, input, convertResponse(v))); + } else if (outputClass.isPresent()) { + return CompletableFuture.supplyAsync( + () -> callJavaFunction(workflowContext, taskContext, model2Input(input)), + workflowContext.definition().application().executorService()) + .thenApply(v -> output2Model(modelFactory, input, convertTypedResponse(v))); + } else { + Object result = + convertResponse(callJavaFunction(workflowContext, taskContext, model2Input(input))); + return result instanceof CompletableFuture future + ? future.thenApply(v -> output2Model(modelFactory, input, convertResponse(v))) + : CompletableFuture.completedFuture(output2Model(modelFactory, input, result)); + } } - protected abstract Object callJavaFunction( + protected abstract V callJavaFunction( WorkflowContext workflowContext, TaskContext taskContext, T input); protected T model2Input(WorkflowModel model) { return JavaFuncUtils.convertT(model, inputClass); } + protected Object convertTypedResponse(V obj) { + return obj == null ? null : typeConverter.map(c -> c.apply(obj)).orElse(obj); + } + protected Object convertResponse(Object obj) { - return obj == null - ? null + return obj == null || obj instanceof CompletableFuture + ? obj : DataTypeConverterRegistry.get().find(obj.getClass()).map(c -> c.apply(obj)).orElse(obj); } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverter.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverter.java index 0e322c02..53c316ac 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverter.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverter.java @@ -20,4 +20,6 @@ public interface DataTypeConverter extends Function, ServicePriority { Class sourceType(); + + Class targetType(); } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverterRegistry.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverterRegistry.java index ea97e89e..aa7b4cdb 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverterRegistry.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverterRegistry.java @@ -43,7 +43,7 @@ private DataTypeConverterRegistry() { } @SuppressWarnings("rawtypes") - public Optional find(Class clazz) { + public Optional find(Class clazz) { return convertersMap.computeIfAbsent(clazz, this::searchConverter); } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java index d8781ab7..b8c600b7 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java @@ -20,19 +20,20 @@ import io.serverlessworkflow.impl.WorkflowContext; import java.util.Optional; -public class JavaContextFunctionCallExecutor extends AbstractJavaCallExecutor { +public class JavaContextFunctionCallExecutor extends AbstractJavaCallExecutor { private final ContextFunction function; public JavaContextFunctionCallExecutor( - Optional> inputClass, ContextFunction function) { - super(inputClass); + Optional> inputClass, + Optional> outputClass, + ContextFunction function) { + super(inputClass, outputClass); this.function = function; } @Override - protected Object callJavaFunction( - WorkflowContext workflowContext, TaskContext taskContext, T input) { + protected V callJavaFunction(WorkflowContext workflowContext, TaskContext taskContext, T input) { return function.apply(input, workflowContext); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutorBuilder.java index c542d166..3d5bf9a5 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutorBuilder.java @@ -30,6 +30,7 @@ public class JavaContextFunctionCallExecutorBuilder protected ContextFunction function; protected Optional> inputClass; + protected Optional> outputClass; @Override public boolean accept(Class clazz) { @@ -43,10 +44,11 @@ public void init( WorkflowMutablePosition position) { this.function = task.function(); this.inputClass = task.inputClass(); + this.outputClass = task.outputClass(); } @Override public CallableTask build() { - return new JavaContextFunctionCallExecutor(inputClass, function); + return new JavaContextFunctionCallExecutor(inputClass, outputClass, function); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java index f5e58393..638b0dd1 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java @@ -20,19 +20,20 @@ import io.serverlessworkflow.impl.WorkflowContext; import java.util.Optional; -public class JavaFilterFunctionCallExecutor extends AbstractJavaCallExecutor { +public class JavaFilterFunctionCallExecutor extends AbstractJavaCallExecutor { private final FilterFunction function; public JavaFilterFunctionCallExecutor( - Optional> inputClass, FilterFunction function) { - super(inputClass); + Optional> inputClass, + Optional> outputClass, + FilterFunction function) { + super(inputClass, outputClass); this.function = function; } @Override - protected Object callJavaFunction( - WorkflowContext workflowContext, TaskContext taskContext, T input) { + protected V callJavaFunction(WorkflowContext workflowContext, TaskContext taskContext, T input) { return function.apply(input, workflowContext, taskContext); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutorBuilder.java index db354e2d..d1bba503 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutorBuilder.java @@ -30,6 +30,7 @@ public class JavaFilterFunctionCallExecutorBuilder private FilterFunction function; private Optional> inputClass; + private Optional> outputClass; @Override public boolean accept(Class clazz) { @@ -43,10 +44,11 @@ public void init( WorkflowMutablePosition position) { this.function = task.function(); this.inputClass = task.inputClass(); + this.outputClass = task.outputClass(); } @Override public CallableTask build() { - return new JavaFilterFunctionCallExecutor<>(inputClass, function); + return new JavaFilterFunctionCallExecutor<>(inputClass, outputClass, function); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java index 33ac6c6e..dc99ac91 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java @@ -20,18 +20,18 @@ import java.util.Optional; import java.util.function.Function; -public class JavaFunctionCallExecutor extends AbstractJavaCallExecutor { +public class JavaFunctionCallExecutor extends AbstractJavaCallExecutor { private final Function function; - public JavaFunctionCallExecutor(Optional> inputClass, Function function) { - super(inputClass); + public JavaFunctionCallExecutor( + Optional> inputClass, Optional> outputClass, Function function) { + super(inputClass, outputClass); this.function = function; } @Override - protected Object callJavaFunction( - WorkflowContext workflowContext, TaskContext taskContext, T input) { + protected V callJavaFunction(WorkflowContext workflowContext, TaskContext taskContext, T input) { return function.apply(input); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutorBuilder.java index dd4ec9a7..b0a29caa 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutorBuilder.java @@ -30,6 +30,7 @@ public class JavaFunctionCallExecutorBuilder protected Function function; protected Optional> inputClass; + protected Optional> outputClass; @Override public boolean accept(Class clazz) { @@ -43,10 +44,11 @@ public void init( WorkflowMutablePosition position) { function = task.function(); inputClass = task.inputClass(); + outputClass = task.outputClass(); } @Override public CallableTask build() { - return new JavaFunctionCallExecutor<>(inputClass, function); + return new JavaFunctionCallExecutor<>(inputClass, outputClass, function); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java index 0f9bae72..840f3a20 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java @@ -21,7 +21,7 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -public class JavaLoopFunctionCallExecutor extends AbstractJavaCallExecutor { +public class JavaLoopFunctionCallExecutor extends AbstractJavaCallExecutor { private final LoopFunction function; private final String varName; @@ -32,8 +32,7 @@ public JavaLoopFunctionCallExecutor(LoopFunction function, String varNa } @Override - protected Object callJavaFunction( - WorkflowContext workflowContext, TaskContext taskContext, T input) { + protected R callJavaFunction(WorkflowContext workflowContext, TaskContext taskContext, T input) { return function.apply(input, (V) safeObject(taskContext.variables().get(varName))); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java index 9aa3e6c6..5e4837d5 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java @@ -21,22 +21,21 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -public class JavaLoopFunctionIndexCallExecutor extends AbstractJavaCallExecutor { +public class JavaLoopFunctionIndexCallExecutor extends AbstractJavaCallExecutor { - private final LoopFunctionIndex function; + private final LoopFunctionIndex function; private final String varName; private final String indexName; public JavaLoopFunctionIndexCallExecutor( - LoopFunctionIndex function, String varName, String indexName) { + LoopFunctionIndex function, String varName, String indexName) { this.function = function; this.varName = varName; this.indexName = indexName; } @Override - protected Object callJavaFunction( - WorkflowContext workflowContext, TaskContext taskContext, T input) { + protected R callJavaFunction(WorkflowContext workflowContext, TaskContext taskContext, T input) { return function.apply( input, (V) safeObject(taskContext.variables().get(varName)), diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/StringBuilder2String.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/StringBuilder2String.java index a64f0d57..24caaedf 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/StringBuilder2String.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/StringBuilder2String.java @@ -28,4 +28,9 @@ public String apply(StringBuilder t) { public Class sourceType() { return StringBuilder.class; } + + @Override + public Class targetType() { + return String.class; + } } diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncCallAsyncTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncCallAsyncTest.java new file mode 100644 index 00000000..1aead510 --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncCallAsyncTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static org.assertj.core.api.Assertions.assertThat; + +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; + +public class FuncCallAsyncTest { + + private void safeSleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + private CompletableFuture waitAsync(Integer waitTime) { + return CompletableFuture.supplyAsync( + () -> { + safeSleep(waitTime); + return 1; + }); + } + + private Integer waitSync(Integer waitTime) { + safeSleep(waitTime); + return 1; + } + + @Test + void testCompletableCall() { + runIt(FuncWorkflowBuilder.workflow("waitCompletable").tasks(function(this::waitAsync)).build()); + } + + @Test + void testReferencedFunctionCall() { + runIt(FuncWorkflowBuilder.workflow("waitReference").tasks(function(this::waitSync)).build()); + } + + @Test + void testLambdaCall() { + runIt(FuncWorkflowBuilder.workflow("waitLambda").tasks(function(v -> 1)).build()); + } + + private class TimeListener implements WorkflowExecutionListener { + + private AtomicLong startTime = new AtomicLong(); + + @Override + public void onWorkflowStarted(WorkflowStartedEvent ev) { + startTime.set(System.currentTimeMillis()); + } + } + + private void runIt(Workflow workflow) { + TimeListener listener = new TimeListener(); + try (WorkflowApplication app = WorkflowApplication.builder().withListener(listener).build()) { + final long waitTime = 200; + WorkflowInstance instance = app.workflowDefinition(workflow).instance(waitTime); + CompletableFuture future = instance.start(); + assertThat(System.currentTimeMillis() - listener.startTime.get()).isLessThan(waitTime); + assertThat(future.join().asNumber().map(Number::intValue).orElseThrow()).isEqualTo(1); + } + } +} diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java index 8a4a05ee..44447baa 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncEventFilterTest.java @@ -15,17 +15,34 @@ */ package io.serverlessworkflow.fluent.test; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.EventPublisher; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -40,8 +57,6 @@ */ class FuncEventFilterTest { - public record Review(String author, String text, int rating) {} - @Test void testListenToOneCollection() { runIt( @@ -111,4 +126,94 @@ private void runIt(Workflow listen) { assertThat(waiting.join().as(Review.class).orElseThrow()).isEqualTo(review); } } + + // --- Mock Service Methods --- + NewsletterDraft writeDraft(NewsletterRequest req) { + return new NewsletterDraft("Draft: " + req.topic(), "Initial body..."); + } + + NewsletterDraft editDraft(HumanReview review) { + return new NewsletterDraft("Edited Draft", "Fixed based on: " + review.notes()); + } + + void sendEmail(NewsletterDraft draft) { + // Simulates MailService.send + } + + @Test + void testJacksonAutomagicalConversion() throws Exception { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + Workflow workflow = + FuncWorkflowBuilder.workflow("intelligent-newsletter") + .tasks( + function("draftAgent", this::writeDraft).exportAsTaskOutput(), + emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), + listen( + "waitHumanReview", + to().one( + consumed("org.acme.newsletter.review.done") + .extensionByInstanceId("instanceid"))) + .outputAs((Collection events) -> events.iterator().next()), + // The engine sees the incoming JsonNode, sees this task expects + // HumanReview.class, + // and natively deserializes it for you before executing the lambda! + switchWhenOrElse( + h -> HumanReview.NEEDS_REVISION.equals(h.status()), + "humanEditorAgent", + "sendNewsletter", + HumanReview.class), + function("humanEditorAgent", this::editDraft) + .exportAsTaskOutput() + .then("draftReady"), + consume("sendNewsletter", this::sendEmail) + // Because we are in Jackson, the payload at this evaluation stage can be a + // Map. + // We simply check for the "status" field to know if it's the review payload. + .inputFrom( + (Map payload, + WorkflowContextData wfc, + TaskContextData tfc) -> + payload.containsKey("status") ? wfc.context() : payload)) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); + CompletableFuture future = instance.start(); + + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + CloudEvent humanReviewEvent = + CloudEventBuilder.v1() + .withId("event-123") + .withSource(URI.create("test:/human-editor")) + .withType("org.acme.newsletter.review.done") + .withExtension("instanceid", instance.id()) + .withData( + "application/json", + "{\"status\":\"APPROVED\", \"notes\":\"Looks good\"}" + .getBytes(StandardCharsets.UTF_8)) + .build(); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + publisher.publish(humanReviewEvent).toCompletableFuture().join(); + + future.join(); + + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + } + } + + record Review(String author, String text, int rating) {} + + record NewsletterRequest(String topic) {} + + record NewsletterDraft(String title, String body) {} + + record HumanReview(String status, String notes) { + public static final String NEEDS_REVISION = "NEEDS_REVISION"; + public static final String APPROVED = "APPROVED"; + } } diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java deleted file mode 100644 index 2d01c973..00000000 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/JacksonEventFilteringTest.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.test; - -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import io.cloudevents.CloudEvent; -import io.cloudevents.core.builder.CloudEventBuilder; -import io.serverlessworkflow.api.types.Workflow; -import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; -import io.serverlessworkflow.impl.TaskContextData; -import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.events.EventPublisher; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import org.junit.jupiter.api.Test; - -public class JacksonEventFilteringTest { - - // --- Mock Domain Models --- - public record NewsletterRequest(String topic) {} - - public record NewsletterDraft(String title, String body) {} - - public record HumanReview(String status, String notes) { - public static final String NEEDS_REVISION = "NEEDS_REVISION"; - public static final String APPROVED = "APPROVED"; - } - - // --- Mock Service Methods --- - public NewsletterDraft writeDraft(NewsletterRequest req) { - return new NewsletterDraft("Draft: " + req.topic(), "Initial body..."); - } - - public NewsletterDraft editDraft(HumanReview review) { - return new NewsletterDraft("Edited Draft", "Fixed based on: " + review.notes()); - } - - public void sendEmail(NewsletterDraft draft) { - // Simulates MailService.send - } - - @Test - public void testJacksonAutomagicalConversion() throws Exception { - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - - Workflow workflow = - FuncWorkflowBuilder.workflow("intelligent-newsletter") - .tasks( - function("draftAgent", this::writeDraft).exportAsTaskOutput(), - emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), - listen( - "waitHumanReview", - to().one( - consumed("org.acme.newsletter.review.done") - .extensionByInstanceId("instanceid"))) - .outputAs((Collection events) -> events.iterator().next()), - // The engine sees the incoming JsonNode, sees this task expects - // HumanReview.class, - // and natively deserializes it for you before executing the lambda! - switchWhenOrElse( - h -> HumanReview.NEEDS_REVISION.equals(h.status()), - "humanEditorAgent", - "sendNewsletter", - HumanReview.class), - function("humanEditorAgent", this::editDraft) - .exportAsTaskOutput() - .then("draftReady"), - consume("sendNewsletter", this::sendEmail) - // Because we are in Jackson, the payload at this evaluation stage can be a - // Map. - // We simply check for the "status" field to know if it's the review payload. - .inputFrom( - (Map payload, - WorkflowContextData wfc, - TaskContextData tfc) -> - payload.containsKey("status") ? wfc.context() : payload)) - .build(); - - WorkflowDefinition definition = app.workflowDefinition(workflow); - WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); - CompletableFuture future = instance.start(); - - await() - .atMost(Duration.ofSeconds(5)) - .until(() -> instance.status() == WorkflowStatus.WAITING); - - CloudEvent humanReviewEvent = - CloudEventBuilder.v1() - .withId("event-123") - .withSource(URI.create("test:/human-editor")) - .withType("org.acme.newsletter.review.done") - .withExtension("instanceid", instance.id()) - .withData( - "application/json", - "{\"status\":\"APPROVED\", \"notes\":\"Looks good\"}" - .getBytes(StandardCharsets.UTF_8)) - .build(); - - EventPublisher publisher = app.eventPublishers().iterator().next(); - publisher.publish(humanReviewEvent).toCompletableFuture().join(); - - future.join(); - - assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); - } - } -} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/InstanceIdFunction.java similarity index 94% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdFunction.java rename to experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/InstanceIdFunction.java index 8a9fe291..083456d4 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/InstanceIdFunction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func.dsl; +package io.serverlessworkflow.api.reflection.func; import java.io.Serializable; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java similarity index 56% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java rename to experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java index c0b18499..6b30fcdb 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func.dsl; +package io.serverlessworkflow.api.reflection.func; import io.serverlessworkflow.api.types.func.ContextFunction; import io.serverlessworkflow.api.types.func.FilterFunction; @@ -27,43 +27,48 @@ * * @see Serialize a Lambda in Java */ -final class ReflectionUtils { +public final class ReflectionUtils { private ReflectionUtils() {} @SuppressWarnings("unchecked") - static Class inferInputType(ContextFunction fn) { - return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + public static Class inferInputType(ContextFunction fn) { + return (Class) inferInputTypeFromAny(fn, 0); } @SuppressWarnings("unchecked") - static Class inferInputType(FilterFunction fn) { - return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + public static Class inferInputType(FilterFunction fn) { + return (Class) inferInputTypeFromAny(fn, 0); } @SuppressWarnings("unchecked") - static Class inferInputType(SerializableFunction fn) { - return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + public static Class inferInputType(SerializableFunction fn) { + return (Class) inferInputTypeFromAny(fn, 0); } @SuppressWarnings("unchecked") - static Class inferInputType(SerializablePredicate fn) { - return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + public static Class inferInputType(SerializablePredicate fn) { + return (Class) inferInputTypeFromAny(fn, 0); } @SuppressWarnings("unchecked") - static Class inferInputType(InstanceIdFunction fn) { - return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 1)); + public static Class inferInputType(InstanceIdFunction fn) { + return (Class) inferInputTypeFromAny(fn, 1); } @SuppressWarnings("unchecked") - static Class inferInputType(UniqueIdBiFunction fn) { - return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 1)); + public static Class inferInputType(UniqueIdBiFunction fn) { + return (Class) inferInputTypeFromAny(fn, 1); } @SuppressWarnings("unchecked") - static Class inferInputType(SerializableConsumer fn) { - return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + public static Class inferInputType(SerializableConsumer fn) { + return (Class) inferInputTypeFromAny(fn, 0); + } + + @SuppressWarnings("unchecked") + public static Class inferResultType(Object fn) { + return (Class) inferMethodType(fn).returnType(); } /** @@ -72,7 +77,11 @@ static Class inferInputType(SerializableConsumer fn) { * * @param lambdaParamIndex The index of the payload parameter in the interface's apply method */ - private static Class inferInputTypeFromAny(Object fn, int lambdaParamIndex) { + public static Class inferInputTypeFromAny(Object fn, int lambdaParamIndex) { + return inferMethodType(fn).parameterArray()[lambdaParamIndex]; + } + + private static MethodType inferMethodType(Object fn) { try { Method m = fn.getClass().getDeclaredMethod("writeReplace"); m.setAccessible(true); @@ -82,20 +91,11 @@ private static Class inferInputTypeFromAny(Object fn, int lambdaParamIndex) { // getInstantiatedMethodType() provides the exact generic signature resolved // by the compiler, completely bypassing captured variables and method kind switches! - MethodType mt = MethodType.fromMethodDescriptorString(sl.getInstantiatedMethodType(), cl); - - return mt.parameterArray()[lambdaParamIndex]; - - } catch (Exception ignore) { - return null; - } - } - private static Class throwIllegalStateIfNull(Class clazz) { - if (clazz == null) { + return MethodType.fromMethodDescriptorString(sl.getInstantiatedMethodType(), cl); + } catch (ReflectiveOperationException ex) { throw new IllegalStateException( - "Cannot infer input type from lambda. Pass Class or use a method reference."); + "Cannot infer type from lambda. Pass Class or use a method reference.", ex); } - return clazz; } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableConsumer.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializableConsumer.java similarity index 94% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableConsumer.java rename to experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializableConsumer.java index 0de74340..60df61d0 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableConsumer.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializableConsumer.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func.dsl; +package io.serverlessworkflow.api.reflection.func; import java.io.Serializable; import java.util.function.Consumer; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializableFunction.java similarity index 94% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableFunction.java rename to experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializableFunction.java index e5d48d1d..c4b2eee0 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializableFunction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func.dsl; +package io.serverlessworkflow.api.reflection.func; import java.io.Serializable; import java.util.function.Function; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializablePredicate.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializablePredicate.java similarity index 94% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializablePredicate.java rename to experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializablePredicate.java index 62d36d01..a960b8db 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializablePredicate.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/SerializablePredicate.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func.dsl; +package io.serverlessworkflow.api.reflection.func; import java.io.Serializable; import java.util.function.Predicate; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/UniqueIdBiFunction.java similarity index 95% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java rename to experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/UniqueIdBiFunction.java index f1f0d788..4340cc3f 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/UniqueIdBiFunction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func.dsl; +package io.serverlessworkflow.api.reflection.func; import java.io.Serializable; import java.util.function.BiFunction; diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/Review.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallAbstractJavaFunction.java similarity index 51% rename from experimental/test/src/test/java/io/serverlessworkflow/fluent/test/Review.java rename to experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallAbstractJavaFunction.java index ab16a454..0a8af120 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/Review.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallAbstractJavaFunction.java @@ -13,6 +13,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.test; +package io.serverlessworkflow.api.types.func; -record Review(String title, String description, int rating) {} +import java.util.Optional; + +public abstract class CallAbstractJavaFunction extends CallJava { + + private static final long serialVersionUID = 1L; + + private final Optional> outputClass; + + protected CallAbstractJavaFunction() { + this(Optional.empty(), Optional.empty()); + } + + protected CallAbstractJavaFunction( + Optional> inputClass, Optional> outputClass) { + super(inputClass); + this.outputClass = outputClass; + } + + public Optional> outputClass() { + return outputClass; + } +} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java index e9829755..e441ecfb 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java @@ -20,128 +20,150 @@ import java.util.function.Consumer; import java.util.function.Function; -public abstract class CallJava extends TaskBase { +public abstract class CallJava extends TaskBase { private static final long serialVersionUID = 1L; - public static CallJava consumer(Consumer consumer) { + private final Optional> inputClass; + + protected CallJava() { + this(Optional.empty()); + } + + protected CallJava(Optional> inputClass) { + this.inputClass = inputClass; + } + + public Optional> inputClass() { + return inputClass; + } + + public static CallJava consumer(Consumer consumer) { return new CallJavaConsumer<>(consumer, Optional.empty()); } - public static CallJava consumer(Consumer consumer, Class inputClass) { + public static CallJava consumer(Consumer consumer, Class inputClass) { return new CallJavaConsumer<>(consumer, Optional.ofNullable(inputClass)); } public static CallJavaFunction function(Function function) { - return new CallJavaFunction<>(function, Optional.empty()); + return new CallJavaFunction<>(function, Optional.empty(), Optional.empty()); } public static CallJavaFunction function( Function function, Class inputClass) { - return new CallJavaFunction<>(function, Optional.ofNullable(inputClass)); + return new CallJavaFunction<>(function, Optional.ofNullable(inputClass), Optional.empty()); + } + + public static CallJavaFunction function( + Function function, Class inputClass, Class outputClass) { + return new CallJavaFunction<>( + function, Optional.ofNullable(inputClass), Optional.ofNullable(outputClass)); } - public static CallJava loopFunction( + public static CallJava loopFunction( LoopFunctionIndex function, String varName, String indexName) { return new CallJavaLoopFunctionIndex<>(function, varName, indexName); } - public static CallJava loopFunction(LoopFunction function, String varName) { + public static CallJava loopFunction(LoopFunction function, String varName) { return new CallJavaLoopFunction<>(function, varName); } - public static CallJava function(ContextFunction function, Class inputClass) { - return new CallJavaContextFunction<>(function, Optional.ofNullable(inputClass)); + public static CallJava function(ContextFunction function, Class inputClass) { + return new CallJavaContextFunction<>( + function, Optional.ofNullable(inputClass), Optional.empty()); + } + + public static CallJava function( + ContextFunction function, Class inputClass, Class outputClass) { + return new CallJavaContextFunction<>( + function, Optional.ofNullable(inputClass), Optional.ofNullable(outputClass)); } - public static CallJava function(FilterFunction function, Class inputClass) { - return new CallJavaFilterFunction<>(function, Optional.ofNullable(inputClass)); + public static CallJava function(FilterFunction function, Class inputClass) { + return new CallJavaFilterFunction<>( + function, Optional.ofNullable(inputClass), Optional.empty()); } - public static class CallJavaConsumer extends CallJava { + public static CallJava function( + FilterFunction function, Class inputClass, Class outputClass) { + return new CallJavaFilterFunction<>( + function, Optional.ofNullable(inputClass), Optional.ofNullable(outputClass)); + } + + public static class CallJavaConsumer extends CallJava { private static final long serialVersionUID = 1L; private final Consumer consumer; - private final Optional> inputClass; public CallJavaConsumer(Consumer consumer, Optional> inputClass) { + super(inputClass); this.consumer = consumer; - this.inputClass = inputClass; } public Consumer consumer() { return consumer; } - - public Optional> inputClass() { - return inputClass; - } } - public static class CallJavaFunction extends CallJava { + public static class CallJavaFunction extends CallAbstractJavaFunction { private static final long serialVersionUID = 1L; - private Function function; - private Optional> inputClass; + private final Function function; - public CallJavaFunction(Function function, Optional> inputClass) { + public CallJavaFunction( + Function function, Optional> inputClass, Optional> outputClass) { + super(inputClass, outputClass); this.function = function; - this.inputClass = inputClass; } public Function function() { return function; } - - public Optional> inputClass() { - return inputClass; - } } - public static class CallJavaContextFunction extends CallJava { + public static class CallJavaContextFunction extends CallAbstractJavaFunction { private static final long serialVersionUID = 1L; private final ContextFunction function; - private final Optional> inputClass; - public CallJavaContextFunction(ContextFunction function, Optional> inputClass) { + public CallJavaContextFunction( + ContextFunction function, + Optional> inputClass, + Optional> outputClass) { + super(inputClass, outputClass); this.function = function; - this.inputClass = inputClass; } public ContextFunction function() { return function; } - - public Optional> inputClass() { - return inputClass; - } } - public static class CallJavaFilterFunction extends CallJava { + public static class CallJavaFilterFunction extends CallAbstractJavaFunction { private static final long serialVersionUID = 1L; private final FilterFunction function; - private final Optional> inputClass; - public CallJavaFilterFunction(FilterFunction function, Optional> inputClass) { + public CallJavaFilterFunction( + FilterFunction function, + Optional> inputClass, + Optional> outputClass) { + super(inputClass, outputClass); this.function = function; - this.inputClass = inputClass; } public FilterFunction function() { return function; } - - public Optional> inputClass() { - return inputClass; - } } - public static class CallJavaLoopFunction extends CallJava { + public static class CallJavaLoopFunction extends CallAbstractJavaFunction { private static final long serialVersionUID = 1L; private LoopFunction function; private String varName; public CallJavaLoopFunction(LoopFunction function, String varName) { + this.function = function; this.varName = varName; } @@ -155,7 +177,7 @@ public String varName() { } } - public static class CallJavaLoopFunctionIndex extends CallJava { + public static class CallJavaLoopFunctionIndex extends CallAbstractJavaFunction { private static final long serialVersionUID = 1L; private final LoopFunctionIndex function; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java index dddf8127..3d586303 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java @@ -73,9 +73,9 @@ public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplicatio private CloudEventAttrPredicate envelopeFilter( EventProperties properties, WorkflowApplication app) { Object envelopePredObj = null; - if (properties.getAdditionalProperties() != null - && properties.getAdditionalProperties().containsKey(ENVELOPE_PREDICATE)) + if (properties.getAdditionalProperties() != null) { envelopePredObj = properties.getAdditionalProperties().remove(ENVELOPE_PREDICATE); + } return envelopePredObj == null ? isTrue()