diff --git a/scripts/flipper_core.py b/scripts/flipper_core.py index f8283d7..ccabb95 100644 --- a/scripts/flipper_core.py +++ b/scripts/flipper_core.py @@ -8,6 +8,7 @@ from __future__ import annotations import os +import queue import struct import sys import threading @@ -103,14 +104,45 @@ def varint_decode(reader) -> int: raise ValueError("varint too long") +@dataclass +class _PendingCall: + """One in-flight RPC call awaiting its response chain. + + The reader thread appends each received message with the matching + command_id to `msgs`, then signals `event` once a message arrives with + `has_next == False` (or sets `error` and signals on connection loss). + """ + event: threading.Event + msgs: list = field(default_factory=list) + error: Optional[BaseException] = None + + +# Subscriber queue sizing. 30 frames ≈ 2 seconds at 15 FPS, plenty of slack +# for a slow consumer without unbounded memory growth. +_FRAME_QUEUE_SIZE = 30 +_LOG_QUEUE_SIZE = 4096 + + class RpcSession: - """Synchronous RPC session over a serial port. + """RPC session over a serial port with a dedicated reader thread. Usage: with RpcSession(port) as s: info = s.device_info() png = s.screen_frame() s.send_input("Ok", "Short") + + Concurrency model: + A single daemon reader thread owns ``self._ser.read``. Callers send + commands via ``_request`` / ``_request_stream`` which register a + ``_PendingCall`` keyed by command_id, transmit the message under + ``_io_lock``, and block on ``call.event`` until the reader dispatches + the response chain. + + Asynchronous notifications (``system_log_response``, + ``gui_screen_frame``) fan out to any queues created by + ``subscribe_log_queue()`` / ``subscribe_frames()``. Multiple readers + can subscribe concurrently; each gets its own queue. """ def __init__(self, port: str, timeout: float = 5.0): @@ -124,13 +156,24 @@ def __init__(self, port: str, timeout: float = 5.0): self.timeout = timeout self._ser: Optional[serial.Serial] = None self._command_id = 0 - # Pending responses keyed by command_id (when a background reader is active) - self._pending: dict[int, list] = {} + self._id_lock = threading.Lock() + # In-flight calls awaiting responses, keyed by command_id. + self._pending: dict[int, _PendingCall] = {} + self._pending_lock = threading.Lock() + # Unrouted messages (no matching pending call) — kept for diagnostics. self._notifications: deque = deque(maxlen=4096) + # Asynchronous push streams. self._log_lines: deque[str] = deque(maxlen=4096) + self._log_subscribers: list[queue.Queue] = [] + self._frame_subscribers: list[queue.Queue] = [] + self._subscribers_lock = threading.Lock() + # Reader thread state. self._reader_thread: Optional[threading.Thread] = None self._reader_stop = threading.Event() + self._alive = False # True between successful open() and close()/EOF + # Write-side serialization on the serial port. self._io_lock = threading.Lock() + # Legacy callback API (kept for backward compat with subscribe_logs). self._on_log: Optional[Callable[[str], None]] = None def __enter__(self) -> "RpcSession": @@ -142,11 +185,26 @@ def __exit__(self, *exc) -> None: def open(self) -> None: self._ser = serial.Serial(self.port, baudrate=230400, timeout=self.timeout) + # Drain banner + switch into RPC mode while we still own the read side. + # The reader thread must NOT be running yet — `_enter_rpc_mode` reads + # synchronously from the serial port to detect the CLI prompt. self._enter_rpc_mode() + self._alive = True + self._reader_stop.clear() + self._reader_thread = threading.Thread( + target=self._reader_loop, + name=f"flipper-rpc-reader[{self.port}]", + daemon=True, + ) + self._reader_thread.start() def close(self) -> None: if self._ser is None: return + # Order matters: signal stop first, send polite session-end, then + # close the port (which unblocks any pending read in the reader), + # then join the thread, then fail any callers still waiting. + self._reader_stop.set() try: self._stop_session() except Exception: @@ -155,7 +213,13 @@ def close(self) -> None: self._ser.close() except Exception: pass + thread = self._reader_thread + self._reader_thread = None + if thread is not None and thread.is_alive(): + thread.join(timeout=2.0) self._ser = None + self._alive = False + self._fail_pending(ConnectionError("RPC session closed")) # -- RPC framing ----------------------------------------------------- @@ -183,8 +247,9 @@ def _read_until(self, marker: bytes, timeout: float = 2.0) -> bytes: return bytes(buf) def _next_id(self) -> int: - self._command_id = (self._command_id + 1) & 0x7FFFFFFF - return self._command_id + with self._id_lock: + self._command_id = (self._command_id + 1) & 0x7FFFFFFF + return self._command_id def _send(self, msg) -> None: assert self._ser is not None @@ -192,7 +257,8 @@ def _send(self, msg) -> None: with self._io_lock: self._ser.write(varint_encode(len(data)) + data) - def _recv(self): + def _recv_raw(self): + """Read one framed message. Called only from the reader thread.""" assert self._ser is not None length = varint_decode(self._ser.read) data = b"" @@ -205,10 +271,107 @@ def _recv(self): msg.ParseFromString(data) return msg - def _request(self, **fields): - msg = flipper_pb2.Main(command_id=self._next_id(), **fields) - self._send(msg) - return self._recv() + # Cap consecutive non-EOF read errors before giving up on the session. + # 50 errors × 10 ms backoff = ~500 ms grace for a transient garbage burst + # while still bounding CPU on a wedged stream. + _READER_MAX_CONSECUTIVE_ERRORS = 50 + _READER_ERROR_BACKOFF_S = 0.01 + + def _reader_loop(self) -> None: + """Pump messages off the serial port and dispatch them. + + Runs in its own daemon thread until ``close()`` is called or the + device disconnects. On exit, fails every still-waiting call so + request methods don't hang forever. + """ + consecutive_errors = 0 + try: + while not self._reader_stop.is_set(): + try: + msg = self._recv_raw() + consecutive_errors = 0 + except EOFError: + self._alive = False + break + except serial.SerialException: + self._alive = False + break + except Exception: + # Transient read error (varint overflow, protobuf decode + # failure, etc.). Brief backoff so a wedged stream + # doesn't pin a CPU core; give up after a threshold so + # callers eventually unblock instead of waiting forever. + if self._reader_stop.is_set(): + break + consecutive_errors += 1 + if consecutive_errors >= self._READER_MAX_CONSECUTIVE_ERRORS: + self._alive = False + break + time.sleep(self._READER_ERROR_BACKOFF_S) + continue + try: + self._handle_message(msg) + except Exception: + # Dispatcher bugs must not kill the reader. + pass + finally: + self._fail_pending( + ConnectionError("RPC reader thread exited (device disconnected?)") + ) + + def _fail_pending(self, error: BaseException) -> None: + """Wake every still-waiting `_PendingCall` with an error.""" + with self._pending_lock: + calls = list(self._pending.values()) + self._pending.clear() + for call in calls: + call.error = error + call.event.set() + + def _register_pending(self, cid: int) -> _PendingCall: + call = _PendingCall(event=threading.Event()) + with self._pending_lock: + self._pending[cid] = call + return call + + def _unregister_pending(self, cid: int) -> None: + with self._pending_lock: + self._pending.pop(cid, None) + + def _wait_pending(self, call: _PendingCall, timeout: Optional[float] = None) -> list: + """Block until the response chain finishes or the timeout elapses.""" + signaled = call.event.wait(timeout if timeout is not None else self.timeout) + if call.error is not None: + raise call.error + if not signaled: + raise TimeoutError("RPC timeout") + return call.msgs + + def _request(self, _timeout: Optional[float] = None, **fields): + """Send one message and return the final response (handles has_next). + + Use for one-shot commands. For commands that emit a sequence of + responses, use :meth:`_request_stream` to receive all of them. + """ + cid = self._next_id() + call = self._register_pending(cid) + try: + self._send(flipper_pb2.Main(command_id=cid, **fields)) + msgs = self._wait_pending(call, timeout=_timeout) + finally: + self._unregister_pending(cid) + return msgs[-1] if msgs else None + + def _request_stream(self, _timeout: Optional[float] = None, **fields) -> list: + """Send one message and return every response message in the chain.""" + cid = self._next_id() + call = self._register_pending(cid) + try: + self._send(flipper_pb2.Main(command_id=cid, **fields)) + msgs = self._wait_pending(call, timeout=_timeout) + finally: + self._unregister_pending(cid) + return msgs # -- Log subscription ----------------------------------------------- @@ -257,13 +420,30 @@ def get_log_lines(self, tail: Optional[int] = None, clear: bool = False) -> list lines = lines[-tail:] return lines + @staticmethod + def _push_to_subscribers(subs: list, item) -> None: + """Fan an item out to a list of bounded queues, dropping oldest on overflow. + + Subscribers iterate over a snapshot of the list so it's safe to + mutate concurrently elsewhere. + """ + for q in list(subs): + try: + q.put_nowait(item) + except queue.Full: + try: + q.get_nowait() + q.put_nowait(item) + except (queue.Empty, queue.Full): + pass + def _handle_message(self, msg) -> None: - """Dispatch a message: log notification or command response.""" - # Log notifications carry command_id=0 and a system_log_response + """Route a received message to its pending caller or push subscribers.""" try: kind = msg.WhichOneof("content") except Exception: kind = None + if kind == "system_log_response": text = getattr(msg.system_log_response, "text", "") or "" if text: @@ -273,11 +453,82 @@ def _handle_message(self, msg) -> None: self._on_log(text) except Exception: pass + with self._subscribers_lock: + subs = list(self._log_subscribers) + self._push_to_subscribers(subs, text) return + + if kind == "gui_screen_frame": + data = bytes(msg.gui_screen_frame.data) + with self._subscribers_lock: + subs = list(self._frame_subscribers) + self._push_to_subscribers(subs, data) + # Don't fall through to _pending routing — frames are pure + # notifications even when command_id matches a start-stream + # request (one-shot screen_frame() pulls from a subscriber queue + # instead of waiting on _pending). + return + cid = msg.command_id - self._pending.setdefault(cid, []).append(msg) + with self._pending_lock: + call = self._pending.get(cid) + if call is not None: + call.msgs.append(msg) + if not msg.has_next: + call.event.set() + return + + # Unrouted notification (e.g. start-stream ACK arriving after the + # caller already moved on). Keep a bounded history for debugging. self._notifications.append(msg) + # -- Push subscribers ----------------------------------------------- + + def subscribe_frames(self, maxsize: int = _FRAME_QUEUE_SIZE) -> "queue.Queue[bytes]": + """Return a queue that receives screen frames pushed by the device. + + Frames arrive as raw 1024-byte 1-bpp packed framebuffers. The caller + must also activate the device-side stream via + :meth:`start_screen_stream`. Always pair with + :meth:`unsubscribe_frames` to avoid leaking queues on the dispatcher. + """ + q: "queue.Queue[bytes]" = queue.Queue(maxsize=maxsize) + with self._subscribers_lock: + self._frame_subscribers.append(q) + return q + + def unsubscribe_frames(self, q: "queue.Queue[bytes]") -> None: + with self._subscribers_lock: + try: + self._frame_subscribers.remove(q) + except ValueError: + pass + + def subscribe_log_queue(self, maxsize: int = _LOG_QUEUE_SIZE) -> "queue.Queue[str]": + """Return a queue that receives log lines pushed by the device. + + Activate device-side log delivery with :meth:`subscribe_logs` first. + """ + q: "queue.Queue[str]" = queue.Queue(maxsize=maxsize) + with self._subscribers_lock: + self._log_subscribers.append(q) + return q + + def unsubscribe_log_queue(self, q: "queue.Queue[str]") -> None: + with self._subscribers_lock: + try: + self._log_subscribers.remove(q) + except ValueError: + pass + + def start_screen_stream(self) -> None: + """Tell the device to start pushing screen frames continuously.""" + self._request(gui_start_screen_stream_request=gui_pb2.StartScreenStreamRequest()) + + def stop_screen_stream(self) -> None: + """Tell the device to stop pushing screen frames.""" + self._request(gui_stop_screen_stream_request=gui_pb2.StopScreenStreamRequest()) + def _stop_session(self) -> None: try: msg = flipper_pb2.Main( @@ -293,20 +544,13 @@ def _stop_session(self) -> None: def device_info(self) -> DeviceInfo: info = DeviceInfo(port=self.port) properties: dict[str, str] = {} - msg = flipper_pb2.Main( - command_id=self._next_id(), - has_next=False, + for resp in self._request_stream( system_device_info_request=system_pb2.DeviceInfoRequest() - ) - self._send(msg) - while True: - resp = self._recv() + ): key = resp.system_device_info_response.key value = resp.system_device_info_response.value if key: properties[key] = value - if not resp.has_next: - break info.properties = properties info.firmware_version = properties.get("firmware_version", "") info.api_version = ( @@ -334,14 +578,38 @@ def _detect_firmware_variant(props: dict) -> str: return "xtreme" return "ofw" - def screen_frame(self) -> bytes: - """Return the current 128x64 framebuffer (1024 bytes, 1bpp packed).""" - self._request(gui_start_screen_stream_request=gui_pb2.StartScreenStreamRequest()) - # First frame arrives as a notification; capture it - frame = self._recv() - data = bytes(frame.gui_screen_frame.data) - self._request(gui_stop_screen_stream_request=gui_pb2.StopScreenStreamRequest()) - return data + def screen_frame(self, timeout: float = 5.0) -> bytes: + """Return the current 128x64 framebuffer (1024 bytes, 1bpp packed). + + One-shot: starts the stream, pulls a single fresh frame off the + subscriber queue, then stops the stream. Safe to call concurrently + with a long-lived viewer subscription — every subscriber gets every + frame. + """ + q = self.subscribe_frames(maxsize=4) + try: + self.start_screen_stream() + # _request for start_screen_stream blocks until the device ACKs; + # frames received before that ACK belong to a previous (overlapping + # or stopping) stream and must not be returned as "the new frame". + # The reader thread is serial, so anything already in the queue + # at this point predates the ACK. + while True: + try: + q.get_nowait() + except queue.Empty: + break + try: + return q.get(timeout=timeout) + except queue.Empty as e: + raise TimeoutError("No screen frame received") from e + finally: + try: + self.stop_screen_stream() + except Exception: + pass + finally: + self.unsubscribe_frames(q) def send_input(self, key: str, kind: str = "Short") -> None: key_enum = gui_pb2.InputKey.Value(key) # Up, Down, Left, Right, Ok, Back @@ -364,53 +632,46 @@ def stop_app(self) -> None: def storage_list(self, path: str) -> list[dict]: out = [] - msg = flipper_pb2.Main( - command_id=self._next_id(), + for resp in self._request_stream( storage_list_request=storage_pb2.ListRequest(path=path) - ) - self._send(msg) - while True: - resp = self._recv() + ): for f in resp.storage_list_response.file: out.append({ "name": f.name, "type": "dir" if f.type == storage_pb2.File.DIR else "file", "size": f.size, }) - if not resp.has_next: - break return out def storage_read(self, path: str) -> bytes: buf = bytearray() - msg = flipper_pb2.Main( - command_id=self._next_id(), + for resp in self._request_stream( storage_read_request=storage_pb2.ReadRequest(path=path) - ) - self._send(msg) - while True: - resp = self._recv() + ): buf.extend(resp.storage_read_response.file.data) - if not resp.has_next: - break return bytes(buf) def storage_write(self, path: str, data: bytes, chunk_size: int = 512) -> None: chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] or [b""] - cmd_id = self._next_id() - for i, chunk in enumerate(chunks): - is_last = (i == len(chunks) - 1) - msg = flipper_pb2.Main( - command_id=cmd_id, - has_next=not is_last, - storage_write_request=storage_pb2.WriteRequest( - path=path, - file=storage_pb2.File(data=chunk), - ) - ) - self._send(msg) - # Final ack - self._recv() + # All chunks share one command_id; device replies once after the final + # chunk. Register the pending call *before* sending so the reader + # thread can route the response back to us. + cid = self._next_id() + call = self._register_pending(cid) + try: + for i, chunk in enumerate(chunks): + is_last = (i == len(chunks) - 1) + self._send(flipper_pb2.Main( + command_id=cid, + has_next=not is_last, + storage_write_request=storage_pb2.WriteRequest( + path=path, + file=storage_pb2.File(data=chunk), + ) + )) + self._wait_pending(call) + finally: + self._unregister_pending(cid) def storage_mkdir(self, path: str) -> None: self._request(storage_mkdir_request=storage_pb2.MkdirRequest(path=path))