async def get_stream_tail_id(self, topic: str) -> str:
    """
    Snapshot the current tail of a Redis stream as a concrete entry ID.

    The Redis "$" sentinel re-resolves to the stream tail on every XREAD
    call, so any entry XADD'd in the gap between BLOCKing calls is
    unreachable. Callers should resolve a stable cursor once on entry
    via this helper and advance it forward from yielded entry IDs.

    Args:
        topic: Name (key) of the Redis stream to inspect.

    Returns:
        The entry ID of the most recent stream entry as ``str`` (bytes IDs
        are decoded as UTF-8), or "0-0" when XREVRANGE returns no entries
        (empty or missing stream) -- in which case the next XREAD will
        return the first XADD whenever it lands.

    Raises:
        Exception: Whatever the Redis client raised; the error is logged
            with its traceback and then re-raised unchanged.
    """
    try:
        # XREVRANGE with count=1 walks the stream newest-first, so the
        # single entry returned (if any) is the current tail.
        entries = await self.redis.xrevrange(name=topic, count=1)
    except Exception as e:
        # Attach the traceback, matching the other error handlers in this
        # adapter, so connection problems can be diagnosed from the logs.
        logger.error(
            f"Error snapshotting tail of Redis stream {topic}: {e}",
            exc_info=e,
        )
        raise

    if not entries:
        # Nothing to anchor on: "0-0" sorts before every real entry ID.
        return "0-0"

    tail_id, _fields = entries[0]
    # The client may hand back bytes or str depending on its
    # decode_responses configuration; normalise to str either way.
    if isinstance(tail_id, bytes):
        return tail_id.decode("utf-8")
    return tail_id
+ + Unlike the Redis "$" sentinel — which is re-resolved to the tail on + every XREAD call and so loses entries XADD'd between calls — this + returns a fixed ID at the moment of the call. Callers advance it + forward as entries arrive. + + Returns a sentinel meaning "from the beginning" when the stream is + empty or does not yet exist, so the next read picks up the first + XADD whenever it lands. + """ + raise NotImplementedError + @abstractmethod async def cleanup_stream(self, topic: str) -> None: """ diff --git a/agentex/src/domain/use_cases/streams_use_case.py b/agentex/src/domain/use_cases/streams_use_case.py index 98c43b3..48eec2c 100644 --- a/agentex/src/domain/use_cases/streams_use_case.py +++ b/agentex/src/domain/use_cases/streams_use_case.py @@ -103,7 +103,12 @@ async def stream_task_events( stream_topic = get_task_event_stream_topic(task_id=task_id) # Send initial connection data yield f"data: {TaskStreamConnectedEventEntity(type='connected', taskId=task_id).model_dump_json()}\n\n" - last_id = "$" # Start with most recent messages only + # Snapshot the tail once on entry rather than passing "$" to every + # XREAD. "$" re-resolves to the current tail on each call, so any + # entry XADD'd in the gap between BLOCKing reads lands behind the + # new "$" and is unreachable — silently dropping deltas from + # fast-emitting agents. 
async def test_event_xadded_in_inter_cycle_gap_is_delivered(
    self, test_agent_and_task, streams_use_case
):
    """
    Deterministic symptom-level regression for the SSE drop bug.

    With last_id="$" the consumer re-resolves its cursor to the
    current stream tail on every XREAD call. Any entry XADD'd in
    the ~100ms gap between an empty BLOCK return and the next
    BLOCK call lands with an ID equal to the new "$" -- XREAD waits
    for entries strictly greater, so the entry is unreachable from
    this consumer forever.

    Reproduction strategy (no race-window timing):
      - Patch repo.read_messages so it signals an asyncio.Event the
        instant the first BLOCK returns empty. The consumer then runs
        on to its next await (its inter-cycle sleep) before control
        comes back to this test.
      - XADD a uniquely-tagged event on that signal, so the XADD
        lands inside the gap.
      - Wait (bounded) for the reader to observe the sentinel, then
        assert that it did.

    Under the bug this test fails (the XADD is lost); under the fix
    it passes (snapshotted cursor advances past our entry).
    """
    from src.utils.stream_topics import get_task_event_stream_topic

    _agent, task = test_agent_and_task
    stream_topic = get_task_event_stream_topic(task_id=task.id)
    repo = streams_use_case.stream_repository

    first_empty_block_returned = asyncio.Event()
    # Set by the reader as soon as the sentinel shows up, so the test can
    # finish immediately instead of always sleeping a full BLOCK cycle.
    sentinel_delivered = asyncio.Event()
    call_count = 0
    original_read_messages = repo.read_messages

    async def patched_read_messages(topic, last_id, timeout_ms=2000, count=10):
        # Pass-through wrapper that reports when the *first* read cycle
        # finishes without yielding anything.
        nonlocal call_count
        call_count += 1
        my_idx = call_count
        yielded = False
        async for item in original_read_messages(
            topic, last_id, timeout_ms=timeout_ms, count=count
        ):
            yielded = True
            yield item
        if my_idx == 1 and not yielded:
            first_empty_block_returned.set()

    repo.read_messages = patched_read_messages

    sentinel = "inter-cycle-gap-sentinel"
    received_sentinels: list[str] = []

    async def reader():
        # Consume the SSE stream and record every data frame that carries
        # the sentinel payload.
        try:
            async for event_data in streams_use_case.stream_task_events(
                task_id=task.id
            ):
                if event_data.startswith("data: "):
                    payload_str = event_data[6:].strip()
                    if sentinel in payload_str:
                        received_sentinels.append(payload_str)
                        sentinel_delivered.set()
        except asyncio.CancelledError:
            pass

    reader_task = asyncio.create_task(reader())

    try:
        await asyncio.wait_for(first_empty_block_returned.wait(), timeout=5)

        # XADD inside the gap. Under the bug, the next xread re-resolves
        # "$" to this entry's ID and waits for strictly greater entries
        # -- losing this one forever.
        await repo.send_data(
            stream_topic,
            {"type": "error", "message": sentinel},
        )

        # Allow up to one further BLOCK cycle for delivery, but return as
        # soon as the sentinel arrives. A timeout is not an error here:
        # the assertion below reports the failure with a useful message.
        try:
            await asyncio.wait_for(sentinel_delivered.wait(), timeout=2.5)
        except asyncio.TimeoutError:
            pass

        assert received_sentinels, (
            "Sentinel XADDed during the inter-cycle gap was not "
            "delivered to the consumer. The stream cursor has "
            "regressed to literal '$' — fast-emitting agents will "
            "silently drop deltas."
        )
    finally:
        try:
            reader_task.cancel()
            try:
                await reader_task
            except asyncio.CancelledError:
                pass
        finally:
            # Always undo the monkeypatch, even if the reader task failed
            # with an unexpected exception, so later tests see the real
            # read_messages.
            repo.read_messages = original_read_messages