[Fix #1286] Non blocking persistence#1288

Open
fjtirado wants to merge 1 commit into serverlessworkflow:main from fjtirado:Fix_#1286

Conversation

Collaborator

@fjtirado fjtirado commented Apr 1, 2026

Fix #1286

Signed-off-by: fjtirado <ftirados@redhat.com>
@fjtirado fjtirado marked this pull request as ready for review April 1, 2026 17:41
Collaborator Author

This will create a backward incompatibility in qflow when released, but I guess that is acceptable (we are switching to AsyncPersistenceExecutor).

Contributor

Copilot AI left a comment

Pull request overview

Implements non-blocking persistence by moving workflow lifecycle listeners to a CompletableFuture-based contract and switching default persistence execution to async.

Changes:

  • Introduces WorkflowExecutionCompletableListener and adapts existing WorkflowExecutionListener implementations via an adapter.
  • Refactors lifecycle event publishing to return/compose CompletableFutures rather than fire-and-forget.
  • Updates persistence executors/writers to use a unified execute(...) path and defaults to AsyncPersistenceExecutor.
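A minimal sketch of the contract change summarized above, assuming Java 9 or later, a single callback, and a plain String event. `LegacyListener`, `CompletableListener`, and `adapt` are hypothetical stand-ins for illustration, not the PR's actual types; the real interfaces have many more callbacks and typed event objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ListenerContractSketch {

  /** Hypothetical stand-in for the legacy, void-returning listener. */
  interface LegacyListener {
    void onWorkflowStarted(String event);
  }

  /** Hypothetical stand-in for the new contract: callbacks return a future. */
  interface CompletableListener {
    // Default: nothing to do, so report immediate completion.
    default CompletableFuture<?> onWorkflowStarted(String event) {
      return CompletableFuture.completedFuture(null);
    }
  }

  /** Wraps a legacy listener so it can be used where the new contract is expected. */
  static CompletableListener adapt(LegacyListener legacy) {
    return new CompletableListener() {
      @Override
      public CompletableFuture<?> onWorkflowStarted(String event) {
        legacy.onWorkflowStarted(event); // runs synchronously on the caller's thread
        return CompletableFuture.completedFuture(null);
      }
    };
  }

  /** Feeds one event through an adapted legacy listener and returns what it recorded. */
  static List<String> demo() {
    List<String> seen = new ArrayList<>();
    CompletableListener listener = adapt(seen::add);
    listener.onWorkflowStarted("started").join();
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the adapter returns an already-completed future, composing it into a chain adds no waiting of its own; only listeners that return a pending future can delay the caller.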

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java | Migrates persistence listener callbacks to return CompletableFuture for async persistence writes. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java | Adjusts async control flow in start() to return a CompletableFuture from a callback. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java | Uses PersistenceExecutor.execute(...) uniformly (removes start/delete special-casing). |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java | Removes synchronous executor implementation. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java | Simplifies executor API by removing startInstance/deleteInstance defaults. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java | Defaults persistence execution to AsyncPersistenceExecutor when none provided. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java | Makes async executor publicly constructible and adds a no-arg constructor. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java | Reworks async execution implementation (removes per-instance chaining/close drain). |
| impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowListenerTest.java | Updates tests to use the new completable listener type. |
| impl/core/src/test/java/io/serverlessworkflow/impl/TopPriorityListener.java | Updates test listener to implement WorkflowExecutionCompletableListener. |
| impl/core/src/test/java/io/serverlessworkflow/impl/MediumPriorityListener.java | Updates test listener to implement WorkflowExecutionCompletableListener. |
| impl/core/src/test/java/io/serverlessworkflow/impl/LowestPriorityListener.java | Updates test listener to implement WorkflowExecutionCompletableListener. |
| impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListenerAdapter.java | Adds adapter from legacy listener to completable listener. |
| impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionCompletableListener.java | Adds new async listener interface with default completed futures. |
| impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java | Refactors event publishing to return a combined CompletableFuture. |
| impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java | Composes task lifecycle events into the task execution future chain. |
| impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java | Composes workflow start/completion events into the workflow execution future chain. |
| impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java | Stores completable listeners, adapts legacy listeners loaded via ServiceLoader, updates builder APIs. |
| impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java | Migrates scheduler listener to completable listener callback shape. |

Comment on lines +26 to 33
public static CompletableFuture<?> publishEvent(
WorkflowContext workflowContext,
Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) {
return CompletableFuture.allOf(
workflowContext.definition().application().listeners().stream()
.map(v -> function.apply(v))
.toArray(CompletableFuture[]::new));
}
Copilot AI Apr 1, 2026

This changes listener failure semantics: previously exceptions from listeners were caught/logged and execution continued; now (1) a synchronous exception thrown by function.apply(v) will abort publishing immediately, and (2) any returned future completing exceptionally will cause allOf(...) to complete exceptionally. Since callers now thenCompose on publishEvent, a single bad listener can fail the workflow/task execution. Consider wrapping each invocation to convert synchronous throws into a completed future, and attaching exceptionally(...)/handle(...) per-listener (with logging) so publishing remains best-effort like before.
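The two failure modes described in this comment can be reproduced in isolation. The sketch below assumes Java 9 or later and uses a hypothetical `publish` helper that only mirrors the shape of `publishEvent` (a list of suppliers stands in for the registered listeners); it is not the project's code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class AllOfFailureSketch {

  /** Mirrors the shape of publishEvent: map each listener to a future, then combine. */
  static CompletableFuture<Void> publish(List<Supplier<CompletableFuture<?>>> listeners) {
    return CompletableFuture.allOf(
        listeners.stream().map(Supplier::get).toArray(CompletableFuture[]::new));
  }

  /** Case (2): one returned future fails, so the combined future fails too. */
  static boolean combinedFailsWhenOneFutureFails() {
    Supplier<CompletableFuture<?>> ok = () -> CompletableFuture.completedFuture(null);
    Supplier<CompletableFuture<?>> failing =
        () -> CompletableFuture.failedFuture(new IllegalStateException("async boom"));
    return publish(List.of(ok, failing)).isCompletedExceptionally();
  }

  /** Case (1): a listener throws before returning a future, so later listeners never run. */
  static boolean laterListenerSkippedWhenEarlierThrows() {
    AtomicBoolean secondRan = new AtomicBoolean(false);
    Supplier<CompletableFuture<?>> thrower =
        () -> {
          throw new IllegalStateException("sync boom");
        };
    Supplier<CompletableFuture<?>> second =
        () -> {
          secondRan.set(true);
          return CompletableFuture.completedFuture(null);
        };
    try {
      publish(List.of(thrower, second));
      return false; // not reached: the throw escapes publish(...)
    } catch (IllegalStateException expected) {
      return !secondRan.get();
    }
  }

  public static void main(String[] args) {
    System.out.println("combined future fails: " + combinedFailsWhenOneFutureFails());
    System.out.println("later listener skipped: " + laterListenerSkippedWhenEarlierThrows());
  }
}
```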

Comment on lines +30 to +109
listener.onWorkflowStarted(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) {
listener.onWorkflowSuspended(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) {
listener.onWorkflowResumed(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
listener.onWorkflowCompleted(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) {
listener.onWorkflowFailed(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) {
listener.onWorkflowCancelled(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) {
listener.onTaskStarted(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) {
listener.onTaskCompleted(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) {
listener.onTaskFailed(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) {
listener.onTaskCancelled(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) {
listener.onTaskSuspended(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) {
listener.onTaskResumed(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) {
listener.onTaskRetried(ev);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) {
listener.onWorkflowStatusChanged(ev);
return CompletableFuture.completedFuture(null);
Copilot AI Apr 1, 2026

If the wrapped legacy listener throws, this method throws synchronously and no CompletableFuture is returned, which can break publishEvent(...) mid-stream. To keep the adapter safe in async pipelines, catch RuntimeException/Exception from listener.on... and return a failed future (or a completed future if listeners are intended to be best-effort), ideally with logging consistent with prior behavior.

Suggested change
try {
listener.onWorkflowStarted(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) {
try {
listener.onWorkflowSuspended(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) {
try {
listener.onWorkflowResumed(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
try {
listener.onWorkflowCompleted(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) {
try {
listener.onWorkflowFailed(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) {
try {
listener.onWorkflowCancelled(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) {
try {
listener.onTaskStarted(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) {
try {
listener.onTaskCompleted(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) {
try {
listener.onTaskFailed(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) {
try {
listener.onTaskCancelled(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) {
try {
listener.onTaskSuspended(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) {
try {
listener.onTaskResumed(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) {
try {
listener.onTaskRetried(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Override
public CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) {
try {
listener.onWorkflowStatusChanged(ev);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
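The suggested change repeats the same try/catch in every callback. One possible way to avoid that repetition, assuming Java 9 or later, is a small shared helper; `safely` is a hypothetical name introduced here for illustration and is not part of the PR.

```java
import java.util.concurrent.CompletableFuture;

final class SafeCallbackSketch {

  /** Runs a legacy callback and reports a synchronous throw as a failed future. */
  static CompletableFuture<?> safely(Runnable callback) {
    try {
      callback.run();
      return CompletableFuture.completedFuture(null);
    } catch (Exception e) {
      return CompletableFuture.failedFuture(e);
    }
  }

  public static void main(String[] args) {
    CompletableFuture<?> ok = safely(() -> {});
    CompletableFuture<?> failed =
        safely(
            () -> {
              throw new IllegalStateException("listener failed");
            });
    System.out.println("ok failed? " + ok.isCompletedExceptionally());
    System.out.println("failed failed? " + failed.isCompletedExceptionally());
  }
}
```

With such a helper, each adapter method could shrink to a single line of the form `return safely(() -> listener.onWorkflowStarted(ev));`.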

Comment on lines 24 to 28
  @Override
  public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) {
-   final ExecutorService service =
-       executorService().orElse(context.definition().application().executorService());
-   return futuresMap.compute(
-       context.instanceData().id(),
-       (k, v) ->
-           v == null
-               ? CompletableFuture.runAsync(runnable, service)
-               : v.thenRunAsync(runnable, service));
- }
-
- @Override
- public CompletableFuture<Void> startInstance(Runnable runnable, WorkflowContextData context) {
-   return SyncPersistenceExecutor.execute(runnable);
- }
-
- @Override
- public CompletableFuture<Void> deleteInstance(Runnable runnable, WorkflowContextData context) {
-   CompletableFuture<Void> completable = futuresMap.remove(context.instanceData().id());
-   if (completable != null) {
-     CompletableFuture<Void> result = completable.whenComplete((__, ___) -> runnable.run());
-     completable.cancel(true);
-     return result;
-   } else {
-     return CompletableFuture.completedFuture(null);
-   }
- }
-
- @Override
- public void close() {
-   for (CompletableFuture<Void> future : futuresMap.values()) {
-     try {
-       future.get();
-     } catch (InterruptedException ex) {
-       logger.warn("Thread interrupted while writing to db", ex);
-       Thread.currentThread().interrupt();
-     } catch (ExecutionException ex) {
-       logger.warn("Exception while writing to db", ex.getCause());
-     }
-   }
-   futuresMap.clear();
+   return CompletableFuture.runAsync(
+       runnable, executorService().orElse(context.definition().application().executorService()));
  }
Copilot AI Apr 1, 2026

The previous implementation serialized persistence writes per workflow instance (via a ConcurrentHashMap chaining futures) and close() drained outstanding writes. This new implementation schedules every write independently, which can lead to out-of-order/concurrent persistence operations for the same instance and can drop in-flight writes on shutdown (no drain). If ordering and durability are required, reintroduce per-instance sequencing (still non-blocking by chaining on the executor) and restore a close() strategy that waits for (or explicitly cancels/flushes) outstanding operations deterministically.
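A rough sketch of what non-blocking per-instance sequencing with a draining `close()` could look like. `PerKeySequencerSketch` and its String key are assumptions made for illustration; unlike the removed code, this version inserts a `handle(...)` step so that a failed write does not skip the writes queued behind it, which is a design choice of this sketch rather than the project's behavior. Entries are never evicted here; a real implementation would remove a key once its instance finishes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PerKeySequencerSketch implements AutoCloseable {
  // Last scheduled write per key; each new write is chained behind it.
  private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
  private final ExecutorService service;

  PerKeySequencerSketch(ExecutorService service) {
    this.service = service;
  }

  /** Queues a write behind any earlier write for the same key, without blocking the caller. */
  CompletableFuture<Void> execute(String key, Runnable write) {
    return tails.compute(
        key,
        (k, tail) ->
            tail == null
                ? CompletableFuture.runAsync(write, service)
                // Ignore the previous outcome so one failed write does not cancel later ones.
                : tail.handle((r, ex) -> (Void) null).thenRunAsync(write, service));
  }

  /** Waits for every outstanding chain; failures are ignored here for brevity. */
  @Override
  public void close() {
    for (CompletableFuture<Void> tail : tails.values()) {
      tail.handle((r, ex) -> null).join();
    }
    tails.clear();
  }

  /** Schedules 50 writes for one key on a 4-thread pool and returns the observed order. */
  static List<Integer> demo() {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Integer> order = Collections.synchronizedList(new ArrayList<>());
    try (PerKeySequencerSketch sequencer = new PerKeySequencerSketch(pool)) {
      for (int i = 0; i < 50; i++) {
        final int n = i;
        sequencer.execute("instance-1", () -> order.add(n));
      }
    } // close() drains the queued writes before the pool is shut down
    pool.shutdown();
    return order;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Writes for different keys still run concurrently on the pool; only writes that share a key are ordered.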

Comment on lines +102 to +109
.thenCompose(
model ->
publishEvent(
workflowContext,
l ->
l.onWorkflowCompleted(
new WorkflowCompletedEvent(workflowContext, model)))
.thenApply(__ -> model)));
Copilot AI Apr 1, 2026

Workflow execution is now on the critical path of listener completion (runnable.get() must complete before task processing starts, and the workflow future now waits for onWorkflowCompleted). With async persistence listeners, this can materially increase end-to-end latency and can cause workflows to hang indefinitely if a listener future never completes. If the goal of #1286 is to avoid persistence slowing down workflow execution, consider making event publishing fire-and-forget (e.g., triggering listener futures and observing/logging failures) rather than composing it into the main workflow chain, or apply backpressure/timeouts explicitly.

Suggested change
.thenApply(
model -> {
publishEvent(
workflowContext,
l ->
l.onWorkflowCompleted(
new WorkflowCompletedEvent(workflowContext, model)))
.exceptionally(
ex -> {
// TODO: handle workflow-completed listener failure (log, metrics, etc.)
return null;
});
return model;
});
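If listener completion is meant to stay on the critical path, the explicit-timeout option mentioned in this comment could be sketched as follows, assuming Java 9 or later for `orTimeout`. `bounded` is a hypothetical helper, and silently treating failures and timeouts as "done" is a simplification; a real version would at least log them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class BoundedListenerSketch {

  /** Bounds how long a listener future may gate the caller. */
  static CompletableFuture<Void> bounded(CompletableFuture<?> listenerFuture, long timeoutMillis) {
    return listenerFuture
        // New dependent future, so the timeout below never completes the listener's own future.
        .handle((result, error) -> (Void) null)
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
        // A timeout also counts as "done" so the workflow chain can continue.
        .exceptionally(timeout -> null);
  }

  /** Composes a listener that never completes into a chain and still obtains the result. */
  static String demo() {
    CompletableFuture<String> never = new CompletableFuture<>();
    return bounded(never, 50).thenApply(ignored -> "model").join();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```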

Comment on lines +216 to +230
.thenCompose(
t -> {
CompletableFuture<?> events =
t.isRetrying()
? publishEvent(
workflowContext,
l ->
l.onTaskRetried(
new TaskRetriedEvent(workflowContext, taskContext)))
: publishEvent(
workflowContext,
l ->
l.onTaskStarted(
new TaskStartedEvent(workflowContext, taskContext)));
return events.thenApply(v -> t);
Copilot AI Apr 1, 2026

Task execution now waits for listener futures for onTaskStarted/onTaskRetried before validating/processing inputs. Combined with LifecycleEventsUtils.publishEvent(...) propagating failures, a slow/failing listener can delay or fail task execution. If listeners are intended to be observational (especially for persistence), consider not gating task execution on these futures, or ensure publishEvent(...) is best-effort and cannot hang indefinitely (timeouts/cancellation).

Suggested change
.thenApply(
t -> {
if (t.isRetrying()) {
publishEvent(
workflowContext,
l ->
l.onTaskRetried(
new TaskRetriedEvent(workflowContext, taskContext)));
} else {
publishEvent(
workflowContext,
l ->
l.onTaskStarted(
new TaskStartedEvent(workflowContext, taskContext)));
}
return t;
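The difference between the two shapes in this suggestion comes down to `thenCompose` versus `thenApply`. The sketch below isolates it with plain futures; `gated` and `notGated` are hypothetical names, and the listener future is passed in directly rather than produced by `publishEvent`.

```java
import java.util.concurrent.CompletableFuture;

final class GatingSketch {

  /** Gated: the result is not available until the listener future completes. */
  static CompletableFuture<String> gated(
      CompletableFuture<String> task, CompletableFuture<?> listener) {
    return task.thenCompose(t -> listener.thenApply(ignored -> t));
  }

  /** Not gated: the listener future is only observed; the result flows on immediately. */
  static CompletableFuture<String> notGated(
      CompletableFuture<String> task, CompletableFuture<?> listener) {
    return task.thenApply(
        t -> {
          listener.exceptionally(ex -> null); // a real version would log the failure here
          return t;
        });
  }

  public static void main(String[] args) {
    CompletableFuture<String> task = CompletableFuture.completedFuture("task");
    CompletableFuture<Void> pendingListener = new CompletableFuture<>();

    CompletableFuture<String> waiting = gated(task, pendingListener);
    CompletableFuture<String> immediate = notGated(task, pendingListener);
    System.out.println("gated done before listener: " + waiting.isDone());
    System.out.println("not gated done before listener: " + immediate.isDone());

    pendingListener.complete(null);
    System.out.println("gated done after listener: " + waiting.isDone());
  }
}
```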

import java.util.concurrent.ConcurrentHashMap;

- class SchedulerListener implements WorkflowExecutionListener, AutoCloseable {
+ class SchedulerListener implements WorkflowExecutionCompletableListener, AutoCloseable {
Copilot AI Apr 1, 2026

WorkflowExecutionCompletableListener already extends AutoCloseable, so listing AutoCloseable here is redundant. Dropping the extra interface improves clarity without changing behavior.

Suggested change
class SchedulerListener implements WorkflowExecutionCompletableListener {

Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) {
return CompletableFuture.allOf(
workflowContext.definition().application().listeners().stream()
.map(v -> function.apply(v))
Copilot AI Apr 1, 2026

The new async event publishing contract introduces important behavior that should be covered by tests (especially since it now returns a CompletableFuture). Add unit tests that verify: (1) one listener throwing synchronously doesn’t prevent others from running, (2) one listener completing exceptionally doesn’t fail the overall publish (if best-effort is intended), and (3) listener completion does/does not gate workflow/task progression depending on the intended semantics of #1286.

Suggested change
.map(
listener -> {
try {
CompletableFuture<?> future = function.apply(listener);
if (future == null) {
// Treat null as an already-completed listener to avoid NPEs
return CompletableFuture.completedFuture(null);
}
// Swallow exceptional completion to preserve best-effort semantics
return future.exceptionally(ex -> null);
} catch (Throwable t) {
// Ensure one listener throwing synchronously doesn't prevent others
return CompletableFuture.completedFuture(null);
}
})
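Checks (1) and (2) from this comment could be exercised roughly as below, assuming Java 9 or later and that best-effort semantics are what is intended. `publishBestEffort` and `invokeSafely` are hypothetical helpers written in the spirit of the suggestion above, with suppliers standing in for listeners; a real test would use the project's test framework and the actual `publishEvent`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class BestEffortPublishCheck {

  /** Invokes one listener so that neither a throw nor a failed future escapes. */
  static CompletableFuture<?> invokeSafely(Supplier<CompletableFuture<?>> listener) {
    try {
      CompletableFuture<?> future = listener.get();
      if (future == null) {
        return CompletableFuture.completedFuture(null);
      }
      return future.exceptionally(ex -> null);
    } catch (RuntimeException e) {
      return CompletableFuture.completedFuture(null);
    }
  }

  /** Best-effort publish: every listener is invoked and none can fail the combined future. */
  static CompletableFuture<Void> publishBestEffort(List<Supplier<CompletableFuture<?>>> listeners) {
    return CompletableFuture.allOf(
        listeners.stream()
            .map(BestEffortPublishCheck::invokeSafely)
            .toArray(CompletableFuture[]::new));
  }

  /** Check (1): counts how many listeners ran after an earlier one threw synchronously. */
  static int listenersRunDespiteSyncThrow() {
    AtomicInteger ran = new AtomicInteger();
    Supplier<CompletableFuture<?>> thrower =
        () -> {
          throw new IllegalStateException("sync boom");
        };
    Supplier<CompletableFuture<?>> counter =
        () -> {
          ran.incrementAndGet();
          return CompletableFuture.completedFuture(null);
        };
    publishBestEffort(List.of(thrower, counter, counter)).join();
    return ran.get();
  }

  /** Check (2): a listener future that fails does not fail the overall publish. */
  static boolean publishSurvivesFailedFuture() {
    Supplier<CompletableFuture<?>> failing =
        () -> CompletableFuture.failedFuture(new IllegalStateException("async boom"));
    CompletableFuture<Void> all = publishBestEffort(List.of(failing));
    return all.isDone() && !all.isCompletedExceptionally();
  }

  public static void main(String[] args) {
    System.out.println("listeners run after a throw: " + listenersRunDespiteSyncThrow());
    System.out.println("publish survives failed future: " + publishSurvivesFailedFuture());
  }
}
```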

Development

Successfully merging this pull request may close these issues.

Allow integration of Listeners execution into the Workflow execution

2 participants