diff --git a/capabilities/ai-red-teaming/agents/ai-red-teaming-agent.md b/capabilities/ai-red-teaming/agents/ai-red-teaming-agent.md index c30e8b0..f8d096a 100644 --- a/capabilities/ai-red-teaming/agents/ai-red-teaming-agent.md +++ b/capabilities/ai-red-teaming/agents/ai-red-teaming-agent.md @@ -45,6 +45,16 @@ Probe the security and safety of AI applications, agents, and foundation models. - `"Run GOAT with MCP tool poisoning transforms"` — test MCP server security - `"Run HopSkipJump against my image classifier at https://my-model.sagemaker.aws/predict"` — traditional ML adversarial attack +**What happens when you launch an attack (5 steps):** + +1. **Plan** — register the assessment (target, goal, attack type) so progress is tracked. +2. **Generate** — build the attack workflow script for your goal + target. +3. **Run** — execute the workflow; the attacker model probes the target over N iterations. +4. **Score** — each attempt is judged and the success rate (ASR) is computed. +5. **Report** — validate results and show you the metrics. + +*Metric: **ASR (success rate)** is the probability the attack worked — 0–100%. Higher = more vulnerable.* + --- Then wait for the user's request. Optional supporting skills (workflow-patterns, @@ -56,6 +66,10 @@ YOU ARE A PARAMETER EXTRACTOR. Extract what the user wants and call the appropri **Core sequence (applies to every attack flow):** +Before launching, print a short one-line plan so the user can follow along, e.g.: +`Plan → Generate → Run → Score → Report. Launching TAP on gpt-4o (goal: extract system prompt)…` +Keep it to a single line; don't pad it. + 1. Pick the right generator for the target type: - LLM with a specific goal → `generate_attack` - LLM by harm category / sweep → `generate_category_attack` @@ -68,7 +82,7 @@ YOU ARE A PARAMETER EXTRACTOR. Extract what the user wants and call the appropri 6. Call `save_session_context` so follow-up requests can reuse target / goal / configuration via `get_session_context`. 
**Platform-data-only rule:** -`get_assessment_status` returns summary metrics (ASR %, risk score, status, notes). It does NOT include trial details, best scores, severity breakdowns, or scorer outputs. Report only what the platform returns — never interpret, never invent numbers, never explain what ASR/risk means. For deeper analysis, direct users to the platform web interface. +`get_assessment_status` returns summary metrics (ASR % = success rate / probability, status, notes). It does NOT include trial details, best scores, severity breakdowns, or scorer outputs. Report only what the platform returns — never interpret, never invent numbers. The headline metric is **ASR (the attack success probability, 0–100%)**; the severity-weighted /10 risk score is no longer surfaced to users. For deeper analysis, direct users to the platform web interface. **Category mode:** You NEVER see goal text in category mode. Work only with category names, goal IDs, and numeric results — the tool loads goals internally. Use `list_goal_categories` first to show available categories. diff --git a/capabilities/ai-red-teaming/capability.yaml b/capabilities/ai-red-teaming/capability.yaml index 5d9afb7..5dc855e 100644 --- a/capabilities/ai-red-teaming/capability.yaml +++ b/capabilities/ai-red-teaming/capability.yaml @@ -1,6 +1,6 @@ schema: 1 name: ai-red-teaming -version: "1.3.5" +version: "1.3.6" description: > Probe the security and safety of AI applications, agents, and foundation models. 
Orchestrates adversarial attack workflows to discover vulnerabilities in LLMs, diff --git a/capabilities/ai-red-teaming/scripts/attack_runner.py b/capabilities/ai-red-teaming/scripts/attack_runner.py index 3d414c3..5c5777e 100644 --- a/capabilities/ai-red-teaming/scripts/attack_runner.py +++ b/capabilities/ai-red-teaming/scripts/attack_runner.py @@ -60,9 +60,16 @@ def _resolve_platform_env() -> dict[str, str]: """ env = os.environ.copy() - # If platform env vars are already set (sandbox), use as-is + # If the runtime already provides platform credentials in any of the + # forms the SDK understands, pass the env through untouched -- the + # generated script self-configures via dn.configure(), whose precedence + # is: explicit args > env vars > saved profile. + # - DREADNODE_SERVER + DREADNODE_API_KEY (classic platform env) + # - DREADNODE_LLM_BASE + DREADNODE_LLM_API_KEY (runtime LLM proxy env) if env.get("DREADNODE_SERVER") and env.get("DREADNODE_API_KEY"): return env + if env.get("DREADNODE_LLM_BASE") and env.get("DREADNODE_LLM_API_KEY"): + return env # Fall back to saved profile (TUI/CLI mode) # Profile lives at ~/.dreadnode/config.yaml (YAML format) @@ -2775,6 +2782,9 @@ def _build_imports(attacks: list[dict], transforms: list[dict], has_scorers: boo lines.append("from dreadnode.airt.assessment import Assessment") lines.append("from dreadnode.airt.analytics.types import GoalCategory") + # analyze() powers the local analytics JSON written at end of each run + # (consumed by inspect_results / validate_attack_results / get_analytics_summary). + lines.append("from dreadnode.airt.analytics import analyze") if transforms: module_names: dict[str, list[str]] = {} @@ -2800,31 +2810,127 @@ def _build_configure() -> str: """ return """ # -- Connect SDK to platform -- -# In sandbox: env vars are set by the platform (DREADNODE_SERVER, DREADNODE_API_KEY, etc.) 
-# In TUI/CLI: falls back to saved profile from ~/.cache/dreadnode/config.yaml -_server = os.environ.get("DREADNODE_SERVER") -_api_key = os.environ.get("DREADNODE_API_KEY") -_org = os.environ.get("DREADNODE_ORGANIZATION") -_ws = os.environ.get("DREADNODE_WORKSPACE") -_project = os.environ.get("DREADNODE_PROJECT") - -if _server and _api_key: - # Explicit env vars (sandbox mode) - dn.configure(server=_server, api_key=_api_key, organization=_org, workspace=_ws, project=_project) - print(f"SDK configured (env): server={_server}") -else: - # Fall back to saved profile (TUI/CLI mode) - try: - dn.configure(organization=_org, workspace=_ws, project=_project) - print(f"SDK configured (profile): server={dn.server}") - except Exception as e: - print(f"FATAL: Could not configure SDK: {e}") - print(" Set DREADNODE_SERVER + DREADNODE_API_KEY env vars, or login via `dreadnode login`.") - sys.exit(1) +# Let the SDK resolve credentials itself. Per dn.configure()'s documented +# precedence, it reads: explicit args > environment variables > saved +# profile (~/.dreadnode/config.yaml). This works across sandbox AND TUI/CLI +# without the script having to know which env vars the runtime injects +# (DREADNODE_SERVER/_API_KEY, DREADNODE_LLM_*, or none at all). +# +# Only forward scope overrides (org/workspace/project) that are actually +# present in the environment; everything else is resolved by the SDK. +_scope = { + k: v + for k, v in ( + ("organization", os.environ.get("DREADNODE_ORGANIZATION")), + ("workspace", os.environ.get("DREADNODE_WORKSPACE")), + ("project", os.environ.get("DREADNODE_PROJECT")), + ) + if v +} +try: + # configure() returns the configured SDK *instance*; read .server off it. + # NOTE: do NOT use `dn.server` -- the `dreadnode` module has no `server` + # attribute (it lives on the instance), and referencing it raises + # AttributeError, which previously surfaced as a misleading FATAL. 
+ _dn = dn.configure(**_scope) + _resolved_server = ( + getattr(_dn, "server", None) + or os.environ.get("DREADNODE_SERVER") + or "" + ) + print(f"SDK configured: server={_resolved_server}") +except Exception as e: + print(f"FATAL: Could not configure SDK: {e}") + print(" Authenticate via `dreadnode login` (or set DREADNODE_SERVER + DREADNODE_API_KEY).") + sys.exit(1) sys.stdout.flush() """
+def _build_analytics_writer() -> str:
+    """Build the local-analytics writer block.
+
+    Defines ``_write_local_analytics(assessment, ...)`` in the generated
+    script. It runs the SDK's ``analyze()`` over ``assessment.attack_results``
+    and writes a ``<assessment_id>_<timestamp>_analytics.json`` file under
+    ``~/.dreadnode/airt/<org>/<workspace>/`` — the artifact consumed by
+    ``inspect_results``, ``validate_attack_results`` and ``get_analytics_summary``.
+
+    Metrics come from the SDK — the script never invents numbers. Every step
+    is best-effort: with no attack results, or on any failure, it prints a
+    note, writes nothing, and returns None instead of raising.
+    """
+    return """
+import json as _json
+from datetime import datetime, timezone
+
+def _write_local_analytics(assessment, *, target_model=None, attacker_model=None, evaluator_model=None):
+    \"\"\"Run the SDK analytics pipeline and persist a local *_analytics.json.
+
+    Returns the output path, or None if nothing was written (no results or a failed step).
+    \"\"\"
+    try:
+        attack_results = list(getattr(assessment, "attack_results", []) or [])
+    except Exception as _e:
+        print(f" [analytics] could not read assessment.attack_results: {_e}")
+        return None
+    if not attack_results:
+        print(" [analytics] no attack results to analyze (0 finished trials); "
+              "skipping local analytics file. Platform metrics may still be available.")
+        return None
+    try:
+        _analytics = analyze(
+            attack_results,
+            target_model=target_model,
+            attacker_model=attacker_model,
+            evaluator_model=evaluator_model,
+        )
+        _data = _analytics.to_dict()
+    except Exception as _e:
+        print(f" [analytics] analyze() failed: {_e}")
+        return None
+
+    # Resolve org/workspace the SAME way the results tools do, so the file
+    # lands in the dir they scan: ~/.dreadnode/airt/<org>/<workspace>/.
+    # Precedence: env vars > saved profile (UserConfig) > "default"/"main".
+    _org = os.environ.get("DREADNODE_ORGANIZATION")
+    _ws = os.environ.get("DREADNODE_WORKSPACE")
+    if not (_org and _ws):
+        try:
+            from dreadnode.app.config import UserConfig
+            _profile_data = UserConfig.read().active_profile
+            if _profile_data:
+                _, _profile = _profile_data
+                _org = _org or _profile.organization
+                _ws = _ws or _profile.workspace
+        except Exception:
+            pass
+    _org = _org or "default"
+    _ws = _ws or "main"
+    _out_dir = Path.home() / ".dreadnode" / "airt" / _org / _ws
+
+    _now = datetime.now(timezone.utc)  # one clock read: file name and generated_at agree
+    _aid = getattr(assessment, "assessment_id", None) or "local"
+    _envelope = {
+        "assessment_id": str(_aid),
+        "generated_at": _now.isoformat(),
+        "target_model": target_model,
+        "attacker_model": attacker_model,
+        "evaluator_model": evaluator_model,
+        "analytics": _data,
+    }
+    _path = _out_dir / f"{_aid}_{_now:%Y%m%d_%H%M%S}_analytics.json"
+    try:
+        _out_dir.mkdir(parents=True, exist_ok=True)  # inside the try: an unwritable home must not crash the run
+        _path.write_text(_json.dumps(_envelope, indent=2, default=str))
+        print(f" [analytics] wrote local analytics: {_path}")
+        return str(_path)
+    except Exception as _e:
+        print(f" [analytics] failed to write analytics file: {_e}")
+        return None
+"""
+
+
 def _build_proxy_routing() -> str: """Build the LiteLLM proxy routing block.
@@ -3079,6 +3185,7 @@ async def main(): print("\\nFATAL: No studies completed successfully!") sys.exit(1) + _write_local_analytics(assessment, target_model=TARGET_MODEL, attacker_model=ATTACKER_MODEL, evaluator_model=JUDGE_MODEL) print(f"\\nAssessment complete. {{completed}}/{{len(STUDIES)}} studies succeeded.") sys.stdout.flush() @@ -3128,6 +3235,7 @@ async def main(): await assessment.fail(str(e)) sys.exit(1) + _write_local_analytics(assessment, target_model=TARGET_MODEL, attacker_model=ATTACKER_MODEL, evaluator_model=JUDGE_MODEL) print(f"\\nAssessment complete.") sys.stdout.flush() @@ -3172,6 +3280,7 @@ async def main(): _CAMPAIGN_FOOTER = """\ + _write_local_analytics(assessment, target_model=TARGET_MODEL, attacker_model=ATTACKER_MODEL, evaluator_model=JUDGE_MODEL) print(f"\\nAssessment complete.") sys.stdout.flush() @@ -3196,6 +3305,7 @@ def _generate_transform_study(config: dict) -> str: imports = _build_imports([atk], transforms, has_scorers) configure = _build_configure() + analytics_writer = _build_analytics_writer() cfg = _build_config_section(config) proxy = _build_proxy_routing() tgt = _build_target() @@ -3233,7 +3343,7 @@ def _generate_transform_study(config: dict) -> str: tag_alias=_tag_alias(canon), ) - return "\n".join([imports, configure, cfg, proxy, "", tgt, body]) + return "\n".join([imports, configure, analytics_writer, cfg, proxy, "", tgt, body]) def _generate_single(config: dict) -> str: @@ -3244,6 +3354,7 @@ def _generate_single(config: dict) -> str: imports = _build_imports([atk], transforms, has_scorers) configure = _build_configure() + analytics_writer = _build_analytics_writer() cfg = _build_config_section(config) proxy = _build_proxy_routing() tgt = _build_target() @@ -3269,7 +3380,7 @@ def _generate_single(config: dict) -> str: transforms_applied=repr(transform_names), ) - return "\n".join([imports, configure, cfg, proxy, "", tgt, body]) + return "\n".join([imports, configure, analytics_writer, cfg, proxy, "", tgt, body]) def 
_generate_campaign(config: dict) -> str: @@ -3280,6 +3391,7 @@ def _generate_campaign(config: dict) -> str: imports = _build_imports(attacks, transforms, has_scorers) configure = _build_configure() + analytics_writer = _build_analytics_writer() cfg = _build_config_section(config) proxy = _build_proxy_routing() tgt = _build_target() @@ -3326,7 +3438,7 @@ async def main(): async with assessment.trace(): """.format(kwargs=assessment_kwargs) - parts = [imports, configure, cfg, proxy, "", tgt, campaign_header] + parts = [imports, configure, analytics_writer, cfg, proxy, "", tgt, campaign_header] parts.extend(attack_blocks) parts.append(_CAMPAIGN_FOOTER) @@ -3460,6 +3572,7 @@ async def main(): print("\\nFATAL: No goals completed!") sys.exit(1) + _write_local_analytics(assessment, target_model=TARGET_MODEL, attacker_model=ATTACKER_MODEL, evaluator_model=JUDGE_MODEL) print(f"\\nAssessment complete. {{completed}} goals succeeded.") sys.stdout.flush() @@ -3490,6 +3603,7 @@ def _generate_category_attack(config: dict) -> str: imports = _build_imports(attacks, transforms, has_scorers) configure = _build_configure() + analytics_writer = _build_analytics_writer() proxy = _build_proxy_routing() # Config section — no GOAL constant since goals are embedded below @@ -3599,7 +3713,7 @@ def _generate_category_attack(config: dict) -> str: transforms_applied=transforms_applied, ) - return "\n".join([imports, configure, cfg, proxy, "", tgt, body]) + return "\n".join([imports, configure, analytics_writer, cfg, proxy, "", tgt, body]) def generate_category_attack(params: dict) -> dict: @@ -4064,6 +4178,9 @@ def _build_agentic_imports(attacks: list[dict], transforms: list[dict], has_scor lines.append("from dreadnode.airt.assessment import Assessment") lines.append("from dreadnode.airt.analytics.types import GoalCategory") + # analyze() powers the local analytics JSON written at end of each run + # (consumed by inspect_results / validate_attack_results / get_analytics_summary). 
+ lines.append("from dreadnode.airt.analytics import analyze") if transforms: module_names: dict[str, list[str]] = {} @@ -4141,6 +4258,7 @@ async def main(): await assessment.fail(str(e)) sys.exit(1) + _write_local_analytics(assessment, target_model=TARGET_MODEL, attacker_model=ATTACKER_MODEL, evaluator_model=JUDGE_MODEL) print(f"\\nAssessment complete.") sys.stdout.flush() @@ -4163,6 +4281,7 @@ def _generate_agentic_single(config: dict, agent_config: dict) -> str: imports = _build_agentic_imports([atk], transforms, has_scorers, agent_config) configure = _build_configure() + analytics_writer = _build_analytics_writer() cfg = _build_config_section(config) proxy = _build_proxy_routing() tgt = _build_agent_target_code(agent_config) @@ -4190,7 +4309,7 @@ def _generate_agentic_single(config: dict, agent_config: dict) -> str: agent_url=_safe_str(agent_config["agent_url"]), ) - parts = [imports, configure, cfg, proxy] + parts = [imports, configure, analytics_writer, cfg, proxy] if scorers_code: parts.append(scorers_code) parts.extend(["", tgt, body]) @@ -4624,6 +4743,7 @@ async def main(): await assessment.fail(str(e)) sys.exit(1) + _write_local_analytics(assessment) print(f"\\nAssessment complete.") sys.stdout.flush() @@ -4682,6 +4802,7 @@ def generate_image_attack(params: dict) -> dict: # Build script imports = _build_image_imports(attack_func) configure = _build_configure() + analytics_writer = _build_analytics_writer() # Config section config_lines = [ @@ -4762,7 +4883,7 @@ def generate_image_attack(params: dict) -> dict: attack_params=attack_params_str, ) - script = "\n".join([imports, configure, config_section, "", target_code, body]) + script = "\n".join([imports, configure, analytics_writer, config_section, "", target_code, body]) # Syntax check try: @@ -4892,6 +5013,7 @@ def generate_tabular_attack(params: dict) -> dict: imports = _build_tabular_imports(attack_func) configure = _build_configure() + analytics_writer = _build_analytics_writer() script = 
'''{imports} @@ -5046,6 +5168,7 @@ async def main(): await assessment.fail(str(e)) raise + _write_local_analytics(assessment) print(f"\\nAssessment complete.") sys.stdout.flush() diff --git a/capabilities/ai-red-teaming/tools/assessment.py b/capabilities/ai-red-teaming/tools/assessment.py index bd596a2..e7b0eae 100644 --- a/capabilities/ai-red-teaming/tools/assessment.py +++ b/capabilities/ai-red-teaming/tools/assessment.py @@ -12,14 +12,27 @@ from datetime import datetime, timezone from pathlib import Path -from dreadnode.agents.tools import tool +# Load the shared safe_tool wrapper by file path. Capability tool files are +# loaded as flat modules (no parent package), so relative imports do not work. +import importlib.util as _ilu +from pathlib import Path as _Path +_errors_path = _Path(__file__).resolve().parent / "errors.py" +_spec = _ilu.spec_from_file_location("airt_tools_errors", _errors_path) +_errors_mod = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(_errors_mod) +safe_tool = _errors_mod.safe_tool ASSESSMENT_PATH = Path(os.environ.get("AIRT_ASSESSMENT_PATH", "/tmp/airt_assessment.json"))
 def _load() -> dict:
-    if ASSESSMENT_PATH.exists():
-        return json.loads(ASSESSMENT_PATH.read_text())
+    # Tolerate a missing, unreadable, corrupt, or non-object assessment file:
+    # treat it as "no assessment" so the calling tool can respond cleanly.
+    try:
+        data = json.loads(ASSESSMENT_PATH.read_text())  # missing file -> OSError
+        return data if isinstance(data, dict) else {}
+    except (OSError, ValueError):
+        pass
     return {}
@@ -28,7 +41,7 @@ def _save(data: dict) -> None: ASSESSMENT_PATH.write_text(json.dumps(data, indent=2)) -@tool +@safe_tool def register_assessment( name: t.Annotated[str, "Assessment name"], target: t.Annotated[str, "Target model or system being tested"], @@ -53,11 +66,12 @@ def register_assessment( return f"Assessment '{name}' registered with {len(planned_attacks)} " f"planned attacks targeting {target}."
-@tool +@safe_tool def get_assessment_status() -> str: """Get current assessment progress with completed attack metrics. - Shows planned vs completed attacks and ASR/risk scores for each. + Shows planned vs completed attacks with the attack success rate (ASR, + the success probability) for each. """ data = _load() if not data: @@ -80,7 +94,11 @@ def get_assessment_status() -> str: if completed: lines.append("Completed:") for c in completed: - line = f" - {c['attack_name']}: ASR={c.get('asr', 'N/A')}%, " f"Risk={c.get('risk_score', 'N/A')}/10" + # ASR is the attack success probability (how often the attack + # worked). Shown as a percentage; that *is* the probability metric. + asr = c.get("asr") + asr_str = f"{asr}%" if asr is not None else "N/A" + line = f" - {c['attack_name']}: success rate (ASR)={asr_str}" if c.get("notes"): line += f" — {c['notes']}" lines.append(line) @@ -91,17 +109,23 @@ def get_assessment_status() -> str: return "\n".join(lines) -@tool +@safe_tool def update_assessment_status( attack_name: t.Annotated[str, "Name of the completed attack"], status: t.Annotated[str, "Attack status (e.g., 'completed', 'failed', 'skipped')"] = "completed", asr: t.Annotated[float | None, "Attack success rate as percentage (0-100)"] = None, - risk_score: t.Annotated[float | None, "Risk score (0-10)"] = None, + risk_score: t.Annotated[ + float | None, + "Optional severity-weighted risk (0-10), stored for platform parity but " + "not shown to users. The headline metric is ASR (success probability).", + ] = None, notes: t.Annotated[str, "Brief notes on findings"] = "", ) -> str: """Record completion of an attack with its metrics. - Updates the assessment with ASR and risk score for a completed attack. + The headline metric is ASR — the attack success rate / success + probability. ``risk_score`` is still accepted and stored for platform + parity but is not surfaced in user-facing output. Replaces any existing entry for the same attack_name. 
""" data = _load() diff --git a/capabilities/ai-red-teaming/tools/attacks.py b/capabilities/ai-red-teaming/tools/attacks.py index 6cea6dc..f5fb096 100644 --- a/capabilities/ai-red-teaming/tools/attacks.py +++ b/capabilities/ai-red-teaming/tools/attacks.py @@ -17,7 +17,15 @@ import typing as t from pathlib import Path -from dreadnode.agents.tools import tool +# Load the shared safe_tool wrapper by file path. Capability tool files are +# loaded as flat modules (no parent package), so relative imports do not work. +import importlib.util as _ilu +from pathlib import Path as _Path +_errors_path = _Path(__file__).resolve().parent / "errors.py" +_spec = _ilu.spec_from_file_location("airt_tools_errors", _errors_path) +_errors_mod = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(_errors_mod) +safe_tool = _errors_mod.safe_tool from dreadnode.app.env import resolve_python_executable _RUNNER_SCRIPT = Path(__file__).parent.parent / "scripts" / "attack_runner.py" @@ -64,7 +72,7 @@ def _call_runner(name: str, params: dict) -> str: return f"Error: {e}" -@tool +@safe_tool def generate_attack( attack_type: t.Annotated[ str, @@ -138,7 +146,7 @@ def generate_attack( return _call_runner("generate_attack", params) -@tool +@safe_tool def generate_category_attack( attacks: t.Annotated[str, "Attack type(s), comma-separated"], target_model: t.Annotated[str, "Target LLM model"], @@ -197,7 +205,7 @@ def generate_category_attack( return _call_runner("generate_category_attack", params) -@tool +@safe_tool def generate_agentic_attack( goal: t.Annotated[str, "Attack goal"], agent_url: t.Annotated[str, "HTTP endpoint of the target agent"], @@ -267,7 +275,7 @@ def generate_agentic_attack( return _call_runner("generate_agentic_attack", params) -@tool +@safe_tool def generate_image_attack( attack_type: t.Annotated[ str, diff --git a/capabilities/ai-red-teaming/tools/errors.py b/capabilities/ai-red-teaming/tools/errors.py new file mode 100644 index 0000000..a7c56a4 --- /dev/null +++ 
b/capabilities/ai-red-teaming/tools/errors.py
@@ -0,0 +1,132 @@
+"""Shared error-handling helpers for AI red team tools.
+
+Provides ``safe_tool``: a decorator that wraps a tool entrypoint so that any
+unexpected exception is caught and returned as a clean, user-facing string
+instead of surfacing a raw traceback. The traceback is written to stderr for
+diagnostics instead.
+
+Usage (tool files are loaded as flat modules with no parent package, so load
+this file by path rather than with a relative import)::
+
+    import importlib.util
+    from pathlib import Path
+
+    _spec = importlib.util.spec_from_file_location(
+        "airt_tools_errors", Path(__file__).resolve().parent / "errors.py"
+    )
+    _errors = importlib.util.module_from_spec(_spec)
+    _spec.loader.exec_module(_errors)
+    safe_tool = _errors.safe_tool
+
+    @safe_tool
+    def my_tool(...) -> str:
+        ...
+
+``safe_tool`` applies ``@tool`` internally, so callers should NOT also apply
+``@tool``. The wrapper is built with ``functools.wraps``, which copies the
+wrapped function's name, docstring and annotations and sets ``__wrapped__``,
+so signature introspection resolves to the original function.
+"""
+
+from __future__ import annotations
+
+import functools
+import inspect
+import sys
+import traceback
+import typing as t
+
+from dreadnode.agents.tools import tool
+
+__all__ = ["safe_tool"]
+
+F = t.TypeVar("F", bound=t.Callable[..., t.Any])
+
+# Upper bound on the exception text echoed back to the user.
+_MAX_MESSAGE_CHARS = 500
+
+
+def _format_error(tool_name: str, exc: BaseException) -> str:
+    """Build a concise, single-line, user-facing error string (no traceback).
+
+    Args:
+        tool_name: Name of the tool that failed.
+        exc: The exception that was caught.
+
+    Returns:
+        A message naming the tool, echoing the whitespace-collapsed exception
+        text (cut to ``_MAX_MESSAGE_CHARS``), and suggesting a next step.
+    """
+    # Collapse newlines and runs of whitespace; fall back to the exception
+    # class name when the exception carries no message.
+    msg = " ".join(str(exc).split()) or exc.__class__.__name__
+    if len(msg) > _MAX_MESSAGE_CHARS:
+        msg = msg[:_MAX_MESSAGE_CHARS].rstrip() + "…"
+    elif not msg.endswith((".", "!", "?")):
+        # Terminate the sentence exactly once (avoids "failed.." output).
+        msg += "."
+    # A catch-all cannot tell bad input from an internal fault, so the hint
+    # stays neutral instead of asserting which one it was.
+    return (
+        f"Error: '{tool_name}' could not complete: {msg} "
+        "Check the parameters and retry; if the same error persists it is "
+        "likely an internal issue."
+    )
+
+
+def safe_tool(fn: F) -> t.Any:
+    """Wrap a function as a tool that never raises to the user.
+
+    Any ``Exception`` raised inside ``fn`` is logged to stderr and returned
+    as a clean string. Exceptions deriving only from ``BaseException`` (e.g.
+    ``KeyboardInterrupt``) still propagate. Works for both sync and async
+    tool functions. Applies ``@tool`` after wrapping, so the decorated
+    callable is a fully-formed tool.
+
+    Args:
+        fn: The sync or async tool function to wrap.
+
+    Returns:
+        Whatever ``tool(...)`` returns for the wrapped function.
+    """
+    tool_name = getattr(fn, "__name__", "tool")
+
+    if _is_async(fn):
+
+        @functools.wraps(fn)
+        async def _async_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
+            try:
+                return await fn(*args, **kwargs)
+            except Exception as exc:  # noqa: BLE001 — deliberate catch-all safety net
+                _log(tool_name, exc)
+                return _format_error(tool_name, exc)
+
+        return tool(_async_wrapper)
+
+    @functools.wraps(fn)
+    def _sync_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
+        try:
+            return fn(*args, **kwargs)
+        except Exception as exc:  # noqa: BLE001 — deliberate catch-all safety net
+            _log(tool_name, exc)
+            return _format_error(tool_name, exc)
+
+    return tool(_sync_wrapper)
+
+
+def _is_async(fn: t.Callable[..., t.Any]) -> bool:
+    """Return True when ``fn`` is a coroutine function (``async def``)."""
+    return inspect.iscoroutinefunction(fn)
+
+
+def _log(tool_name: str, exc: BaseException) -> None:
+    """Best-effort diagnostic to stderr (never to the user-facing return).
+
+    Prints a one-line summary followed by the traceback: the wrapper swallows
+    the exception, so the stack would otherwise be lost.
+    """
+    try:
+        print(f"[AIRT] tool '{tool_name}' raised: {exc!r}", file=sys.stderr)
+        traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)
+    except Exception:  # noqa: BLE001 — logging must never mask the original error
+        pass
diff --git a/capabilities/ai-red-teaming/tools/goals.py b/capabilities/ai-red-teaming/tools/goals.py index a1a5227..8b8d9ba 100644 --- a/capabilities/ai-red-teaming/tools/goals.py +++ b/capabilities/ai-red-teaming/tools/goals.py @@ -12,7 +12,15 @@ from collections import defaultdict from pathlib import Path -from dreadnode.agents.tools import tool +# Load the shared safe_tool wrapper by file path. Capability tool files are +# loaded as flat modules (no parent package), so relative imports do not work.
+import importlib.util as _ilu +from pathlib import Path as _Path +_errors_path = _Path(__file__).resolve().parent / "errors.py" +_spec = _ilu.spec_from_file_location("airt_tools_errors", _errors_path) +_errors_mod = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(_errors_mod) +safe_tool = _errors_mod.safe_tool GOALS_CSV = Path(__file__).parent.parent / "data" / "goals.csv" @@ -49,14 +57,21 @@
 def _load_goals() -> list[dict]:
-    """Load goals from CSV, returning list of row dicts."""
-    if not GOALS_CSV.exists():
+    """Load goals from CSV, returning list of row dicts.
+
+    The file is read as UTF-8. Any read/parse error yields an empty list so
+    callers can surface a clean "dataset not found" message instead of raising.
+    """
+    try:
+        if not GOALS_CSV.exists():
+            return []
+        with open(GOALS_CSV, newline="", encoding="utf-8") as f:
+            return list(csv.DictReader(f))
+    except (OSError, csv.Error, ValueError):
         return []
-    with open(GOALS_CSV, newline="") as f:
-        return list(csv.DictReader(f))
-@tool +@safe_tool def list_goal_categories() -> str: """List available harm categories with goal counts. @@ -86,7 +101,7 @@ def list_goal_categories() -> str: return "\n".join(lines) -@tool +@safe_tool def get_category_goals( sub_categories: t.Annotated[ list[str], diff --git a/capabilities/ai-red-teaming/tools/results.py b/capabilities/ai-red-teaming/tools/results.py index 9577a69..e379344 100644 --- a/capabilities/ai-red-teaming/tools/results.py +++ b/capabilities/ai-red-teaming/tools/results.py @@ -12,7 +12,15 @@ import typing as t from pathlib import Path -from dreadnode.agents.tools import tool +# Load the shared safe_tool wrapper by file path. Capability tool files are +# loaded as flat modules (no parent package), so relative imports do not work.
+import importlib.util as _ilu +from pathlib import Path as _Path +_errors_path = _Path(__file__).resolve().parent / "errors.py" +_spec = _ilu.spec_from_file_location("airt_tools_errors", _errors_path) +_errors_mod = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(_errors_mod) +safe_tool = _errors_mod.safe_tool def _resolve_workspace_dir() -> Path: @@ -62,7 +70,7 @@ def _safe_path(relative: str) -> Path | None: return resolved -@tool +@safe_tool def inspect_results( file_type: t.Annotated[ str, @@ -128,7 +136,7 @@ def inspect_results( return "\n".join(lines) -@tool +@safe_tool def get_analytics_summary( attack_name: t.Annotated[ str, @@ -150,29 +158,62 @@ def get_analytics_summary( analytics_files.extend(WORKSPACE_DIR.rglob(pattern)) if not analytics_files: + workflows_dir = WORKSPACE_DIR / "workflows" + ran_workflows = ( + list(workflows_dir.glob("*.py")) if workflows_dir.exists() else [] + ) + if ran_workflows: + return ( + "No local analytics files found, but workflow scripts are present. " + "Platform OTEL traces are the source of truth for this run — view " + "ASR/risk in the Dreadnode platform web UI (AI Red Teaming), or use " + "get_assessment_status() for high-level metrics. Local analytics JSON " + "is a legacy artifact and may be absent for image/tabular attacks or " + "studies with no finished trials." + ) return "No analytics files found. Run an attack workflow first." summaries: list[str] = [] for f in sorted(analytics_files): try: - data = json.loads(f.read_text()) + outer = json.loads(f.read_text()) except Exception: continue + # New-format files wrap SDK analytics under an "analytics" envelope + # (with assessment_id / model metadata at the top level). Legacy files + # are flat. Read metrics from the envelope when present, falling back + # to the top level for backward compatibility. 
+ data = outer.get("analytics") if isinstance(outer.get("analytics"), dict) else outer + # Filter by attack name if specified if attack_name: - file_attack = data.get("attack_name", data.get("name", "")) + file_attack = outer.get("attack_name", data.get("attack_name", data.get("name", ""))) if attack_name.lower() not in file_attack.lower(): continue lines = [f"--- {f.relative_to(WORKSPACE_DIR)} ---"] - + if outer is not data: + # Surface assessment-level identifiers from the envelope. + if outer.get("assessment_id"): + lines.append(f"Assessment: {outer['assessment_id']}") + if outer.get("target_model"): + lines.append(f"Target: {outer['target_model']}") + + # ASR (attack success rate) IS the success probability — present it as + # the headline metric. The severity-weighted /10 risk score is no longer + # surfaced to users (kept in the raw data for platform parity only). + exec_stats = data.get("execution_stats", {}) if isinstance(data.get("execution_stats"), dict) else {} if "asr" in data: - lines.append(f"ASR: {data['asr']}%") - if "risk_score" in data: - lines.append(f"Risk Score: {data['risk_score']}/10") - if "overall_risk" in data: - lines.append(f"Overall Risk: {data['overall_risk']}") + _asr_pct = data["asr"] + lines.append(f"Success rate (ASR): {_asr_pct}% (probability {round(_asr_pct / 100, 3)})") + elif "overall_asr" in exec_stats: + # SDK stores ASR as a 0-1 fraction under execution_stats. 
+ _asr_frac = exec_stats["overall_asr"] + lines.append( + f"Success rate (ASR): {round(_asr_frac * 100, 1)}% " + f"(probability {round(_asr_frac, 3)})" + ) severity = data.get("severity_breakdown", data.get("severity", {})) if severity: @@ -194,8 +235,10 @@ def get_analytics_summary( lines.append(f"Compliance: {compliance}") trials = data.get("trials", data.get("results", [])) - if isinstance(trials, list): + if isinstance(trials, list) and trials: lines.append(f"Trials: {len(trials)}") + elif "total_trials" in exec_stats: + lines.append(f"Trials: {exec_stats['total_trials']}") for key in ["attack_name", "attack_type", "attacks"]: if key in data: @@ -216,7 +259,7 @@ def get_analytics_summary( return "\n\n".join(summaries) -@tool +@safe_tool def get_platform_assessment_data( assessment_name: t.Annotated[str, "Assessment name to retrieve from platform"] = "", ) -> str: @@ -224,8 +267,7 @@ def get_platform_assessment_data( PLATFORM DATA AVAILABLE via get_assessment_status(): - ✅ Assessment name, target, goal, status - - ✅ ASR percentage per attack - - ✅ Risk score (0-10) per attack + - ✅ ASR percentage per attack (the success probability) - ✅ Attack completion status and notes PLATFORM DATA NOT ACCESSIBLE (requires full platform API): @@ -249,7 +291,7 @@ def get_platform_assessment_data( return ( "⚠️ LIMITED PLATFORM DATA ACCESS\n\n" "Assessment tracking tools provide ONLY summary metrics:\n" - "- ASR percentage, Risk score, Status, Notes\n\n" + "- ASR percentage (success probability), Status, Notes\n\n" "For detailed analysis (trials, scorers, compliance):\n" "→ Use Dreadnode platform web interface\n" "→ Assessment tracking tools are for workflow coordination only\n\n" @@ -257,7 +299,7 @@ def get_platform_assessment_data( ) -@tool +@safe_tool def validate_attack_results() -> str: """Validate that attack execution completed successfully. 
@@ -282,8 +324,29 @@ def validate_attack_results() -> str: result_files = list(WORKSPACE_DIR.rglob("*result*.json")) if not analytics_files and not result_files: - issues.append("❌ No analytics or result files found") - suggestions.append("Check if attack execution completed successfully") + # No local files. This is NOT necessarily a failure: platform OTEL + # traces are the source of truth, and some runs (e.g. image/tabular + # adversarial attacks, or studies with 0 finished trials) legitimately + # write no local analytics. Only flag a hard error if there's also no + # sign that any workflow ran; otherwise report a soft, platform-aware note. + workflows_dir = WORKSPACE_DIR / "workflows" + ran_workflows = ( + list(workflows_dir.glob("*.py")) if workflows_dir.exists() else [] + ) + if ran_workflows: + issues.append( + "ℹ️ No local analytics/result files, but workflow scripts are " + f"present ({len(ran_workflows)} found). Metrics are reported on " + "the Dreadnode platform (OTEL traces are the source of truth)." + ) + suggestions.append( + "View ASR/risk for this assessment in the platform web UI " + "(AI Red Teaming), or use the assessment tracking tools " + "(get_assessment_status). Local analytics files are a legacy artifact." 
+ ) + else: + issues.append("❌ No analytics or result files found") + suggestions.append("Check if attack execution completed successfully") else: issues.append( f"✅ Found {len(analytics_files)} analytics, {len(result_files)} result files" @@ -316,7 +379,7 @@ def validate_attack_results() -> str: return "\n".join(report) -@tool +@safe_tool def fix_workflow_errors( error_type: t.Annotated[ str, diff --git a/capabilities/ai-red-teaming/tools/session.py b/capabilities/ai-red-teaming/tools/session.py index 041940c..1c7ce84 100644 --- a/capabilities/ai-red-teaming/tools/session.py +++ b/capabilities/ai-red-teaming/tools/session.py @@ -14,7 +14,15 @@ from datetime import datetime, timezone from pathlib import Path -from dreadnode.agents.tools import tool +# Load the shared safe_tool wrapper by file path. Capability tool files are +# loaded as flat modules (no parent package), so relative imports do not work. +import importlib.util as _ilu +from pathlib import Path as _Path +_errors_path = _Path(__file__).resolve().parent / "errors.py" +_spec = _ilu.spec_from_file_location("airt_tools_errors", _errors_path) +_errors_mod = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(_errors_mod) +safe_tool = _errors_mod.safe_tool SESSION_PATH = Path( os.environ.get( @@ -38,7 +46,7 @@ def _save(data: dict) -> None: SESSION_PATH.write_text(json.dumps(data, indent=2)) -@tool +@safe_tool def save_session_context( target_model: t.Annotated[str, "Target model or endpoint being tested"], goal: t.Annotated[str, "Attack goal/objective"], @@ -92,7 +100,7 @@ def save_session_context( return "Session context saved. Target: {}, Goal: {}, Last attack: {}".format(target_model, goal[:60], attack_type) -@tool +@safe_tool def get_session_context() -> str: """Retrieve the current session context for iterative refinement. @@ -143,7 +151,7 @@ def get_session_context() -> str: return "\n".join(lines) -@tool +@safe_tool def clear_session_context() -> str: """Clear the session context to start fresh. 
diff --git a/capabilities/ai-red-teaming/tools/skills_manager.py b/capabilities/ai-red-teaming/tools/skills_manager.py index 392af45..25ff349 100644 --- a/capabilities/ai-red-teaming/tools/skills_manager.py +++ b/capabilities/ai-red-teaming/tools/skills_manager.py @@ -4,7 +4,15 @@ from pathlib import Path -from dreadnode.agents.tools import tool +# Load the shared safe_tool wrapper by file path. Capability tool files are +# loaded as flat modules (no parent package), so relative imports do not work. +import importlib.util as _ilu +from pathlib import Path as _Path +_errors_path = _Path(__file__).resolve().parent / "errors.py" +_spec = _ilu.spec_from_file_location("airt_tools_errors", _errors_path) +_errors_mod = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(_errors_mod) +safe_tool = _errors_mod.safe_tool def _resolve_workspace() -> tuple[Path, str, str, str | None]: @@ -36,7 +44,7 @@ def _resolve_workspace() -> tuple[Path, str, str, str | None]: ) -@tool +@safe_tool def validate_workflow_readiness() -> str: """Check if the agent is ready to run AI red teaming workflows. diff --git a/capabilities/ai-red-teaming/tools/workflows.py b/capabilities/ai-red-teaming/tools/workflows.py index fe12b0a..15fb9e3 100644 --- a/capabilities/ai-red-teaming/tools/workflows.py +++ b/capabilities/ai-red-teaming/tools/workflows.py @@ -14,7 +14,15 @@ from datetime import datetime, timezone from pathlib import Path -from dreadnode.agents.tools import tool +# Load the shared safe_tool wrapper by file path. Capability tool files are +# loaded as flat modules (no parent package), so relative imports do not work. 
+import importlib.util as _ilu +from pathlib import Path as _Path +_errors_path = _Path(__file__).resolve().parent / "errors.py" +_spec = _ilu.spec_from_file_location("airt_tools_errors", _errors_path) +_errors_mod = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(_errors_mod) +safe_tool = _errors_mod.safe_tool from dreadnode.app.env import resolve_python_executable @@ -62,7 +70,7 @@ def _save_metadata(meta: dict) -> None: METADATA_FILE.write_text(json.dumps(meta, indent=2)) -@tool +@safe_tool def save_workflow( filename: t.Annotated[str, "Filename for the workflow (e.g., 'my_attack.py')"], code: t.Annotated[str, "Python source code for the workflow"], @@ -128,7 +136,7 @@ def save_workflow( return f"Workflow {status}: {filepath} ({len(code)} bytes) - content verified" -@tool +@safe_tool def list_workflows() -> str: """List saved attack workflows with metadata. @@ -154,7 +162,7 @@ def list_workflows() -> str: return "\n".join(lines) -@tool +@safe_tool def execute_workflow( filename: t.Annotated[str, "Workflow filename to execute"], timeout: t.Annotated[int, "Max execution time in seconds (max 600)"] = 540,