From ad435f3e233db76d7896e0dcf97e351616ed42ca Mon Sep 17 00:00:00 2001 From: fjtirado Date: Wed, 1 Apr 2026 19:11:22 +0200 Subject: [PATCH] [Fix #1286] Non blocking persistence Signed-off-by: fjtirado --- .../impl/SchedulerListener.java | 8 +- .../impl/WorkflowApplication.java | 19 ++- .../impl/WorkflowMutableInstance.java | 44 ++++--- .../impl/executors/AbstractTaskExecutor.java | 40 +++--- .../impl/lifecycle/LifecycleEventsUtils.java | 28 ++--- .../WorkflowExecutionCompletableListener.java | 81 ++++++++++++ .../WorkflowExecutionListenerAdapter.java | 116 ++++++++++++++++++ .../impl/LowestPriorityListener.java | 4 +- .../impl/MediumPriorityListener.java | 4 +- .../impl/TopPriorityListener.java | 4 +- .../impl/WorkflowListenerTest.java | 18 +-- .../AbstractAsyncPersistenceExecutor.java | 53 +------- .../persistence/AsyncPersistenceExecutor.java | 6 +- .../DefaultPersistenceInstanceHandlers.java | 2 +- .../impl/persistence/PersistenceExecutor.java | 8 -- .../persistence/SyncPersistenceExecutor.java | 36 ------ .../TransactedPersistenceInstanceWriter.java | 4 +- .../WorkflowPersistenceInstance.java | 1 + .../WorkflowPersistenceListener.java | 41 ++++--- 19 files changed, 324 insertions(+), 193 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionCompletableListener.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListenerAdapter.java delete mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java index 2aa576947..2f4a4c6d4 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java @@ -16,14 +16,15 @@ package io.serverlessworkflow.impl; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; import io.serverlessworkflow.impl.scheduler.Cancellable; import io.serverlessworkflow.impl.scheduler.WorkflowScheduler; import java.time.Duration; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -class SchedulerListener implements WorkflowExecutionListener, AutoCloseable { +class SchedulerListener implements WorkflowExecutionCompletableListener, AutoCloseable { private final WorkflowScheduler scheduler; private final Map> afterMap = @@ -39,7 +40,7 @@ public void addAfter(WorkflowDefinition definition, WorkflowValueResolver onWorkflowCompleted(WorkflowCompletedEvent ev) { WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition(); WorkflowValueResolver after = afterMap.get(workflowDefinition); if (after != null) { @@ -49,6 +50,7 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) { workflowDefinition, after.apply((WorkflowContext) ev.workflowContext(), null, ev.output()))); } + return CompletableFuture.completedFuture(null); } public void removeAfter(WorkflowDefinition definition) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 98e177617..5139d6fb8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -34,7 +34,9 @@ import io.serverlessworkflow.impl.executors.TaskExecutorFactory; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.RuntimeDescriptor; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter; import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory; import io.serverlessworkflow.impl.resources.ExternalResourceHandler; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; @@ -67,7 +69,7 @@ public class WorkflowApplication implements AutoCloseable { private final ResourceLoaderFactory resourceLoaderFactory; private final SchemaValidatorFactory schemaValidatorFactory; private final WorkflowInstanceIdFactory idFactory; - private final Collection listeners; + private final Collection listeners; private final Map definitions; private final WorkflowPositionFactory positionFactory; private final ExecutorServiceFactory executorFactory; @@ -137,7 +139,7 @@ public ResourceLoaderFactory resourceLoaderFactory() { return resourceLoaderFactory; } - public Collection listeners() { + public Collection listeners() { return listeners; } @@ -175,8 +177,10 @@ public SchemaValidator getValidator(SchemaInline inline) { private String id; private TaskExecutorFactory taskFactory; private Collection exprFactories = new HashSet<>(); - private List listeners = - loadFromServiceLoader(WorkflowExecutionListener.class); + private List listeners = + ServiceLoader.load(WorkflowExecutionListener.class).stream() + .map(v -> new WorkflowExecutionListenerAdapter(v.get())) + .collect(Collectors.toList()); private List callableProxyBuilders = loadFromServiceLoader(CallableTaskProxyBuilder.class); private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get(); @@ -212,6 +216,11 @@ public Builder withId(String id) { } public Builder withListener(WorkflowExecutionListener listener) { + listeners.add(new WorkflowExecutionListenerAdapter(listener)); + return this; + } + + public Builder withListener(WorkflowExecutionCompletableListener listener) { listeners.add(listener); return this; } @@ -414,7 +423,7 @@ public void close() { } definitions.clear(); - for (WorkflowExecutionListener listener : listeners) { + for (WorkflowExecutionCompletableListener listener : listeners) { safeClose(listener); } listeners.clear(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index cdea2d56a..0c67713a7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -70,30 +70,43 @@ public CompletableFuture start() { return startExecution( () -> { startedAt = Instant.now(); - publishEvent( + return publishEvent( workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext))); }); } - protected final CompletableFuture startExecution(Runnable runnable) { + protected final CompletableFuture startExecution( + Supplier> runnable) { CompletableFuture future = futureRef.get(); if (future != null) { return future; } status(WorkflowStatus.RUNNING); - runnable.run(); + future = - TaskExecutorHelper.processTaskList( - workflowContext.definition().startTask(), - workflowContext, - Optional.empty(), - workflowContext - .definition() - .inputFilter() - .map(f -> f.apply(workflowContext, null, input)) - .orElse(input)) - .whenComplete(this::whenCompleted) - .thenApply(this::whenSuccess); + runnable + .get() + .thenCompose( + v -> + TaskExecutorHelper.processTaskList( + workflowContext.definition().startTask(), + workflowContext, + Optional.empty(), + workflowContext + .definition() + .inputFilter() + .map(f -> f.apply(workflowContext, null, input)) + .orElse(input)) + .whenComplete(this::whenCompleted) + .thenApply(this::whenSuccess) + .thenCompose( + model -> + publishEvent( + workflowContext, + l -> + l.onWorkflowCompleted( + new WorkflowCompletedEvent(workflowContext, model))) + .thenApply(__ -> model))); futureRef.set(future); return future; } @@ -126,9 +139,6 @@ private WorkflowModel whenSuccess(WorkflowModel node) { .orElse(node); workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output)); status(WorkflowStatus.COMPLETED); - publishEvent( - workflowContext, - l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output))); return output; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index de7ce0132..cd8590521 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -213,17 +213,24 @@ public CompletableFuture apply( completable = completable .thenCompose(workflowContext.instance()::suspendedCheck) + .thenCompose( + t -> { + CompletableFuture events = + t.isRetrying() + ? publishEvent( + workflowContext, + l -> + l.onTaskRetried( + new TaskRetriedEvent(workflowContext, taskContext))) + : publishEvent( + workflowContext, + l -> + l.onTaskStarted( + new TaskStartedEvent(workflowContext, taskContext))); + return events.thenApply(v -> t); + }) .thenApply( t -> { - if (t.isRetrying()) { - publishEvent( - workflowContext, - l -> l.onTaskRetried(new TaskRetriedEvent(workflowContext, taskContext))); - } else { - publishEvent( - workflowContext, - l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext))); - } inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput())); inputProcessor.ifPresent( p -> taskContext.input(p.apply(workflowContext, t, t.rawInput()))); @@ -251,13 +258,16 @@ public CompletableFuture apply( p.apply(workflowContext, t, workflowContext.context()))); contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); t.completedAt(Instant.now()); - publishEvent( - workflowContext, - l -> - l.onTaskCompleted( - new TaskCompletedEvent(workflowContext, taskContext))); return t; - }); + }) + .thenCompose( + t -> + publishEvent( + workflowContext, + l -> + l.onTaskCompleted( + new TaskCompletedEvent(workflowContext, taskContext))) + .thenApply(__ -> t)); if (timeout.isPresent()) { completable = completable diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java index d7fb25db5..2ba43062f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java @@ -16,29 +16,19 @@ package io.serverlessworkflow.impl.lifecycle; import io.serverlessworkflow.impl.WorkflowContext; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; public class LifecycleEventsUtils { private LifecycleEventsUtils() {} - private static final Logger logger = LoggerFactory.getLogger(LifecycleEventsUtils.class); - - public static void publishEvent( - WorkflowContext workflowContext, Consumer consumer) { - workflowContext - .definition() - .application() - .listeners() - .forEach( - v -> { - try { - consumer.accept(v); - } catch (Exception ex) { - logger.error("Error processing listener. Ignoring and going on", ex); - } - }); + public static CompletableFuture publishEvent( + WorkflowContext workflowContext, + Function> function) { + return CompletableFuture.allOf( + workflowContext.definition().application().listeners().stream() + .map(v -> function.apply(v)) + .toArray(CompletableFuture[]::new)); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionCompletableListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionCompletableListener.java new file mode 100644 index 000000000..4e83a9e77 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionCompletableListener.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.ServicePriority; +import java.util.concurrent.CompletableFuture; + +public interface WorkflowExecutionCompletableListener extends AutoCloseable, ServicePriority { + + default CompletableFuture onWorkflowStarted(WorkflowStartedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onWorkflowSuspended(WorkflowSuspendedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onWorkflowResumed(WorkflowResumedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onWorkflowCompleted(WorkflowCompletedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onWorkflowFailed(WorkflowFailedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onWorkflowCancelled(WorkflowCancelledEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onTaskStarted(TaskStartedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onTaskCompleted(TaskCompletedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onTaskFailed(TaskFailedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onTaskCancelled(TaskCancelledEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onTaskSuspended(TaskSuspendedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onTaskResumed(TaskResumedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onTaskRetried(TaskRetriedEvent ev) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture onWorkflowStatusChanged(WorkflowStatusEvent ev) { + return CompletableFuture.completedFuture(null); + } + + @Override + default void close() {} +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListenerAdapter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListenerAdapter.java new file mode 100644 index 000000000..4bd3ae9e3 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListenerAdapter.java @@ -0,0 +1,116 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import java.util.concurrent.CompletableFuture; + +public class WorkflowExecutionListenerAdapter implements WorkflowExecutionCompletableListener { + + private final WorkflowExecutionListener listener; + + public WorkflowExecutionListenerAdapter(WorkflowExecutionListener listener) { + this.listener = listener; + } + + @Override + public CompletableFuture onWorkflowStarted(WorkflowStartedEvent ev) { + listener.onWorkflowStarted(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onWorkflowSuspended(WorkflowSuspendedEvent ev) { + listener.onWorkflowSuspended(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onWorkflowResumed(WorkflowResumedEvent ev) { + listener.onWorkflowResumed(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onWorkflowCompleted(WorkflowCompletedEvent ev) { + listener.onWorkflowCompleted(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onWorkflowFailed(WorkflowFailedEvent ev) { + listener.onWorkflowFailed(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onWorkflowCancelled(WorkflowCancelledEvent ev) { + listener.onWorkflowCancelled(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onTaskStarted(TaskStartedEvent ev) { + listener.onTaskStarted(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onTaskCompleted(TaskCompletedEvent ev) { + listener.onTaskCompleted(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onTaskFailed(TaskFailedEvent ev) { + listener.onTaskFailed(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onTaskCancelled(TaskCancelledEvent ev) { + listener.onTaskCancelled(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onTaskSuspended(TaskSuspendedEvent ev) { + listener.onTaskSuspended(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onTaskResumed(TaskResumedEvent ev) { + listener.onTaskResumed(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onTaskRetried(TaskRetriedEvent ev) { + listener.onTaskRetried(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onWorkflowStatusChanged(WorkflowStatusEvent ev) { + listener.onWorkflowStatusChanged(ev); + return CompletableFuture.completedFuture(null); + } + + @Override + public int priority() { + return listener.priority(); + } +} diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/LowestPriorityListener.java b/impl/core/src/test/java/io/serverlessworkflow/impl/LowestPriorityListener.java index 8673ef087..7109ab4c6 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/LowestPriorityListener.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/LowestPriorityListener.java @@ -15,9 +15,9 @@ */ package io.serverlessworkflow.impl; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; -public class LowestPriorityListener implements WorkflowExecutionListener { +public class LowestPriorityListener implements WorkflowExecutionCompletableListener { public int priority() { return DEFAULT_PRIORITY + 1; } diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/MediumPriorityListener.java b/impl/core/src/test/java/io/serverlessworkflow/impl/MediumPriorityListener.java index b378c80cf..a26abe944 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/MediumPriorityListener.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/MediumPriorityListener.java @@ -15,10 +15,10 @@ */ package io.serverlessworkflow.impl; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; import java.util.Objects; -public class MediumPriorityListener implements WorkflowExecutionListener { +public class MediumPriorityListener implements WorkflowExecutionCompletableListener { private final String id; diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/TopPriorityListener.java b/impl/core/src/test/java/io/serverlessworkflow/impl/TopPriorityListener.java index 375aeb08f..5b55f0248 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/TopPriorityListener.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/TopPriorityListener.java @@ -15,9 +15,9 @@ */ package io.serverlessworkflow.impl; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; -public class TopPriorityListener implements WorkflowExecutionListener { +public class TopPriorityListener implements WorkflowExecutionCompletableListener { public int priority() { return DEFAULT_PRIORITY - 1; } diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowListenerTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowListenerTest.java index d52263ba0..536648211 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowListenerTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowListenerTest.java @@ -17,7 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -33,10 +33,10 @@ void setup() { @Test void testSorted() { - WorkflowExecutionListener mediumPrio = new MediumPriorityListener("javi"); - WorkflowExecutionListener lowestPrio = new LowestPriorityListener(); - WorkflowExecutionListener topPrio = new TopPriorityListener(); - WorkflowExecutionListener anotherMediumPrio = new MediumPriorityListener("javier"); + WorkflowExecutionCompletableListener mediumPrio = new MediumPriorityListener("javi"); + WorkflowExecutionCompletableListener lowestPrio = new LowestPriorityListener(); + WorkflowExecutionCompletableListener topPrio = new TopPriorityListener(); + WorkflowExecutionCompletableListener anotherMediumPrio = new MediumPriorityListener("javier"); WorkflowApplication app = WorkflowApplication.builder() @@ -54,10 +54,10 @@ void testSorted() { @Test void testNotDuplicated() { - WorkflowExecutionListener mediumPrio = new MediumPriorityListener("javi"); - WorkflowExecutionListener lowestPrio = new LowestPriorityListener(); - WorkflowExecutionListener topPrio = new TopPriorityListener(); - WorkflowExecutionListener anotherMediumPrio = new MediumPriorityListener("javi"); + WorkflowExecutionCompletableListener mediumPrio = new MediumPriorityListener("javi"); + WorkflowExecutionCompletableListener lowestPrio = new LowestPriorityListener(); + WorkflowExecutionCompletableListener topPrio = new TopPriorityListener(); + WorkflowExecutionCompletableListener anotherMediumPrio = new MediumPriorityListener("javi"); WorkflowApplication app = WorkflowApplication.builder() diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java index f94393d21..55d44ee43 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java @@ -16,64 +16,15 @@ package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowContextData; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class AbstractAsyncPersistenceExecutor implements PersistenceExecutor { - - private static final Logger logger = - LoggerFactory.getLogger(AbstractAsyncPersistenceExecutor.class); - - private final Map> futuresMap = new ConcurrentHashMap<>(); - @Override public CompletableFuture execute(Runnable runnable, WorkflowContextData context) { - final ExecutorService service = - executorService().orElse(context.definition().application().executorService()); - return futuresMap.compute( - context.instanceData().id(), - (k, v) -> - v == null - ? CompletableFuture.runAsync(runnable, service) - : v.thenRunAsync(runnable, service)); - } - - @Override - public CompletableFuture startInstance(Runnable runnable, WorkflowContextData context) { - return SyncPersistenceExecutor.execute(runnable); - } - - @Override - public CompletableFuture deleteInstance(Runnable runnable, WorkflowContextData context) { - CompletableFuture completable = futuresMap.remove(context.instanceData().id()); - if (completable != null) { - CompletableFuture result = completable.whenComplete((__, ___) -> runnable.run()); - completable.cancel(true); - return result; - } else { - return CompletableFuture.completedFuture(null); - } - } - - @Override - public void close() { - for (CompletableFuture future : futuresMap.values()) { - try { - future.get(); - } catch (InterruptedException ex) { - logger.warn("Thread interrupted while writing to db", ex); - Thread.currentThread().interrupt(); - } catch (ExecutionException ex) { - logger.warn("Exception while writing to db", ex.getCause()); - } - } - futuresMap.clear(); + return CompletableFuture.runAsync( + runnable, executorService().orElse(context.definition().application().executorService())); } protected abstract Optional executorService(); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java index 3cb79403e..e8a10ab8a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java @@ -22,7 +22,11 @@ public class AsyncPersistenceExecutor extends AbstractAsyncPersistenceExecutor { private final Optional service; - protected AsyncPersistenceExecutor(ExecutorService service) { + public AsyncPersistenceExecutor() { + this.service = Optional.empty(); + } + + public AsyncPersistenceExecutor(ExecutorService service) { this.service = Optional.ofNullable(service); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java index 953111042..ac582a1b5 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java @@ -43,7 +43,7 @@ public Builder withPersistenceExecutor(PersistenceExecutor executor) { public PersistenceInstanceHandlers build() { return new DefaultPersistenceInstanceHandlers( new DefaultPersistenceInstanceWriter( - store, executor == null ? new SyncPersistenceExecutor() : executor), + store, executor == null ? new AsyncPersistenceExecutor() : executor), new DefaultPersistenceInstanceReader(store), store); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java index 4520ed5be..8d7703764 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java @@ -21,13 +21,5 @@ public interface PersistenceExecutor extends AutoCloseable { CompletableFuture execute(Runnable runnable, WorkflowContextData context); - default CompletableFuture startInstance(Runnable runnable, WorkflowContextData context) { - return execute(runnable, context); - } - - default CompletableFuture deleteInstance(Runnable runnable, WorkflowContextData context) { - return execute(runnable, context); - } - default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java deleted file mode 100644 index 701d60056..000000000 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence; - -import io.serverlessworkflow.impl.WorkflowContextData; -import java.util.concurrent.CompletableFuture; - -public class SyncPersistenceExecutor implements PersistenceExecutor { - - @Override - public CompletableFuture execute(Runnable runnable, WorkflowContextData context) { - return execute(runnable); - } - - public static CompletableFuture execute(Runnable runnable) { - try { - runnable.run(); - return CompletableFuture.completedFuture(null); - } catch (Exception ex) { - return CompletableFuture.failedFuture(ex); - } - } -} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java index 0e95cd565..9495aeb41 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java @@ -34,14 +34,14 @@ protected CompletableFuture doTransaction( protected CompletableFuture doStartInstance( Consumer operation, WorkflowContextData context) { return persistenceExecutor() - .startInstance(() -> doTransaction(operation, context.definition()), context); + .execute(() -> doTransaction(operation, context.definition()), context); } @Override protected CompletableFuture doCompleteInstance( Consumer operation, WorkflowContextData context) { return persistenceExecutor() - .deleteInstance(() -> doTransaction(operation, context.definition()), context); + .execute(() -> doTransaction(operation, context.definition()), context); } protected abstract void doTransaction( diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index a245c113d..df767d6e2 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -41,6 +41,7 @@ public CompletableFuture start() { if (info.status() == WorkflowStatus.SUSPENDED) { internalSuspend(); } + return CompletableFuture.completedFuture(null); }); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java index 781b8c12a..e31f2943d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -20,13 +20,14 @@ import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; +import java.util.concurrent.CompletableFuture; -public class WorkflowPersistenceListener implements WorkflowExecutionListener { +public class WorkflowPersistenceListener implements WorkflowExecutionCompletableListener { private final PersistenceInstanceWriter persistenceWriter; @@ -35,47 +36,47 @@ public WorkflowPersistenceListener(PersistenceInstanceWriter persistenceWriter) } @Override - public void onWorkflowStarted(WorkflowStartedEvent ev) { - persistenceWriter.started(ev.workflowContext()); + public CompletableFuture onWorkflowStarted(WorkflowStartedEvent ev) { + return persistenceWriter.started(ev.workflowContext()); } @Override - public void onWorkflowFailed(WorkflowFailedEvent ev) { - persistenceWriter.failed(ev.workflowContext(), ev.cause()); + public CompletableFuture onWorkflowFailed(WorkflowFailedEvent ev) { + return persistenceWriter.failed(ev.workflowContext(), ev.cause()); } @Override - public void onWorkflowCancelled(WorkflowCancelledEvent ev) { - persistenceWriter.aborted(ev.workflowContext()); + public CompletableFuture onWorkflowCancelled(WorkflowCancelledEvent ev) { + return persistenceWriter.aborted(ev.workflowContext()); } @Override - public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { - persistenceWriter.suspended(ev.workflowContext()); + public CompletableFuture onWorkflowSuspended(WorkflowSuspendedEvent ev) { + return persistenceWriter.suspended(ev.workflowContext()); } @Override - public void onWorkflowResumed(WorkflowResumedEvent ev) { - persistenceWriter.resumed(ev.workflowContext()); + public CompletableFuture onWorkflowResumed(WorkflowResumedEvent ev) { + return persistenceWriter.resumed(ev.workflowContext()); } @Override - public void onWorkflowCompleted(WorkflowCompletedEvent ev) { - persistenceWriter.completed(ev.workflowContext()); + public CompletableFuture onWorkflowCompleted(WorkflowCompletedEvent ev) { + return persistenceWriter.completed(ev.workflowContext()); } @Override - public void onTaskStarted(TaskStartedEvent ev) { - persistenceWriter.taskStarted(ev.workflowContext(), ev.taskContext()); + public CompletableFuture onTaskStarted(TaskStartedEvent ev) { + return persistenceWriter.taskStarted(ev.workflowContext(), ev.taskContext()); } @Override - public void onTaskCompleted(TaskCompletedEvent ev) { - persistenceWriter.taskCompleted(ev.workflowContext(), ev.taskContext()); + public CompletableFuture onTaskCompleted(TaskCompletedEvent ev) { + return persistenceWriter.taskCompleted(ev.workflowContext(), ev.taskContext()); } @Override - public void onTaskRetried(TaskRetriedEvent ev) { - persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); + public CompletableFuture onTaskRetried(TaskRetriedEvent ev) { + return persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); } }