From 6851d112b6f2b48625c90628b447a3b70dbd4b83 Mon Sep 17 00:00:00 2001 From: A5rocks Date: Mon, 22 Jun 2026 17:26:54 -0400 Subject: [PATCH 1/4] Switch to pyrepl --- .github/workflows/ci.yml | 3 + src/trio/_repl.py | 145 ++++++++++------- src/trio/_tests/test_repl.py | 300 ++++++++++++++--------------------- test-requirements.in | 1 + test-requirements.txt | 2 + 5 files changed, 214 insertions(+), 237 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1bdb7a8bbc..dc8284f172 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -301,6 +301,7 @@ jobs: run: ./ci.sh env: NO_TEST_REQUIREMENTS: '${{ matrix.no_test_requirements }}' + TERM: xterm - if: >- always() && matrix.check_formatting != '1' @@ -346,6 +347,8 @@ jobs: check-latest: true - name: Run tests run: ./ci.sh + env: + TERM: xterm - if: always() uses: codecov/codecov-action@v5 with: diff --git a/src/trio/_repl.py b/src/trio/_repl.py index 6b5612e28f..b72d9a8c14 100644 --- a/src/trio/_repl.py +++ b/src/trio/_repl.py @@ -1,47 +1,58 @@ from __future__ import annotations import ast -import contextlib import inspect import sys import warnings from code import InteractiveConsole +from signal import SIGINT, raise_signal, signal from types import CodeType, FrameType, FunctionType from typing import TYPE_CHECKING import outcome import trio -import trio.lowlevel from trio._util import final if TYPE_CHECKING: from collections.abc import Callable - -class SuppressDecorator(contextlib.ContextDecorator, contextlib.suppress): - pass - - -@SuppressDecorator(KeyboardInterrupt) -@trio.lowlevel.disable_ki_protection -def terminal_newline() -> None: # TODO: test this line - import fcntl - import termios - - # Fake up a newline char as if user had typed it at the terminal +try: + import pyrepl + from pyrepl import commands, reader as r, readline +except ImportError: try: - fcntl.ioctl(sys.stdin, termios.TIOCSTI, b"\n") # type: ignore[attr-defined, unused-ignore] - except OSError as e: - print(f"\nPress enter! Newline injection failed: {e}", end="", flush=True) + import _pyrepl as pyrepl + from _pyrepl import commands, reader as r, readline + except ImportError: + print( + "Trio's REPL requires CPython 3.13+, PyPy, or installing pyrepl from PyPI." + ) + exit(1) + +# there are differences between the CPython pyrepl and PyPI pyrepl +try: + # The following expression fails on PyPy, even though you can + # `import pyrepl`. This is important because PyPy simply vendors + # CPython pyrepl: https://github.com/pypy/pypy/issues/4990 + pyrepl.__version__ # noqa: B018 +except AttributeError: + CPYTHON_VENDOR = True +else: + CPYTHON_VENDOR = False @final class TrioInteractiveConsole(InteractiveConsole): - def __init__(self, repl_locals: dict[str, object] | None = None) -> None: + def __init__( # type: ignore[no-any-unimported] + self, + repl_locals: dict[str, object] | None = None, + reader: r.Reader | None = None, + ) -> None: super().__init__(locals=repl_locals) - self.token: trio.lowlevel.TrioToken | None = None self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT + self.reader = reader + self.trim_first_char = False self.interrupted = False def runcode(self, code: CodeType) -> None: @@ -71,53 +82,56 @@ def runcode(self, code: CodeType) -> None: # We always use sys.excepthook, unlike other implementations. # This means that overriding self.write also does nothing to tbs. sys.excepthook(sys.last_type, sys.last_value, sys.last_traceback) + # clear any residual KI trio.from_thread.run(trio.lowlevel.checkpoint_if_cancelled) # trio.from_thread.check_cancelled() has too long of a memory - if sys.platform == "win32": # TODO: test this line - - def raw_input(self, prompt: str = "") -> str: - try: - return input(prompt) - except EOFError: - # check if trio has a pending KI - trio.from_thread.run(trio.lowlevel.checkpoint_if_cancelled) - raise - - else: - - def raw_input(self, prompt: str = "") -> str: - from signal import SIGINT, signal + def raw_input(self, prompt: str = "") -> str: + def install_handler() -> Callable[[int, FrameType | None], None] | int | None: + def handler(sig: int, frame: FrameType | None) -> None: + self.interrupted = True - assert not self.interrupted + return signal(SIGINT, handler) - def install_handler() -> ( - Callable[[int, FrameType | None], None] | int | None - ): - def handler( - sig: int, frame: FrameType | None - ) -> None: # TODO: test this line - self.interrupted = True - token.run_sync_soon(terminal_newline, idempotent=True) - - token = trio.lowlevel.current_trio_token() - - return signal(SIGINT, handler) - - prev_handler = trio.from_thread.run_sync(install_handler) - try: - return input(prompt) - finally: - trio.from_thread.run_sync(signal, SIGINT, prev_handler) - if self.interrupted: # TODO: test this line - raise KeyboardInterrupt + prev_handler = trio.from_thread.run_sync(install_handler) + assert self.reader is not None + self.reader.ps1 = prompt + self.interrupted = False + self.reader.prepare() + try: + self.reader.refresh() + while not self.reader.finished and not self.interrupted: + if not self.reader.handle1(block=False): + # let's avoid busy waiting + if CPYTHON_VENDOR: + self.reader.console.wait(100) + else: + self.reader.console.pollob.poll(100) + + if self.interrupted: + if not CPYTHON_VENDOR: + self.trim_first_char = True + raise KeyboardInterrupt + if CPYTHON_VENDOR: + return self.reader.get_unicode() # type: ignore[no-any-return] + else: + return self.reader.get_str() # type: ignore[no-any-return] + finally: + trio.from_thread.run_sync(signal, SIGINT, prev_handler) + self.reader.restore() + + if not CPYTHON_VENDOR: + # pyrepl has some special handling to make sure that + # the console is always ended in `\r\n` when done. + # However, InteractiveConsole assumes that the input + # was exited without a newline! So we need this hack. def write(self, output: str) -> None: - if self.interrupted: # TODO: test this line + if self.trim_first_char: assert output == "\nKeyboardInterrupt\n" sys.stderr.write(output[1:]) - self.interrupted = False + self.trim_first_char = False else: sys.stderr.write(output) @@ -141,9 +155,6 @@ async def run_repl(console: TrioInteractiveConsole) -> None: def main(original_locals: dict[str, object]) -> None: - with contextlib.suppress(ImportError): - import readline # noqa: F401 - repl_locals: dict[str, object] = {"trio": trio} for key in { "__name__", @@ -155,5 +166,19 @@ def main(original_locals: dict[str, object]) -> None: }: repl_locals[key] = original_locals[key] - console = TrioInteractiveConsole(repl_locals) + # This call also registers all necessary signal handlers. + # Otherwise, we would not be able to run `multiline_input` in a + # child thread. + reader = readline._get_reader() + + if not CPYTHON_VENDOR: + # The default `interrupt` command finishes the console, which + # adds an extra newline. Unforgivable! + class interrupt(commands.FinishCommand): # type: ignore[misc,no-any-unimported] + def do(self) -> None: + raise_signal(SIGINT) + + reader.commands["interrupt"] = interrupt + + console = TrioInteractiveConsole(repl_locals, reader) trio.run(run_repl, console) diff --git a/src/trio/_tests/test_repl.py b/src/trio/_tests/test_repl.py index 28075d551c..731696bd38 100644 --- a/src/trio/_tests/test_repl.py +++ b/src/trio/_tests/test_repl.py @@ -1,16 +1,29 @@ from __future__ import annotations +import errno import os -import pathlib +import re import signal -import subprocess import sys -from functools import partial from typing import Protocol import pytest -import trio._repl +from trio._tests.pytest_plugin import SKIP_OPTIONAL_IMPORTS + +if sys.platform == "win32" and sys.version_info < (3, 13): + pytest.skip("PyPI pyrepl only supports unix", allow_module_level=True) + +try: + import trio._repl +except SystemExit: + if SKIP_OPTIONAL_IMPORTS: + pytest.skip( + "we exit out of the REPL quickly if no pyrepl is found", + allow_module_level=True, + ) + else: + raise class RawInput(Protocol): @@ -230,33 +243,7 @@ async def test_base_exception_capture_from_coroutine( assert "AFTER BaseException" in out -def test_main_entrypoint() -> None: - """ - Basic smoke test when running via the package __main__ entrypoint. - """ - repl = subprocess.run([sys.executable, "-m", "trio"], input=b"exit()") - assert repl.returncode == 0 - - -def should_try_newline_injection() -> bool: - if sys.platform != "linux": - return False - - sysctl = pathlib.Path("/proc/sys/dev/tty/legacy_tiocsti") - if not sysctl.exists(): # pragma: no cover - return True - - else: - return sysctl.read_text() == "1" - - -@pytest.mark.skipif( - not should_try_newline_injection(), - reason="the ioctl we use is disabled in CI", -) -def test_ki_newline_injection() -> None: # TODO: test this line - # TODO: we want to remove this functionality, eg by using vendored - # pyrepls. +def start_repl() -> tuple[int, int]: assert sys.platform != "win32" import pty @@ -269,160 +256,119 @@ def test_ki_newline_injection() -> None: # TODO: test this line if pid == 0: os.execlp(sys.executable, *[sys.executable, "-u", "-m", "trio"]) + return pid, pty_fd + + +def read_until(buffer: bytearray, expected: bytes, fd: int) -> None: + while expected not in strip_some_escape_chars(buffer): + try: + res = os.read(fd, 4096) + except OSError as e: + if e.errno == errno.EIO: + break # process died + else: + print(buffer, res) + raise + + if res == b"": + break + buffer += res + + if b"_pyrepl.unix_console.InvalidTerminal" in buffer: + pytest.skip("this PTY doesn't support the necessary operations") + + +def write_out(text: bytes, buffer: bytearray, fd: int) -> None: + # for some reason this is required for pypy :( + small_buffer = bytearray() + for i, char in enumerate(text): + os.write(fd, bytes([char])) + read_until(small_buffer, text[: i + 1], fd) + + buffer += small_buffer + + +def strip_some_escape_chars(buffer: bytearray) -> bytes: + # https://superuser.com/a/380778 + return re.sub(b"\x1b" + rb"\[.*?[\x40-\x7E]", b"", buffer) + + +@pytest.mark.skipif( + sys.platform == "win32", + reason="Python doesn't support ConPTY, so we can't make the right environment for the REPL.", +) +def test_main_entrypoint() -> None: + """ + Basic smoke test when running via the package __main__ entrypoint. + """ + pid, fd = start_repl() + # setup: - buffer = b"" - while not buffer.endswith(b"import trio\r\n>>> "): - buffer += os.read(pty_fd, 4096) + buffer = bytearray() + read_until(buffer, b"import trio", fd) - # sanity check: - print(buffer.decode()) - buffer = b"" - os.write(pty_fd, b'print("hello!")\n') - while not buffer.endswith(b">>> "): - buffer += os.read(pty_fd, 4096) + buffer = buffer.split(b"import trio")[-1] + read_until(buffer, b">>>", fd) - assert buffer.count(b"hello!") == 2 + # just exit: + write_out(b"exit()", buffer, fd) + os.write(fd, b"\n") - # press ctrl+c - print(buffer.decode()) - buffer = b"" - os.kill(pid, signal.SIGINT) - while not buffer.endswith(b">>> "): - buffer += os.read(pty_fd, 4096) + # and flush any output: + buffer = bytearray() + read_until(buffer, b"something impossible", fd) - assert b"KeyboardInterrupt" in buffer + assert os.waitpid(pid, 0)[1] == 0 - # press ctrl+c later - print(buffer.decode()) - buffer = b"" - os.write(pty_fd, b'print("hello!")') - os.kill(pid, signal.SIGINT) - while not buffer.endswith(b">>> "): - buffer += os.read(pty_fd, 4096) - assert b"KeyboardInterrupt" in buffer +@pytest.mark.skipif( + sys.platform == "win32", + reason="Python doesn't support ConPTY, so we can't make the right environment for the REPL.", +) +def test_repl_ki() -> None: + pid, fd = start_repl() + + # setup: + buffer = bytearray() + read_until(buffer, b"import trio", fd) + + buffer = buffer.split(b"import trio")[-1] + read_until(buffer, b">>>", fd) + + # sanity check: print(buffer.decode()) - os.close(pty_fd) - os.waitpid(pid, 0)[1] + buffer = bytearray() + write_out(b'print("hello!") # mark', buffer, fd) + os.write(fd, b"\n") + read_until(buffer, b"# mark", fd) + buffer = buffer.split(b"# mark")[-1] + read_until(buffer, b">>>", fd) -async def test_ki_in_repl() -> None: - async with trio.open_nursery() as nursery: - proc = await nursery.start( - partial( - trio.run_process, - [sys.executable, "-u", "-m", "trio"], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - stdin=subprocess.PIPE, - creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if sys.platform == "win32" else 0, # type: ignore[attr-defined,unused-ignore] - ) - ) + assert buffer.count(b"hello!") == 1 - async with proc.stdout: - # setup - buffer = b"" - async for part in proc.stdout: # pragma: no branch - buffer += part - # TODO: consider making run_process stdout have some universal newlines thing - if buffer.replace(b"\r\n", b"\n").endswith(b"import trio\n>>> "): - break - - # ensure things work - print(buffer.decode()) - buffer = b"" - await proc.stdin.send_all(b'print("hello!")\n') - async for part in proc.stdout: # pragma: no branch - buffer += part - if buffer.endswith(b">>> "): - break - - assert b"hello!" in buffer - print(buffer.decode()) - - # this seems to be necessary on Windows for reasons - # (the parents of process groups ignore ctrl+c by default...) - if sys.platform == "win32": - buffer = b"" - await proc.stdin.send_all( - b"import ctypes; ctypes.windll.kernel32.SetConsoleCtrlHandler(None, False)\n" - ) - async for part in proc.stdout: # pragma: no branch - buffer += part - if buffer.endswith(b">>> "): - break - - print(buffer.decode()) - - # try to decrease flakiness... - buffer = b"" - await proc.stdin.send_all( - b"import coverage; trio.lowlevel.enable_ki_protection(coverage.pytracer.PyTracer._trace)\n" - ) - async for part in proc.stdout: # pragma: no branch - buffer += part - if buffer.endswith(b">>> "): - break - - print(buffer.decode()) - - # ensure that ctrl+c on a prompt works - # NOTE: for some reason, signal.SIGINT doesn't work for this test. - # Using CTRL_C_EVENT is also why we need subprocess.CREATE_NEW_PROCESS_GROUP - signal_sent = signal.CTRL_C_EVENT if sys.platform == "win32" else signal.SIGINT # type: ignore[attr-defined,unused-ignore] - os.kill(proc.pid, signal_sent) - if sys.platform == "win32": - # we rely on EOFError which... doesn't happen with pipes. - # I'm not sure how to fix it... - await proc.stdin.send_all(b"\n") - else: - # we test injection separately - await proc.stdin.send_all(b"\n") - - buffer = b"" - async for part in proc.stdout: # pragma: no branch - buffer += part - if buffer.endswith(b">>> "): - break - - assert b"KeyboardInterrupt" in buffer - - # ensure ctrl+c while a command runs works - print(buffer.decode()) - await proc.stdin.send_all(b'print("READY"); await trio.sleep_forever()\n') - killed = False - buffer = b"" - async for part in proc.stdout: # pragma: no branch - buffer += part - if buffer.replace(b"\r\n", b"\n").endswith(b"READY\n") and not killed: - os.kill(proc.pid, signal_sent) - killed = True - if buffer.endswith(b">>> "): - break - - assert b"trio" in buffer - assert b"KeyboardInterrupt" in buffer - - # make sure it works for sync commands too - # (though this would be hard to break) - print(buffer.decode()) - await proc.stdin.send_all( - b'import time; print("READY"); time.sleep(99999)\n' - ) - killed = False - buffer = b"" - async for part in proc.stdout: # pragma: no branch - buffer += part - if buffer.replace(b"\r\n", b"\n").endswith(b"READY\n") and not killed: - os.kill(proc.pid, signal_sent) - killed = True - if buffer.endswith(b">>> "): - break - - assert b"Traceback" in buffer - assert b"KeyboardInterrupt" in buffer - - print(buffer.decode()) - - # kill the process - nursery.cancel_scope.cancel() + # press ctrl+c + print(buffer.decode()) + if trio._repl.CPYTHON_VENDOR: + # would you believe me if I told you pyrepl has a bug where + # `UnixConsole.get_event(block=False)`... blocks? well anyways, + # that means this part of the test doesn't work. + buffer = bytearray() + os.kill(pid, signal.SIGINT) + read_until(buffer, b">>>", fd) + + assert b"KeyboardInterrupt" in buffer + + # press ctrl+c later + print(buffer.decode()) + buffer = bytearray() + write_out(b'print("hello!") # mark', buffer, fd) + read_until(buffer, b"# mark", fd) + + buffer = bytearray() + os.kill(pid, signal.SIGINT) + read_until(buffer, b">>>", fd) + + assert b"KeyboardInterrupt" in buffer + os.close(fd) + os.waitpid(pid, 0)[1] diff --git a/test-requirements.in b/test-requirements.in index 272da6a34c..e913c819c6 100644 --- a/test-requirements.in +++ b/test-requirements.in @@ -8,6 +8,7 @@ trustme # for the ssl + DTLS tests pylint # for pylint finding all symbols tests jedi; implementation_name == "cpython" # for jedi code completion tests cryptography>=41.0.0 # cryptography<41 segfaults on pypy3.10 +pyrepl; python_version < "3.13" and implementation_name != "pypy" # Tools black; implementation_name == "cpython" diff --git a/test-requirements.txt b/test-requirements.txt index 4b1aff1376..2551811f50 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -129,6 +129,8 @@ pylint==4.0.5 # via -r test-requirements.in pyopenssl==26.2.0 # via -r test-requirements.in +pyrepl==0.11.4 ; python_full_version < '3.13' and implementation_name != 'pypy' + # via -r test-requirements.in pyright==1.1.409 # via -r test-requirements.in pytest==9.0.3 From 1287a1b596df1d9a1fc68f95004fb784e350ab63 Mon Sep 17 00:00:00 2001 From: A5rocks Date: Tue, 23 Jun 2026 02:17:44 -0400 Subject: [PATCH 2/4] Try addressing coverage --- src/trio/_repl.py | 4 ++-- src/trio/_tests/test_repl.py | 22 +++++++++------------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/trio/_repl.py b/src/trio/_repl.py index b72d9a8c14..86119fb8b3 100644 --- a/src/trio/_repl.py +++ b/src/trio/_repl.py @@ -167,8 +167,8 @@ def main(original_locals: dict[str, object]) -> None: repl_locals[key] = original_locals[key] # This call also registers all necessary signal handlers. - # Otherwise, we would not be able to run `multiline_input` in a - # child thread. + # Otherwise, we would not be able to run `readline` in a child + # thread. reader = readline._get_reader() if not CPYTHON_VENDOR: diff --git a/src/trio/_tests/test_repl.py b/src/trio/_tests/test_repl.py index 731696bd38..2f9b2a7b1c 100644 --- a/src/trio/_tests/test_repl.py +++ b/src/trio/_tests/test_repl.py @@ -17,13 +17,11 @@ try: import trio._repl except SystemExit: - if SKIP_OPTIONAL_IMPORTS: - pytest.skip( - "we exit out of the REPL quickly if no pyrepl is found", - allow_module_level=True, - ) - else: - raise + assert SKIP_OPTIONAL_IMPORTS + pytest.skip( + "we exit out of the REPL quickly if no pyrepl is found", + allow_module_level=True, + ) class RawInput(Protocol): @@ -253,7 +251,7 @@ def start_repl() -> tuple[int, int]: # (which I don't know how to replicate... so I copied this # structure from pty.spawn...) pid, pty_fd = pty.fork() # type: ignore[attr-defined,unused-ignore] - if pid == 0: + if pid == 0: # pragma: no cover os.execlp(sys.executable, *[sys.executable, "-u", "-m", "trio"]) return pid, pty_fd @@ -264,11 +262,9 @@ def read_until(buffer: bytearray, expected: bytes, fd: int) -> None: try: res = os.read(fd, 4096) except OSError as e: - if e.errno == errno.EIO: - break # process died - else: - print(buffer, res) - raise + # process died + assert e.errno == errno.EIO # noqa: PT017 + break if res == b"": break From 93b58a8399286673847ef00d60d55436f37959d1 Mon Sep 17 00:00:00 2001 From: A5rocks Date: Tue, 23 Jun 2026 12:10:05 -0400 Subject: [PATCH 3/4] Fool around and remove the background thread --- src/trio/_repl.py | 119 +++++++++++++++-------------------- src/trio/_tests/test_repl.py | 71 ++++++++++----------- 2 files changed, 87 insertions(+), 103 deletions(-) diff --git a/src/trio/_repl.py b/src/trio/_repl.py index 86119fb8b3..896ea971f4 100644 --- a/src/trio/_repl.py +++ b/src/trio/_repl.py @@ -3,20 +3,15 @@ import ast import inspect import sys -import warnings from code import InteractiveConsole -from signal import SIGINT, raise_signal, signal -from types import CodeType, FrameType, FunctionType -from typing import TYPE_CHECKING +from signal import SIGINT, raise_signal +from types import CodeType, FunctionType import outcome import trio from trio._util import final -if TYPE_CHECKING: - from collections.abc import Callable - try: import pyrepl from pyrepl import commands, reader as r, readline @@ -47,13 +42,9 @@ class TrioInteractiveConsole(InteractiveConsole): def __init__( # type: ignore[no-any-unimported] self, repl_locals: dict[str, object] | None = None, - reader: r.Reader | None = None, ) -> None: super().__init__(locals=repl_locals) self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT - self.reader = reader - self.trim_first_char = False - self.interrupted = False def runcode(self, code: CodeType) -> None: func = FunctionType(code, self.locals) @@ -87,71 +78,63 @@ def runcode(self, code: CodeType) -> None: trio.from_thread.run(trio.lowlevel.checkpoint_if_cancelled) # trio.from_thread.check_cancelled() has too long of a memory - def raw_input(self, prompt: str = "") -> str: - def install_handler() -> Callable[[int, FrameType | None], None] | int | None: - def handler(sig: int, frame: FrameType | None) -> None: - self.interrupted = True - - return signal(SIGINT, handler) - - prev_handler = trio.from_thread.run_sync(install_handler) - assert self.reader is not None - self.reader.ps1 = prompt - self.interrupted = False - self.reader.prepare() - try: - self.reader.refresh() - while not self.reader.finished and not self.interrupted: - if not self.reader.handle1(block=False): - # let's avoid busy waiting - if CPYTHON_VENDOR: - self.reader.console.wait(100) - else: - self.reader.console.pollob.poll(100) - - if self.interrupted: - if not CPYTHON_VENDOR: - self.trim_first_char = True - raise KeyboardInterrupt - if CPYTHON_VENDOR: - return self.reader.get_unicode() # type: ignore[no-any-return] - else: - return self.reader.get_str() # type: ignore[no-any-return] - finally: - trio.from_thread.run_sync(signal, SIGINT, prev_handler) - self.reader.restore() +async def repl_input(reader: r.Reader | None, prompt: str) -> str: + assert reader is not None + reader.ps1 = prompt + reader.prepare() + try: + reader.refresh() + while not reader.finished: + if not reader.handle1(block=False): + if sys.platform == "win32": + await trio.lowlevel.wait_readable(pyrepl.windows_console.InHandle) + else: + await trio.lowlevel.wait_readable(reader.console.input_fd) + + if CPYTHON_VENDOR: + return reader.get_unicode() # type: ignore[no-any-return] + else: + return reader.get_str() # type: ignore[no-any-return] + finally: + reader.restore() - if not CPYTHON_VENDOR: - # pyrepl has some special handling to make sure that - # the console is always ended in `\r\n` when done. - # However, InteractiveConsole assumes that the input - # was exited without a newline! So we need this hack. - def write(self, output: str) -> None: - if self.trim_first_char: - assert output == "\nKeyboardInterrupt\n" - sys.stderr.write(output[1:]) - self.trim_first_char = False - else: - sys.stderr.write(output) +async def run_repl(console: TrioInteractiveConsole, reader: r.Reader | None) -> None: + # mostly copy-pasted from code.InteractiveConsole.interact + try: + sys.ps1 # noqa: B018 + except AttributeError: + sys.ps1 = ">>> " + try: + sys.ps2 # noqa: B018 + except AttributeError: + sys.ps2 = "... " -async def run_repl(console: TrioInteractiveConsole) -> None: banner = ( f"trio REPL {sys.version} on {sys.platform}\n" f'Use "await" directly instead of "trio.run()".\n' f'Type "help", "copyright", "credits" or "license" ' f"for more information.\n" - f'{getattr(sys, "ps1", ">>> ")}import trio' + f'{getattr(sys, "ps1", ">>> ")}import trio\n' ) - try: - await trio.to_thread.run_sync(console.interact, banner) - finally: - warnings.filterwarnings( - "ignore", - message=r"^coroutine .* was never awaited$", - category=RuntimeWarning, - ) + console.write(banner) + more = 0 + + while True: + try: + prompt = sys.ps2 if more else sys.ps1 + try: + line = await repl_input(reader, prompt) + except EOFError: + console.write("\n") + break + else: + more = await trio.to_thread.run_sync(console.push, line) + except KeyboardInterrupt: + console.write("\nKeyboardInterrupt\n") + console.resetbuffer() + more = 0 def main(original_locals: dict[str, object]) -> None: @@ -180,5 +163,5 @@ def do(self) -> None: reader.commands["interrupt"] = interrupt - console = TrioInteractiveConsole(repl_locals, reader) - trio.run(run_repl, console) + console = TrioInteractiveConsole(repl_locals) + trio.run(run_repl, console, reader) diff --git a/src/trio/_tests/test_repl.py b/src/trio/_tests/test_repl.py index 2f9b2a7b1c..04c0413448 100644 --- a/src/trio/_tests/test_repl.py +++ b/src/trio/_tests/test_repl.py @@ -5,12 +5,15 @@ import re import signal import sys -from typing import Protocol +from typing import TYPE_CHECKING, Protocol import pytest from trio._tests.pytest_plugin import SKIP_OPTIONAL_IMPORTS +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable + if sys.platform == "win32" and sys.version_info < (3, 13): pytest.skip("PyPI pyrepl only supports unix", allow_module_level=True) @@ -24,35 +27,33 @@ ) -class RawInput(Protocol): +class ReplInput(Protocol): def __call__(self, prompt: str = "") -> str: ... -def build_raw_input(cmds: list[str]) -> RawInput: +def build_repl_input(cmds: list[str]) -> Callable[[object, str], Awaitable[str]]: """ Pass in a list of strings. Returns a callable that returns each string, each time its called When there are not more strings to return, raise EOFError """ cmds_iter = iter(cmds) - prompts = [] - def _raw_helper(prompt: str = "") -> str: - prompts.append(prompt) + async def _repl_input_helper(_reader: object, _prompt: str) -> str: try: return next(cmds_iter) except StopIteration: raise EOFError from None - return _raw_helper + return _repl_input_helper -def test_build_raw_input() -> None: +async def test_build_repl_input() -> None: """Quick test of our helper function.""" - raw_input = build_raw_input(["cmd1"]) - assert raw_input() == "cmd1" + repl_input = build_repl_input(["cmd1"]) + assert await repl_input(None, None) == "cmd1" with pytest.raises(EOFError): - raw_input() + await repl_input(None, None) async def test_basic_interaction( @@ -64,7 +65,7 @@ async def test_basic_interaction( Ensure that the interpreted prints the expected results. """ console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ # evaluate simple expression and recall the value "x = 1", @@ -86,22 +87,22 @@ async def test_basic_interaction( "sys.stdout.write('hello stdout\\n')", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) - await trio._repl.run_repl(console) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) + await trio._repl.run_repl(console, None) out, _err = capsys.readouterr() assert out.splitlines() == ["x=1", "'hello'", "2", "4", "hello stdout", "13"] async def test_system_exits_quit_interpreter(monkeypatch: pytest.MonkeyPatch) -> None: console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ "raise SystemExit", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) with pytest.raises(SystemExit): - await trio._repl.run_repl(console) + await trio._repl.run_repl(console, None) async def test_KI_interrupts( @@ -109,7 +110,7 @@ async def test_KI_interrupts( monkeypatch: pytest.MonkeyPatch, ) -> None: console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ "import signal, trio, trio.lowlevel", "async def f():", @@ -124,8 +125,8 @@ async def test_KI_interrupts( "print('AFTER KeyboardInterrupt')", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) - await trio._repl.run_repl(console) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) + await trio._repl.run_repl(console, None) out, err = capsys.readouterr() assert "KeyboardInterrupt" in err assert "should" not in out @@ -137,7 +138,7 @@ async def test_system_exits_in_exc_group( monkeypatch: pytest.MonkeyPatch, ) -> None: console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ "import sys", "if sys.version_info < (3, 11):", @@ -147,8 +148,8 @@ async def test_system_exits_in_exc_group( "print('AFTER BaseExceptionGroup')", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) - await trio._repl.run_repl(console) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) + await trio._repl.run_repl(console, None) out, _err = capsys.readouterr() # assert that raise SystemExit in an exception group # doesn't quit @@ -160,7 +161,7 @@ async def test_system_exits_in_nested_exc_group( monkeypatch: pytest.MonkeyPatch, ) -> None: console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ "import sys", "if sys.version_info < (3, 11):", @@ -171,8 +172,8 @@ async def test_system_exits_in_nested_exc_group( "print('AFTER BaseExceptionGroup')", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) - await trio._repl.run_repl(console) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) + await trio._repl.run_repl(console, None) out, _err = capsys.readouterr() # assert that raise SystemExit in an exception group # doesn't quit @@ -184,15 +185,15 @@ async def test_base_exception_captured( monkeypatch: pytest.MonkeyPatch, ) -> None: console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ # The statement after raise should still get executed "raise BaseException", "print('AFTER BaseException')", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) - await trio._repl.run_repl(console) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) + await trio._repl.run_repl(console, None) out, err = capsys.readouterr() assert "_threads.py" not in err assert "_repl.py" not in err @@ -204,15 +205,15 @@ async def test_exc_group_captured( monkeypatch: pytest.MonkeyPatch, ) -> None: console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ # The statement after raise should still get executed "raise ExceptionGroup('', [KeyError()])", "print('AFTER ExceptionGroup')", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) - await trio._repl.run_repl(console) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) + await trio._repl.run_repl(console, None) out, _err = capsys.readouterr() assert "AFTER ExceptionGroup" in out @@ -222,7 +223,7 @@ async def test_base_exception_capture_from_coroutine( monkeypatch: pytest.MonkeyPatch, ) -> None: console = trio._repl.TrioInteractiveConsole() - raw_input = build_raw_input( + repl_input = build_repl_input( [ "async def async_func_raises_base_exception():", " raise BaseException", @@ -233,8 +234,8 @@ async def test_base_exception_capture_from_coroutine( "print('AFTER BaseException')", ], ) - monkeypatch.setattr(console, "raw_input", raw_input) - await trio._repl.run_repl(console) + monkeypatch.setattr(trio._repl, "repl_input", repl_input) + await trio._repl.run_repl(console, None) out, err = capsys.readouterr() assert "_threads.py" not in err assert "_repl.py" not in err From e876d780ee699e64969c842def05c7cca6e54dd6 Mon Sep 17 00:00:00 2001 From: A5rocks Date: Tue, 23 Jun 2026 18:52:25 -0400 Subject: [PATCH 4/4] Fix any CI failures --- newsfragments/3007.bugfix.rst | 6 ++++++ src/trio/_repl.py | 15 ++++++++------- src/trio/_tests/test_repl.py | 4 ++-- 3 files changed, 16 insertions(+), 9 deletions(-) create mode 100644 newsfragments/3007.bugfix.rst diff --git a/newsfragments/3007.bugfix.rst b/newsfragments/3007.bugfix.rst new file mode 100644 index 0000000000..98fa369ca7 --- /dev/null +++ b/newsfragments/3007.bugfix.rst @@ -0,0 +1,6 @@ +Switch ``python -m trio`` to use ``pyrepl``. This means that keyboard +interrupts should be processed more consistently, as well as matching +the builtin Python REPL more. One downside is that now using the trio +REPL requires PyPy, CPython 3.13+, or installing the ``pyrepl`` package +from PyPI. We recommend the first two options, as our support for the +version on PyPI is worse. diff --git a/src/trio/_repl.py b/src/trio/_repl.py index 896ea971f4..496e56d462 100644 --- a/src/trio/_repl.py +++ b/src/trio/_repl.py @@ -6,6 +6,7 @@ from code import InteractiveConsole from signal import SIGINT, raise_signal from types import CodeType, FunctionType +from typing import cast import outcome @@ -39,7 +40,7 @@ @final class TrioInteractiveConsole(InteractiveConsole): - def __init__( # type: ignore[no-any-unimported] + def __init__( self, repl_locals: dict[str, object] | None = None, ) -> None: @@ -79,7 +80,7 @@ def runcode(self, code: CodeType) -> None: # trio.from_thread.check_cancelled() has too long of a memory -async def repl_input(reader: r.Reader | None, prompt: str) -> str: +async def repl_input(reader: r.Reader | None, prompt: str) -> str: # type: ignore[no-any-unimported] assert reader is not None reader.ps1 = prompt reader.prepare() @@ -87,7 +88,7 @@ async def repl_input(reader: r.Reader | None, prompt: str) -> str: reader.refresh() while not reader.finished: if not reader.handle1(block=False): - if sys.platform == "win32": + if sys.platform == "win32": # TODO: test this line await trio.lowlevel.wait_readable(pyrepl.windows_console.InHandle) else: await trio.lowlevel.wait_readable(reader.console.input_fd) @@ -100,7 +101,7 @@ async def repl_input(reader: r.Reader | None, prompt: str) -> str: reader.restore() -async def run_repl(console: TrioInteractiveConsole, reader: r.Reader | None) -> None: +async def run_repl(console: TrioInteractiveConsole, reader: r.Reader | None) -> None: # type: ignore[no-any-unimported] # mostly copy-pasted from code.InteractiveConsole.interact try: sys.ps1 # noqa: B018 @@ -123,7 +124,7 @@ async def run_repl(console: TrioInteractiveConsole, reader: r.Reader | None) -> while True: try: - prompt = sys.ps2 if more else sys.ps1 + prompt = cast("str", sys.ps2 if more else sys.ps1) try: line = await repl_input(reader, prompt) except EOFError: @@ -131,7 +132,7 @@ async def run_repl(console: TrioInteractiveConsole, reader: r.Reader | None) -> break else: more = await trio.to_thread.run_sync(console.push, line) - except KeyboardInterrupt: + except KeyboardInterrupt: # TODO: test this line console.write("\nKeyboardInterrupt\n") console.resetbuffer() more = 0 @@ -158,7 +159,7 @@ def main(original_locals: dict[str, object]) -> None: # The default `interrupt` command finishes the console, which # adds an extra newline. Unforgivable! class interrupt(commands.FinishCommand): # type: ignore[misc,no-any-unimported] - def do(self) -> None: + def do(self) -> None: # TODO: test this line raise_signal(SIGINT) reader.commands["interrupt"] = interrupt diff --git a/src/trio/_tests/test_repl.py b/src/trio/_tests/test_repl.py index 04c0413448..5da0003dd7 100644 --- a/src/trio/_tests/test_repl.py +++ b/src/trio/_tests/test_repl.py @@ -51,9 +51,9 @@ async def _repl_input_helper(_reader: object, _prompt: str) -> str: async def test_build_repl_input() -> None: """Quick test of our helper function.""" repl_input = build_repl_input(["cmd1"]) - assert await repl_input(None, None) == "cmd1" + assert await repl_input(None, "x") == "cmd1" with pytest.raises(EOFError): - await repl_input(None, None) + await repl_input(None, "x") async def test_basic_interaction(