Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions frontends/stapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ def render_segments(segments, suffix=''):
# restores a multi-turn body).
st.markdown(_SUMMARY_TAG_RE.sub('', seg['content']) + suffix)

def _now_ts():
return time.strftime("%Y-%m-%d %H:%M:%S")

def _chat_msg(role, content, ts=None):
return {"role": role, "content": content, "time": ts or _now_ts()}

def _render_msg_time(msg):
    """Render the message's timestamp as a small caption, when one is recorded."""
    stamp = msg.get("time")
    if not stamp:
        return
    st.caption(stamp)

def agent_backend_stream(prompt=None):
"""Drain main task display_queue.
- prompt given: start a fresh task; new dq is kept in session_state.
Expand Down Expand Up @@ -199,7 +209,10 @@ def agent_backend_stream(prompt=None):

def render_main_stream(prompt=None):
"""Render the assistant bubble for the main task (new or resumed). Saves final to messages."""
reply_ts = st.session_state.get('reply_ts') or _now_ts()
st.session_state.reply_ts = reply_ts
with st.chat_message("assistant"):
st.caption(reply_ts)
frozen = 0; live = st.empty(); response = ''
CURSOR = ' ▌'
for response in agent_backend_stream(prompt):
Expand All @@ -214,15 +227,17 @@ def render_main_stream(prompt=None):
with live.container(): render_segments([segs[i]])
if i < len(segs) - 1: live = st.empty()
if response:
st.session_state.messages.append({"role": "assistant", "content": response})
st.session_state.messages.append(_chat_msg("assistant", response, reply_ts))
st.session_state.last_reply_time = int(time.time())
st.session_state.reply_ts = ""

# Replay the persisted chat history on every Streamlit rerun.
if "messages" not in st.session_state: st.session_state.messages = []
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        # The slot = st.empty() + "with slot.container():" shell gives this static
        # replay the exact same DOM path as the streaming renderer, so bubbles
        # stay aligned across reruns.
        slot = st.empty()
        with slot.container():
            _render_msg_time(msg)
            if msg["role"] == "assistant": render_segments(fold_turns(msg["content"]))
            else: st.markdown(msg["content"])

Expand Down Expand Up @@ -255,10 +270,22 @@ def render_main_stream(prompt=None):
"e.key==='Enter'&&!e.shiftKey&&(e.isComposing||c||e.keyCode===229)&&"
"(e.stopImmediatePropagation(),e.preventDefault())},!0))})}"
"f();new MutationObserver(f).observe(d.body,{childList:1,subtree:1})}()")
_embed_html(f'<script>{_js_scroll_fix};{_js_ime_fix}</script>', height=0)
_js_homeend_fix = (
"!function(){if(window.parent.__homeEndFix)return;window.parent.__homeEndFix=1;"
"var d=window.parent.document;"
"function f(){d.querySelectorAll('textarea[data-testid=stChatInputTextArea]')"
".forEach(t=>{if(t.__homeEndFix)return;t.__homeEndFix=1;"
"t.addEventListener('keydown',e=>{"
"if(e.key==='Home'||e.key==='End'){"
"e.preventDefault();e.stopPropagation();"
"t.selectionStart=t.selectionEnd=(e.key==='Home')?0:t.value.length;"
"}},!0)})}"
"f();new MutationObserver(f).observe(d.body,{childList:1,subtree:1})}()"
)
_embed_html(f'<script>{_js_scroll_fix};{_js_ime_fix};{_js_homeend_fix}</script>', height=0)

if prompt := st.chat_input("any task?"):
ts = time.strftime("%Y-%m-%d %H:%M:%S")
ts = _now_ts()
cmd = (prompt or "").strip()
def _reset_and_rerun():
st.session_state.streaming = False
Expand All @@ -270,7 +297,7 @@ def _reset_and_rerun():
st.session_state.last_reply_time = int(time.time())
st.rerun()
if cmd == "/new":
st.session_state.messages = [{"role": "assistant", "content": reset_conversation(agent), "time": ts}]
st.session_state.messages = [_chat_msg("assistant", reset_conversation(agent), ts)]
_reset_and_rerun()
if cmd.startswith("/continue"):
m = re.match(r'/continue\s+(\d+)\s*$', cmd.strip())
Expand All @@ -280,18 +307,18 @@ def _reset_and_rerun():
target = sessions[idx][0] if 0 <= idx < len(sessions) else None
result = handle_frontend_command(agent, cmd)
history = extract_ui_messages(target) if target and result.startswith('✅') else None
tail = [{"role": "assistant", "content": result, "time": ts}]
tail = [_chat_msg("assistant", result, ts)]
if history:
st.session_state.messages = history + tail
else:
st.session_state.messages = list(st.session_state.messages) + \
[{"role": "user", "content": cmd, "time": ts}] + tail
[_chat_msg("user", cmd, ts)] + tail
_reset_and_rerun()
if cmd.startswith("/btw"):
answer = btw_handle_frontend(agent, cmd) # sync; bypasses put_task → main agent.run() untouched
st.session_state.messages = list(st.session_state.messages) + [
{"role": "user", "content": prompt, "time": ts},
{"role": "assistant", "content": answer, "time": ts},
_chat_msg("user", prompt, ts),
_chat_msg("assistant", answer, ts),
]
st.rerun() # preserve display_queue/partial_response so resume path drains the running main task
if cmd.startswith("/export"):
Expand Down Expand Up @@ -322,15 +349,17 @@ def _reset_and_rerun():
except Exception as e:
result = f"❌ 导出失败: {e}"
st.session_state.messages = list(st.session_state.messages) + [
{"role": "user", "content": cmd, "time": ts},
{"role": "assistant", "content": result, "time": ts},
_chat_msg("user", cmd, ts),
_chat_msg("assistant", result, ts),
]
_reset_and_rerun()
# Regular prompt: any in-flight task will be aborted by the finally block in
# agent_backend_stream when StopException interrupts the prior generator.
st.session_state.messages.append({"role": "user", "content": prompt})
st.session_state.messages.append(_chat_msg("user", prompt, ts))
if hasattr(agent, '_pet_req') and not prompt.startswith('/'): agent._pet_req('state=walk')
with st.chat_message("user"): st.markdown(prompt)
with st.chat_message("user"):
st.caption(ts)
st.markdown(prompt)
render_main_stream(prompt)
elif st.session_state.get('display_queue') is not None:
# No new prompt but a task is mid-flight (typically a /btw rerun) — resume drain.
Expand Down
46 changes: 31 additions & 15 deletions frontends/tuiapp_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,9 @@ class InputArea(TextArea):
Binding("alt+enter", "newline", "Newline", show=False),
Binding("ctrl+enter", "newline", "Newline", show=False),
Binding("shift+enter", "newline", "Newline", show=False),
# 拆掉父类 ctrl+v:父类会走 action_paste 从 app.clipboard 再插一次,
# 和终端 bracketed paste 触发的 _on_paste 双重插入 → 单行粘贴会重复
Binding("ctrl+v", "noop", "Noop", show=False),
Binding("ctrl+v", "paste", "Paste", show=False),
]

def action_noop(self) -> None:
pass

class Submitted(Message):
def __init__(self, input_area: "InputArea", value: str) -> None:
super().__init__()
Expand All @@ -247,6 +242,7 @@ def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._pastes: dict[int, str] = {}
self._paste_counter = 0
self._last_paste: tuple[str, str, float] = ("", "", 0.0)

def expand_placeholders(self, text: str) -> str:
def repl(m):
Expand All @@ -259,15 +255,18 @@ def reset(self) -> None:
self._pastes.clear()
self._paste_counter = 0

def action_newline(self) -> None:
result = self._replace_via_keyboard("\n", *self.selection)
if result:
self.move_cursor(result.end_location)
def _insert_paste_text(self, text: str, source: str) -> bool:
text = (text or "").replace("\r\n", "\n").replace("\r", "\n")
if not text:
return False

normalized = text
last_source, last_text, last_at = self._last_paste
now = time.monotonic()
if normalized == last_text and source != last_source and now - last_at < 0.25:
self._last_paste = (source, normalized, now)
return True

async def _on_paste(self, event: events.Paste) -> None:
if self.read_only:
return
text = event.text.replace("\r\n", "\n").replace("\r", "\n")
line_count = len(text.splitlines()) or 1
if line_count > 2:
self._paste_counter += 1
Expand All @@ -278,7 +277,24 @@ async def _on_paste(self, event: events.Paste) -> None:
if result:
self.move_cursor(result.end_location)
self.focus()
event.stop(); event.prevent_default()
self._last_paste = (source, normalized, now)
return result is not None

def action_newline(self) -> None:
    """Insert a literal newline at the cursor (alt/ctrl/shift+enter bindings)."""
    edit = self._replace_via_keyboard("\n", *self.selection)
    if edit:
        self.move_cursor(edit.end_location)

def action_paste(self) -> None:
    """Handle the ctrl+v binding by inserting Textual's app clipboard contents.

    Goes through the shared paste path so a near-simultaneous bracketed-paste
    event carrying the same payload is deduplicated.
    """
    if not self.read_only:
        self._insert_paste_text(self.app.clipboard, "key")

async def _on_paste(self, event: events.Paste) -> None:
    """Route terminal bracketed-paste events through the shared insert path."""
    if self.read_only:
        return
    handled = self._insert_paste_text(event.text, "event")
    if handled:
        event.stop()
        event.prevent_default()

async def _on_key(self, event: events.Key) -> None:
try:
Expand Down
106 changes: 106 additions & 0 deletions tests/test_tuiapp_v2_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import asyncio
import contextlib
import io
import importlib
import sys
import unittest
from pathlib import Path

try:
from textual import events
from textual.app import App, ComposeResult
except ModuleNotFoundError: # pragma: no cover - optional UI dependency
events = None
App = object
ComposeResult = object


@unittest.skipIf(events is None, "textual is not installed")
class TuiAppV2InputPasteTests(unittest.TestCase):
    """End-to-end paste behavior tests for the TUI InputArea widget.

    Each test drives a minimal Textual app harness via ``run_test()`` and
    exercises the two paste entry points: the ctrl+v key binding (Textual's
    internal clipboard) and terminal bracketed-paste events.
    """

    @classmethod
    def setUpClass(cls):
        # Make both the repo root and frontends/ importable so that
        # "frontends.tuiapp_v2" and its flat sibling imports resolve.
        repo_root = Path(__file__).resolve().parents[1]
        frontends = repo_root / "frontends"
        for path in (str(repo_root), str(frontends)):
            if path not in sys.path:
                sys.path.insert(0, path)

        # Temporarily pop project modules so the import below picks up fresh
        # copies rather than previously-loaded stubs; restore them afterwards.
        # NOTE(review): modules that were absent (popped as None) are not
        # re-inserted, and the imported frontends.tuiapp_v2 stays cached —
        # confirm that is acceptable for the rest of the suite.
        saved_modules = {}
        for name in ("agentmain", "chatapp_common", "continue_cmd", "llmcore"):
            saved_modules[name] = sys.modules.pop(name, None)
        try:
            # Silence any import-time prints from the app module.
            with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
                module = importlib.import_module("frontends.tuiapp_v2")
        finally:
            for name, module_stub in saved_modules.items():
                if module_stub is not None:
                    sys.modules[name] = module_stub
        input_cls = module.InputArea

        class InputHarness(App[None]):
            """Bare app hosting a single focused InputArea for the tests."""

            def compose(self) -> ComposeResult:
                yield input_cls(id="input")

            def on_mount(self) -> None:
                self.query_one(input_cls).focus()

        cls.InputArea = input_cls
        cls.InputHarness = InputHarness

    def run_async(self, coro):
        """Run an async test body to completion on a fresh event loop."""
        return asyncio.run(coro)

    def test_ctrl_v_pastes_from_textual_clipboard(self):
        """ctrl+v inserts the app clipboard contents exactly once."""
        async def run():
            async with self.InputHarness().run_test() as pilot:
                input_area = pilot.app.query_one(self.InputArea)
                pilot.app.copy_to_clipboard("hello from clipboard")

                await pilot.press("ctrl+v")

                self.assertEqual(input_area.text, "hello from clipboard")

        self.run_async(run())

    def test_bracketed_paste_normalizes_windows_newlines(self):
        """Bracketed paste converts CRLF (and CR) line endings to LF."""
        async def run():
            async with self.InputHarness().run_test() as pilot:
                input_area = pilot.app.query_one(self.InputArea)

                pilot.app.post_message(events.Paste("first\r\nsecond"))
                await pilot.pause()

                self.assertEqual(input_area.text, "first\nsecond")

        self.run_async(run())

    def test_long_multiline_paste_uses_placeholder_but_submits_full_text(self):
        """Pastes longer than two lines collapse to a placeholder that still
        expands back to the full payload on submit."""
        async def run():
            async with self.InputHarness().run_test() as pilot:
                input_area = pilot.app.query_one(self.InputArea)

                pilot.app.post_message(events.Paste("one\ntwo\nthree"))
                await pilot.pause()

                self.assertEqual(input_area.text, "[Pasted text #1 +3 lines]")
                self.assertEqual(input_area.expand_placeholders(input_area.text), "one\ntwo\nthree")

        self.run_async(run())

    def test_ctrl_v_and_bracketed_paste_do_not_duplicate_same_payload(self):
        """A ctrl+v immediately followed by a bracketed-paste of the same text
        (terminals can deliver both) must insert the payload only once."""
        async def run():
            async with self.InputHarness().run_test() as pilot:
                input_area = pilot.app.query_one(self.InputArea)
                pilot.app.copy_to_clipboard("same")

                await pilot.press("ctrl+v")
                pilot.app.post_message(events.Paste("same"))
                await pilot.pause()

                self.assertEqual(input_area.text, "same")

        self.run_async(run())


if __name__ == "__main__":
unittest.main()