diff --git a/frontends/stapp.py b/frontends/stapp.py
index 83a6d33c..7833b799 100644
--- a/frontends/stapp.py
+++ b/frontends/stapp.py
@@ -157,6 +157,16 @@ def render_segments(segments, suffix=''):
# restores a multi-turn body).
st.markdown(_SUMMARY_TAG_RE.sub('', seg['content']) + suffix)
+def _now_ts():
+ return time.strftime("%Y-%m-%d %H:%M:%S")
+
+def _chat_msg(role, content, ts=None):
+ return {"role": role, "content": content, "time": ts or _now_ts()}
+
+def _render_msg_time(msg):
+ ts = msg.get("time")
+ if ts: st.caption(ts)
+
def agent_backend_stream(prompt=None):
"""Drain main task display_queue.
- prompt given: start a fresh task; new dq is kept in session_state.
@@ -199,7 +209,10 @@ def agent_backend_stream(prompt=None):
def render_main_stream(prompt=None):
"""Render the assistant bubble for the main task (new or resumed). Saves final to messages."""
+ reply_ts = st.session_state.get('reply_ts') or _now_ts()
+ st.session_state.reply_ts = reply_ts
with st.chat_message("assistant"):
+ st.caption(reply_ts)
frozen = 0; live = st.empty(); response = ''
CURSOR = ' ▌'
for response in agent_backend_stream(prompt):
@@ -214,8 +227,9 @@ def render_main_stream(prompt=None):
with live.container(): render_segments([segs[i]])
if i < len(segs) - 1: live = st.empty()
if response:
- st.session_state.messages.append({"role": "assistant", "content": response})
+ st.session_state.messages.append(_chat_msg("assistant", response, reply_ts))
st.session_state.last_reply_time = int(time.time())
+ st.session_state.reply_ts = ""
if "messages" not in st.session_state: st.session_state.messages = []
for msg in st.session_state.messages:
@@ -223,6 +237,7 @@ def render_main_stream(prompt=None):
# 用 slot=st.empty() + with slot.container(): ... 的外壳,DOM 路径和流式渲染完全一致,跨 rerun 对齐
slot = st.empty()
with slot.container():
+ _render_msg_time(msg)
if msg["role"] == "assistant": render_segments(fold_turns(msg["content"]))
else: st.markdown(msg["content"])
@@ -255,10 +270,22 @@ def render_main_stream(prompt=None):
"e.key==='Enter'&&!e.shiftKey&&(e.isComposing||c||e.keyCode===229)&&"
"(e.stopImmediatePropagation(),e.preventDefault())},!0))})}"
"f();new MutationObserver(f).observe(d.body,{childList:1,subtree:1})}()")
-_embed_html(f'', height=0)
+_js_homeend_fix = (
+ "!function(){if(window.parent.__homeEndFix)return;window.parent.__homeEndFix=1;"
+ "var d=window.parent.document;"
+ "function f(){d.querySelectorAll('textarea[data-testid=stChatInputTextArea]')"
+ ".forEach(t=>{if(t.__homeEndFix)return;t.__homeEndFix=1;"
+ "t.addEventListener('keydown',e=>{"
+ "if(e.key==='Home'||e.key==='End'){"
+ "e.preventDefault();e.stopPropagation();"
+ "t.selectionStart=t.selectionEnd=(e.key==='Home')?0:t.value.length;"
+ "}},!0)})}"
+ "f();new MutationObserver(f).observe(d.body,{childList:1,subtree:1})}()"
+)
+_embed_html(f'', height=0)
if prompt := st.chat_input("any task?"):
- ts = time.strftime("%Y-%m-%d %H:%M:%S")
+ ts = _now_ts()
cmd = (prompt or "").strip()
def _reset_and_rerun():
st.session_state.streaming = False
@@ -270,7 +297,7 @@ def _reset_and_rerun():
st.session_state.last_reply_time = int(time.time())
st.rerun()
if cmd == "/new":
- st.session_state.messages = [{"role": "assistant", "content": reset_conversation(agent), "time": ts}]
+ st.session_state.messages = [_chat_msg("assistant", reset_conversation(agent), ts)]
_reset_and_rerun()
if cmd.startswith("/continue"):
m = re.match(r'/continue\s+(\d+)\s*$', cmd.strip())
@@ -280,18 +307,18 @@ def _reset_and_rerun():
target = sessions[idx][0] if 0 <= idx < len(sessions) else None
result = handle_frontend_command(agent, cmd)
history = extract_ui_messages(target) if target and result.startswith('✅') else None
- tail = [{"role": "assistant", "content": result, "time": ts}]
+ tail = [_chat_msg("assistant", result, ts)]
if history:
st.session_state.messages = history + tail
else:
st.session_state.messages = list(st.session_state.messages) + \
- [{"role": "user", "content": cmd, "time": ts}] + tail
+ [_chat_msg("user", cmd, ts)] + tail
_reset_and_rerun()
if cmd.startswith("/btw"):
answer = btw_handle_frontend(agent, cmd) # sync; bypasses put_task → main agent.run() untouched
st.session_state.messages = list(st.session_state.messages) + [
- {"role": "user", "content": prompt, "time": ts},
- {"role": "assistant", "content": answer, "time": ts},
+ _chat_msg("user", prompt, ts),
+ _chat_msg("assistant", answer, ts),
]
st.rerun() # preserve display_queue/partial_response so resume path drains the running main task
if cmd.startswith("/export"):
@@ -322,15 +349,17 @@ def _reset_and_rerun():
except Exception as e:
result = f"❌ 导出失败: {e}"
st.session_state.messages = list(st.session_state.messages) + [
- {"role": "user", "content": cmd, "time": ts},
- {"role": "assistant", "content": result, "time": ts},
+ _chat_msg("user", cmd, ts),
+ _chat_msg("assistant", result, ts),
]
_reset_and_rerun()
# Regular prompt: any in-flight task will be aborted by the finally block in
# agent_backend_stream when StopException interrupts the prior generator.
- st.session_state.messages.append({"role": "user", "content": prompt})
+ st.session_state.messages.append(_chat_msg("user", prompt, ts))
if hasattr(agent, '_pet_req') and not prompt.startswith('/'): agent._pet_req('state=walk')
- with st.chat_message("user"): st.markdown(prompt)
+ with st.chat_message("user"):
+ st.caption(ts)
+ st.markdown(prompt)
render_main_stream(prompt)
elif st.session_state.get('display_queue') is not None:
# No new prompt but a task is mid-flight (typically a /btw rerun) — resume drain.
diff --git a/frontends/tuiapp_v2.py b/frontends/tuiapp_v2.py
index 935728e6..43e78a5b 100644
--- a/frontends/tuiapp_v2.py
+++ b/frontends/tuiapp_v2.py
@@ -229,14 +229,9 @@ class InputArea(TextArea):
Binding("alt+enter", "newline", "Newline", show=False),
Binding("ctrl+enter", "newline", "Newline", show=False),
Binding("shift+enter", "newline", "Newline", show=False),
- # 拆掉父类 ctrl+v:父类会走 action_paste 从 app.clipboard 再插一次,
- # 和终端 bracketed paste 触发的 _on_paste 双重插入 → 单行粘贴会重复
- Binding("ctrl+v", "noop", "Noop", show=False),
+ Binding("ctrl+v", "paste", "Paste", show=False),
]
- def action_noop(self) -> None:
- pass
-
class Submitted(Message):
def __init__(self, input_area: "InputArea", value: str) -> None:
super().__init__()
@@ -247,6 +242,7 @@ def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._pastes: dict[int, str] = {}
self._paste_counter = 0
+ self._last_paste: tuple[str, str, float] = ("", "", 0.0)
def expand_placeholders(self, text: str) -> str:
def repl(m):
@@ -259,15 +255,18 @@ def reset(self) -> None:
self._pastes.clear()
self._paste_counter = 0
- def action_newline(self) -> None:
- result = self._replace_via_keyboard("\n", *self.selection)
- if result:
- self.move_cursor(result.end_location)
+ def _insert_paste_text(self, text: str, source: str) -> bool:
+ text = (text or "").replace("\r\n", "\n").replace("\r", "\n")
+ if not text:
+ return False
+
+ normalized = text
+ last_source, last_text, last_at = self._last_paste
+ now = time.monotonic()
+ if normalized == last_text and source != last_source and now - last_at < 0.25:
+ self._last_paste = (source, normalized, now)
+ return True
- async def _on_paste(self, event: events.Paste) -> None:
- if self.read_only:
- return
- text = event.text.replace("\r\n", "\n").replace("\r", "\n")
line_count = len(text.splitlines()) or 1
if line_count > 2:
self._paste_counter += 1
@@ -278,7 +277,24 @@ async def _on_paste(self, event: events.Paste) -> None:
if result:
self.move_cursor(result.end_location)
self.focus()
- event.stop(); event.prevent_default()
+ self._last_paste = (source, normalized, now)
+ return result is not None
+
+ def action_newline(self) -> None:
+ result = self._replace_via_keyboard("\n", *self.selection)
+ if result:
+ self.move_cursor(result.end_location)
+
+ def action_paste(self) -> None:
+ if self.read_only:
+ return
+ self._insert_paste_text(self.app.clipboard, "key")
+
+ async def _on_paste(self, event: events.Paste) -> None:
+ if self.read_only:
+ return
+ if self._insert_paste_text(event.text, "event"):
+ event.stop(); event.prevent_default()
async def _on_key(self, event: events.Key) -> None:
try:
diff --git a/tests/test_tuiapp_v2_input.py b/tests/test_tuiapp_v2_input.py
new file mode 100644
index 00000000..918563c0
--- /dev/null
+++ b/tests/test_tuiapp_v2_input.py
@@ -0,0 +1,106 @@
+import asyncio
+import contextlib
+import io
+import importlib
+import sys
+import unittest
+from pathlib import Path
+
+try:
+ from textual import events
+ from textual.app import App, ComposeResult
+except ModuleNotFoundError: # pragma: no cover - optional UI dependency
+ events = None
+ App = object
+ ComposeResult = object
+
+
+@unittest.skipIf(events is None, "textual is not installed")
+class TuiAppV2InputPasteTests(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ repo_root = Path(__file__).resolve().parents[1]
+ frontends = repo_root / "frontends"
+ for path in (str(repo_root), str(frontends)):
+ if path not in sys.path:
+ sys.path.insert(0, path)
+
+ saved_modules = {}
+ for name in ("agentmain", "chatapp_common", "continue_cmd", "llmcore"):
+ saved_modules[name] = sys.modules.pop(name, None)
+ try:
+ with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
+ module = importlib.import_module("frontends.tuiapp_v2")
+ finally:
+ for name, module_stub in saved_modules.items():
+ if module_stub is not None:
+ sys.modules[name] = module_stub
+ input_cls = module.InputArea
+
+ class InputHarness(App[None]):
+ def compose(self) -> ComposeResult:
+ yield input_cls(id="input")
+
+ def on_mount(self) -> None:
+ self.query_one(input_cls).focus()
+
+ cls.InputArea = input_cls
+ cls.InputHarness = InputHarness
+
+ def run_async(self, coro):
+ return asyncio.run(coro)
+
+ def test_ctrl_v_pastes_from_textual_clipboard(self):
+ async def run():
+ async with self.InputHarness().run_test() as pilot:
+ input_area = pilot.app.query_one(self.InputArea)
+ pilot.app.copy_to_clipboard("hello from clipboard")
+
+ await pilot.press("ctrl+v")
+
+ self.assertEqual(input_area.text, "hello from clipboard")
+
+ self.run_async(run())
+
+ def test_bracketed_paste_normalizes_windows_newlines(self):
+ async def run():
+ async with self.InputHarness().run_test() as pilot:
+ input_area = pilot.app.query_one(self.InputArea)
+
+ pilot.app.post_message(events.Paste("first\r\nsecond"))
+ await pilot.pause()
+
+ self.assertEqual(input_area.text, "first\nsecond")
+
+ self.run_async(run())
+
+ def test_long_multiline_paste_uses_placeholder_but_submits_full_text(self):
+ async def run():
+ async with self.InputHarness().run_test() as pilot:
+ input_area = pilot.app.query_one(self.InputArea)
+
+ pilot.app.post_message(events.Paste("one\ntwo\nthree"))
+ await pilot.pause()
+
+ self.assertEqual(input_area.text, "[Pasted text #1 +3 lines]")
+ self.assertEqual(input_area.expand_placeholders(input_area.text), "one\ntwo\nthree")
+
+ self.run_async(run())
+
+ def test_ctrl_v_and_bracketed_paste_do_not_duplicate_same_payload(self):
+ async def run():
+ async with self.InputHarness().run_test() as pilot:
+ input_area = pilot.app.query_one(self.InputArea)
+ pilot.app.copy_to_clipboard("same")
+
+ await pilot.press("ctrl+v")
+ pilot.app.post_message(events.Paste("same"))
+ await pilot.pause()
+
+ self.assertEqual(input_area.text, "same")
+
+ self.run_async(run())
+
+
+if __name__ == "__main__":
+ unittest.main()