fix: prevent dropped SSE events under back-to-back emission #906
ehsavoie wants to merge 1 commit into
Code Review
This pull request changes the SSE backpressure strategy across multiple A2AServerRoutes files by requesting all events upfront (request(Long.MAX_VALUE)) instead of one by one, and introduces a headersSet flag to track when headers are written. It also reduces the buffer flush delay in EventConsumer. The reviewer identified a critical Vert.x threading model violation across all modified route files: processing reactive stream events on the background executor thread instead of the Vert.x event loop thread is unsafe because HttpServerResponse and RoutingContext are not thread-safe. To resolve this concurrency issue, the reviewer recommends wrapping Vert.x interactions in rc.vertx().runOnContext(...).
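The hand-off the reviewer recommends can be illustrated without Vert.x. The sketch below is a JDK-only model, not the project's code: `toy-event-loop` and `toy-worker` are hypothetical single-thread executors standing in for the Vert.x event loop and the background executor, and `eventLoop.execute(...)` plays the role that `rc.vertx().runOnContext(...)` would play in the route classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopHandOffDemo {

    /** Returns "thread:event" entries recording which thread performed each simulated write. */
    static List<String> run() {
        List<String> writes = new CopyOnWriteArrayList<>();
        // Hypothetical stand-ins: one thread plays the event loop, the other the background executor.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-event-loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-worker"));
        try {
            for (String event : List.of("artifact", "completion")) {
                // The event is produced on the worker thread, but the simulated response
                // write is handed to the event-loop thread instead of running in place.
                worker.execute(() -> eventLoop.execute(
                        () -> writes.add(Thread.currentThread().getName() + ":" + event)));
            }
            // Wait for the worker to finish submitting, then let the event loop drain.
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
            eventLoop.shutdown();
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return writes;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because both executors are single-threaded, every simulated write runs on the event-loop thread and in submission order, which is the property the non-thread-safe response object relies on.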
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
c4e804f to daa12e3
/gemini review
Code Review
This pull request refactors Server-Sent Events (SSE) handling across several server route classes to ensure thread safety and prevent dropped events. It schedules response writes, errors, and completion handlers on the Vert.x event loop thread using runOnContext, and switches from single-request backpressure to requesting all events upfront (Long.MAX_VALUE). Additionally, the buffer flush delay in EventConsumer is reduced. The review feedback recommends completely removing the remaining sleep delay (BUFFER_FLUSH_DELAY_MS) in EventConsumer, as blocking threads is an anti-pattern in reactive applications and is redundant given Vert.x's event loop guarantees.
daa12e3 to d3d7ed0
9562f66 to 52bc280
/gemini review
Code Review
This pull request refactors the Server-Sent Events (SSE) streaming implementation by consolidating the duplicate MultiSseSupport inner classes into a single shared utility class, SseResponseWriter. It also removes a temporary thread sleep workaround in EventConsumer and resolves a concurrency race condition in MainEventBusProcessorExceptionTest by synchronizing access to the log appender. The review feedback suggests marking the upstream subscription field in SseResponseWriter as volatile to guarantee cross-thread visibility, and combining the initial stream kickstart write with the first event write to reduce socket overhead and prevent duplicate failure handling.
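The second suggestion, folding the kickstart comment into the first event write, can be sketched as plain string framing. This is an illustrative JDK-only model rather than the `SseResponseWriter` code: `KickstartFramingDemo` and `chunks` are hypothetical names, and the kickstart text is copied from the diff under review.

```java
import java.util.ArrayList;
import java.util.List;

class KickstartFramingDemo {

    // Kickstart comment as it appears in the reviewed diff.
    static final String KICKSTART = ": SSE stream started\n\n";

    /** Returns the chunks to write, with the kickstart comment folded into the first one. */
    static List<String> chunks(List<String> sseEvents) {
        List<String> out = new ArrayList<>();
        boolean first = true;
        for (String event : sseEvents) {
            // One write (and therefore one failure callback) covers kickstart + first event.
            out.add(first ? KICKSTART + event : event);
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        chunks(List.of("data: a\n\n", "data: b\n\n")).forEach(System.out::print);
    }
}
```

With a single combined write there is only one completion handler for the opening bytes, so a failed connection is reported once instead of once per write.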
52bc280 to b87e718
Extract the duplicated MultiSseSupport inner class from four route files (jsonrpc, rest, compat-0.3/jsonrpc, compat-0.3/rest) into a single shared SseResponseWriter utility in reference-common.

The shared implementation fixes three latent bugs:
- Guard response.end() with !response.ended() to prevent IllegalStateException when the client disconnects concurrently with stream completion
- Add error handler to the kickstart SSE comment write, consistent with event writes
- Switch from request(1)/backpressure to request(Long.MAX_VALUE) so events queued before tube.complete() fires are never dropped; document the buffer trade-off in the Javadoc

Also dispatches all HttpServerResponse operations via runOnContext() to enforce Vert.x thread-safety, and removes the 150ms sleep workaround from EventConsumer now that ordering is guaranteed by the event loop.

Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
b87e718 to 771a6d7
/gemini review
Code Review
This pull request consolidates Server-Sent Events (SSE) streaming logic by extracting duplicate MultiSseSupport inner classes into a new shared utility, SseResponseWriter. It also updates documentation in EventConsumer and resolves a test race condition in MainEventBusProcessorExceptionTest by synchronizing log access. The review feedback points out that SseResponseWriter still uses the old request(1) backpressure model and bytesWritten check instead of the intended request(Long.MAX_VALUE) and headersSet flag, and that BUFFER_FLUSH_DELAY_MS in EventConsumer should be reduced to 50 ms.
    sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() {
        volatile Flow.@Nullable Subscription upstream;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.upstream = subscription;
            this.upstream.request(1);

            // Detect client disconnect and call EventConsumer.cancel() directly
            response.closeHandler(v -> {
                logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop");
                context.invokeEventConsumerCancelCallback();
                subscription.cancel();
            });

            if (onSubscribedHook != null) {
                onSubscribedHook.run();
            }
        }

        @Override
        public void onNext(String sseEvent) {
            if (response.bytesWritten() == 0) {
                MultiMap headers = response.headers();
                if (headers.get(CONTENT_TYPE) == null) {
                    headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
                }
                // Additional SSE headers to prevent buffering
                headers.set("Cache-Control", "no-cache");
                headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
                response.setChunked(true);

                // CRITICAL: Disable write queue max size to prevent buffering
                // Vert.x buffers writes by default - we need immediate flushing for SSE
                response.setWriteQueueMaxSize(1);

                // Send initial SSE comment to kickstart the stream
                response.write(": SSE stream started\n\n");
            }

            response.write(Buffer.buffer(sseEvent), ar -> {
                if (ar.failed()) {
                    // Client disconnected or write failed - cancel upstream to stop EventConsumer
                    // NullAway: upstream is guaranteed non-null after onSubscribe
                    Objects.requireNonNull(upstream).cancel();
                    if (!rc.failed()) {
                        rc.fail(ar.cause());
                    }
                } else {
                    Objects.requireNonNull(upstream).request(1);
                }
            });
        }

        @Override
        public void onError(Throwable throwable) {
            // Cancel upstream to stop EventConsumer when error occurs
            // NullAway: upstream is guaranteed non-null after onSubscribe
            Objects.requireNonNull(upstream).cancel();
            if (!rc.failed()) {
                rc.fail(throwable);
            }
        }

        @Override
        public void onComplete() {
            if (response.bytesWritten() == 0) {
                // No events written - still set SSE content type
                MultiMap headers = response.headers();
                if (headers.get(CONTENT_TYPE) == null) {
                    headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
                }
            }
            // Guard against duplicate end() if the client disconnected concurrently
            if (!response.ended()) {
                response.end();
            }
        }
    });
The implementation in SseResponseWriter still uses the old request(1) backpressure model and the response.bytesWritten() == 0 check. This contradicts the PR description, which states that the code should switch to request(Long.MAX_VALUE) and use a headersSet boolean flag to prevent dropped SSE events under back-to-back emission. Please update the subscriber to implement these changes.
    sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() {
        volatile Flow.@Nullable Subscription upstream;
        private boolean headersSet = false;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.upstream = subscription;
            this.upstream.request(Long.MAX_VALUE);

            // Detect client disconnect and call EventConsumer.cancel() directly
            response.closeHandler(v -> {
                logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop");
                context.invokeEventConsumerCancelCallback();
                subscription.cancel();
            });

            if (onSubscribedHook != null) {
                onSubscribedHook.run();
            }
        }

        @Override
        public void onNext(String sseEvent) {
            if (!headersSet) {
                headersSet = true;
                MultiMap headers = response.headers();
                if (headers.get(CONTENT_TYPE) == null) {
                    headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
                }
                // Additional SSE headers to prevent buffering
                headers.set("Cache-Control", "no-cache");
                headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
                response.setChunked(true);

                // CRITICAL: Disable write queue max size to prevent buffering
                // Vert.x buffers writes by default - we need immediate flushing for SSE
                response.setWriteQueueMaxSize(1);

                // Send initial SSE comment to kickstart the stream
                response.write(": SSE stream started\n\n");
            }

            response.write(Buffer.buffer(sseEvent), ar -> {
                if (ar.failed()) {
                    // Client disconnected or write failed - cancel upstream to stop EventConsumer
                    // NullAway: upstream is guaranteed non-null after onSubscribe
                    Objects.requireNonNull(upstream).cancel();
                    if (!rc.failed()) {
                        rc.fail(ar.cause());
                    }
                }
            });
        }

        @Override
        public void onError(Throwable throwable) {
            // Cancel upstream to stop EventConsumer when error occurs
            // NullAway: upstream is guaranteed non-null after onSubscribe
            Objects.requireNonNull(upstream).cancel();
            if (!rc.failed()) {
                rc.fail(throwable);
            }
        }

        @Override
        public void onComplete() {
            if (!headersSet) {
                headersSet = true;
                MultiMap headers = response.headers();
                if (headers.get(CONTENT_TYPE) == null) {
                    headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
                }
            }
            // Guard against duplicate end() if the client disconnected concurrently
            if (!response.ended()) {
                response.end();
            }
        }
    });

    // the write callback, causing onComplete to fire before the HTTP response.write()
    // callback confirms the data was sent. This sleep ensures the write callback fires
    // first, so response.end() is only called after the data is safely in flight.
    private static final int BUFFER_FLUSH_DELAY_MS = 150;
The BUFFER_FLUSH_DELAY_MS constant is still set to 150 ms. According to the PR description, this should be reduced to 50 ms since it is now a secondary safety margin rather than the primary guard.
    - private static final int BUFFER_FLUSH_DELAY_MS = 150;
    + private static final int BUFFER_FLUSH_DELAY_MS = 50;
With request(1) backpressure, the write-completion handler was responsible for requesting the next event. When an agent emitted two events back-to-back (e.g. artifact + completion), tube.complete() could fire while the completion event was still buffered with demand=0, causing mutiny-zero to drop it and close the stream with only one event delivered.
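The failure mode can be reproduced in a small JDK-only model. This is a sketch under stated assumptions, not mutiny-zero's implementation: `ToyTube` is a hypothetical emitter that buffers items, drains them only while there is demand, and discards whatever is still buffered when `complete()` is called, which is the behavior described above. The simulated write callbacks run only after `complete()`, mirroring an agent that emits artifact and completion back-to-back. For contrast, the same scenario is also run with unbounded demand requested upfront.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

class DroppedEventDemo {

    /** Hypothetical emitter: buffers items and drains them only while demand is available. */
    static final class ToyTube implements Flow.Subscription {
        private final Flow.Subscriber<String> subscriber;
        private final Queue<String> buffer = new ArrayDeque<>();
        private long demand;
        private boolean done;

        ToyTube(Flow.Subscriber<String> subscriber) {
            this.subscriber = subscriber;
            subscriber.onSubscribe(this);
        }

        void send(String item) { buffer.add(item); drain(); }

        // Assumption: completing discards anything still buffered, as the description reports.
        void complete() {
            if (done) return;
            done = true;
            buffer.clear();
            subscriber.onComplete();
        }

        @Override public void request(long n) {
            if (done) return;
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturate on overflow
            drain();
        }

        @Override public void cancel() { done = true; buffer.clear(); }

        private void drain() {
            while (!done && demand > 0 && !buffer.isEmpty()) {
                if (demand != Long.MAX_VALUE) demand--;
                subscriber.onNext(buffer.poll());
            }
        }
    }

    /** Runs the artifact + completion scenario and returns the events that reached the subscriber. */
    static List<String> run(boolean requestAllUpfront) {
        List<String> delivered = new ArrayList<>();
        List<Runnable> pendingWriteCallbacks = new ArrayList<>(); // simulated async write completions

        ToyTube tube = new ToyTube(new Flow.Subscriber<String>() {
            private Flow.Subscription upstream;

            @Override public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                s.request(requestAllUpfront ? Long.MAX_VALUE : 1);
            }
            @Override public void onNext(String event) {
                delivered.add(event);
                if (!requestAllUpfront) {
                    // Old model: the next request is issued only when the write completes.
                    pendingWriteCallbacks.add(() -> upstream.request(1));
                }
            }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });

        tube.send("artifact");
        tube.send("completion"); // stays buffered with demand == 0 in the old model
        tube.complete();         // fires before any write callback has run
        pendingWriteCallbacks.forEach(Runnable::run); // too late: the tube is already done
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("request(1):              " + run(false));
        System.out.println("request(Long.MAX_VALUE): " + run(true));
    }
}
```

Under these assumptions the `request(1)` run delivers only the artifact event, while the `request(Long.MAX_VALUE)` run delivers both, because the second event never waits for demand.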
Switch to request(Long.MAX_VALUE) so the drain loop delivers all buffered events synchronously before tube.complete() is called. Replace the unreliable bytesWritten() == 0 header guard with a boolean headersSet flag, since multiple onNext() calls now arrive before any async write completes. Reduce BUFFER_FLUSH_DELAY_MS from 150 ms to 50 ms as it is now a secondary safety margin rather than the primary guard.
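Why the byte-count guard stops working once several `onNext()` calls arrive before any write completes can be shown with another JDK-only model. `ToyResponse` is hypothetical and assumes, as the description implies, that the byte counter only advances when a queued write is actually flushed; it is not a statement about how Vert.x computes `bytesWritten()`.

```java
import java.util.ArrayList;
import java.util.List;

class HeaderGuardDemo {

    /** Hypothetical response whose byte counter only advances when queued writes are flushed. */
    static final class ToyResponse {
        private final List<String> queued = new ArrayList<>();
        private long bytesWritten;
        int headerSetups; // how many times the one-time header block ran

        long bytesWritten() { return bytesWritten; }
        void write(String chunk) { queued.add(chunk); }
        void flush() {
            for (String chunk : queued) bytesWritten += chunk.length();
            queued.clear();
        }
    }

    /** Old guard: relies on the byte counter, which is still 0 for every queued event. */
    static int runWithByteCountGuard(List<String> events) {
        ToyResponse response = new ToyResponse();
        for (String event : events) { // all onNext() calls arrive before any flush
            if (response.bytesWritten() == 0) {
                response.headerSetups++;
                response.write(": SSE stream started\n\n");
            }
            response.write(event);
        }
        response.flush();
        return response.headerSetups;
    }

    /** New guard: a plain boolean flipped on the first event, independent of write progress. */
    static int runWithFlagGuard(List<String> events) {
        ToyResponse response = new ToyResponse();
        boolean headersSet = false;
        for (String event : events) {
            if (!headersSet) {
                headersSet = true;
                response.headerSetups++;
                response.write(": SSE stream started\n\n");
            }
            response.write(event);
        }
        response.flush();
        return response.headerSetups;
    }

    public static void main(String[] args) {
        List<String> events = List.of("data: artifact\n\n", "data: completion\n\n");
        System.out.println("bytesWritten guard setups: " + runWithByteCountGuard(events));
        System.out.println("headersSet guard setups:   " + runWithFlagGuard(events));
    }
}
```

In this model the byte-count guard runs the header block once per queued event, while the flag runs it exactly once, which is the reason for the switch once demand is unbounded.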