Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
package io.serverlessworkflow.impl;

import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
import io.serverlessworkflow.impl.scheduler.Cancellable;
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class SchedulerListener implements WorkflowExecutionListener, AutoCloseable {
class SchedulerListener implements WorkflowExecutionCompletableListener, AutoCloseable {
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WorkflowExecutionCompletableListener already extends AutoCloseable, so listing AutoCloseable here is redundant. Dropping the extra interface improves clarity without changing behavior.

Suggested change
class SchedulerListener implements WorkflowExecutionCompletableListener, AutoCloseable {
class SchedulerListener implements WorkflowExecutionCompletableListener {

Copilot uses AI. Check for mistakes.

private final WorkflowScheduler scheduler;
private final Map<WorkflowDefinition, WorkflowValueResolver<Duration>> afterMap =
Expand All @@ -39,7 +40,7 @@ public void addAfter(WorkflowDefinition definition, WorkflowValueResolver<Durati
}

@Override
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition();
WorkflowValueResolver<Duration> after = afterMap.get(workflowDefinition);
if (after != null) {
Expand All @@ -49,6 +50,7 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
workflowDefinition,
after.apply((WorkflowContext) ev.workflowContext(), null, ev.output())));
}
return CompletableFuture.completedFuture(null);
}

public void removeAfter(WorkflowDefinition definition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.expressions.RuntimeDescriptor;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter;
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
Expand Down Expand Up @@ -67,7 +69,7 @@ public class WorkflowApplication implements AutoCloseable {
private final ResourceLoaderFactory resourceLoaderFactory;
private final SchemaValidatorFactory schemaValidatorFactory;
private final WorkflowInstanceIdFactory idFactory;
private final Collection<WorkflowExecutionListener> listeners;
private final Collection<WorkflowExecutionCompletableListener> listeners;
private final Map<WorkflowDefinitionId, WorkflowDefinition> definitions;
private final WorkflowPositionFactory positionFactory;
private final ExecutorServiceFactory executorFactory;
Expand Down Expand Up @@ -137,7 +139,7 @@ public ResourceLoaderFactory resourceLoaderFactory() {
return resourceLoaderFactory;
}

public Collection<WorkflowExecutionListener> listeners() {
public Collection<WorkflowExecutionCompletableListener> listeners() {
return listeners;
}

Expand Down Expand Up @@ -175,8 +177,10 @@ public SchemaValidator getValidator(SchemaInline inline) {
private String id;
private TaskExecutorFactory taskFactory;
private Collection<ExpressionFactory> exprFactories = new HashSet<>();
private List<WorkflowExecutionListener> listeners =
loadFromServiceLoader(WorkflowExecutionListener.class);
private List<WorkflowExecutionCompletableListener> listeners =
ServiceLoader.load(WorkflowExecutionListener.class).stream()
.map(v -> new WorkflowExecutionListenerAdapter(v.get()))
.collect(Collectors.toList());
private List<CallableTaskProxyBuilder> callableProxyBuilders =
loadFromServiceLoader(CallableTaskProxyBuilder.class);
private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get();
Expand Down Expand Up @@ -212,6 +216,11 @@ public Builder withId(String id) {
}

public Builder withListener(WorkflowExecutionListener listener) {
listeners.add(new WorkflowExecutionListenerAdapter(listener));
return this;
}

public Builder withListener(WorkflowExecutionCompletableListener listener) {
listeners.add(listener);
return this;
}
Expand Down Expand Up @@ -414,7 +423,7 @@ public void close() {
}
definitions.clear();

for (WorkflowExecutionListener listener : listeners) {
for (WorkflowExecutionCompletableListener listener : listeners) {
safeClose(listener);
}
listeners.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,43 @@ public CompletableFuture<WorkflowModel> start() {
return startExecution(
() -> {
startedAt = Instant.now();
publishEvent(
return publishEvent(
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
});
}

protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnable) {
protected final CompletableFuture<WorkflowModel> startExecution(
Supplier<CompletableFuture<?>> runnable) {
CompletableFuture<WorkflowModel> future = futureRef.get();
if (future != null) {
return future;
}
status(WorkflowStatus.RUNNING);
runnable.run();

future =
TaskExecutorHelper.processTaskList(
workflowContext.definition().startTask(),
workflowContext,
Optional.empty(),
workflowContext
.definition()
.inputFilter()
.map(f -> f.apply(workflowContext, null, input))
.orElse(input))
.whenComplete(this::whenCompleted)
.thenApply(this::whenSuccess);
runnable
.get()
.thenCompose(
v ->
TaskExecutorHelper.processTaskList(
workflowContext.definition().startTask(),
workflowContext,
Optional.empty(),
workflowContext
.definition()
.inputFilter()
.map(f -> f.apply(workflowContext, null, input))
.orElse(input))
.whenComplete(this::whenCompleted)
.thenApply(this::whenSuccess)
.thenCompose(
model ->
publishEvent(
workflowContext,
l ->
l.onWorkflowCompleted(
new WorkflowCompletedEvent(workflowContext, model)))
.thenApply(__ -> model)));
Comment on lines +102 to +109
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Workflow execution is now on the critical path of listener completion (runnable.get() must complete before task processing starts, and the workflow future now waits for onWorkflowCompleted). With async persistence listeners, this can materially increase end-to-end latency and can cause workflows to hang indefinitely if a listener future never completes. If the goal of #1286 is to avoid persistence slowing down workflow execution, consider making event publishing fire-and-forget (e.g., triggering listener futures and observing/logging failures) rather than composing it into the main workflow chain, or apply backpressure/timeouts explicitly.

Suggested change
.thenCompose(
model ->
publishEvent(
workflowContext,
l ->
l.onWorkflowCompleted(
new WorkflowCompletedEvent(workflowContext, model)))
.thenApply(__ -> model)));
.thenApply(
model -> {
publishEvent(
workflowContext,
l ->
l.onWorkflowCompleted(
new WorkflowCompletedEvent(workflowContext, model)))
.exceptionally(
ex -> {
// TODO: handle workflow-completed listener failure (log, metrics, etc.)
return null;
});
return model;
});

Copilot uses AI. Check for mistakes.
futureRef.set(future);
return future;
}
Expand Down Expand Up @@ -126,9 +139,6 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
.orElse(node);
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
status(WorkflowStatus.COMPLETED);
publishEvent(
workflowContext,
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
return output;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,24 @@ public CompletableFuture<TaskContext> apply(
completable =
completable
.thenCompose(workflowContext.instance()::suspendedCheck)
.thenCompose(
t -> {
CompletableFuture<?> events =
t.isRetrying()
? publishEvent(
workflowContext,
l ->
l.onTaskRetried(
new TaskRetriedEvent(workflowContext, taskContext)))
: publishEvent(
workflowContext,
l ->
l.onTaskStarted(
new TaskStartedEvent(workflowContext, taskContext)));
return events.thenApply(v -> t);
Comment on lines +216 to +230
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task execution now waits for listener futures for onTaskStarted/onTaskRetried before validating/processing inputs. Combined with LifecycleEventsUtils.publishEvent(...) propagating failures, a slow/failing listener can delay or fail task execution. If listeners are intended to be observational (especially for persistence), consider not gating task execution on these futures, or ensure publishEvent(...) is best-effort and cannot hang indefinitely (timeouts/cancellation).

Suggested change
.thenCompose(
t -> {
CompletableFuture<?> events =
t.isRetrying()
? publishEvent(
workflowContext,
l ->
l.onTaskRetried(
new TaskRetriedEvent(workflowContext, taskContext)))
: publishEvent(
workflowContext,
l ->
l.onTaskStarted(
new TaskStartedEvent(workflowContext, taskContext)));
return events.thenApply(v -> t);
.thenApply(
t -> {
if (t.isRetrying()) {
publishEvent(
workflowContext,
l ->
l.onTaskRetried(
new TaskRetriedEvent(workflowContext, taskContext)));
} else {
publishEvent(
workflowContext,
l ->
l.onTaskStarted(
new TaskStartedEvent(workflowContext, taskContext)));
}
return t;

Copilot uses AI. Check for mistakes.
})
.thenApply(
t -> {
if (t.isRetrying()) {
publishEvent(
workflowContext,
l -> l.onTaskRetried(new TaskRetriedEvent(workflowContext, taskContext)));
} else {
publishEvent(
workflowContext,
l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext)));
}
inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput()));
inputProcessor.ifPresent(
p -> taskContext.input(p.apply(workflowContext, t, t.rawInput())));
Expand Down Expand Up @@ -251,13 +258,16 @@ public CompletableFuture<TaskContext> apply(
p.apply(workflowContext, t, workflowContext.context())));
contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context()));
t.completedAt(Instant.now());
publishEvent(
workflowContext,
l ->
l.onTaskCompleted(
new TaskCompletedEvent(workflowContext, taskContext)));
return t;
});
})
.thenCompose(
t ->
publishEvent(
workflowContext,
l ->
l.onTaskCompleted(
new TaskCompletedEvent(workflowContext, taskContext)))
.thenApply(__ -> t));
if (timeout.isPresent()) {
completable =
completable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,19 @@
package io.serverlessworkflow.impl.lifecycle;

import io.serverlessworkflow.impl.WorkflowContext;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class LifecycleEventsUtils {

private LifecycleEventsUtils() {}

private static final Logger logger = LoggerFactory.getLogger(LifecycleEventsUtils.class);

public static <T extends TaskEvent> void publishEvent(
WorkflowContext workflowContext, Consumer<WorkflowExecutionListener> consumer) {
workflowContext
.definition()
.application()
.listeners()
.forEach(
v -> {
try {
consumer.accept(v);
} catch (Exception ex) {
logger.error("Error processing listener. Ignoring and going on", ex);
}
});
public static CompletableFuture<?> publishEvent(
WorkflowContext workflowContext,
Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) {
return CompletableFuture.allOf(
workflowContext.definition().application().listeners().stream()
.map(v -> function.apply(v))
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new async event publishing contract introduces important behavior that should be covered by tests (especially since it now returns a CompletableFuture). Add unit tests that verify: (1) one listener throwing synchronously doesn’t prevent others from running, (2) one listener completing exceptionally doesn’t fail the overall publish (if best-effort is intended), and (3) listener completion does/does not gate workflow/task progression depending on the intended semantics of #1286.

Suggested change
.map(v -> function.apply(v))
.map(
listener -> {
try {
CompletableFuture<?> future = function.apply(listener);
if (future == null) {
// Treat null as an already-completed listener to avoid NPEs
return CompletableFuture.completedFuture(null);
}
// Swallow exceptional completion to preserve best-effort semantics
return future.exceptionally(ex -> null);
} catch (Throwable t) {
// Ensure one listener throwing synchronously doesn't prevent others
return CompletableFuture.completedFuture(null);
}
})

Copilot uses AI. Check for mistakes.
.toArray(CompletableFuture[]::new));
}
Comment on lines +26 to 33
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes listener failure semantics: previously exceptions from listeners were caught/logged and execution continued; now (1) a synchronous exception thrown by function.apply(v) will abort publishing immediately, and (2) any returned future completing exceptionally will cause allOf(...) to complete exceptionally. Since callers now thenCompose on publishEvent, a single bad listener can fail the workflow/task execution. Consider wrapping each invocation to convert synchronous throws into a completed future, and attaching exceptionally(...)/handle(...) per-listener (with logging) so publishing remains best-effort like before.

Copilot uses AI. Check for mistakes.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle;

import io.serverlessworkflow.impl.ServicePriority;
import java.util.concurrent.CompletableFuture;

/**
 * Asynchronous workflow lifecycle listener. Every callback returns a {@link CompletableFuture}
 * that the runtime may compose into the execution chain; implementations that do not care about
 * a given event can rely on the default, which returns an already-completed future.
 *
 * <p>Extends {@link AutoCloseable} so implementations holding resources are closed when the
 * owning application shuts down; the default {@link #close()} is a no-op.
 */
public interface WorkflowExecutionCompletableListener extends AutoCloseable, ServicePriority {

  /** Invoked when a workflow instance starts executing. */
  default CompletableFuture<?> onWorkflowStarted(WorkflowStartedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a workflow instance completes successfully. */
  default CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a workflow instance fails. */
  default CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a workflow instance is cancelled. */
  default CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a workflow instance is suspended. */
  default CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a suspended workflow instance resumes. */
  default CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked on any workflow status transition. */
  default CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a task starts executing. */
  default CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a task completes successfully. */
  default CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a task fails. */
  default CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a task is cancelled. */
  default CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a task is suspended. */
  default CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a suspended task resumes. */
  default CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** Invoked when a task is retried after a failure. */
  default CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) {
    return CompletableFuture.completedFuture(null);
  }

  /** No-op by default; override to release resources on application shutdown. */
  @Override
  default void close() {}
}
Loading
Loading