Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions agentmain.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,35 @@ def put_task(self, query, source="user", images=None):
self.task_queue.put({"query": query, "source": source, "images": images or [], "output": display_queue})
return display_queue

def drain_queue(self, dq, poll=2):
"""Yield items from a display_queue until the 'done' sentinel arrives.

Replaces bare ``dq.get(timeout=N)`` calls whose fixed timeout breaks
whenever a task runs longer than N seconds. Instead of guessing how
long a task will take, we tie the wait to the agent's actual lifecycle:
keep polling while ``self.is_running`` is True, then do one final flush
to catch items that were enqueued just before ``is_running`` was cleared.
"""
while True:
try:
item = dq.get(timeout=poll)
yield item
if 'done' in item:
return
except queue.Empty:
if self.is_running:
continue # agent still running — keep waiting, no matter how long
# Agent finished; flush any items enqueued in the final instant
# before is_running was cleared (guards the put→flag race).
while True:
try:
item = dq.get_nowait()
yield item
if 'done' in item:
return
except queue.Empty:
return # queue truly empty and agent is done

# i know it is dangerous, but raw_query is dangerous enough it doesn't enlarge
def _handle_slash_cmd(self, raw_query, display_queue):
if not raw_query.startswith('/'): return raw_query
Expand Down Expand Up @@ -210,7 +239,7 @@ def run(self):
with open(infile, encoding='utf-8') as f: raw = f.read()
while True:
dq = agent.put_task(raw, source='task')
while 'done' not in (item := dq.get(timeout=300)):
for item in agent.drain_queue(dq):
if 'next' in item and random.random() < 0.95: # 概率写一次中间结果
with open(f'{d}/output{nround}.txt', 'w', encoding='utf-8') as f: f.write(item.get('next', ''))
with open(f'{d}/output{nround}.txt', 'w', encoding='utf-8') as f: f.write(item['done'] + '\n\n[ROUND END]\n')
Expand Down Expand Up @@ -244,8 +273,12 @@ def run(self):
print(f'[Reflect] triggered: {task[:80]}')
dq = agent.put_task(task, source='reflect')
try:
while 'done' not in (item := dq.get(timeout=180)): pass
result = item['done']
result = ''
for item in agent.drain_queue(dq):
if 'done' in item:
result = item['done']
elif 'next' in item:
result = item['next']
print(result)
except Exception as e:
if getattr(mod, 'ONCE', False): raise
Expand Down