diff --git a/frontends/stapp.py b/frontends/stapp.py index 83a6d33c..7833b799 100644 --- a/frontends/stapp.py +++ b/frontends/stapp.py @@ -157,6 +157,16 @@ def render_segments(segments, suffix=''): # restores a multi-turn body). st.markdown(_SUMMARY_TAG_RE.sub('', seg['content']) + suffix) +def _now_ts(): + return time.strftime("%Y-%m-%d %H:%M:%S") + +def _chat_msg(role, content, ts=None): + return {"role": role, "content": content, "time": ts or _now_ts()} + +def _render_msg_time(msg): + ts = msg.get("time") + if ts: st.caption(ts) + def agent_backend_stream(prompt=None): """Drain main task display_queue. - prompt given: start a fresh task; new dq is kept in session_state. @@ -199,7 +209,10 @@ def agent_backend_stream(prompt=None): def render_main_stream(prompt=None): """Render the assistant bubble for the main task (new or resumed). Saves final to messages.""" + reply_ts = st.session_state.get('reply_ts') or _now_ts() + st.session_state.reply_ts = reply_ts with st.chat_message("assistant"): + st.caption(reply_ts) frozen = 0; live = st.empty(); response = '' CURSOR = ' ▌' for response in agent_backend_stream(prompt): @@ -214,8 +227,9 @@ def render_main_stream(prompt=None): with live.container(): render_segments([segs[i]]) if i < len(segs) - 1: live = st.empty() if response: - st.session_state.messages.append({"role": "assistant", "content": response}) + st.session_state.messages.append(_chat_msg("assistant", response, reply_ts)) st.session_state.last_reply_time = int(time.time()) + st.session_state.reply_ts = "" if "messages" not in st.session_state: st.session_state.messages = [] for msg in st.session_state.messages: @@ -223,6 +237,7 @@ def render_main_stream(prompt=None): # 用 slot=st.empty() + with slot.container(): ... 的外壳,DOM 路径和流式渲染完全一致,跨 rerun 对齐 slot = st.empty() with slot.container(): + _render_msg_time(msg) if msg["role"] == "assistant": render_segments(fold_turns(msg["content"])) else: st.markdown(msg["content"]) @@ -255,10 +270,22 @@ def render_main_stream(prompt=None): "e.key==='Enter'&&!e.shiftKey&&(e.isComposing||c||e.keyCode===229)&&" "(e.stopImmediatePropagation(),e.preventDefault())},!0))})}" "f();new MutationObserver(f).observe(d.body,{childList:1,subtree:1})}()") -_embed_html(f'', height=0) +_js_homeend_fix = ( + "!function(){if(window.parent.__homeEndFix)return;window.parent.__homeEndFix=1;" + "var d=window.parent.document;" + "function f(){d.querySelectorAll('textarea[data-testid=stChatInputTextArea]')" + ".forEach(t=>{if(t.__homeEndFix)return;t.__homeEndFix=1;" + "t.addEventListener('keydown',e=>{" + "if(e.key==='Home'||e.key==='End'){" + "e.preventDefault();e.stopPropagation();" + "t.selectionStart=t.selectionEnd=(e.key==='Home')?0:t.value.length;" + "}},!0)})}" + "f();new MutationObserver(f).observe(d.body,{childList:1,subtree:1})}()" +) +_embed_html(f'', height=0) if prompt := st.chat_input("any task?"): - ts = time.strftime("%Y-%m-%d %H:%M:%S") + ts = _now_ts() cmd = (prompt or "").strip() def _reset_and_rerun(): st.session_state.streaming = False @@ -270,7 +297,7 @@ def _reset_and_rerun(): st.session_state.last_reply_time = int(time.time()) st.rerun() if cmd == "/new": - st.session_state.messages = [{"role": "assistant", "content": reset_conversation(agent), "time": ts}] + st.session_state.messages = [_chat_msg("assistant", reset_conversation(agent), ts)] _reset_and_rerun() if cmd.startswith("/continue"): m = re.match(r'/continue\s+(\d+)\s*$', cmd.strip()) @@ -280,18 +307,18 @@ def _reset_and_rerun(): target = sessions[idx][0] if 0 <= idx < len(sessions) else None result = handle_frontend_command(agent, cmd) history = extract_ui_messages(target) if target and result.startswith('✅') else None - tail = [{"role": "assistant", "content": result, "time": ts}] + tail = [_chat_msg("assistant", result, ts)] if history: st.session_state.messages = history + tail else: st.session_state.messages = list(st.session_state.messages) + \ - [{"role": "user", "content": cmd, "time": ts}] + tail + [_chat_msg("user", cmd, ts)] + tail _reset_and_rerun() if cmd.startswith("/btw"): answer = btw_handle_frontend(agent, cmd) # sync; bypasses put_task → main agent.run() untouched st.session_state.messages = list(st.session_state.messages) + [ - {"role": "user", "content": prompt, "time": ts}, - {"role": "assistant", "content": answer, "time": ts}, + _chat_msg("user", prompt, ts), + _chat_msg("assistant", answer, ts), ] st.rerun() # preserve display_queue/partial_response so resume path drains the running main task if cmd.startswith("/export"): @@ -322,15 +349,17 @@ def _reset_and_rerun(): except Exception as e: result = f"❌ 导出失败: {e}" st.session_state.messages = list(st.session_state.messages) + [ - {"role": "user", "content": cmd, "time": ts}, - {"role": "assistant", "content": result, "time": ts}, + _chat_msg("user", cmd, ts), + _chat_msg("assistant", result, ts), ] _reset_and_rerun() # Regular prompt: any in-flight task will be aborted by the finally block in # agent_backend_stream when StopException interrupts the prior generator. - st.session_state.messages.append({"role": "user", "content": prompt}) + st.session_state.messages.append(_chat_msg("user", prompt, ts)) if hasattr(agent, '_pet_req') and not prompt.startswith('/'): agent._pet_req('state=walk') - with st.chat_message("user"): st.markdown(prompt) + with st.chat_message("user"): + st.caption(ts) + st.markdown(prompt) render_main_stream(prompt) elif st.session_state.get('display_queue') is not None: # No new prompt but a task is mid-flight (typically a /btw rerun) — resume drain. diff --git a/frontends/tuiapp_v2.py b/frontends/tuiapp_v2.py index 935728e6..43e78a5b 100644 --- a/frontends/tuiapp_v2.py +++ b/frontends/tuiapp_v2.py @@ -229,14 +229,9 @@ class InputArea(TextArea): Binding("alt+enter", "newline", "Newline", show=False), Binding("ctrl+enter", "newline", "Newline", show=False), Binding("shift+enter", "newline", "Newline", show=False), - # 拆掉父类 ctrl+v:父类会走 action_paste 从 app.clipboard 再插一次, - # 和终端 bracketed paste 触发的 _on_paste 双重插入 → 单行粘贴会重复 - Binding("ctrl+v", "noop", "Noop", show=False), + Binding("ctrl+v", "paste", "Paste", show=False), ] - def action_noop(self) -> None: - pass - class Submitted(Message): def __init__(self, input_area: "InputArea", value: str) -> None: super().__init__() @@ -247,6 +242,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._pastes: dict[int, str] = {} self._paste_counter = 0 + self._last_paste: tuple[str, str, float] = ("", "", 0.0) def expand_placeholders(self, text: str) -> str: def repl(m): @@ -259,15 +255,18 @@ def reset(self) -> None: self._pastes.clear() self._paste_counter = 0 - def action_newline(self) -> None: - result = self._replace_via_keyboard("\n", *self.selection) - if result: - self.move_cursor(result.end_location) + def _insert_paste_text(self, text: str, source: str) -> bool: + text = (text or "").replace("\r\n", "\n").replace("\r", "\n") + if not text: + return False + + normalized = text + last_source, last_text, last_at = self._last_paste + now = time.monotonic() + if normalized == last_text and source != last_source and now - last_at < 0.25: + self._last_paste = (source, normalized, now) + return True - async def _on_paste(self, event: events.Paste) -> None: - if self.read_only: - return - text = event.text.replace("\r\n", "\n").replace("\r", "\n") line_count = len(text.splitlines()) or 1 if line_count > 2: self._paste_counter += 1 @@ -278,7 +277,24 @@ async def _on_paste(self, event: events.Paste) -> None: if result: self.move_cursor(result.end_location) self.focus() - event.stop(); event.prevent_default() + self._last_paste = (source, normalized, now) + return result is not None + + def action_newline(self) -> None: + result = self._replace_via_keyboard("\n", *self.selection) + if result: + self.move_cursor(result.end_location) + + def action_paste(self) -> None: + if self.read_only: + return + self._insert_paste_text(self.app.clipboard, "key") + + async def _on_paste(self, event: events.Paste) -> None: + if self.read_only: + return + if self._insert_paste_text(event.text, "event"): + event.stop(); event.prevent_default() async def _on_key(self, event: events.Key) -> None: try: diff --git a/tests/test_tuiapp_v2_input.py b/tests/test_tuiapp_v2_input.py new file mode 100644 index 00000000..918563c0 --- /dev/null +++ b/tests/test_tuiapp_v2_input.py @@ -0,0 +1,106 @@ +import asyncio +import contextlib +import io +import importlib +import sys +import unittest +from pathlib import Path + +try: + from textual import events + from textual.app import App, ComposeResult +except ModuleNotFoundError: # pragma: no cover - optional UI dependency + events = None + App = object + ComposeResult = object + + +@unittest.skipIf(events is None, "textual is not installed") +class TuiAppV2InputPasteTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + repo_root = Path(__file__).resolve().parents[1] + frontends = repo_root / "frontends" + for path in (str(repo_root), str(frontends)): + if path not in sys.path: + sys.path.insert(0, path) + + saved_modules = {} + for name in ("agentmain", "chatapp_common", "continue_cmd", "llmcore"): + saved_modules[name] = sys.modules.pop(name, None) + try: + with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()): + module = importlib.import_module("frontends.tuiapp_v2") + finally: + for name, module_stub in saved_modules.items(): + if module_stub is not None: + sys.modules[name] = module_stub + input_cls = module.InputArea + + class InputHarness(App[None]): + def compose(self) -> ComposeResult: + yield input_cls(id="input") + + def on_mount(self) -> None: + self.query_one(input_cls).focus() + + cls.InputArea = input_cls + cls.InputHarness = InputHarness + + def run_async(self, coro): + return asyncio.run(coro) + + def test_ctrl_v_pastes_from_textual_clipboard(self): + async def run(): + async with self.InputHarness().run_test() as pilot: + input_area = pilot.app.query_one(self.InputArea) + pilot.app.copy_to_clipboard("hello from clipboard") + + await pilot.press("ctrl+v") + + self.assertEqual(input_area.text, "hello from clipboard") + + self.run_async(run()) + + def test_bracketed_paste_normalizes_windows_newlines(self): + async def run(): + async with self.InputHarness().run_test() as pilot: + input_area = pilot.app.query_one(self.InputArea) + + pilot.app.post_message(events.Paste("first\r\nsecond")) + await pilot.pause() + + self.assertEqual(input_area.text, "first\nsecond") + + self.run_async(run()) + + def test_long_multiline_paste_uses_placeholder_but_submits_full_text(self): + async def run(): + async with self.InputHarness().run_test() as pilot: + input_area = pilot.app.query_one(self.InputArea) + + pilot.app.post_message(events.Paste("one\ntwo\nthree")) + await pilot.pause() + + self.assertEqual(input_area.text, "[Pasted text #1 +3 lines]") + self.assertEqual(input_area.expand_placeholders(input_area.text), "one\ntwo\nthree") + + self.run_async(run()) + + def test_ctrl_v_and_bracketed_paste_do_not_duplicate_same_payload(self): + async def run(): + async with self.InputHarness().run_test() as pilot: + input_area = pilot.app.query_one(self.InputArea) + pilot.app.copy_to_clipboard("same") + + await pilot.press("ctrl+v") + pilot.app.post_message(events.Paste("same")) + await pilot.pause() + + self.assertEqual(input_area.text, "same") + + self.run_async(run()) + + +if __name__ == "__main__": + unittest.main()