diff --git a/agentmain.py b/agentmain.py index 6cd7aab8..48de55de 100644 --- a/agentmain.py +++ b/agentmain.py @@ -106,6 +106,35 @@ def put_task(self, query, source="user", images=None): self.task_queue.put({"query": query, "source": source, "images": images or [], "output": display_queue}) return display_queue + def drain_queue(self, dq, poll=2): + """Yield items from a display_queue until the 'done' sentinel arrives. + + Replaces bare ``dq.get(timeout=N)`` calls whose fixed timeout breaks + whenever a task runs longer than N seconds. Instead of guessing how + long a task will take, we tie the wait to the agent's actual lifecycle: + keep polling while ``self.is_running`` is True, then do one final flush + to catch items that were enqueued just before ``is_running`` was cleared. + """ + while True: + try: + item = dq.get(timeout=poll) + yield item + if 'done' in item: + return + except queue.Empty: + if self.is_running: + continue # agent still running — keep waiting, no matter how long + # Agent finished; flush any items enqueued in the final instant + # before is_running was cleared (guards the put→flag race). + while True: + try: + item = dq.get_nowait() + yield item + if 'done' in item: + return + except queue.Empty: + return # queue truly empty and agent is done + # i know it is dangerous, but raw_query is dangerous enough it doesn't enlarge def _handle_slash_cmd(self, raw_query, display_queue): if not raw_query.startswith('/'): return raw_query @@ -210,7 +239,7 @@ def run(self): with open(infile, encoding='utf-8') as f: raw = f.read() while True: dq = agent.put_task(raw, source='task') - while 'done' not in (item := dq.get(timeout=300)): + for item in agent.drain_queue(dq): if 'next' in item and random.random() < 0.95: # 概率写一次中间结果 with open(f'{d}/output{nround}.txt', 'w', encoding='utf-8') as f: f.write(item.get('next', '')) with open(f'{d}/output{nround}.txt', 'w', encoding='utf-8') as f: f.write(item['done'] + '\n\n[ROUND END]\n') @@ -244,8 +273,12 @@ def run(self): print(f'[Reflect] triggered: {task[:80]}') dq = agent.put_task(task, source='reflect') try: - while 'done' not in (item := dq.get(timeout=180)): pass - result = item['done'] + result = '' + for item in agent.drain_queue(dq): + if 'done' in item: + result = item['done'] + elif 'next' in item: + result = item['next'] print(result) except Exception as e: if getattr(mod, 'ONCE', False): raise