diff --git a/src/cleo/formatters/formatter.py b/src/cleo/formatters/formatter.py index 0ac13118..d6d4fbd7 100644 --- a/src/cleo/formatters/formatter.py +++ b/src/cleo/formatters/formatter.py @@ -2,6 +2,7 @@ import re +from contextvars import ContextVar from typing import ClassVar from cleo.exceptions import CleoValueError @@ -18,6 +19,9 @@ def __init__( self, decorated: bool = False, styles: dict[str, Style] | None = None ) -> None: self._decorated = decorated + self._decoration_suppressed: ContextVar[bool] = ContextVar( + "decoration_suppressed", default=False + ) self._styles: dict[str, Style] = {} self.set_style("error", Style("red", options=["bold"])) @@ -120,14 +124,11 @@ def format_and_wrap(self, message: str, width: int) -> str: return output.replace("\0", "\\").replace("\\<", "<") def remove_format(self, text: str) -> str: - decorated = self._decorated - - self._decorated = False - text = re.sub(r"\033\[[^m]*m", "", self.format(text)) - - self._decorated = decorated - - return text + token = self._decoration_suppressed.set(True) + try: + return re.sub(r"\033\[[^m]*m", "", self.format(text)) + finally: + self._decoration_suppressed.reset(token) def _create_style_from_string(self, string: str) -> Style | None: if string in self._styles: @@ -165,7 +166,7 @@ def _apply_current_style( return "", current_line_length if not width: - if self.is_decorated(): + if self._should_decorate(): return self._style_stack.current.apply(text), current_line_length return text, current_line_length @@ -193,8 +194,11 @@ def _apply_current_style( if current_line_length >= width: current_line_length = 0 - if self.is_decorated(): + if self._should_decorate(): apply = self._style_stack.current.apply text = "\n".join(map(apply, lines)) return text, current_line_length + + def _should_decorate(self) -> bool: + return self.is_decorated() and not self._decoration_suppressed.get() diff --git a/tests/io/outputs/test_section_output.py b/tests/io/outputs/test_section_output.py index 2a9fcabd..13eb159f 100644 --- a/tests/io/outputs/test_section_output.py +++ b/tests/io/outputs/test_section_output.py @@ -1,10 +1,14 @@ from __future__ import annotations from io import StringIO +from threading import Event +from threading import Thread import pytest +from cleo.formatters.formatter import Formatter from cleo.io.outputs.section_output import SectionOutput +from cleo.io.outputs.stream_output import StreamOutput @pytest.fixture() @@ -109,3 +113,40 @@ def test_multiple_sections_output( stream.read() == "Foo\nBar\n\x1b[2A\x1b[0JBar\n\x1b[1A\x1b[0JBaz\nBar\n\x1b[1A\x1b[0JFoobar\n" ) + + +def test_remove_format_does_not_change_shared_decoration_state() -> None: + entered = Event() + continue_run = Event() + + class SlowFormatter(Formatter): + def format(self, message: str) -> str: + entered.set() + continue_run.wait(timeout=2) + return super().format(message) + + stream = StringIO() + formatter = SlowFormatter(decorated=True) + output = StreamOutput(stream, decorated=True, formatter=formatter) + section = output.section() + result: dict[str, object] = {} + + def worker() -> None: + result["text"] = section.remove_format("x") + + thread = Thread(target=worker) + thread.start() + assert entered.wait(timeout=2) + + result["output_during"] = output.is_decorated() + result["section_during"] = section.is_decorated() + + continue_run.set() + thread.join(timeout=2) + + assert not thread.is_alive() + assert result == { + "text": "x", + "output_during": True, + "section_during": True, + }