From 53a4d04877fe6bd889ae33b045c430fce61e6aba Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 03:13:08 -0800 Subject: [PATCH 1/9] Whitespace and README typo cleanups --- src/xcbproto/README.md | 5 +-- src/xcbproto/gen_xcb_to_py.py | 63 ++++++++--------------------------- 2 files changed, 16 insertions(+), 52 deletions(-) diff --git a/src/xcbproto/README.md b/src/xcbproto/README.md index 351e0b97..797f8f87 100644 --- a/src/xcbproto/README.md +++ b/src/xcbproto/README.md @@ -19,8 +19,9 @@ The generator is a **maintainer tool**, not part of the normal build process: ``` 3. The generator reads the XML protocol definitions and emits `xcbgen.py`. -4. The maintainer ensures that this worked correctly, and moves the file to `src/mss/linux/xcbgen.py`. -5. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. +4. The maintainer ensures that this worked correctly, and runs `ruff check --fix` and `ruff format` on the emitted file. +5. The maintainer moves the file to `src/mss/linux/xcbgen.py`. +6. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. ## Protocol XML Files diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index 1e41acc8..ec69313a 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -364,11 +364,7 @@ def _parse_child(self, child: ET._Element) -> None: ) ) case "fd": - self.members.append( - FdField( - name=child.attrib["name"], - ) - ) + self.members.append(FdField(name=child.attrib["name"])) case "pad": self.members.append(parse_pad(child)) case "list": @@ -377,10 +373,7 @@ def _parse_child(self, child: ET._Element) -> None: return case _: msg = f"Unsupported member {child.tag} in {self.protocol}:{self.name}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) def _parse(self) -> None: with parsing_note(f"while parsing {self.protocol}:{self.name}", self._element): @@ -442,10 +435,7 @@ def parse_enum(protocol: str, elem: ET.Element) -> EnumDefn: items.append(EnumerationItem(item.attrib["name"], 1 << int(bit.text, 0))) else: msg = f"Unsupported enum item in {protocol}:{name}:{item}" - raise GenerationError( - msg, - element=item, - ) + raise GenerationError(msg, element=item) return EnumDefn(protocol, name, items) @@ -465,10 +455,7 @@ def parse_xidunion(protocol: str, elem: ET.Element) -> XidUnionDefn: continue else: msg = f"Unsupported xidunion member {child.tag} in {protocol}:{name}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) if not members: msg = f"xidunion {protocol}:{name} must include at least one type" raise GenerationError(msg, element=elem) @@ -508,52 +495,34 @@ def parse_protocol(path: Path) -> ProtocolModule: # noqa: PLR0912, PLR0915 case "enum": if child.attrib["name"] in module.enums: msg = f"Duplicate enum {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.enums[child.attrib["name"]] = parse_enum(protocol, child) case "typedef": if child.attrib["newname"] in module.types: msg = f"Duplicate type {child.attrib['newname']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["newname"]] = TypedefDefn( protocol, child.attrib["newname"], child.attrib["oldname"] ) case "xidtype": if child.attrib["name"] in module.types: msg = f"Duplicate type {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["name"]] = XidTypeDefn(protocol, child.attrib["name"]) case "xidunion": if child.attrib["name"] in module.types: msg = f"Duplicate type {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["name"]] = parse_xidunion(protocol, child) case "struct": if child.attrib["name"] in module.types: msg = f"Duplicate type {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.types[child.attrib["name"]] = StructDefn(protocol, child.attrib["name"], child) case "request": if child.attrib["name"] in module.requests: msg = f"Duplicate request {child.attrib['name']} in protocol {protocol}" - raise GenerationError( - msg, - element=child, - ) + raise GenerationError(msg, element=child) module.requests[child.attrib["name"]] = RequestDefn(protocol, child.attrib["name"], child) case "import": # There's actually some leeway in how the imports are resolved. We only require the imported @@ -964,9 +933,7 @@ def emit_structlike( elif isinstance(member, Field): if seen_list: msg = f"Structure {entry.protocol}:{entry.name} has fields after lists, which is unsupported" - raise GenerationError( - msg, - ) + raise GenerationError(msg) name = format_field_name(member) type_expr = python_type_for(registry, entry.protocol, member.type) field_entries.append((name, type_expr)) @@ -975,9 +942,7 @@ def emit_structlike( continue if member.align is not None or member.bytes is None: msg = f"Struct {entry.protocol}:{entry.name} uses align-based padding, which is unsupported" - raise GenerationError( - msg, - ) + raise GenerationError(msg) name = f"pad{pad_index}" pad_index += 1 field_entries.append((name, f"c_uint8 * {member.bytes}")) @@ -1091,9 +1056,7 @@ def emit_types( rv += emit_reply(writer, registry, typ) else: msg = f"Unsupported type kind {type(typ).__name__} for {typ.protocol}:{typ.name}" - raise GenerationError( - msg, - ) + raise GenerationError(msg) return rv From 66bae937b7d3bc33d316875806fe80dc5742958b Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 03:22:24 -0800 Subject: [PATCH 2/9] Make XIDs and other primitive types comparable Otherwise, you need things like `this_visual_id.value == that_visual_id.value`, and get unexpected negatives for just `this_visual_id == that_visual_id`. This can be a potential source of bugs. --- src/mss/linux/base.py | 8 ++++---- src/mss/linux/xcbgen.py | 24 +++++++++++++++++++++--- src/mss/linux/xcbhelpers.py | 8 +++++++- src/mss/linux/xshmgetimage.py | 6 +++--- src/xcbproto/gen_xcb_to_py.py | 15 +++++++++++++-- 5 files changed, 48 insertions(+), 13 deletions(-) diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py index f9921d53..c4d84f12 100644 --- a/src/mss/linux/base.py +++ b/src/mss/linux/base.py @@ -66,7 +66,7 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 # we'll have to ask the server for its depth and visual. assert self.root == self.drawable # noqa: S101 self.drawable_depth = self.pref_screen.root_depth - self.drawable_visual_id = self.pref_screen.root_visual.value + self.drawable_visual_id = self.pref_screen.root_visual # Server image byte order if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst: msg = "Only X11 servers using LSB-First images are supported." @@ -103,7 +103,7 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 msg = "Internal error: drawable's depth not found in screen's supported depths" raise ScreenShotError(msg) for visual_info in xcb.depth_visuals(xcb_depth): - if visual_info.visual_id.value == self.drawable_visual_id: + if visual_info.visual_id == self.drawable_visual_id: break else: msg = "Internal error: drawable's visual not found in screen's supported visuals" @@ -277,11 +277,11 @@ def _grab_impl_xgetimage(self, monitor: Monitor, /) -> ScreenShot: # Copy this into a new bytearray, so that it will persist after we clear the image structure. img_data = bytearray(img_data_arr) - if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + if img_reply.depth != self.drawable_depth or img_reply.visual != self.drawable_visual_id: # This should never happen; a window can't change its visual. msg = ( "Server returned an image with a depth or visual different than it initially reported: " - f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id.value)}, " f"got {img_reply.depth},{hex(img_reply.visual.value)}" ) raise ScreenShotError(msg) diff --git a/src/mss/linux/xcbgen.py b/src/mss/linux/xcbgen.py index 6fba72b3..f3a36782 100644 --- a/src/mss/linux/xcbgen.py +++ b/src/mss/linux/xcbgen.py @@ -96,7 +96,13 @@ class Drawable(XID): class Keycode(c_uint8): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, Keycode): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class Format(Structure): @@ -117,7 +123,13 @@ class Colormap(XID): class Visualid(c_uint32): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, Visualid): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class Visualtype(Structure): @@ -261,7 +273,13 @@ class RandrQueryVersionReply(Structure): class Timestamp(c_uint32): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, Timestamp): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class RandrCrtc(XID): diff --git a/src/mss/linux/xcbhelpers.py b/src/mss/linux/xcbhelpers.py index f387c79e..714d7191 100644 --- a/src/mss/linux/xcbhelpers.py +++ b/src/mss/linux/xcbhelpers.py @@ -100,7 +100,13 @@ class Connection(Structure): class XID(c_uint32): - pass + def __eq__(self, other: object) -> bool: + if isinstance(other, XID): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) class GenericErrorStructure(Structure): diff --git a/src/mss/linux/xshmgetimage.py b/src/mss/linux/xshmgetimage.py index cdd882c3..6b5a4927 100644 --- a/src/mss/linux/xshmgetimage.py +++ b/src/mss/linux/xshmgetimage.py @@ -166,7 +166,7 @@ def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: img_reply = xcb.shm_get_image( self.conn, - self.drawable.value, + self.drawable, monitor["left"], monitor["top"], monitor["width"], @@ -177,11 +177,11 @@ def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: 0, ) - if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + if img_reply.depth != self.drawable_depth or img_reply.visual != self.drawable_visual_id: # This should never happen; a window can't change its visual. msg = ( "Server returned an image with a depth or visual different than it initially reported: " - f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id.value)}, " f"got {img_reply.depth},{hex(img_reply.visual.value)}" ) raise ScreenShotError(msg) diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index ec69313a..a18a8094 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -37,6 +37,7 @@ from contextlib import contextmanager from dataclasses import dataclass, field from pathlib import Path +from textwrap import dedent, indent from typing import TYPE_CHECKING from lxml import etree as ET # noqa: N812 (traditional name) @@ -703,7 +704,7 @@ def __init__(self, fh: io.TextIOBase) -> None: def write(self, line: str = "") -> None: if line: - self._fh.write(" " * self._indent + line + "\n") + self._fh.write(indent(line, " " * self._indent) + "\n") else: self._fh.write("\n") @@ -899,7 +900,17 @@ def emit_typedef(writer: CodeWriter, registry: ProtocolRegistry, entry: TypedefD writer.write() writer.write(f"class {class_name}({base}):") with writer.indent(): - writer.write("pass") + writer.write( + dedent(f""" + def __eq__(self, other: object) -> bool: + if isinstance(other, {class_name}): + return self.value == other.value + return NotImplemented + + def __hash__(self) -> int: + return hash(self.value) + """) + ) # Struct-like types From e7b9c2d54b1ea2c1807f4bc9a532cb2e2ec384b4 Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 03:36:39 -0800 Subject: [PATCH 3/9] Add InternAtom support --- src/mss/linux/xcb.py | 165 ++++++++++++++++++++++++++++++++-- src/mss/linux/xcbhelpers.py | 21 +++++ src/tests/test_xcb.py | 81 ++++++++++++++++- src/xcbproto/gen_xcb_to_py.py | 3 +- 4 files changed, 262 insertions(+), 8 deletions(-) diff --git a/src/mss/linux/xcb.py b/src/mss/linux/xcb.py index f599e2a7..7c3991d8 100644 --- a/src/mss/linux/xcb.py +++ b/src/mss/linux/xcb.py @@ -1,11 +1,13 @@ from __future__ import annotations -from ctypes import _Pointer, c_int +import contextlib +from ctypes import _Pointer, addressof, c_int +from typing import Literal, overload from . import xcbgen # We import these just so they're re-exported to our users. -# ruff: noqa: F401, TC001 +# ruff: noqa: F401 from .xcbgen import ( RANDR_MAJOR_VERSION, RANDR_MINOR_VERSION, @@ -109,7 +111,7 @@ ) # These are also here to re-export. -from .xcbhelpers import LIB, XID, Connection, QueryExtensionReply, XcbExtension, XError +from .xcbhelpers import LIB, XID, Connection, InternAtomReply, QueryExtensionReply, XcbExtension, XError XCB_CONN_ERROR = 1 XCB_CONN_CLOSED_EXT_NOTSUPPORTED = 2 @@ -134,6 +136,147 @@ #### High-level XCB function wrappers +# XCB_NONE is the universal null resource or null atom parameter value for many core X requests +XCB_NONE = Atom(0) + +# Some atoms are defined by the spec, to avoid apps having to look them up. It's fine to look them up anyway. +_PREDEFINED_ATOMS = { + "PRIMARY": Atom(1), + "SECONDARY": Atom(2), + "ARC": Atom(3), + "ATOM": Atom(4), + "BITMAP": Atom(5), + "CARDINAL": Atom(6), + "COLORMAP": Atom(7), + "CURSOR": Atom(8), + "CUT_BUFFER0": Atom(9), + "CUT_BUFFER1": Atom(10), + "CUT_BUFFER2": Atom(11), + "CUT_BUFFER3": Atom(12), + "CUT_BUFFER4": Atom(13), + "CUT_BUFFER5": Atom(14), + "CUT_BUFFER6": Atom(15), + "CUT_BUFFER7": Atom(16), + "DRAWABLE": Atom(17), + "FONT": Atom(18), + "INTEGER": Atom(19), + "PIXMAP": Atom(20), + "POINT": Atom(21), + "RECTANGLE": Atom(22), + "RESOURCE_MANAGER": Atom(23), + "RGB_COLOR_MAP": Atom(24), + "RGB_BEST_MAP": Atom(25), + "RGB_BLUE_MAP": Atom(26), + "RGB_DEFAULT_MAP": Atom(27), + "RGB_GRAY_MAP": Atom(28), + "RGB_GREEN_MAP": Atom(29), + "RGB_RED_MAP": Atom(30), + "STRING": Atom(31), + "VISUALID": Atom(32), + "WINDOW": Atom(33), + "WM_COMMAND": Atom(34), + "WM_HINTS": Atom(35), + "WM_CLIENT_MACHINE": Atom(36), + "WM_ICON_NAME": Atom(37), + "WM_ICON_SIZE": Atom(38), + "WM_NAME": Atom(39), + "WM_NORMAL_HINTS": Atom(40), + "WM_SIZE_HINTS": Atom(41), + "WM_ZOOM_HINTS": Atom(42), + "MIN_SPACE": Atom(43), + "NORM_SPACE": Atom(44), + "MAX_SPACE": Atom(45), + "END_SPACE": Atom(46), + "SUPERSCRIPT_X": Atom(47), + "SUPERSCRIPT_Y": Atom(48), + "SUBSCRIPT_X": Atom(49), + "SUBSCRIPT_Y": Atom(50), + "UNDERLINE_POSITION": Atom(51), + "UNDERLINE_THICKNESS": Atom(52), + "STRIKEOUT_ASCENT": Atom(53), + "STRIKEOUT_DESCENT": Atom(54), + "ITALIC_ANGLE": Atom(55), + "X_HEIGHT": Atom(56), + "QUAD_WIDTH": Atom(57), + "WEIGHT": Atom(58), + "POINT_SIZE": Atom(59), + "RESOLUTION": Atom(60), + "COPYRIGHT": Atom(61), + "NOTICE": Atom(62), + "FONT_NAME": Atom(63), + "FAMILY_NAME": Atom(64), + "FULL_NAME": Atom(65), + "CAP_HEIGHT": Atom(66), + "WM_CLASS": Atom(67), + "WM_TRANSIENT_FOR": Atom(68), +} + +# The atom cache needs to be per-connection. Rather than keying on a (connection, name) tuple, we use a two-level +# cache keyed by the integer address of the underlying XCB connection (see ctypes.addressof in intern_atom). +_ATOM_CACHE: dict[int, dict[str, Atom]] = {} + + +@overload +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: Literal[False] = False, +) -> Atom: ... +@overload +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: Literal[True] = True, +) -> Atom | None: ... +@overload +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: bool, +) -> Atom | None: ... + + +def intern_atom( + xcb_conn: Connection | _Pointer[Connection], + name: str, + *, + only_if_exists: bool = False, +) -> Atom | None: + if name in _PREDEFINED_ATOMS: + return _PREDEFINED_ATOMS[name] + + if isinstance(xcb_conn, _Pointer): + # Dereference the pointer before using the cache. + xcb_conn = xcb_conn.contents + cache_key = addressof(xcb_conn) + if cache_key not in _ATOM_CACHE: + # This can happen if the connection was closed and its cache cleared, but some code still has a reference to + # the connection object. We could re-create the cache entry, but it's safer to just fail instead of silently + # allowing lookups to succeed when they shouldn't. + msg = "Connection to X server is closed" + raise XError(msg) + if name in _ATOM_CACHE[cache_key]: + return _ATOM_CACHE[cache_key][name] + + # Atom names are required to be Latin-1, per the X protocol spec, although anything that's not in the XPCS (a + # subset of ASCII) is vendor-defined. + name_encoded = name.encode("latin_1", errors="strict") + cookie = LIB.xcb.xcb_intern_atom(xcb_conn, 1 if only_if_exists else 0, len(name_encoded), name_encoded) + atom_as_xid = cookie.reply(xcb_conn).atom + if atom_as_xid.value == 0: + if not only_if_exists: + # This shouldn't be possible. We at least need to have a path for the type-checker to be happy, though. + msg = f"X server failed to intern atom '{name}'" + raise XError(msg) + # We don't do negative caching, since any app might intern the atom at any time. + return None + atom = Atom(atom_as_xid.value) + _ATOM_CACHE[cache_key][name] = atom + return atom + def get_extension_data( xcb_conn: Connection | _Pointer[Connection], ext: XcbExtension | _Pointer[XcbExtension] @@ -210,13 +353,23 @@ def connect(display: str | bytes | None = None) -> tuple[Connection, int]: prefetch_extension_data(conn_p, LIB.shm_id) prefetch_extension_data(conn_p, LIB.xfixes_id) + _ATOM_CACHE[addressof(conn_p.contents)] = {} + return conn_p.contents, pref_screen_num.value -def disconnect(conn: Connection) -> None: - conn_err = LIB.xcb.xcb_connection_has_error(conn) +def disconnect(xcb_conn: Connection | _Pointer[Connection]) -> None: + if isinstance(xcb_conn, _Pointer): + # Dereference the pointer before using the cache. + xcb_conn = xcb_conn.contents + + # The cache might already be cleared if the connection had an error, or if disconnect was called multiple times. + with contextlib.suppress(KeyError): + del _ATOM_CACHE[addressof(xcb_conn)] + + conn_err = LIB.xcb.xcb_connection_has_error(xcb_conn) # XCB won't free its connection structures until we disconnect, even in the event of an error. - LIB.xcb.xcb_disconnect(conn) + LIB.xcb.xcb_disconnect(xcb_conn) if conn_err != 0: msg = "Connection to X server closed: " conn_errmsg = XCB_CONN_ERRMSG.get(conn_err) diff --git a/src/mss/linux/xcbhelpers.py b/src/mss/linux/xcbhelpers.py index 714d7191..de5fafd0 100644 --- a/src/mss/linux/xcbhelpers.py +++ b/src/mss/linux/xcbhelpers.py @@ -10,6 +10,7 @@ Structure, _Pointer, addressof, + c_char, c_char_p, c_int, c_uint, @@ -125,6 +126,18 @@ class GenericErrorStructure(Structure): ) +# We special-case InternAtom for convenience. +class InternAtomReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("pad0", c_uint8 * 1), + ("sequence", c_uint16), + ("length", c_uint32), + # This is actually an Atom, not a raw XID, but we handle the type conversion in intern_atom. + ("atom", XID), + ) + + #### Request / response handling # # The following recaps a lot of what's in the xcb-requests(3) man page, with a few notes about what we're doing in @@ -468,6 +481,14 @@ def initialize(self, callbacks: Iterable[Callable[[], None]] = frozenset()) -> N self.xcb.xcb_disconnect.argtypes = [POINTER(Connection)] self.xcb.xcb_disconnect.restype = None + # We special-case InternAtom for convenience. + initialize_xcb_typed_func( + LIB.xcb, + "xcb_intern_atom", + [POINTER(Connection), c_uint8, c_uint16, POINTER(c_char)], + InternAtomReply, + ) + libxcb_randr_so = ctypes.util.find_library("xcb-randr") if libxcb_randr_so is None: msg = "Library libxcb-randr.so not found" diff --git a/src/tests/test_xcb.py b/src/tests/test_xcb.py index 24903ace..be18ccdd 100644 --- a/src/tests/test_xcb.py +++ b/src/tests/test_xcb.py @@ -12,9 +12,13 @@ sizeof, ) from types import SimpleNamespace -from typing import Any, Callable +from typing import TYPE_CHECKING, Any, Callable from unittest.mock import Mock from weakref import finalize +import platform + +if TYPE_CHECKING: + from collections.abc import Generator import pytest @@ -28,6 +32,10 @@ ) +if platform.system().lower() != "linux": + pytestmark = pytest.mark.skip + + def _force_gc() -> None: gc.collect() gc.collect() @@ -224,6 +232,77 @@ def visual_validation_env(monkeypatch: pytest.MonkeyPatch) -> _VisualValidationH return _VisualValidationHarness(monkeypatch) +#### intern_atom tests + + +class TestInternAtom: + """Tests for xcb.intern_atom and the _ATOM_CACHE mechanism.""" + + @pytest.fixture(autouse=True) + def setup_intern_atom(self) -> Generator[None, None, None]: + self.conn, _ = xcb.connect() + yield + xcb.disconnect(self.conn) + + def _mock_xcb_intern_atom(self, monkeypatch: pytest.MonkeyPatch, atom_value: int) -> Mock: + """Patch LIB.xcb.xcb_intern_atom to return a fake reply with the given atom value.""" + fake_reply = SimpleNamespace(atom=SimpleNamespace(value=atom_value)) + fake_cookie = Mock() + fake_cookie.reply.return_value = fake_reply + mock = Mock(return_value=fake_cookie) + monkeypatch.setattr(xcb.LIB.xcb, "xcb_intern_atom", mock) + return mock + + def test_predefined_atom_skips_xcb(self, monkeypatch: pytest.MonkeyPatch) -> None: + mock = self._mock_xcb_intern_atom(monkeypatch, 0) + atom = xcb.intern_atom(self.conn, "PRIMARY") + assert atom == xcb.Atom(1) + mock.assert_not_called() + + def test_cache_miss_calls_xcb_and_caches_result(self, monkeypatch: pytest.MonkeyPatch) -> None: + mock = self._mock_xcb_intern_atom(monkeypatch, 100) + cache_key = addressof(self.conn) + atom = xcb.intern_atom(self.conn, "_NET_WM_NAME") + assert atom == xcb.Atom(100) + mock.assert_called_once() + assert xcb._ATOM_CACHE[cache_key]["_NET_WM_NAME"] == xcb.Atom(100) + + def test_cache_hit_skips_xcb(self, monkeypatch: pytest.MonkeyPatch) -> None: + mock = self._mock_xcb_intern_atom(monkeypatch, 0) + # xcb.connect() in setup_intern_atom guarantees a cache entry for self.conn. + xcb._ATOM_CACHE[addressof(self.conn)]["_NET_WM_NAME"] = xcb.Atom(100) + atom = xcb.intern_atom(self.conn, "_NET_WM_NAME") + assert atom == xcb.Atom(100) + mock.assert_not_called() + + def test_only_if_exists_returns_none_when_missing(self) -> None: + atom = xcb.intern_atom(self.conn, "_MSS_TEST_NONEXISTENT_ATOM_12345", only_if_exists=True) + assert atom is None + + def test_raises_when_missing_and_not_only_if_exists(self, monkeypatch: pytest.MonkeyPatch) -> None: + # Exercises the "shouldn't be possible" code path where the server returns 0 with only_if_exists=False. + self._mock_xcb_intern_atom(monkeypatch, 0) + with pytest.raises(xcb.XError, match="X server failed to intern atom"): + xcb.intern_atom(self.conn, "_NET_NONEXISTENT") + + def test_pointer_connection_uses_correct_cache_key(self) -> None: + atom = xcb.intern_atom(pointer(self.conn), "_NET_WM_NAME") + assert atom is not None + assert addressof(self.conn) in xcb._ATOM_CACHE + + +def test_atom_cache_lifecycle() -> None: + """connect() initializes and disconnect() clears the per-connection atom cache entry.""" + before = set(xcb._ATOM_CACHE) + conn, _ = xcb.connect() + cache_key = addressof(conn) + assert cache_key in xcb._ATOM_CACHE + assert xcb._ATOM_CACHE[cache_key] == {} + xcb.disconnect(conn) + assert cache_key not in xcb._ATOM_CACHE + assert set(xcb._ATOM_CACHE) == before + + def test_xgetimage_visual_validation_accepts_default_setup(visual_validation_env: _VisualValidationHarness) -> None: visual_validation_env.reset() mss_instance = xgetimage.MSS() diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index a18a8094..f504af66 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -59,7 +59,8 @@ "GetGeometry", "GetImage", "GetProperty", - # We handle InternAtom specially. + # We handle InternAtom specially in xcb.py: it's the only request we use that includes a list (the name) in + # the request. Rather than writing the code for autogeneration, we just open-code that one. "NoOperation", ], "randr": [ From 403c0e7452481ff71533054c939495a54d163802 Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 03:38:16 -0800 Subject: [PATCH 4/9] Add EDID parser This will be needed to get the monitor's human-friendly name under Linux: XRandR will give us the EDID block, but we need to parse it. --- src/mss/tools.py | 130 ++++++++++++++++++++++++++++++++++++++++ src/tests/test_tools.py | 116 ++++++++++++++++++++++++++++++++++- 2 files changed, 245 insertions(+), 1 deletion(-) diff --git a/src/mss/tools.py b/src/mss/tools.py index 7f053665..9e4b7e9f 100644 --- a/src/mss/tools.py +++ b/src/mss/tools.py @@ -12,6 +12,136 @@ from pathlib import Path +_EDID_BLOCK_LEN = 128 +_EDID_SERIAL_NUMBER_NOT_SET = 0 # The serial number field is unused +_EDID_MANUFACTURE_WEEK_YEAR_IS_MODEL = 0xFF # The year field is for the model year, not the year of manufacture. +_EDID_MANUFACTURE_WEEK_UNKNOWN = 0 # Only the year of manufacture, not the week, is known. +_EDID_YEAR_BASE = 1990 # The starting year for EDID years +_EDID_DESCR_OFFSETS = [0x48, 0x5A, 0x6C] # Display descriptor definition locations in the base block +_EDID_DESCR_LEN = 18 +_EDID_DESCR_ZERO_LOCS = [0, 1, 2, 4] # Locations that must be 0 to mark a display descriptor definition +_EDID_DESCR_TAG_LOC = 3 # Location of display descriptor tag +_EDID_DESCR_TAG_SN = 0xFF # Descriptor is a string serial number +_EDID_DESCR_TAG_NAME = 0xFC # Descriptor is a string model name +_EDID_DESCR_STR_LOC = slice(5, 18) # Location of string in a display descriptor + + +def parse_edid(edid_data: bytes) -> dict: + """Parse a monitor's EDID block. + + Many fields are currently ignored, but may be added in the future. + + If the EDID block cannot be parsed, this returns an empty dict. + + The dict defines the following fields. Any of these may be + missing, if the EDID block does not define them. + + - id_legacy (str): The legacy monitor ID, used in a number of + APIs. This is simply f"{manufacturer}{product_code:04X}". + Those subfields are not part of the returned dict, but are + nominally described as: + + - manufacturer (str): A three-letter, all-uppercase code + specifying the manufacturer's legacy PnP ID. The registry is + managed by UEFI forum. + - product_code (int): A 16-bit product code. This is typically + displayed as four hex digits if rendered to a string. + + - serial_number (str | int): Serial number of the monitor. EDID + block may provide this as an int, string, or both; the string + version is preferred. + - manufacture_week (int): The week, 1-54, of manufacture. This + may not be populated, even if the year is. (The way the weeks + are numbered is up to the manufacturer.) + - manufacture_year (int): The year, 1990 or later, of manufacture. + - model_year (int): The year, 1990 or later, that the model was + released. This is used if the manufacturer doesn't want to + update their EDID block each year; the manufacture_year field is + more common. + - display_name (str): The monitor's model. This is the preferred + value for display. If this field is not present, then id_legacy + is a distant second. + + Currently, the serial_number and name fields are always in ASCII. + This function doesn't currently try to implement the + internationalization extensions defined in the VESA LS-EXT + standard. However, we may in the future. + + We also don't currently inspect the extension blocks. The name + and serial number can be in CTA-861 extension blocks; I'll need to + see how common that is. + """ + # See also https://glenwing.github.io/docs/ for a lot of the relevant specs. + + if len(edid_data) < _EDID_BLOCK_LEN: + # Too short + return {} + + # Get the basic identification information from the start of the + # header. This has been part of EDID for a very long time. + block0 = edid_data[:_EDID_BLOCK_LEN] + if sum(block0) % 256 != 0: + # Checksum failure + return {} + + ( + header, + id_manufacturer_msb, + id_manufacturer_lsb, + id_product_code, + id_serial_number, + manufacture_week, + manufacture_year, + _edid_version, + _edid_revision, + _ext_count, + ) = struct.unpack("<8s2BHIBBBB106xBx", block0) + + if header != b"\x00\xff\xff\xff\xff\xff\xff\x00": + # Header incorrect + return {} + id_manufacturer_packed = id_manufacturer_msb << 8 | id_manufacturer_lsb + id_manufacturer = ( + chr(((id_manufacturer_packed >> 10) % 32) + 64) + + chr(((id_manufacturer_packed >> 5) % 32) + 64) + + chr((id_manufacturer_packed % 32) + 64) + ) + rv: dict[str, int | str] = { + "id_legacy": f"{id_manufacturer}{id_product_code:04X}", + } + if id_serial_number != _EDID_SERIAL_NUMBER_NOT_SET: + rv["serial_number"] = id_serial_number + if manufacture_week == _EDID_MANUFACTURE_WEEK_YEAR_IS_MODEL: + rv["model_year"] = manufacture_year + _EDID_YEAR_BASE + else: + if manufacture_week != _EDID_MANUFACTURE_WEEK_UNKNOWN: + rv["manufacture_week"] = manufacture_week + rv["manufacture_year"] = manufacture_year + _EDID_YEAR_BASE + + # Read the display descriptor definitions, which can have more useful information. + for descr_offset in _EDID_DESCR_OFFSETS: + descr = block0[descr_offset : descr_offset + _EDID_DESCR_LEN] + if any(descr[field_offset] != 0 for field_offset in _EDID_DESCR_ZERO_LOCS): + # Not a display descriptor definition + continue + # Check the tag in descr[3]. + # These strings are in ASCII, optionally terminated by \x0A then right-padded with \x20. In case a + # manufacturer got it a little wrong, we ignore everything after \x0A, and we also strip trailing \x20. (The + # spec requires the \x0A, but some manufacturers don't follow that.) + if descr[_EDID_DESCR_TAG_LOC] == _EDID_DESCR_TAG_SN: # Serial number + sn = descr[_EDID_DESCR_STR_LOC] + sn, _, _ = sn.partition(b"\x0a") + sn = sn.rstrip(b" ") + rv["serial_number"] = sn.decode("ascii", errors="replace") + elif descr[_EDID_DESCR_TAG_LOC] == _EDID_DESCR_TAG_NAME: # Name + name = descr[_EDID_DESCR_STR_LOC] + name, _, _ = name.partition(b"\x0a") + name = name.rstrip(b" ") + rv["display_name"] = name.decode("ascii", errors="replace") + + return rv + + def to_png(data: bytes, size: tuple[int, int], /, *, level: int = 6, output: Path | str | None = None) -> bytes | None: """Dump data to a PNG file. If `output` is `None`, create no file but return the whole PNG data. diff --git a/src/tests/test_tools.py b/src/tests/test_tools.py index 78feea73..7183b2d6 100644 --- a/src/tests/test_tools.py +++ b/src/tests/test_tools.py @@ -5,12 +5,13 @@ from __future__ import annotations import io +import struct from pathlib import Path from typing import TYPE_CHECKING import pytest -from mss.tools import to_png +from mss.tools import parse_edid, to_png if TYPE_CHECKING: from collections.abc import Callable @@ -57,3 +58,116 @@ def test_output_raw_bytes() -> None: raw = to_png(data, (WIDTH, HEIGHT)) assert isinstance(raw, bytes) assert_is_valid_png(raw=raw) + + +# --------------------------------------------------------------------------- +# Helpers and tests for parse_edid +# --------------------------------------------------------------------------- + + +def _make_edid( # noqa: PLR0913 + *, + manufacturer: str = "TST", + product_code: int = 0x1234, + serial_number: int = 0, + manufacture_week: int = 0, + manufacture_year: int = 30, + descriptors: list[tuple[int, int, str]] | None = None, + bad_checksum: bool = False, +) -> bytes: + """Build a minimal 128-byte EDID block.""" + data = bytearray(128) + data[0:8] = b"\x00\xff\xff\xff\xff\xff\xff\x00" + packed = ( + ((ord(manufacturer[0]) - ord("@")) << 10) + | ((ord(manufacturer[1]) - ord("@")) << 5) + | (ord(manufacturer[2]) - ord("@")) + ) + data[8] = (packed >> 8) & 0xFF + data[9] = packed & 0xFF + struct.pack_into(" None: + assert parse_edid(b"") == {} + assert parse_edid(b"\x00" * 64) == {} + assert parse_edid(b"\x00" * 127) == {} + + +def test_parse_edid_invalid_checksum() -> None: + assert parse_edid(_make_edid(bad_checksum=True)) == {} + + +def test_parse_edid_invalid_header() -> None: + data = bytearray(_make_edid()) + data[0] = 0x01 # corrupt the header magic + data[127] = (-sum(data[:127])) % 256 # recompute checksum + assert parse_edid(bytes(data)) == {} + + +def test_parse_edid_basic() -> None: + result = parse_edid(_make_edid(manufacturer="TST", product_code=0x1234)) + assert result["id_legacy"] == "TST1234" + + +def test_parse_edid_manufacture_year_only() -> None: + result = parse_edid(_make_edid(manufacture_week=0, manufacture_year=30)) + assert result["manufacture_year"] == 2020 + assert "manufacture_week" not in result + assert "model_year" not in result + + +def test_parse_edid_manufacture_week_and_year() -> None: + result = parse_edid(_make_edid(manufacture_week=10, manufacture_year=30)) + assert result["manufacture_year"] == 2020 + assert result["manufacture_week"] == 10 + assert "model_year" not in result + + +def test_parse_edid_model_year() -> None: + result = parse_edid(_make_edid(manufacture_week=0xFF, manufacture_year=31)) + assert result["model_year"] == 2021 + assert "manufacture_year" not in result + assert "manufacture_week" not in result + + +def test_parse_edid_serial_number_integer() -> None: + result = parse_edid(_make_edid(serial_number=12345)) + assert result["serial_number"] == 12345 + + +def test_parse_edid_serial_number_not_set() -> None: + result = parse_edid(_make_edid(serial_number=0)) + assert "serial_number" not in result + + +def test_parse_edid_descriptor_serial_number() -> None: + result = parse_edid(_make_edid(descriptors=[(0x48, 0xFF, "SN123456")])) + assert result["serial_number"] == "SN123456" + + +def test_parse_edid_descriptor_display_name() -> None: + result = parse_edid(_make_edid(descriptors=[(0x5A, 0xFC, "Test Monitor")])) + assert result["display_name"] == "Test Monitor" + + +def test_parse_edid_descriptor_string_serial_overrides_integer() -> None: + result = parse_edid(_make_edid(serial_number=99, descriptors=[(0x48, 0xFF, "STRSERIAL")])) + assert result["serial_number"] == "STRSERIAL" From 861926a2a64f961212f9230f47d5ba3a4e2a2330 Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 03:39:44 -0800 Subject: [PATCH 5/9] Add details about monitors on Linux See BoboTiG/python-mss PR #469 and issue #153. There are no plans to add similar code to the legacy Xlib backend. --- src/mss/linux/base.py | 206 +++++++++++++++++++++++++++++----- src/mss/linux/xcb.py | 18 +++ src/mss/linux/xcbgen.py | 204 ++++++++++++++++++++++++++++++--- src/xcbproto/gen_xcb_to_py.py | 4 + 4 files changed, 392 insertions(+), 40 deletions(-) diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py index c4d84f12..2236c961 100644 --- a/src/mss/linux/base.py +++ b/src/mss/linux/base.py @@ -1,14 +1,18 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any +from urllib.parse import urlencode from mss.base import MSSBase from mss.exception import ScreenShotError +from mss.tools import parse_edid from . import xcb from .xcb import LIB if TYPE_CHECKING: + from ctypes import Array + from mss.models import Monitor from mss.screenshot import ScreenShot @@ -140,8 +144,37 @@ def _monitors_impl(self) -> None: msg = "Cannot identify monitors while the connection is closed" raise ScreenShotError(msg) - # The first entry is the whole X11 screen that the root is on. That's the one that covers all the - # monitors. + self._append_root_monitor() + + randr_version = self._randr_get_version() + if randr_version is None or randr_version < (1, 2): + return + + # XRandR terminology (very abridged, but enough for this code): + # - X screen / framebuffer: the overall drawable area for this root. + # - CRTC: a display controller that scans out a rectangular region of the X screen. A CRTC with zero + # outputs is inactive. A CRTC may drive multiple outputs in clone/mirroring mode. + # - Output: a physical connector (e.g. "HDMI-1", "DP-1"). The RandR "connection" state (connected vs + # disconnected) is separate from whether the output is currently driven by a CRTC. + # - Monitor (RandR 1.5+): a logical rectangle presented to clients. Monitors may be client-defined (useful + # for tiled displays) and are the closest match to what MSS wants. + # + # This implementation prefers RandR 1.5+ Monitors when available; otherwise it falls back to enumerating + # active CRTCs. + + primary_output = self._randr_get_primary_output(randr_version) + edid_atom = self._randr_get_edid_atom() + + if randr_version >= (1, 5): + self._monitors_from_randr_monitors(primary_output, edid_atom) + else: + self._monitors_from_randr_crtcs(randr_version, primary_output, edid_atom) + + def _append_root_monitor(self) -> None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + root_geom = xcb.get_geometry(self.conn, self.root) self._monitors.append( { @@ -152,47 +185,168 @@ def _monitors_impl(self) -> None: } ) - # After that, we have one for each monitor on that X11 screen. For decades, that's been handled by - # Xrandr. We don't presently try to work with Xinerama. So, we're going to check the different outputs, - # according to Xrandr. If that fails, we'll just leave the one root covering everything. + def _randr_get_version(self) -> tuple[int, int] | None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) - # Make sure we have the Xrandr extension we need. This will query the cache that we started populating in - # __init__. randr_ext_data = xcb.get_extension_data(self.conn, LIB.randr_id) if not randr_ext_data.present: - return + return None - # We ask the server to give us anything up to the version we support (i.e., what we expect the reply - # structs to look like). If the server only supports 1.2, then that's what it'll give us, and we're ok - # with that, but we also use a faster path if the server implements at least 1.3. randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION) - randr_version = (randr_version_data.major_version, randr_version_data.minor_version) - if randr_version < (1, 2): - return + return (randr_version_data.major_version, randr_version_data.minor_version) + + def _randr_get_primary_output(self, randr_version: tuple[int, int], /) -> xcb.RandrOutput: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + if randr_version >= (1, 3): + primary_output_data = xcb.randr_get_output_primary(self.conn, self.drawable) + return primary_output_data.output + return xcb.RandrOutput(0) + + def _randr_get_edid_atom(self) -> xcb.Atom | None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + edid_atom = xcb.intern_atom(self.conn, "EDID", only_if_exists=True) + if edid_atom is not None: + return edid_atom + + # Formerly, "EDID" was known as "EdidData". I don't know when it changed. + return xcb.intern_atom(self.conn, "EdidData", only_if_exists=True) + + def _randr_output_ids( + self, + output: xcb.RandrOutput, + timestamp: xcb.Timestamp, + edid_atom: xcb.Atom | None, + /, + ) -> dict[str, Any]: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + output_info = xcb.randr_get_output_info(self.conn, output, timestamp) + if output_info.status != 0: + msg = "Display configuration changed while detecting monitors." + raise ScreenShotError(msg) + + rv: dict[str, Any] = {} + + output_name_arr = xcb.randr_get_output_info_name(output_info) + rv["output"] = bytes(output_name_arr).decode("utf_8", errors="replace") + + if edid_atom is not None: + edid_prop = xcb.randr_get_output_property( + self.conn, # connection + output, # output + edid_atom, # property + xcb.XCB_NONE, # property type: Any + 0, # long-offset: 0 + 1024, # long-length: in 4-byte units; 4k is plenty for an EDID + 0, # delete: false + 0, # pending: false + ) + if edid_prop.type_.value != 0: + edid_block = bytes(xcb.randr_get_output_property_data(edid_prop)) + edid_data = parse_edid(edid_block) + if "display_name" in edid_data: + rv["name"] = edid_data["display_name"] + + edid_params: dict[str, str] = {} + if "id_legacy" in edid_data: + edid_params["model"] = edid_data["id_legacy"] + if "serial_number" in edid_data: + edid_params["serial"] = str(edid_data["serial_number"]) + if "manufacture_year" in edid_data: + if "manufacture_week" in edid_data: + edid_params["mfr_date"] = ( + f"{edid_data['manufacture_year']:04d}W{edid_data['manufacture_week']:02d}" + ) + else: + edid_params["mfr_date"] = f"{edid_data['manufacture_year']:04d}" + if "model_year" in edid_data: + edid_params["model_year"] = f"{edid_data['model_year']:04d}" + if edid_params: + rv["unique_id"] = urlencode(edid_params) + + return rv + + @staticmethod + def _choose_randr_output(outputs: Array[xcb.RandrOutput], primary_output: xcb.RandrOutput, /) -> xcb.RandrOutput: + if len(outputs) == 0: + msg = "No RandR outputs available" + raise ScreenShotError(msg) + if any(o == primary_output for o in outputs): + return primary_output + return outputs[0] + + def _monitors_from_randr_monitors(self, primary_output: xcb.RandrOutput, edid_atom: xcb.Atom | None, /) -> None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + monitors_reply = xcb.randr_get_monitors(self.conn, self.drawable, 1) + timestamp = monitors_reply.timestamp + for randr_monitor in xcb.randr_get_monitors_monitors(monitors_reply): + monitor = { + "left": randr_monitor.x, + "top": randr_monitor.y, + "width": randr_monitor.width, + "height": randr_monitor.height, + } + if randr_monitor.primary: + monitor["is_primary"] = True + + if randr_monitor.nOutput > 0: + outputs = xcb.randr_monitor_info_outputs(randr_monitor) + chosen_output = self._choose_randr_output(outputs, primary_output) + monitor |= self._randr_output_ids(chosen_output, timestamp, edid_atom) + + self._monitors.append(monitor) + + def _monitors_from_randr_crtcs( + self, + randr_version: tuple[int, int], + primary_output: xcb.RandrOutput, + edid_atom: xcb.Atom | None, + /, + ) -> None: + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) screen_resources: xcb.RandrGetScreenResourcesReply | xcb.RandrGetScreenResourcesCurrentReply - # Check to see if we have the xcb_randr_get_screen_resources_current function in libxcb-randr, and that - # the server supports it. if hasattr(LIB.randr, "xcb_randr_get_screen_resources_current") and randr_version >= (1, 3): - screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable.value) + screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable) crtcs = xcb.randr_get_screen_resources_current_crtcs(screen_resources) else: - # Either the client or the server doesn't support the _current form. That's ok; we'll use the old - # function, which forces a new query to the physical monitors. screen_resources = xcb.randr_get_screen_resources(self.conn, self.drawable) crtcs = xcb.randr_get_screen_resources_crtcs(screen_resources) + timestamp = screen_resources.config_timestamp for crtc in crtcs: - crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, screen_resources.config_timestamp) + crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, timestamp) if crtc_info.num_outputs == 0: continue - self._monitors.append( - {"left": crtc_info.x, "top": crtc_info.y, "width": crtc_info.width, "height": crtc_info.height} - ) + monitor = { + "left": crtc_info.x, + "top": crtc_info.y, + "width": crtc_info.width, + "height": crtc_info.height, + } + + outputs = xcb.randr_get_crtc_info_outputs(crtc_info) + chosen_output = self._choose_randr_output(outputs, primary_output) + monitor |= self._randr_output_ids(chosen_output, timestamp, edid_atom) + if chosen_output == primary_output: + monitor["is_primary"] = True - # Extra credit would be to enumerate the virtual desktops; see - # https://specifications.freedesktop.org/wm/latest/ar01s03.html. But I don't know how widely-used that - # style is. + self._monitors.append(monitor) def _cursor_impl_check_xfixes(self) -> bool: """Check XFixes availability and version. diff --git a/src/mss/linux/xcb.py b/src/mss/linux/xcb.py index 7c3991d8..64ea929a 100644 --- a/src/mss/linux/xcb.py +++ b/src/mss/linux/xcb.py @@ -31,12 +31,19 @@ ImageOrder, Keycode, Pixmap, + RandrConnection, RandrCrtc, RandrGetCrtcInfoReply, + RandrGetMonitorsReply, + RandrGetOutputInfoReply, + RandrGetOutputPrimaryReply, + RandrGetOutputPropertyReply, RandrGetScreenResourcesCurrentReply, RandrGetScreenResourcesReply, RandrMode, RandrModeInfo, + RandrMonitorInfo, + RandrMonitorInfoIterator, RandrOutput, RandrQueryVersionReply, RandrSetConfig, @@ -77,6 +84,16 @@ randr_get_crtc_info, randr_get_crtc_info_outputs, randr_get_crtc_info_possible, + randr_get_monitors, + randr_get_monitors_monitors, + randr_get_output_info, + randr_get_output_info_clones, + randr_get_output_info_crtcs, + randr_get_output_info_modes, + randr_get_output_info_name, + randr_get_output_primary, + randr_get_output_property, + randr_get_output_property_data, randr_get_screen_resources, randr_get_screen_resources_crtcs, randr_get_screen_resources_current, @@ -87,6 +104,7 @@ randr_get_screen_resources_modes, randr_get_screen_resources_names, randr_get_screen_resources_outputs, + randr_monitor_info_outputs, randr_query_version, render_pictdepth_visuals, render_pictscreen_depths, diff --git a/src/mss/linux/xcbgen.py b/src/mss/linux/xcbgen.py index f3a36782..606ad290 100644 --- a/src/mss/linux/xcbgen.py +++ b/src/mss/linux/xcbgen.py @@ -41,6 +41,12 @@ # Enum classes +class RandrConnection(IntEnum): + Connected = 0 + Disconnected = 1 + Unknown = 2 + + class RandrSetConfig(IntEnum): Success = 0 InvalidConfigTime = 1 @@ -363,6 +369,81 @@ class RandrGetCrtcInfoReply(Structure): ) +class RandrGetOutputInfoReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("status", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("timestamp", Timestamp), + ("crtc", RandrCrtc), + ("mm_width", c_uint32), + ("mm_height", c_uint32), + ("connection", c_uint8), + ("subpixel_order", c_uint8), + ("num_crtcs", c_uint16), + ("num_modes", c_uint16), + ("num_preferred", c_uint16), + ("num_clones", c_uint16), + ("name_len", c_uint16), + ) + + +class RandrGetOutputPrimaryReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("pad0", c_uint8 * 1), + ("sequence", c_uint16), + ("length", c_uint32), + ("output", RandrOutput), + ) + + +class RandrGetOutputPropertyReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("format_", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("type_", Atom), + ("bytes_after", c_uint32), + ("num_items", c_uint32), + ("pad0", c_uint8 * 12), + ) + + +class RandrMonitorInfo(Structure): + _fields_ = ( + ("name", Atom), + ("primary", c_uint8), + ("automatic", c_uint8), + ("nOutput", c_uint16), + ("x", c_int16), + ("y", c_int16), + ("width", c_uint16), + ("height", c_uint16), + ("width_in_millimeters", c_uint32), + ("height_in_millimeters", c_uint32), + ) + + +class RandrMonitorInfoIterator(Structure): + _fields_ = (("data", POINTER(RandrMonitorInfo)), ("rem", c_int), ("index", c_int)) + + +class RandrGetMonitorsReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("pad0", c_uint8 * 1), + ("sequence", c_uint16), + ("length", c_uint32), + ("timestamp", Timestamp), + ("nMonitors", c_uint32), + ("nOutputs", c_uint32), + ("pad1", c_uint8 * 12), + ) + + class RenderQueryVersionReply(Structure): _fields_ = ( ("response_type", c_uint8), @@ -615,6 +696,42 @@ def randr_get_crtc_info_possible(r: RandrGetCrtcInfoReply) -> Array[RandrOutput] ) +def randr_get_output_info_crtcs(r: RandrGetOutputInfoReply) -> Array[RandrCrtc]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_info_crtcs, LIB.randr.xcb_randr_get_output_info_crtcs_length, r + ) + + +def randr_get_output_info_modes(r: RandrGetOutputInfoReply) -> Array[RandrMode]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_info_modes, LIB.randr.xcb_randr_get_output_info_modes_length, r + ) + + +def randr_get_output_info_clones(r: RandrGetOutputInfoReply) -> Array[RandrOutput]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_info_clones, LIB.randr.xcb_randr_get_output_info_clones_length, r + ) + + +def randr_get_output_info_name(r: RandrGetOutputInfoReply) -> Array[c_uint8]: + return array_from_xcb(LIB.randr.xcb_randr_get_output_info_name, LIB.randr.xcb_randr_get_output_info_name_length, r) + + +def randr_get_output_property_data(r: RandrGetOutputPropertyReply) -> Array[c_uint8]: + return array_from_xcb( + LIB.randr.xcb_randr_get_output_property_data, LIB.randr.xcb_randr_get_output_property_data_length, r + ) + + +def randr_monitor_info_outputs(r: RandrMonitorInfo) -> Array[RandrOutput]: + return array_from_xcb(LIB.randr.xcb_randr_monitor_info_outputs, LIB.randr.xcb_randr_monitor_info_outputs_length, r) + + +def randr_get_monitors_monitors(r: RandrGetMonitorsReply) -> list[RandrMonitorInfo]: + return list_from_xcb(LIB.randr.xcb_randr_get_monitors_monitors_iterator, LIB.randr.xcb_randr_monitor_info_next, r) + + def render_pictdepth_visuals(r: RenderPictdepth) -> Array[RenderPictvisual]: return array_from_xcb(LIB.render.xcb_render_pictdepth_visuals, LIB.render.xcb_render_pictdepth_visuals_length, r) @@ -704,6 +821,33 @@ def randr_get_crtc_info(c: Connection, crtc: RandrCrtc, config_timestamp: Timest return LIB.randr.xcb_randr_get_crtc_info(c, crtc, config_timestamp).reply(c) +def randr_get_output_info(c: Connection, output: RandrOutput, config_timestamp: Timestamp) -> RandrGetOutputInfoReply: + return LIB.randr.xcb_randr_get_output_info(c, output, config_timestamp).reply(c) + + +def randr_get_output_primary(c: Connection, window: Window) -> RandrGetOutputPrimaryReply: + return LIB.randr.xcb_randr_get_output_primary(c, window).reply(c) + + +def randr_get_output_property( + c: Connection, + output: RandrOutput, + property_: Atom, + type_: Atom, + long_offset: c_uint32 | int, + long_length: c_uint32 | int, + delete: c_uint8 | int, + pending: c_uint8 | int, +) -> RandrGetOutputPropertyReply: + return LIB.randr.xcb_randr_get_output_property( + c, output, property_, type_, long_offset, long_length, delete, pending + ).reply(c) + + +def randr_get_monitors(c: Connection, window: Window, get_active: c_uint8 | int) -> RandrGetMonitorsReply: + return LIB.randr.xcb_randr_get_monitors(c, window, get_active).reply(c) + + def render_query_version( c: Connection, client_major_version: c_uint32 | int, client_minor_version: c_uint32 | int ) -> RenderQueryVersionReply: @@ -764,6 +908,8 @@ def initialize() -> None: # noqa: PLR0915 LIB.xcb.xcb_screen_next.restype = None LIB.xcb.xcb_setup_next.argtypes = (POINTER(SetupIterator),) LIB.xcb.xcb_setup_next.restype = None + LIB.randr.xcb_randr_monitor_info_next.argtypes = (POINTER(RandrMonitorInfoIterator),) + LIB.randr.xcb_randr_monitor_info_next.restype = None LIB.render.xcb_render_pictdepth_next.argtypes = (POINTER(RenderPictdepthIterator),) LIB.render.xcb_render_pictdepth_next.restype = None LIB.render.xcb_render_pictscreen_next.argtypes = (POINTER(RenderPictscreenIterator),) @@ -840,6 +986,32 @@ def initialize() -> None: # noqa: PLR0915 LIB.randr.xcb_randr_get_crtc_info_possible.restype = POINTER(RandrOutput) LIB.randr.xcb_randr_get_crtc_info_possible_length.argtypes = (POINTER(RandrGetCrtcInfoReply),) LIB.randr.xcb_randr_get_crtc_info_possible_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_crtcs.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_crtcs.restype = POINTER(RandrCrtc) + LIB.randr.xcb_randr_get_output_info_crtcs_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_crtcs_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_modes.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_modes.restype = POINTER(RandrMode) + LIB.randr.xcb_randr_get_output_info_modes_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_modes_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_clones.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_clones.restype = POINTER(RandrOutput) + LIB.randr.xcb_randr_get_output_info_clones_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_clones_length.restype = c_int + LIB.randr.xcb_randr_get_output_info_name.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_name.restype = POINTER(c_uint8) + LIB.randr.xcb_randr_get_output_info_name_length.argtypes = (POINTER(RandrGetOutputInfoReply),) + LIB.randr.xcb_randr_get_output_info_name_length.restype = c_int + LIB.randr.xcb_randr_get_output_property_data.argtypes = (POINTER(RandrGetOutputPropertyReply),) + LIB.randr.xcb_randr_get_output_property_data.restype = POINTER(c_uint8) + LIB.randr.xcb_randr_get_output_property_data_length.argtypes = (POINTER(RandrGetOutputPropertyReply),) + LIB.randr.xcb_randr_get_output_property_data_length.restype = c_int + LIB.randr.xcb_randr_monitor_info_outputs.argtypes = (POINTER(RandrMonitorInfo),) + LIB.randr.xcb_randr_monitor_info_outputs.restype = POINTER(RandrOutput) + LIB.randr.xcb_randr_monitor_info_outputs_length.argtypes = (POINTER(RandrMonitorInfo),) + LIB.randr.xcb_randr_monitor_info_outputs_length.restype = c_int + LIB.randr.xcb_randr_get_monitors_monitors_iterator.argtypes = (POINTER(RandrGetMonitorsReply),) + LIB.randr.xcb_randr_get_monitors_monitors_iterator.restype = RandrMonitorInfoIterator LIB.render.xcb_render_pictdepth_visuals.argtypes = (POINTER(RenderPictdepth),) LIB.render.xcb_render_pictdepth_visuals.restype = POINTER(RenderPictvisual) LIB.render.xcb_render_pictdepth_visuals_length.argtypes = (POINTER(RenderPictdepth),) @@ -860,10 +1032,7 @@ def initialize() -> None: # noqa: PLR0915 LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image.restype = POINTER(c_uint32) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.argtypes = (POINTER(XfixesGetCursorImageReply),) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.restype = c_int - LIB.shm.xcb_shm_create_segment_reply_fds.argtypes = ( - POINTER(Connection), - POINTER(ShmCreateSegmentReply), - ) + LIB.shm.xcb_shm_create_segment_reply_fds.argtypes = (POINTER(Connection), POINTER(ShmCreateSegmentReply)) LIB.shm.xcb_shm_create_segment_reply_fds.restype = POINTER(c_int) initialize_xcb_typed_func(LIB.xcb, "xcb_get_geometry", [POINTER(Connection), Drawable], GetGeometryReply) initialize_xcb_typed_func( @@ -895,6 +1064,21 @@ def initialize() -> None: # noqa: PLR0915 initialize_xcb_typed_func( LIB.randr, "xcb_randr_get_crtc_info", [POINTER(Connection), RandrCrtc, Timestamp], RandrGetCrtcInfoReply ) + initialize_xcb_typed_func( + LIB.randr, "xcb_randr_get_output_info", [POINTER(Connection), RandrOutput, Timestamp], RandrGetOutputInfoReply + ) + initialize_xcb_typed_func( + LIB.randr, "xcb_randr_get_output_primary", [POINTER(Connection), Window], RandrGetOutputPrimaryReply + ) + initialize_xcb_typed_func( + LIB.randr, + "xcb_randr_get_output_property", + [POINTER(Connection), RandrOutput, Atom, Atom, c_uint32, c_uint32, c_uint8, c_uint8], + RandrGetOutputPropertyReply, + ) + initialize_xcb_typed_func( + LIB.randr, "xcb_randr_get_monitors", [POINTER(Connection), Window, c_uint8], RandrGetMonitorsReply + ) initialize_xcb_typed_func( LIB.render, "xcb_render_query_version", [POINTER(Connection), c_uint32, c_uint32], RenderQueryVersionReply ) @@ -908,20 +1092,12 @@ def initialize() -> None: # noqa: PLR0915 [POINTER(Connection), Drawable, c_int16, c_int16, c_uint16, c_uint16, c_uint32, c_uint8, ShmSeg, c_uint32], ShmGetImageReply, ) - LIB.shm.xcb_shm_attach_fd_checked.argtypes = ( - POINTER(Connection), - ShmSeg, - c_int, - c_uint8, - ) + LIB.shm.xcb_shm_attach_fd_checked.argtypes = (POINTER(Connection), ShmSeg, c_int, c_uint8) LIB.shm.xcb_shm_attach_fd_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.shm, "xcb_shm_create_segment", [POINTER(Connection), ShmSeg, c_uint32, c_uint8], ShmCreateSegmentReply ) - LIB.shm.xcb_shm_detach_checked.argtypes = ( - POINTER(Connection), - ShmSeg, - ) + LIB.shm.xcb_shm_detach_checked.argtypes = (POINTER(Connection), ShmSeg) LIB.shm.xcb_shm_detach_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.xfixes, "xcb_xfixes_query_version", [POINTER(Connection), c_uint32, c_uint32], XfixesQueryVersionReply diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index f504af66..648a5e4e 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -68,6 +68,10 @@ "GetScreenResources", "GetScreenResourcesCurrent", "GetCrtcInfo", + "GetOutputInfo", + "GetOutputPrimary", + "GetOutputProperty", + "GetMonitors", ], "render": [ "QueryVersion", From b443ae1dab3f802bbae32d250e2ea9650f43ba43 Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 16:38:14 -0800 Subject: [PATCH 6/9] Implement suggested fixes from review Uses the := walrus operator to reduce redundant dict lookups. Also (this wasn't in the review), refactors an "in" test from a version I wrote when I needed a more complicated test, to one that's easier to read. --- src/mss/linux/base.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py index 2236c961..5a808c64 100644 --- a/src/mss/linux/base.py +++ b/src/mss/linux/base.py @@ -254,23 +254,21 @@ def _randr_output_ids( if edid_prop.type_.value != 0: edid_block = bytes(xcb.randr_get_output_property_data(edid_prop)) edid_data = parse_edid(edid_block) - if "display_name" in edid_data: - rv["name"] = edid_data["display_name"] + if (display_name := edid_data.get("display_name")) is not None: + rv["name"] = display_name edid_params: dict[str, str] = {} - if "id_legacy" in edid_data: - edid_params["model"] = edid_data["id_legacy"] - if "serial_number" in edid_data: - edid_params["serial"] = str(edid_data["serial_number"]) - if "manufacture_year" in edid_data: - if "manufacture_week" in edid_data: - edid_params["mfr_date"] = ( - f"{edid_data['manufacture_year']:04d}W{edid_data['manufacture_week']:02d}" - ) + if (id_legacy := edid_data.get("id_legacy")) is not None: + edid_params["model"] = id_legacy + if (serial_number := edid_data.get("serial_number")) is not None: + edid_params["serial"] = str(serial_number) + if (manufacture_year := edid_data.get("manufacture_year")) is not None: + if (manufacture_week := edid_data.get("manufacture_week")) is not None: + edid_params["mfr_date"] = f"{manufacture_year:04d}W{manufacture_week:02d}" else: - edid_params["mfr_date"] = f"{edid_data['manufacture_year']:04d}" - if "model_year" in edid_data: - edid_params["model_year"] = f"{edid_data['model_year']:04d}" + edid_params["mfr_date"] = f"{manufacture_year:04d}" + if (model_year := edid_data.get("model_year")) is not None: + edid_params["model_year"] = f"{model_year:04d}" if edid_params: rv["unique_id"] = urlencode(edid_params) @@ -281,7 +279,7 @@ def _choose_randr_output(outputs: Array[xcb.RandrOutput], primary_output: xcb.Ra if len(outputs) == 0: msg = "No RandR outputs available" raise ScreenShotError(msg) - if any(o == primary_output for o in outputs): + if primary_output in outputs: return primary_output return outputs[0] From 2e656fb22b6f5a3cf7a4f0af40585bb832a261d0 Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 16:54:00 -0800 Subject: [PATCH 7/9] Add CHANGELOG and CHANGES entries --- CHANGELOG.md | 1 + CHANGES.md | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c8397f6..2c6314a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ See Git commit messages for full history. ## 10.2.0.dev0 (2026-xx-xx) +- Linux: add primary monitor detection, monitor device name, unique device interface name, and output name using XRandR (#153) - Windows: switch from `GetDIBits` to more memory efficient `CreateDIBSection` for `MSS.grab` implementation (#449) - Windows: fix gdi32.GetDIBits() failed after a couple of minutes of recording (#268) - Linux: check the server for Xrandr support version (#417) diff --git a/CHANGES.md b/CHANGES.md index 1f9ef079..202b224f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,40 @@ # Technical Changes +## 10.2.0 (2026-xx-xx) + +### windows.py +- Added `MONITORINFOEXW` structure for extended monitor information. +- Added `DISPLAY_DEVICEW` structure for device information. +- Added constants: `CCHDEVICENAME`, `MONITORINFOF_PRIMARY`, `EDD_GET_DEVICE_INTERFACE_NAME`. +- Added `GetMonitorInfoW` to `CFUNCTIONS` for querying monitor properties. +- Added `EnumDisplayDevicesW` to `CFUNCTIONS` for querying device details. +- Modified `_monitors_impl()` callback to extract primary monitor flag, device names, and device interface name (unique_id) using Win32 APIs; `unique_id` uses `EDD_GET_DEVICE_INTERFACE_NAME` when available. + +### linux/base.py +- Reworked `_monitors_impl()` to prefer XRandR 1.5+ `GetMonitors` when available, falling back to enumerating active CRTCs. +- Added monitor identification fields from RandR + EDID where available: `is_primary`, `output`, `name`, and `unique_id`. +- Added EDID lookup via RandR `EDID`/`EdidData` output property and parsing via `mss.tools.parse_edid()`. + +### linux/xcb.py +- Added `intern_atom()` helper with per-connection caching and support for predefined atoms. +- Added `XCB_NONE` constant (`Atom(0)`). +- Added additional XRandR request wrappers used for monitor identification (`GetMonitors`, `GetOutputInfo`, `GetOutputPrimary`, `GetOutputProperty`). + +### linux/xcbhelpers.py +- Added `InternAtomReply` structure and typed binding for `xcb_intern_atom`. +- Added `__eq__()`/`__hash__()` to `XID` for value-based comparisons. + +### xcbproto/gen_xcb_to_py.py +- Extended the generator to include additional XRandR requests used by the XCB backends (`GetOutputInfo`, `GetOutputPrimary`, `GetOutputProperty`, `GetMonitors`). +- Updated typedef generation to emit value-based `__eq__()`/`__hash__()` implementations. +- Refactored code generation helpers and formatting (use `textwrap.indent`/`dedent`). + +### tools.py +- Added `parse_edid()` helper for extracting identifying fields (legacy model id, serial number, manufacture/model year, and display name) from EDID blocks. + +### linux/xshmgetimage.py +- Fixed XID type handling for `drawable`/`visual` (avoid mixing raw `.value` with typed IDs). + ## 10.1.1 (2025-xx-xx) ### linux/__init__.py From afc142238cb183a07e41e93d66172d922068f79a Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 17:11:29 -0800 Subject: [PATCH 8/9] Set monitor["is_primary"] = False if applicable In the previous code, "is_primary" would only be set on the primary monitor (where it's True). This change sets it on all monitors, to parallel the Windows behavior. This code won't set it at all if the primary monitor cannot be determined (XRandR 1.2), although it might set all of them to False if XRandR tells us explicitly that no monitor is primary. (MSSBase.primary_monitor, in the pending PR #469, will use the first monitor in that event.) --- src/mss/linux/base.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py index 5a808c64..bed291a2 100644 --- a/src/mss/linux/base.py +++ b/src/mss/linux/base.py @@ -197,7 +197,7 @@ def _randr_get_version(self) -> tuple[int, int] | None: randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION) return (randr_version_data.major_version, randr_version_data.minor_version) - def _randr_get_primary_output(self, randr_version: tuple[int, int], /) -> xcb.RandrOutput: + def _randr_get_primary_output(self, randr_version: tuple[int, int], /) -> xcb.RandrOutput | None: if self.conn is None: msg = "Cannot identify monitors while the connection is closed" raise ScreenShotError(msg) @@ -205,7 +205,9 @@ def _randr_get_primary_output(self, randr_version: tuple[int, int], /) -> xcb.Ra if randr_version >= (1, 3): primary_output_data = xcb.randr_get_output_primary(self.conn, self.drawable) return primary_output_data.output - return xcb.RandrOutput(0) + # Python None means that there was no way to identify a primary output. This is distinct from XCB_NONE (that + # is, xcb.RandROutput(0)), which means that there is not a primary monitor. + return None def _randr_get_edid_atom(self) -> xcb.Atom | None: if self.conn is None: @@ -275,7 +277,7 @@ def _randr_output_ids( return rv @staticmethod - def _choose_randr_output(outputs: Array[xcb.RandrOutput], primary_output: xcb.RandrOutput, /) -> xcb.RandrOutput: + def _choose_randr_output(outputs: Array[xcb.RandrOutput], primary_output: xcb.RandrOutput | None, /) -> xcb.RandrOutput: if len(outputs) == 0: msg = "No RandR outputs available" raise ScreenShotError(msg) @@ -283,7 +285,7 @@ def _choose_randr_output(outputs: Array[xcb.RandrOutput], primary_output: xcb.Ra return primary_output return outputs[0] - def _monitors_from_randr_monitors(self, primary_output: xcb.RandrOutput, edid_atom: xcb.Atom | None, /) -> None: + def _monitors_from_randr_monitors(self, primary_output: xcb.RandrOutput | None, edid_atom: xcb.Atom | None, /) -> None: if self.conn is None: msg = "Cannot identify monitors while the connection is closed" raise ScreenShotError(msg) @@ -297,8 +299,11 @@ def _monitors_from_randr_monitors(self, primary_output: xcb.RandrOutput, edid_at "width": randr_monitor.width, "height": randr_monitor.height, } - if randr_monitor.primary: - monitor["is_primary"] = True + # Under XRandR, it's legal for no monitor to be primary. In this case, case MSSBase.primary_monitor will + # return the first monitor. That said, we note in the dict that we explicitly are told by XRandR that + # all of the monitors are not primary. (This is distinct from the XRandR 1.2 path, which doesn't have + # any information about primary monitors.) + monitor["is_primary"] = bool(randr_monitor.primary) if randr_monitor.nOutput > 0: outputs = xcb.randr_monitor_info_outputs(randr_monitor) @@ -310,7 +315,7 @@ def _monitors_from_randr_monitors(self, primary_output: xcb.RandrOutput, edid_at def _monitors_from_randr_crtcs( self, randr_version: tuple[int, int], - primary_output: xcb.RandrOutput, + primary_output: xcb.RandrOutput | None, edid_atom: xcb.Atom | None, /, ) -> None: @@ -341,8 +346,11 @@ def _monitors_from_randr_crtcs( outputs = xcb.randr_get_crtc_info_outputs(crtc_info) chosen_output = self._choose_randr_output(outputs, primary_output) monitor |= self._randr_output_ids(chosen_output, timestamp, edid_atom) - if chosen_output == primary_output: - monitor["is_primary"] = True + # The concept of primary outputs was added in XRandR 1.3. We distinguish between "all the monitors are + # not primary" (RRGetOutputPrimary returned XCB_NONE, a valid case) and "we have no way to get + # information about the primary monitor": in the latter case, we don't populate "is_primary". + if primary_output is not None: + monitor["is_primary"] = chosen_output == primary_output self._monitors.append(monitor) From 3b93a73c0f89eddf38b7747725cf079eac897eb1 Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 20 Feb 2026 17:20:42 -0800 Subject: [PATCH 9/9] Type fix --- src/mss/linux/base.py | 11 +++++++++-- src/tests/test_xcb.py | 3 +-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py index bed291a2..036eb984 100644 --- a/src/mss/linux/base.py +++ b/src/mss/linux/base.py @@ -277,15 +277,22 @@ def _randr_output_ids( return rv @staticmethod - def _choose_randr_output(outputs: Array[xcb.RandrOutput], primary_output: xcb.RandrOutput | None, /) -> xcb.RandrOutput: + def _choose_randr_output( + outputs: Array[xcb.RandrOutput], primary_output: xcb.RandrOutput | None, / + ) -> xcb.RandrOutput: if len(outputs) == 0: msg = "No RandR outputs available" raise ScreenShotError(msg) + if primary_output is None: + # We don't want to use the `in` check if this could be None, according to MyPy. + return outputs[0] if primary_output in outputs: return primary_output return outputs[0] - def _monitors_from_randr_monitors(self, primary_output: xcb.RandrOutput | None, edid_atom: xcb.Atom | None, /) -> None: + def _monitors_from_randr_monitors( + self, primary_output: xcb.RandrOutput | None, edid_atom: xcb.Atom | None, / + ) -> None: if self.conn is None: msg = "Cannot identify monitors while the connection is closed" raise ScreenShotError(msg) diff --git a/src/tests/test_xcb.py b/src/tests/test_xcb.py index be18ccdd..f51a9206 100644 --- a/src/tests/test_xcb.py +++ b/src/tests/test_xcb.py @@ -1,6 +1,7 @@ from __future__ import annotations import gc +import platform from ctypes import ( POINTER, Structure, @@ -15,7 +16,6 @@ from typing import TYPE_CHECKING, Any, Callable from unittest.mock import Mock from weakref import finalize -import platform if TYPE_CHECKING: from collections.abc import Generator @@ -31,7 +31,6 @@ list_from_xcb, ) - if platform.system().lower() != "linux": pytestmark = pytest.mark.skip