From 662ce4ff98db41eb96e786676117e05eef6dd6dc Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Fri, 1 May 2026 18:13:48 -0400 Subject: [PATCH 1/3] Add device management SDK and CLI --- CLI-COMMANDS.md | 28 ++ roboflow/adapters/devicesapi.py | 272 +++++++++++++++ roboflow/cli/__init__.py | 2 + roboflow/cli/handlers/device.py | 548 +++++++++++++++++++++++++++++++ roboflow/core/device.py | 158 +++++++++ roboflow/core/workspace.py | 75 ++++- tests/cli/test_device_handler.py | 195 +++++++++++ tests/test_device.py | 281 ++++++++++++++++ 8 files changed, 1558 insertions(+), 1 deletion(-) create mode 100644 roboflow/adapters/devicesapi.py create mode 100644 roboflow/cli/handlers/device.py create mode 100644 roboflow/core/device.py create mode 100644 tests/cli/test_device_handler.py create mode 100644 tests/test_device.py diff --git a/CLI-COMMANDS.md b/CLI-COMMANDS.md index a733a840..dabba66e 100644 --- a/CLI-COMMANDS.md +++ b/CLI-COMMANDS.md @@ -84,6 +84,33 @@ roboflow annotation job create -p my-project --name "Label round 1" \ --batch --num-images 100 --labeler a@co.com --reviewer b@co.com ``` +### RFDM devices (v2 deployments) + +Workspace-scoped device management — backed by the external Deployments API +(`/:workspace/devices/v2/*`). Read commands need the `device:read` scope on +your api_key; `create` needs `device:update`. + +```bash +roboflow device list +roboflow device get +roboflow device create "Factory floor cam" --type edge --tags floor-1,vision + +# Observe — config is sensitive (may include credentials). +roboflow device config +roboflow device config-history --limit 20 + +# Streams the device runs. +roboflow device streams +roboflow device stream + +# Logs (5 req/min/IP) and aggregated telemetry (60 req/min). +roboflow device logs --severity ERROR --limit 200 +roboflow device telemetry --time-period 7d + +# Lifecycle events (stream start/stop, errors, config changes…). +roboflow device events --entity-type stream --direction backward +``` + ### Workflows ```bash @@ -202,6 +229,7 @@ Version numbers are always numeric — that's how `x/y` is disambiguated between | `infer` | Run inference on images | | `search` | Search workspace images (RoboQL), export results | | `deployment` | Manage dedicated deployments | +| `device` | List, get, create, and observe RFDM devices (v2 deployment API) | | `workflow` | Manage workflows | | `folder` | Manage workspace folders | | `annotation` | Annotation batches and jobs | diff --git a/roboflow/adapters/devicesapi.py b/roboflow/adapters/devicesapi.py new file mode 100644 index 00000000..9632fa43 --- /dev/null +++ b/roboflow/adapters/devicesapi.py @@ -0,0 +1,272 @@ +"""Adapter for the workspace-scoped device management API. + +Wraps the read-only external observability endpoints plus device create +served by the ``light.v2.device`` Cloud Function. Routes are documented in +``docs/api/deployments/overview.md`` of the ``roboflow/roboflow`` repo. + +Read endpoints require the ``device:read`` scope; create requires +``device:update``. Authentication is via the workspace api_key. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import urlencode + +import requests + +from roboflow.adapters.rfapi import RoboflowError +from roboflow.config import API_URL + + +class DeviceApiError(RoboflowError): + """Raised when a device API call returns a non-success status.""" + + def __init__(self, message: str, status_code: Optional[int] = None) -> None: + self.status_code = status_code + super().__init__(message) + + +class DeviceNotFoundError(DeviceApiError): + """404 — device or stream does not exist or is owned by a different workspace.""" + + +class DeviceAuthError(DeviceApiError): + """401/403 — missing key, wrong scope, or device-bound key targeting a sibling.""" + + +class DeviceRateLimitedError(DeviceApiError): + """429 — logs (5/min/IP) or telemetry (60/min) limit hit.""" + + +class DeviceBadRequestError(DeviceApiError): + """400 — malformed cursor, unparseable date, unknown ``time_period``.""" + + +def _build_url(workspace: str, path: str, api_key: str, query: Optional[Dict[str, Any]] = None) -> str: + base = f"{API_URL}/{workspace}/devices/v2{path}" + params: Dict[str, Any] = {"api_key": api_key} + if query: + for key, value in query.items(): + if value is None: + continue + if isinstance(value, list): + if not value: + continue + params[key] = ",".join(str(v) for v in value) + else: + params[key] = value + return f"{base}?{urlencode(params, doseq=False)}" + + +def _raise_for_status(response: requests.Response) -> None: + if response.status_code < 400: + return + error_type: Optional[str] = None + try: + payload = response.json() + err = payload.get("error") if isinstance(payload, dict) else None + if isinstance(err, dict): + message = err.get("message") or response.text + raw_type = err.get("type") + error_type = raw_type if isinstance(raw_type, str) else None + elif isinstance(err, str): + message = err + else: + message = response.text + except Exception: # noqa: BLE001 + message = response.text + code = response.status_code + if code == 400: + raise DeviceBadRequestError(message or "Bad request", status_code=code) + if code in (401, 403): + raise DeviceAuthError(message or "Unauthorized", status_code=code) + if code == 404: + # validateToken.js returns 404 + GraphMethodException when an api_key + # is valid for this workspace but lacks the required scope + # (device:read / device:update). Surface that as auth so the CLI + # exits 2 with the scope hint instead of 3 ("not found"). + if error_type == "GraphMethodException": + raise DeviceAuthError(message or "Forbidden", status_code=code) + raise DeviceNotFoundError(message or "Not found", status_code=code) + if code == 429: + raise DeviceRateLimitedError(message or "Rate limited", status_code=code) + raise DeviceApiError(message or f"HTTP {code}", status_code=code) + + +def list_devices(api_key: str, workspace: str) -> Dict[str, Any]: + """``GET /:workspace/devices/v2`` — returns the parsed JSON response.""" + response = requests.get(_build_url(workspace, "", api_key)) + _raise_for_status(response) + return response.json() + + +def create_device( + api_key: str, + workspace: str, + *, + device_name: str, + device_type: Optional[str] = None, + workflow_id: Optional[str] = None, + tags: Optional[List[str]] = None, + offline_mode: Optional[bool] = None, + source_device_id: Optional[str] = None, +) -> Dict[str, Any]: + """``POST /:workspace/devices/v2`` — returns ``{ deviceId, installId }``.""" + body: Dict[str, Any] = {"device_name": device_name} + if device_type is not None: + body["device_type"] = device_type + if workflow_id is not None: + body["workflow_id"] = workflow_id + if tags is not None: + body["tags"] = tags + if offline_mode is not None: + body["offline_mode"] = offline_mode + if source_device_id is not None: + # Body field is camelCase per docs/api/deployments/overview.md + body["sourceDeviceId"] = source_device_id + response = requests.post(_build_url(workspace, "", api_key), json=body) + _raise_for_status(response) + return response.json() + + +def get_device(api_key: str, workspace: str, device_id: str) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId``.""" + response = requests.get(_build_url(workspace, f"/{device_id}", api_key)) + _raise_for_status(response) + return response.json() + + +def get_device_config(api_key: str, workspace: str, device_id: str) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId/config``. + + Note: + The response can include ``environment_variables`` and integration + credentials. Treat the returned dict as sensitive. + """ + response = requests.get(_build_url(workspace, f"/{device_id}/config", api_key)) + _raise_for_status(response) + return response.json() + + +def get_device_config_history( + api_key: str, + workspace: str, + device_id: str, + *, + limit: Optional[int] = None, + cursor: Optional[str] = None, +) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId/config/history``.""" + response = requests.get( + _build_url( + workspace, + f"/{device_id}/config/history", + api_key, + query={"limit": limit, "cursor": cursor}, + ) + ) + _raise_for_status(response) + return response.json() + + +def list_device_streams(api_key: str, workspace: str, device_id: str) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId/streams``.""" + response = requests.get(_build_url(workspace, f"/{device_id}/streams", api_key)) + _raise_for_status(response) + return response.json() + + +def get_device_stream(api_key: str, workspace: str, device_id: str, stream_id: str) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId/streams/:streamId``.""" + response = requests.get(_build_url(workspace, f"/{device_id}/streams/{stream_id}", api_key)) + _raise_for_status(response) + return response.json() + + +def get_device_logs( + api_key: str, + workspace: str, + device_id: str, + *, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + service: Optional[List[str]] = None, + severity: Optional[List[str]] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, +) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId/logs``. Rate limited 5/min/IP.""" + response = requests.get( + _build_url( + workspace, + f"/{device_id}/logs", + api_key, + query={ + "start_time": start_time, + "end_time": end_time, + "service": service, + "severity": severity, + "limit": limit, + "cursor": cursor, + }, + ) + ) + _raise_for_status(response) + return response.json() + + +def get_device_telemetry( + api_key: str, + workspace: str, + device_id: str, + *, + time_period: Optional[str] = None, +) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId/telemetry``. Rate limited 60/min.""" + response = requests.get( + _build_url( + workspace, + f"/{device_id}/telemetry", + api_key, + query={"time_period": time_period}, + ) + ) + _raise_for_status(response) + return response.json() + + +def get_device_events( + api_key: str, + workspace: str, + device_id: str, + *, + entity_type: Optional[str] = None, + entity_id: Optional[str] = None, + event: Optional[str] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + direction: Optional[str] = None, +) -> Dict[str, Any]: + """``GET /:workspace/devices/v2/:deviceId/events``.""" + response = requests.get( + _build_url( + workspace, + f"/{device_id}/events", + api_key, + query={ + "entity_type": entity_type, + "entity_id": entity_id, + "event": event, + "start_time": start_time, + "end_time": end_time, + "limit": limit, + "cursor": cursor, + "direction": direction, + }, + ) + ) + _raise_for_status(response) + return response.json() diff --git a/roboflow/cli/__init__.py b/roboflow/cli/__init__.py index 99604acb..ae48f979 100644 --- a/roboflow/cli/__init__.py +++ b/roboflow/cli/__init__.py @@ -175,6 +175,7 @@ def _walk(group: Any, prefix: str = "") -> None: from roboflow.cli.handlers.batch import batch_app # noqa: E402 from roboflow.cli.handlers.completion import completion_app # noqa: E402 from roboflow.cli.handlers.deployment import deployment_app # noqa: E402 +from roboflow.cli.handlers.device import device_app # noqa: E402 from roboflow.cli.handlers.folder import folder_app # noqa: E402 from roboflow.cli.handlers.image import image_app # noqa: E402 from roboflow.cli.handlers.infer import infer_command # noqa: E402 @@ -196,6 +197,7 @@ def _walk(group: Any, prefix: str = "") -> None: app.add_typer(batch_app, name="batch", hidden=True) # All stubs — hidden until implemented app.add_typer(completion_app, name="completion") app.add_typer(deployment_app, name="deployment") +app.add_typer(device_app, name="device") app.add_typer(folder_app, name="folder") app.add_typer(image_app, name="image") diff --git a/roboflow/cli/handlers/device.py b/roboflow/cli/handlers/device.py new file mode 100644 index 00000000..1eca74fd --- /dev/null +++ b/roboflow/cli/handlers/device.py @@ -0,0 +1,548 @@ +"""Device management commands. + +Wraps the workspace-scoped Deployments / Device Management API +(``/:workspace/devices/v2/*``). All commands honor ``--workspace`` / +``--api-key`` from the global callback and ``--json`` for stable output. + +Exit codes: + 0 success + 1 general error (incl. 400 bad params, 429 rate limited) + 2 auth (401/403) + 3 not found (404) +""" + +from __future__ import annotations + +from typing import Annotated, Any, Dict, List, Optional + +import typer + +from roboflow.cli._compat import SortedGroup, ctx_to_args + +device_app = typer.Typer(cls=SortedGroup, help="Manage RFDM devices", no_args_is_help=True) + + +def _resolve_ws_and_key(args): # noqa: ANN001 + from roboflow.cli._resolver import resolve_ws_and_key + + return resolve_ws_and_key(args) + + +def _exit_code_for(exc: Exception) -> int: + from roboflow.adapters.devicesapi import ( + DeviceAuthError, + DeviceNotFoundError, + DeviceRateLimitedError, + ) + + if isinstance(exc, DeviceAuthError): + return 2 + if isinstance(exc, DeviceNotFoundError): + return 3 + if isinstance(exc, DeviceRateLimitedError): + return 1 + return 1 + + +def _hint_for(exc: Exception) -> Optional[str]: + from roboflow.adapters.devicesapi import DeviceAuthError, DeviceRateLimitedError + + if isinstance(exc, DeviceRateLimitedError): + return "Logs are limited to 5 req/min/IP and telemetry to 60 req/min — wait and retry." + if isinstance(exc, DeviceAuthError): + return "Verify the api_key has the device:read scope, or device:update for create." + return None + + +def _split_csv(value: Optional[str]) -> Optional[List[str]]: + if value is None: + return None + parts = [p.strip() for p in value.split(",") if p.strip()] + return parts or None + + +# --------------------------------------------------------------------------- +# Commands +# --------------------------------------------------------------------------- + + +@device_app.command("list") +def list_cmd(ctx: typer.Context) -> None: + """List devices in the workspace.""" + args = ctx_to_args(ctx) + _list(args) + + +@device_app.command("get") +def get_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], +) -> None: + """Show a single device.""" + args = ctx_to_args(ctx, device_id=device_id) + _get(args) + + +@device_app.command("create") +def create_cmd( + ctx: typer.Context, + device_name: Annotated[str, typer.Argument(help="Human-readable device name")], + device_type: Annotated[Optional[str], typer.Option("--type", help="Device type: ai1, edge, or custom")] = None, + workflow_id: Annotated[ + Optional[str], typer.Option("--workflow-id", help="Initial workflow assignment (AI1 only)") + ] = None, + tags: Annotated[Optional[str], typer.Option("--tags", help="Comma-separated tags")] = None, + offline_mode: Annotated[ + Optional[bool], typer.Option("--offline-mode/--no-offline-mode", help="AI1 offline mode") + ] = None, + source_device_id: Annotated[ + Optional[str], typer.Option("--source-device-id", help="Duplicate config from this device") + ] = None, +) -> None: + """Create a v2 device. Requires the device:update scope.""" + args = ctx_to_args( + ctx, + device_name=device_name, + device_type=device_type, + workflow_id=workflow_id, + tags=_split_csv(tags), + offline_mode=offline_mode, + source_device_id=source_device_id, + ) + _create(args) + + +@device_app.command("config") +def config_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], +) -> None: + """Show the device's full runtime config (sensitive — may contain credentials).""" + args = ctx_to_args(ctx, device_id=device_id) + _config(args) + + +@device_app.command("config-history") +def config_history_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], + limit: Annotated[Optional[int], typer.Option("--limit", help="Max revisions (1-500, default 10)")] = None, + cursor: Annotated[Optional[str], typer.Option("--cursor", help="ISO timestamp from previous next_cursor")] = None, +) -> None: + """List prior config revisions, newest first.""" + args = ctx_to_args(ctx, device_id=device_id, limit=limit, cursor=cursor) + _config_history(args) + + +@device_app.command("streams") +def streams_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], +) -> None: + """List streams configured on the device.""" + args = ctx_to_args(ctx, device_id=device_id) + _streams(args) + + +@device_app.command("stream") +def stream_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], + stream_id: Annotated[str, typer.Argument(help="Stream ID")], +) -> None: + """Show a single stream.""" + args = ctx_to_args(ctx, device_id=device_id, stream_id=stream_id) + _stream(args) + + +@device_app.command("logs") +def logs_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], + start_time: Annotated[Optional[str], typer.Option("--start-time", help="ISO timestamp")] = None, + end_time: Annotated[Optional[str], typer.Option("--end-time", help="ISO timestamp")] = None, + service: Annotated[Optional[str], typer.Option("--service", help="Comma-separated service names")] = None, + severity: Annotated[ + Optional[str], typer.Option("--severity", help="Comma-separated levels (INFO,WARN,ERROR,...)") + ] = None, + limit: Annotated[Optional[int], typer.Option("--limit", help="1-1000, default 100")] = None, + cursor: Annotated[Optional[str], typer.Option("--cursor", help="ISO timestamp from previous next_cursor")] = None, +) -> None: + """Fetch device logs (5 req/min/IP).""" + args = ctx_to_args( + ctx, + device_id=device_id, + start_time=start_time, + end_time=end_time, + service=_split_csv(service), + severity=_split_csv(severity), + limit=limit, + cursor=cursor, + ) + _logs(args) + + +@device_app.command("telemetry") +def telemetry_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], + time_period: Annotated[ + Optional[str], typer.Option("--time-period", help="One of 1h, 24h (default), 7d, 14d") + ] = None, +) -> None: + """Fetch aggregated hardware telemetry (60 req/min).""" + args = ctx_to_args(ctx, device_id=device_id, time_period=time_period) + _telemetry(args) + + +@device_app.command("events") +def events_cmd( + ctx: typer.Context, + device_id: Annotated[str, typer.Argument(help="Device ID")], + entity_type: Annotated[Optional[str], typer.Option("--entity-type", help="Filter to a single entity type")] = None, + entity_id: Annotated[Optional[str], typer.Option("--entity-id", help="Filter to a single entity id")] = None, + event: Annotated[Optional[str], typer.Option("--event", help="Filter by event name")] = None, + start_time: Annotated[Optional[str], typer.Option("--start-time", help="ISO timestamp")] = None, + end_time: Annotated[Optional[str], typer.Option("--end-time", help="ISO timestamp")] = None, + limit: Annotated[Optional[int], typer.Option("--limit", help="1-1000, default 100")] = None, + cursor: Annotated[ + Optional[str], typer.Option("--cursor", help="Opaque base64url cursor from previous page") + ] = None, + direction: Annotated[ + Optional[str], typer.Option("--direction", help="forward or backward (default backward)") + ] = None, +) -> None: + """Query device/stream lifecycle events.""" + args = ctx_to_args( + ctx, + device_id=device_id, + entity_type=entity_type, + entity_id=entity_id, + event=event, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + direction=direction, + ) + _events(args) + + +# --------------------------------------------------------------------------- +# Business logic +# --------------------------------------------------------------------------- + + +def _list(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = devicesapi.list_devices(api_key, ws) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + rows: List[Dict[str, Any]] = result.get("data", []) + table_rows = [ + { + "id": r.get("id", ""), + "name": r.get("name", "") or "", + "status": r.get("status", "") or "", + "type": r.get("type", "") or "", + "last_heartbeat": r.get("last_heartbeat", "") or "", + } + for r in rows + ] + table = format_table( + table_rows, + columns=["id", "name", "status", "type", "last_heartbeat"], + headers=["ID", "NAME", "STATUS", "TYPE", "LAST HEARTBEAT"], + ) + output(args, result, text=table) + + +def _get(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + device = devicesapi.get_device(api_key, ws, args.device_id) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + hardware = device.get("hardware") or {} + lines = [ + f"Device: {device.get('name') or device.get('id')}", + f" ID: {device.get('id', '')}", + f" Status: {device.get('status', '')}", + f" Type: {device.get('type') or ''}", + f" Platform: {device.get('platform') or ''}", + f" RFDM Version: {device.get('rfdm_version') or ''}", + f" Last Heartbeat: {device.get('last_heartbeat') or ''}", + f" Memory: {hardware.get('total_memory_mb') or ''} MB", + f" Disk: {hardware.get('total_disk_space_mb') or ''} MB", + ] + tags = device.get("tags") or [] + if tags: + lines.append(f" Tags: {', '.join(tags)}") + output(args, device, text="\n".join(lines)) + + +def _create(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = devicesapi.create_device( + api_key, + ws, + device_name=args.device_name, + device_type=args.device_type, + workflow_id=args.workflow_id, + tags=args.tags, + offline_mode=args.offline_mode, + source_device_id=args.source_device_id, + ) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + output( + args, + result, + text=( + f"Created device '{args.device_name}'\n" + f" Device ID: {result.get('deviceId', '')}\n" + f" Install ID: {result.get('installId', '')}" + ), + ) + + +def _config(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + config = devicesapi.get_device_config(api_key, ws, args.device_id) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + output(args, config) + + +def _config_history(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = devicesapi.get_device_config_history(api_key, ws, args.device_id, limit=args.limit, cursor=args.cursor) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + revisions = result.get("data", []) + rows = [ + { + "revision_id": r.get("revision_id", "") or "", + "created_at": r.get("created_at", "") or "", + "created_by": r.get("created_by", "") or "", + } + for r in revisions + ] + table = format_table( + rows, + columns=["revision_id", "created_at", "created_by"], + headers=["REVISION", "CREATED AT", "CREATED BY"], + ) + output(args, result, text=table) + + +def _streams(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = devicesapi.list_device_streams(api_key, ws, args.device_id) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + streams = result.get("data", []) + rows = [ + { + "id": s.get("id", "") or "", + "name": s.get("name", "") or "", + "status": s.get("status", "") or "", + "workflow_id": s.get("workflow_id", "") or "", + } + for s in streams + ] + table = format_table( + rows, + columns=["id", "name", "status", "workflow_id"], + headers=["ID", "NAME", "STATUS", "WORKFLOW"], + ) + output(args, result, text=table) + + +def _stream(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + stream = devicesapi.get_device_stream(api_key, ws, args.device_id, args.stream_id) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + lines = [ + f"Stream: {stream.get('name') or stream.get('id')}", + f" ID: {stream.get('id', '')}", + f" Status: {stream.get('status') or ''}", + f" Workflow: {stream.get('workflow_id') or ''}", + f" Pipeline: {stream.get('pipeline_id') or ''}", + f" Started: {stream.get('started_at') or ''}", + f" Last Event: {stream.get('last_event_at') or ''}", + ] + if stream.get("error"): + lines.append(f" Error: {stream['error']}") + output(args, stream, text="\n".join(lines)) + + +def _logs(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = devicesapi.get_device_logs( + api_key, + ws, + args.device_id, + start_time=args.start_time, + end_time=args.end_time, + service=args.service, + severity=args.severity, + limit=args.limit, + cursor=args.cursor, + ) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + logs = result.get("data", []) + text_lines = [ + f"{log.get('timestamp', '')} [{log.get('severity', '')}] {log.get('service', '')} {log.get('message', '')}" + for log in logs + ] + if not text_lines: + text_lines = ["(no logs)"] + output(args, result, text="\n".join(text_lines)) + + +def _telemetry(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = devicesapi.get_device_telemetry(api_key, ws, args.device_id, time_period=args.time_period) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + buckets = result.get("buckets", []) + lines = [ + f"Time period: {result.get('time_period', '')} " + f"Bucket: {result.get('bucket_interval', '')} " + f"Buckets: {len(buckets)}" + ] + output(args, result, text="\n".join(lines)) + + +def _events(args) -> None: # noqa: ANN001 + from roboflow.adapters import devicesapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = devicesapi.get_device_events( + api_key, + ws, + args.device_id, + entity_type=args.entity_type, + entity_id=args.entity_id, + event=args.event, + start_time=args.start_time, + end_time=args.end_time, + limit=args.limit, + cursor=args.cursor, + direction=args.direction, + ) + except Exception as exc: # noqa: BLE001 + output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) + return + + events = result.get("data", []) + text_lines = [ + f"{e.get('server_timestamp', '')} {e.get('event', '')} " + f"{e.get('entity_type', '')}/{e.get('entity_id', '')} " + f"{e.get('event_description', '') or ''}" + for e in events + ] + if not text_lines: + text_lines = ["(no events)"] + output(args, result, text="\n".join(text_lines)) diff --git a/roboflow/core/device.py b/roboflow/core/device.py new file mode 100644 index 00000000..b79675bc --- /dev/null +++ b/roboflow/core/device.py @@ -0,0 +1,158 @@ +"""Workspace-scoped device handle. + +Wraps the read endpoints of the external Deployments API +(``/:workspace/devices/v2/*``) added in roboflow/roboflow PR #11350. A +``Device`` is constructed by ``Workspace.device(id)`` or implicitly when +listing via ``Workspace.devices()``; it caches the device summary returned +by the API and exposes lazy methods for the per-device sub-resources. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from roboflow.adapters import devicesapi + + +class Device: + """A v2 Roboflow device (RFDM, AI1, edge, …). + + Instances are created by :class:`roboflow.core.workspace.Workspace`. + The ``info`` dict mirrors the entity documented in + ``docs/api/deployments/overview.md`` of the platform repo (fields + ``id``, ``name``, ``status``, ``last_heartbeat``, ``platform``, + ``hardware``, ``tags``, …). + + Note: + :meth:`config` returns the raw Firestore config doc, which can + contain ``environment_variables`` and integration credentials. + """ + + def __init__(self, api_key: str, workspace_url: str, info: Dict[str, Any]) -> None: + self.__api_key = api_key + self.__workspace = workspace_url + self.info: Dict[str, Any] = info + self.id: str = info.get("id", "") + self.name: Optional[str] = info.get("name") + self.status: Optional[str] = info.get("status") + self.type: Optional[str] = info.get("type") + self.tags: List[str] = list(info.get("tags") or []) + + def __repr__(self) -> str: # pragma: no cover - trivial + return f"Device(id={self.id!r}, name={self.name!r}, status={self.status!r})" + + def refresh(self) -> "Device": + """Re-fetch the device summary from the API.""" + self.info = devicesapi.get_device(self.__api_key, self.__workspace, self.id) + self.name = self.info.get("name") + self.status = self.info.get("status") + self.type = self.info.get("type") + self.tags = list(self.info.get("tags") or []) + return self + + def config(self) -> Dict[str, Any]: + """Fetch the device's full runtime config (sensitive — see class docstring).""" + return devicesapi.get_device_config(self.__api_key, self.__workspace, self.id) + + def config_history( + self, + *, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Dict[str, Any]: + """List prior config revisions, newest first. + + Args: + limit: 1-500, default 10. + cursor: ISO timestamp from a previous page's ``next_cursor``. + """ + return devicesapi.get_device_config_history( + self.__api_key, self.__workspace, self.id, limit=limit, cursor=cursor + ) + + def streams(self) -> List[Dict[str, Any]]: + """List streams currently configured on this device.""" + return devicesapi.list_device_streams(self.__api_key, self.__workspace, self.id).get("data", []) + + def stream(self, stream_id: str) -> Dict[str, Any]: + """Get a single stream by id.""" + return devicesapi.get_device_stream(self.__api_key, self.__workspace, self.id, stream_id) + + def logs( + self, + *, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + service: Optional[List[str]] = None, + severity: Optional[List[str]] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Dict[str, Any]: + """Fetch device logs from Elasticsearch (5/min/IP rate limit). + + Args: + start_time: ISO timestamp. + end_time: ISO timestamp. + service: List of service names; serialized as comma-separated string. + severity: List of severity levels (``INFO``, ``WARN``, ``ERROR``, …). + limit: 1-1000, default 100. + cursor: ISO timestamp from a previous page's ``next_cursor``. + """ + return devicesapi.get_device_logs( + self.__api_key, + self.__workspace, + self.id, + start_time=start_time, + end_time=end_time, + service=service, + severity=severity, + limit=limit, + cursor=cursor, + ) + + def telemetry(self, time_period: Optional[str] = None) -> Dict[str, Any]: + """Fetch aggregated hardware telemetry (60/min rate limit). + + Args: + time_period: One of ``"1h"``, ``"24h"`` (default), ``"7d"``, ``"14d"``. + """ + return devicesapi.get_device_telemetry(self.__api_key, self.__workspace, self.id, time_period=time_period) + + def events( + self, + *, + entity_type: Optional[str] = None, + entity_id: Optional[str] = None, + event: Optional[str] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + direction: Optional[str] = None, + ) -> Dict[str, Any]: + """Query device/stream lifecycle events. + + Args: + entity_type: Filter to a single entity type (``stream``, ``device``, …). + entity_id: Filter to a single entity id. + event: Filter by event name. + start_time: ISO timestamp. + end_time: ISO timestamp. + limit: 1-1000, default 100. + cursor: Opaque base64url cursor from a previous page (round-trip only; + do not parse). + direction: ``"forward"`` or ``"backward"`` (default ``"backward"``). + """ + return devicesapi.get_device_events( + self.__api_key, + self.__workspace, + self.id, + entity_type=entity_type, + entity_id=entity_id, + event=event, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + direction=direction, + ) diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index 965b51b7..6c8e87ab 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -8,7 +8,7 @@ import tempfile import time import zipfile -from typing import Any, Dict, Generator, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional import requests from requests.exceptions import HTTPError @@ -18,6 +18,9 @@ from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError, RoboflowError from roboflow.config import API_URL, APP_URL, DEMO_KEYS +if TYPE_CHECKING: + from roboflow.core.device import Device + class Workspace: """ @@ -128,6 +131,76 @@ def create_project(self, project_name, project_type, project_license, annotation return Project(self.__api_key, r.json(), self.model_format) + def devices(self) -> List["Device"]: + """List v2 devices registered in this workspace. + + Returns: + List of :class:`roboflow.core.device.Device` objects. Each + wraps the entity returned by ``GET /:workspace/devices/v2`` + (id, name, status, last_heartbeat, hardware, tags, …). + """ + from roboflow.adapters import devicesapi + from roboflow.core.device import Device + + rows = devicesapi.list_devices(self.__api_key, self.url).get("data", []) + return [Device(self.__api_key, self.url, row) for row in rows] + + def device(self, device_id: str) -> "Device": + """Get a single device by id. + + Args: + device_id: The device id (as returned by :meth:`devices` or by + :meth:`create_device`). + + Returns: + A :class:`roboflow.core.device.Device` instance. + """ + from roboflow.adapters import devicesapi + from roboflow.core.device import Device + + info = devicesapi.get_device(self.__api_key, self.url, device_id) + return Device(self.__api_key, self.url, info) + + def create_device( + self, + device_name: str, + device_type: Optional[str] = None, + *, + workflow_id: Optional[str] = None, + tags: Optional[List[str]] = None, + offline_mode: Optional[bool] = None, + source_device_id: Optional[str] = None, + ) -> Dict[str, Any]: + """Create a new v2 device in the workspace. + + Args: + device_name: Human-readable device name (required). + device_type: ``"ai1"``, ``"edge"``, or any custom string. + workflow_id: Optional initial workflow assignment. For AI1 devices + this seeds the default ``aione`` stream. + tags: Optional list of string tags. + offline_mode: Boolean; only valid for AI1 devices on workspaces + with the ``roboflowLiteMode`` feature. + source_device_id: When set, duplicates the named existing + device's config instead of generating a fresh one. + + Returns: + Dict with ``deviceId`` and ``installId`` (the short-lived install + token to feed into ``GET /devices/v2/:installId/install.sh``). + """ + from roboflow.adapters import devicesapi + + return devicesapi.create_device( + self.__api_key, + self.url, + device_name=device_name, + device_type=device_type, + workflow_id=workflow_id, + tags=tags, + offline_mode=offline_mode, + source_device_id=source_device_id, + ) + def clip_compare(self, dir: str = "", image_ext: str = ".png", target_image: str = "") -> List[dict]: """ Compare all images in a directory to a target image using CLIP diff --git a/tests/cli/test_device_handler.py b/tests/cli/test_device_handler.py new file mode 100644 index 00000000..1d5fd3ad --- /dev/null +++ b/tests/cli/test_device_handler.py @@ -0,0 +1,195 @@ +"""Tests for the device CLI handler.""" + +from __future__ import annotations + +import json +import unittest +from argparse import Namespace +from unittest.mock import patch + +from typer.testing import CliRunner + +from roboflow.adapters.devicesapi import ( + DeviceAuthError, + DeviceNotFoundError, + DeviceRateLimitedError, +) +from roboflow.cli import app + +runner = CliRunner() + +WS = "test-ws" +KEY = "fake-key" + + +def _args(**kwargs) -> Namespace: + defaults = {"json": False, "workspace": WS, "api_key": KEY, "quiet": False} + defaults.update(kwargs) + return Namespace(**defaults) + + +class TestDeviceRegistration(unittest.TestCase): + """Subcommands are registered and `--help` works for each.""" + + def test_top_level_help(self) -> None: + result = runner.invoke(app, ["device", "--help"]) + self.assertEqual(result.exit_code, 0) + self.assertIn("Manage RFDM devices", result.output) + + def test_subcommands(self) -> None: + for verb in ( + "list", + "get", + "create", + "config", + "config-history", + "streams", + "stream", + "logs", + "telemetry", + "events", + ): + with self.subTest(verb=verb): + result = runner.invoke(app, ["device", verb, "--help"]) + self.assertEqual(result.exit_code, 0, msg=result.output) + + +class TestDeviceListHandler(unittest.TestCase): + @patch("roboflow.adapters.devicesapi.list_devices") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_list_text(self, _mk, _mw, mock_list): + mock_list.return_value = { + "data": [ + { + "id": "a", + "name": "Cam A", + "status": "online", + "type": "edge", + "last_heartbeat": "2026-04-30T00:00:00Z", + } + ] + } + from roboflow.cli.handlers.device import _list + + with patch("builtins.print") as mock_print: + _list(_args()) + mock_print.assert_called_once() + printed = mock_print.call_args[0][0] + self.assertIn("Cam A", printed) + self.assertIn("online", printed) + + @patch("roboflow.adapters.devicesapi.list_devices") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_list_json(self, _mk, _mw, mock_list): + mock_list.return_value = {"data": [{"id": "a", "name": "Cam A"}]} + from roboflow.cli.handlers.device import _list + + with patch("builtins.print") as mock_print: + _list(_args(json=True)) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["data"][0]["id"], "a") + + +class TestDeviceErrorMapping(unittest.TestCase): + """Adapter exceptions map to documented exit codes.""" + + @patch("roboflow.adapters.devicesapi.get_device") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_404_exits_3(self, _mk, _mw, mock_get): + mock_get.side_effect = DeviceNotFoundError("not found", status_code=404) + from roboflow.cli.handlers.device import _get + + with self.assertRaises(SystemExit) as ctx: + _get(_args(device_id="missing")) + self.assertEqual(ctx.exception.code, 3) + + @patch("roboflow.adapters.devicesapi.get_device") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_401_exits_2(self, _mk, _mw, mock_get): + mock_get.side_effect = DeviceAuthError("nope", status_code=401) + from roboflow.cli.handlers.device import _get + + with self.assertRaises(SystemExit) as ctx: + _get(_args(device_id="x")) + self.assertEqual(ctx.exception.code, 2) + + @patch("roboflow.adapters.devicesapi.get_device_logs") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_429_exits_1_with_hint(self, _mk, _mw, mock_logs): + mock_logs.side_effect = DeviceRateLimitedError("slow down", status_code=429) + from roboflow.cli.handlers.device import _logs + + args = _args( + device_id="x", + start_time=None, + end_time=None, + service=None, + severity=None, + limit=None, + cursor=None, + json=True, + ) + with self.assertRaises(SystemExit) as ctx: + _logs(args) + self.assertEqual(ctx.exception.code, 1) + + +class TestDeviceCreateHandler(unittest.TestCase): + @patch("roboflow.adapters.devicesapi.create_device") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_create_passes_args(self, _mk, _mw, mock_create): + mock_create.return_value = {"deviceId": "d1", "installId": "i1"} + from roboflow.cli.handlers.device import _create + + args = _args( + device_name="Cam 1", + device_type="edge", + workflow_id="wf-1", + tags=["a", "b"], + offline_mode=None, + source_device_id=None, + json=True, + ) + with patch("builtins.print") as mock_print: + _create(args) + kwargs = mock_create.call_args.kwargs + self.assertEqual(kwargs["device_name"], "Cam 1") + self.assertEqual(kwargs["device_type"], "edge") + self.assertEqual(kwargs["tags"], ["a", "b"]) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["deviceId"], "d1") + + +class TestDeviceLogsCsvSerialization(unittest.TestCase): + @patch("roboflow.adapters.devicesapi.get_device_logs") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_severity_passed_as_list(self, _mk, _mw, mock_logs): + mock_logs.return_value = {"data": [], "pagination": {}} + from roboflow.cli.handlers.device import _logs + + args = _args( + device_id="x", + start_time=None, + end_time=None, + service=["foo", "bar"], + severity=["INFO"], + limit=None, + cursor=None, + ) + _logs(args) + kwargs = mock_logs.call_args.kwargs + self.assertEqual(kwargs["service"], ["foo", "bar"]) + self.assertEqual(kwargs["severity"], ["INFO"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_device.py b/tests/test_device.py new file mode 100644 index 00000000..0aa47529 --- /dev/null +++ b/tests/test_device.py @@ -0,0 +1,281 @@ +"""Tests for Device, devicesapi adapter, and Workspace device methods.""" + +from __future__ import annotations + +import unittest +from typing import Any, Dict +from unittest.mock import MagicMock, patch + +from roboflow.adapters import devicesapi +from roboflow.adapters.devicesapi import ( + DeviceApiError, + DeviceAuthError, + DeviceBadRequestError, + DeviceNotFoundError, + DeviceRateLimitedError, +) +from roboflow.core.device import Device + +API_KEY = "fake-key" +WORKSPACE = "ws-1" +DEVICE_ID = "dev-abc" + + +def _mock_response(status: int, payload: Any) -> MagicMock: + response = MagicMock() + response.status_code = status + response.json.return_value = payload + response.text = "" if isinstance(payload, dict) else str(payload) + return response + + +class TestDevicesApiUrlBuilding(unittest.TestCase): + """The adapter must build correct workspace-scoped /devices/v2/* URLs.""" + + @patch("roboflow.adapters.devicesapi.requests.get") + def test_list_devices_url(self, mock_get): + mock_get.return_value = _mock_response(200, {"data": []}) + result = devicesapi.list_devices(API_KEY, WORKSPACE) + called_url = mock_get.call_args[0][0] + self.assertIn(f"/{WORKSPACE}/devices/v2", called_url) + self.assertIn(f"api_key={API_KEY}", called_url) + self.assertEqual(result, {"data": []}) + + @patch("roboflow.adapters.devicesapi.requests.get") + def test_get_device_url(self, mock_get): + mock_get.return_value = _mock_response(200, {"id": DEVICE_ID}) + devicesapi.get_device(API_KEY, WORKSPACE, DEVICE_ID) + called_url = mock_get.call_args[0][0] + self.assertIn(f"/{WORKSPACE}/devices/v2/{DEVICE_ID}", called_url) + + @patch("roboflow.adapters.devicesapi.requests.get") + def test_list_device_streams_returns_envelope(self, mock_get): + mock_get.return_value = _mock_response(200, {"data": [{"id": "s1"}]}) + result = devicesapi.list_device_streams(API_KEY, WORKSPACE, DEVICE_ID) + called_url = mock_get.call_args[0][0] + self.assertIn(f"/{WORKSPACE}/devices/v2/{DEVICE_ID}/streams", called_url) + self.assertEqual(result, {"data": [{"id": "s1"}]}) + + @patch("roboflow.adapters.devicesapi.requests.get") + def test_logs_csv_serialization(self, mock_get): + mock_get.return_value = _mock_response(200, {"data": [], "pagination": {}}) + devicesapi.get_device_logs( + API_KEY, + WORKSPACE, + DEVICE_ID, + service=["a", "b"], + severity=["INFO", "WARN"], + limit=50, + ) + called_url = mock_get.call_args[0][0] + # csv-serialized list params; characters URL-encoded by urllib + self.assertIn("service=a%2Cb", called_url) + self.assertIn("severity=INFO%2CWARN", called_url) + self.assertIn("limit=50", called_url) + + @patch("roboflow.adapters.devicesapi.requests.get") + def test_telemetry_time_period(self, mock_get): + mock_get.return_value = _mock_response(200, {"buckets": []}) + devicesapi.get_device_telemetry(API_KEY, WORKSPACE, DEVICE_ID, time_period="7d") + called_url = mock_get.call_args[0][0] + self.assertIn("time_period=7d", called_url) + + @patch("roboflow.adapters.devicesapi.requests.get") + def test_events_passes_cursor_unparsed(self, mock_get): + mock_get.return_value = _mock_response(200, {"data": [], "pagination": {}}) + # Cursors are opaque base64url strings; must round-trip without parsing. + cursor = "eyJ0aW1lc3RhbXAiOiAiMjAyNi0wNC0yMyAxMDowMDowMCJ9" + devicesapi.get_device_events(API_KEY, WORKSPACE, DEVICE_ID, cursor=cursor, direction="forward") + called_url = mock_get.call_args[0][0] + self.assertIn(f"cursor={cursor}", called_url) + self.assertIn("direction=forward", called_url) + + @patch("roboflow.adapters.devicesapi.requests.post") + def test_create_device_body_field_names(self, mock_post): + mock_post.return_value = _mock_response(201, {"deviceId": "d1", "installId": "i1"}) + devicesapi.create_device( + API_KEY, + WORKSPACE, + device_name="Cam 1", + device_type="edge", + workflow_id="wf-1", + tags=["a"], + offline_mode=True, + source_device_id="other", + ) + body = mock_post.call_args.kwargs["json"] + self.assertEqual(body["device_name"], "Cam 1") + self.assertEqual(body["device_type"], "edge") + self.assertEqual(body["workflow_id"], "wf-1") + self.assertEqual(body["tags"], ["a"]) + self.assertTrue(body["offline_mode"]) + # Body field is camelCase per docs/api/deployments/overview.md + self.assertEqual(body["sourceDeviceId"], "other") + + +class TestDevicesApiErrors(unittest.TestCase): + """Each non-2xx HTTP status maps to a typed exception.""" + + def _expect(self, status: int, expected_cls: type) -> None: + with patch("roboflow.adapters.devicesapi.requests.get") as mock_get: + mock_get.return_value = _mock_response(status, {"error": "bad"}) + with self.assertRaises(expected_cls) as ctx: + devicesapi.get_device(API_KEY, WORKSPACE, DEVICE_ID) + self.assertEqual(ctx.exception.status_code, status) + + def test_400_bad_request(self) -> None: + self._expect(400, DeviceBadRequestError) + + def test_401_auth(self) -> None: + self._expect(401, DeviceAuthError) + + def test_403_auth(self) -> None: + self._expect(403, DeviceAuthError) + + def test_404_not_found(self) -> None: + self._expect(404, DeviceNotFoundError) + + def test_404_missing_scope_is_auth(self) -> None: + # validateToken.js returns 404 + GraphMethodException when the api_key + # is valid for the workspace but lacks the device:read/update scope. + body = {"error": {"type": "GraphMethodException", "message": "scope missing"}} + with patch("roboflow.adapters.devicesapi.requests.get") as mock_get: + mock_get.return_value = _mock_response(404, body) + with self.assertRaises(DeviceAuthError) as ctx: + devicesapi.get_device(API_KEY, WORKSPACE, DEVICE_ID) + self.assertEqual(ctx.exception.status_code, 404) + + def test_429_rate_limit(self) -> None: + self._expect(429, DeviceRateLimitedError) + + def test_500_generic(self) -> None: + self._expect(500, DeviceApiError) + + +class TestDeviceClass(unittest.TestCase): + """Device exposes the per-device sub-resources.""" + + def setUp(self) -> None: + self.info: Dict[str, Any] = { + "id": DEVICE_ID, + "name": "Cam 1", + "status": "online", + "type": "edge", + "tags": ["floor-1"], + } + self.device = Device(API_KEY, WORKSPACE, self.info) + + def test_init_caches_summary_fields(self) -> None: + self.assertEqual(self.device.id, DEVICE_ID) + self.assertEqual(self.device.name, "Cam 1") + self.assertEqual(self.device.status, "online") + self.assertEqual(self.device.type, "edge") + self.assertEqual(self.device.tags, ["floor-1"]) + + @patch("roboflow.adapters.devicesapi.get_device_config") + def test_config_calls_adapter(self, mock_config) -> None: + mock_config.return_value = {"device_id": DEVICE_ID, "config": {}} + result = self.device.config() + mock_config.assert_called_once_with(API_KEY, WORKSPACE, DEVICE_ID) + self.assertEqual(result["device_id"], DEVICE_ID) + + @patch("roboflow.adapters.devicesapi.get_device_config_history") + def test_config_history_passes_cursor(self, mock_hist) -> None: + mock_hist.return_value = {"data": [], "pagination": {}} + self.device.config_history(limit=20, cursor="2026-04-23T10:00:00Z") + mock_hist.assert_called_once_with(API_KEY, WORKSPACE, DEVICE_ID, limit=20, cursor="2026-04-23T10:00:00Z") + + @patch("roboflow.adapters.devicesapi.list_device_streams") + def test_streams(self, mock_streams) -> None: + mock_streams.return_value = {"data": [{"id": "s1"}]} + self.assertEqual(self.device.streams(), [{"id": "s1"}]) + + @patch("roboflow.adapters.devicesapi.get_device_stream") + def test_stream(self, mock_stream) -> None: + mock_stream.return_value = {"id": "s1"} + self.device.stream("s1") + mock_stream.assert_called_once_with(API_KEY, WORKSPACE, DEVICE_ID, "s1") + + @patch("roboflow.adapters.devicesapi.get_device_logs") + def test_logs_forwards_kwargs(self, mock_logs) -> None: + mock_logs.return_value = {"data": [], "pagination": {}} + self.device.logs(severity=["ERROR"], limit=10) + kwargs = mock_logs.call_args.kwargs + self.assertEqual(kwargs["severity"], ["ERROR"]) + self.assertEqual(kwargs["limit"], 10) + + @patch("roboflow.adapters.devicesapi.get_device_telemetry") + def test_telemetry(self, mock_tel) -> None: + mock_tel.return_value = {"buckets": []} + self.device.telemetry("1h") + mock_tel.assert_called_once_with(API_KEY, WORKSPACE, DEVICE_ID, time_period="1h") + + @patch("roboflow.adapters.devicesapi.get_device_events") + def test_events_forwards_all_filters(self, mock_events) -> None: + mock_events.return_value = {"data": [], "pagination": {}} + self.device.events( + entity_type="stream", + entity_id="pipe-1", + event="stream_started", + start_time="2026-04-01T00:00:00Z", + end_time="2026-04-30T00:00:00Z", + limit=200, + cursor="opaque", + direction="forward", + ) + kwargs = mock_events.call_args.kwargs + self.assertEqual(kwargs["entity_type"], "stream") + self.assertEqual(kwargs["entity_id"], "pipe-1") + self.assertEqual(kwargs["event"], "stream_started") + self.assertEqual(kwargs["limit"], 200) + self.assertEqual(kwargs["cursor"], "opaque") + self.assertEqual(kwargs["direction"], "forward") + + @patch("roboflow.adapters.devicesapi.get_device") + def test_refresh_updates_fields(self, mock_get) -> None: + mock_get.return_value = {"id": DEVICE_ID, "name": "Cam 1 (renamed)", "status": "offline", "tags": []} + self.device.refresh() + self.assertEqual(self.device.name, "Cam 1 (renamed)") + self.assertEqual(self.device.status, "offline") + self.assertEqual(self.device.tags, []) + + +class TestWorkspaceDeviceMethods(unittest.TestCase): + """Workspace.devices() / .device() / .create_device() route through the adapter.""" + + def setUp(self) -> None: + from roboflow.core.workspace import Workspace + + info = {"workspace": {"name": "Test", "url": WORKSPACE, "projects": []}} + self.workspace = Workspace(info=info, api_key=API_KEY, default_workspace=WORKSPACE, model_format="yolov8") + + @patch("roboflow.adapters.devicesapi.list_devices") + def test_devices_returns_device_objects(self, mock_list) -> None: + mock_list.return_value = {"data": [{"id": "a"}, {"id": "b"}]} + devices = self.workspace.devices() + self.assertEqual(len(devices), 2) + self.assertIsInstance(devices[0], Device) + self.assertEqual(devices[0].id, "a") + + @patch("roboflow.adapters.devicesapi.get_device") + def test_device_returns_single(self, mock_get) -> None: + mock_get.return_value = {"id": DEVICE_ID, "name": "Cam"} + device = self.workspace.device(DEVICE_ID) + self.assertIsInstance(device, Device) + self.assertEqual(device.id, DEVICE_ID) + self.assertEqual(device.name, "Cam") + + @patch("roboflow.adapters.devicesapi.create_device") + def test_create_device_forwards_kwargs(self, mock_create) -> None: + mock_create.return_value = {"deviceId": "d1", "installId": "i1"} + result = self.workspace.create_device("Cam 1", device_type="edge", workflow_id="wf-1", tags=["a"]) + self.assertEqual(result["deviceId"], "d1") + kwargs = mock_create.call_args.kwargs + self.assertEqual(kwargs["device_name"], "Cam 1") + self.assertEqual(kwargs["device_type"], "edge") + self.assertEqual(kwargs["workflow_id"], "wf-1") + self.assertEqual(kwargs["tags"], ["a"]) + + +if __name__ == "__main__": + unittest.main() From 7ce03ebb6150b16c0a88fe7dc109e2017ea3ae6d Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Mon, 4 May 2026 15:15:32 -0400 Subject: [PATCH 2/3] Add device API request timeouts --- roboflow/adapters/devicesapi.py | 33 +++++++++++++++++++++++---------- tests/test_device.py | 22 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/roboflow/adapters/devicesapi.py b/roboflow/adapters/devicesapi.py index 9632fa43..883f1bd6 100644 --- a/roboflow/adapters/devicesapi.py +++ b/roboflow/adapters/devicesapi.py @@ -18,6 +18,8 @@ from roboflow.adapters.rfapi import RoboflowError from roboflow.config import API_URL +DEFAULT_TIMEOUT = (10, 60) + class DeviceApiError(RoboflowError): """Raised when a device API call returns a non-success status.""" @@ -96,7 +98,7 @@ def _raise_for_status(response: requests.Response) -> None: def list_devices(api_key: str, workspace: str) -> Dict[str, Any]: """``GET /:workspace/devices/v2`` — returns the parsed JSON response.""" - response = requests.get(_build_url(workspace, "", api_key)) + response = requests.get(_build_url(workspace, "", api_key), timeout=DEFAULT_TIMEOUT) _raise_for_status(response) return response.json() @@ -125,14 +127,18 @@ def create_device( if source_device_id is not None: # Body field is camelCase per docs/api/deployments/overview.md body["sourceDeviceId"] = source_device_id - response = requests.post(_build_url(workspace, "", api_key), json=body) + response = requests.post( + _build_url(workspace, "", api_key), + json=body, + timeout=DEFAULT_TIMEOUT, + ) _raise_for_status(response) return response.json() def get_device(api_key: str, workspace: str, device_id: str) -> Dict[str, Any]: """``GET /:workspace/devices/v2/:deviceId``.""" - response = requests.get(_build_url(workspace, f"/{device_id}", api_key)) + response = requests.get(_build_url(workspace, f"/{device_id}", api_key), timeout=DEFAULT_TIMEOUT) _raise_for_status(response) return response.json() @@ -144,7 +150,7 @@ def get_device_config(api_key: str, workspace: str, device_id: str) -> Dict[str, The response can include ``environment_variables`` and integration credentials. Treat the returned dict as sensitive. """ - response = requests.get(_build_url(workspace, f"/{device_id}/config", api_key)) + response = requests.get(_build_url(workspace, f"/{device_id}/config", api_key), timeout=DEFAULT_TIMEOUT) _raise_for_status(response) return response.json() @@ -164,7 +170,8 @@ def get_device_config_history( f"/{device_id}/config/history", api_key, query={"limit": limit, "cursor": cursor}, - ) + ), + timeout=DEFAULT_TIMEOUT, ) _raise_for_status(response) return response.json() @@ -172,14 +179,17 @@ def get_device_config_history( def list_device_streams(api_key: str, workspace: str, device_id: str) -> Dict[str, Any]: """``GET /:workspace/devices/v2/:deviceId/streams``.""" - response = requests.get(_build_url(workspace, f"/{device_id}/streams", api_key)) + response = requests.get(_build_url(workspace, f"/{device_id}/streams", api_key), timeout=DEFAULT_TIMEOUT) _raise_for_status(response) return response.json() def get_device_stream(api_key: str, workspace: str, device_id: str, stream_id: str) -> Dict[str, Any]: """``GET /:workspace/devices/v2/:deviceId/streams/:streamId``.""" - response = requests.get(_build_url(workspace, f"/{device_id}/streams/{stream_id}", api_key)) + response = requests.get( + _build_url(workspace, f"/{device_id}/streams/{stream_id}", api_key), + timeout=DEFAULT_TIMEOUT, + ) _raise_for_status(response) return response.json() @@ -210,7 +220,8 @@ def get_device_logs( "limit": limit, "cursor": cursor, }, - ) + ), + timeout=DEFAULT_TIMEOUT, ) _raise_for_status(response) return response.json() @@ -230,7 +241,8 @@ def get_device_telemetry( f"/{device_id}/telemetry", api_key, query={"time_period": time_period}, - ) + ), + timeout=DEFAULT_TIMEOUT, ) _raise_for_status(response) return response.json() @@ -266,7 +278,8 @@ def get_device_events( "cursor": cursor, "direction": direction, }, - ) + ), + timeout=DEFAULT_TIMEOUT, ) _raise_for_status(response) return response.json() diff --git a/tests/test_device.py b/tests/test_device.py index 0aa47529..7a2650ca 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -112,6 +112,28 @@ def test_create_device_body_field_names(self, mock_post): # Body field is camelCase per docs/api/deployments/overview.md self.assertEqual(body["sourceDeviceId"], "other") + @patch("roboflow.adapters.devicesapi.requests.post") + @patch("roboflow.adapters.devicesapi.requests.get") + def test_requests_use_default_timeout(self, mock_get, mock_post): + mock_get.return_value = _mock_response(200, {"data": [], "pagination": {}}) + mock_post.return_value = _mock_response(201, {"deviceId": "d1", "installId": "i1"}) + + devicesapi.list_devices(API_KEY, WORKSPACE) + devicesapi.create_device(API_KEY, WORKSPACE, device_name="Cam 1") + devicesapi.get_device(API_KEY, WORKSPACE, DEVICE_ID) + devicesapi.get_device_config(API_KEY, WORKSPACE, DEVICE_ID) + devicesapi.get_device_config_history(API_KEY, WORKSPACE, DEVICE_ID) + devicesapi.list_device_streams(API_KEY, WORKSPACE, DEVICE_ID) + devicesapi.get_device_stream(API_KEY, WORKSPACE, DEVICE_ID, "s1") + devicesapi.get_device_logs(API_KEY, WORKSPACE, DEVICE_ID) + devicesapi.get_device_telemetry(API_KEY, WORKSPACE, DEVICE_ID) + devicesapi.get_device_events(API_KEY, WORKSPACE, DEVICE_ID) + + for call in mock_get.call_args_list: + self.assertEqual(call.kwargs["timeout"], devicesapi.DEFAULT_TIMEOUT) + for call in mock_post.call_args_list: + self.assertEqual(call.kwargs["timeout"], devicesapi.DEFAULT_TIMEOUT) + class TestDevicesApiErrors(unittest.TestCase): """Each non-2xx HTTP status maps to a typed exception.""" From aaa1ac213e848d7fc4cd17b5910f3718d34898d5 Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Mon, 4 May 2026 15:35:02 -0400 Subject: [PATCH 3/3] review: tighten api_key redaction, truncate error bodies, warn on device config Address @NVergunst-ROBO's review on PR #469. 1. Tighten `_sanitize_credentials` regex in roboflow/cli/_output.py to match `api_key=` values containing dashes / non-word URL-safe chars. The old pattern `[A-Za-z0-9_]+` would leak keys like `my-key-123`. Stops `requests` exceptions whose `str(exc)` echoes the request URL from leaking the api_key to stderr. 2. Cap raw response bodies at 1024 chars in `_raise_for_status` so a multi-KB HTML 500 stack trace doesn't land in `str(DeviceApiError)`. The Express default 500 handler we hit on `/events` returned ~1KB; without the cap a real backend trace could be much larger. 3. `roboflow device config` now writes a stderr advisory in interactive (text, non-quiet) mode reminding the user the config can contain env vars and integration credentials. JSON mode is intentionally untouched: the API contract is a passthrough of the Firestore config doc, and silently redacting in the SDK would corrupt round-trip use cases (backup/restore, diff, migration). Server-side parity (sanitize uniformly across UI / SDK / raw curl) is the right place for any actual redaction. Also adds three regression tests covering the redaction, the truncation, and the warning behavior in both text and JSON modes. Co-Authored-By: Claude Opus 4.7 (1M context) --- roboflow/adapters/devicesapi.py | 14 ++++++ roboflow/cli/_output.py | 5 +- roboflow/cli/handlers/device.py | 15 ++++++ tests/cli/test_device_handler.py | 80 ++++++++++++++++++++++++++++++++ tests/test_device.py | 16 +++++++ 5 files changed, 129 insertions(+), 1 deletion(-) diff --git a/roboflow/adapters/devicesapi.py b/roboflow/adapters/devicesapi.py index 883f1bd6..7a655133 100644 --- a/roboflow/adapters/devicesapi.py +++ b/roboflow/adapters/devicesapi.py @@ -20,6 +20,19 @@ DEFAULT_TIMEOUT = (10, 60) +# Cap on raw error-body bytes surfaced through exception messages. Server-side +# 500s sometimes return a multi-KB HTML stack trace; without a cap the whole +# blob would land in `str(exc)` and wreck terminals/log lines. +_MAX_ERROR_BODY_CHARS = 1024 + + +def _truncate(text: str) -> str: + if not text: + return text + if len(text) <= _MAX_ERROR_BODY_CHARS: + return text + return text[:_MAX_ERROR_BODY_CHARS] + "…[truncated]" + class DeviceApiError(RoboflowError): """Raised when a device API call returns a non-success status.""" @@ -78,6 +91,7 @@ def _raise_for_status(response: requests.Response) -> None: message = response.text except Exception: # noqa: BLE001 message = response.text + message = _truncate(message) code = response.status_code if code == 400: raise DeviceBadRequestError(message or "Bad request", status_code=code) diff --git a/roboflow/cli/_output.py b/roboflow/cli/_output.py index ffc29025..40e3565a 100644 --- a/roboflow/cli/_output.py +++ b/roboflow/cli/_output.py @@ -90,7 +90,10 @@ def _sanitize_credentials(text: str) -> str: """Strip API keys from URLs and other sensitive patterns in error messages.""" import re - return re.sub(r"api_key=[A-Za-z0-9_]+", "api_key=***", text) + # Match api_key=... up to the next whitespace, query separator, quote, or backslash. + # Older patterns missed keys containing '-' or other URL-safe characters and would + # echo them to the terminal when an exception bubbled up from `requests`. + return re.sub(r"api_key=[^\s&\"'\\<>]+", "api_key=***", text) def _parse_error_message(raw: str) -> tuple[Optional[dict[str, Any]], str]: diff --git a/roboflow/cli/handlers/device.py b/roboflow/cli/handlers/device.py index 1eca74fd..940de899 100644 --- a/roboflow/cli/handlers/device.py +++ b/roboflow/cli/handlers/device.py @@ -337,6 +337,8 @@ def _create(args) -> None: # noqa: ANN001 def _config(args) -> None: # noqa: ANN001 + import sys + from roboflow.adapters import devicesapi from roboflow.cli._output import output, output_error @@ -351,6 +353,19 @@ def _config(args) -> None: # noqa: ANN001 output_error(args, str(exc), hint=_hint_for(exc), exit_code=_exit_code_for(exc)) return + # `GET .../config` is a documented passthrough of the Firestore config doc — it can + # contain `environment_variables` and integration credentials. We deliberately do + # NOT redact: that would silently corrupt round-trips (backup/restore/diff) and + # diverge from what the API contract returns. Instead, surface a stderr warning + # in interactive (non-JSON, non-quiet) mode so a human running `roboflow device + # config ` is reminded before they paste the output anywhere. JSON mode stays + # byte-identical to the API response. + if not getattr(args, "json", False) and not getattr(args, "quiet", False): + sys.stderr.write( + "WARNING: Device config may contain environment variables, API keys, " + "and integration credentials. Do not paste this output into chats, " + "tickets, screenshots, or shared logs.\n" + ) output(args, config) diff --git a/tests/cli/test_device_handler.py b/tests/cli/test_device_handler.py index 1d5fd3ad..8fb83e13 100644 --- a/tests/cli/test_device_handler.py +++ b/tests/cli/test_device_handler.py @@ -140,6 +140,86 @@ def test_429_exits_1_with_hint(self, _mk, _mw, mock_logs): self.assertEqual(ctx.exception.code, 1) +class TestDeviceErrorRedaction(unittest.TestCase): + """Adapter errors that contain ``api_key=...`` must never reach stderr verbatim.""" + + @patch("roboflow.adapters.devicesapi.get_device") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value="secret-key-with-DASH_und_123") + def test_connection_error_strips_api_key_from_url(self, _mk, _mw, mock_get): + # Simulate a requests.ConnectionError surfacing the full URL with the key. + url = "https://api.roboflow.com/test-ws/devices/v2/x?api_key=secret-key-with-DASH_und_123" + mock_get.side_effect = RuntimeError(f"HTTPSConnectionPool: failed to reach {url}") + import sys + from io import StringIO + + from roboflow.cli.handlers.device import _get + + buf = StringIO() + old = sys.stderr + sys.stderr = buf + try: + with self.assertRaises(SystemExit): + _get(_args(device_id="x", json=True)) + finally: + sys.stderr = old + emitted = buf.getvalue() + self.assertNotIn("secret-key-with-DASH_und_123", emitted) + self.assertIn("api_key=***", emitted) + + +class TestDeviceConfigWarning(unittest.TestCase): + """`device config` must remind humans that the output is sensitive — but not in JSON mode.""" + + @patch("roboflow.adapters.devicesapi.get_device_config") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_text_mode_prints_warning_to_stderr(self, _mk, _mw, mock_cfg): + mock_cfg.return_value = {"environment_variables": {"SECRET": "x"}, "services": {}} + import sys + from io import StringIO + + from roboflow.cli.handlers.device import _config + + buf = StringIO() + old = sys.stderr + sys.stderr = buf + try: + with patch("builtins.print"): # swallow the JSON dump + _config(_args(device_id="x")) + finally: + sys.stderr = old + warning = buf.getvalue() + self.assertIn("WARNING", warning) + self.assertIn("environment variables", warning) + + @patch("roboflow.adapters.devicesapi.get_device_config") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) + @patch("roboflow.config.load_roboflow_api_key", return_value=KEY) + def test_json_mode_no_warning_and_no_payload_mutation(self, _mk, _mw, mock_cfg): + payload = {"environment_variables": {"SECRET": "x"}, "services": {"a": 1}} + mock_cfg.return_value = payload + import sys + from io import StringIO + + from roboflow.cli.handlers.device import _config + + buf = StringIO() + old = sys.stderr + sys.stderr = buf + try: + with patch("builtins.print") as mock_print: + _config(_args(device_id="x", json=True)) + finally: + sys.stderr = old + # No warning in JSON mode (machine consumers shouldn't see ad hoc human text). + self.assertEqual(buf.getvalue(), "") + # The dumped JSON must be byte-faithful to the API response — never redacted + # by the SDK, since callers depend on round-trip fidelity for backup/restore. + printed = mock_print.call_args[0][0] + self.assertEqual(json.loads(printed), payload) + + class TestDeviceCreateHandler(unittest.TestCase): @patch("roboflow.adapters.devicesapi.create_device") @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=WS) diff --git a/tests/test_device.py b/tests/test_device.py index 7a2650ca..f4aec47a 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -173,6 +173,22 @@ def test_429_rate_limit(self) -> None: def test_500_generic(self) -> None: self._expect(500, DeviceApiError) + def test_500_truncates_huge_response_body(self) -> None: + # Server-side 500s sometimes return a multi-KB HTML stack trace. The + # adapter must cap that before it lands in str(exc). + huge_body = "X" * 10_000 # 10x the cap + with patch("roboflow.adapters.devicesapi.requests.get") as mock_get: + response = MagicMock() + response.status_code = 500 + response.json.side_effect = ValueError("not JSON") + response.text = huge_body + mock_get.return_value = response + with self.assertRaises(DeviceApiError) as ctx: + devicesapi.get_device(API_KEY, WORKSPACE, DEVICE_ID) + msg = str(ctx.exception) + self.assertLess(len(msg), len(huge_body)) + self.assertTrue(msg.endswith("…[truncated]")) + class TestDeviceClass(unittest.TestCase): """Device exposes the per-device sub-resources."""