diff --git a/roboflow/adapters/rfapi.py b/roboflow/adapters/rfapi.py index c0b00f39..2110292d 100644 --- a/roboflow/adapters/rfapi.py +++ b/roboflow/adapters/rfapi.py @@ -12,7 +12,18 @@ class RoboflowError(Exception): - pass + """Generic API error. + + Optional `status_code` is the HTTP status from the upstream response + when available — set by helpers like `_raise_for_trash_response` so + callers can branch on auth (401) vs not-found (404) without string + matching the message. Existing call sites that pass only a message + still work; the attribute defaults to `None`. + """ + + def __init__(self, message, status_code=None): + super().__init__(message) + self.status_code = status_code class ImageUploadError(RoboflowError): @@ -911,8 +922,11 @@ def _raise_for_trash_response(response): error messages are agent-friendly. Falls back to the raw text if the body isn't JSON or doesn't contain an `error` field. - The single `raise` at the end means we can't accidentally swallow the - intended error if a future refactor widens the except clause. + Carries the HTTP status code on the raised exception so callers (e.g. + the CLI) can map auth failures (401) to the right exit code without + string-matching. The single `raise` at the end means we can't + accidentally swallow the intended error if a future refactor widens + the except clause. """ msg = None try: @@ -922,7 +936,7 @@ def _raise_for_trash_response(response): except ValueError: # Body wasn't JSON — fall through to response.text. pass - raise RoboflowError(msg or response.text) + raise RoboflowError(msg or response.text, status_code=response.status_code) def delete_project(api_key, workspace_url, project_url): diff --git a/roboflow/cli/_output.py b/roboflow/cli/_output.py index ffc29025..e3f22dd4 100644 --- a/roboflow/cli/_output.py +++ b/roboflow/cli/_output.py @@ -176,6 +176,93 @@ def output_error( sys.exit(exit_code) +def confirm_destructive(args: Any, prompt: str) -> bool: + """Gate a destructive action on either ``--yes`` or an interactive TTY confirmation. + + Returns ``True`` if the action is approved (caller should proceed) or + ``False`` if the user declined at the prompt (caller should bail + cleanly via ``output(args, {"cancelled": True}, ...)``). + + Calls ``output_error`` and exits with code 1 when *no* TTY is available + and ``--yes`` wasn't passed, rather than prompting on a closed stdin + (which would either hang or — worse — silently default to a permissive + behavior). + + The previous logic gated on ``--json`` ("if --json is set, skip the + prompt") which conflated *output formatting* with *destructive intent*. + Anyone piping ``roboflow project delete X --json`` into ``jq`` for + parsing got their project nuked without any confirmation. Now ``--json`` + is purely a formatting flag; the kill-switch is ``--yes``/``-y``. + """ + if getattr(args, "yes", False): + return True + + # Either explicit `--yes` is required or we need an interactive TTY to + # ask. typer.confirm() reads from stdin; if stdin is closed (CI, piped + # input, agent) we'd hang — bail with a useful hint instead. + if not sys.stdin.isatty(): + output_error( + args, + "This is a destructive action and requires confirmation.", + hint=( + "Re-run with --yes / -y to confirm, or run interactively. " + "(--json is a formatting flag and does not bypass this.)" + ), + exit_code=1, + ) + return False # unreachable: output_error sys.exits + + import typer + + confirmed = typer.confirm(prompt, default=False) + if not confirmed: + # Caller renders the cancelled state. + output(args, {"cancelled": True}, text="Cancelled.") + return confirmed + + +def output_api_error( + args: Any, + exc: Exception, + *, + hint: Optional[str] = None, + auth_hint: Optional[str] = None, + not_found_hint: Optional[str] = None, +) -> None: + """Render a server-side error and exit with the correct code. + + Maps an exception's HTTP status (carried as ``exc.status_code`` when the + raiser sets it — see ``_raise_for_trash_response`` in ``adapters.rfapi``) + to the per-CONTRIBUTING.md exit-code contract: + + * **401** → exit code 2 ("auth error"). ``auth_hint`` overrides ``hint`` + so we can surface a key-specific suggestion ("check ROBOFLOW_API_KEY" + etc.) regardless of what the caller passes. + * **404** → exit code 3 ("not found"). ``not_found_hint`` overrides + ``hint`` similarly. Useful for resources that may legitimately be + absent (deleted, mis-typed slug, etc.). + * **anything else / no status_code attached** → exit code 1 ("error"), + using ``hint`` verbatim. + + Without this helper every handler had to either string-match the message + (brittle) or fall back to a single ``exit_code=3`` for both 401 and 404, + which broke the ``$? == 2`` contract that scripts rely on to decide + whether to retry vs. re-auth. + """ + status = getattr(exc, "status_code", None) + if status == 401: + output_error( + args, + str(exc), + hint=auth_hint or hint or "Check that ROBOFLOW_API_KEY is set and not revoked.", + exit_code=2, + ) + elif status == 404: + output_error(args, str(exc), hint=not_found_hint or hint, exit_code=3) + else: + output_error(args, str(exc), hint=hint, exit_code=1) + + def stub(args: Any) -> None: """Placeholder handler for not-yet-implemented commands.""" output_error(args, "This command is not yet implemented.", hint="Coming soon.", exit_code=1) diff --git a/roboflow/cli/handlers/project.py b/roboflow/cli/handlers/project.py index 0b9d6125..2d0ee140 100644 --- a/roboflow/cli/handlers/project.py +++ b/roboflow/cli/handlers/project.py @@ -223,7 +223,7 @@ def _create_project(args): # noqa: ANN001 def _delete_project(args): # noqa: ANN001 from roboflow.adapters import rfapi - from roboflow.cli._output import output, output_error + from roboflow.cli._output import confirm_destructive, output, output_api_error, output_error from roboflow.cli._resolver import resolve_resource from roboflow.config import load_roboflow_api_key @@ -247,26 +247,52 @@ def _delete_project(args): # noqa: ANN001 ) return - if not getattr(args, "yes", False) and not getattr(args, "json", False): - import typer - - confirmed = typer.confirm( - f"Move '{workspace_url}/{project_slug}' to Trash? " - "(Retained for 30 days. Any in-flight trainings will be cancelled.)", - default=False, - ) - if not confirmed: - output(args, {"cancelled": True}, text="Cancelled.") - return + if not confirm_destructive( + args, + f"Move '{workspace_url}/{project_slug}' to Trash? " + "(Retained for 30 days. Any in-flight trainings will be cancelled.)", + ): + return try: data = rfapi.delete_project(api_key, workspace_url, project_slug) except rfapi.RoboflowError as exc: - output_error( + # Idempotent re-delete: when the project is already in Trash the + # public API's URL filter excludes it, so the DELETE returns 404 + # with a generic "endpoint does not exist" message. That looks like + # a permissions error to the user. Probe Trash explicitly — if the + # slug is there, treat the call as a no-op success so scripts can + # safely retry without special-casing the second attempt. + if getattr(exc, "status_code", None) == 404: + try: + trash = rfapi.list_trash(api_key, workspace_url) + except rfapi.RoboflowError: + trash = None + if trash is not None: + already = next( + (p for p in trash.get("sections", {}).get("projects", []) if p.get("url") == project_slug), + None, + ) + if already is not None: + data = { + "deleted": True, + "type": "project", + "workspace": workspace_url, + "project": project_slug, + "projectId": already.get("id"), + "trash": True, + "alreadyInTrash": True, + } + output( + args, + data, + text=f"{workspace_url}/{project_slug} is already in Trash (no-op).", + ) + return + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'project:update' scope on this workspace.", - exit_code=3, ) return @@ -279,7 +305,7 @@ def _delete_project(args): # noqa: ANN001 def _restore_project(args): # noqa: ANN001 from roboflow.adapters import rfapi - from roboflow.cli._output import output, output_error + from roboflow.cli._output import output, output_api_error, output_error from roboflow.cli._resolver import resolve_resource from roboflow.config import load_roboflow_api_key @@ -306,11 +332,11 @@ def _restore_project(args): # noqa: ANN001 try: trash = rfapi.list_trash(api_key, workspace_url) except rfapi.RoboflowError as exc: - output_error( + output_api_error( args, - str(exc), + exc, + auth_hint="Check that ROBOFLOW_API_KEY is valid for this workspace.", hint="Check your API key has 'project:read' scope on this workspace.", - exit_code=3, ) return @@ -328,11 +354,10 @@ def _restore_project(args): # noqa: ANN001 try: data = rfapi.restore_trash_item(api_key, workspace_url, "project", match["id"]) except rfapi.RoboflowError as exc: - output_error( + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'project:update' scope on this workspace.", - exit_code=3, ) return diff --git a/roboflow/cli/handlers/trash.py b/roboflow/cli/handlers/trash.py index 7c7a3815..60056505 100644 --- a/roboflow/cli/handlers/trash.py +++ b/roboflow/cli/handlers/trash.py @@ -52,7 +52,7 @@ def _resolve_workspace(args): def _list_trash(args): # noqa: ANN001 from roboflow.adapters import rfapi - from roboflow.cli._output import output, output_error + from roboflow.cli._output import output, output_api_error from roboflow.cli._table import format_table workspace_url, api_key = _resolve_workspace(args) @@ -62,11 +62,10 @@ def _list_trash(args): # noqa: ANN001 try: trash = rfapi.list_trash(api_key, workspace_url) except rfapi.RoboflowError as exc: - output_error( + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'project:read' scope on this workspace.", - exit_code=3, ) return diff --git a/roboflow/cli/handlers/version.py b/roboflow/cli/handlers/version.py index 275bc6ce..d1bd51d6 100644 --- a/roboflow/cli/handlers/version.py +++ b/roboflow/cli/handlers/version.py @@ -356,7 +356,7 @@ def _create(args): # noqa: ANN001 def _delete_version(args): # noqa: ANN001 from roboflow.adapters import rfapi - from roboflow.cli._output import output, output_error + from roboflow.cli._output import confirm_destructive, output, output_api_error, output_error from roboflow.cli._resolver import resolve_resource from roboflow.config import load_roboflow_api_key @@ -388,26 +388,55 @@ def _delete_version(args): # noqa: ANN001 ) return - if not getattr(args, "yes", False) and not getattr(args, "json", False): - import typer - - confirmed = typer.confirm( - f"Move version '{workspace_url}/{project_slug}/{version_num}' to Trash? " - "(Retained for 30 days. Any in-flight training will be cancelled.)", - default=False, - ) - if not confirmed: - output(args, {"cancelled": True}, text="Cancelled.") - return + if not confirm_destructive( + args, + f"Move version '{workspace_url}/{project_slug}/{version_num}' to Trash? " + "(Retained for 30 days. Any in-flight training will be cancelled.)", + ): + return try: data = rfapi.delete_version(api_key, workspace_url, project_slug, version_num) except rfapi.RoboflowError as exc: - output_error( + # Idempotent re-delete: if the version is already in Trash, the + # public API URL is filtered and DELETE returns 404 — surface it + # as an explicit no-op so retries don't surface a misleading + # "missing scope" message. Same shape as project delete above. + if getattr(exc, "status_code", None) == 404: + try: + trash = rfapi.list_trash(api_key, workspace_url) + except rfapi.RoboflowError: + trash = None + if trash is not None: + target = str(version_num) + already = next( + ( + v + for v in trash.get("sections", {}).get("versions", []) + if str(v.get("id")) == target and v.get("parentUrl") == project_slug + ), + None, + ) + if already is not None: + data = { + "deleted": True, + "type": "version", + "workspace": workspace_url, + "project": project_slug, + "version": str(version_num), + "trash": True, + "alreadyInTrash": True, + } + output( + args, + data, + text=f"{workspace_url}/{project_slug}/{version_num} is already in Trash (no-op).", + ) + return + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'version:update' scope and the version exists.", - exit_code=3, ) return @@ -420,7 +449,7 @@ def _delete_version(args): # noqa: ANN001 def _restore_version(args): # noqa: ANN001 from roboflow.adapters import rfapi - from roboflow.cli._output import output, output_error + from roboflow.cli._output import output, output_api_error, output_error from roboflow.cli._resolver import resolve_resource from roboflow.config import load_roboflow_api_key @@ -455,11 +484,10 @@ def _restore_version(args): # noqa: ANN001 try: trash = rfapi.list_trash(api_key, workspace_url) except rfapi.RoboflowError as exc: - output_error( + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'project:read' scope on this workspace.", - exit_code=3, ) return @@ -488,11 +516,10 @@ def _restore_version(args): # noqa: ANN001 parent_id=match.get("parentId"), ) except rfapi.RoboflowError as exc: - output_error( + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'version:update' scope on this workspace.", - exit_code=3, ) return diff --git a/roboflow/cli/handlers/workflow.py b/roboflow/cli/handlers/workflow.py index cd0be353..8db60b67 100644 --- a/roboflow/cli/handlers/workflow.py +++ b/roboflow/cli/handlers/workflow.py @@ -428,30 +428,60 @@ def _stub_deploy(args) -> None: # noqa: ANN001 def _delete_workflow(args) -> None: # noqa: ANN001 from roboflow.adapters import rfapi - from roboflow.cli._output import output, output_error + from roboflow.cli._output import confirm_destructive, output, output_api_error resolved = _resolve_workspace_and_key(args) if resolved is None: return workspace_url, api_key = resolved - if not getattr(args, "yes", False) and not getattr(args, "json", False): - confirmed = typer.confirm( - f"Move workflow '{workspace_url}/{args.workflow_url}' to Trash? (Retained for 30 days.)", - default=False, - ) - if not confirmed: - output(args, {"cancelled": True}, text="Cancelled.") - return + if not confirm_destructive( + args, + f"Move workflow '{workspace_url}/{args.workflow_url}' to Trash? (Retained for 30 days.)", + ): + return try: data = rfapi.delete_workflow(api_key, workspace_url, args.workflow_url) except rfapi.RoboflowError as exc: - output_error( + # Idempotent re-delete: a workflow already in Trash returns 404 + # because the public API filters trashed workflows out of the URL + # match. Probe Trash and treat as a no-op success when found — + # mirrors the project / version delete behavior. + if getattr(exc, "status_code", None) == 404: + try: + trash = rfapi.list_trash(api_key, workspace_url) + except rfapi.RoboflowError: + trash = None + if trash is not None: + already = next( + ( + w + for w in trash.get("sections", {}).get("workflows", []) + if w.get("url") == args.workflow_url or w.get("id") == args.workflow_url + ), + None, + ) + if already is not None: + data = { + "deleted": True, + "type": "workflow", + "workspace": workspace_url, + "workflow": args.workflow_url, + "workflowId": already.get("id"), + "trash": True, + "alreadyInTrash": True, + } + output( + args, + data, + text=f"{workspace_url}/{args.workflow_url} is already in Trash (no-op).", + ) + return + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'workflow:update' scope on this workspace.", - exit_code=3, ) return @@ -464,7 +494,7 @@ def _delete_workflow(args) -> None: # noqa: ANN001 def _restore_workflow(args) -> None: # noqa: ANN001 from roboflow.adapters import rfapi - from roboflow.cli._output import output, output_error + from roboflow.cli._output import output, output_api_error, output_error resolved = _resolve_workspace_and_key(args) if resolved is None: @@ -474,11 +504,10 @@ def _restore_workflow(args) -> None: # noqa: ANN001 try: trash = rfapi.list_trash(api_key, workspace_url) except rfapi.RoboflowError as exc: - output_error( + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'project:read' scope on this workspace.", - exit_code=3, ) return @@ -500,11 +529,10 @@ def _restore_workflow(args) -> None: # noqa: ANN001 try: data = rfapi.restore_trash_item(api_key, workspace_url, "workflow", match["id"]) except rfapi.RoboflowError as exc: - output_error( + output_api_error( args, - str(exc), + exc, hint="Check your API key has 'workflow:update' scope on this workspace.", - exit_code=3, ) return diff --git a/tests/cli/test_trash_polish.py b/tests/cli/test_trash_polish.py new file mode 100644 index 00000000..4c4f53e2 --- /dev/null +++ b/tests/cli/test_trash_polish.py @@ -0,0 +1,284 @@ +"""Behavioral tests for the 1.3.8 CLI polish fixes. + +Covers the three findings from the prod CLI shake-down: + 1. Auth errors (HTTP 401) should exit with code 2, not 3. + 2. Destructive commands without ``--yes`` and no TTY should bail with a + hint instead of either hanging on a closed stdin or silently + proceeding when ``--json`` is set. + 3. Re-running ``project|version|workflow delete`` on something already in + Trash should be a no-op success rather than surfacing a misleading + "missing scope" 404 message. +""" + +import io +import sys +import unittest +from argparse import Namespace +from unittest.mock import patch + +from roboflow.adapters import rfapi + + +def _args(**overrides): + base = { + "json": False, + "workspace": "test-ws", + "api_key": "fake-key", + "quiet": False, + "yes": False, + } + base.update(overrides) + return Namespace(**base) + + +# --------------------------------------------------------------------------- +# 1. Auth errors → exit code 2 +# --------------------------------------------------------------------------- + + +class TestAuthExitCode(unittest.TestCase): + """``output_api_error`` must map ``status_code=401`` to exit code 2. + + Before this change every ``rfapi.RoboflowError`` from the trash + endpoints was caught with ``exit_code=3`` (not-found), so scripts / + agents couldn't tell "your key is bad, get a new one" apart from + "this resource doesn't exist." + """ + + def test_401_maps_to_exit_2(self) -> None: + from roboflow.cli._output import output_api_error + + exc = rfapi.RoboflowError("This API key does not exist", status_code=401) + with self.assertRaises(SystemExit) as ctx: + output_api_error(_args(), exc, hint="ignored when 401") + self.assertEqual(ctx.exception.code, 2) + + def test_404_maps_to_exit_3(self) -> None: + from roboflow.cli._output import output_api_error + + exc = rfapi.RoboflowError("Not found", status_code=404) + with self.assertRaises(SystemExit) as ctx: + output_api_error(_args(), exc) + self.assertEqual(ctx.exception.code, 3) + + def test_other_status_maps_to_exit_1(self) -> None: + from roboflow.cli._output import output_api_error + + exc = rfapi.RoboflowError("Server died", status_code=500) + with self.assertRaises(SystemExit) as ctx: + output_api_error(_args(), exc) + self.assertEqual(ctx.exception.code, 1) + + def test_no_status_code_maps_to_exit_1(self) -> None: + # Older `raise RoboflowError(text)` call sites don't set status_code; + # they should default to a generic exit 1, NOT to 3 (which would + # impersonate "not found") and NOT to 2 (which would impersonate + # "auth error"). + from roboflow.cli._output import output_api_error + + exc = rfapi.RoboflowError("ambiguous") + with self.assertRaises(SystemExit) as ctx: + output_api_error(_args(), exc) + self.assertEqual(ctx.exception.code, 1) + + def test_trash_response_attaches_status_code(self) -> None: + # rfapi._raise_for_trash_response is the funnel for every 4xx/5xx + # from the soft-delete endpoints — verify it stamps status_code. + class FakeResponse: + status_code = 401 + text = '{"error": "Unauthorized"}' + + def json(self): + return {"error": "Unauthorized"} + + with self.assertRaises(rfapi.RoboflowError) as ctx: + rfapi._raise_for_trash_response(FakeResponse()) + self.assertEqual(ctx.exception.status_code, 401) + self.assertEqual(str(ctx.exception), "Unauthorized") + + +# --------------------------------------------------------------------------- +# 2. Destructive commands gate on --yes OR a TTY (not on --json) +# --------------------------------------------------------------------------- + + +class TestDestructiveConfirm(unittest.TestCase): + """``confirm_destructive`` should: + + * return True when ``--yes`` is set (regardless of ``--json`` or TTY); + * exit cleanly with code 1 when no TTY AND no ``--yes`` (the regression + scenario: ``roboflow project delete X --json < /dev/null`` previously + went through silently because ``--json`` was treated as an implicit + "skip prompt" signal); + * prompt via typer.confirm when on a TTY without ``--yes`` and respect + the user's choice. + """ + + def test_yes_flag_short_circuits(self) -> None: + from roboflow.cli._output import confirm_destructive + + # No TTY, no prompt — but --yes is set, so it should still proceed. + with patch.object(sys.stdin, "isatty", return_value=False): + self.assertTrue(confirm_destructive(_args(yes=True), "destroy?")) + + def test_no_tty_no_yes_bails_with_exit_1(self) -> None: + from roboflow.cli._output import confirm_destructive + + with patch.object(sys.stdin, "isatty", return_value=False): + with self.assertRaises(SystemExit) as ctx: + confirm_destructive(_args(yes=False), "destroy?") + self.assertEqual(ctx.exception.code, 1) + + def test_json_alone_does_not_bypass(self) -> None: + # Regression guard for the original bug: --json without --yes + # should NOT short-circuit the destructive guard. + from roboflow.cli._output import confirm_destructive + + with patch.object(sys.stdin, "isatty", return_value=False): + with self.assertRaises(SystemExit) as ctx: + confirm_destructive(_args(yes=False, json=True), "destroy?") + self.assertEqual(ctx.exception.code, 1) + + def test_tty_prompts_and_respects_decline(self) -> None: + from roboflow.cli._output import confirm_destructive + + captured = io.StringIO() + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch("typer.confirm", return_value=False), + patch("sys.stdout", captured), + ): + self.assertFalse(confirm_destructive(_args(yes=False), "destroy?")) + # Caller doesn't need to re-emit "Cancelled." — confirm_destructive + # already calls output() with the cancelled marker. + self.assertIn("Cancelled", captured.getvalue()) + + def test_tty_prompts_and_respects_accept(self) -> None: + from roboflow.cli._output import confirm_destructive + + with patch.object(sys.stdin, "isatty", return_value=True), patch("typer.confirm", return_value=True): + self.assertTrue(confirm_destructive(_args(yes=False), "destroy?")) + + +# --------------------------------------------------------------------------- +# 3. Idempotent re-delete on project/version/workflow +# --------------------------------------------------------------------------- + + +class TestIdempotentDelete(unittest.TestCase): + """When the DELETE call returns 404 because the resource is already in + Trash (the public API's URL filter excludes trashed items), the handler + should probe ``list_trash`` and emit a synthetic success payload with + ``alreadyInTrash: True``. Previously we surfaced the raw 404 with a + misleading "missing scope" hint.""" + + def _trash_payload_with_project(self, slug: str, project_id: str = "p_id"): + return { + "items": [], + "sections": { + "projects": [{"id": project_id, "url": slug, "name": slug}], + "versions": [], + "workflows": [], + }, + } + + def test_project_already_in_trash_returns_success(self) -> None: + from roboflow.cli.handlers.project import _delete_project + + not_found = rfapi.RoboflowError("Endpoint does not exist", status_code=404) + captured = io.StringIO() + + # Resolver works on "ws/slug" shorthand — pass an explicit workspace + # via args.workspace and a bare slug via args.project_id. + args = _args(project_id="my-proj", yes=True, json=True) + with ( + patch("roboflow.adapters.rfapi.delete_project", side_effect=not_found), + patch( + "roboflow.adapters.rfapi.list_trash", + return_value=self._trash_payload_with_project("my-proj"), + ), + patch("sys.stdout", captured), + ): + _delete_project(args) + + out = captured.getvalue() + self.assertIn('"alreadyInTrash": true', out) + self.assertIn('"deleted": true', out) + self.assertIn('"trash": true', out) + + def test_project_404_not_in_trash_propagates_error(self) -> None: + # If the slug really doesn't exist (not active, not trashed), + # we should NOT swallow the 404 — propagate as exit 3. + from roboflow.cli.handlers.project import _delete_project + + not_found = rfapi.RoboflowError("Endpoint does not exist", status_code=404) + empty_trash = {"items": [], "sections": {"projects": [], "versions": [], "workflows": []}} + + args = _args(project_id="ghost-proj", yes=True, json=True) + with ( + patch("roboflow.adapters.rfapi.delete_project", side_effect=not_found), + patch("roboflow.adapters.rfapi.list_trash", return_value=empty_trash), + patch("sys.stderr", io.StringIO()), + ): + with self.assertRaises(SystemExit) as ctx: + _delete_project(args) + self.assertEqual(ctx.exception.code, 3) + + def test_workflow_already_in_trash_returns_success(self) -> None: + from roboflow.cli.handlers.workflow import _delete_workflow + + not_found = rfapi.RoboflowError("Endpoint does not exist", status_code=404) + trash = { + "items": [], + "sections": { + "projects": [], + "versions": [], + "workflows": [{"id": "wf_id", "url": "my-wf", "name": "My WF"}], + }, + } + captured = io.StringIO() + args = _args(workflow_url="my-wf", yes=True, json=True) + with ( + patch("roboflow.adapters.rfapi.delete_workflow", side_effect=not_found), + patch("roboflow.adapters.rfapi.list_trash", return_value=trash), + patch("sys.stdout", captured), + ): + _delete_workflow(args) + out = captured.getvalue() + self.assertIn('"alreadyInTrash": true', out) + self.assertIn('"workflowId": "wf_id"', out) + + def test_version_already_in_trash_returns_success(self) -> None: + from roboflow.cli.handlers.version import _delete_version + + not_found = rfapi.RoboflowError("Endpoint does not exist", status_code=404) + trash = { + "items": [], + "sections": { + "projects": [], + "versions": [ + { + "id": "1", + "parentUrl": "my-proj", + "parentId": "p_id", + "name": "v1", + } + ], + "workflows": [], + }, + } + captured = io.StringIO() + args = _args(version_ref="my-proj/1", yes=True, json=True) + with ( + patch("roboflow.adapters.rfapi.delete_version", side_effect=not_found), + patch("roboflow.adapters.rfapi.list_trash", return_value=trash), + patch("sys.stdout", captured), + ): + _delete_version(args) + out = captured.getvalue() + self.assertIn('"alreadyInTrash": true', out) + self.assertIn('"version": "1"', out) + + +if __name__ == "__main__": + unittest.main()