Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions agentex/src/adapters/streams/adapter_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,29 @@ def _send_redis_connection_metrics(self):
except Exception as e:
logger.error(f"Failed to send metrics: {e}", exc_info=e)

async def get_stream_tail_id(self, topic: str) -> str:
    """
    Return a concrete entry ID marking where the stream currently ends.

    Passing the Redis "$" sentinel to XREAD makes the server re-resolve
    the tail on every call, so an entry XADD'd between two BLOCKing
    reads falls behind the freshly resolved "$" and is never returned.
    Resolving the tail once through this helper gives the caller a
    stable cursor that it then advances from the IDs of entries it
    actually receives.

    Args:
        topic: Name of the Redis stream to inspect.

    Returns:
        The ID of the newest entry as a ``str``, or "0-0" when the
        stream has no entries (or does not exist), so that the next
        XREAD picks up the first XADD whenever it lands.

    Raises:
        Exception: Whatever the Redis client raised; it is logged and
            re-raised unchanged.
    """
    try:
        # Newest-first range limited to one entry == the current tail.
        newest = await self.redis.xrevrange(name=topic, count=1)
    except Exception as e:
        logger.error(f"Error snapshotting tail of Redis stream {topic}: {e}")
        raise

    if newest:
        entry_id, _payload = newest[0]
        # The client may hand back bytes or str depending on how it decodes
        # responses; normalise to str either way.
        if isinstance(entry_id, bytes):
            return entry_id.decode("utf-8")
        return entry_id

    # Nothing in the stream yet: start from the very beginning.
    return "0-0"

async def read_messages(
self, topic: str, last_id: str, timeout_ms: int = 2000, count: int = 10
) -> AsyncIterator[tuple[str, dict[str, Any]]]:
Expand Down
17 changes: 17 additions & 0 deletions agentex/src/adapters/streams/port.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ async def read_messages(
"""
raise NotImplementedError

@abstractmethod
async def get_stream_tail_id(self, topic: str) -> str:
    """
    Snapshot the current end of a stream as a fixed entry ID.

    The returned ID is meant to be used as the starting cursor for
    subsequent read_messages calls, with the caller moving it forward
    as entries arrive. This differs from the Redis "$" sentinel, which
    is resolved anew on each XREAD and therefore skips any entry
    XADD'd between two calls.

    Args:
        topic: Name of the stream whose tail should be resolved.

    Returns:
        The tail entry ID at the moment of the call. For an empty or
        not-yet-created stream, a sentinel meaning "from the
        beginning", so the next read picks up the first XADD whenever
        it lands.
    """
    raise NotImplementedError()

@abstractmethod
async def cleanup_stream(self, topic: str) -> None:
"""
Expand Down
7 changes: 6 additions & 1 deletion agentex/src/domain/use_cases/streams_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ async def stream_task_events(
stream_topic = get_task_event_stream_topic(task_id=task_id)
# Send initial connection data
yield f"data: {TaskStreamConnectedEventEntity(type='connected', taskId=task_id).model_dump_json()}\n\n"
last_id = "$" # Start with most recent messages only
# Snapshot the tail once on entry rather than passing "$" to every
# XREAD. "$" re-resolves to the current tail on each call, so any
# entry XADD'd in the gap between BLOCKing reads lands behind the
# new "$" and is unreachable — silently dropping deltas from
# fast-emitting agents.
last_id = await self.stream_repository.get_stream_tail_id(stream_topic)
last_message_time = asyncio.get_running_loop().time()
ping_interval = float(
self.environment_variables.SSE_KEEPALIVE_PING_INTERVAL
Expand Down
97 changes: 97 additions & 0 deletions agentex/tests/integration/test_task_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,103 @@ async def collect_initial_event():

print("✅ Stream connected event includes correct task ID")

async def test_event_xadded_in_inter_cycle_gap_is_delivered(
    self, test_agent_and_task, streams_use_case
):
    """
    Deterministic symptom-level regression for the SSE drop bug.

    With last_id="$" the consumer re-resolves its cursor to the
    current stream tail on every XREAD call. Any entry XADD'd in
    the ~100ms gap between an empty BLOCK return and the next
    BLOCK call lands with an ID equal to the new "$" — XREAD waits
    for entries strictly greater, so the entry is unreachable from
    this consumer forever.

    Reproduction strategy (no race-window timing):
      - Patch repo.read_messages so it signals an asyncio.Event the
        instant the first BLOCK returns empty. asyncio scheduling
        guarantees the consumer is then about to enter its 100ms
        inter-cycle asyncio.sleep before yielding control back here.
      - XADD a uniquely-tagged event synchronously on that signal.
        asyncio yields control to the consumer's sleep, so the XADD
        lands inside the gap.
      - Wait (bounded by a timeout) for the reader to report the
        sentinel, then assert it was received.

    Under the bug this test fails (the XADD is lost); under the fix
    it passes (snapshotted cursor advances past our entry).

    The patched read_messages is always restored on exit, even when
    the reader task itself failed, so the shared repository fixture
    is never left monkeypatched for later tests.
    """
    from src.utils.stream_topics import get_task_event_stream_topic

    _agent, task = test_agent_and_task
    stream_topic = get_task_event_stream_topic(task_id=task.id)
    repo = streams_use_case.stream_repository

    first_empty_block_returned = asyncio.Event()
    # Set by the reader as soon as the sentinel shows up, so the test can
    # stop waiting immediately instead of sleeping for a fixed duration.
    sentinel_delivered = asyncio.Event()
    call_count = 0
    original_read_messages = repo.read_messages

    async def patched_read_messages(topic, last_id, timeout_ms=2000, count=10):
        # Pass-through wrapper that only observes whether the wrapped call
        # produced any entries.
        nonlocal call_count
        call_count += 1
        my_idx = call_count
        yielded = False
        async for item in original_read_messages(
            topic, last_id, timeout_ms=timeout_ms, count=count
        ):
            yielded = True
            yield item
        # Only the first read cycle matters: signal once it came back empty.
        if my_idx == 1 and not yielded:
            first_empty_block_returned.set()

    sentinel = "inter-cycle-gap-sentinel"
    received_sentinels: list[str] = []

    async def reader():
        try:
            async for event_data in streams_use_case.stream_task_events(
                task_id=task.id
            ):
                if event_data.startswith("data: "):
                    payload_str = event_data[6:].strip()
                    if sentinel in payload_str:
                        received_sentinels.append(payload_str)
                        sentinel_delivered.set()
        except asyncio.CancelledError:
            pass

    repo.read_messages = patched_read_messages
    reader_task = asyncio.create_task(reader())

    try:
        await asyncio.wait_for(first_empty_block_returned.wait(), timeout=5)

        # XADD synchronously inside the gap. Under the bug, the next
        # xread re-resolves "$" to this entry's ID and waits for
        # strictly greater entries — losing this one forever.
        await repo.send_data(
            stream_topic,
            {"type": "error", "message": sentinel},
        )

        # Give the consumer a bounded window to deliver the sentinel.
        # A timeout is not an error by itself; the assertion below
        # reports the failure with a descriptive message.
        try:
            await asyncio.wait_for(sentinel_delivered.wait(), timeout=5)
        except asyncio.TimeoutError:
            pass

        assert received_sentinels, (
            "Sentinel XADDed during the inter-cycle gap was not "
            "delivered to the consumer. The stream cursor has "
            "regressed to literal '$' — fast-emitting agents will "
            "silently drop deltas."
        )
    finally:
        try:
            reader_task.cancel()
            try:
                await reader_task
            except asyncio.CancelledError:
                pass
        finally:
            # Restore unconditionally: awaiting the reader can re-raise a
            # non-cancellation error, which must not skip the un-patching.
            repo.read_messages = original_read_messages

async def test_stream_sends_keepalive_pings_during_idle_periods(
self, test_agent_and_task, streams_use_case
):
Expand Down
Loading