diff --git a/src/strands/hooks/events.py b/src/strands/hooks/events.py index 9186e0e70..238ed3af4 100644 --- a/src/strands/hooks/events.py +++ b/src/strands/hooks/events.py @@ -147,6 +147,12 @@ class BeforeToolCallEvent(HookEvent, _Interruptible): cancel_tool: A user defined message that when set, will cancel the tool call. The message will be placed into a tool result with an error status. If set to `True`, Strands will cancel the tool call and use a default cancel message. + tool_is_read_only: Convenience property. True if the selected tool is read-only, False otherwise + (including when selected_tool is None). + tool_is_destructive: Convenience property. True if the selected tool is destructive, False otherwise + (including when selected_tool is None). + tool_requires_confirmation: Convenience property. True if the selected tool requires confirmation, + False otherwise (including when selected_tool is None). """ selected_tool: AgentTool | None @@ -157,6 +163,21 @@ class BeforeToolCallEvent(HookEvent, _Interruptible): def _can_write(self, name: str) -> bool: return name in ["cancel_tool", "selected_tool", "tool_use"] + @property + def tool_is_read_only(self) -> bool: + """Whether the selected tool only reads state. False when selected_tool is None.""" + return self.selected_tool is not None and self.selected_tool.is_read_only + + @property + def tool_is_destructive(self) -> bool: + """Whether the selected tool performs irreversible actions. False when selected_tool is None.""" + return self.selected_tool is not None and self.selected_tool.is_destructive + + @property + def tool_requires_confirmation(self) -> bool: + """Whether the selected tool requires user confirmation. False when selected_tool is None.""" + return self.selected_tool is not None and self.selected_tool.requires_confirmation + @override def _interrupt_id(self, name: str) -> str: """Unique id for the interrupt. diff --git a/src/strands/tools/decorator.py b/src/strands/tools/decorator.py index 9207df9b8..9df0fcc20 100644 --- a/src/strands/tools/decorator.py +++ b/src/strands/tools/decorator.py @@ -459,6 +459,10 @@ def __init__( tool_spec: ToolSpec, tool_func: Callable[P, R], metadata: FunctionToolMetadata, + *, + read_only: bool = False, + destructive: bool = False, + requires_confirmation: bool = False, ): """Initialize the decorated function tool. @@ -467,6 +471,9 @@ def __init__( tool_spec: The tool specification containing metadata for Agent integration. tool_func: The original function being decorated. metadata: The FunctionToolMetadata object with extracted function information. + read_only: Whether this tool only reads state without modification. + destructive: Whether this tool performs irreversible actions. + requires_confirmation: Whether this tool should require user confirmation before execution. """ super().__init__() @@ -474,6 +481,9 @@ def __init__( self._tool_spec = tool_spec self._tool_func = tool_func self._metadata = metadata + self._read_only = read_only + self._destructive = destructive + self._requires_confirmation = requires_confirmation functools.update_wrapper(wrapper=self, wrapped=self._tool_func) @@ -506,7 +516,15 @@ def my_tool(): if instance is not None and not inspect.ismethod(self._tool_func): # Create a bound method tool_func = self._tool_func.__get__(instance, instance.__class__) - return DecoratedFunctionTool(self._tool_name, self._tool_spec, tool_func, self._metadata) + return DecoratedFunctionTool( + self._tool_name, + self._tool_spec, + tool_func, + self._metadata, + read_only=self._read_only, + destructive=self._destructive, + requires_confirmation=self._requires_confirmation, + ) return self @@ -577,6 +595,24 @@ def tool_type(self) -> str: """ return "function" + @property + @override + def is_read_only(self) -> bool: + """Whether this tool only reads state without modification.""" + return self._read_only + + @property + @override + def is_destructive(self) -> bool: + """Whether this tool performs irreversible actions.""" + return self._destructive + + @property + @override + def requires_confirmation(self) -> bool: + """Whether this tool should require user confirmation before execution.""" + return self._requires_confirmation + @override async def stream(self, tool_use: ToolUse, invocation_state: dict[str, Any], **kwargs: Any) -> ToolGenerator: """Stream the tool with a tool use specification. @@ -725,6 +761,9 @@ def tool( inputSchema: JSONSchema | None = None, name: str | None = None, context: bool | str = False, + read_only: bool = False, + destructive: bool = False, + requires_confirmation: bool = False, ) -> Callable[[Callable[P, R]], DecoratedFunctionTool[P, R]]: ... # Suppressing the type error because we want callers to be able to use both `tool` and `tool()` at the # call site, but the actual implementation handles that and it's not representable via the type-system @@ -734,6 +773,9 @@ def tool( # type: ignore inputSchema: JSONSchema | None = None, name: str | None = None, context: bool | str = False, + read_only: bool = False, + destructive: bool = False, + requires_confirmation: bool = False, ) -> DecoratedFunctionTool[P, R] | Callable[[Callable[P, R]], DecoratedFunctionTool[P, R]]: """Decorator that transforms a Python function into a Strands tool. @@ -762,6 +804,10 @@ def tool( # type: ignore context: When provided, places an object in the designated parameter. If True, the param name defaults to 'tool_context', or if an override is needed, set context equal to a string to designate the param name. + read_only: Whether this tool only reads state without modification. Defaults to False. + destructive: Whether this tool performs irreversible actions. Defaults to False. + requires_confirmation: Whether this tool should require user confirmation before execution. + Defaults to False. Returns: An AgentTool that also mimics the original function when invoked @@ -816,13 +862,27 @@ def decorator(f: T) -> "DecoratedFunctionTool[P, R]": tool_spec["description"] = description if inputSchema is not None: tool_spec["inputSchema"] = inputSchema + if read_only: + tool_spec["readOnly"] = True + if destructive: + tool_spec["destructive"] = True + if requires_confirmation: + tool_spec["requiresConfirmation"] = True tool_name = tool_spec.get("name", f.__name__) if not isinstance(tool_name, str): raise ValueError(f"Tool name must be a string, got {type(tool_name)}") - return DecoratedFunctionTool(tool_name, tool_spec, f, tool_meta) + return DecoratedFunctionTool( + tool_name, + tool_spec, + f, + tool_meta, + read_only=read_only, + destructive=destructive, + requires_confirmation=requires_confirmation, + ) # Handle both @tool and @tool() syntax if func is None: diff --git a/src/strands/tools/mcp/mcp_agent_tool.py b/src/strands/tools/mcp/mcp_agent_tool.py index bedd93f24..a535502ac 100644 --- a/src/strands/tools/mcp/mcp_agent_tool.py +++ b/src/strands/tools/mcp/mcp_agent_tool.py @@ -35,6 +35,10 @@ def __init__( mcp_client: "MCPClient", name_override: str | None = None, timeout: timedelta | None = None, + *, + read_only: bool | None = None, + destructive: bool | None = None, + requires_confirmation: bool | None = None, ) -> None: """Initialize a new MCPAgentTool instance. @@ -44,6 +48,12 @@ def __init__( name_override: Optional name to use for the agent tool (for disambiguation) If None, uses the original MCP tool name timeout: Optional timeout duration for tool execution + read_only: Override for read-only classification. When None, falls back to the + tool spec's ``readOnly`` field if present, otherwise False. + destructive: Override for destructive classification. When None, falls back to the + tool spec's ``destructive`` field if present, otherwise False. + requires_confirmation: Override for confirmation requirement. When None, falls back + to the tool spec's ``requiresConfirmation`` field if present, otherwise False. """ super().__init__() logger.debug("tool_name=<%s> | creating mcp agent tool", mcp_tool.name) @@ -51,6 +61,9 @@ def __init__( self.mcp_client = mcp_client self._agent_tool_name = name_override or mcp_tool.name self.timeout = timeout + self._read_only_override = read_only + self._destructive_override = destructive + self._requires_confirmation_override = requires_confirmation @property def tool_name(self) -> str: @@ -93,6 +106,24 @@ def tool_type(self) -> str: """ return "python" + @property + @override + def is_read_only(self) -> bool: + """Whether this tool only reads state. Set via constructor override.""" + return self._read_only_override if self._read_only_override is not None else False + + @property + @override + def is_destructive(self) -> bool: + """Whether this tool performs irreversible actions. Set via constructor override.""" + return self._destructive_override if self._destructive_override is not None else False + + @property + @override + def requires_confirmation(self) -> bool: + """Whether this tool requires user confirmation. Set via constructor override.""" + return self._requires_confirmation_override if self._requires_confirmation_override is not None else False + @override async def stream(self, tool_use: ToolUse, invocation_state: dict[str, Any], **kwargs: Any) -> ToolGenerator: """Stream the MCP tool. diff --git a/src/strands/tools/registry.py b/src/strands/tools/registry.py index 9a0f0f722..85d35b603 100644 --- a/src/strands/tools/registry.py +++ b/src/strands/tools/registry.py @@ -270,6 +270,8 @@ def register_tool(self, tool: AgentTool) -> None: " Cannot add a duplicate tool which differs by a '-' or '_'" ) + self._validate_security_metadata(tool) + # Register in main registry self.registry[tool.tool_name] = tool @@ -288,6 +290,24 @@ def register_tool(self, tool: AgentTool) -> None: list(self.dynamic_tools.keys()), ) + def _validate_security_metadata(self, tool: AgentTool) -> None: + """Validate that a tool's security metadata is internally consistent. + + Args: + tool: The tool to validate. + + Raises: + ValueError: If the tool has contradictory security metadata. + """ + if tool.is_read_only and tool.is_destructive: + raise ValueError(f"Tool '{tool.tool_name}' cannot be both read_only and destructive") + + if tool.is_destructive and not tool.requires_confirmation: + logger.warning( + "tool_name=<%s> | tool is marked destructive but does not require confirmation", + tool.tool_name, + ) + def replace(self, new_tool: AgentTool) -> None: """Replace an existing tool with a new implementation. @@ -305,6 +325,8 @@ def replace(self, new_tool: AgentTool) -> None: if tool_name not in self.registry: raise ValueError(f"Cannot replace tool '{tool_name}' - tool does not exist") + self._validate_security_metadata(new_tool) + # Update main registry self.registry[tool_name] = new_tool diff --git a/src/strands/tools/tools.py b/src/strands/tools/tools.py index ccfeac323..f5fc04e20 100644 --- a/src/strands/tools/tools.py +++ b/src/strands/tools/tools.py @@ -240,6 +240,24 @@ def tool_type(self) -> str: """ return "python" + @property + @override + def is_read_only(self) -> bool: + """Whether this tool only reads state, derived from its ToolSpec.""" + return self._tool_spec.get("readOnly") is True + + @property + @override + def is_destructive(self) -> bool: + """Whether this tool performs irreversible actions, derived from its ToolSpec.""" + return self._tool_spec.get("destructive") is True + + @property + @override + def requires_confirmation(self) -> bool: + """Whether this tool requires user confirmation, derived from its ToolSpec.""" + return self._tool_spec.get("requiresConfirmation") is True + @override async def stream(self, tool_use: ToolUse, invocation_state: dict[str, Any], **kwargs: Any) -> ToolGenerator: """Stream the Python function with the given tool use request. diff --git a/src/strands/types/tools.py b/src/strands/types/tools.py index 088c83bdb..d977aab4c 100644 --- a/src/strands/types/tools.py +++ b/src/strands/types/tools.py @@ -30,12 +30,19 @@ class ToolSpec(TypedDict): outputSchema: Optional JSON Schema defining the expected output format. Note: Not all model providers support this field. Providers that don't support it should filter it out before sending to their API. + readOnly: Optional flag indicating the tool only reads state without modification. + destructive: Optional flag indicating the tool performs irreversible actions. + requiresConfirmation: Optional flag indicating the tool should require user + confirmation before execution. """ description: str inputSchema: JSONSchema name: str outputSchema: NotRequired[JSONSchema] + readOnly: NotRequired[bool] + destructive: NotRequired[bool] + requiresConfirmation: NotRequired[bool] class Tool(TypedDict): @@ -255,6 +262,33 @@ def supports_hot_reload(self) -> bool: """ return False + @property + def is_read_only(self) -> bool: + """Whether this tool only reads state without modification. + + Returns: + False by default. Override in subclasses or set via @tool(read_only=True). + """ + return False + + @property + def is_destructive(self) -> bool: + """Whether this tool performs irreversible actions. + + Returns: + False by default. Override in subclasses or set via @tool(destructive=True). + """ + return False + + @property + def requires_confirmation(self) -> bool: + """Whether this tool should require user confirmation before execution. + + Returns: + False by default. Override in subclasses or set via @tool(requires_confirmation=True). + """ + return False + @abstractmethod # pragma: no cover def stream(self, tool_use: ToolUse, invocation_state: dict[str, Any], **kwargs: Any) -> ToolGenerator: diff --git a/tests/strands/tools/mcp/test_mcp_client_tool_provider.py b/tests/strands/tools/mcp/test_mcp_client_tool_provider.py index 9cb90167d..dea484c5c 100644 --- a/tests/strands/tools/mcp/test_mcp_client_tool_provider.py +++ b/tests/strands/tools/mcp/test_mcp_client_tool_provider.py @@ -54,6 +54,9 @@ def create_mock_tool(tool_name: str, mcp_tool_name: str | None = None) -> MagicM tool.mcp_tool = MagicMock(spec=MCPTool) tool.mcp_tool.name = mcp_tool_name or tool_name tool.mcp_tool.description = f"Description for {tool_name}" + tool.is_read_only = False + tool.is_destructive = False + tool.requires_confirmation = False return tool diff --git a/tests/strands/tools/test_registry.py b/tests/strands/tools/test_registry.py index 3723f381b..ee9f294c6 100644 --- a/tests/strands/tools/test_registry.py +++ b/tests/strands/tools/test_registry.py @@ -130,15 +130,21 @@ def function() -> str: def test_register_tool_duplicate_name_without_hot_reload(): """Test that registering a tool with duplicate name raises ValueError when hot reload is not supported.""" # Create mock tools that don't support hot reload - tool_1 = MagicMock() + tool_1 = MagicMock(spec=PythonAgentTool) tool_1.tool_name = "duplicate_tool" tool_1.supports_hot_reload = False tool_1.is_dynamic = False + tool_1.is_read_only = False + tool_1.is_destructive = False + tool_1.requires_confirmation = False - tool_2 = MagicMock() + tool_2 = MagicMock(spec=PythonAgentTool) tool_2.tool_name = "duplicate_tool" tool_2.supports_hot_reload = False tool_2.is_dynamic = False + tool_2.is_read_only = False + tool_2.is_destructive = False + tool_2.requires_confirmation = False tool_registry = ToolRegistry() tool_registry.register_tool(tool_1) @@ -156,11 +162,17 @@ def test_register_tool_duplicate_name_with_hot_reload(): tool_1.tool_name = "hot_reload_tool" tool_1.supports_hot_reload = True tool_1.is_dynamic = False + tool_1.is_read_only = False + tool_1.is_destructive = False + tool_1.requires_confirmation = False tool_2 = MagicMock(spec=PythonAgentTool) tool_2.tool_name = "hot_reload_tool" tool_2.supports_hot_reload = True tool_2.is_dynamic = False + tool_2.is_read_only = False + tool_2.is_destructive = False + tool_2.requires_confirmation = False tool_registry = ToolRegistry() tool_registry.register_tool(tool_1) @@ -519,10 +531,16 @@ def test_tool_registry_replace_existing_tool(): old_tool.tool_name = "my_tool" old_tool.is_dynamic = False old_tool.supports_hot_reload = False + old_tool.is_read_only = False + old_tool.is_destructive = False + old_tool.requires_confirmation = False new_tool = MagicMock() new_tool.tool_name = "my_tool" new_tool.is_dynamic = False + new_tool.is_read_only = False + new_tool.is_destructive = False + new_tool.requires_confirmation = False registry = ToolRegistry() registry.register_tool(old_tool) @@ -535,6 +553,9 @@ def test_tool_registry_replace_nonexistent_tool(): """Test replacing a tool that doesn't exist raises ValueError.""" new_tool = MagicMock() new_tool.tool_name = "my_tool" + new_tool.is_read_only = False + new_tool.is_destructive = False + new_tool.requires_confirmation = False registry = ToolRegistry() @@ -548,10 +569,16 @@ def test_tool_registry_replace_dynamic_tool(): old_tool.tool_name = "dynamic_tool" old_tool.is_dynamic = True old_tool.supports_hot_reload = True + old_tool.is_read_only = False + old_tool.is_destructive = False + old_tool.requires_confirmation = False new_tool = MagicMock() new_tool.tool_name = "dynamic_tool" new_tool.is_dynamic = True + new_tool.is_read_only = False + new_tool.is_destructive = False + new_tool.requires_confirmation = False registry = ToolRegistry() registry.register_tool(old_tool) @@ -567,10 +594,16 @@ def test_tool_registry_replace_dynamic_with_non_dynamic(): old_tool.tool_name = "my_tool" old_tool.is_dynamic = True old_tool.supports_hot_reload = True + old_tool.is_read_only = False + old_tool.is_destructive = False + old_tool.requires_confirmation = False new_tool = MagicMock() new_tool.tool_name = "my_tool" new_tool.is_dynamic = False + new_tool.is_read_only = False + new_tool.is_destructive = False + new_tool.requires_confirmation = False registry = ToolRegistry() registry.register_tool(old_tool) @@ -589,10 +622,16 @@ def test_tool_registry_replace_non_dynamic_with_dynamic(): old_tool.tool_name = "my_tool" old_tool.is_dynamic = False old_tool.supports_hot_reload = False + old_tool.is_read_only = False + old_tool.is_destructive = False + old_tool.requires_confirmation = False new_tool = MagicMock() new_tool.tool_name = "my_tool" new_tool.is_dynamic = True + new_tool.is_read_only = False + new_tool.is_destructive = False + new_tool.requires_confirmation = False registry = ToolRegistry() registry.register_tool(old_tool) diff --git a/tests/strands/tools/test_tool_security_metadata.py b/tests/strands/tools/test_tool_security_metadata.py new file mode 100644 index 000000000..cbdaff427 --- /dev/null +++ b/tests/strands/tools/test_tool_security_metadata.py @@ -0,0 +1,426 @@ +"""Tests for tool security metadata (is_read_only, is_destructive, requires_confirmation). + +Covers: +- AgentTool base class defaults +- @tool decorator parameters +- ToolSpec round-trip (PythonAgentTool reads from spec) +- MCPAgentTool with overrides and spec fallback +- ToolRegistry contradiction rejection and destructive-without-confirmation warning +- BeforeToolCallEvent convenience properties +- Hook-based permission gate integration test +""" + +import logging +from unittest.mock import MagicMock + +import pytest + +from strands.hooks.events import BeforeToolCallEvent +from strands.tools.decorator import DecoratedFunctionTool, tool +from strands.tools.mcp.mcp_agent_tool import MCPAgentTool +from strands.tools.registry import ToolRegistry +from strands.tools.tools import PythonAgentTool + +# --------------------------------------------------------------------------- +# 1. AgentTool base class defaults +# --------------------------------------------------------------------------- + + +class _MinimalTool(PythonAgentTool): + pass + + +def _make_spec(name="test_tool", **extra): + spec = {"name": name, "description": "A test tool", "inputSchema": {"json": {"type": "object", "properties": {}}}} + spec.update(extra) + return spec + + +def test_agent_tool_defaults_all_false(): + t = PythonAgentTool(tool_name="t", tool_spec=_make_spec("t"), tool_func=lambda tu, **kw: None) + assert t.is_read_only is False + assert t.is_destructive is False + assert t.requires_confirmation is False + + +# --------------------------------------------------------------------------- +# 2. @tool decorator parameters +# --------------------------------------------------------------------------- + + +def test_tool_decorator_read_only(): + @tool(read_only=True) + def list_files(directory: str) -> str: + """List files in a directory.""" + return "" + + assert list_files.is_read_only is True + assert list_files.is_destructive is False + assert list_files.requires_confirmation is False + + +def test_tool_decorator_destructive_with_confirmation(): + @tool(destructive=True, requires_confirmation=True) + def delete_file(path: str) -> str: + """Delete a file permanently.""" + return "" + + assert delete_file.is_read_only is False + assert delete_file.is_destructive is True + assert delete_file.requires_confirmation is True + + +def test_tool_decorator_bare_has_defaults(): + @tool + def noop() -> str: + """Does nothing.""" + return "" + + assert noop.is_read_only is False + assert noop.is_destructive is False + assert noop.requires_confirmation is False + + +def test_tool_decorator_only_requires_confirmation(): + @tool(requires_confirmation=True) + def sensitive_op(data: str) -> str: + """A sensitive operation.""" + return data + + assert sensitive_op.is_read_only is False + assert sensitive_op.is_destructive is False + assert sensitive_op.requires_confirmation is True + + +# --------------------------------------------------------------------------- +# 3. ToolSpec round-trip (PythonAgentTool reads from spec) +# --------------------------------------------------------------------------- + + +def test_python_agent_tool_reads_read_only_from_spec(): + spec = _make_spec("reader", readOnly=True) + t = PythonAgentTool(tool_name="reader", tool_spec=spec, tool_func=lambda tu, **kw: None) + assert t.is_read_only is True + assert t.is_destructive is False + + +def test_python_agent_tool_reads_destructive_from_spec(): + spec = _make_spec("destroyer", destructive=True, requiresConfirmation=True) + t = PythonAgentTool(tool_name="destroyer", tool_spec=spec, tool_func=lambda tu, **kw: None) + assert t.is_destructive is True + assert t.requires_confirmation is True + assert t.is_read_only is False + + +def test_python_agent_tool_no_security_fields_in_spec(): + spec = _make_spec("plain") + t = PythonAgentTool(tool_name="plain", tool_spec=spec, tool_func=lambda tu, **kw: None) + assert t.is_read_only is False + assert t.is_destructive is False + assert t.requires_confirmation is False + + +# --------------------------------------------------------------------------- +# 4. MCPAgentTool with overrides and spec fallback +# --------------------------------------------------------------------------- + + +def _make_mcp_tool(name="mcp_test", input_schema=None): + mcp_tool = MagicMock() + mcp_tool.name = name + mcp_tool.description = f"MCP tool {name}" + mcp_tool.inputSchema = input_schema or {"type": "object", "properties": {}} + mcp_tool.outputSchema = None + return mcp_tool + + +def test_mcp_tool_defaults(): + mcp_tool = _make_mcp_tool() + t = MCPAgentTool(mcp_tool=mcp_tool, mcp_client=MagicMock()) + assert t.is_read_only is False + assert t.is_destructive is False + assert t.requires_confirmation is False + + +def test_mcp_tool_constructor_overrides(): + mcp_tool = _make_mcp_tool() + t = MCPAgentTool(mcp_tool=mcp_tool, mcp_client=MagicMock(), read_only=True) + assert t.is_read_only is True + assert t.is_destructive is False + + +def test_mcp_tool_destructive_override(): + mcp_tool = _make_mcp_tool() + t = MCPAgentTool( + mcp_tool=mcp_tool, + mcp_client=MagicMock(), + destructive=True, + requires_confirmation=True, + ) + assert t.is_destructive is True + assert t.requires_confirmation is True + assert t.is_read_only is False + + +def test_mcp_tool_override_takes_precedence_over_spec(): + """Constructor override should win even if the spec says differently.""" + mcp_tool = _make_mcp_tool() + t = MCPAgentTool( + mcp_tool=mcp_tool, + mcp_client=MagicMock(), + read_only=False, + ) + assert t.is_read_only is False + + +def test_mcp_tool_no_override_defaults_to_false(): + """Without constructor overrides, MCP tools default to False for all security properties.""" + mcp_tool = _make_mcp_tool() + t = MCPAgentTool(mcp_tool=mcp_tool, mcp_client=MagicMock()) + assert t.is_read_only is False + assert t.is_destructive is False + assert t.requires_confirmation is False + + +# --------------------------------------------------------------------------- +# 5. ToolRegistry validation +# --------------------------------------------------------------------------- + + +def test_registry_rejects_read_only_and_destructive(): + @tool(read_only=True, destructive=True) + def bad_tool() -> str: + """Contradictory tool.""" + return "" + + registry = ToolRegistry() + with pytest.raises(ValueError, match="cannot be both read_only and destructive"): + registry.register_tool(bad_tool) + + +def test_registry_warns_destructive_without_confirmation(caplog): + @tool(destructive=True) + def risky_tool() -> str: + """A risky tool without confirmation.""" + return "" + + registry = ToolRegistry() + with caplog.at_level(logging.WARNING): + registry.register_tool(risky_tool) + + assert "destructive but does not require confirmation" in caplog.text + + +def test_registry_accepts_destructive_with_confirmation(): + @tool(destructive=True, requires_confirmation=True) + def safe_destructive() -> str: + """A destructive tool that requires confirmation.""" + return "" + + registry = ToolRegistry() + registry.register_tool(safe_destructive) + assert "safe_destructive" in registry.registry + + +def test_registry_accepts_read_only(): + @tool(read_only=True) + def reader() -> str: + """A read-only tool.""" + return "" + + registry = ToolRegistry() + registry.register_tool(reader) + assert "reader" in registry.registry + + +def test_registry_accepts_default_metadata(): + @tool + def plain() -> str: + """A plain tool.""" + return "" + + registry = ToolRegistry() + registry.register_tool(plain) + assert "plain" in registry.registry + + +def test_registry_replace_rejects_contradictory_metadata(): + """ToolRegistry.replace() must also validate security metadata.""" + + @tool + def my_tool() -> str: + """A normal tool.""" + return "" + + @tool(read_only=True, destructive=True, name="my_tool") + def bad_replacement() -> str: + """Contradictory replacement.""" + return "" + + registry = ToolRegistry() + registry.register_tool(my_tool) + + with pytest.raises(ValueError, match="cannot be both read_only and destructive"): + registry.replace(bad_replacement) + + +# --------------------------------------------------------------------------- +# 6. BeforeToolCallEvent convenience properties +# --------------------------------------------------------------------------- + + +def _make_before_event(selected_tool=None): + return BeforeToolCallEvent( + agent=MagicMock(), + selected_tool=selected_tool, + tool_use={"name": "test", "toolUseId": "id-1", "input": {}}, + invocation_state={}, + ) + + +def test_event_convenience_props_with_none_tool(): + event = _make_before_event(selected_tool=None) + assert event.tool_is_read_only is False + assert event.tool_is_destructive is False + assert event.tool_requires_confirmation is False + + +def test_event_convenience_props_with_read_only_tool(): + @tool(read_only=True) + def reader() -> str: + """Read only.""" + return "" + + event = _make_before_event(selected_tool=reader) + assert event.tool_is_read_only is True + assert event.tool_is_destructive is False + assert event.tool_requires_confirmation is False + + +def test_event_convenience_props_with_destructive_tool(): + @tool(destructive=True, requires_confirmation=True) + def destroyer() -> str: + """Destructive.""" + return "" + + event = _make_before_event(selected_tool=destroyer) + assert event.tool_is_read_only is False + assert event.tool_is_destructive is True + assert event.tool_requires_confirmation is True + + +# --------------------------------------------------------------------------- +# 7. Integration: hook-based permission gate +# --------------------------------------------------------------------------- + + +def test_hook_cancels_destructive_tool(): + """Simulate a BeforeToolCallEvent hook that cancels destructive tools.""" + + @tool(destructive=True, requires_confirmation=True) + def delete_db() -> str: + """Delete the database.""" + return "" + + event = _make_before_event(selected_tool=delete_db) + + # Hook logic: cancel destructive tools + if event.tool_is_destructive: + event.cancel_tool = "Destructive tool requires approval" + + assert event.cancel_tool == "Destructive tool requires approval" + + +def test_hook_allows_read_only_tool(): + """Simulate a BeforeToolCallEvent hook that allows read-only tools.""" + + @tool(read_only=True) + def list_items() -> str: + """List items.""" + return "" + + event = _make_before_event(selected_tool=list_items) + + # Hook logic: only cancel non-read-only tools + if not event.tool_is_read_only: + event.cancel_tool = "Non-read-only tool blocked" + + assert event.cancel_tool is False + + +# --------------------------------------------------------------------------- +# 8. Backward compatibility +# --------------------------------------------------------------------------- + + +def test_decorated_tool_get_preserves_security_metadata(): + """Verify __get__ (descriptor protocol) propagates security metadata.""" + + class MyClass: + @tool(destructive=True, requires_confirmation=True) + def my_method(self, x: str) -> str: + """A method tool.""" + return x + + instance = MyClass() + bound_tool = instance.my_method + + assert isinstance(bound_tool, DecoratedFunctionTool) + assert bound_tool.is_destructive is True + assert bound_tool.requires_confirmation is True + assert bound_tool.is_read_only is False + + +def test_tool_spec_typed_dict_accepts_security_fields(): + """Verify ToolSpec TypedDict accepts the new NotRequired fields.""" + from strands.types.tools import ToolSpec + + spec: ToolSpec = { + "name": "test", + "description": "test", + "inputSchema": {}, + "readOnly": True, + "destructive": False, + "requiresConfirmation": False, + } + assert spec["readOnly"] is True + assert spec["destructive"] is False + + +def test_decorated_tool_writes_security_fields_to_spec(): + """@tool should write security fields into its ToolSpec for serialization consistency.""" + + @tool(read_only=True) + def reader(x: str) -> str: + """Read something.""" + return x + + assert reader.tool_spec.get("readOnly") is True + assert "destructive" not in reader.tool_spec + assert "requiresConfirmation" not in reader.tool_spec + + +def test_decorated_tool_destructive_fields_in_spec(): + """@tool(destructive=True, requires_confirmation=True) should write both fields to spec.""" + + @tool(destructive=True, requires_confirmation=True) + def deleter(x: str) -> str: + """Delete something.""" + return x + + assert deleter.tool_spec.get("destructive") is True + assert deleter.tool_spec.get("requiresConfirmation") is True + assert "readOnly" not in deleter.tool_spec + + +def test_bare_decorator_omits_security_fields_from_spec(): + """Plain @tool should not pollute ToolSpec with False security fields.""" + + @tool + def plain(x: str) -> str: + """Do something.""" + return x + + assert "readOnly" not in plain.tool_spec + assert "destructive" not in plain.tool_spec + assert "requiresConfirmation" not in plain.tool_spec