[Fix #1286] Non blocking persistence #1288
fjtirado wants to merge 1 commit into serverlessworkflow:main
Signed-off-by: fjtirado <ftirados@redhat.com>
This will create a backward incompatibility on qflow when released, but I guess it is acceptable (we are switching to AsyncPersistenceExecutor).
Pull request overview
Implements non-blocking persistence by moving workflow lifecycle listeners to a CompletableFuture-based contract and switching default persistence execution to async.
Changes:
- Introduces WorkflowExecutionCompletableListener and adapts existing WorkflowExecutionListener implementations via an adapter.
- Refactors lifecycle event publishing to return/compose CompletableFutures rather than fire-and-forget.
- Updates persistence executors/writers to use a unified execute(...) path and defaults to AsyncPersistenceExecutor.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java | Migrates persistence listener callbacks to return CompletableFuture for async persistence writes. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java | Adjusts async control flow in start() to return a CompletableFuture from a callback. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java | Uses PersistenceExecutor.execute(...) uniformly (removes start/delete special-casing). |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java | Removes synchronous executor implementation. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java | Simplifies executor API by removing startInstance/deleteInstance defaults. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java | Defaults persistence execution to AsyncPersistenceExecutor when none provided. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java | Makes async executor publicly constructible and adds a no-arg constructor. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java | Reworks async execution implementation (removes per-instance chaining/close drain). |
| impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowListenerTest.java | Updates tests to use the new completable listener type. |
| impl/core/src/test/java/io/serverlessworkflow/impl/TopPriorityListener.java | Updates test listener to implement WorkflowExecutionCompletableListener. |
| impl/core/src/test/java/io/serverlessworkflow/impl/MediumPriorityListener.java | Updates test listener to implement WorkflowExecutionCompletableListener. |
| impl/core/src/test/java/io/serverlessworkflow/impl/LowestPriorityListener.java | Updates test listener to implement WorkflowExecutionCompletableListener. |
| impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListenerAdapter.java | Adds adapter from legacy listener to completable listener. |
| impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionCompletableListener.java | Adds new async listener interface with default completed futures. |
| impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java | Refactors event publishing to return a combined CompletableFuture. |
| impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java | Composes task lifecycle events into the task execution future chain. |
| impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java | Composes workflow start/completion events into the workflow execution future chain. |
| impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java | Stores completable listeners, adapts legacy listeners loaded via ServiceLoader, updates builder APIs. |
| impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java | Migrates scheduler listener to completable listener callback shape. |
| public static CompletableFuture<?> publishEvent( | ||
| WorkflowContext workflowContext, | ||
| Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) { | ||
| return CompletableFuture.allOf( | ||
| workflowContext.definition().application().listeners().stream() | ||
| .map(v -> function.apply(v)) | ||
| .toArray(CompletableFuture[]::new)); | ||
| } |
This changes listener failure semantics: previously exceptions from listeners were caught/logged and execution continued; now (1) a synchronous exception thrown by function.apply(v) will abort publishing immediately, and (2) any returned future completing exceptionally will cause allOf(...) to complete exceptionally. Since callers now thenCompose on publishEvent, a single bad listener can fail the workflow/task execution. Consider wrapping each invocation to convert synchronous throws into a completed future, and attaching exceptionally(...)/handle(...) per-listener (with logging) so publishing remains best-effort like before.
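The two failure modes described in this comment can be reproduced in isolation. The sketch below is only an illustration: `DemoListener` and `PublishFailureDemo` are hypothetical stand-ins, not types from the PR, and `publish(...)` merely copies the apply/collect/`allOf` shape of `publishEvent(...)`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical stand-in for WorkflowExecutionCompletableListener (not the project's interface).
interface DemoListener {
  CompletableFuture<?> onEvent(String event);
}

class PublishFailureDemo {

  // Same shape as publishEvent(...) in the diff: apply to each listener, collect, allOf.
  static CompletableFuture<Void> publish(
      List<DemoListener> listeners, Function<DemoListener, CompletableFuture<?>> function) {
    return CompletableFuture.allOf(
        listeners.stream().map(function).toArray(CompletableFuture[]::new));
  }

  // Case (2): a listener future that completes exceptionally fails the combined future.
  static boolean asyncFailureFailsPublish() {
    DemoListener ok = ev -> CompletableFuture.completedFuture(null);
    DemoListener failing =
        ev -> CompletableFuture.failedFuture(new IllegalStateException("async"));
    try {
      publish(List.of(ok, failing), l -> l.onEvent("started")).join();
      return false;
    } catch (CompletionException e) {
      return e.getCause() instanceof IllegalStateException;
    }
  }

  // Case (1): a synchronous throw escapes publish(...) and later listeners never run.
  static boolean syncThrowSkipsLaterListeners() {
    boolean[] laterCalled = {false};
    DemoListener throwing =
        ev -> {
          throw new IllegalStateException("sync");
        };
    DemoListener later =
        ev -> {
          laterCalled[0] = true;
          return CompletableFuture.completedFuture(null);
        };
    try {
      publish(List.of(throwing, later), l -> l.onEvent("started"));
      return false;
    } catch (IllegalStateException e) {
      return !laterCalled[0];
    }
  }

  public static void main(String[] args) {
    System.out.println("async failure fails publish: " + asyncFailureFailsPublish());
    System.out.println("sync throw skips later listeners: " + syncThrowSkipsLaterListeners());
  }
}
```

Because callers `thenCompose` on the returned future, either outcome would surface as a failed workflow or task unless each listener invocation is isolated.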
| listener.onWorkflowStarted(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) { | ||
| listener.onWorkflowSuspended(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) { | ||
| listener.onWorkflowResumed(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) { | ||
| listener.onWorkflowCompleted(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) { | ||
| listener.onWorkflowFailed(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) { | ||
| listener.onWorkflowCancelled(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) { | ||
| listener.onTaskStarted(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) { | ||
| listener.onTaskCompleted(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) { | ||
| listener.onTaskFailed(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) { | ||
| listener.onTaskCancelled(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) { | ||
| listener.onTaskSuspended(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) { | ||
| listener.onTaskResumed(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) { | ||
| listener.onTaskRetried(ev); | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) { | ||
| listener.onWorkflowStatusChanged(ev); | ||
| return CompletableFuture.completedFuture(null); |
If the wrapped legacy listener throws, this method throws synchronously and no CompletableFuture is returned, which can break publishEvent(...) mid-stream. To keep the adapter safe in async pipelines, catch RuntimeException/Exception from listener.on... and return a failed future (or a completed future if listeners are intended to be best-effort), ideally with logging consistent with prior behavior.
| listener.onWorkflowStarted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) { | |
| listener.onWorkflowSuspended(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) { | |
| listener.onWorkflowResumed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) { | |
| listener.onWorkflowCompleted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) { | |
| listener.onWorkflowFailed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) { | |
| listener.onWorkflowCancelled(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) { | |
| listener.onTaskStarted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) { | |
| listener.onTaskCompleted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) { | |
| listener.onTaskFailed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) { | |
| listener.onTaskCancelled(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) { | |
| listener.onTaskSuspended(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) { | |
| listener.onTaskResumed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) { | |
| listener.onTaskRetried(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) { | |
| listener.onWorkflowStatusChanged(ev); | |
| return CompletableFuture.completedFuture(null); | |
| try { | |
| listener.onWorkflowStarted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) { | |
| try { | |
| listener.onWorkflowSuspended(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) { | |
| try { | |
| listener.onWorkflowResumed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) { | |
| try { | |
| listener.onWorkflowCompleted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) { | |
| try { | |
| listener.onWorkflowFailed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) { | |
| try { | |
| listener.onWorkflowCancelled(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) { | |
| try { | |
| listener.onTaskStarted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) { | |
| try { | |
| listener.onTaskCompleted(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) { | |
| try { | |
| listener.onTaskFailed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) { | |
| try { | |
| listener.onTaskCancelled(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) { | |
| try { | |
| listener.onTaskSuspended(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) { | |
| try { | |
| listener.onTaskResumed(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) { | |
| try { | |
| listener.onTaskRetried(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| } | |
| @Override | |
| public CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) { | |
| try { | |
| listener.onWorkflowStatusChanged(ev); | |
| return CompletableFuture.completedFuture(null); | |
| } catch (Exception e) { | |
| return CompletableFuture.failedFuture(e); | |
| } |
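The suggested change repeats the same try/catch in every adapter method. One possible way to keep the adapter short is a small helper that runs the legacy callback and converts a synchronous throw into a failed future. This is a sketch only: `ListenerCalls.safely` is a hypothetical name and is not part of the PR.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of the PR: wraps a synchronous listener call.
class ListenerCalls {

  // Runs the callback; returns a completed future on success and a failed future on a throw.
  static CompletableFuture<Void> safely(Runnable call) {
    try {
      call.run();
      return CompletableFuture.completedFuture(null);
    } catch (Exception e) {
      return CompletableFuture.failedFuture(e);
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Void> ok = safely(() -> {});
    CompletableFuture<Void> failed =
        safely(
            () -> {
              throw new IllegalStateException("legacy listener failed");
            });
    System.out.println("ok done: " + ok.isDone());
    System.out.println("failed exceptionally: " + failed.isCompletedExceptionally());
  }
}
```

With such a helper, each adapter method could shrink to a single line of the form `return safely(() -> listener.onWorkflowStarted(ev));`. Whether a failed future or a logged, completed future is the right result depends on whether listeners are meant to be best-effort.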
| @Override | ||
| public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) { | ||
| final ExecutorService service = | ||
| executorService().orElse(context.definition().application().executorService()); | ||
| return futuresMap.compute( | ||
| context.instanceData().id(), | ||
| (k, v) -> | ||
| v == null | ||
| ? CompletableFuture.runAsync(runnable, service) | ||
| : v.thenRunAsync(runnable, service)); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Void> startInstance(Runnable runnable, WorkflowContextData context) { | ||
| return SyncPersistenceExecutor.execute(runnable); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Void> deleteInstance(Runnable runnable, WorkflowContextData context) { | ||
| CompletableFuture<Void> completable = futuresMap.remove(context.instanceData().id()); | ||
| if (completable != null) { | ||
| CompletableFuture<Void> result = completable.whenComplete((__, ___) -> runnable.run()); | ||
| completable.cancel(true); | ||
| return result; | ||
| } else { | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| for (CompletableFuture<Void> future : futuresMap.values()) { | ||
| try { | ||
| future.get(); | ||
| } catch (InterruptedException ex) { | ||
| logger.warn("Thread interrupted while writing to db", ex); | ||
| Thread.currentThread().interrupt(); | ||
| } catch (ExecutionException ex) { | ||
| logger.warn("Exception while writing to db", ex.getCause()); | ||
| } | ||
| } | ||
| futuresMap.clear(); | ||
| return CompletableFuture.runAsync( | ||
| runnable, executorService().orElse(context.definition().application().executorService())); | ||
| } |
The previous implementation serialized persistence writes per workflow instance (via a ConcurrentHashMap chaining futures) and close() drained outstanding writes. This new implementation schedules every write independently, which can lead to out-of-order/concurrent persistence operations for the same instance and can drop in-flight writes on shutdown (no drain). If ordering and durability are required, reintroduce per-instance sequencing (still non-blocking by chaining on the executor) and restore a close() strategy that waits for (or explicitly cancels/flushes) outstanding operations deterministically.
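A minimal sketch of what per-instance sequencing with a draining `close()` might look like, loosely modelled on the removed `futuresMap` code. The class name `SequencedWrites`, the `String` instance id, and the choice to let a write run even when the previous one failed are all assumptions made for illustration, not the project's design.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; names and the String instance id are illustrative assumptions.
class SequencedWrites implements AutoCloseable {
  private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
  private final Executor executor;

  SequencedWrites(Executor executor) {
    this.executor = executor;
  }

  // Chains each write after the previous write for the same instance, without blocking.
  CompletableFuture<Void> execute(String instanceId, Runnable write) {
    CompletableFuture<Void> next =
        tails.compute(
            instanceId,
            (id, tail) ->
                tail == null
                    ? CompletableFuture.runAsync(write, executor)
                    // handle(...) lets this write run even if the previous one failed
                    : tail.handle((r, ex) -> null).thenRunAsync(write, executor));
    // Drop the entry once this write is done, unless a newer write has replaced it.
    next.whenComplete((r, ex) -> tails.remove(instanceId, next));
    return next;
  }

  // Waits for every outstanding chain so in-flight writes are not dropped on shutdown.
  @Override
  public void close() {
    for (CompletableFuture<Void> pending : tails.values()) {
      try {
        pending.join();
      } catch (RuntimeException ex) { // CompletionException or CancellationException
        System.err.println("Write failed before close: " + ex);
      }
    }
  }

  // Submits 50 writes for one instance on a 4-thread pool and checks they ran in order.
  static boolean demoKeepsOrder() {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
    try (SequencedWrites writes = new SequencedWrites(pool)) {
      for (int i = 0; i < 50; i++) {
        int n = i;
        writes.execute("instance-1", () -> seen.add(n));
      }
    } finally {
      pool.shutdown();
    }
    if (seen.size() != 50) {
      return false;
    }
    for (int i = 0; i < 50; i++) {
      if (seen.get(i) != i) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println("writes kept in order: " + demoKeepsOrder());
  }
}
```

Writes for different instances still run concurrently; only writes that share an instance id are serialized.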
| .thenCompose( | ||
| model -> | ||
| publishEvent( | ||
| workflowContext, | ||
| l -> | ||
| l.onWorkflowCompleted( | ||
| new WorkflowCompletedEvent(workflowContext, model))) | ||
| .thenApply(__ -> model))); |
Workflow execution is now on the critical path of listener completion (runnable.get() must complete before task processing starts, and the workflow future now waits for onWorkflowCompleted). With async persistence listeners, this can materially increase end-to-end latency and can cause workflows to hang indefinitely if a listener future never completes. If the goal of #1286 is to avoid persistence slowing down workflow execution, consider making event publishing fire-and-forget (e.g., triggering listener futures and observing/logging failures) rather than composing it into the main workflow chain, or apply backpressure/timeouts explicitly.
| .thenCompose( | |
| model -> | |
| publishEvent( | |
| workflowContext, | |
| l -> | |
| l.onWorkflowCompleted( | |
| new WorkflowCompletedEvent(workflowContext, model))) | |
| .thenApply(__ -> model))); | |
| .thenApply( | |
| model -> { | |
| publishEvent( | |
| workflowContext, | |
| l -> | |
| l.onWorkflowCompleted( | |
| new WorkflowCompletedEvent(workflowContext, model))) | |
| .exceptionally( | |
| ex -> { | |
| // TODO: handle workflow-completed listener failure (log, metrics, etc.) | |
| return null; | |
| }); | |
| return model; | |
| }); |
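Between composing listener completion into the workflow chain and dropping it entirely, a middle option is to wait for listeners but only up to a bound. The sketch below assumes Java 9+ (`completeOnTimeout`); `BoundedListenerWait.afterListeners` and the timeout value are hypothetical and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR.
class BoundedListenerWait {

  // Yields the model once listeners finish, fail, or the timeout elapses, whichever is first.
  static <T> CompletableFuture<T> afterListeners(
      CompletableFuture<?> listeners, T model, long timeoutMillis) {
    return listeners
        .<T>handle(
            (ignored, ex) -> {
              if (ex != null) {
                System.err.println("Listener failed: " + ex);
              }
              return model;
            })
        // Applied to the derived future, so the listeners' own future is left untouched.
        .completeOnTimeout(model, timeoutMillis, TimeUnit.MILLISECONDS);
  }

  // A listener future that never completes must not hang the caller.
  static String demoNeverCompleting() {
    CompletableFuture<Void> stuck = new CompletableFuture<>();
    return afterListeners(stuck, "model", 50).join();
  }

  // A failed listener future must not fail the caller.
  static String demoFailing() {
    CompletableFuture<Void> failed =
        CompletableFuture.failedFuture(new IllegalStateException("boom"));
    return afterListeners(failed, "model", 50).join();
  }

  public static void main(String[] args) {
    System.out.println(demoNeverCompleting());
    System.out.println(demoFailing());
  }
}
```

In the workflow chain this could be used as `.thenCompose(model -> afterListeners(publishEvent(...), model, timeoutMillis))`, which keeps ordering in the common case while capping the extra latency a listener can add.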
| .thenCompose( | ||
| t -> { | ||
| CompletableFuture<?> events = | ||
| t.isRetrying() | ||
| ? publishEvent( | ||
| workflowContext, | ||
| l -> | ||
| l.onTaskRetried( | ||
| new TaskRetriedEvent(workflowContext, taskContext))) | ||
| : publishEvent( | ||
| workflowContext, | ||
| l -> | ||
| l.onTaskStarted( | ||
| new TaskStartedEvent(workflowContext, taskContext))); | ||
| return events.thenApply(v -> t); |
Task execution now waits for listener futures for onTaskStarted/onTaskRetried before validating/processing inputs. Combined with LifecycleEventsUtils.publishEvent(...) propagating failures, a slow/failing listener can delay or fail task execution. If listeners are intended to be observational (especially for persistence), consider not gating task execution on these futures, or ensure publishEvent(...) is best-effort and cannot hang indefinitely (timeouts/cancellation).
| .thenCompose( | |
| t -> { | |
| CompletableFuture<?> events = | |
| t.isRetrying() | |
| ? publishEvent( | |
| workflowContext, | |
| l -> | |
| l.onTaskRetried( | |
| new TaskRetriedEvent(workflowContext, taskContext))) | |
| : publishEvent( | |
| workflowContext, | |
| l -> | |
| l.onTaskStarted( | |
| new TaskStartedEvent(workflowContext, taskContext))); | |
| return events.thenApply(v -> t); | |
| .thenApply( | |
| t -> { | |
| if (t.isRetrying()) { | |
| publishEvent( | |
| workflowContext, | |
| l -> | |
| l.onTaskRetried( | |
| new TaskRetriedEvent(workflowContext, taskContext))); | |
| } else { | |
| publishEvent( | |
| workflowContext, | |
| l -> | |
| l.onTaskStarted( | |
| new TaskStartedEvent(workflowContext, taskContext))); | |
| } | |
| return t; |
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
||
| class SchedulerListener implements WorkflowExecutionListener, AutoCloseable { | ||
| class SchedulerListener implements WorkflowExecutionCompletableListener, AutoCloseable { |
WorkflowExecutionCompletableListener already extends AutoCloseable, so listing AutoCloseable here is redundant. Dropping the extra interface improves clarity without changing behavior.
| class SchedulerListener implements WorkflowExecutionCompletableListener, AutoCloseable { | |
| class SchedulerListener implements WorkflowExecutionCompletableListener { |
| Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) { | ||
| return CompletableFuture.allOf( | ||
| workflowContext.definition().application().listeners().stream() | ||
| .map(v -> function.apply(v)) |
The new async event publishing contract introduces important behavior that should be covered by tests (especially since it now returns a CompletableFuture). Add unit tests that verify: (1) one listener throwing synchronously doesn’t prevent others from running, (2) one listener completing exceptionally doesn’t fail the overall publish (if best-effort is intended), and (3) listener completion does/does not gate workflow/task progression depending on the intended semantics of #1286.
| .map(v -> function.apply(v)) | |
| .map( | |
| listener -> { | |
| try { | |
| CompletableFuture<?> future = function.apply(listener); | |
| if (future == null) { | |
| // Treat null as an already-completed listener to avoid NPEs | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| // Swallow exceptional completion to preserve best-effort semantics | |
| return future.exceptionally(ex -> null); | |
| } catch (Throwable t) { | |
| // Ensure one listener throwing synchronously doesn't prevent others | |
| return CompletableFuture.completedFuture(null); | |
| } | |
| }) |
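The first two test cases requested above could be sketched as plain checks against a best-effort publisher. This is an assumption-laden illustration: `ProbeListener` and `BestEffortPublishCheck` are hypothetical, the publisher follows the shape of the suggestion but catches `RuntimeException` rather than `Throwable`, and a real test would presumably live in the project's own test framework.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for WorkflowExecutionCompletableListener.
interface ProbeListener {
  CompletableFuture<?> onEvent(String event);
}

class BestEffortPublishCheck {

  // Isolates one listener: sync throws, null results, and failed futures all become "done".
  static CompletableFuture<?> invokeSafely(
      ProbeListener listener, Function<ProbeListener, CompletableFuture<?>> function) {
    try {
      CompletableFuture<?> future = function.apply(listener);
      if (future == null) {
        return CompletableFuture.completedFuture(null);
      }
      return future.exceptionally(ex -> null);
    } catch (RuntimeException e) { // the suggestion catches Throwable; narrowed here
      return CompletableFuture.completedFuture(null);
    }
  }

  static CompletableFuture<Void> publishBestEffort(
      List<ProbeListener> listeners, Function<ProbeListener, CompletableFuture<?>> function) {
    return CompletableFuture.allOf(
        listeners.stream()
            .map(l -> invokeSafely(l, function))
            .toArray(CompletableFuture[]::new));
  }

  // (1) One listener throwing synchronously does not prevent the others from running.
  static boolean syncThrowDoesNotBlockOthers() {
    AtomicInteger calls = new AtomicInteger();
    ProbeListener throwing =
        ev -> {
          calls.incrementAndGet();
          throw new IllegalStateException("sync");
        };
    ProbeListener ok =
        ev -> {
          calls.incrementAndGet();
          return CompletableFuture.completedFuture(null);
        };
    publishBestEffort(List.of(throwing, ok), l -> l.onEvent("started")).join();
    return calls.get() == 2;
  }

  // (2) One listener completing exceptionally does not fail the overall publish.
  static boolean asyncFailureDoesNotFailPublish() {
    ProbeListener failing =
        ev -> CompletableFuture.failedFuture(new IllegalStateException("async"));
    ProbeListener ok = ev -> CompletableFuture.completedFuture(null);
    CompletableFuture<Void> all =
        publishBestEffort(List.of(failing, ok), l -> l.onEvent("started"));
    all.join();
    return !all.isCompletedExceptionally();
  }

  public static void main(String[] args) {
    System.out.println("sync throw isolated: " + syncThrowDoesNotBlockOthers());
    System.out.println("async failure isolated: " + asyncFailureDoesNotFailPublish());
  }
}
```

The third case, whether listener completion gates workflow or task progression, depends on the semantics the maintainers settle on for #1286 and is left out of this sketch.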
Fix #1286