diff --git a/.coveragerc b/.coveragerc index 481e53d..83fcc6e 100644 --- a/.coveragerc +++ b/.coveragerc @@ -3,7 +3,4 @@ [run] branch = True source = dlclivegui -omit = - # omit only the parts that are pure passthrough shims to SDKs - dlclivegui/cameras/backends/basler_backend.py - dlclivegui/cameras/backends/gentl_backend.py +# omit = diff --git a/dlclivegui/cameras/backends/aravis_backend.py b/dlclivegui/cameras/backends/aravis_backend.py index a0d9ce3..b437c3c 100644 --- a/dlclivegui/cameras/backends/aravis_backend.py +++ b/dlclivegui/cameras/backends/aravis_backend.py @@ -10,7 +10,9 @@ import cv2 import numpy as np +from ...config import CameraSettings from ..base import CameraBackend, SupportLevel, register_backend +from ..factory import DetectedCamera LOG = logging.getLogger(__name__) @@ -40,7 +42,7 @@ def __init__(self, settings): if not isinstance(ns, dict): ns = {} - self._camera_id: str | None = ns.get("camera_id") or props.get("camera_id") + self._camera_id: str | None = ns.get("device_id") or props.get("device_id") self._pixel_format: str = ns.get("pixel_format") or props.get("pixel_format", "Mono8") self._timeout: int = int(ns.get("timeout", props.get("timeout", 2_000_000))) self._n_buffers: int = int(ns.get("n_buffers", props.get("n_buffers", 10))) @@ -103,6 +105,153 @@ def get_device_count(cls) -> int: except Exception: return -1 + @classmethod + def quick_ping(cls, index: int, *_args, **_kwargs) -> bool: + """ + Cheap presence test for CameraFactory probing. + Uses update_device_list() then bounds-check. + """ + if not ARAVIS_AVAILABLE: + return False + try: + Aravis.update_device_list() + n = int(Aravis.get_n_devices() or 0) + return 0 <= int(index) < n + except Exception: + return False + + @classmethod + def discover_devices(cls, max_devices: int = 10, should_cancel=None, progress_cb=None): + if not ARAVIS_AVAILABLE: + return [] + + # Refresh list once; indices may change after update_device_list() + Aravis.update_device_list() + + snap = cls._arv_snapshot_devices(limit=max_devices) + + cams: list[DetectedCamera] = [] + for d in snap: + if should_cancel and should_cancel(): + break + if progress_cb: + progress_cb(f"Found {d['label']}") + + path = d.get("physical_id") or d.get("address") + + cams.append( + DetectedCamera( + index=int(d["index"]), + label=str(d["label"]), + device_id=d.get("device_id"), + path=path, + ) + ) + return cams + + @classmethod + def rebind_settings(cls, settings: CameraSettings) -> CameraSettings: + """ + Best-effort quick rebind using only Aravis enumeration APIs (no camera open). + Indices may change after Aravis.update_device_list(). + """ + if not ARAVIS_AVAILABLE: + return settings + + props = settings.properties if isinstance(settings.properties, dict) else {} + ns = props.get(cls.OPTIONS_KEY, {}) if isinstance(props.get(cls.OPTIONS_KEY), dict) else {} + + # Stored identifiers (some may be missing) + stored_device_id = cls._safe_str( + ns.get("device_id") or props.get("device_id") or ns.get("camera_id") or props.get("camera_id") + ) + stored_physical = cls._safe_str( + ns.get("device_physical_id") or ns.get("device_path") or props.get("device_path") + ) + stored_vendor = cls._safe_str(ns.get("device_vendor")) + stored_model = cls._safe_str(ns.get("device_model")) + stored_serial = cls._safe_str(ns.get("device_serial_nbr") or ns.get("device_serial")) + stored_name = cls._safe_str(ns.get("device_name")) + + # Nothing to rebind with + if not any( + [stored_device_id, stored_physical, (stored_vendor and stored_model and stored_serial), stored_name] + ): + return settings + + try: + Aravis.update_device_list() # must be called before get_device_* + snap = cls._arv_snapshot_devices(limit=None) + + # 1) device_id exact match (fast) + chosen = None + if stored_device_id: + for d in snap: + if d.get("device_id") == stored_device_id: + chosen = d + break + + # 2) physical_id exact match + if chosen is None and stored_physical: + for d in snap: + if d.get("physical_id") == stored_physical or d.get("address") == stored_physical: + chosen = d + break + + # 3) vendor/model/serial exact triple match + if chosen is None and stored_vendor and stored_model and stored_serial: + for d in snap: + if (d.get("vendor"), d.get("model"), d.get("serial")) == ( + stored_vendor, + stored_model, + stored_serial, + ): + chosen = d + break + + # 4) name substring match against computed label + if chosen is None and stored_name: + needle = stored_name.lower() + for d in snap: + label = (d.get("label") or "").lower() + if needle and needle in label: + chosen = d + break + + # 5) fallback to current index if still plausible + if chosen is None: + idx = int(getattr(settings, "index", 0) or 0) + if 0 <= idx < len(snap): + chosen = snap[idx] + else: + return settings + + # Apply new index + settings.index = int(chosen["index"]) + + # Refresh namespace fields (keeps GUI stable identity fresh) + if isinstance(settings.properties, dict): + out = settings.properties.setdefault(cls.OPTIONS_KEY, {}) + if isinstance(out, dict): + out["device_id"] = chosen.get("device_id") + out["device_physical_id"] = chosen.get("physical_id") + out["device_vendor"] = chosen.get("vendor") + out["device_model"] = chosen.get("model") + out["device_serial_nbr"] = chosen.get("serial") + out["device_protocol"] = chosen.get("protocol") + out["device_address"] = chosen.get("address") + out["device_name"] = chosen.get("label") # computed label (no open) + + # also keep 'device_path' aligned with physical id for GUI fallback + if chosen.get("physical_id"): + out["device_path"] = chosen.get("physical_id") + + return settings + + except Exception: + # Never hard-fail creation just because rebinding couldn't happen + return settings + def open(self) -> None: if not ARAVIS_AVAILABLE: raise RuntimeError("Aravis library not available") @@ -120,11 +269,68 @@ def open(self) -> None: raise RuntimeError(f"Camera index {index} out of range for {n_devices} Aravis device(s)") camera_id = Aravis.get_device_id(index) self._camera = Aravis.Camera.new(camera_id) + self._camera_id = self._safe_str(camera_id) if self._camera is None: raise RuntimeError("Failed to open Aravis camera") + # --- Refresh identity and align index (best-effort, no heavy open needed) --- + try: + snap = self._arv_snapshot_devices(limit=None) + + opened_id = self._camera_id + if opened_id is None: + # Opened by index + try: + opened_id = self._safe_str(Aravis.get_device_id(int(self.settings.index))) + except Exception: + opened_id = None + + chosen = None + if opened_id: + for d in snap: + if d.get("device_id") == opened_id: + chosen = d + break + + # If we found it, align settings.index and refresh identity cache + if chosen: + self.settings.index = int(chosen["index"]) + if isinstance(self.settings.properties, dict): + ns = self.settings.properties.setdefault(self.OPTIONS_KEY, {}) + if isinstance(ns, dict): + ns["device_id"] = chosen.get("device_id") + ns["device_physical_id"] = chosen.get("physical_id") + ns["device_vendor"] = chosen.get("vendor") + ns["device_model"] = chosen.get("model") + ns["device_serial_nbr"] = chosen.get("serial") + ns["device_protocol"] = chosen.get("protocol") + ns["device_address"] = chosen.get("address") + ns["device_path"] = chosen.get("physical_id") or chosen.get("address") + else: + if isinstance(self.settings.properties, dict): + ns = self.settings.properties.setdefault(self.OPTIONS_KEY, {}) + if isinstance(ns, dict): + ns["device_id"] = opened_id + except Exception: + pass + + # Compute higher-quality label from the opened camera object self._device_label = self._resolve_device_label() + # Always populate minimal identity into backend namespace for GUI + if isinstance(self.settings.properties, dict): + ns = self.settings.properties.setdefault(self.OPTIONS_KEY, {}) + if isinstance(ns, dict): + # Always write a device_id after a successful open + try: + if self._camera_id: + ns["device_id"] = self._camera_id + else: + ns["device_id"] = self._safe_str(Aravis.get_device_id(int(self.settings.index))) + except Exception: + pass + if self._device_label: + ns["device_name"] = self._device_label self._configure_pixel_format() self._configure_resolution() @@ -261,6 +467,81 @@ def device_name(self) -> str: # ------------------------------------------------------------------ # Configuration helpers # ------------------------------------------------------------------ + @staticmethod + def _safe_str(x) -> str | None: + try: + if x is None: + return None + s = str(x).strip() + return s if s else None + except Exception: + return None + + @classmethod + def _arv_snapshot_devices(cls, limit: int | None = None) -> list[dict]: + """ + Fast snapshot of the current Aravis device list without opening cameras. + Requires Aravis.update_device_list() before calling. + """ + n = int(Aravis.get_n_devices() or 0) # valid until next update_device_list() + if limit is not None: + n = min(n, int(limit)) + + devices: list[dict] = [] + for i in range(n): + try: + dev_id = cls._safe_str(Aravis.get_device_id(i)) + except Exception: + dev_id = None + + try: + physical = cls._safe_str(Aravis.get_device_physical_id(i)) + except Exception: + physical = None + try: + vendor = cls._safe_str(Aravis.get_device_vendor(i)) + except Exception: + vendor = None + try: + model = cls._safe_str(Aravis.get_device_model(i)) + except Exception: + model = None + try: + serial = cls._safe_str(Aravis.get_device_serial_nbr(i)) + except Exception: + serial = None + try: + protocol = cls._safe_str(Aravis.get_device_protocol(i)) + except Exception: + protocol = None + try: + address = cls._safe_str(Aravis.get_device_address(i)) + except Exception: + address = None + + # Construct a stable-ish human label without opening the camera + label_parts = [p for p in (vendor, model) if p] + label = " ".join(label_parts) if label_parts else None + if serial: + label = f"{label} ({serial})" if label else f"({serial})" + if not label: + label = dev_id or f"Aravis #{i}" + + devices.append( + { + "index": int(i), + "device_id": dev_id, + "physical_id": physical, + "vendor": vendor, + "model": model, + "serial": serial, + "protocol": protocol, + "address": address, + "label": label, + } + ) + return devices + def _get_requested_resolution_or_none(self) -> tuple[int, int] | None: """ Return (w, h) if user explicitly requested a resolution. diff --git a/dlclivegui/cameras/backends/gentl_backend.py b/dlclivegui/cameras/backends/gentl_backend.py index 3175bf5..c74c0ab 100644 --- a/dlclivegui/cameras/backends/gentl_backend.py +++ b/dlclivegui/cameras/backends/gentl_backend.py @@ -44,20 +44,56 @@ class GenTLCameraBackend(CameraBackend): def __init__(self, settings): super().__init__(settings) + # --- Properties namespace handling (new UI stores backend options under properties["gentl"]) --- props = settings.properties if isinstance(settings.properties, dict) else {} ns = props.get(self.OPTIONS_KEY, {}) if not isinstance(ns, dict): ns = {} + # --- CTI / transport configuration --- self._cti_file: str | None = ns.get("cti_file") or props.get("cti_file") - self._serial_number: str | None = ( - ns.get("serial_number") or ns.get("serial") or props.get("serial_number") or props.get("serial") + self._cti_search_paths: tuple[str, ...] = self._parse_cti_paths( + ns.get("cti_search_paths", props.get("cti_search_paths")) ) + + # --- Fast probe mode (CameraProbeWorker sets this) --- + # When fast_start=True, open() should avoid starting acquisition if possible. + self._fast_start: bool = bool(ns.get("fast_start", False)) + + # --- Stable identity / serial selection --- + # New UI stores stable identity as ns["device_id"], with recommended formats: + # - "serial:" for true serials + # - "fp:" when serial is missing/ambiguous + # + # We keep legacy "serial_number"/"serial" behavior as fallback. + raw_device_id = ns.get("device_id") or props.get("device_id") + legacy_serial = ns.get("serial_number") or ns.get("serial") or props.get("serial_number") or props.get("serial") + + self._device_id: str | None = str(raw_device_id).strip() if raw_device_id else None + + # Decide what to use for actual device selection in open(): + # - If device_id is "serial:XXXX" -> use XXXX as serial_number + # - Otherwise, keep legacy serial if present; open() may still use index if serial is None + self._serial_number: str | None = None + if self._device_id: + did = self._device_id + if did.startswith("serial:"): + self._serial_number = did.split("serial:", 1)[1].strip() or None + elif did.startswith("fp:"): + # fingerprint: not directly usable as serial; rebind_settings should map fp -> index + self._serial_number = legacy_serial # keep legacy if any, otherwise None + else: + # If device_id is provided without prefix, treat it as a "serial-like" value for backward compatibility + self._serial_number = did + else: + self._serial_number = str(legacy_serial).strip() if legacy_serial else None + + # --- Pixel format / image transforms (legacy + backend options) --- self._pixel_format: str = ns.get("pixel_format") or props.get("pixel_format", "Mono8") self._rotate: int = int(ns.get("rotate", props.get("rotate", 0))) % 360 self._crop: tuple[int, int, int, int] | None = self._parse_crop(ns.get("crop", props.get("crop"))) - # Exposure / Gain: 0 means Auto (do not set) + # --- Exposure / Gain: 0 means Auto (do not set) --- exp_val = getattr(settings, "exposure", 0) gain_val = getattr(settings, "gain", 0.0) @@ -81,21 +117,21 @@ def __init__(self, settings): except Exception: self._gain = None + # --- Acquisition timeout --- self._timeout: float = float(ns.get("timeout", props.get("timeout", 2.0))) - self._cti_search_paths: tuple[str, ...] = self._parse_cti_paths( - ns.get("cti_search_paths", props.get("cti_search_paths")) - ) - # Resolution request (None = device default) + # --- Resolution request (None = device default / Auto) --- + # Uses settings.width/settings.height if set; falls back to legacy props["resolution"] if present. self._requested_resolution: tuple[int, int] | None = self._get_requested_resolution_or_none() - # Actuals for GUI + # --- Actuals for GUI --- self._actual_width: int | None = None self._actual_height: int | None = None self._actual_fps: float | None = None self._actual_gain: float | None = None self._actual_exposure: float | None = None + # --- Harvesters resources --- self._harvester = None self._acquirer = None self._device_label: str | None = None @@ -169,74 +205,180 @@ def open(self) -> None: "The 'harvesters' package is required for the GenTL backend. Install it via 'pip install harvesters'." ) + # Ensure properties namespace exists for persistence back to UI + if not isinstance(self.settings.properties, dict): + self.settings.properties = {} + props = self.settings.properties + ns = props.get(self.OPTIONS_KEY, {}) + if not isinstance(ns, dict): + ns = {} + props[self.OPTIONS_KEY] = ns + self._harvester = Harvester() - cti_file = self._cti_file or self._find_cti_file() + + # Resolve CTI file: explicit > configured > search + cti_file = self._cti_file or ns.get("cti_file") or props.get("cti_file") or self._find_cti_file() self._harvester.add_file(cti_file) self._harvester.update() if not self._harvester.device_info_list: raise RuntimeError("No GenTL cameras detected via Harvesters") - serial = self._serial_number - index = int(self.settings.index or 0) - if serial: - available = self._available_serials() - matches = [s for s in available if serial in s] - if not matches: - raise RuntimeError(f"Camera with serial '{serial}' not found. Available cameras: {available}") - serial = matches[0] - else: - device_count = len(self._harvester.device_info_list) - if index < 0 or index >= device_count: - raise RuntimeError(f"Camera index {index} out of range for {device_count} GenTL device(s)") + infos = list(self._harvester.device_info_list) + + # Helper: robustly read device_info fields (supports dict-like or attribute-like entries) + def _info_get(info, key: str, default=None): + try: + if hasattr(info, "get"): + v = info.get(key) # type: ignore[attr-defined] + if v is not None: + return v + except Exception: + pass + try: + v = getattr(info, key, None) + if v is not None: + return v + except Exception: + pass + return default + + # ------------------------------------------------------------------ + # Device selection (stable device_id > serial > index) + # ------------------------------------------------------------------ + requested_index = int(self.settings.index or 0) + selected_index: int | None = None + selected_serial: str | None = None + + # 1) Try stable device_id first (supports "serial:..." and "fp:...") + target_device_id = self._device_id or ns.get("device_id") or props.get("device_id") + if target_device_id: + target_device_id = str(target_device_id).strip() + + # Match exact against computed device_id_from_info(info) + for idx, info in enumerate(infos): + try: + did = self._device_id_from_info(info) + except Exception: + did = None + if did and did == target_device_id: + selected_index = idx + selected_serial = _info_get(info, "serial_number", None) + selected_serial = str(selected_serial).strip() if selected_serial else None + break + + # If device_id is "serial:XXXX", match serial directly + if selected_index is None and target_device_id.startswith("serial:"): + serial_target = target_device_id.split("serial:", 1)[1].strip() + if serial_target: + exact = [] + for idx, info in enumerate(infos): + sn = _info_get(info, "serial_number", "") + sn = str(sn).strip() if sn is not None else "" + if sn == serial_target: + exact.append((idx, sn)) + if exact: + selected_index = exact[0][0] + selected_serial = exact[0][1] + else: + sub = [] + for idx, info in enumerate(infos): + sn = _info_get(info, "serial_number", "") + sn = str(sn).strip() if sn is not None else "" + if serial_target and serial_target in sn: + sub.append((idx, sn)) + if len(sub) == 1: + selected_index = sub[0][0] + selected_serial = sub[0][1] or None + elif len(sub) > 1: + candidates = [sn for _, sn in sub] + raise RuntimeError( + f"Ambiguous GenTL serial match for '{serial_target}'. Candidates: {candidates}" + ) - self._acquirer = self._create_acquirer(serial, index) + # 2) Try legacy serial selection if still not selected + if selected_index is None: + serial = self._serial_number + if serial: + serial = str(serial).strip() + exact = [] + for idx, info in enumerate(infos): + sn = _info_get(info, "serial_number", "") + sn = str(sn).strip() if sn is not None else "" + if sn == serial: + exact.append((idx, sn)) + if exact: + selected_index = exact[0][0] + selected_serial = exact[0][1] + else: + sub = [] + for idx, info in enumerate(infos): + sn = _info_get(info, "serial_number", "") + sn = str(sn).strip() if sn is not None else "" + if serial and serial in sn: + sub.append((idx, sn)) + if len(sub) == 1: + selected_index = sub[0][0] + selected_serial = sub[0][1] or None + elif len(sub) > 1: + candidates = [sn for _, sn in sub] + raise RuntimeError(f"Ambiguous GenTL serial match for '{serial}'. Candidates: {candidates}") + else: + available = [str(_info_get(i, "serial_number", "")).strip() for i in infos] + raise RuntimeError(f"Camera with serial '{serial}' not found. Available cameras: {available}") + + # 3) Fallback to index selection + if selected_index is None: + device_count = len(infos) + if requested_index < 0 or requested_index >= device_count: + raise RuntimeError(f"Camera index {requested_index} out of range for {device_count} GenTL device(s)") + selected_index = requested_index + sn = _info_get(infos[selected_index], "serial_number", "") + selected_serial = str(sn).strip() if sn else None + + # Update settings.index to the actual selected index (important for UI merge-back + stability) + self.settings.index = int(selected_index) + selected_info = infos[int(selected_index)] + + # ------------------------------------------------------------------ + # Create ImageAcquirer using the latest Harvesters API: Harvester.create(...) + # ------------------------------------------------------------------ + try: + if selected_serial: + self._acquirer = self._harvester.create({"serial_number": str(selected_serial)}) + else: + self._acquirer = self._harvester.create(int(selected_index)) + except TypeError: + # Some versions accept keyword argument; keep as a safety net without reintroducing legacy API. + if selected_serial: + self._acquirer = self._harvester.create({"serial_number": str(selected_serial)}) + else: + self._acquirer = self._harvester.create(index=int(selected_index)) remote = self._acquirer.remote_device node_map = remote.node_map - # print(dir(node_map)) - """ - ['AcquisitionBurstFrameCount', 'AcquisitionControl', 'AcquisitionFrameRate', 'AcquisitionMode', - 'AcquisitionStart', 'AcquisitionStop', 'AnalogControl', 'AutoFunctionsROI', 'AutoFunctionsROIEnable', - 'AutoFunctionsROIHeight', 'AutoFunctionsROILeft', 'AutoFunctionsROIPreset', 'AutoFunctionsROITop', - 'AutoFunctionsROIWidth', 'BinningHorizontal', 'BinningVertical', 'BlackLevel', 'CameraRegisterAddress', - 'CameraRegisterAddressSpace', 'CameraRegisterControl', 'CameraRegisterRead', 'CameraRegisterValue', - 'CameraRegisterWrite', 'Contrast', 'DecimationHorizontal', 'DecimationVertical', 'Denoise', - 'DeviceControl', 'DeviceFirmwareVersion', 'DeviceModelName', 'DeviceReset', 'DeviceSFNCVersionMajor', - 'DeviceSFNCVersionMinor', 'DeviceSFNCVersionSubMinor', 'DeviceScanType', 'DeviceSerialNumber', - 'DeviceTLType', 'DeviceTLVersionMajor', 'DeviceTLVersionMinor', 'DeviceTLVersionSubMinor', - 'DeviceTemperature', 'DeviceTemperatureSelector', 'DeviceType', 'DeviceUserID', 'DeviceVendorName', - 'DigitalIO', 'ExposureAuto', 'ExposureAutoHighlightReduction', 'ExposureAutoLowerLimit', - 'ExposureAutoReference', 'ExposureAutoUpperLimit', 'ExposureAutoUpperLimitAuto', 'ExposureTime', - 'GPIn', 'GPOut', 'Gain', 'GainAuto', 'GainAutoLowerLimit', 'GainAutoUpperLimit', 'Gamma', 'Height', - 'HeightMax', 'IMXLowLatencyTriggerMode', 'ImageFormatControl', 'OffsetAutoCenter', 'OffsetX', 'OffsetY', - 'PayloadSize', 'PixelFormat', 'ReverseX', 'ReverseY', 'Root', 'SensorHeight', 'SensorWidth', 'Sharpness', - 'ShowOverlay', 'SoftwareAnalogControl', 'SoftwareTransformControl', 'SoftwareTransformEnable', - 'StrobeDelay', 'StrobeDuration', 'StrobeEnable', 'StrobeOperation', 'StrobePolarity', 'TLParamsLocked', - 'TestControl', 'TestPendingAck', 'TimestampLatch', 'TimestampLatchValue', 'TimestampReset', 'ToneMappingAuto', - 'ToneMappingControl', 'ToneMappingEnable', 'ToneMappingGlobalBrightness', 'ToneMappingIntensity', - 'TransportLayerControl', 'TriggerActivation', 'TriggerDebouncer', 'TriggerDelay', 'TriggerDenoise', - 'TriggerMask', 'TriggerMode', 'TriggerOverlap', 'TriggerSelector', 'TriggerSoftware', 'TriggerSource', - 'UserSetControl', 'UserSetDefault', 'UserSetLoad', 'UserSetSave', 'UserSetSelector', 'Width', 'WidthMax'] - """ - + # Resolve human label for UI self._device_label = self._resolve_device_label(node_map) + # ------------------------------------------------------------------ + # Apply configuration (existing behavior) + # ------------------------------------------------------------------ self._configure_pixel_format(node_map) self._configure_resolution(node_map) self._configure_exposure(node_map) self._configure_gain(node_map) self._configure_frame_rate(node_map) - # Capture actual resolution even when using defaults + # ------------------------------------------------------------------ + # Capture "actual" telemetry for GUI (existing behavior) + # ------------------------------------------------------------------ try: self._actual_width = int(node_map.Width.value) self._actual_height = int(node_map.Height.value) except Exception: pass - # Capture actual FPS if available try: self._actual_fps = float(node_map.ResultingFrameRate.value) except Exception: @@ -252,8 +394,331 @@ def open(self) -> None: except Exception: self._actual_gain = None + # ------------------------------------------------------------------ + # Persist identity + richer device metadata back into settings for UI merge-back + # ------------------------------------------------------------------ + computed_id = None + try: + computed_id = self._device_id_from_info(selected_info) + except Exception: + computed_id = None + + if computed_id: + ns["device_id"] = computed_id + elif selected_serial: + ns["device_id"] = f"serial:{selected_serial}" + + # Canonical serial storage + if selected_serial: + ns["serial_number"] = str(selected_serial) + ns["device_serial_number"] = str(selected_serial) + + # UI-friendly name + if self._device_label: + ns["device_name"] = str(self._device_label) + + # Extra metadata from discovery info (helps debugging and stable identity fallbacks) + ns["device_display_name"] = str(_info_get(selected_info, "display_name", "") or "") + ns["device_info_id"] = str(_info_get(selected_info, "id_", "") or "") + ns["device_vendor"] = str(_info_get(selected_info, "vendor", "") or "") + ns["device_model"] = str(_info_get(selected_info, "model", "") or "") + ns["device_tl_type"] = str(_info_get(selected_info, "tl_type", "") or "") + ns["device_user_defined_name"] = str(_info_get(selected_info, "user_defined_name", "") or "") + ns["device_version"] = str(_info_get(selected_info, "version", "") or "") + ns["device_access_status"] = _info_get(selected_info, "access_status", None) + + # Preserve CTI used (useful for stable operation) + ns["cti_file"] = str(cti_file) + + # ------------------------------------------------------------------ + # Start streaming unless fast_start probe mode is requested + # ------------------------------------------------------------------ + if getattr(self, "_fast_start", False): + LOG.info("GenTL open() in fast_start probe mode: acquisition not started.") + return + self._acquirer.start() + @staticmethod + def _device_id_from_info(info) -> str | None: + """ + Build a stable-ish device identifier from Harvester device_info_list entries. + This helper supports both dict-like and attribute-like representations. + """ + + def _read(name: str): + # dict-like + try: + if hasattr(info, "get"): + v = info.get(name) # type: ignore[attr-defined] + if v is not None: + return v + except Exception: + pass + # attribute-like + try: + return getattr(info, name, None) + except Exception: + return None + + def _get(*names: str) -> str | None: + for n in names: + v = _read(n) + if v is None: + continue + s = str(v).strip() + if s: + return s + return None + + # Prefer serial if present (best stable key when available) + serial = _get("serial_number", "SerialNumber", "device_serial_number", "sn", "serial") + if serial: + return f"serial:{serial}" + + # Fallback components (best-effort; names may vary per producer) + vendor = _get("vendor", "vendor_name", "manufacturer", "DeviceVendorName") + model = _get("model", "model_name", "DeviceModelName") + user_id = _get("user_defined_name", "user_id", "DeviceUserID", "DeviceUserId", "device_user_id") + tl_type = _get("tl_type", "transport_layer_type", "DeviceTLType") + + unique = _get("id_", "id", "device_id", "uid", "guid", "mac_address", "interface_id", "display_name") + + parts = [] + for k, v in (("vendor", vendor), ("model", model), ("user", user_id), ("tl", tl_type), ("uid", unique)): + if v: + parts.append(f"{k}={v}") + + if not parts: + return None + + return "fp:" + "|".join(parts) + + @classmethod + def discover_devices( + cls, + *, + max_devices: int = 10, + should_cancel: callable[[], bool] | None = None, + progress_cb: callable[[str], None] | None = None, + ): + """ + Rich discovery path for CameraFactory.detect_cameras(). + Returns a list of DetectedCamera with device_id filled when possible. + """ + if Harvester is None: + return [] + + # Local import to avoid circulars at import time + from ..factory import DetectedCamera + + def _canceled() -> bool: + return bool(should_cancel and should_cancel()) + + harvester = None + try: + if progress_cb: + progress_cb("Initializing GenTL discovery…") + + harvester = Harvester() + + # Use default CTI search; we don't have per-camera settings here. + cti_file = cls._search_cti_file(cls._DEFAULT_CTI_PATTERNS) + if not cti_file: + if progress_cb: + progress_cb("No .cti found (GenTL producer missing).") + return [] + + harvester.add_file(cti_file) + harvester.update() + + infos = list(harvester.device_info_list or []) + if not infos: + return [] + + out: list[DetectedCamera] = [] + limit = min(len(infos), max_devices if max_devices > 0 else len(infos)) + + for idx in range(limit): + if _canceled(): + break + + # Create a label for the UI, using display_name if available, otherwise vendor/model/serial. + info = infos[idx] + display_name = None + try: + display_name = ( + info.get("display_name") if hasattr(info, "get") else getattr(info, "display_name", None) + ) + except Exception: + display_name = None + + if display_name: + label = str(display_name).strip() + else: + vendor = ( + getattr(info, "vendor", None) or (info.get("vendor") if hasattr(info, "get") else None) or "" + ) + model = getattr(info, "model", None) or (info.get("model") if hasattr(info, "get") else None) or "" + serial = ( + getattr(info, "serial_number", None) + or (info.get("serial_number") if hasattr(info, "get") else None) + or "" + ) + vendor = str(vendor).strip() + model = str(model).strip() + serial = str(serial).strip() + + label = f"{vendor} {model}".strip() if (vendor or model) else f"GenTL device {idx}" + if serial: + label = f"{label} ({serial})" + + device_id = cls._device_id_from_info(info) + + out.append( + DetectedCamera( + index=idx, + label=label, + device_id=device_id, + # GenTL usually doesn't expose vid/pid/path consistently; leave None unless you have it + vid=None, + pid=None, + path=None, + backend_hint=None, + ) + ) + + if progress_cb: + progress_cb(f"Found: {label}") + + out.sort(key=lambda c: c.index) + return out + + except Exception: + # Returning None would trigger probing fallback; but since you declared discovery supported, + # returning [] is usually less surprising than a slow probe storm. + return [] + finally: + if harvester is not None: + try: + harvester.reset() + except Exception: + pass + + @classmethod + def rebind_settings(cls, settings): + """ + If a stable identity exists in settings.properties['gentl'], map it to the + correct current index (and serial_number if available). + """ + if Harvester is None: + return settings + + props = settings.properties if isinstance(settings.properties, dict) else {} + ns = props.get(cls.OPTIONS_KEY, {}) + if not isinstance(ns, dict): + ns = {} + + target_id = ns.get("device_id") or ns.get("serial_number") or ns.get("serial") + if not target_id: + return settings + + harvester = None + try: + harvester = Harvester() + cti_file = ns.get("cti_file") or props.get("cti_file") or cls._search_cti_file(cls._DEFAULT_CTI_PATTERNS) + if not cti_file: + return settings + + harvester.add_file(cti_file) + harvester.update() + + infos = list(harvester.device_info_list or []) + if not infos: + return settings + + # Try exact match by computed device_id first + match_index = None + match_serial = None + + # Normalize + target_id_str = str(target_id).strip() + + for idx, info in enumerate(infos): + dev_id = cls._device_id_from_info(info) + if dev_id and dev_id == target_id_str: + match_index = idx + match_serial = getattr(info, "serial_number", None) + break + + # If not found, fallback: treat target as serial-ish substring (legacy behavior) + if match_index is None: + for idx, info in enumerate(infos): + serial = getattr(info, "serial_number", None) + if serial and target_id_str in str(serial): + match_index = idx + match_serial = serial + break + + if match_index is None: + return settings + + # Apply rebinding + settings.index = int(match_index) + + # Keep namespace consistent for open() + if not isinstance(settings.properties, dict): + settings.properties = {} + ns2 = settings.properties.setdefault(cls.OPTIONS_KEY, {}) + if not isinstance(ns2, dict): + ns2 = {} + settings.properties[cls.OPTIONS_KEY] = ns2 + + # If we got a serial, save it for open() selection (backward compatible) + if match_serial: + ns2["serial_number"] = str(match_serial) + ns2["device_id"] = target_id_str + + return settings + + except Exception: + # Any failure should not prevent fallback to index-based open + return settings + finally: + if harvester is not None: + try: + harvester.reset() + except Exception: + pass + + @classmethod + def quick_ping(cls, index: int, _unused=None) -> bool: + """ + Fast check: is there a device at this index according to Harvester? + Does not open/start acquisition. + """ + if Harvester is None: + return False + + harvester = None + try: + harvester = Harvester() + cti_file = cls._search_cti_file(cls._DEFAULT_CTI_PATTERNS) + if not cti_file: + return False + harvester.add_file(cti_file) + harvester.update() + infos = harvester.device_info_list or [] + return 0 <= int(index) < len(infos) + except Exception: + return False + finally: + if harvester is not None: + try: + harvester.reset() + except Exception: + pass + def read(self) -> tuple[np.ndarray, float]: if self._acquirer is None: raise RuntimeError("GenTL image acquirer not initialised") @@ -510,90 +975,6 @@ def _configure_pixel_format(self, node_map) -> None: except Exception as e: LOG.warning(f"Failed to set pixel format '{self._pixel_format}': {e}") - def _configure_resolution(self, node_map) -> None: - """Configure camera resolution (width and height).""" - if self._resolution is None: - return - - requested_width, requested_height = self._resolution - actual_width, actual_height = None, None - - # Try to set width - for width_attr in ("Width", "WidthMax"): - try: - node = getattr(node_map, width_attr) - if width_attr == "Width": - # Get constraints - try: - min_w = node.min - max_w = node.max - inc_w = getattr(node, "inc", 1) - # Adjust to valid value - width = self._adjust_to_increment(requested_width, min_w, max_w, inc_w) - if width != requested_width: - LOG.info( - f"Width adjusted from {requested_width} to {width} " - f"(min={min_w}, max={max_w}, inc={inc_w})" - ) - node.value = int(width) - actual_width = node.value - break - except Exception as e: - # Try setting without adjustment - try: - node.value = int(requested_width) - actual_width = node.value - break - except Exception: - LOG.warning(f"Failed to set width via {width_attr}: {e}") - continue - except AttributeError: - continue - - # Try to set height - for height_attr in ("Height", "HeightMax"): - try: - node = getattr(node_map, height_attr) - if height_attr == "Height": - # Get constraints - try: - min_h = node.min - max_h = node.max - inc_h = getattr(node, "inc", 1) - # Adjust to valid value - height = self._adjust_to_increment(requested_height, min_h, max_h, inc_h) - if height != requested_height: - LOG.info( - f"Height adjusted from {requested_height} to {height} " - f"(min={min_h}, max={max_h}, inc={inc_h})" - ) - node.value = int(height) - actual_height = node.value - break - except Exception as e: - # Try setting without adjustment - try: - node.value = int(requested_height) - actual_height = node.value - break - except Exception: - LOG.warning(f"Failed to set height via {height_attr}: {e}") - continue - except AttributeError: - continue - - # Log final resolution - if actual_width is not None and actual_height is not None: - if actual_width != requested_width or actual_height != requested_height: - LOG.warning( - f"Resolution mismatch: requested {requested_width}x{requested_height}, " - f"got {actual_width}x{actual_height}" - ) - else: - LOG.info(f"Resolution set to {actual_width}x{actual_height}") - else: - LOG.warning(f"Could not verify resolution setting (width={actual_width}, height={actual_height})") - def _configure_exposure(self, node_map) -> None: if self._exposure is None: return diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index f38bb43..e982e41 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -192,6 +192,7 @@ def __init__(self, config: ApplicationSettings | None = None): self._display_timer.start() # Show status message if myconfig.json was loaded + # FIXME @C-Achard deprecated behavior, remove later if self._config_path and self._config_path.name == "myconfig.json": self.statusBar().showMessage(f"Auto-loaded configuration from {self._config_path}", 5000) diff --git a/pyproject.toml b/pyproject.toml index 2b3c18a..8827b8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ dev = [ "pytest-cov>=4.0", "pytest-mock>=3.10", "pytest-qt>=4.2", + "pytest-timeout>=2.0", "pre-commit", "hypothesis>=6.0", ] @@ -62,6 +63,7 @@ test = [ "pytest-cov>=4.0", "pytest-mock>=3.10", "pytest-qt>=4.2", + "pytest-timeout>=2.0", "hypothesis>=6.0", ] @@ -108,6 +110,7 @@ markers = [ "hardware: Tests that require specific hardware, notable camera backends", # "slow: Tests that take a long time to run", "gui: Tests that require GUI interaction", + "timeout: Test timeout in seconds (pytest-timeout)", ] [tool.coverage.run] diff --git a/tests/cameras/backends/conftest.py b/tests/cameras/backends/conftest.py index d140d4a..0fb3ded 100644 --- a/tests/cameras/backends/conftest.py +++ b/tests/cameras/backends/conftest.py @@ -1,7 +1,12 @@ # tests/cameras/backends/conftest.py +from __future__ import annotations + import importlib import os +from dataclasses import dataclass +from typing import Any +import numpy as np import pytest @@ -16,8 +21,25 @@ def _has_module(name: str) -> bool: return False -ARAVIS_AVAILABLE = _has_module("gi") # Aravis via GObject introspection -PYPYLON_AVAILABLE = _has_module("pypylon") # Basler pypylon SDK +def _has_aravis_gi() -> bool: + """ + GI can exist without the Aravis typelib. Be representative: + check that gi.repository.Aravis is importable and versionable. + """ + try: + import gi # type: ignore + + gi.require_version("Aravis", "0.8") + from gi.repository import Aravis # noqa: F401 + + return True + except Exception: + return False + + +ARAVIS_AVAILABLE = _has_aravis_gi() +PYPYLON_AVAILABLE = _has_module("pypylon") +HARVESTERS_AVAILABLE = _has_module("harvesters") # ----------------------------- @@ -108,13 +130,782 @@ def force_aravis_unavailable(monkeypatch): def force_pypylon_unavailable(monkeypatch): """ Force Basler/pypylon to be unavailable for error-path testing. + Basler backend availability is based on 'pylon is not None'. """ try: import dlclivegui.cameras.backends.basler_backend as bas except Exception: - # If the module doesn't exist in your tree, ignore. yield return - monkeypatch.setattr(bas, "PYPYLON_AVAILABLE", False, raising=False) + monkeypatch.setattr(bas, "pylon", None, raising=False) yield + + +# ----------------------------------------------------------------------------- +# Fake Aravis SDK (module-like) + fixtures +# ----------------------------------------------------------------------------- +class FakeAravis: + """Minimal fake Aravis module used for SDK-less unit/contract tests.""" + + class BufferStatus: + SUCCESS = "SUCCESS" + ERROR = "ERROR" + + PIXEL_FORMAT_MONO_8 = "MONO8" + PIXEL_FORMAT_MONO_12 = "MONO12" + PIXEL_FORMAT_MONO_16 = "MONO16" + PIXEL_FORMAT_RGB_8_PACKED = "RGB8" + PIXEL_FORMAT_BGR_8_PACKED = "BGR8" + + class Auto: + OFF = "OFF" + + # Mutable "device list" that tests can override + devices = ["dev0"] + + @classmethod + def update_device_list(cls): + pass + + @classmethod + def get_n_devices(cls) -> int: + return len(cls.devices) + + @classmethod + def get_device_id(cls, index: int) -> str: + return cls.devices[index] + + # Optional metadata used by snapshot/rebind logic (safe defaults) + @classmethod + def get_device_physical_id(cls, index: int) -> str: + return f"PHYS-{cls.devices[index]}" + + @classmethod + def get_device_vendor(cls, index: int) -> str: + return "FakeVendor" + + @classmethod + def get_device_model(cls, index: int) -> str: + return "FakeModel" + + @classmethod + def get_device_serial_nbr(cls, index: int) -> str: + return "12345" + + @classmethod + def get_device_protocol(cls, index: int) -> str: + return "FakeProtocol" + + @classmethod + def get_device_address(cls, index: int) -> str: + return f"ADDR-{index}" + + class Camera: + def __init__(self, device_id="dev0"): + self.device_id = device_id + self.pixel_format = None + self._exposure = 0.0 + self._gain = 0.0 + self._fps = 0.0 + self.payload = 100 + self.stream = None # should be a FakeAravisStream + + self._features_int = {"Width": 1920, "Height": 1080} + self._features_float = {"AcquisitionFrameRate": 30.0} + + @classmethod + def new(cls, device_id): + return cls(device_id) + + # GenICam-like int/float access + def set_integer(self, name: str, value: int): + self._features_int[name] = int(value) + + def get_integer(self, name: str) -> int: + return int(self._features_int[name]) + + def set_float(self, name: str, value: float): + self._features_float[name] = float(value) + + def get_float(self, name: str) -> float: + return float(self._features_float[name]) + + # Pixel format + def set_pixel_format(self, fmt): + self.pixel_format = fmt + + def set_pixel_format_from_string(self, s): + self.pixel_format = s + + # Exposure + def set_exposure_time_auto(self, mode): + pass + + def set_exposure_time(self, v): + self._exposure = float(v) + + def get_exposure_time(self): + return float(self._exposure) + + # Gain + def set_gain_auto(self, mode): + pass + + def set_gain(self, v): + self._gain = float(v) + + def get_gain(self): + return float(self._gain) + + # FPS + def set_frame_rate(self, v): + self._fps = float(v) + self._features_float["AcquisitionFrameRate"] = float(v) + + def get_frame_rate(self): + return float(self._fps) + + # Metadata + def get_model_name(self): + return "FakeModel" + + def get_vendor_name(self): + return "FakeVendor" + + def get_device_serial_number(self): + return "12345" + + # Streaming + def get_payload(self): + return int(self.payload) + + def create_stream(self, *_): + return self.stream + + def start_acquisition(self): + pass + + def stop_acquisition(self): + pass + + class Buffer: + def __init__(self, data, w, h, fmt, status="SUCCESS"): + self._data = data + self._w = w + self._h = h + self._fmt = fmt + self._status = status + + @classmethod + def new_allocate(cls, size): + # Placeholder buffer object for open() buffer queue + return object() + + def get_status(self): + return self._status + + def get_data(self): + return self._data + + def get_image_width(self): + return self._w + + def get_image_height(self): + return self._h + + def get_image_pixel_format(self): + return self._fmt + + +class FakeAravisStream: + def __init__(self, buffers): + self._buffers = list(buffers) + self.pushed = 0 + + def timeout_pop_buffer(self, timeout): + return self._buffers.pop(0) if self._buffers else None + + def try_pop_buffer(self): + return self._buffers.pop(0) if self._buffers else None + + def push_buffer(self, buf): + self.pushed += 1 + + +@pytest.fixture() +def fake_aravis_module(): + """ + Returns the FakeAravis 'module' and resets its mutable state for isolation. + Tests may mutate FakeAravis.devices safely. + """ + FakeAravis.devices = ["dev0"] + return FakeAravis + + +@pytest.fixture() +def patch_aravis_sdk(monkeypatch, fake_aravis_module): + """ + Patch the Aravis backend module so it behaves as if the SDK is installed, + but uses our FakeAravis implementation. + + Usage: + def test_something(patch_aravis_sdk): + ... # aravis backend sees ARAVIS_AVAILABLE=True and Aravis=FakeAravis + """ + import dlclivegui.cameras.backends.aravis_backend as ar + + monkeypatch.setattr(ar, "ARAVIS_AVAILABLE", True, raising=False) + monkeypatch.setattr(ar, "Aravis", fake_aravis_module, raising=False) + return fake_aravis_module + + +@pytest.fixture() +def fake_aravis_stream(): + """ + Small helper fixture to create a FakeAravisStream with a list of buffers. + """ + + def _make(buffers): + return FakeAravisStream(buffers) + + return _make + + +# ----------------------------------------------------------------------------- +# Fake Basler / pypylon SDK (module-like) + fixtures +# ----------------------------------------------------------------------------- + + +class FakePylon: + """Minimal fake for 'from pypylon import pylon' usage in basler_backend.""" + + # Constants used by Basler backend + GrabStrategy_LatestImageOnly = 1 + TimeoutHandling_ThrowException = 1 + PixelType_BGR8packed = 0x02180014 # arbitrary token + OutputBitAlignment_MsbAligned = 1 + + class _Feature: + def __init__(self, value=0): + self._value = value + + def SetValue(self, v): + self._value = v + + def GetValue(self): + return self._value + + class _DeviceInfo: + def __init__(self, serial: str): + self._serial = serial + + def GetSerialNumber(self): + return self._serial + + class _Device: + def __init__(self, info): + self.info = info + + class TlFactory: + _instance = None + + def __init__(self): + self._devices = [FakePylon._DeviceInfo("FAKE-BASLER-0")] + + @classmethod + def GetInstance(cls): + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def EnumerateDevices(self): + return list(self._devices) + + def CreateDevice(self, device_info): + return FakePylon._Device(device_info) + + class _GrabResult: + def __init__(self, ok=True, array=None): + self._ok = ok + self._array = array + + def GrabSucceeded(self): + return bool(self._ok) + + def Release(self): + return None + + class InstantCamera: + def __init__(self, device): + self._device = device + self._open = False + self._grabbing = False + + # Feature nodes the backend uses + self.ExposureTime = FakePylon._Feature(1000.0) + self.Gain = FakePylon._Feature(0.0) + self.Width = FakePylon._Feature(1920) + self.Height = FakePylon._Feature(1080) + + self.AcquisitionFrameRateEnable = FakePylon._Feature(False) + self.AcquisitionFrameRate = FakePylon._Feature(30.0) + + def Open(self): + self._open = True + + def Close(self): + self._open = False + + def IsOpen(self): + return bool(self._open) + + def StartGrabbing(self, *_args, **_kwargs): + self._grabbing = True + + def StopGrabbing(self): + self._grabbing = False + + def IsGrabbing(self): + return bool(self._grabbing) + + def RetrieveResult(self, *_args, **_kwargs): + # Always succeed with a small dummy image (BGR) + import numpy as np + + frame = np.zeros((10, 10, 3), dtype=np.uint8) + return FakePylon._GrabResult(ok=True, array=frame) + + class _ConvertedImage: + def __init__(self, array): + self._array = array + + def GetArray(self): + return self._array + + class ImageFormatConverter: + def __init__(self): + self.OutputPixelFormat = None + self.OutputBitAlignment = None + + def Convert(self, grab_result): + return FakePylon._ConvertedImage(grab_result._array) + + +@pytest.fixture() +def fake_pylon_module(): + """ + Returns the FakePylon 'module' and resets singleton devices for isolation. + """ + # reset singleton factory so devices list resets per test + FakePylon.TlFactory._instance = None + return FakePylon + + +@pytest.fixture() +def patch_basler_sdk(monkeypatch, fake_pylon_module): + """ + Patch Basler backend to behave as if pypylon is installed, using FakePylon. + """ + import dlclivegui.cameras.backends.basler_backend as bb + + monkeypatch.setattr(bb, "pylon", fake_pylon_module, raising=False) + return fake_pylon_module + + +# ----------------------------------------------------------------------------- +# Fake GenTL / harvesters SDK (SDK-free) + fixtures for strict lifecycle tests +# ----------------------------------------------------------------------------- + + +class FakeGenTLTimeoutException(TimeoutError): + """Fake timeout/error type used as HarvesterTimeoutError in backend tests.""" + + pass + + +def _info_get(info: Any, key: str, default=None): + """Read a device-info field from dict-like or attribute-like entries.""" + try: + if hasattr(info, "get"): + v = info.get(key) + if v is not None: + return v + except Exception: + pass + try: + v = getattr(info, key, None) + if v is not None: + return v + except Exception: + pass + return default + + +class _FakeNode: + """Minimal GenICam node: .value plus optional constraints and symbolics.""" + + def __init__(self, value=None, *, min=None, max=None, inc=1, symbolics=None): + self.value = value + self.min = min + self.max = max + self.inc = inc + self.symbolics = symbolics or [] + + +class _FakeNodeMap: + """Node map with the attributes your GenTLCameraBackend touches.""" + + def __init__( + self, + *, + width=1920, + height=1080, + fps=30.0, + exposure=10000.0, + gain=0.0, + pixel_format="Mono8", + model="FakeGenTLModel", + serial="FAKE-GENTL-0", + display="FakeGenTLDisplay", + ): + # Label fields used by _resolve_device_label() + self.DeviceModelName = _FakeNode(model) + self.DeviceSerialNumber = _FakeNode(serial) + self.DeviceDisplayName = _FakeNode(display) + + # Pixel format node + self.PixelFormat = _FakeNode( + pixel_format, + symbolics=["Mono8", "Mono16", "RGB8", "BGR8"], + ) + + # Width/Height constraints for increment alignment logic + self.Width = _FakeNode(int(width), min=64, max=4096, inc=2) + self.Height = _FakeNode(int(height), min=64, max=4096, inc=2) + + # FPS / actual fps + self.AcquisitionFrameRateEnable = _FakeNode(True) + self.AcquisitionFrameRate = _FakeNode(float(fps)) + self.ResultingFrameRate = _FakeNode(float(fps)) + + # Exposure / gain + self.ExposureAuto = _FakeNode("Off") + self.ExposureTime = _FakeNode(float(exposure)) + self.GainAuto = _FakeNode("Off") + self.Gain = _FakeNode(float(gain)) + + +class _FakeRemoteDevice: + def __init__(self, node_map: _FakeNodeMap): + self.node_map = node_map + + +class _FakeComponent: + """ + Component with .data, .width, .height like Harvesters component2D image. + Your backend does np.asarray(component.data) and reshape using height/width. + """ + + def __init__(self, width: int, height: int, channels: int, dtype=np.uint8): + self.width = int(width) + self.height = int(height) + self._channels = int(channels) + + n = self.width * self.height * self._channels + if dtype == np.uint8: + arr = (np.arange(n) % 255).astype(np.uint8) + else: + arr = (np.arange(n) % 65535).astype(np.uint16) + self.data = arr + + +class _FakePayload: + def __init__(self, component: _FakeComponent): + self.components = [component] + + +class _FakeFetchedBufferCtx: + """Context manager returned by fetch(). Must have .payload.""" + + def __init__(self, payload: _FakePayload): + self.payload = payload + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +@dataclass +class FakeImageAcquirer: + """ + Minimal ImageAcquirer: + - remote_device.node_map + - node_map shortcut (backend uses self._acquirer.node_map in read()) + - start/stop/destroy + - fetch(timeout=...) -> ctx manager yielding buffer-like object + Strict rule: fetch fails unless started=True. + """ + + serial: str = "FAKE-GENTL-0" + width: int = 1920 + height: int = 1080 + pixel_format: str = "Mono8" + + def __post_init__(self): + self.remote_device = _FakeRemoteDevice( + _FakeNodeMap(width=self.width, height=self.height, pixel_format=self.pixel_format, serial=self.serial) + ) + self.node_map = self.remote_device.node_map + + self._started = False + self._destroyed = False + self._queue: list[_FakePayload] = [] + + # Call tracing + self.start_calls = 0 + self.stop_calls = 0 + self.destroy_calls = 0 + self.fetch_calls: list[float] = [] + + # Prepare one default frame + self._enqueue_default_frame() + + def _enqueue_default_frame(self): + pf = str(self.node_map.PixelFormat.value or "Mono8") + if pf in ("RGB8", "BGR8"): + channels, dtype = 3, np.uint8 + elif pf == "Mono16": + channels, dtype = 1, np.uint16 + else: + channels, dtype = 1, np.uint8 + + comp = _FakeComponent(self.node_map.Width.value, self.node_map.Height.value, channels, dtype=dtype) + self._queue.append(_FakePayload(comp)) + + def start(self): + self.start_calls += 1 + self._started = True + + def stop(self): + self.stop_calls += 1 + self._started = False + + def destroy(self): + self.destroy_calls += 1 + self._destroyed = True + + def fetch(self, timeout: float = 2.0): + self.fetch_calls.append(float(timeout)) + + # Strict rule: cannot fetch unless started + if not self._started: + raise FakeGenTLTimeoutException("fetch called while not started") + + if not self._queue: + raise FakeGenTLTimeoutException(f"timeout after {timeout}s") + + payload = self._queue.pop(0) + return _FakeFetchedBufferCtx(payload) + + +class FakeHarvester: + """ + Minimal Harvester: + - add_file/update/reset + - device_info_list + - create(index) or create({"serial_number": ...}) + Inventory-driven so tests can control enumeration. + """ + + def __init__(self, inventory: list[dict[str, Any]] | None = None): + self._files: list[str] = [] + self._inventory: list[dict[str, Any]] = list(inventory or []) + self.device_info_list: list[Any] = [] + + # Call tracing + self.add_file_calls: list[str] = [] + self.update_calls = 0 + self.reset_calls = 0 + self.create_calls: list[Any] = [] + + def add_file(self, file_path: str): + self._files.append(str(file_path)) + self.add_file_calls.append(str(file_path)) + + def update(self): + self.update_calls += 1 + # If not provided, default to a single fake device + if not self._inventory: + self._inventory = [ + { + "display_name": "TLSimuMono (FAKE-GENTL-0)", + "model": "FakeGenTLModel", + "vendor": "FakeVendor", + "serial_number": "FAKE-GENTL-0", + "id_": "FakeDeviceId", + "tl_type": "Custom", + "user_defined_name": "Center", + "version": "1.0.0", + "access_status": 1000, + } + ] + self.device_info_list = list(self._inventory) + + def reset(self): + self.reset_calls += 1 + self.device_info_list = [] + self._files = [] + + def create(self, selector=None, index: int | None = None, *args, **kwargs): + # Record call for verification + self.create_calls.append({"selector": selector, "index": index, "args": args, "kwargs": kwargs}) + + if not self.device_info_list: + self.update() + + serial = None + if isinstance(selector, dict): + serial = selector.get("serial_number") + + if serial is None: + if index is None: + # allow create(0) style + if isinstance(selector, int): + index = selector + else: + index = 0 + if index < 0 or index >= len(self.device_info_list): + raise RuntimeError("Index out of range") + info = self.device_info_list[index] + serial = str(_info_get(info, "serial_number", "FAKE-GENTL-0")) + + return FakeImageAcquirer(serial=str(serial)) + + # Keep compatibility if anything uses the older name + def create_image_acquirer(self, *args, **kwargs): + return self.create(*args, **kwargs) + + +# ----------------------------------------------------------------------------- +# GentL fixtures: inventory, patching, settings factory +# ----------------------------------------------------------------------------- + + +@pytest.fixture() +def gentl_inventory(): + """ + Mutable inventory list used by FakeHarvester.update(). + Tests can replace contents to simulate multiple devices, ambiguity, missing fields, etc. + """ + inv: list[dict[str, Any]] = [ + { + "display_name": "TLSimuMono (FAKE-GENTL-0)", + "model": "FakeGenTLModel", + "vendor": "FakeVendor", + "serial_number": "FAKE-GENTL-0", + "id_": "FakeDeviceId", + "tl_type": "Custom", + "user_defined_name": "Center", + "version": "1.0.0", + "access_status": 1000, + } + ] + return inv + + +@pytest.fixture() +def fake_harvester_factory(gentl_inventory): + """ + Factory that returns a FakeHarvester bound to the current gentl_inventory. + Allows tests to mutate gentl_inventory before calling backend.open(). + """ + + def _make(): + return FakeHarvester(inventory=gentl_inventory) + + return _make + + +@pytest.fixture() +def patch_gentl_sdk(monkeypatch, fake_harvester_factory): + """ + Patch dlclivegui.cameras.backends.gentl_backend to use FakeHarvester + Fake timeout. + Also bypass CTI search logic so tests never hit filesystem/SDK paths. + """ + import dlclivegui.cameras.backends.gentl_backend as gb + + # Patch Harvester symbol (the backend calls Harvester() directly) + monkeypatch.setattr(gb, "Harvester", lambda: fake_harvester_factory(), raising=False) + + # Keep your backend timeout contract as-is: it catches HarvesterTimeoutError + monkeypatch.setattr(gb, "HarvesterTimeoutError", FakeGenTLTimeoutException, raising=False) + + # Avoid filesystem CTI searching + monkeypatch.setattr(gb.GenTLCameraBackend, "_find_cti_file", lambda self: "dummy.cti", raising=False) + monkeypatch.setattr( + gb.GenTLCameraBackend, "_search_cti_file", staticmethod(lambda patterns: "dummy.cti"), raising=False + ) + + return gb + + +@pytest.fixture() +def gentl_settings_factory(): + """ + Convenience factory for CameraSettings for gentl backend tests. + """ + from dlclivegui.config import CameraSettings + + def _make( + *, + index=0, + name="TestCam", + width=0, + height=0, + fps=0.0, + exposure=0, + gain=0.0, + enabled=True, + properties=None, + ): + props = properties if isinstance(properties, dict) else {} + props.setdefault("gentl", {}) + return CameraSettings( + name=name, + index=index, + backend="gentl", + width=width, + height=height, + fps=fps, + exposure=exposure, + gain=gain, + enabled=enabled, + properties=props, + ) + + return _make + + +# ----------------------------------------------------------------------------- +# Generic patcher mapping fixture for test_generic_contracts.py +# ----------------------------------------------------------------------------- +@pytest.fixture() +def backend_sdk_patchers(patch_aravis_sdk, patch_basler_sdk, patch_gentl_sdk): + """ + Mapping from backend name -> patcher callable (best-effort SDK stubs). + + This fixture intentionally reuses existing per-backend patch fixtures + to avoid duplication. Patch side effects occur when this fixture is + requested (because patch_aravis_sdk is injected). + """ + return { + # Calling it is harmless; patching already applied by fixture injection. + "aravis": (lambda: patch_aravis_sdk), + "basler": (lambda: patch_basler_sdk), + "gentl": (lambda: patch_gentl_sdk), + # No patch needed: OpenCV is assumed present + # "opencv": None, + } diff --git a/tests/cameras/backends/test_aravis_backend.py b/tests/cameras/backends/test_aravis_backend.py index 719da89..797fd11 100644 --- a/tests/cameras/backends/test_aravis_backend.py +++ b/tests/cameras/backends/test_aravis_backend.py @@ -242,14 +242,12 @@ def make_backend(settings, buffers): @pytest.mark.unit -@pytest.mark.integration def test_device_name(): be, cam, s = make_backend(Settings(), []) assert be.device_name() == "FakeVendor FakeModel (12345)" @pytest.mark.unit -@pytest.mark.integration def test_read_mono8(): w, h = 4, 3 data = (np.arange(w * h) % 256).astype(np.uint8).tobytes() @@ -268,7 +266,6 @@ def test_read_mono8(): @pytest.mark.unit -@pytest.mark.integration def test_read_rgb8_converts_to_bgr(): w, h = 2, 1 # RGB: red=[255,0,0], green=[0,255,0] @@ -286,7 +283,6 @@ def test_read_rgb8_converts_to_bgr(): @pytest.mark.unit -@pytest.mark.integration def test_read_bgr8_passthrough(): w, h = 2, 1 data = np.array([10, 20, 30, 40, 50, 60], dtype=np.uint8).tobytes() @@ -301,7 +297,6 @@ def test_read_bgr8_passthrough(): @pytest.mark.unit -@pytest.mark.integration def test_read_mono16_scaling(): w, h = 3, 1 raw = np.array([0, 32768, 65535], dtype=np.uint16) @@ -320,7 +315,6 @@ def test_read_mono16_scaling(): @pytest.mark.unit -@pytest.mark.integration def test_read_unknown_format_fallback_to_mono8(): w, h = 2, 2 data = (np.arange(w * h) % 256).astype(np.uint8).tobytes() @@ -336,7 +330,6 @@ def test_read_unknown_format_fallback_to_mono8(): @pytest.mark.unit -@pytest.mark.integration def test_read_timeout_raises(): be, cam, s = make_backend(Settings(), []) with pytest.raises(TimeoutError): @@ -344,7 +337,6 @@ def test_read_timeout_raises(): @pytest.mark.unit -@pytest.mark.integration def test_read_status_error_raises_and_pushes_back(): w, h = 1, 1 data = b"\x00" @@ -357,7 +349,6 @@ def test_read_status_error_raises_and_pushes_back(): @pytest.mark.unit -@pytest.mark.integration def test_close_is_idempotent(): be, cam, s = make_backend(Settings(), []) be.close() @@ -370,7 +361,6 @@ def test_close_is_idempotent(): @pytest.mark.unit -@pytest.mark.integration def test_is_available_false_when_aravis_missing(monkeypatch): import dlclivegui.cameras.backends.aravis_backend as ar @@ -380,7 +370,6 @@ def test_is_available_false_when_aravis_missing(monkeypatch): @pytest.mark.unit -@pytest.mark.integration def test_get_device_count_when_unavailable(monkeypatch): import dlclivegui.cameras.backends.aravis_backend as ar @@ -389,7 +378,6 @@ def test_get_device_count_when_unavailable(monkeypatch): @pytest.mark.unit -@pytest.mark.integration def test_get_device_count_when_available(monkeypatch): import dlclivegui.cameras.backends.aravis_backend as ar @@ -405,7 +393,6 @@ def test_get_device_count_when_available(monkeypatch): @pytest.mark.unit -@pytest.mark.integration def test_open_index_out_of_range(monkeypatch): # Patch Aravis module inside backend fake = FakeAravis @@ -418,7 +405,6 @@ def test_open_index_out_of_range(monkeypatch): @pytest.mark.unit -@pytest.mark.integration def test_open_success_pushes_initial_buffers_and_configures(monkeypatch): import dlclivegui.cameras.backends.aravis_backend as ar @@ -466,7 +452,6 @@ def new_camera(device_id): @pytest.mark.unit -@pytest.mark.integration def test_open_device_default_resolution_sets_actual_resolution(monkeypatch): import dlclivegui.cameras.backends.aravis_backend as ar @@ -495,7 +480,6 @@ def test_open_device_default_resolution_sets_actual_resolution(monkeypatch): @pytest.mark.unit -@pytest.mark.integration def test_open_requested_resolution_applies_and_reports_actual(monkeypatch): import dlclivegui.cameras.backends.aravis_backend as ar @@ -524,7 +508,6 @@ def test_open_requested_resolution_applies_and_reports_actual(monkeypatch): @pytest.mark.unit -@pytest.mark.integration def test_close_flushes_stream_and_clears_state(monkeypatch): import dlclivegui.cameras.backends.aravis_backend as ar diff --git a/tests/cameras/backends/test_generic_contracts.py b/tests/cameras/backends/test_generic_contracts.py new file mode 100644 index 0000000..1c8d770 --- /dev/null +++ b/tests/cameras/backends/test_generic_contracts.py @@ -0,0 +1,214 @@ +""" +Backend-agnostic contract tests for camera backends. + +Hard failures: +- backend registry / factory / discovery calls must not crash +- capabilities must be well-formed +- if backend is available, create() must return a usable backend object +- close() must be idempotent + +Soft signals (warnings): +- missing quick_ping / discover_devices / rebind_settings (helps future dev work) +- capability claims that do not match provided methods +""" + +from __future__ import annotations + +import warnings +from typing import Any + +import pytest + +from dlclivegui.cameras.factory import CameraFactory, DetectedCamera +from dlclivegui.config import CameraSettings + + +def _try_import_gui_apply_identity(): + try: + from dlclivegui.gui.camera_config_dialog import _apply_detected_identity # type: ignore + + return _apply_detected_identity + except Exception: + return None + + +def _minimal_settings(backend: str, index: int = 0, *, properties: dict[str, Any] | None = None) -> CameraSettings: + return CameraSettings( + name=f"ContractTest-{backend}", + backend=backend, + index=index, + properties=properties or {}, + enabled=True, + width=0, + height=0, + fps=0.0, + exposure=0, + gain=0.0, + rotation=0, + crop_x0=0, + crop_y0=0, + crop_x1=0, + crop_y1=0, + ) + + +@pytest.fixture(scope="module") +def all_registered_backends() -> list[str]: + return list(CameraFactory.backend_names()) + + +@pytest.mark.unit +def test_all_registered_backends_have_well_formed_capabilities(all_registered_backends): + for name in all_registered_backends: + caps = CameraFactory.backend_capabilities(name) + assert isinstance(caps, dict), f"{name}: capabilities must be a dict" + assert all(isinstance(k, str) for k in caps.keys()), f"{name}: capability keys must be str" + assert all(hasattr(v, "value") for v in caps.values()), f"{name}: capability values must be enum-like" + + +@pytest.mark.unit +def test_available_backends_map_is_well_formed(): + availability = CameraFactory.available_backends() + assert isinstance(availability, dict) + assert all(isinstance(k, str) for k in availability.keys()) + assert all(isinstance(v, bool) for v in availability.values()) + + +@pytest.mark.unit +def test_detect_cameras_is_safe_for_all_backends(all_registered_backends): + """ + Must never crash; it should return [] for unavailable SDKs. + """ + for name in all_registered_backends: + cams = CameraFactory.detect_cameras(name, max_devices=2) + assert isinstance(cams, list), f"{name}: detect_cameras must return a list" + for c in cams: + assert hasattr(c, "index") and hasattr(c, "label"), f"{name}: detected items must have index/label" + assert isinstance(c.index, int) + assert isinstance(c.label, str) + + +@pytest.mark.unit +def test_optional_accelerators_warn_if_missing(all_registered_backends): + """ + Non-failing warnings to encourage implementers to add helpful fast paths. + """ + for name in all_registered_backends: + # Resolve backend class indirectly by trying to create minimal settings only if available. + # We can still warn based on capabilities + expected methods. + caps = CameraFactory.backend_capabilities(name) + + # Determine if stable identity / discovery are claimed + stable_claim = getattr(caps.get("stable_identity", None), "value", None) + disco_claim = getattr(caps.get("device_discovery", None), "value", None) + + # Check method presence on backend class (best-effort) + try: + # This is internal-ish, but it’s the most direct way to inspect class methods. + backend_cls = CameraFactory._resolve_backend(name) # type: ignore[attr-defined] + except Exception: + # If backend can't resolve, that's a real issue, but it will be caught elsewhere. + continue + + missing = [] + if not hasattr(backend_cls, "quick_ping"): + missing.append("quick_ping") + if not hasattr(backend_cls, "discover_devices"): + missing.append("discover_devices") + if not hasattr(backend_cls, "rebind_settings"): + missing.append("rebind_settings") + + # Soft warnings: missing accelerators + if missing: + warnings.warn( + f"[backend-contract] {name}: missing optional accelerators: {', '.join(missing)}", + UserWarning, + stacklevel=2, + ) + + # Soft warnings: claimed capability but method missing + if stable_claim in ("supported", "best_effort") and not hasattr(backend_cls, "rebind_settings"): + warnings.warn( + f"[backend-contract] {name}: capabilities claim stable_identity={stable_claim} " + f"but rebind_settings() is missing", + UserWarning, + stacklevel=2, + ) + if disco_claim in ("supported", "best_effort") and not hasattr(backend_cls, "discover_devices"): + warnings.warn( + f"[backend-contract] {name}: capabilities claim device_discovery={disco_claim} " + f"but discover_devices() is missing", + UserWarning, + stacklevel=2, + ) + + +@pytest.mark.unit +def test_create_contract_for_available_backends(all_registered_backends, backend_sdk_patchers): + """ + If a backend is available (possibly after applying a fake SDK patch), create() should work. + If not available, create() should fail cleanly (factory raises). + """ + CameraFactory.available_backends() + + for name in all_registered_backends: + # Apply optional SDK stub/patch if provided (keeps this test file backend-agnostic) + patcher = backend_sdk_patchers.get(name) + if patcher: + patcher() + + availability = CameraFactory.available_backends() + is_avail = availability.get(name, False) + + settings = _minimal_settings(name, index=0) + + if not is_avail: + # Must fail cleanly if unavailable + with pytest.raises(RuntimeError): + CameraFactory.create(settings) + continue + + # Available -> must create successfully + be = CameraFactory.create(settings) + + # Minimal base contract + assert hasattr(be, "open") + assert hasattr(be, "read") + assert hasattr(be, "close") + assert callable(be.device_name) + + # close() should be idempotent even if never opened + be.close() + be.close() + + +@pytest.mark.unit +def test_gui_identity_helper_is_backend_agnostic(all_registered_backends): + """ + This checks the GUI helper itself is backend-agnostic, not that each backend populates identity. + We only validate that applying a DetectedCamera results in namespaced properties. + """ + apply_identity = _try_import_gui_apply_identity() + if apply_identity is None: + pytest.skip("GUI helpers not importable (PySide6 likely missing in test env).") + + for backend in all_registered_backends: + cam = _minimal_settings(backend, index=0, properties={}) + detected = DetectedCamera( + index=0, + label=f"Label-{backend}", + device_id=f"device-{backend}", + vid=0x1234, + pid=0x5678, + path=f"path-{backend}", + backend_hint=None, + ) + + apply_identity(cam, detected, backend) + + assert isinstance(cam.properties, dict) + assert backend in cam.properties + ns = cam.properties[backend] + assert isinstance(ns, dict) + assert ns.get("device_id") == detected.device_id + assert ns.get("device_name") == detected.label diff --git a/tests/cameras/backends/test_gentl_backend.py b/tests/cameras/backends/test_gentl_backend.py new file mode 100644 index 0000000..d31bc9c --- /dev/null +++ b/tests/cameras/backends/test_gentl_backend.py @@ -0,0 +1,531 @@ +# tests/cameras/backends/test_gentl_backend.py +from __future__ import annotations + +import types + +import numpy as np +import pytest + + +# --------------------------------------------------------------------- +# Core lifecycle + strict transaction model +# --------------------------------------------------------------------- +def test_open_starts_stream_and_read_returns_frame(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + be.open() + assert be._harvester is not None + assert be._acquirer is not None + + # Strict model validated via behavior: read must succeed after normal open() + frame, ts = be.read() + assert isinstance(ts, float) + assert isinstance(frame, np.ndarray) + assert frame.size > 0 + # Backend converts to BGR; ensure 3-channel output + assert frame.ndim == 3 and frame.shape[2] == 3 + + be.close() + assert be._harvester is None + assert be._acquirer is None + assert be._device_label is None + + +def test_fast_start_does_not_start_stream_and_read_times_out(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory(properties={"gentl": {"fast_start": True}}) + be = gb.GenTLCameraBackend(settings) + + be.open() + assert be._acquirer is not None + + # Strict model: fast_start -> open() does NOT start acquisition -> read must fail + with pytest.raises(TimeoutError): + be.read() + + be.close() + + +def test_close_is_idempotent(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + be = gb.GenTLCameraBackend(gentl_settings_factory()) + be.open() + be.close() + # Must not raise + be.close() + + +def test_stop_is_safe_before_open_and_after_close(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + # stop before open should not raise + be.stop() + + be.open() + + # stop should make acquisition unusable for strict fetch/read + be.stop() + with pytest.raises(TimeoutError): + be.read() + + be.close() + + # stop after close should not raise + be.stop() + + +def test_read_before_open_raises_runtimeerror(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + with pytest.raises(RuntimeError): + be.read() + + +# --------------------------------------------------------------------- +# Device selection + robustness (behavior/state based, minimal circularity) +# --------------------------------------------------------------------- +def test_device_id_exact_match_selects_correct_device_and_updates_index( + patch_gentl_sdk, gentl_settings_factory, gentl_inventory +): + gb = patch_gentl_sdk + + # Two devices; device_id targets SER1 => should bind to index 1 + gentl_inventory[:] = [ + { + "display_name": "Dev0 (SER0)", + "model": "M0", + "vendor": "V", + "serial_number": "SER0", + "id_": "ID0", + "tl_type": "Custom", + "user_defined_name": "U0", + "version": "1.0.0", + "access_status": 1000, + }, + { + "display_name": "Dev1 (SER1)", + "model": "M1", + "vendor": "V", + "serial_number": "SER1", + "id_": "ID1", + "tl_type": "Custom", + "user_defined_name": "U1", + "version": "1.0.0", + "access_status": 1000, + }, + ] + + settings = gentl_settings_factory(index=0, properties={"gentl": {"device_id": "serial:SER1"}}) + be = gb.GenTLCameraBackend(settings) + be.open() + + # Backend observable outcome: settings.index updated + assert int(be.settings.index) == 1 + + # Backend observable outcome: persisted identity and serial + ns = settings.properties.get("gentl", {}) + assert ns.get("device_id") == "serial:SER1" + assert ns.get("serial_number") == "SER1" + + be.close() + + +def test_ambiguous_serial_prefix_raises(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [ + {"display_name": "DevA", "serial_number": "ABC-1"}, + {"display_name": "DevB", "serial_number": "ABC-2"}, + ] + + settings = gentl_settings_factory(properties={"gentl": {"device_id": "serial:ABC"}}) + be = gb.GenTLCameraBackend(settings) + + with pytest.raises(RuntimeError): + be.open() + + +def test_open_index_out_of_range_raises(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [{"display_name": "OnlyDev", "serial_number": "SER0"}] + settings = gentl_settings_factory(index=5) + be = gb.GenTLCameraBackend(settings) + + with pytest.raises(RuntimeError): + be.open() + + +def test_missing_serial_produces_fingerprint_device_id(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [ + { + "display_name": "DevNoSerial", + "vendor": "V", + "model": "M", + "serial_number": "", # missing/blank + "id_": "ID-NO-SERIAL", + "tl_type": "Custom", + "user_defined_name": "U", + "version": "1.0.0", + "access_status": 1000, + } + ] + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + be.open() + + ns = settings.properties["gentl"] + assert isinstance(ns.get("device_id"), str) + assert ns["device_id"].startswith("fp:") + + # Rich metadata should still be persisted + assert ns.get("device_info_id") == "ID-NO-SERIAL" + assert ns.get("device_display_name") == "DevNoSerial" + + be.close() + + +# --------------------------------------------------------------------- +# Persistence contract (UI relies on these keys) +# --------------------------------------------------------------------- +def test_open_persists_rich_metadata_in_namespace(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + be.open() + + ns = settings.properties.get("gentl", {}) + assert isinstance(ns, dict) + + # Identity basics + assert "device_id" in ns and str(ns["device_id"]) + assert "cti_file" in ns and str(ns["cti_file"]) + + # Rich info keys (minimum contract) + for k in ( + "device_display_name", + "device_info_id", + "device_vendor", + "device_model", + "device_tl_type", + "device_user_defined_name", + "device_version", + "device_access_status", + ): + assert k in ns, f"Missing persisted key: {k}" + + # If serial exists, it should be persisted + assert ns.get("serial_number") + assert ns.get("device_serial_number") + + # device_name derived from node_map label resolution + assert ns.get("device_name") + + be.close() + + +def test_open_persists_cti_file_even_when_provided_in_props(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory(properties={"cti_file": "from-props.cti", "gentl": {}}) + be = gb.GenTLCameraBackend(settings) + be.open() + + ns = settings.properties["gentl"] + assert isinstance(ns.get("cti_file"), str) and ns["cti_file"] + + be.close() + + +# --------------------------------------------------------------------- +# Discovery / ping / rebind (still unit-only via patched SDK) +# --------------------------------------------------------------------- +def test_discover_devices_returns_device_id_and_label(patch_gentl_sdk): + gb = patch_gentl_sdk + + cams = gb.GenTLCameraBackend.discover_devices(max_devices=10) + assert isinstance(cams, list) + assert cams + + cam0 = cams[0] + assert getattr(cam0, "label", "") + assert getattr(cam0, "device_id", None) is not None + assert str(cam0.device_id).startswith(("serial:", "fp:")) + + +def test_discover_devices_prefers_display_name_for_label(patch_gentl_sdk, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [ + {"display_name": "Pretty Name (SERX)", "vendor": "V", "model": "M", "serial_number": "SERX", "id_": "IDX"} + ] + + cams = gb.GenTLCameraBackend.discover_devices(max_devices=10) + assert cams and cams[0].label == "Pretty Name (SERX)" + + +def test_quick_ping_true_for_existing_false_for_missing(patch_gentl_sdk, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [{"display_name": "Dev0", "serial_number": "SER0"}] + assert gb.GenTLCameraBackend.quick_ping(0) is True + assert gb.GenTLCameraBackend.quick_ping(1) is False + + +def test_rebind_settings_updates_index_using_device_id_with_attribute_entries( + patch_gentl_sdk, gentl_settings_factory, gentl_inventory +): + """ + rebind_settings has some getattr(...) usage; feed attribute-like entries to match that path. + """ + gb = patch_gentl_sdk + + gentl_inventory[:] = [ + types.SimpleNamespace( + display_name="Dev0", serial_number="SER0", vendor="V", model="M0", id_="ID0", tl_type="T" + ), + types.SimpleNamespace( + display_name="Dev1", serial_number="SER1", vendor="V", model="M1", id_="ID1", tl_type="T" + ), + ] + + settings = gentl_settings_factory(index=0, properties={"gentl": {"device_id": "serial:SER1"}}) + out = gb.GenTLCameraBackend.rebind_settings(settings) + + assert int(out.index) == 1 + ns = out.properties.get("gentl", {}) + assert ns.get("device_id") == "serial:SER1" + + +def test_rebind_settings_no_device_id_no_change(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [ + {"display_name": "Dev0", "serial_number": "SER0"}, + {"display_name": "Dev1", "serial_number": "SER1"}, + ] + settings = gentl_settings_factory(index=1, properties={"gentl": {}}) + out = gb.GenTLCameraBackend.rebind_settings(settings) + + assert int(out.index) == 1 + + +# --------------------------------------------------------------------- +# _configure_* coverage (assert on node_map side effects, not logs) +# --------------------------------------------------------------------- +def test_resolution_auto_does_not_modify_node_dimensions(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory(width=0, height=0) # Auto + be = gb.GenTLCameraBackend(settings) + be.open() + + nm = be._acquirer.remote_device.node_map + assert int(nm.Width.value) == 1920 + assert int(nm.Height.value) == 1080 + + be.close() + + +def test_resolution_request_is_aligned_to_increment(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory(width=641, height=481) # odd -> should align + be = gb.GenTLCameraBackend(settings) + be.open() + + nm = be._acquirer.remote_device.node_map + assert int(nm.Width.value) % 2 == 0 + assert int(nm.Height.value) % 2 == 0 + + assert be.actual_resolution is not None + w, h = be.actual_resolution + assert w == int(nm.Width.value) + assert h == int(nm.Height.value) + + be.close() + + +def test_manual_exposure_gain_fps_are_applied_when_nonzero(patch_gentl_sdk, gentl_settings_factory): + """ + Covers _configure_exposure/_configure_gain/_configure_frame_rate success path. + """ + gb = patch_gentl_sdk + + settings = gentl_settings_factory(exposure=20000, gain=3.0, fps=50.0) + be = gb.GenTLCameraBackend(settings) + be.open() + + nm = be._acquirer.remote_device.node_map + assert float(nm.ExposureTime.value) == pytest.approx(20000.0) + assert float(nm.Gain.value) == pytest.approx(3.0) + assert float(nm.AcquisitionFrameRate.value) == pytest.approx(50.0) + + be.close() + + +def test_pixel_format_unavailable_does_not_crash_open_and_streams(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory(properties={"gentl": {"pixel_format": "NotAFormat"}}) + be = gb.GenTLCameraBackend(settings) + be.open() + + # No fake-internal checks; just verify it can read + frame, _ = be.read() + assert frame is not None and frame.size > 0 + + be.close() + + +# --------------------------------------------------------------------- +# Direct unit tests for _create_acquirer (fallback paths + error aggregation) +# --------------------------------------------------------------------- +def test__create_acquirer_prefers_create_serial_dict(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, selector=None, index=None): + # Return different sentinels depending on call form + if isinstance(selector, dict) and selector.get("serial_number") == "SERX": + return "ACQ_SERIAL_DICT" + raise RuntimeError("unexpected call") + + be._harvester = H() + acq = be._create_acquirer("SERX", 0) + assert acq == "ACQ_SERIAL_DICT" + + +def test__create_acquirer_index_kw_typeerror_falls_back_to_positional(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, *args, **kwargs): + # Simulate a Harvester that does NOT accept keyword index + if "index" in kwargs: + raise TypeError("index kw not supported") + # Positional index works + if len(args) == 1 and args[0] == 2: + return "ACQ_POS_INDEX" + raise RuntimeError("unexpected call") + + be._harvester = H() + acq = be._create_acquirer(None, 2) + assert acq == "ACQ_POS_INDEX" + + +def test__create_acquirer_falls_back_to_create_image_acquirer_when_create_fails( + patch_gentl_sdk, gentl_settings_factory +): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, *args, **kwargs): + raise RuntimeError("create fails") + + def create_image_acquirer(self, selector=None, index=None): + # Succeeds here + if isinstance(selector, dict) and selector.get("serial_number") == "SERX": + return "ACQ_CIA_SERIAL" + if index == 1: + return "ACQ_CIA_INDEX" + return "ACQ_CIA_OTHER" + + be._harvester = H() + acq = be._create_acquirer("SERX", 1) + assert acq == "ACQ_CIA_SERIAL" + + +def test__create_acquirer_uses_device_info_fallback_when_available(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + device_info_obj = {"serial_number": "SER0", "id_": "ID0"} + + class H: + device_info_list = [device_info_obj] + + def create(self, *args, **kwargs): + # Fail index, succeed if given device_info object + if "index" in kwargs or (len(args) == 1 and isinstance(args[0], int)): + raise RuntimeError("index path fails") + if len(args) == 1 and args[0] is device_info_obj: + return "ACQ_DEVICE_INFO" + raise RuntimeError("unexpected call") + + be._harvester = H() + acq = be._create_acquirer(None, 0) + assert acq == "ACQ_DEVICE_INFO" + + +def test__create_acquirer_tries_default_create_when_index0_and_no_serial(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, *args, **kwargs): + # Fail index attempts; succeed only on no-arg create() + if args or kwargs: + raise RuntimeError("only no-arg create works") + return "ACQ_DEFAULT" + + be._harvester = H() + acq = be._create_acquirer(None, 0) + assert acq == "ACQ_DEFAULT" + + +def test__create_acquirer_raises_runtimeerror_with_joined_errors(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, *args, **kwargs): + raise RuntimeError("create boom") + + def create_image_acquirer(self, *args, **kwargs): + raise RuntimeError("cia boom") + + be._harvester = H() + + with pytest.raises(RuntimeError) as ei: + be._create_acquirer("SERX", 0) + + # Error message should include some context about attempted creation methods + msg = str(ei.value).lower() + assert "failed to initialise gentl image acquirer" in msg