diff --git a/AGENTS.md b/AGENTS.md index 8835b45c8..3facc2c4d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -117,6 +117,12 @@ strands-agents/ │ │ ├── repository_session_manager.py # Repository pattern │ │ └── session_repository.py # Storage interface │ │ +│ │ +│ ├── sandbox/ # Sandbox abstraction for code execution +│ │ ├── base.py # Sandbox ABC, ExecutionResult, FileInfo, OutputFile +│ │ ├── host.py # HostSandbox (native Python, default) +│ │ ├── shell_based.py # ShellBasedSandbox (for remote/container envs) +│ │ └── noop.py # NoOpSandbox (raises NotImplementedError for all ops) │ ├── telemetry/ # Observability (OpenTelemetry) │ │ ├── tracer.py # Tracing │ │ ├── metrics.py # Metrics collection @@ -183,6 +189,7 @@ strands-agents/ │ ├── types/ │ ├── session/ │ ├── telemetry/ +│ ├── sandbox/ │ ├── hooks/ │ ├── plugins/ │ ├── handlers/ diff --git a/src/strands/__init__.py b/src/strands/__init__.py index 6625ac41f..7fd06e02a 100644 --- a/src/strands/__init__.py +++ b/src/strands/__init__.py @@ -1,10 +1,14 @@ """A framework for building, deploying, and managing AI agents.""" -from . import agent, models, telemetry, types +from . 
import agent, models, sandbox, telemetry, types from .agent.agent import Agent from .agent.base import AgentBase from .event_loop._retry import ModelRetryStrategy from .plugins import Plugin +from .sandbox.base import ExecutionResult, FileInfo, OutputFile, Sandbox, StreamChunk, StreamType +from .sandbox.host import HostSandbox +from .sandbox.noop import NoOpSandbox +from .sandbox.shell_based import ShellBasedSandbox from .tools.decorator import tool from .types._snapshot import Snapshot from .types.tools import ToolContext @@ -15,11 +19,21 @@ "AgentBase", "AgentSkills", "agent", + "ExecutionResult", + "FileInfo", + "HostSandbox", "models", "ModelRetryStrategy", "Plugin", + "OutputFile", + "sandbox", + "Sandbox", + "NoOpSandbox", + "ShellBasedSandbox", "Skill", "Snapshot", + "StreamChunk", + "StreamType", "tool", "ToolContext", "types", diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 965969961..5e3339d08 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -29,6 +29,7 @@ from .._async import run_async from ..event_loop._retry import ModelRetryStrategy from ..event_loop.event_loop import INITIAL_DELAY, MAX_ATTEMPTS, MAX_DELAY, event_loop_cycle +from ..sandbox.base import Sandbox from ..tools._tool_helpers import generate_missing_tool_result_content from ..types._snapshot import ( SNAPSHOT_SCHEMA_VERSION, @@ -146,6 +147,7 @@ def __init__( tool_executor: ToolExecutor | None = None, retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY, concurrent_invocation_mode: ConcurrentInvocationMode = ConcurrentInvocationMode.THROW, + sandbox: Sandbox | None = None, ): """Initialize the Agent with the specified configuration. @@ -214,6 +216,9 @@ def __init__( Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations. 
Warning: "unsafe_reentrant" makes no guarantees about resulting behavior and is provided only for advanced use cases where the caller understands the risks. + sandbox: Execution environment for agent tools. Tools access the sandbox + via tool_context.agent.sandbox to execute commands, code, and filesystem operations. + Defaults to HostSandbox() for host execution when not specified. Raises: ValueError: If agent id contains path separators. @@ -298,6 +303,17 @@ def __init__( self.tool_caller = _ToolCaller(self) + # Initialize sandbox for tool execution environment + # Default to HostSandbox() for backwards compatibility — any code that + # accesses agent.sandbox gets a working local execution environment. + # Import is deferred to avoid unconditional coupling to HostSandbox. + if sandbox is not None: + self.sandbox: Sandbox = sandbox + else: + from ..sandbox.host import HostSandbox + + self.sandbox = HostSandbox() + self.hooks = HookRegistry() self._plugin_registry = _PluginRegistry(self) diff --git a/src/strands/sandbox/__init__.py b/src/strands/sandbox/__init__.py new file mode 100644 index 000000000..8e4e4e517 --- /dev/null +++ b/src/strands/sandbox/__init__.py @@ -0,0 +1,30 @@ +"""Sandbox abstraction for agent code execution environments. + +This module provides the Sandbox interface that decouples tool logic from where code runs. +Tools that need to execute code or access a filesystem receive a Sandbox instead of managing +their own execution, enabling portability across local and cloud environments. 
+ +Class hierarchy:: + + Sandbox (ABC, all abstract + helpers) + ├── HostSandbox — native Python methods for host execution (default) + ├── ShellBasedSandbox (ABC, only execute_streaming() abstract — shell-based file ops + execute_code) + └── NoOpSandbox — no-op implementation that disables all sandbox functionality +""" + +from .base import ExecutionResult, FileInfo, OutputFile, Sandbox, StreamChunk, StreamType +from .host import HostSandbox +from .noop import NoOpSandbox +from .shell_based import ShellBasedSandbox + +__all__ = [ + "ExecutionResult", + "FileInfo", + "HostSandbox", + "NoOpSandbox", + "OutputFile", + "Sandbox", + "ShellBasedSandbox", + "StreamChunk", + "StreamType", +] diff --git a/src/strands/sandbox/base.py b/src/strands/sandbox/base.py new file mode 100644 index 000000000..61823fe47 --- /dev/null +++ b/src/strands/sandbox/base.py @@ -0,0 +1,418 @@ +"""Base sandbox interface for agent code execution environments. + +This module defines the abstract Sandbox class and supporting dataclasses: + +- :class:`ExecutionResult` — result of command/code execution +- :class:`FileInfo` — metadata about a file in the sandbox +- :class:`OutputFile` — a file produced as output by code execution +- :class:`StreamChunk` — a typed chunk of streaming output (stdout or stderr) + +Sandbox implementations provide the runtime context where tools execute code, run commands, +and interact with a filesystem. Multiple tools share the same Sandbox instance, giving them +a common working directory, environment variables, and filesystem. + +Class hierarchy:: + + Sandbox (ABC): All operations are abstract. Implement this for non-shell-based + sandboxes (e.g., API-based cloud sandboxes). + ShellBasedSandbox (ABC, in shell_based.py): Provides shell-based defaults for file + operations and code execution. Subclasses only need to implement ``execute_streaming()``. + NoOpSandbox (in noop.py): No-op implementation that raises NotImplementedError + for all operations. 
Use to disable sandbox functionality entirely. +""" + +import logging +from abc import ABC, abstractmethod +from collections.abc import AsyncGenerator +from dataclasses import dataclass, field +from typing import Any, Literal + +logger = logging.getLogger(__name__) + + +StreamType = Literal["stdout", "stderr"] +"""Type of a streaming output chunk. + +Used by :class:`StreamChunk` to distinguish stdout from stderr output +during streaming execution. + +- ``"stdout"``: Standard output from the command or code. +- ``"stderr"``: Standard error from the command or code. +""" + + +@dataclass +class StreamChunk: + """A typed chunk of streaming output from command or code execution. + + Allows consumers to distinguish stdout from stderr during streaming, + enabling richer UIs and more precise output handling. + + Attributes: + data: The text content of the chunk. + stream_type: Whether this chunk is from stdout or stderr. + """ + + data: str + stream_type: StreamType = "stdout" + + +@dataclass +class FileInfo: + """Metadata about a file or directory in a sandbox. + + Provides minimal structured information that lets tools distinguish + files from directories and report sizes. Fields ``is_dir`` and ``size`` + are optional — implementations that cannot provide accurate data + return ``None`` instead of lying. + + Attributes: + name: The file or directory name (not the full path). + is_dir: Whether this entry is a directory. ``None`` if unknown. + size: File size in bytes. ``None`` if unknown. + """ + + name: str + is_dir: bool | None = None + size: int | None = None + + +@dataclass +class OutputFile: + """A file produced as output by code execution. + + Used to carry binary artifacts (images, charts, PDFs, compiled files) + from sandbox execution back to the agent. Tools can convert these + to Strands' ``ImageContent`` or ``DocumentContent`` for the model. + + Follows ADK's ``File`` pattern — simple, portable, MIME-typed. 
+ + Attributes: + name: Filename (e.g., ``"plot.png"``). + content: Raw file content as bytes. + mime_type: MIME type of the content (e.g., ``"image/png"``). + """ + + name: str + content: bytes + mime_type: str = "application/octet-stream" + + +@dataclass +class ExecutionResult: + """Result of code or command execution in a sandbox. + + Attributes: + exit_code: The exit code of the command or code execution. + stdout: Standard output captured from execution. + stderr: Standard error captured from execution. + output_files: Files produced by the execution (e.g., images, charts). + Shell-based sandboxes typically return an empty list. Jupyter-backed + or API-backed sandboxes can populate this with generated artifacts. + """ + + exit_code: int + stdout: str + stderr: str + output_files: list[OutputFile] = field(default_factory=list) + + +class Sandbox(ABC): + """Abstract execution environment for agent tools. + + A Sandbox provides the runtime context where tools execute code, + run commands, and interact with a filesystem. Multiple tools + share the same Sandbox instance, giving them a common working + directory, environment variables, and filesystem. + + The sandbox follows the SDK's ``invoke_async`` / ``stream_async`` + pattern: streaming methods (``execute_streaming``, ``execute_code_streaming``) + are the abstract primitives that implementations must provide. + Non-streaming convenience methods (``execute``, ``execute_code``) consume + the stream and return the final ``ExecutionResult``. + + Streaming methods yield :class:`StreamChunk` objects that carry both + the text data and the stream type (stdout or stderr), followed by a + final :class:`ExecutionResult`. This allows consumers to distinguish + between stdout and stderr during streaming. + + All abstract methods accept ``**kwargs`` for forward compatibility — + new parameters with defaults can be added in future versions without + breaking existing implementations. 
+ + Example: + Non-streaming (common case):: + + from strands.sandbox import HostSandbox + + sandbox = HostSandbox(working_dir="/tmp/my-sandbox") + result = await sandbox.execute("echo hello") + print(result.stdout) + + Streaming with stdout/stderr distinction:: + + async for chunk in sandbox.execute_streaming("echo hello"): + if isinstance(chunk, StreamChunk): + if chunk.stream_type == "stdout": + print(f"[stdout] {chunk.data}", end="") + else: + print(f"[stderr] {chunk.data}", end="") + elif isinstance(chunk, ExecutionResult): + print(f"Exit code: {chunk.exit_code}") + + With custom working directory:: + + result = await sandbox.execute("ls -la", cwd="/tmp/other-dir") + """ + + # ---- Streaming methods (abstract primitives) ---- + + @abstractmethod + async def execute_streaming( + self, + command: str, + timeout: int | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + """Execute a shell command, streaming output. + + Yields :class:`StreamChunk` objects for stdout and stderr output + as it arrives. The final yield is an :class:`ExecutionResult` with + the exit code and complete output. + + Args: + command: The shell command to execute. + timeout: Maximum execution time in seconds. ``None`` means no timeout. + cwd: Working directory for command execution. ``None`` means use the + sandbox's default working directory. + **kwargs: Additional keyword arguments for forward compatibility. + + Yields: + :class:`StreamChunk` objects for output, then a final :class:`ExecutionResult`. + """ + ... + # Make the method signature an async generator for type checkers. + # Concrete subclasses must yield at least one ExecutionResult. 
+ yield # type: ignore[misc] # pragma: no cover + + @abstractmethod + async def execute_code_streaming( + self, + code: str, + language: str, + timeout: int | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + """Execute code in the sandbox, streaming output. + + Args: + code: The source code to execute. + language: The programming language interpreter to use. + timeout: Maximum execution time in seconds. ``None`` means no timeout. + cwd: Working directory for code execution. ``None`` means use the + sandbox's default working directory. + **kwargs: Additional keyword arguments for forward compatibility. + + Yields: + :class:`StreamChunk` objects for output, then a final :class:`ExecutionResult`. + """ + ... + yield # type: ignore[misc] # pragma: no cover + + @abstractmethod + async def read_file(self, path: str, **kwargs: Any) -> bytes: + """Read a file from the sandbox filesystem. + + Returns raw bytes to support both text and binary files (images, + PDFs, compiled artifacts). Use :meth:`read_text` for a convenience + wrapper that decodes to a string. + + Args: + path: Path to the file to read. + **kwargs: Additional keyword arguments for forward compatibility. + + Returns: + The file contents as bytes. + + Raises: + FileNotFoundError: If the file does not exist or cannot be read. + """ + ... + + @abstractmethod + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + """Write a file to the sandbox filesystem. + + Accepts raw bytes to support both text and binary content. Use + :meth:`write_text` for a convenience wrapper that encodes a string. + + Implementations should create parent directories if they do not exist. + :class:`~strands.sandbox.host.HostSandbox` does this natively + via :func:`pathlib.Path.mkdir`. Shell-based implementations should + include a ``mkdir -p`` before writing. + + Args: + path: Path to the file to write. + content: The content to write as bytes. 
+ **kwargs: Additional keyword arguments for forward compatibility. + + Raises: + IOError: If the file cannot be written. + """ + ... + + @abstractmethod + async def remove_file(self, path: str, **kwargs: Any) -> None: + """Remove a file from the sandbox filesystem. + + Args: + path: Path to the file to remove. + **kwargs: Additional keyword arguments for forward compatibility. + + Raises: + FileNotFoundError: If the file does not exist. + """ + ... + + @abstractmethod + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + """List files in a sandbox directory. + + Returns structured :class:`FileInfo` entries with metadata (name, + is_dir, size) so tools can make informed decisions about files. + Fields ``is_dir`` and ``size`` may be ``None`` if the implementation + cannot provide accurate data. + + Args: + path: Path to the directory to list. + **kwargs: Additional keyword arguments for forward compatibility. + + Returns: + A list of :class:`FileInfo` entries for the directory contents. + + Raises: + FileNotFoundError: If the directory does not exist. + """ + ... + + # ---- Non-streaming convenience methods ---- + + async def execute( + self, + command: str, + timeout: int | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> ExecutionResult: + """Execute a shell command and return the result. + + Convenience wrapper that consumes :meth:`execute_streaming` and + returns the final :class:`ExecutionResult`. This is the common case — + use :meth:`execute_streaming` when you need to process output as it + arrives. + + Implementations that want an optimized non-streaming path can + override this method directly. + + Args: + command: The shell command to execute. + timeout: Maximum execution time in seconds. ``None`` means no timeout. + cwd: Working directory for command execution. ``None`` means use the + sandbox's default working directory. + **kwargs: Additional keyword arguments for forward compatibility. 
+ + Returns: + The final ExecutionResult from execution. + + Raises: + RuntimeError: If execute_streaming() did not yield an ExecutionResult. + """ + result = None + async for chunk in self.execute_streaming(command, timeout=timeout, cwd=cwd, **kwargs): + if isinstance(chunk, ExecutionResult): + result = chunk + if result is None: + raise RuntimeError("execute_streaming() did not yield an ExecutionResult") + return result + + async def execute_code( + self, + code: str, + language: str, + timeout: int | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> ExecutionResult: + """Execute code and return the result. + + Convenience wrapper that consumes :meth:`execute_code_streaming` and + returns the final :class:`ExecutionResult`. This is the common case — + use :meth:`execute_code_streaming` when you need to process output as + it arrives. + + Implementations that want an optimized non-streaming path can + override this method directly. + + Args: + code: The source code to execute. + language: The programming language interpreter to use. + timeout: Maximum execution time in seconds. ``None`` means no timeout. + cwd: Working directory for code execution. ``None`` means use the + sandbox's default working directory. + **kwargs: Additional keyword arguments for forward compatibility. + + Returns: + The final ExecutionResult from execution. + + Raises: + RuntimeError: If execute_code_streaming() did not yield an ExecutionResult. + """ + result = None + async for chunk in self.execute_code_streaming(code, language=language, timeout=timeout, cwd=cwd, **kwargs): + if isinstance(chunk, ExecutionResult): + result = chunk + if result is None: + raise RuntimeError("execute_code_streaming() did not yield an ExecutionResult") + return result + + # ---- Text convenience methods ---- + + async def read_text(self, path: str, encoding: str = "utf-8", **kwargs: Any) -> str: + """Read a text file from the sandbox filesystem. 
+ + Convenience wrapper around :meth:`read_file` that decodes bytes + to a string. + + Args: + path: Path to the file to read. + encoding: Text encoding to use. Defaults to UTF-8. + **kwargs: Additional keyword arguments passed to :meth:`read_file`. + + Returns: + The file contents as a string. + + Raises: + FileNotFoundError: If the file does not exist. + UnicodeDecodeError: If the file cannot be decoded with the given encoding. + """ + data = await self.read_file(path, **kwargs) + return data.decode(encoding) + + async def write_text(self, path: str, content: str, encoding: str = "utf-8", **kwargs: Any) -> None: + """Write a text file to the sandbox filesystem. + + Convenience wrapper around :meth:`write_file` that encodes a string + to bytes. + + Args: + path: Path to the file to write. + content: The text content to write. + encoding: Text encoding to use. Defaults to UTF-8. + **kwargs: Additional keyword arguments passed to :meth:`write_file`. + + Raises: + IOError: If the file cannot be written. + """ + await self.write_file(path, content.encode(encoding), **kwargs) diff --git a/src/strands/sandbox/host.py b/src/strands/sandbox/host.py new file mode 100644 index 000000000..29239c42e --- /dev/null +++ b/src/strands/sandbox/host.py @@ -0,0 +1,380 @@ +"""Host sandbox implementation for host-process execution. + +This module implements the HostSandbox, which executes commands and code +on the local host using asyncio subprocesses and native Python filesystem +operations. It extends Sandbox directly — all file and code operations +use proper Python methods (pathlib, os, subprocess) instead of shell commands. + +This is the default sandbox used when no explicit sandbox is configured. 
+""" + +import asyncio +import logging +import os +import re +from collections.abc import AsyncGenerator +from pathlib import Path +from typing import Any + +from .base import ExecutionResult, FileInfo, Sandbox, StreamChunk + +logger = logging.getLogger(__name__) + +#: Maximum number of bytes to read at once from a subprocess stream. +#: Prevents memory exhaustion from extremely long lines without newlines. +_READ_CHUNK_SIZE = 64 * 1024 # 64 KiB + +#: Pattern for validating language/interpreter names. +#: Allows alphanumeric characters, dots, hyphens, and underscores. +_LANGUAGE_PATTERN = re.compile(r"^[a-zA-Z0-9._-]+$") + + +async def _read_stream( + stream: asyncio.StreamReader | None, + collected: list[str], +) -> None: + """Read all chunks from a subprocess stream into a list. + + Reads in chunks of up to ``_READ_CHUNK_SIZE`` bytes to handle + binary output and extremely long lines without newlines. + Non-UTF-8 bytes are replaced with the Unicode replacement character + to prevent ``UnicodeDecodeError`` from crashing the sandbox. + + Args: + stream: The subprocess stdout or stderr stream. + collected: List to append decoded string chunks to. + """ + if stream is None: + return + while True: + chunk_bytes = await stream.read(_READ_CHUNK_SIZE) + if not chunk_bytes: + break + collected.append(chunk_bytes.decode(errors="replace")) + + +class HostSandbox(Sandbox): + """Execute code and commands on the local host using native Python methods. + + Uses asyncio subprocesses for command execution, ``subprocess_exec`` for + code execution (avoiding shell intermediaries), and native filesystem + operations (``pathlib``, ``os``) for all file I/O. + + This sandbox extends :class:`Sandbox` directly — it does **not** + inherit from :class:`ShellBasedSandbox`. All operations use proper, + safe Python methods instead of piping through shell commands. + + Args: + working_dir: The working directory for command execution. + Defaults to the current working directory. 
+ + Example: + Non-streaming (common case):: + + from strands.sandbox import HostSandbox + + sandbox = HostSandbox(working_dir="/tmp/my-sandbox") + result = await sandbox.execute("echo hello") + print(result.stdout) + + Streaming:: + + async for chunk in sandbox.execute_streaming("echo hello"): + if isinstance(chunk, StreamChunk): + print(chunk.data, end="") + """ + + def __init__(self, working_dir: str | None = None) -> None: + """Initialize the HostSandbox. + + Args: + working_dir: The working directory for command execution. + Defaults to the current working directory at construction time. + """ + self.working_dir = working_dir or os.getcwd() + + def _resolve_path(self, path: str) -> Path: + """Resolve a path relative to the working directory. + + Absolute paths are returned as-is. Relative paths are resolved + against the working directory. + + Args: + path: The file path to resolve. + + Returns: + The resolved Path object. + """ + if os.path.isabs(path): + return Path(path) + return Path(self.working_dir) / path + + async def execute_streaming( + self, + command: str, + timeout: int | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + """Execute a shell command on the local host, streaming output. + + Reads stdout and stderr in chunks (up to 64 KiB at a time) to avoid + blocking on extremely long lines. Chunks are collected and yielded + after the command completes. The final yield is an ExecutionResult + with the exit code and complete captured output. + + Args: + command: The shell command to execute. + timeout: Maximum execution time in seconds. ``None`` means no timeout. + cwd: Working directory for this command. ``None`` means use the + sandbox's default ``working_dir``. + **kwargs: Additional keyword arguments for forward compatibility. + + Yields: + :class:`StreamChunk` objects for stdout/stderr, then a final :class:`ExecutionResult`. 
+ + Raises: + asyncio.TimeoutError: If the command exceeds the timeout. + """ + effective_cwd = cwd or self.working_dir + logger.debug("command=<%s>, timeout=<%s>, cwd=<%s> | executing local command", command, timeout, effective_cwd) + + working_path = Path(effective_cwd) + working_path.mkdir(parents=True, exist_ok=True) + + proc = await asyncio.create_subprocess_shell( + command, + cwd=effective_cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + async for item in self._collect_and_yield(proc, timeout): + yield item + + async def execute_code_streaming( + self, + code: str, + language: str, + timeout: int | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + """Execute code on the local host using subprocess_exec (no shell intermediary). + + Uses :func:`asyncio.create_subprocess_exec` to invoke the language + interpreter directly, passing code via the ``-c`` flag. This avoids + shell quoting issues entirely — the interpreter name and code are + passed as separate arguments to ``execvp``, not concatenated into a + shell command string. + + The language parameter is validated against a safe pattern to prevent + path traversal or binary injection. If the interpreter is not found + on the system, an ExecutionResult with exit code 127 is returned + (matching the shell convention for "command not found"). + + Args: + code: The source code to execute. + language: The programming language interpreter to use (e.g. + ``"python"``, ``"python3"``, ``"node"``, ``"ruby"``). + timeout: Maximum execution time in seconds. ``None`` means no timeout. + cwd: Working directory for code execution. ``None`` means use the + sandbox's default ``working_dir``. + **kwargs: Additional keyword arguments for forward compatibility. + + Yields: + :class:`StreamChunk` objects for stdout/stderr, then a final :class:`ExecutionResult`. 
+ + Raises: + asyncio.TimeoutError: If the code execution exceeds the timeout. + ValueError: If the language parameter contains unsafe characters. + """ + # Validate language to prevent injection via interpreter name. + # Only allow safe characters (alphanumeric, dots, hyphens, underscores). + if not _LANGUAGE_PATTERN.match(language): + raise ValueError(f"language parameter contains unsafe characters: {language}") + + effective_cwd = cwd or self.working_dir + logger.debug( + "language=<%s>, timeout=<%s>, cwd=<%s> | executing code locally", + language, + timeout, + effective_cwd, + ) + + working_path = Path(effective_cwd) + working_path.mkdir(parents=True, exist_ok=True) + + # Use create_subprocess_exec (not shell) — the interpreter and arguments + # are passed directly to execvp, avoiding all shell quoting issues. + try: + proc = await asyncio.create_subprocess_exec( + language, + "-c", + code, + cwd=effective_cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except FileNotFoundError: + yield ExecutionResult( + exit_code=127, + stdout="", + stderr=f"Language interpreter not found: {language}", + ) + return + + async for item in self._collect_and_yield(proc, timeout): + yield item + + async def _collect_and_yield( + self, + proc: asyncio.subprocess.Process, + timeout: int | None, + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + """Read stdout/stderr from a subprocess, then yield typed chunks and a final ExecutionResult. + + Shared helper used by both ``execute_streaming()`` and ``execute_code_streaming()`` to + avoid duplicating the stream-reading, timeout-handling, and yielding logic. + + Args: + proc: The running subprocess. + timeout: Maximum time in seconds to wait for output. ``None`` means no timeout. + + Yields: + :class:`StreamChunk` objects for stdout/stderr, then a final :class:`ExecutionResult`. + + Raises: + asyncio.TimeoutError: If the process exceeds the timeout. 
+ """ + stdout_chunks: list[str] = [] + stderr_chunks: list[str] = [] + + try: + read_task = asyncio.gather( + _read_stream(proc.stdout, stdout_chunks), + _read_stream(proc.stderr, stderr_chunks), + proc.wait(), + ) + await asyncio.wait_for(read_task, timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + raise + + stdout_text = "".join(stdout_chunks) + stderr_text = "".join(stderr_chunks) + + for chunk in stdout_chunks: + yield StreamChunk(data=chunk, stream_type="stdout") + for chunk in stderr_chunks: + yield StreamChunk(data=chunk, stream_type="stderr") + + yield ExecutionResult( + exit_code=0 if proc.returncode is None else proc.returncode, + stdout=stdout_text, + stderr=stderr_text, + ) + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + """Read a file from the local filesystem as raw bytes. + + Uses ``asyncio.to_thread`` to avoid blocking the event loop + during disk I/O. + + Args: + path: Path to the file to read. Relative paths are resolved + against the working directory. + **kwargs: Additional keyword arguments for forward compatibility. + + Returns: + The file contents as bytes. + + Raises: + FileNotFoundError: If the file does not exist. + """ + full_path = self._resolve_path(path) + return await asyncio.to_thread(full_path.read_bytes) + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + """Write bytes to a file on the local filesystem. + + Creates parent directories if they do not exist. Uses + ``asyncio.to_thread`` to avoid blocking the event loop. + + Args: + path: Path to the file to write. Relative paths are resolved + against the working directory. + content: The content to write as bytes. + **kwargs: Additional keyword arguments for forward compatibility. 
+ """ + full_path = self._resolve_path(path) + + def _write() -> None: + full_path.parent.mkdir(parents=True, exist_ok=True) + full_path.write_bytes(content) + + await asyncio.to_thread(_write) + + async def remove_file(self, path: str, **kwargs: Any) -> None: + """Remove a file from the local filesystem using native Python methods. + + Uses ``asyncio.to_thread`` to avoid blocking the event loop. + + Args: + path: Path to the file to remove. Relative paths are resolved + against the working directory. + **kwargs: Additional keyword arguments for forward compatibility. + + Raises: + FileNotFoundError: If the file does not exist. + """ + full_path = self._resolve_path(path) + await asyncio.to_thread(full_path.unlink) + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + """List files in a directory with structured metadata. + + Uses native Python methods (:func:`os.listdir`, :func:`os.stat`) + to return :class:`FileInfo` entries with name, is_dir, and size. + Results include hidden files (dotfiles) and are sorted for + deterministic ordering. + + Uses ``asyncio.to_thread`` to avoid blocking the event loop + during disk I/O. + + Args: + path: Path to the directory to list. Relative paths are resolved + against the working directory. + **kwargs: Additional keyword arguments for forward compatibility. + + Returns: + A sorted list of :class:`FileInfo` entries. + + Raises: + FileNotFoundError: If the directory does not exist. 
+ """ + full_path = self._resolve_path(path) + + def _list() -> list[FileInfo]: + if not full_path.is_dir(): + raise FileNotFoundError(f"Directory not found: {full_path}") + entries = [] + for name in sorted(os.listdir(full_path)): + entry_path = full_path / name + try: + stat = entry_path.stat() + entries.append( + FileInfo( + name=name, + is_dir=entry_path.is_dir(), + size=stat.st_size, + ) + ) + except OSError: + # If we can't stat the entry (e.g., broken symlink), include + # it with defaults + entries.append(FileInfo(name=name)) + return entries + + return await asyncio.to_thread(_list) diff --git a/src/strands/sandbox/noop.py b/src/strands/sandbox/noop.py new file mode 100644 index 000000000..d4a0b48ee --- /dev/null +++ b/src/strands/sandbox/noop.py @@ -0,0 +1,73 @@ +"""No-op sandbox implementation that disables all sandbox functionality. + +Use ``NoOpSandbox`` to explicitly disable sandbox features on an agent. +All operations raise ``NotImplementedError`` with a clear message. + +Example:: + + from strands import Agent + from strands.sandbox import NoOpSandbox + + # Explicitly disable sandbox functionality + agent = Agent(sandbox=NoOpSandbox()) +""" + +from collections.abc import AsyncGenerator +from typing import Any + +from .base import ExecutionResult, FileInfo, Sandbox, StreamChunk + + +class NoOpSandbox(Sandbox): + """No-op sandbox that raises NotImplementedError for all operations. + + Use this to explicitly disable sandbox functionality on an agent. + Any tool that attempts to use the sandbox will get a clear error + indicating that sandbox is disabled, rather than silently failing. 
class NoOpSandbox(Sandbox):
    """Sandbox implementation whose every operation raises ``NotImplementedError``.

    Pass an instance to an agent to switch sandbox functionality off
    explicitly. A tool that reaches for the sandbox then receives an error
    naming the cause instead of failing silently.

    Example::

        from strands import Agent
        from strands.sandbox import NoOpSandbox

        agent = Agent(sandbox=NoOpSandbox())
    """

    @staticmethod
    def _disabled(action: str) -> NotImplementedError:
        """Build the error for an attempted ``action`` (e.g. ``"read files"``)."""
        return NotImplementedError(f"Sandbox is disabled (NoOpSandbox). Cannot {action}.")

    async def execute_streaming(
        self,
        command: str,
        timeout: int | None = None,
        cwd: str | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
        """Refuse to run ``command``: the sandbox is disabled."""
        raise self._disabled("execute commands")
        # The unreachable yield keeps this function an async generator.
        yield  # type: ignore[unreachable]  # pragma: no cover

    async def execute_code_streaming(
        self,
        code: str,
        language: str,
        timeout: int | None = None,
        cwd: str | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
        """Refuse to run ``code``: the sandbox is disabled."""
        raise self._disabled("execute code")
        # The unreachable yield keeps this function an async generator.
        yield  # type: ignore[unreachable]  # pragma: no cover

    async def read_file(self, path: str, **kwargs: Any) -> bytes:
        """Refuse to read ``path``: the sandbox is disabled."""
        raise self._disabled("read files")

    async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
        """Refuse to write ``path``: the sandbox is disabled."""
        raise self._disabled("write files")

    async def remove_file(self, path: str, **kwargs: Any) -> None:
        """Refuse to remove ``path``: the sandbox is disabled."""
        raise self._disabled("remove files")

    async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
        """Refuse to list ``path``: the sandbox is disabled."""
        raise self._disabled("list files")
class ShellBasedSandbox(Sandbox, ABC):
    """Abstract sandbox that provides shell-based defaults for file and code operations.

    Subclasses only need to implement :meth:`execute_streaming`. The remaining
    operations (``execute_code_streaming``, ``read_file``, ``write_file``,
    ``remove_file`` and ``list_files``) are built from shell commands.

    This class is intended for remote execution environments where only
    shell access is available (e.g., Docker containers, SSH connections).
    For local execution, use :class:`~strands.sandbox.host.HostSandbox`.

    Subclasses may override any method with a native implementation for
    better performance.
    """

    # Type indicators that ``ls -F`` may append to an entry name.
    _LS_TYPE_SUFFIXES = "/@*=|"

    @staticmethod
    def _quote_path(path: str) -> str:
        """Quote ``path`` so the invoked utility always treats it as an operand.

        :func:`shlex.quote` only protects against the *shell*; the utility that
        receives the word still parses a leading ``-`` as an option (for
        example ``rm -rf`` or ``base64 -``). Prefixing such a path with ``./``
        names the same file while making it unambiguous. Paths that do not
        start with ``-`` are quoted unchanged.

        Args:
            path: The path as supplied by the caller.

        Returns:
            A shell-safe word suitable for interpolation into a command.
        """
        if path.startswith("-"):
            path = f"./{path}"
        return shlex.quote(path)

    async def execute_code_streaming(
        self,
        code: str,
        language: str,
        timeout: int | None = None,
        cwd: str | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
        """Execute code in the sandbox, streaming output.

        The code is handed to the interpreter through its ``-c`` flag. Both
        ``language`` and ``code`` are passed through :func:`shlex.quote` to
        prevent command injection.

        Args:
            code: The source code to execute.
            language: The interpreter to invoke (e.g. ``"python"``, ``"node"``).
            timeout: Maximum execution time in seconds. ``None`` means no timeout.
            cwd: Working directory for code execution. ``None`` means use the
                sandbox's default working directory.
            **kwargs: Additional keyword arguments for forward compatibility.

        Yields:
            Whatever :meth:`execute_streaming` yields for the built command.

        Note:
            Override this method for toolchains that do not accept code via
            ``-c`` (e.g., ``javac``, ``gcc``, ``go run``).
        """
        async for chunk in self.execute_streaming(
            f"{shlex.quote(language)} -c {shlex.quote(code)}", timeout=timeout, cwd=cwd
        ):
            yield chunk

    async def read_file(self, path: str, **kwargs: Any) -> bytes:
        """Read a file from the sandbox filesystem as raw bytes.

        The file is encoded with the ``base64`` utility inside the sandbox and
        decoded on the Python side, so binary content survives the text-based
        shell transport.

        Args:
            path: Path to the file to read.
            **kwargs: Additional keyword arguments for forward compatibility.

        Returns:
            The file contents as bytes.

        Raises:
            FileNotFoundError: If the ``base64`` command exits non-zero
                (missing or unreadable file). The message is the command's stderr.
        """
        result = await self.execute(f"base64 {self._quote_path(path)}")
        if result.exit_code != 0:
            raise FileNotFoundError(result.stderr)
        # base64 output is ASCII text; decode it back to the original bytes.
        return base64.b64decode(result.stdout)

    async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
        """Write bytes to a file in the sandbox filesystem.

        The content is base64-encoded locally, passed to ``printf`` as a shell
        argument, and decoded inside the sandbox with ``base64 -d``. Parent
        directories are created first with ``mkdir -p``.

        Note:
            Because the encoded payload travels as a command-line argument,
            very large files may exceed the system's ``ARG_MAX`` limit.
            Override this method with a stdin-based or otherwise binary-safe
            transport for large payloads.

        Args:
            path: Path to the file to write.
            content: The content to write as bytes.
            **kwargs: Additional keyword arguments for forward compatibility.

        Raises:
            OSError: If the command exits non-zero. The message is the
                command's stderr.
        """
        encoded = base64.b64encode(content).decode("ascii")
        quoted_path = self._quote_path(path)
        # Create parent directories, then decode the payload into the target file.
        cmd = (
            f'mkdir -p "$(dirname {quoted_path})" && '
            f"printf '%s' {shlex.quote(encoded)} | base64 -d > {quoted_path}"
        )
        result = await self.execute(cmd)
        if result.exit_code != 0:
            raise OSError(result.stderr)

    async def remove_file(self, path: str, **kwargs: Any) -> None:
        """Remove a file from the sandbox filesystem using ``rm``.

        Args:
            path: Path to the file to remove.
            **kwargs: Additional keyword arguments for forward compatibility.

        Raises:
            FileNotFoundError: If ``rm`` exits non-zero. The message is the
                command's stderr.
        """
        result = await self.execute(f"rm {self._quote_path(path)}")
        if result.exit_code != 0:
            raise FileNotFoundError(result.stderr)

    async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
        """List files in a sandbox directory with structured metadata.

        Runs ``ls -1aF`` so hidden files are included and directories are
        marked with a trailing ``/``. The ``.`` and ``..`` entries are skipped.
        Each :class:`FileInfo` carries ``name`` and ``is_dir``; ``size`` is not
        set because it is not part of this ``ls`` output.

        Note:
            ``ls -F`` adds no marker to regular files, so a regular file whose
            name itself ends in one of ``/@*=|`` cannot be told apart from a
            marked entry and loses that final character. Override this method
            when exact names are required.

        Args:
            path: Path to the directory to list.
            **kwargs: Additional keyword arguments for forward compatibility.

        Returns:
            A list of :class:`FileInfo` entries in ``ls`` output order.

        Raises:
            FileNotFoundError: If ``ls`` exits non-zero. The message is the
                command's stderr.
        """
        result = await self.execute(f"ls -1aF {self._quote_path(path)}")
        if result.exit_code != 0:
            raise FileNotFoundError(result.stderr)

        entries: list[FileInfo] = []
        for raw_line in result.stdout.strip().split("\n"):
            line = raw_line.strip()
            if not line or line in (".", "..", "./", "../"):
                continue
            # ls -F appends / for directories, @ for symlinks, * for executables.
            is_dir = line.endswith("/")
            # Drop at most ONE indicator: stripping greedily would also eat
            # indicator-like characters that belong to the name ("build@/").
            name = line[:-1] if line[-1] in self._LS_TYPE_SUFFIXES else line
            if name:
                entries.append(FileInfo(name=name, is_dir=is_dir))
        return entries
class TestHostSandboxDoesNotConfineFilesystemAccess:
    """Documents that HostSandbox does NOT confine file access to working_dir.

    HostSandbox is a host execution environment with no sandboxing. Path
    traversal and absolute paths work as they would in any Python program.
    For confined execution, use a Sandbox implementation that provides
    actual isolation.

    These tests verify the LACK of isolation; they are NOT security tests.
    Every directory they touch lives under the test's own ``tmp_path`` so
    nothing leaks into the session-wide temporary directory.
    """

    @staticmethod
    def _make_dirs(tmp_path):
        """Create and return ``(working_dir, outside_dir)`` as siblings under ``tmp_path``."""
        working_dir = tmp_path / "sandbox"
        outside_dir = tmp_path / "outside"
        working_dir.mkdir()
        outside_dir.mkdir()
        return working_dir, outside_dir

    @pytest.mark.asyncio
    async def test_read_file_allows_path_traversal(self, tmp_path):
        """read_file with ``..`` reads outside working_dir (no confinement)."""
        working_dir, outside_dir = self._make_dirs(tmp_path)
        (outside_dir / "secret.txt").write_text("SECRET_DATA")
        sandbox = HostSandbox(working_dir=str(working_dir))

        content = await sandbox.read_file("../outside/secret.txt")
        assert content == b"SECRET_DATA"

    @pytest.mark.asyncio
    async def test_write_file_allows_path_traversal(self, tmp_path):
        """write_file with ``..`` writes outside working_dir (no confinement)."""
        working_dir, outside_dir = self._make_dirs(tmp_path)
        sandbox = HostSandbox(working_dir=str(working_dir))

        await sandbox.write_file("../outside/pwned.txt", b"PWNED")
        assert (outside_dir / "pwned.txt").read_bytes() == b"PWNED"

    @pytest.mark.asyncio
    async def test_execute_accesses_entire_filesystem(self, tmp_path):
        """execute() runs a shell command directly on the host.

        This only checks that the command runs and its output is captured;
        it does not itself probe paths outside working_dir.
        """
        sandbox = HostSandbox(working_dir=str(tmp_path))
        result = await sandbox.execute("echo hello_from_shell")
        assert result.exit_code == 0
        assert "hello_from_shell" in result.stdout

    @pytest.mark.asyncio
    async def test_absolute_path_bypasses_working_dir(self, tmp_path):
        """Absolute paths bypass working_dir: HostSandbox has no path confinement."""
        working_dir, outside_dir = self._make_dirs(tmp_path)
        sandbox = HostSandbox(working_dir=str(working_dir))
        abs_path = str(outside_dir / "abs_escape.txt")

        await sandbox.write_file(abs_path, b"escaped")
        content = await sandbox.read_file(abs_path)
        assert content == b"escaped"
$(id) && rm -rf / ; echo pwned" + await sandbox.write_file("test.txt", content) + read_back = await sandbox.read_file("test.txt") + assert read_back == content + + @pytest.mark.asyncio + async def test_content_with_null_bytes(self, tmp_path): + """Content with null bytes should be handled by native Python I/O.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + content = b"before\x00after" + await sandbox.write_file("null.txt", content) + read_back = await sandbox.read_file("null.txt") + assert read_back == content + + @pytest.mark.asyncio + async def test_empty_content(self, tmp_path): + """Writing empty content should create an empty file.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("empty.txt", b"") + content = await sandbox.read_file("empty.txt") + assert content == b"" + + @pytest.mark.asyncio + async def test_very_large_content(self, tmp_path): + """Writing 10MB of content should work.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + large_content = b"A" * (10 * 1024 * 1024) + await sandbox.write_file("large.txt", large_content) + content = await sandbox.read_file("large.txt") + assert len(content) == len(large_content) + assert content == large_content + + +class TestHostSandboxEdgeCases: + """Edge cases specific to HostSandbox.""" + + @pytest.mark.asyncio + async def test_symlink_read(self, tmp_path): + """Reading through a symlink should work.""" + real_file = tmp_path / "real.txt" + real_file.write_text("real content") + symlink = tmp_path / "link.txt" + symlink.symlink_to(real_file) + + sandbox = HostSandbox(working_dir=str(tmp_path)) + content = await sandbox.read_file("link.txt") + assert content == b"real content" + + @pytest.mark.asyncio + async def test_symlink_outside_sandbox(self, tmp_path): + """Symlink pointing outside working_dir — HostSandbox follows it.""" + outside_dir = tmp_path.parent / "symlink_escape" + outside_dir.mkdir(exist_ok=True) + outside_file = outside_dir / "target.txt" + 
outside_file.write_text("escaped via symlink") + + symlink = tmp_path / "evil_link.txt" + symlink.symlink_to(outside_file) + + sandbox = HostSandbox(working_dir=str(tmp_path)) + content = await sandbox.read_file("evil_link.txt") + assert content == b"escaped via symlink" + + @pytest.mark.asyncio + async def test_unicode_filename(self, tmp_path): + """Unicode filenames should work.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("日本語.txt", b"Japanese content") + content = await sandbox.read_file("日本語.txt") + assert content == b"Japanese content" + + @pytest.mark.asyncio + async def test_filename_with_spaces_and_special_chars(self, tmp_path): + """Filenames with spaces and special characters should work.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("file with spaces.txt", b"spaced") + content = await sandbox.read_file("file with spaces.txt") + assert content == b"spaced" + + @pytest.mark.asyncio + async def test_deeply_nested_directory_creation(self, tmp_path): + """write_file should create deeply nested directories.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + deep_path = "a/b/c/d/e/f/g/h/i/j/deep.txt" + await sandbox.write_file(deep_path, b"deep") + content = await sandbox.read_file(deep_path) + assert content == b"deep" + + @pytest.mark.asyncio + async def test_list_files_with_hidden_files(self, tmp_path): + """list_files should include hidden files (os.listdir includes them).""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("visible.txt", b"visible") + await sandbox.write_file(".hidden", b"hidden") + + files = await sandbox.list_files(".") + names = [f.name for f in files] + assert "visible.txt" in names + # Native Python os.listdir includes hidden files! 
+ assert ".hidden" in names + + @pytest.mark.asyncio + async def test_read_nonexistent_with_special_chars_in_path(self, tmp_path): + """read_file with special chars in path should raise FileNotFoundError. + + On Windows, ' and " are invalid in file paths, raising OSError instead. + """ + sandbox = HostSandbox(working_dir=str(tmp_path)) + with pytest.raises((FileNotFoundError, OSError)): + await sandbox.read_file("nonexistent 'file\".txt") + + @pytest.mark.asyncio + async def test_execute_code_with_multiline_and_quotes(self, tmp_path): + """execute_code should handle code with all types of quotes.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + code = """ +x = "hello 'world'" +y = 'hello "world"' +print(x) +print(y) +""" + result = await sandbox.execute_code(code, language="python") + assert result.exit_code == 0 + assert "hello 'world'" in result.stdout + assert 'hello "world"' in result.stdout + + @pytest.mark.asyncio + async def test_execute_code_with_backslashes(self, tmp_path): + """execute_code should handle backslashes in code.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + code = 'print("path\\\\to\\\\file")' + result = await sandbox.execute_code(code, language="python") + assert result.exit_code == 0 + assert "path\\to\\file" in result.stdout + + @pytest.mark.asyncio + async def test_execute_returns_correct_exit_codes(self, tmp_path): + """Various exit codes should be preserved.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + for code in [0, 1, 2, 127, 255]: + result = await sandbox.execute(f"exit {code}") + assert result.exit_code == code, f"Expected exit code {code}, got {result.exit_code}" + + @pytest.mark.asyncio + async def test_file_io_uses_async_to_thread(self, tmp_path): + """HostSandbox wraps file I/O with asyncio.to_thread to avoid blocking. + + All pathlib operations (read_bytes, write_bytes, unlink, listdir) are + wrapped in asyncio.to_thread, keeping the event loop responsive during + disk I/O. 
+ """ + sandbox = HostSandbox(working_dir=str(tmp_path)) + large_content = b"X" * (1024 * 1024) # 1MB + await sandbox.write_file("blocking_test.txt", large_content) + content = await sandbox.read_file("blocking_test.txt") + assert len(content) == 1024 * 1024 + + @pytest.mark.asyncio + async def test_remove_file_nonexistent(self, tmp_path): + """remove_file should raise FileNotFoundError for missing files.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + with pytest.raises(FileNotFoundError): + await sandbox.remove_file("nonexistent.txt") + + @pytest.mark.asyncio + async def test_remove_file_then_read(self, tmp_path): + """Removing a file and then reading it should raise FileNotFoundError.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("to_delete.txt", b"content") + await sandbox.remove_file("to_delete.txt") + with pytest.raises(FileNotFoundError): + await sandbox.read_file("to_delete.txt") + + @pytest.mark.asyncio + async def test_remove_file_outside_sandbox(self, tmp_path): + """remove_file with absolute path can delete files outside sandbox.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + outside_dir = tmp_path.parent / "remove_escape" + outside_dir.mkdir(exist_ok=True) + outside_file = outside_dir / "target.txt" + outside_file.write_text("will be deleted") + + await sandbox.remove_file(str(outside_file)) + assert not outside_file.exists() + + @pytest.mark.asyncio + async def test_execute_code_rejects_unsafe_language(self, tmp_path): + """execute_code validates language parameter against unsafe characters.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + with pytest.raises(ValueError, match="unsafe characters"): + await sandbox.execute_code("print(1)", language="python; rm -rf /") + + @pytest.mark.asyncio + async def test_execute_code_rejects_path_traversal_language(self, tmp_path): + """execute_code rejects language with path separators.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + with 
class TestShellBasedSandboxHeredocEdgeCases:
    """Edge cases in the shell command that ShellBasedSandbox.write_file builds.

    Despite the class name, ``write_file`` transports content as base64
    rather than through a heredoc; the name is kept so test IDs stay stable.
    """

    @staticmethod
    def _make_recording_sandbox():
        """Return a ShellBasedSandbox that records the last command instead of running it."""

        class MockShellSandbox(ShellBasedSandbox):
            def __init__(self):
                super().__init__()
                self.last_command = ""

            async def execute_streaming(self, command, timeout=None, **kwargs):
                self.last_command = command
                yield ExecutionResult(exit_code=0, stdout="", stderr="")

            async def start(self):
                self._started = True

        return MockShellSandbox()

    @pytest.mark.asyncio
    async def test_write_file_content_with_single_quotes(self):
        """Content with single quotes is safely transported via base64."""
        import base64

        sandbox = self._make_recording_sandbox()
        content = b"it's 'quoted'\nline2 with \"double\" quotes\n"
        await sandbox.write_file("/tmp/test.txt", content)

        cmd = sandbox.last_command
        assert "base64 -d" in cmd
        assert "/tmp/test.txt" in cmd
        # The payload must travel encoded; the raw quoted text must never
        # reach the shell command line.
        assert base64.b64encode(content).decode("ascii") in cmd
        assert "it's" not in cmd

    @pytest.mark.asyncio
    async def test_write_file_empty_path(self):
        """Empty path should still be quoted."""
        sandbox = self._make_recording_sandbox()
        await sandbox.write_file("", b"content")
        assert "''" in sandbox.last_command
class TestSharedSandboxConcurrentExecution:
    """What happens when multiple coroutines call execute() on the same sandbox concurrently?"""

    @pytest.mark.asyncio
    async def test_concurrent_executes_same_sandbox(self, tmp_path):
        """Multiple concurrent execute() calls on same sandbox should not corrupt each other."""
        sandbox = HostSandbox(working_dir=str(tmp_path))

        async def run_command(cmd: str) -> ExecutionResult:
            return await sandbox.execute(cmd)

        # Run 10 concurrent commands; gather preserves submission order.
        results = await asyncio.gather(*(run_command(f"echo cmd{i}") for i in range(10)))

        # Each command should have its own exit code and output
        for i, result in enumerate(results):
            assert result.exit_code == 0, f"cmd{i} failed: {result.stderr}"
            assert result.stdout.strip() == f"cmd{i}", f"cmd{i} got wrong output: {result.stdout!r}"

    @pytest.mark.asyncio
    async def test_concurrent_file_write_same_file(self, tmp_path):
        """Two concurrent writes to the same file: one of them wins, no crash."""
        sandbox = HostSandbox(working_dir=str(tmp_path))

        await asyncio.gather(
            sandbox.write_file("shared.txt", b"content_A"),
            sandbox.write_file("shared.txt", b"content_B"),
        )

        # File should exist and contain one of the values (no corruption)
        content = await sandbox.read_file("shared.txt")
        assert content in (b"content_A", b"content_B"), f"Corrupted content: {content!r}"

    @pytest.mark.asyncio
    async def test_concurrent_file_write_different_files(self, tmp_path):
        """Concurrent writes to different files should all succeed."""
        sandbox = HostSandbox(working_dir=str(tmp_path))

        await asyncio.gather(
            *(sandbox.write_file(f"file_{i}.txt", f"content_{i}".encode()) for i in range(20))
        )

        # All files should be written correctly
        for i in range(20):
            content = await sandbox.read_file(f"file_{i}.txt")
            assert content == f"content_{i}".encode(), f"file_{i}.txt has wrong content: {content!r}"

    @pytest.mark.asyncio
    async def test_concurrent_read_write_same_file(self, tmp_path):
        """Concurrent read + write on same file should not crash."""
        sandbox = HostSandbox(working_dir=str(tmp_path))
        await sandbox.write_file("test.txt", b"initial")

        async def writer() -> None:
            for i in range(10):
                await sandbox.write_file("test.txt", f"version_{i}".encode())
                await asyncio.sleep(0.001)

        async def reader() -> list:
            observed = []
            for _ in range(10):
                try:
                    observed.append(await sandbox.read_file("test.txt"))
                except FileNotFoundError:
                    # File might be in the middle of being overwritten;
                    # None marks a failed read.
                    observed.append(None)
                await asyncio.sleep(0.001)
            return observed

        # gather awaits both coroutines, so a failure in one cannot leave
        # the other running un-awaited past the end of the test.
        results, _ = await asyncio.gather(reader(), writer())

        # At least some reads should succeed
        successful_reads = [r for r in results if r is not None]
        assert len(successful_reads) > 0
class TestSharedSandboxBetweenAgents:
    """What happens when two Agent instances share the same sandbox?"""

    def test_two_agents_same_sandbox_instance(self, tmp_path):
        """Two agents sharing the same sandbox should reference the same object."""
        from strands import Agent

        # Use the per-test temporary directory instead of a fixed /tmp path,
        # so the test does not depend on the host's filesystem layout.
        sandbox = HostSandbox(working_dir=str(tmp_path))
        agent1 = Agent(sandbox=sandbox)
        agent2 = Agent(sandbox=sandbox)
        assert agent1.sandbox is agent2.sandbox

    @pytest.mark.asyncio
    async def test_shared_sandbox_working_dir_isolation(self, tmp_path):
        """A file written through a shared sandbox is visible to every user of it.

        Despite the test name, this checks sharing rather than isolation:
        both operations go through the same sandbox instance and therefore
        the same working directory.
        """
        sandbox = HostSandbox(working_dir=str(tmp_path))

        # First user of the sandbox creates a file ...
        await sandbox.write_file("from_agent1.txt", b"hello from 1")
        # ... and a second user of the same instance can read it back.
        content = await sandbox.read_file("from_agent1.txt")
        assert content == b"hello from 1"
HostSandbox) + assert agent.sandbox.working_dir == os.getcwd() + + def test_agent_sandbox_is_accessible(self, tmp_path: object) -> None: + """Tools can access sandbox via agent.sandbox.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + agent = Agent(model="test", sandbox=sandbox) + assert agent.sandbox.working_dir == str(tmp_path) + + +class TestAgentSandboxToolAccess: + """Critical: Verify tools can access sandbox through tool_context.agent.sandbox.""" + + def test_sandbox_accessible_via_agent_attribute(self, tmp_path: object) -> None: + """Simulates tool access pattern: tool_context.agent.sandbox.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + agent = Agent(model="test", sandbox=sandbox) + + # This is the access pattern tools use: tool_context.agent.sandbox + accessed_sandbox = agent.sandbox + assert accessed_sandbox is sandbox + assert accessed_sandbox.working_dir == str(tmp_path) + assert hasattr(accessed_sandbox, "execute") + assert hasattr(accessed_sandbox, "read_file") + assert hasattr(accessed_sandbox, "write_file") + assert hasattr(accessed_sandbox, "remove_file") + assert hasattr(accessed_sandbox, "list_files") + assert hasattr(accessed_sandbox, "execute_code") + + def test_multiple_agents_share_sandbox_correctly(self, tmp_path: object) -> None: + """Two agents sharing a sandbox should both access the same instance.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + agent1 = Agent(model="test", sandbox=sandbox) + agent2 = Agent(model="test", sandbox=sandbox) + assert agent1.sandbox is agent2.sandbox + assert agent1.sandbox.working_dir == agent2.sandbox.working_dir + + def test_default_sandbox_has_all_methods(self) -> None: + """Default HostSandbox should have all abstract methods implemented.""" + agent = Agent(model="test") + ws = agent.sandbox + # Verify all 6 abstract methods + 2 convenience methods exist + for method in [ + "execute", + "execute_streaming", + "execute_code", + "execute_code_streaming", + "read_file", + 
"write_file", + "remove_file", + "list_files", + "read_text", + "write_text", + ]: + assert callable(getattr(ws, method)), f"Missing method: {method}" diff --git a/tests/strands/sandbox/test_base.py b/tests/strands/sandbox/test_base.py new file mode 100644 index 000000000..ae6e704e4 --- /dev/null +++ b/tests/strands/sandbox/test_base.py @@ -0,0 +1,735 @@ +"""Tests for the Sandbox ABC, ShellBasedSandbox, ExecutionResult, FileInfo, and OutputFile.""" + +from collections.abc import AsyncGenerator +from typing import Any + +import pytest + +from strands.sandbox.base import ( + ExecutionResult, + FileInfo, + OutputFile, + Sandbox, + StreamChunk, +) +from strands.sandbox.shell_based import ShellBasedSandbox + + +class ConcreteShellSandbox(ShellBasedSandbox): + """Minimal concrete ShellBasedSandbox implementation for testing.""" + + def __init__(self) -> None: + self.commands: list[str] = [] + + async def execute_streaming( + self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + self.commands.append(command) + if "fail" in command: + yield ExecutionResult(exit_code=1, stdout="", stderr="command failed") + return + # For base64 commands (used by read_file), return valid base64 output + if command.startswith("base64 "): + import base64 as b64 + + stdout = b64.b64encode(b"mock file content").decode("ascii") + "\n" + else: + stdout = f"output of: {command}\n" + yield StreamChunk(data=stdout) + yield ExecutionResult(exit_code=0, stdout=stdout, stderr="") + + +class TestExecutionResult: + def test_execution_result_fields(self) -> None: + result = ExecutionResult(exit_code=0, stdout="hello", stderr="") + assert result.exit_code == 0 + assert result.stdout == "hello" + assert result.stderr == "" + + def test_execution_result_error(self) -> None: + result = ExecutionResult(exit_code=1, stdout="", stderr="error msg") + assert result.exit_code == 1 + assert result.stderr == "error msg" + + 
def test_execution_result_equality(self) -> None: + r1 = ExecutionResult(exit_code=0, stdout="out", stderr="err") + r2 = ExecutionResult(exit_code=0, stdout="out", stderr="err") + assert r1 == r2 + + def test_execution_result_output_files_default_empty(self) -> None: + result = ExecutionResult(exit_code=0, stdout="", stderr="") + assert result.output_files == [] + + def test_execution_result_with_output_files(self) -> None: + files = [OutputFile(name="plot.png", content=b"\x89PNG", mime_type="image/png")] + result = ExecutionResult(exit_code=0, stdout="", stderr="", output_files=files) + assert len(result.output_files) == 1 + assert result.output_files[0].name == "plot.png" + assert result.output_files[0].content == b"\x89PNG" + assert result.output_files[0].mime_type == "image/png" + + +class TestFileInfo: + def test_file_info_file(self) -> None: + info = FileInfo(name="test.txt", is_dir=False, size=1024) + assert info.name == "test.txt" + assert info.is_dir is False + assert info.size == 1024 + + def test_file_info_directory(self) -> None: + info = FileInfo(name="subdir", is_dir=True) + assert info.name == "subdir" + assert info.is_dir is True + assert info.size is None # default is None now + + def test_file_info_equality(self) -> None: + f1 = FileInfo(name="a.txt", is_dir=False, size=100) + f2 = FileInfo(name="a.txt", is_dir=False, size=100) + assert f1 == f2 + + def test_file_info_optional_fields_default_none(self) -> None: + """is_dir and size default to None when not provided.""" + info = FileInfo(name="unknown.txt") + assert info.name == "unknown.txt" + assert info.is_dir is None + assert info.size is None + + def test_file_info_with_only_name(self) -> None: + """FileInfo with only name is valid — unknown metadata.""" + info = FileInfo(name="mystery") + assert info.is_dir is None + assert info.size is None + + +class TestOutputFile: + def test_output_file_fields(self) -> None: + f = OutputFile(name="chart.svg", content=b"", mime_type="image/svg+xml") + 
assert f.name == "chart.svg" + assert f.content == b"" + assert f.mime_type == "image/svg+xml" + + def test_output_file_default_mime_type(self) -> None: + f = OutputFile(name="data.bin", content=b"\x00\x01") + assert f.mime_type == "application/octet-stream" + + +class TestSandboxABC: + """Tests that Sandbox has all abstract methods and cannot be partially implemented.""" + + def test_cannot_instantiate_abstract(self) -> None: + with pytest.raises(TypeError): + Sandbox() # type: ignore + + def test_cannot_instantiate_with_only_execute_streaming(self) -> None: + """A class implementing only execute_streaming() is still abstract.""" + + class OnlyExecute(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + with pytest.raises(TypeError): + OnlyExecute() # type: ignore + + def test_all_abstract_methods_required(self) -> None: + """A class must implement all abstract methods to be concrete.""" + + class AllMethods(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"" + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + pass + + async def remove_file(self, path: str, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + # Should not raise + sandbox = AllMethods() + assert sandbox is not None + + def 
test_missing_remove_file_is_abstract(self) -> None: + """A class missing remove_file() is still abstract.""" + + class MissingRemoveFile(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"" + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + with pytest.raises(TypeError): + MissingRemoveFile() # type: ignore + + @pytest.mark.asyncio + async def test_read_text_convenience(self) -> None: + """Test that read_text decodes bytes from read_file.""" + + class TextSandbox(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"hello world" + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + pass + + async def remove_file(self, path: str, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + sandbox = TextSandbox() + text = await sandbox.read_text("test.txt") + assert text == "hello world" + assert 
isinstance(text, str) + + @pytest.mark.asyncio + async def test_write_text_convenience(self) -> None: + """Test that write_text encodes string to bytes for write_file.""" + + class TextSandbox(Sandbox): + def __init__(self) -> None: + self.written_content: bytes = b"" + + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"" + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + self.written_content = content + + async def remove_file(self, path: str, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + sandbox = TextSandbox() + await sandbox.write_text("test.txt", "hello world") + assert sandbox.written_content == b"hello world" + + @pytest.mark.asyncio + async def test_non_streaming_execute_convenience(self) -> None: + """Test that execute() returns ExecutionResult directly.""" + + class SimpleSandbox(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield StreamChunk(data="output\n") + yield ExecutionResult(exit_code=0, stdout="output\n", stderr="") + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"" + + async def write_file(self, 
path: str, content: bytes, **kwargs: Any) -> None: + pass + + async def remove_file(self, path: str, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + sandbox = SimpleSandbox() + result = await sandbox.execute("echo hello") + assert isinstance(result, ExecutionResult) + assert result.exit_code == 0 + assert result.stdout == "output\n" + + @pytest.mark.asyncio + async def test_non_streaming_execute_code_convenience(self) -> None: + """Test that execute_code() returns ExecutionResult directly.""" + + class SimpleSandbox(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield StreamChunk(data="code output\n") + yield ExecutionResult(exit_code=0, stdout="code output\n", stderr="") + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"" + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + pass + + async def remove_file(self, path: str, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + sandbox = SimpleSandbox() + result = await sandbox.execute_code("print(1)", language="python") + assert isinstance(result, ExecutionResult) + assert result.exit_code == 0 + assert result.stdout == "code output\n" + + @pytest.mark.asyncio + async def test_execute_raises_on_missing_result(self) -> None: + """execute() raises RuntimeError if execute_streaming yields no ExecutionResult.""" + + class BadSandbox(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | 
ExecutionResult, None]: + yield "just a string" + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield "just a string" + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"" + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + pass + + async def remove_file(self, path: str, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + sandbox = BadSandbox() + with pytest.raises(RuntimeError, match="did not yield an ExecutionResult"): + await sandbox.execute("anything") + + @pytest.mark.asyncio + async def test_execute_code_raises_on_missing_result(self) -> None: + """execute_code() raises RuntimeError if execute_code_streaming yields no ExecutionResult.""" + + class BadSandbox(Sandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield "just a string" + + async def execute_code_streaming( + self, code: str, language: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + yield "just a string" + + async def read_file(self, path: str, **kwargs: Any) -> bytes: + return b"" + + async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None: + pass + + async def remove_file(self, path: str, **kwargs: Any) -> None: + pass + + async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]: + return [] + + sandbox = BadSandbox() + with pytest.raises(RuntimeError, match="did not yield an ExecutionResult"): + await sandbox.execute_code("print(1)", language="python") + + +class TestShellBasedSandboxABC: + """Tests that ShellBasedSandbox is still abstract (execute_streaming() not implemented).""" + + def test_cannot_instantiate_shell_based_sandbox(self) 
-> None: + with pytest.raises(TypeError): + ShellBasedSandbox() # type: ignore + + def test_shell_based_sandbox_only_needs_execute_streaming(self) -> None: + """ShellBasedSandbox requires only execute_streaming() to be concrete.""" + sandbox = ConcreteShellSandbox() + assert sandbox is not None + + +class TestShellBasedSandboxOperations: + """Tests for the shell-based default implementations.""" + + @pytest.mark.asyncio + async def test_execute_streaming_yields_lines_and_result(self) -> None: + sandbox = ConcreteShellSandbox() + chunks: list[StreamChunk | ExecutionResult] = [] + async for chunk in sandbox.execute_streaming("echo hello"): + chunks.append(chunk) + assert isinstance(chunks[-1], ExecutionResult) + assert chunks[-1].exit_code == 0 + assert any(isinstance(c, StreamChunk) for c in chunks[:-1]) + assert sandbox.commands == ["echo hello"] + + @pytest.mark.asyncio + async def test_non_streaming_execute(self) -> None: + sandbox = ConcreteShellSandbox() + result = await sandbox.execute("echo hello") + assert isinstance(result, ExecutionResult) + assert result.exit_code == 0 + assert "echo hello" in result.stdout + + @pytest.mark.asyncio + async def test_execute_code_streaming_default(self) -> None: + sandbox = ConcreteShellSandbox() + chunks: list[StreamChunk | ExecutionResult] = [] + async for chunk in sandbox.execute_code_streaming("print('hi')", language="python"): + chunks.append(chunk) + assert isinstance(chunks[-1], ExecutionResult) + assert chunks[-1].exit_code == 0 + + @pytest.mark.asyncio + async def test_non_streaming_execute_code(self) -> None: + sandbox = ConcreteShellSandbox() + result = await sandbox.execute_code("print('hi')", language="python") + assert result.exit_code == 0 + assert len(sandbox.commands) == 1 + assert "python" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_execute_code_custom_language(self) -> None: + sandbox = ConcreteShellSandbox() + result = await sandbox.execute_code("puts 'hi'", language="ruby") + assert 
result.exit_code == 0 + assert "ruby" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_read_file_returns_bytes(self) -> None: + sandbox = ConcreteShellSandbox() + content = await sandbox.read_file("/tmp/test.txt") + assert isinstance(content, bytes) + assert content == b"mock file content" + assert "base64" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_read_file_not_found(self) -> None: + sandbox = ConcreteShellSandbox() + with pytest.raises(FileNotFoundError): + await sandbox.read_file("/tmp/fail.txt") + + @pytest.mark.asyncio + async def test_write_file_accepts_bytes(self) -> None: + sandbox = ConcreteShellSandbox() + await sandbox.write_file("/tmp/test.txt", b"hello content") + assert len(sandbox.commands) == 1 + assert "/tmp/test.txt" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_write_file_failure(self) -> None: + sandbox = ConcreteShellSandbox() + with pytest.raises(IOError): + await sandbox.write_file("/tmp/fail.txt", b"content") + + @pytest.mark.asyncio + async def test_write_file_uses_base64(self) -> None: + sandbox = ConcreteShellSandbox() + await sandbox.write_file("/tmp/test.txt", b"content with STRANDS_EOF inside") + assert "base64 -d" in sandbox.commands[0] + assert "/tmp/test.txt" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_write_file_path_is_shell_quoted(self) -> None: + sandbox = ConcreteShellSandbox() + await sandbox.write_file("/tmp/test file.txt", b"content") + assert "'/tmp/test file.txt'" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_read_file_path_is_shell_quoted(self) -> None: + sandbox = ConcreteShellSandbox() + await sandbox.read_file("/tmp/test file.txt") + assert "'/tmp/test file.txt'" in sandbox.commands[0] + assert "base64" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_remove_file_success(self) -> None: + sandbox = ConcreteShellSandbox() + await sandbox.remove_file("/tmp/test.txt") + assert len(sandbox.commands) == 1 + 
assert "rm" in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_remove_file_not_found(self) -> None: + sandbox = ConcreteShellSandbox() + with pytest.raises(FileNotFoundError): + await sandbox.remove_file("/tmp/fail.txt") + + @pytest.mark.asyncio + async def test_list_files_returns_file_info(self) -> None: + sandbox = ConcreteShellSandbox() + files = await sandbox.list_files("/tmp") + assert len(sandbox.commands) == 1 + assert "ls" in sandbox.commands[0] + # The mock returns a string output, so list_files parses it + for f in files: + assert isinstance(f, FileInfo) + + @pytest.mark.asyncio + async def test_list_files_not_found(self) -> None: + sandbox = ConcreteShellSandbox() + with pytest.raises(FileNotFoundError): + await sandbox.list_files("/tmp/fail") + + @pytest.mark.asyncio + async def test_list_files_size_is_none(self) -> None: + """ShellBasedSandbox.list_files returns size=None (cannot determine from ls).""" + sandbox = ConcreteShellSandbox() + files = await sandbox.list_files("/tmp") + for f in files: + assert f.size is None + + +class TestShellBasedListFilesRealisticOutput: + @pytest.mark.asyncio + async def test_list_files_parses_directory_indicator(self) -> None: + class RealisticLsSandbox(ShellBasedSandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + stdout = ".\n..\nsubdir/\n.hidden_dir/\nfile.txt\nscript.py*\nlink.txt@\npipe_file|\n" + yield StreamChunk(data=stdout) + yield ExecutionResult(exit_code=0, stdout=stdout, stderr="") + + sandbox = RealisticLsSandbox() + files = await sandbox.list_files("/some/path") + + names = [f.name for f in files] + is_dir_map = {f.name: f.is_dir for f in files} + + assert "subdir" in names + assert is_dir_map["subdir"] is True + assert ".hidden_dir" in names + assert is_dir_map[".hidden_dir"] is True + assert "file.txt" in names + assert is_dir_map["file.txt"] is False + assert "script.py" in 
names + assert is_dir_map["script.py"] is False + assert "link.txt" in names + assert is_dir_map["link.txt"] is False + assert "pipe_file" in names + assert is_dir_map["pipe_file"] is False + assert "." not in names + assert ".." not in names + + @pytest.mark.asyncio + async def test_list_files_empty_directory(self) -> None: + class EmptyLsSandbox(ShellBasedSandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + stdout = "./\n../\n" + yield StreamChunk(data=stdout) + yield ExecutionResult(exit_code=0, stdout=stdout, stderr="") + + sandbox = EmptyLsSandbox() + files = await sandbox.list_files("/empty") + assert files == [] + + @pytest.mark.asyncio + async def test_list_files_files_only_no_indicators(self) -> None: + class PlainLsSandbox(ShellBasedSandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + stdout = "readme.md\nsetup.py\nrequirements.txt\n" + yield StreamChunk(data=stdout) + yield ExecutionResult(exit_code=0, stdout=stdout, stderr="") + + sandbox = PlainLsSandbox() + files = await sandbox.list_files("/project") + names = [f.name for f in files] + assert names == ["readme.md", "setup.py", "requirements.txt"] + assert all(not f.is_dir for f in files) + assert all(f.size is None for f in files) # shell-based has no size info + + +class TestShellBasedWriteFileParentDirs: + @pytest.mark.asyncio + async def test_write_file_command_includes_mkdir(self) -> None: + class CommandCapture(ShellBasedSandbox): + def __init__(self) -> None: + self.commands: list[str] = [] + + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + self.commands.append(command) + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + sandbox = CommandCapture() + 
await sandbox.write_file("/tmp/deep/nested/file.txt", b"content") + assert len(sandbox.commands) == 1 + assert "mkdir -p" in sandbox.commands[0] + assert "base64 -d" in sandbox.commands[0] + + +class TestShellBasedBase64Encoding: + @pytest.mark.asyncio + async def test_write_file_encodes_exact_base64(self) -> None: + import base64 as b64 + + class CommandCapture(ShellBasedSandbox): + def __init__(self) -> None: + self.commands: list[str] = [] + + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + self.commands.append(command) + yield ExecutionResult(exit_code=0, stdout="", stderr="") + + sandbox = CommandCapture() + original_content = b"hello world with special chars: \x00\xff\n\t" + await sandbox.write_file("/tmp/test.bin", original_content) + expected_b64 = b64.b64encode(original_content).decode("ascii") + assert expected_b64 in sandbox.commands[0] + + @pytest.mark.asyncio + async def test_read_file_decodes_base64_correctly(self) -> None: + import base64 as b64 + + original_content = b"\x89PNG\r\n\x1a\n\x00\x00binary" + + class Base64Sandbox(ShellBasedSandbox): + async def execute_streaming( + self, command: str, timeout: int | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + stdout = b64.b64encode(original_content).decode("ascii") + "\n" + yield StreamChunk(data=stdout) + yield ExecutionResult(exit_code=0, stdout=stdout, stderr="") + + sandbox = Base64Sandbox() + content = await sandbox.read_file("/tmp/image.png") + assert content == original_content + + +class TestShellBasedSandboxCwdPassthrough: + """Test that ShellBasedSandbox passes cwd through to execute_streaming.""" + + @pytest.mark.asyncio + async def test_execute_code_streaming_passes_cwd(self) -> None: + """execute_code_streaming should forward cwd to execute_streaming.""" + + class CwdTrackingSandbox(ShellBasedSandbox): + def __init__(self) -> None: + 
self.received_cwds: list[str | None] = [] + + async def execute_streaming( + self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + self.received_cwds.append(cwd) + yield ExecutionResult(exit_code=0, stdout="ok", stderr="") + + sandbox = CwdTrackingSandbox() + await sandbox.execute_code("print(1)", language="python", cwd="/custom/dir") + assert sandbox.received_cwds == ["/custom/dir"] + + @pytest.mark.asyncio + async def test_execute_code_streaming_passes_none_cwd_by_default(self) -> None: + """When no cwd is provided, None should be passed through.""" + + class CwdTrackingSandbox(ShellBasedSandbox): + def __init__(self) -> None: + self.received_cwds: list[str | None] = [] + + async def execute_streaming( + self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + self.received_cwds.append(cwd) + yield ExecutionResult(exit_code=0, stdout="ok", stderr="") + + sandbox = CwdTrackingSandbox() + await sandbox.execute_code("print(1)", language="python") + assert sandbox.received_cwds == [None] + + +class TestNonStreamingConveniencePassesCwd: + """Test that non-streaming execute/execute_code pass cwd to streaming methods.""" + + @pytest.mark.asyncio + async def test_execute_passes_cwd_to_streaming(self) -> None: + """The non-streaming execute() should forward cwd to execute_streaming().""" + + class CwdTrackingSandbox(ShellBasedSandbox): + def __init__(self) -> None: + self.received_cwds: list[str | None] = [] + + async def execute_streaming( + self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + self.received_cwds.append(cwd) + yield ExecutionResult(exit_code=0, stdout="ok", stderr="") + + sandbox = CwdTrackingSandbox() + result = await sandbox.execute("echo hi", cwd="/some/path") + assert 
sandbox.received_cwds == ["/some/path"] + assert result.exit_code == 0 + + @pytest.mark.asyncio + async def test_execute_code_passes_cwd_to_streaming(self) -> None: + """The non-streaming execute_code() should forward cwd to execute_code_streaming().""" + + class CwdTrackingSandbox(ShellBasedSandbox): + def __init__(self) -> None: + self.received_cwds: list[str | None] = [] + + async def execute_streaming( + self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any + ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]: + self.received_cwds.append(cwd) + yield ExecutionResult(exit_code=0, stdout="ok", stderr="") + + sandbox = CwdTrackingSandbox() + result = await sandbox.execute_code("print(1)", language="python", cwd="/code/dir") + assert sandbox.received_cwds == ["/code/dir"] + assert result.exit_code == 0 diff --git a/tests/strands/sandbox/test_host.py b/tests/strands/sandbox/test_host.py new file mode 100644 index 000000000..8c08ed2cf --- /dev/null +++ b/tests/strands/sandbox/test_host.py @@ -0,0 +1,473 @@ +"""Tests for the HostSandbox implementation.""" + +import asyncio +import os +from pathlib import Path + +import pytest + +from strands.sandbox.base import ExecutionResult, FileInfo, Sandbox, StreamChunk +from strands.sandbox.host import HostSandbox + + +class TestHostSandboxInit: + def test_default_working_dir(self) -> None: + sandbox = HostSandbox() + assert sandbox.working_dir == os.getcwd() + + def test_custom_working_dir(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + assert sandbox.working_dir == str(tmp_path) + + def test_extends_sandbox_directly(self) -> None: + """HostSandbox extends Sandbox directly, not ShellBasedSandbox.""" + sandbox = HostSandbox() + assert isinstance(sandbox, Sandbox) + + def test_does_not_extend_shell_based_sandbox(self) -> None: + """HostSandbox must NOT inherit from ShellBasedSandbox.""" + from strands.sandbox.shell_based import ShellBasedSandbox + + 
sandbox = HostSandbox() + assert not isinstance(sandbox, ShellBasedSandbox) + + +class TestHostSandboxResolvePath: + def test_resolve_relative_path(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + resolved = sandbox._resolve_path("subdir/file.txt") + expected = Path(str(tmp_path)) / "subdir" / "file.txt" + assert resolved == expected + + def test_resolve_absolute_path(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + abs_path = str(Path(str(tmp_path)) / "absolute" / "path.txt") + resolved = sandbox._resolve_path(abs_path) + assert str(resolved) == abs_path + + +class TestHostSandboxExecute: + @pytest.mark.asyncio + async def test_execute_echo(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute("echo hello") + assert result.exit_code == 0 + assert result.stdout.strip() == "hello" + assert result.stderr == "" + + @pytest.mark.asyncio + async def test_execute_streams_chunks(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + chunks: list[StreamChunk | ExecutionResult] = [] + async for chunk in sandbox.execute_streaming("echo line1 && echo line2"): + chunks.append(chunk) + str_chunks = [c.data for c in chunks if isinstance(c, StreamChunk)] + result_chunks = [c for c in chunks if isinstance(c, ExecutionResult)] + assert len(result_chunks) == 1 + assert len(str_chunks) >= 1 + combined = "".join(str_chunks) + assert "line1" in combined + assert "line2" in combined + + @pytest.mark.asyncio + async def test_execute_failure(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute("exit 42") + assert result.exit_code == 42 + + @pytest.mark.asyncio + async def test_execute_stderr(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute("echo error >&2") + assert result.exit_code == 0 + 
assert result.stderr.strip() == "error" + + @pytest.mark.asyncio + async def test_execute_uses_working_dir(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + # Use python to print cwd instead of 'pwd' for cross-platform compatibility. + # On Windows, 'pwd' (via Git Bash) returns MSYS-style paths (/c/Users/...) + # which don't match the native Windows path (C:\Users\...). + result = await sandbox.execute('python -c "import os; print(os.getcwd())"') + assert result.exit_code == 0 + assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(tmp_path)) + + @pytest.mark.asyncio + async def test_execute_timeout(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + with pytest.raises(asyncio.TimeoutError): + await sandbox.execute("sleep 10", timeout=1) + + @pytest.mark.asyncio + async def test_execute_no_timeout(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute("echo fast", timeout=None) + assert result.exit_code == 0 + assert result.stdout.strip() == "fast" + + @pytest.mark.asyncio + async def test_execute_long_output_without_newlines(self, tmp_path: object) -> None: + """Bug 2 fix: long output lines (>64KB) no longer crash.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute("python3 -c \"import sys; sys.stdout.write('A' * 131072)\"") + assert result.exit_code == 0 + assert len(result.stdout) == 131072 + + +class TestHostSandboxExecuteCode: + @pytest.mark.asyncio + async def test_execute_python_code(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute_code("print('hello from python')", language="python") + assert result.exit_code == 0 + assert result.stdout.strip() == "hello from python" + + @pytest.mark.asyncio + async def test_execute_code_streams(self, tmp_path: object) -> None: + sandbox = 
HostSandbox(working_dir=str(tmp_path)) + chunks: list[StreamChunk | ExecutionResult] = [] + async for chunk in sandbox.execute_code_streaming("print('line1')\nprint('line2')", language="python"): + chunks.append(chunk) + assert isinstance(chunks[-1], ExecutionResult) + combined = "".join(c.data for c in chunks if isinstance(c, StreamChunk)) + assert "line1" in combined + assert "line2" in combined + + @pytest.mark.asyncio + async def test_execute_python_code_error(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute_code("raise ValueError('test error')", language="python") + assert result.exit_code != 0 + assert "ValueError" in result.stderr + + @pytest.mark.asyncio + async def test_execute_python_multiline(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + code = "x = 42\nprint(f'x = {x}')" + result = await sandbox.execute_code(code, language="python") + assert result.exit_code == 0 + assert "x = 42" in result.stdout + + @pytest.mark.asyncio + async def test_execute_code_rejects_unsafe_language(self, tmp_path: object) -> None: + """Language parameter with shell metacharacters raises ValueError.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + with pytest.raises(ValueError, match="unsafe characters"): + await sandbox.execute_code("print(1)", language="python; rm -rf /") + + @pytest.mark.asyncio + async def test_execute_code_accepts_valid_languages(self, tmp_path: object) -> None: + """Valid language names with dots, hyphens, underscores pass validation.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + # python3.12-like names should be accepted + # (may fail to execute if interpreter not found, but validation passes) + with pytest.raises(ValueError, match="unsafe characters"): + await sandbox.execute_code("1", language="python; echo pwned") + # These should NOT raise ValueError (may raise FileNotFoundError at exec time) + try: + await 
sandbox.execute_code("1", language="python3.12") + except Exception as e: + # FileNotFoundError or non-zero exit code is expected, NOT ValueError + assert not isinstance(e, ValueError) + + @pytest.mark.asyncio + async def test_execute_code_uses_subprocess_exec(self, tmp_path: object) -> None: + """Verify code is passed directly to interpreter, not through shell.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + # Code with shell metacharacters should be passed literally to python + code = "import sys; print(sys.argv)" + result = await sandbox.execute_code(code, language="python") + assert result.exit_code == 0 + + @pytest.mark.asyncio + async def test_execute_code_uses_working_dir(self, tmp_path: object) -> None: + """Code execution should use the sandbox working directory.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute_code("import os; print(os.getcwd())", language="python") + assert result.exit_code == 0 + assert result.stdout.strip() == str(tmp_path) + + @pytest.mark.asyncio + async def test_execute_code_with_quotes(self, tmp_path: object) -> None: + """Code with all types of quotes should work (no shell quoting needed).""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + code = """ +x = "hello 'world'" +y = 'hello "world"' +print(x) +print(y) +""" + result = await sandbox.execute_code(code, language="python") + assert result.exit_code == 0 + assert "hello 'world'" in result.stdout + assert 'hello "world"' in result.stdout + + @pytest.mark.asyncio + async def test_execute_code_with_backslashes(self, tmp_path: object) -> None: + """Code with backslashes should work (no shell escaping needed).""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + code = 'print("path\\\\to\\\\file")' + result = await sandbox.execute_code(code, language="python") + assert result.exit_code == 0 + assert "path\\to\\file" in result.stdout + + +class TestHostSandboxFileOps: + @pytest.mark.asyncio + async def 
test_write_and_read_file(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("test.txt", b"hello world") + content = await sandbox.read_file("test.txt") + assert content == b"hello world" + + @pytest.mark.asyncio + async def test_read_file_absolute_path(self, tmp_path: object) -> None: + test_file = tmp_path / "abs_test.txt" # type: ignore[operator] + test_file.write_bytes(b"absolute content") + sandbox = HostSandbox(working_dir=str(tmp_path)) + content = await sandbox.read_file(str(test_file)) + assert content == b"absolute content" + + @pytest.mark.asyncio + async def test_read_file_not_found(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + with pytest.raises(FileNotFoundError): + await sandbox.read_file("nonexistent.txt") + + @pytest.mark.asyncio + async def test_write_file_creates_directories(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("subdir/nested/test.txt", b"nested content") + content = await sandbox.read_file("subdir/nested/test.txt") + assert content == b"nested content" + + @pytest.mark.asyncio + async def test_write_file_absolute_path(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + abs_path = str(tmp_path / "abs_write.txt") # type: ignore[operator] + await sandbox.write_file(abs_path, b"absolute write") + content = await sandbox.read_file(abs_path) + assert content == b"absolute write" + + @pytest.mark.asyncio + async def test_write_file_unicode(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_text("unicode.txt", "héllo wörld 🌍") + content = await sandbox.read_text("unicode.txt") + assert content == "héllo wörld 🌍" + + @pytest.mark.asyncio + async def test_list_files(self, tmp_path: object) -> None: + (tmp_path / "file1.txt").write_text("a") # type: ignore[operator] + (tmp_path / 
"file2.txt").write_text("b") # type: ignore[operator] + (tmp_path / "file3.py").write_text("c") # type: ignore[operator] + + sandbox = HostSandbox(working_dir=str(tmp_path)) + files = await sandbox.list_files(".") + names = sorted([f.name for f in files]) + assert names == ["file1.txt", "file2.txt", "file3.py"] + # All should be FileInfo instances + for f in files: + assert isinstance(f, FileInfo) + assert f.is_dir is False + + @pytest.mark.asyncio + async def test_list_files_includes_hidden_files(self, tmp_path: object) -> None: + """list_files uses os.listdir which includes hidden files (unlike ls -1).""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("visible.txt", b"visible") + await sandbox.write_file(".hidden", b"hidden") + + files = await sandbox.list_files(".") + names = [f.name for f in files] + assert "visible.txt" in names + assert ".hidden" in names # Native Python includes dotfiles! + + @pytest.mark.asyncio + async def test_list_files_sorted(self, tmp_path: object) -> None: + """list_files returns sorted results for deterministic ordering.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + await sandbox.write_file("zebra.txt", b"z") + await sandbox.write_file("apple.txt", b"a") + await sandbox.write_file("mango.txt", b"m") + + files = await sandbox.list_files(".") + names = [f.name for f in files] + assert names == ["apple.txt", "mango.txt", "zebra.txt"] + + @pytest.mark.asyncio + async def test_list_files_empty_dir(self, tmp_path: object) -> None: + empty_dir = tmp_path / "empty" # type: ignore[operator] + empty_dir.mkdir() + sandbox = HostSandbox(working_dir=str(tmp_path)) + files = await sandbox.list_files("empty") + assert files == [] # Empty list of FileInfo + + @pytest.mark.asyncio + async def test_list_files_not_found(self, tmp_path: object) -> None: + sandbox = HostSandbox(working_dir=str(tmp_path)) + with pytest.raises(FileNotFoundError): + await sandbox.list_files("nonexistent") + + +class 
class TestHostSandboxExecuteCodeErrorHandling:
    """Code execution when the requested interpreter does not exist.

    Both tests request a language name that is not expected to resolve to an
    executable and check that the failure comes back as an ``ExecutionResult``
    with exit code 127.
    """

    @pytest.mark.asyncio
    async def test_execute_code_nonexistent_language_returns_exit_127(self, tmp_path: object) -> None:
        """execute_code reports a missing interpreter as exit code 127 with a "not found" message."""
        host = HostSandbox(working_dir=str(tmp_path))
        outcome = await host.execute_code("1", language="nonexistent-lang-12345")
        assert outcome.exit_code == 127
        # Lower-case the message so its exact capitalisation is not pinned.
        assert "not found" in outcome.stderr.lower()

    @pytest.mark.asyncio
    async def test_execute_code_nonexistent_language_streams_result(self, tmp_path: object) -> None:
        """execute_code_streaming yields exactly one item, an ExecutionResult with exit code 127."""
        host = HostSandbox(working_dir=str(tmp_path))
        stream = host.execute_code_streaming("1", language="nonexistent-lang-xyz")
        emitted = [item async for item in stream]
        assert len(emitted) == 1
        final = emitted[0]
        assert isinstance(final, ExecutionResult)
        assert final.exit_code == 127
class TestHostSandboxFileInfoMetadata:
    """Tests for the ``FileInfo`` entries returned by ``list_files``.

    ``tmp_path`` is pytest's per-test temporary directory, a ``pathlib.Path``.
    Annotating it as such lets the ``/`` operator type-check without
    ``# type: ignore`` markers.
    """

    @pytest.mark.asyncio
    async def test_list_files_directories_have_is_dir_true(self, tmp_path: Path) -> None:
        """A directory entry has ``is_dir`` True and a regular file has ``is_dir`` False."""
        sandbox = HostSandbox(working_dir=str(tmp_path))
        (tmp_path / "subdir").mkdir()
        (tmp_path / "file.txt").write_bytes(b"content")

        files = await sandbox.list_files(".")
        by_name = {f.name: f for f in files}

        # Check presence first so a missing entry fails with a readable message.
        assert "subdir" in by_name
        assert "file.txt" in by_name
        assert by_name["subdir"].is_dir is True
        assert by_name["file.txt"].is_dir is False

    @pytest.mark.asyncio
    async def test_list_files_reports_file_size(self, tmp_path: Path) -> None:
        """The entry for a 42-byte file reports ``size == 42``."""
        sandbox = HostSandbox(working_dir=str(tmp_path))
        (tmp_path / "sized.txt").write_bytes(b"x" * 42)

        files = await sandbox.list_files(".")
        # Look the entry up by name, as the sibling test does, so the size
        # check does not depend on the file's position in the listing.
        by_name = {f.name: f for f in files}

        assert "sized.txt" in by_name
        assert by_name["sized.txt"].size == 42
"custom_cwd" + custom_dir.mkdir() + + sandbox = HostSandbox(working_dir=str(sandbox_dir)) + result = await sandbox.execute( + 'python3 -c "import os; print(os.getcwd())"', + cwd=str(custom_dir), + ) + assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(custom_dir)) + + @pytest.mark.asyncio + async def test_execute_without_cwd_uses_working_dir(self, tmp_path: object) -> None: + """execute() without cwd should use the sandbox's working_dir.""" + sandbox = HostSandbox(working_dir=str(tmp_path)) + result = await sandbox.execute( + 'python3 -c "import os; print(os.getcwd())"', + ) + assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(tmp_path)) + + @pytest.mark.asyncio + async def test_execute_code_with_cwd(self, tmp_path: object) -> None: + """execute_code() with cwd should run in the specified directory.""" + base_dir = Path(str(tmp_path)) + sandbox_dir = base_dir / "sandbox_default" + sandbox_dir.mkdir() + custom_dir = base_dir / "code_cwd" + custom_dir.mkdir() + + sandbox = HostSandbox(working_dir=str(sandbox_dir)) + result = await sandbox.execute_code( + "import os; print(os.getcwd())", + language="python3", + cwd=str(custom_dir), + ) + assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(custom_dir)) + + @pytest.mark.asyncio + async def test_execute_streaming_with_cwd(self, tmp_path: object) -> None: + """execute_streaming() with cwd should run in the specified directory.""" + + base_dir = Path(str(tmp_path)) + custom_dir = base_dir / "stream_cwd" + custom_dir.mkdir() + + sandbox = HostSandbox(working_dir=str(base_dir)) + chunks = [] + async for chunk in sandbox.execute_streaming( + 'python3 -c "import os; print(os.getcwd())"', + cwd=str(custom_dir), + ): + chunks.append(chunk) + + # Find the ExecutionResult + result = next(c for c in chunks if isinstance(c, ExecutionResult)) + assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(custom_dir)) + + @pytest.mark.asyncio + async def 
class TestNoOpSandbox:
    """Every NoOpSandbox operation raises NotImplementedError with the "Sandbox is disabled" message."""

    # Pattern passed to ``pytest.raises(match=...)`` by every operation test.
    _DISABLED = "Sandbox is disabled"

    def test_is_sandbox_instance(self) -> None:
        """NoOpSandbox is a Sandbox."""
        assert isinstance(NoOpSandbox(), Sandbox)

    @pytest.mark.asyncio
    async def test_execute_raises_not_implemented(self) -> None:
        """execute raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.execute("echo hello")

    @pytest.mark.asyncio
    async def test_execute_streaming_raises_not_implemented(self) -> None:
        """Iterating execute_streaming raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            # Iterate the stream; the items themselves are irrelevant.
            _ = [chunk async for chunk in noop.execute_streaming("echo hello")]

    @pytest.mark.asyncio
    async def test_execute_code_raises_not_implemented(self) -> None:
        """execute_code raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.execute_code("print(1)", language="python")

    @pytest.mark.asyncio
    async def test_execute_code_streaming_raises_not_implemented(self) -> None:
        """Iterating execute_code_streaming raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            # Iterate the stream; the items themselves are irrelevant.
            _ = [chunk async for chunk in noop.execute_code_streaming("print(1)", language="python")]

    @pytest.mark.asyncio
    async def test_read_file_raises_not_implemented(self) -> None:
        """read_file raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.read_file("test.txt")

    @pytest.mark.asyncio
    async def test_write_file_raises_not_implemented(self) -> None:
        """write_file raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.write_file("test.txt", b"content")

    @pytest.mark.asyncio
    async def test_remove_file_raises_not_implemented(self) -> None:
        """remove_file raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.remove_file("test.txt")

    @pytest.mark.asyncio
    async def test_list_files_raises_not_implemented(self) -> None:
        """list_files raises."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.list_files(".")

    @pytest.mark.asyncio
    async def test_read_text_raises_not_implemented(self) -> None:
        """read_text raises with the same message."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.read_text("test.txt")

    @pytest.mark.asyncio
    async def test_write_text_raises_not_implemented(self) -> None:
        """write_text raises with the same message."""
        noop = NoOpSandbox()
        with pytest.raises(NotImplementedError, match=self._DISABLED):
            await noop.write_text("test.txt", "content")

    def test_agent_with_noop_sandbox(self) -> None:
        """An Agent built with a NoOpSandbox exposes that same instance as ``agent.sandbox``."""
        # Imported inside the test, as in the original.
        from strands.agent.agent import Agent

        noop = NoOpSandbox()
        agent = Agent(model="test", sandbox=noop)
        assert agent.sandbox is noop
        assert isinstance(agent.sandbox, NoOpSandbox)