From 922722c736185040be78f6c9676bf693ed36d64d Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Fri, 15 May 2026 14:44:20 -0700 Subject: [PATCH 1/2] fix(streams): snapshot SSE cursor once on entry to stop drop race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit stream_task_events initialized last_id="$" and only advanced it when XREAD returned entries. When BLOCK timed out empty, the outer loop re-issued XREAD with the literal "$" — Redis re-resolves "$" to the current stream tail at call time, so any XADD'd entry in the ~100ms gap between the empty return and the next call landed behind the new "$" and was unreachable from the consumer. For fast-emitting agents (token-level LLM streaming at multiple Hz), this silently dropped deltas. Slow agents were unaffected, which made the failure mode timing-dependent. Snapshot the tail once on entry via a new repository helper get_stream_tail_id (XREVRANGE topic + - COUNT 1, "0-0" fallback for empty/missing streams). The existing loop already advances last_id correctly from yielded entry IDs, so the snapshot only replaces the initial "$". Adds an integration regression test that spies on the repository and asserts (a) get_stream_tail_id is called once on entry and (b) no read_messages call ever receives "$" as last_id. Deterministic — doesn't depend on race-window timing. 
--- agentex/src/adapters/streams/adapter_redis.py | 23 ++++++ agentex/src/adapters/streams/port.py | 17 ++++ .../src/domain/use_cases/streams_use_case.py | 7 +- agentex/tests/integration/test_task_stream.py | 81 +++++++++++++++++++ 4 files changed, 127 insertions(+), 1 deletion(-) diff --git a/agentex/src/adapters/streams/adapter_redis.py b/agentex/src/adapters/streams/adapter_redis.py index 4f34ee32..f1a20ea8 100644 --- a/agentex/src/adapters/streams/adapter_redis.py +++ b/agentex/src/adapters/streams/adapter_redis.py @@ -151,6 +151,29 @@ def _send_redis_connection_metrics(self): except Exception as e: logger.error(f"Failed to send metrics: {e}", exc_info=e) + async def get_stream_tail_id(self, topic: str) -> str: + """ + Snapshot the current tail of a Redis stream as a concrete entry ID. + + The Redis "$" sentinel re-resolves to the stream tail on every XREAD + call, so any entry XADD'd in the gap between BLOCKing calls is + unreachable. Callers should resolve a stable cursor once on entry + via this helper and advance it forward from yielded entry IDs. + + Returns the entry ID of the most recent stream entry, or "0-0" if + the stream is empty or does not exist — in which case the next + XREAD will return the first XADD whenever it lands. 
+ """ + try: + entries = await self.redis.xrevrange(name=topic, count=1) + except Exception as e: + logger.error(f"Error snapshotting tail of Redis stream {topic}: {e}") + raise + if not entries: + return "0-0" + tail_id, _fields = entries[0] + return tail_id.decode("utf-8") if isinstance(tail_id, bytes) else tail_id + async def read_messages( self, topic: str, last_id: str, timeout_ms: int = 2000, count: int = 10 ) -> AsyncIterator[tuple[str, dict[str, Any]]]: diff --git a/agentex/src/adapters/streams/port.py b/agentex/src/adapters/streams/port.py index 9c537b7f..910c7a0e 100644 --- a/agentex/src/adapters/streams/port.py +++ b/agentex/src/adapters/streams/port.py @@ -43,6 +43,23 @@ async def read_messages( """ raise NotImplementedError + @abstractmethod + async def get_stream_tail_id(self, topic: str) -> str: + """ + Resolve the current tail of a stream into a concrete entry ID + suitable for use as a stable cursor in subsequent read_messages calls. + + Unlike the Redis "$" sentinel — which is re-resolved to the tail on + every XREAD call and so loses entries XADD'd between calls — this + returns a fixed ID at the moment of the call. Callers advance it + forward as entries arrive. + + Returns a sentinel meaning "from the beginning" when the stream is + empty or does not yet exist, so the next read picks up the first + XADD whenever it lands. 
+ """ + raise NotImplementedError + @abstractmethod async def cleanup_stream(self, topic: str) -> None: """ diff --git a/agentex/src/domain/use_cases/streams_use_case.py b/agentex/src/domain/use_cases/streams_use_case.py index 98c43b34..48eec2ce 100644 --- a/agentex/src/domain/use_cases/streams_use_case.py +++ b/agentex/src/domain/use_cases/streams_use_case.py @@ -103,7 +103,12 @@ async def stream_task_events( stream_topic = get_task_event_stream_topic(task_id=task_id) # Send initial connection data yield f"data: {TaskStreamConnectedEventEntity(type='connected', taskId=task_id).model_dump_json()}\n\n" - last_id = "$" # Start with most recent messages only + # Snapshot the tail once on entry rather than passing "$" to every + # XREAD. "$" re-resolves to the current tail on each call, so any + # entry XADD'd in the gap between BLOCKing reads lands behind the + # new "$" and is unreachable — silently dropping deltas from + # fast-emitting agents. + last_id = await self.stream_repository.get_stream_tail_id(stream_topic) last_message_time = asyncio.get_running_loop().time() ping_interval = float( self.environment_variables.SSE_KEEPALIVE_PING_INTERVAL diff --git a/agentex/tests/integration/test_task_stream.py b/agentex/tests/integration/test_task_stream.py index 14c85807..616949cc 100644 --- a/agentex/tests/integration/test_task_stream.py +++ b/agentex/tests/integration/test_task_stream.py @@ -462,6 +462,87 @@ async def collect_initial_event(): print("✅ Stream connected event includes correct task ID") + async def test_stream_task_events_uses_snapshotted_cursor_not_dollar( + self, test_agent_and_task, streams_use_case + ): + """ + Regression: stream_task_events must snapshot the Redis stream tail + on entry via get_stream_tail_id rather than passing "$" to xread. + + With "$", each BLOCK call re-resolves the cursor to the current + stream tail. 
Any entry XADD'd in the inter-call gap (the ~100ms + sleep between empty BLOCKs) lands behind the new "$" and is + unreachable from the consumer — silently dropping deltas from + fast-emitting agents. + + Spy on the repository to verify (a) get_stream_tail_id is called + once on entry, and (b) no call to read_messages ever receives "$" + as last_id. The behavioral assertion is deterministic — it + doesn't depend on race-window timing. + """ + _agent, task = test_agent_and_task + + repo = streams_use_case.stream_repository + + tail_topic_calls: list[str] = [] + original_get_tail = repo.get_stream_tail_id + + async def spy_get_stream_tail_id(topic): + tail_topic_calls.append(topic) + return await original_get_tail(topic) + + cursor_values: list[str] = [] + original_read_messages = repo.read_messages + + async def spy_read_messages(topic, last_id, timeout_ms=2000, count=10): + cursor_values.append(last_id) + async for item in original_read_messages( + topic, last_id, timeout_ms=timeout_ms, count=count + ): + yield item + + repo.get_stream_tail_id = spy_get_stream_tail_id + repo.read_messages = spy_read_messages + + try: + + async def drive(): + try: + async for _ in streams_use_case.stream_task_events(task_id=task.id): + pass + except asyncio.CancelledError: + pass + + drive_task = asyncio.create_task(drive()) + + # Wait past the first 2s BLOCK so a second read_messages call + # is issued — that's the call that would re-use "$" under the bug. + await asyncio.sleep(2.3) + + drive_task.cancel() + try: + await drive_task + except asyncio.CancelledError: + pass + + assert len(tail_topic_calls) == 1, ( + "Expected exactly one get_stream_tail_id call on entry; " + f"got {len(tail_topic_calls)}: {tail_topic_calls}" + ) + + assert len(cursor_values) >= 2, ( + "Expected >= 2 read_messages calls (one per BLOCK cycle); " + f"got {len(cursor_values)}: {cursor_values}" + ) + + assert "$" not in cursor_values, ( + "stream_task_events passed '$' to read_messages — race-prone! 
" + f"cursors observed: {cursor_values}" + ) + finally: + repo.get_stream_tail_id = original_get_tail + repo.read_messages = original_read_messages + async def test_stream_sends_keepalive_pings_during_idle_periods( self, test_agent_and_task, streams_use_case ): From 2900c04508ca30babb5bfed8869d1cb0ae0d4fb6 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Fri, 15 May 2026 14:53:45 -0700 Subject: [PATCH 2/2] test(streams): make SSE drop test deterministic via gap-signal patch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the spy-based contract assertion with a symptom-level test that deterministically reproduces the bug: Patch repo.read_messages to set an asyncio.Event the instant the first BLOCK returns empty. asyncio scheduling guarantees the consumer then enters its 100ms inter-cycle sleep before yielding control, so an XADD issued immediately on the signal lands inside the gap window, provided its round-trip completes within that 100ms sleep. Under the buggy "$" cursor, the next xread re-resolves "$" to our entry's ID and waits for strictly greater entries — losing the entry forever. Confirmed the test fails against last_id="$" (AssertionError: assert []) and passes against the snapshotted cursor. 
--- agentex/tests/integration/test_task_stream.py | 124 ++++++++++-------- 1 file changed, 70 insertions(+), 54 deletions(-) diff --git a/agentex/tests/integration/test_task_stream.py b/agentex/tests/integration/test_task_stream.py index 616949cc..289010ee 100644 --- a/agentex/tests/integration/test_task_stream.py +++ b/agentex/tests/integration/test_task_stream.py @@ -462,85 +462,101 @@ async def collect_initial_event(): print("✅ Stream connected event includes correct task ID") - async def test_stream_task_events_uses_snapshotted_cursor_not_dollar( + async def test_event_xadded_in_inter_cycle_gap_is_delivered( self, test_agent_and_task, streams_use_case ): """ - Regression: stream_task_events must snapshot the Redis stream tail - on entry via get_stream_tail_id rather than passing "$" to xread. - - With "$", each BLOCK call re-resolves the cursor to the current - stream tail. Any entry XADD'd in the inter-call gap (the ~100ms - sleep between empty BLOCKs) lands behind the new "$" and is - unreachable from the consumer — silently dropping deltas from - fast-emitting agents. - - Spy on the repository to verify (a) get_stream_tail_id is called - once on entry, and (b) no call to read_messages ever receives "$" - as last_id. The behavioral assertion is deterministic — it - doesn't depend on race-window timing. + Deterministic symptom-level regression for the SSE drop bug. + + With last_id="$" the consumer re-resolves its cursor to the + current stream tail on every XREAD call. Any entry XADD'd in + the ~100ms gap between an empty BLOCK return and the next + BLOCK call lands with an ID equal to the new "$" — XREAD waits + for entries strictly greater, so the entry is unreachable from + this consumer forever. + + Reproduction strategy (no race-window timing): + - Patch repo.read_messages so it signals an asyncio.Event the + instant the first BLOCK returns empty. 
asyncio scheduling + guarantees the consumer is then about to enter its 100ms + inter-cycle asyncio.sleep before yielding control back here. + - XADD a uniquely-tagged event synchronously on that signal. + asyncio yields control to the consumer's sleep, so the XADD + lands inside the gap. + - Wait for the second BLOCK cycle to elapse, then assert the + reader received the sentinel. + + Under the bug this test fails (the XADD is lost); under the fix + it passes (snapshotted cursor advances past our entry). """ - _agent, task = test_agent_and_task + from src.utils.stream_topics import get_task_event_stream_topic + _agent, task = test_agent_and_task + stream_topic = get_task_event_stream_topic(task_id=task.id) repo = streams_use_case.stream_repository - tail_topic_calls: list[str] = [] - original_get_tail = repo.get_stream_tail_id - - async def spy_get_stream_tail_id(topic): - tail_topic_calls.append(topic) - return await original_get_tail(topic) - - cursor_values: list[str] = [] + first_empty_block_returned = asyncio.Event() + call_count = 0 original_read_messages = repo.read_messages - async def spy_read_messages(topic, last_id, timeout_ms=2000, count=10): - cursor_values.append(last_id) + async def patched_read_messages(topic, last_id, timeout_ms=2000, count=10): + nonlocal call_count + call_count += 1 + my_idx = call_count + yielded = False async for item in original_read_messages( topic, last_id, timeout_ms=timeout_ms, count=count ): + yielded = True yield item + if my_idx == 1 and not yielded: + first_empty_block_returned.set() - repo.get_stream_tail_id = spy_get_stream_tail_id - repo.read_messages = spy_read_messages + repo.read_messages = patched_read_messages - try: + sentinel = "inter-cycle-gap-sentinel" + received_sentinels: list[str] = [] - async def drive(): - try: - async for _ in streams_use_case.stream_task_events(task_id=task.id): - pass - except asyncio.CancelledError: - pass - - drive_task = asyncio.create_task(drive()) - - # Wait past the first 2s 
BLOCK so a second read_messages call - # is issued — that's the call that would re-use "$" under the bug. - await asyncio.sleep(2.3) - - drive_task.cancel() + async def reader(): try: - await drive_task + async for event_data in streams_use_case.stream_task_events( + task_id=task.id + ): + if event_data.startswith("data: "): + payload_str = event_data[6:].strip() + if sentinel in payload_str: + received_sentinels.append(payload_str) except asyncio.CancelledError: pass - assert len(tail_topic_calls) == 1, ( - "Expected exactly one get_stream_tail_id call on entry; " - f"got {len(tail_topic_calls)}: {tail_topic_calls}" - ) + reader_task = asyncio.create_task(reader()) - assert len(cursor_values) >= 2, ( - "Expected >= 2 read_messages calls (one per BLOCK cycle); " - f"got {len(cursor_values)}: {cursor_values}" + try: + await asyncio.wait_for(first_empty_block_returned.wait(), timeout=5) + + # XADD synchronously inside the gap. Under the bug, the next + # xread re-resolves "$" to this entry's ID and waits for + # strictly greater entries — losing this one forever. + await repo.send_data( + stream_topic, + {"type": "error", "message": sentinel}, ) - assert "$" not in cursor_values, ( - "stream_task_events passed '$' to read_messages — race-prone! " - f"cursors observed: {cursor_values}" + # Allow the second BLOCK cycle to complete. + await asyncio.sleep(2.5) + + assert received_sentinels, ( + "Sentinel XADDed during the inter-cycle gap was not " + "delivered to the consumer. The stream cursor has " + "regressed to literal '$' — fast-emitting agents will " + "silently drop deltas." ) finally: - repo.get_stream_tail_id = original_get_tail + reader_task.cancel() + try: + await reader_task + except asyncio.CancelledError: + pass repo.read_messages = original_read_messages async def test_stream_sends_keepalive_pings_during_idle_periods(