diff --git a/AGENTS.md b/AGENTS.md
index 8835b45c8..3facc2c4d 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -117,6 +117,12 @@ strands-agents/
│ │ ├── repository_session_manager.py # Repository pattern
│ │ └── session_repository.py # Storage interface
│ │
+│ │
+│ ├── sandbox/ # Sandbox abstraction for code execution
+│ │ ├── base.py # Sandbox ABC, ExecutionResult, FileInfo, OutputFile
+│ │ ├── host.py # HostSandbox (native Python, default)
+│ │ ├── shell_based.py # ShellBasedSandbox (for remote/container envs)
+│ │ └── noop.py # NoOpSandbox (raises NotImplementedError for all ops)
│ ├── telemetry/ # Observability (OpenTelemetry)
│ │ ├── tracer.py # Tracing
│ │ ├── metrics.py # Metrics collection
@@ -183,6 +189,7 @@ strands-agents/
│ ├── types/
│ ├── session/
│ ├── telemetry/
+│ ├── sandbox/
│ ├── hooks/
│ ├── plugins/
│ ├── handlers/
diff --git a/src/strands/__init__.py b/src/strands/__init__.py
index 6625ac41f..7fd06e02a 100644
--- a/src/strands/__init__.py
+++ b/src/strands/__init__.py
@@ -1,10 +1,14 @@
"""A framework for building, deploying, and managing AI agents."""
-from . import agent, models, telemetry, types
+from . import agent, models, sandbox, telemetry, types
from .agent.agent import Agent
from .agent.base import AgentBase
from .event_loop._retry import ModelRetryStrategy
from .plugins import Plugin
+from .sandbox.base import ExecutionResult, FileInfo, OutputFile, Sandbox, StreamChunk, StreamType
+from .sandbox.host import HostSandbox
+from .sandbox.noop import NoOpSandbox
+from .sandbox.shell_based import ShellBasedSandbox
from .tools.decorator import tool
from .types._snapshot import Snapshot
from .types.tools import ToolContext
@@ -15,11 +19,21 @@
"AgentBase",
"AgentSkills",
"agent",
+ "ExecutionResult",
+ "FileInfo",
+ "HostSandbox",
"models",
"ModelRetryStrategy",
"Plugin",
+ "OutputFile",
+ "sandbox",
+ "Sandbox",
+ "NoOpSandbox",
+ "ShellBasedSandbox",
"Skill",
"Snapshot",
+ "StreamChunk",
+ "StreamType",
"tool",
"ToolContext",
"types",
diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py
index 965969961..5e3339d08 100644
--- a/src/strands/agent/agent.py
+++ b/src/strands/agent/agent.py
@@ -29,6 +29,7 @@
from .._async import run_async
from ..event_loop._retry import ModelRetryStrategy
from ..event_loop.event_loop import INITIAL_DELAY, MAX_ATTEMPTS, MAX_DELAY, event_loop_cycle
+from ..sandbox.base import Sandbox
from ..tools._tool_helpers import generate_missing_tool_result_content
from ..types._snapshot import (
SNAPSHOT_SCHEMA_VERSION,
@@ -146,6 +147,7 @@ def __init__(
tool_executor: ToolExecutor | None = None,
retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY,
concurrent_invocation_mode: ConcurrentInvocationMode = ConcurrentInvocationMode.THROW,
+ sandbox: Sandbox | None = None,
):
"""Initialize the Agent with the specified configuration.
@@ -214,6 +216,9 @@ def __init__(
Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations.
Warning: "unsafe_reentrant" makes no guarantees about resulting behavior and is provided
only for advanced use cases where the caller understands the risks.
+ sandbox: Execution environment for agent tools. Tools access the sandbox
+ via tool_context.agent.sandbox to execute commands, code, and filesystem operations.
+ Defaults to HostSandbox() for host execution when not specified.
Raises:
ValueError: If agent id contains path separators.
@@ -298,6 +303,17 @@ def __init__(
self.tool_caller = _ToolCaller(self)
+ # Initialize sandbox for tool execution environment
+ # Default to HostSandbox() for backwards compatibility — any code that
+ # accesses agent.sandbox gets a working local execution environment.
+ # Import is deferred to avoid unconditional coupling to HostSandbox.
+ if sandbox is not None:
+ self.sandbox: Sandbox = sandbox
+ else:
+ from ..sandbox.host import HostSandbox
+
+ self.sandbox = HostSandbox()
+
self.hooks = HookRegistry()
self._plugin_registry = _PluginRegistry(self)
diff --git a/src/strands/sandbox/__init__.py b/src/strands/sandbox/__init__.py
new file mode 100644
index 000000000..8e4e4e517
--- /dev/null
+++ b/src/strands/sandbox/__init__.py
@@ -0,0 +1,30 @@
+"""Sandbox abstraction for agent code execution environments.
+
+This module provides the Sandbox interface that decouples tool logic from where code runs.
+Tools that need to execute code or access a filesystem receive a Sandbox instead of managing
+their own execution, enabling portability across local and cloud environments.
+
+Class hierarchy::
+
+ Sandbox (ABC, all abstract + helpers)
+ ├── HostSandbox — native Python methods for host execution (default)
+ ├── ShellBasedSandbox (ABC, only execute_streaming() abstract — shell-based file ops + execute_code)
+ └── NoOpSandbox — no-op implementation that disables all sandbox functionality
+"""
+
+from .base import ExecutionResult, FileInfo, OutputFile, Sandbox, StreamChunk, StreamType
+from .host import HostSandbox
+from .noop import NoOpSandbox
+from .shell_based import ShellBasedSandbox
+
+__all__ = [
+ "ExecutionResult",
+ "FileInfo",
+ "HostSandbox",
+ "NoOpSandbox",
+ "OutputFile",
+ "Sandbox",
+ "ShellBasedSandbox",
+ "StreamChunk",
+ "StreamType",
+]
diff --git a/src/strands/sandbox/base.py b/src/strands/sandbox/base.py
new file mode 100644
index 000000000..61823fe47
--- /dev/null
+++ b/src/strands/sandbox/base.py
@@ -0,0 +1,418 @@
+"""Base sandbox interface for agent code execution environments.
+
+This module defines the abstract Sandbox class and supporting dataclasses:
+
+- :class:`ExecutionResult` — result of command/code execution
+- :class:`FileInfo` — metadata about a file in the sandbox
+- :class:`OutputFile` — a file produced as output by code execution
+- :class:`StreamChunk` — a typed chunk of streaming output (stdout or stderr)
+
+Sandbox implementations provide the runtime context where tools execute code, run commands,
+and interact with a filesystem. Multiple tools share the same Sandbox instance, giving them
+a common working directory, environment variables, and filesystem.
+
+Class hierarchy::
+
+ Sandbox (ABC): All operations are abstract. Implement this for non-shell-based
+ sandboxes (e.g., API-based cloud sandboxes).
+ ShellBasedSandbox (ABC, in shell_based.py): Provides shell-based defaults for file
+ operations and code execution. Subclasses only need to implement ``execute_streaming()``.
+ NoOpSandbox (in noop.py): No-op implementation that raises NotImplementedError
+ for all operations. Use to disable sandbox functionality entirely.
+"""
+
+import logging
+from abc import ABC, abstractmethod
+from collections.abc import AsyncGenerator
+from dataclasses import dataclass, field
+from typing import Any, Literal
+
+logger = logging.getLogger(__name__)
+
+
+StreamType = Literal["stdout", "stderr"]
+"""Type of a streaming output chunk.
+
+Used by :class:`StreamChunk` to distinguish stdout from stderr output
+during streaming execution.
+
+- ``"stdout"``: Standard output from the command or code.
+- ``"stderr"``: Standard error from the command or code.
+"""
+
+
+@dataclass
+class StreamChunk:
+ """A typed chunk of streaming output from command or code execution.
+
+ Allows consumers to distinguish stdout from stderr during streaming,
+ enabling richer UIs and more precise output handling.
+
+ Attributes:
+ data: The text content of the chunk.
+ stream_type: Whether this chunk is from stdout or stderr.
+ """
+
+ data: str
+ stream_type: StreamType = "stdout"
+
+
+@dataclass
+class FileInfo:
+ """Metadata about a file or directory in a sandbox.
+
+ Provides minimal structured information that lets tools distinguish
+ files from directories and report sizes. Fields ``is_dir`` and ``size``
+ are optional — implementations that cannot provide accurate data
+ return ``None`` instead of lying.
+
+ Attributes:
+ name: The file or directory name (not the full path).
+ is_dir: Whether this entry is a directory. ``None`` if unknown.
+ size: File size in bytes. ``None`` if unknown.
+ """
+
+ name: str
+ is_dir: bool | None = None
+ size: int | None = None
+
+
+@dataclass
+class OutputFile:
+ """A file produced as output by code execution.
+
+ Used to carry binary artifacts (images, charts, PDFs, compiled files)
+ from sandbox execution back to the agent. Tools can convert these
+ to Strands' ``ImageContent`` or ``DocumentContent`` for the model.
+
+ Follows ADK's ``File`` pattern — simple, portable, MIME-typed.
+
+ Attributes:
+ name: Filename (e.g., ``"plot.png"``).
+ content: Raw file content as bytes.
+ mime_type: MIME type of the content (e.g., ``"image/png"``).
+ """
+
+ name: str
+ content: bytes
+ mime_type: str = "application/octet-stream"
+
+
+@dataclass
+class ExecutionResult:
+ """Result of code or command execution in a sandbox.
+
+ Attributes:
+ exit_code: The exit code of the command or code execution.
+ stdout: Standard output captured from execution.
+ stderr: Standard error captured from execution.
+ output_files: Files produced by the execution (e.g., images, charts).
+ Shell-based sandboxes typically return an empty list. Jupyter-backed
+ or API-backed sandboxes can populate this with generated artifacts.
+ """
+
+ exit_code: int
+ stdout: str
+ stderr: str
+ output_files: list[OutputFile] = field(default_factory=list)
+
+
+class Sandbox(ABC):
+ """Abstract execution environment for agent tools.
+
+ A Sandbox provides the runtime context where tools execute code,
+ run commands, and interact with a filesystem. Multiple tools
+ share the same Sandbox instance, giving them a common working
+ directory, environment variables, and filesystem.
+
+ The sandbox follows the SDK's ``invoke_async`` / ``stream_async``
+ pattern: streaming methods (``execute_streaming``, ``execute_code_streaming``)
+ are the abstract primitives that implementations must provide.
+ Non-streaming convenience methods (``execute``, ``execute_code``) consume
+ the stream and return the final ``ExecutionResult``.
+
+ Streaming methods yield :class:`StreamChunk` objects that carry both
+ the text data and the stream type (stdout or stderr), followed by a
+ final :class:`ExecutionResult`. This allows consumers to distinguish
+ between stdout and stderr during streaming.
+
+ All abstract methods accept ``**kwargs`` for forward compatibility —
+ new parameters with defaults can be added in future versions without
+ breaking existing implementations.
+
+ Example:
+ Non-streaming (common case)::
+
+ from strands.sandbox import HostSandbox
+
+ sandbox = HostSandbox(working_dir="/tmp/my-sandbox")
+ result = await sandbox.execute("echo hello")
+ print(result.stdout)
+
+ Streaming with stdout/stderr distinction::
+
+ async for chunk in sandbox.execute_streaming("echo hello"):
+ if isinstance(chunk, StreamChunk):
+ if chunk.stream_type == "stdout":
+ print(f"[stdout] {chunk.data}", end="")
+ else:
+ print(f"[stderr] {chunk.data}", end="")
+ elif isinstance(chunk, ExecutionResult):
+ print(f"Exit code: {chunk.exit_code}")
+
+ With custom working directory::
+
+ result = await sandbox.execute("ls -la", cwd="/tmp/other-dir")
+ """
+
+ # ---- Streaming methods (abstract primitives) ----
+
+ @abstractmethod
+ async def execute_streaming(
+ self,
+ command: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Execute a shell command, streaming output.
+
+ Yields :class:`StreamChunk` objects for stdout and stderr output
+ as it arrives. The final yield is an :class:`ExecutionResult` with
+ the exit code and complete output.
+
+ Args:
+ command: The shell command to execute.
+ timeout: Maximum execution time in seconds. ``None`` means no timeout.
+ cwd: Working directory for command execution. ``None`` means use the
+ sandbox's default working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Yields:
+ :class:`StreamChunk` objects for output, then a final :class:`ExecutionResult`.
+ """
+ ...
+ # Make the method signature an async generator for type checkers.
+ # Concrete subclasses must yield at least one ExecutionResult.
+ yield # type: ignore[misc] # pragma: no cover
+
+ @abstractmethod
+ async def execute_code_streaming(
+ self,
+ code: str,
+ language: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Execute code in the sandbox, streaming output.
+
+ Args:
+ code: The source code to execute.
+ language: The programming language interpreter to use.
+ timeout: Maximum execution time in seconds. ``None`` means no timeout.
+ cwd: Working directory for code execution. ``None`` means use the
+ sandbox's default working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Yields:
+ :class:`StreamChunk` objects for output, then a final :class:`ExecutionResult`.
+ """
+ ...
+ yield # type: ignore[misc] # pragma: no cover
+
+ @abstractmethod
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ """Read a file from the sandbox filesystem.
+
+ Returns raw bytes to support both text and binary files (images,
+ PDFs, compiled artifacts). Use :meth:`read_text` for a convenience
+ wrapper that decodes to a string.
+
+ Args:
+ path: Path to the file to read.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ The file contents as bytes.
+
+ Raises:
+ FileNotFoundError: If the file does not exist or cannot be read.
+ """
+ ...
+
+ @abstractmethod
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ """Write a file to the sandbox filesystem.
+
+ Accepts raw bytes to support both text and binary content. Use
+ :meth:`write_text` for a convenience wrapper that encodes a string.
+
+ Implementations should create parent directories if they do not exist.
+ :class:`~strands.sandbox.host.HostSandbox` does this natively
+ via :func:`pathlib.Path.mkdir`. Shell-based implementations should
+ include a ``mkdir -p`` before writing.
+
+ Args:
+ path: Path to the file to write.
+ content: The content to write as bytes.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Raises:
+ IOError: If the file cannot be written.
+ """
+ ...
+
+ @abstractmethod
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ """Remove a file from the sandbox filesystem.
+
+ Args:
+ path: Path to the file to remove.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Raises:
+ FileNotFoundError: If the file does not exist.
+ """
+ ...
+
+ @abstractmethod
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ """List files in a sandbox directory.
+
+ Returns structured :class:`FileInfo` entries with metadata (name,
+ is_dir, size) so tools can make informed decisions about files.
+ Fields ``is_dir`` and ``size`` may be ``None`` if the implementation
+ cannot provide accurate data.
+
+ Args:
+ path: Path to the directory to list.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ A list of :class:`FileInfo` entries for the directory contents.
+
+ Raises:
+ FileNotFoundError: If the directory does not exist.
+ """
+ ...
+
+ # ---- Non-streaming convenience methods ----
+
+ async def execute(
+ self,
+ command: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> ExecutionResult:
+ """Execute a shell command and return the result.
+
+ Convenience wrapper that consumes :meth:`execute_streaming` and
+ returns the final :class:`ExecutionResult`. This is the common case —
+ use :meth:`execute_streaming` when you need to process output as it
+ arrives.
+
+ Implementations that want an optimized non-streaming path can
+ override this method directly.
+
+ Args:
+ command: The shell command to execute.
+ timeout: Maximum execution time in seconds. ``None`` means no timeout.
+ cwd: Working directory for command execution. ``None`` means use the
+ sandbox's default working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ The final ExecutionResult from execution.
+
+ Raises:
+ RuntimeError: If execute_streaming() did not yield an ExecutionResult.
+ """
+ result = None
+ async for chunk in self.execute_streaming(command, timeout=timeout, cwd=cwd, **kwargs):
+ if isinstance(chunk, ExecutionResult):
+ result = chunk
+ if result is None:
+ raise RuntimeError("execute_streaming() did not yield an ExecutionResult")
+ return result
+
+ async def execute_code(
+ self,
+ code: str,
+ language: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> ExecutionResult:
+ """Execute code and return the result.
+
+ Convenience wrapper that consumes :meth:`execute_code_streaming` and
+ returns the final :class:`ExecutionResult`. This is the common case —
+ use :meth:`execute_code_streaming` when you need to process output as
+ it arrives.
+
+ Implementations that want an optimized non-streaming path can
+ override this method directly.
+
+ Args:
+ code: The source code to execute.
+ language: The programming language interpreter to use.
+ timeout: Maximum execution time in seconds. ``None`` means no timeout.
+ cwd: Working directory for code execution. ``None`` means use the
+ sandbox's default working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ The final ExecutionResult from execution.
+
+ Raises:
+ RuntimeError: If execute_code_streaming() did not yield an ExecutionResult.
+ """
+ result = None
+ async for chunk in self.execute_code_streaming(code, language=language, timeout=timeout, cwd=cwd, **kwargs):
+ if isinstance(chunk, ExecutionResult):
+ result = chunk
+ if result is None:
+ raise RuntimeError("execute_code_streaming() did not yield an ExecutionResult")
+ return result
+
+ # ---- Text convenience methods ----
+
+ async def read_text(self, path: str, encoding: str = "utf-8", **kwargs: Any) -> str:
+ """Read a text file from the sandbox filesystem.
+
+ Convenience wrapper around :meth:`read_file` that decodes bytes
+ to a string.
+
+ Args:
+ path: Path to the file to read.
+ encoding: Text encoding to use. Defaults to UTF-8.
+ **kwargs: Additional keyword arguments passed to :meth:`read_file`.
+
+ Returns:
+ The file contents as a string.
+
+ Raises:
+ FileNotFoundError: If the file does not exist.
+ UnicodeDecodeError: If the file cannot be decoded with the given encoding.
+ """
+ data = await self.read_file(path, **kwargs)
+ return data.decode(encoding)
+
+ async def write_text(self, path: str, content: str, encoding: str = "utf-8", **kwargs: Any) -> None:
+ """Write a text file to the sandbox filesystem.
+
+ Convenience wrapper around :meth:`write_file` that encodes a string
+ to bytes.
+
+ Args:
+ path: Path to the file to write.
+ content: The text content to write.
+ encoding: Text encoding to use. Defaults to UTF-8.
+ **kwargs: Additional keyword arguments passed to :meth:`write_file`.
+
+ Raises:
+ IOError: If the file cannot be written.
+ """
+ await self.write_file(path, content.encode(encoding), **kwargs)
diff --git a/src/strands/sandbox/host.py b/src/strands/sandbox/host.py
new file mode 100644
index 000000000..29239c42e
--- /dev/null
+++ b/src/strands/sandbox/host.py
@@ -0,0 +1,380 @@
+"""Host sandbox implementation for host-process execution.
+
+This module implements the HostSandbox, which executes commands and code
+on the local host using asyncio subprocesses and native Python filesystem
+operations. It extends Sandbox directly — all file and code operations
+use proper Python methods (pathlib, os, subprocess) instead of shell commands.
+
+This is the default sandbox used when no explicit sandbox is configured.
+"""
+
+import asyncio
+import logging
+import os
+import re
+from collections.abc import AsyncGenerator
+from pathlib import Path
+from typing import Any
+
+from .base import ExecutionResult, FileInfo, Sandbox, StreamChunk
+
+logger = logging.getLogger(__name__)
+
+#: Maximum number of bytes to read at once from a subprocess stream.
+#: Prevents memory exhaustion from extremely long lines without newlines.
+_READ_CHUNK_SIZE = 64 * 1024 # 64 KiB
+
+#: Pattern for validating language/interpreter names.
+#: Allows alphanumeric characters, dots, hyphens, and underscores.
+_LANGUAGE_PATTERN = re.compile(r"^[a-zA-Z0-9._-]+$")
+
+
+async def _read_stream(
+ stream: asyncio.StreamReader | None,
+ collected: list[str],
+) -> None:
+ """Read all chunks from a subprocess stream into a list.
+
+ Reads in chunks of up to ``_READ_CHUNK_SIZE`` bytes to handle
+ binary output and extremely long lines without newlines.
+ Non-UTF-8 bytes are replaced with the Unicode replacement character
+ to prevent ``UnicodeDecodeError`` from crashing the sandbox.
+
+ Args:
+ stream: The subprocess stdout or stderr stream.
+ collected: List to append decoded string chunks to.
+ """
+ if stream is None:
+ return
+ while True:
+ chunk_bytes = await stream.read(_READ_CHUNK_SIZE)
+ if not chunk_bytes:
+ break
+ collected.append(chunk_bytes.decode(errors="replace"))
+
+
+class HostSandbox(Sandbox):
+ """Execute code and commands on the local host using native Python methods.
+
+ Uses asyncio subprocesses for command execution, ``subprocess_exec`` for
+ code execution (avoiding shell intermediaries), and native filesystem
+ operations (``pathlib``, ``os``) for all file I/O.
+
+ This sandbox extends :class:`Sandbox` directly — it does **not**
+ inherit from :class:`ShellBasedSandbox`. All operations use proper,
+ safe Python methods instead of piping through shell commands.
+
+ Args:
+ working_dir: The working directory for command execution.
+ Defaults to the current working directory.
+
+ Example:
+ Non-streaming (common case)::
+
+ from strands.sandbox import HostSandbox
+
+ sandbox = HostSandbox(working_dir="/tmp/my-sandbox")
+ result = await sandbox.execute("echo hello")
+ print(result.stdout)
+
+ Streaming::
+
+ async for chunk in sandbox.execute_streaming("echo hello"):
+ if isinstance(chunk, StreamChunk):
+ print(chunk.data, end="")
+ """
+
+ def __init__(self, working_dir: str | None = None) -> None:
+ """Initialize the HostSandbox.
+
+ Args:
+ working_dir: The working directory for command execution.
+ Defaults to the current working directory at construction time.
+ """
+ self.working_dir = working_dir or os.getcwd()
+
+ def _resolve_path(self, path: str) -> Path:
+ """Resolve a path relative to the working directory.
+
+ Absolute paths are returned as-is. Relative paths are resolved
+ against the working directory.
+
+ Args:
+ path: The file path to resolve.
+
+ Returns:
+ The resolved Path object.
+ """
+ if os.path.isabs(path):
+ return Path(path)
+ return Path(self.working_dir) / path
+
+ async def execute_streaming(
+ self,
+ command: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Execute a shell command on the local host, streaming output.
+
+ Reads stdout and stderr in chunks (up to 64 KiB at a time) to avoid
+ blocking on extremely long lines. Chunks are collected and yielded
+ after the command completes. The final yield is an ExecutionResult
+ with the exit code and complete captured output.
+
+ Args:
+ command: The shell command to execute.
+ timeout: Maximum execution time in seconds. ``None`` means no timeout.
+ cwd: Working directory for this command. ``None`` means use the
+ sandbox's default ``working_dir``.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Yields:
+ :class:`StreamChunk` objects for stdout/stderr, then a final :class:`ExecutionResult`.
+
+ Raises:
+ asyncio.TimeoutError: If the command exceeds the timeout.
+ """
+ effective_cwd = cwd or self.working_dir
+ logger.debug("command=<%s>, timeout=<%s>, cwd=<%s> | executing local command", command, timeout, effective_cwd)
+
+ working_path = Path(effective_cwd)
+ working_path.mkdir(parents=True, exist_ok=True)
+
+ proc = await asyncio.create_subprocess_shell(
+ command,
+ cwd=effective_cwd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+
+ async for item in self._collect_and_yield(proc, timeout):
+ yield item
+
+ async def execute_code_streaming(
+ self,
+ code: str,
+ language: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Execute code on the local host using subprocess_exec (no shell intermediary).
+
+ Uses :func:`asyncio.create_subprocess_exec` to invoke the language
+ interpreter directly, passing code via the ``-c`` flag. This avoids
+ shell quoting issues entirely — the interpreter name and code are
+ passed as separate arguments to ``execvp``, not concatenated into a
+ shell command string.
+
+ The language parameter is validated against a safe pattern to prevent
+ path traversal or binary injection. If the interpreter is not found
+ on the system, an ExecutionResult with exit code 127 is returned
+ (matching the shell convention for "command not found").
+
+ Args:
+ code: The source code to execute.
+ language: The programming language interpreter to use (e.g.
+ ``"python"``, ``"python3"``, ``"node"``, ``"ruby"``).
+ timeout: Maximum execution time in seconds. ``None`` means no timeout.
+ cwd: Working directory for code execution. ``None`` means use the
+ sandbox's default ``working_dir``.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Yields:
+ :class:`StreamChunk` objects for stdout/stderr, then a final :class:`ExecutionResult`.
+
+ Raises:
+ asyncio.TimeoutError: If the code execution exceeds the timeout.
+ ValueError: If the language parameter contains unsafe characters.
+ """
+ # Validate language to prevent injection via interpreter name.
+ # Only allow safe characters (alphanumeric, dots, hyphens, underscores).
+ if not _LANGUAGE_PATTERN.match(language):
+ raise ValueError(f"language parameter contains unsafe characters: {language}")
+
+ effective_cwd = cwd or self.working_dir
+ logger.debug(
+ "language=<%s>, timeout=<%s>, cwd=<%s> | executing code locally",
+ language,
+ timeout,
+ effective_cwd,
+ )
+
+ working_path = Path(effective_cwd)
+ working_path.mkdir(parents=True, exist_ok=True)
+
+ # Use create_subprocess_exec (not shell) — the interpreter and arguments
+ # are passed directly to execvp, avoiding all shell quoting issues.
+ try:
+ proc = await asyncio.create_subprocess_exec(
+ language,
+ "-c",
+ code,
+ cwd=effective_cwd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ except FileNotFoundError:
+ yield ExecutionResult(
+ exit_code=127,
+ stdout="",
+ stderr=f"Language interpreter not found: {language}",
+ )
+ return
+
+ async for item in self._collect_and_yield(proc, timeout):
+ yield item
+
+ async def _collect_and_yield(
+ self,
+ proc: asyncio.subprocess.Process,
+ timeout: int | None,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Read stdout/stderr from a subprocess, then yield typed chunks and a final ExecutionResult.
+
+ Shared helper used by both ``execute_streaming()`` and ``execute_code_streaming()`` to
+ avoid duplicating the stream-reading, timeout-handling, and yielding logic.
+
+ Args:
+ proc: The running subprocess.
+ timeout: Maximum time in seconds to wait for output. ``None`` means no timeout.
+
+ Yields:
+ :class:`StreamChunk` objects for stdout/stderr, then a final :class:`ExecutionResult`.
+
+ Raises:
+ asyncio.TimeoutError: If the process exceeds the timeout.
+ """
+ stdout_chunks: list[str] = []
+ stderr_chunks: list[str] = []
+
+ try:
+ read_task = asyncio.gather(
+ _read_stream(proc.stdout, stdout_chunks),
+ _read_stream(proc.stderr, stderr_chunks),
+ proc.wait(),
+ )
+ await asyncio.wait_for(read_task, timeout=timeout)
+ except asyncio.TimeoutError:
+ proc.kill()
+ await proc.communicate()
+ raise
+
+ stdout_text = "".join(stdout_chunks)
+ stderr_text = "".join(stderr_chunks)
+
+ for chunk in stdout_chunks:
+ yield StreamChunk(data=chunk, stream_type="stdout")
+ for chunk in stderr_chunks:
+ yield StreamChunk(data=chunk, stream_type="stderr")
+
+ yield ExecutionResult(
+ exit_code=0 if proc.returncode is None else proc.returncode,
+ stdout=stdout_text,
+ stderr=stderr_text,
+ )
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ """Read a file from the local filesystem as raw bytes.
+
+ Uses ``asyncio.to_thread`` to avoid blocking the event loop
+ during disk I/O.
+
+ Args:
+ path: Path to the file to read. Relative paths are resolved
+ against the working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ The file contents as bytes.
+
+ Raises:
+ FileNotFoundError: If the file does not exist.
+ """
+ full_path = self._resolve_path(path)
+ return await asyncio.to_thread(full_path.read_bytes)
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ """Write bytes to a file on the local filesystem.
+
+ Creates parent directories if they do not exist. Uses
+ ``asyncio.to_thread`` to avoid blocking the event loop.
+
+ Args:
+ path: Path to the file to write. Relative paths are resolved
+ against the working directory.
+ content: The content to write as bytes.
+ **kwargs: Additional keyword arguments for forward compatibility.
+ """
+ full_path = self._resolve_path(path)
+
+ def _write() -> None:
+ full_path.parent.mkdir(parents=True, exist_ok=True)
+ full_path.write_bytes(content)
+
+ await asyncio.to_thread(_write)
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ """Remove a file from the local filesystem using native Python methods.
+
+ Uses ``asyncio.to_thread`` to avoid blocking the event loop.
+
+ Args:
+ path: Path to the file to remove. Relative paths are resolved
+ against the working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Raises:
+ FileNotFoundError: If the file does not exist.
+ """
+ full_path = self._resolve_path(path)
+ await asyncio.to_thread(full_path.unlink)
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ """List files in a directory with structured metadata.
+
+ Uses native Python methods (:func:`os.listdir`, :func:`os.stat`)
+ to return :class:`FileInfo` entries with name, is_dir, and size.
+ Results include hidden files (dotfiles) and are sorted for
+ deterministic ordering.
+
+ Uses ``asyncio.to_thread`` to avoid blocking the event loop
+ during disk I/O.
+
+ Args:
+ path: Path to the directory to list. Relative paths are resolved
+ against the working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ A sorted list of :class:`FileInfo` entries.
+
+ Raises:
+ FileNotFoundError: If the directory does not exist.
+ """
+ full_path = self._resolve_path(path)
+
+ def _list() -> list[FileInfo]:
+ if not full_path.is_dir():
+ raise FileNotFoundError(f"Directory not found: {full_path}")
+ entries = []
+ for name in sorted(os.listdir(full_path)):
+ entry_path = full_path / name
+ try:
+ stat = entry_path.stat()
+ entries.append(
+ FileInfo(
+ name=name,
+ is_dir=entry_path.is_dir(),
+ size=stat.st_size,
+ )
+ )
+ except OSError:
+ # If we can't stat the entry (e.g., broken symlink), include
+ # it with defaults
+ entries.append(FileInfo(name=name))
+ return entries
+
+ return await asyncio.to_thread(_list)
diff --git a/src/strands/sandbox/noop.py b/src/strands/sandbox/noop.py
new file mode 100644
index 000000000..d4a0b48ee
--- /dev/null
+++ b/src/strands/sandbox/noop.py
@@ -0,0 +1,73 @@
+"""No-op sandbox implementation that disables all sandbox functionality.
+
+Use ``NoOpSandbox`` to explicitly disable sandbox features on an agent.
+All operations raise ``NotImplementedError`` with a clear message.
+
+Example::
+
+ from strands import Agent
+ from strands.sandbox import NoOpSandbox
+
+ # Explicitly disable sandbox functionality
+ agent = Agent(sandbox=NoOpSandbox())
+"""
+
+from collections.abc import AsyncGenerator
+from typing import Any
+
+from .base import ExecutionResult, FileInfo, Sandbox, StreamChunk
+
+
+class NoOpSandbox(Sandbox):
+ """No-op sandbox that raises NotImplementedError for all operations.
+
+ Use this to explicitly disable sandbox functionality on an agent.
+ Any tool that attempts to use the sandbox will get a clear error
+ indicating that sandbox is disabled, rather than silently failing.
+
+ Example::
+
+ from strands import Agent
+ from strands.sandbox import NoOpSandbox
+
+ agent = Agent(sandbox=NoOpSandbox())
+ """
+
+ async def execute_streaming(
+ self,
+ command: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Raise NotImplementedError — sandbox is disabled."""
+ raise NotImplementedError("Sandbox is disabled (NoOpSandbox). Cannot execute commands.")
+ yield # type: ignore[unreachable] # pragma: no cover
+
+ async def execute_code_streaming(
+ self,
+ code: str,
+ language: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Raise NotImplementedError — sandbox is disabled."""
+ raise NotImplementedError("Sandbox is disabled (NoOpSandbox). Cannot execute code.")
+ yield # type: ignore[unreachable] # pragma: no cover
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ """Raise NotImplementedError — sandbox is disabled."""
+ raise NotImplementedError("Sandbox is disabled (NoOpSandbox). Cannot read files.")
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ """Raise NotImplementedError — sandbox is disabled."""
+ raise NotImplementedError("Sandbox is disabled (NoOpSandbox). Cannot write files.")
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ """Raise NotImplementedError — sandbox is disabled."""
+ raise NotImplementedError("Sandbox is disabled (NoOpSandbox). Cannot remove files.")
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ """Raise NotImplementedError — sandbox is disabled."""
+ raise NotImplementedError("Sandbox is disabled (NoOpSandbox). Cannot list files.")
diff --git a/src/strands/sandbox/shell_based.py b/src/strands/sandbox/shell_based.py
new file mode 100644
index 000000000..f68b8ed45
--- /dev/null
+++ b/src/strands/sandbox/shell_based.py
@@ -0,0 +1,200 @@
+"""Shell-based sandbox with default implementations for file and code operations.
+
+This module defines the ShellBasedSandbox abstract class, which provides
+shell-command-based defaults for file operations (read, write, remove, list)
+and code execution. Subclasses only need to implement ``execute_streaming()``.
+
+Use this for remote environments where only shell access is available
+(e.g., Docker containers, SSH connections). For local execution, use
+:class:`~strands.sandbox.host.HostSandbox` which uses native
+Python methods instead.
+
+Class hierarchy::
+
+ Sandbox (ABC, all abstract)
+ └── ShellBasedSandbox (ABC, only execute_streaming() abstract — shell-based file ops + execute_code)
+"""
+
+import base64
+import logging
+import shlex
+from abc import ABC
+from collections.abc import AsyncGenerator
+from typing import Any
+
+from .base import ExecutionResult, FileInfo, Sandbox, StreamChunk
+
+logger = logging.getLogger(__name__)
+
+
+class ShellBasedSandbox(Sandbox, ABC):
+ """Abstract sandbox that provides shell-based defaults for file and code operations.
+
+ Subclasses only need to implement :meth:`execute_streaming`. The remaining
+ operations — ``execute_code_streaming``, ``read_file``, ``write_file``,
+ ``remove_file``, and ``list_files`` — are implemented via shell commands
+ piped through ``execute_streaming()``.
+
+ This class is intended for remote execution environments where only
+ shell access is available (e.g., Docker containers, SSH connections).
+ For local execution, use :class:`~strands.sandbox.host.HostSandbox`
+ which uses native Python methods for better safety and reliability.
+
+ Subclasses may override any method with a native implementation for
+ better performance.
+ """
+
+ async def execute_code_streaming(
+ self,
+ code: str,
+ language: str,
+ timeout: int | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ """Execute code in the sandbox, streaming output.
+
+ The default implementation passes code to the language interpreter
+ via ``-c`` with proper shell quoting. Both the ``language`` and
+ ``code`` parameters are sanitized with :func:`shlex.quote` to
+ prevent command injection.
+
+ Args:
+ code: The source code to execute.
+ language: The programming language interpreter to use (e.g.
+ ``"python"``, ``"node"``, ``"ruby"``).
+ timeout: Maximum execution time in seconds. ``None`` means no timeout.
+ cwd: Working directory for code execution. ``None`` means use the
+ sandbox's default working directory.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Yields:
+ :class:`StreamChunk` objects for output, then a final :class:`ExecutionResult`.
+
+ Note:
+ The default implementation assumes the language interpreter
+ accepts code via the ``-c`` flag (e.g., ``python -c "code"``).
+ Override this method for interpreters that require a different
+ invocation pattern (e.g., ``javac``, ``gcc``, ``go run``).
+ """
+ async for chunk in self.execute_streaming(
+ f"{shlex.quote(language)} -c {shlex.quote(code)}", timeout=timeout, cwd=cwd
+ ):
+ yield chunk
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ """Read a file from the sandbox filesystem as raw bytes.
+
+ Uses ``base64`` to encode the file content for safe transport
+ through the shell text layer, then decodes on the Python side.
+ This preserves binary content (images, PDFs, compiled files)
+ that would be corrupted by direct ``cat`` through a text pipe.
+
+ Override for native file I/O support if the backend provides
+ a binary-safe channel (e.g., Docker stdin/stdout pipes).
+
+ Args:
+ path: Path to the file to read.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ The file contents as bytes.
+
+ Raises:
+ FileNotFoundError: If the file does not exist or cannot be read.
+ """
+ result = await self.execute(f"base64 {shlex.quote(path)}")
+ if result.exit_code != 0:
+ raise FileNotFoundError(result.stderr)
+ # base64 output is ASCII-safe text — decode it back to raw bytes
+ return base64.b64decode(result.stdout)
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ """Write bytes to a file in the sandbox filesystem.
+
+ Uses ``base64`` encoding to safely transport binary content through
+ the shell text layer. The encoded data is piped through ``base64 -d``
+ to decode it back to raw bytes on the remote side. Parent directories
+ are created automatically via ``mkdir -p``.
+
+ This approach handles any content type (text, images, PDFs, compiled
+ files) without corruption. Override for native file I/O if the
+ backend provides a binary-safe channel.
+
+ Note:
+ The base64-encoded content is passed as a shell argument to
+ ``printf``. For very large files (roughly >1.5 MB of original
+ content, which becomes ~2 MB after base64 encoding), this may
+ exceed the shell's ``ARG_MAX`` limit on some systems. For large
+ binary files, override this method with a stdin-piping approach
+ or use a binary-safe channel.
+
+ Args:
+ path: Path to the file to write.
+ content: The content to write as bytes.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Raises:
+ IOError: If the file cannot be written.
+ """
+ encoded = base64.b64encode(content).decode("ascii")
+ quoted_path = shlex.quote(path)
+ # Create parent directories, then write content via base64 decode
+ cmd = f"mkdir -p \"$(dirname {quoted_path})\" && printf '%s' {shlex.quote(encoded)} | base64 -d > {quoted_path}"
+ result = await self.execute(cmd)
+ if result.exit_code != 0:
+ raise OSError(result.stderr)
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ """Remove a file from the sandbox filesystem.
+
+ Override for native file removal support. The default implementation
+ uses ``rm`` via the shell.
+
+ Args:
+ path: Path to the file to remove.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Raises:
+ FileNotFoundError: If the file does not exist.
+ """
+ result = await self.execute(f"rm {shlex.quote(path)}")
+ if result.exit_code != 0:
+ raise FileNotFoundError(result.stderr)
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ """List files in a sandbox directory with structured metadata.
+
+ Uses ``ls -1aF`` to include hidden files (dotfiles) and identify
+ directories. Returns :class:`FileInfo` entries with name and is_dir.
+ Size is ``None`` for shell-based listing (cannot be determined
+ reliably from ``ls -1aF`` output alone).
+
+ Override for native directory listing support.
+
+ Args:
+ path: Path to the directory to list.
+ **kwargs: Additional keyword arguments for forward compatibility.
+
+ Returns:
+ A list of :class:`FileInfo` entries.
+
+ Raises:
+ FileNotFoundError: If the directory does not exist.
+ """
+ result = await self.execute(f"ls -1aF {shlex.quote(path)}")
+ if result.exit_code != 0:
+ raise FileNotFoundError(result.stderr)
+
+ entries = []
+ for line in result.stdout.strip().split("\n"):
+ line = line.strip()
+ if not line or line in (".", "..", "./", "../"):
+ continue
+ # ls -F appends / for directories, @ for symlinks, * for executables
+ is_dir = line.endswith("/")
+ # Strip the type indicator from the name
+ name = line.rstrip("/@*=|")
+ if name:
+ entries.append(FileInfo(name=name, is_dir=is_dir))
+ return entries
diff --git a/tests/strands/sandbox/__init__.py b/tests/strands/sandbox/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/strands/sandbox/test_adversarial_security.py b/tests/strands/sandbox/test_adversarial_security.py
new file mode 100644
index 000000000..aeb4d50e8
--- /dev/null
+++ b/tests/strands/sandbox/test_adversarial_security.py
@@ -0,0 +1,329 @@
+"""Adversarial tests: Security, path traversal, injection, and edge cases.
+
+Tests for:
+- Path traversal attacks in HostSandbox
+- Content injection edge cases
+- Symlink attacks
+- Binary/special content handling
+"""
+
+import pytest
+
+from strands.sandbox.base import ExecutionResult
+from strands.sandbox.host import HostSandbox
+from strands.sandbox.shell_based import ShellBasedSandbox
+
+
+class TestHostSandboxDoesNotConfineFilesystemAccess:
+ """Documents that HostSandbox does NOT confine file access to working_dir.
+
+ HostSandbox is a host execution environment with no sandboxing.
+ Path traversal, absolute paths, and symlinks all work as they would
+ in any Python program. For confined execution, use a Sandbox
+ implementation that provides actual isolation (e.g., Docker, cloud sandbox).
+
+ These tests verify the LACK of isolation — they are NOT security tests.
+ """
+
+ @pytest.mark.asyncio
+ async def test_read_file_allows_path_traversal(self, tmp_path):
+ """read_file with ../.. reads outside working_dir (no confinement)."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ outside_dir = tmp_path.parent / "outside_sandbox"
+ outside_dir.mkdir(exist_ok=True)
+ outside_file = outside_dir / "secret.txt"
+ outside_file.write_text("SECRET_DATA")
+
+ relative_path = "../outside_sandbox/secret.txt"
+ content = await sandbox.read_file(relative_path)
+ assert content == b"SECRET_DATA"
+
+ @pytest.mark.asyncio
+ async def test_write_file_allows_path_traversal(self, tmp_path):
+ """write_file with ../.. writes outside working_dir (no confinement)."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ outside_dir = tmp_path.parent / "write_escape"
+ outside_dir.mkdir(exist_ok=True)
+
+ relative_path = "../write_escape/pwned.txt"
+ await sandbox.write_file(relative_path, b"PWNED")
+ assert (outside_dir / "pwned.txt").read_bytes() == b"PWNED"
+
+ @pytest.mark.asyncio
+ async def test_execute_accesses_entire_filesystem(self, tmp_path):
+ """execute() runs arbitrary shell commands with no filesystem confinement."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute("echo hello_from_shell")
+ assert result.exit_code == 0
+ assert "hello_from_shell" in result.stdout
+
+ @pytest.mark.asyncio
+ async def test_absolute_path_bypasses_working_dir(self, tmp_path):
+ """Absolute paths bypass working_dir — HostSandbox has no path confinement."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ abs_path = str(tmp_path.parent / "abs_escape.txt")
+ await sandbox.write_file(abs_path, b"escaped")
+ content = await sandbox.read_file(abs_path)
+ assert content == b"escaped"
+
+
+class TestContentEdgeCases:
+ """Can content with special characters break file operations?"""
+
+ @pytest.mark.asyncio
+ async def test_content_with_shell_metacharacters(self, tmp_path):
+ """Content with shell metacharacters should be preserved (native Python I/O)."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ content = b"hello $USER `whoami` $(id) && rm -rf / ; echo pwned"
+ await sandbox.write_file("test.txt", content)
+ read_back = await sandbox.read_file("test.txt")
+ assert read_back == content
+
+ @pytest.mark.asyncio
+ async def test_content_with_null_bytes(self, tmp_path):
+ """Content with null bytes should be handled by native Python I/O."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ content = b"before\x00after"
+ await sandbox.write_file("null.txt", content)
+ read_back = await sandbox.read_file("null.txt")
+ assert read_back == content
+
+ @pytest.mark.asyncio
+ async def test_empty_content(self, tmp_path):
+ """Writing empty content should create an empty file."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("empty.txt", b"")
+ content = await sandbox.read_file("empty.txt")
+ assert content == b""
+
+ @pytest.mark.asyncio
+ async def test_very_large_content(self, tmp_path):
+ """Writing 10MB of content should work."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ large_content = b"A" * (10 * 1024 * 1024)
+ await sandbox.write_file("large.txt", large_content)
+ content = await sandbox.read_file("large.txt")
+ assert len(content) == len(large_content)
+ assert content == large_content
+
+
+class TestHostSandboxEdgeCases:
+ """Edge cases specific to HostSandbox."""
+
+ @pytest.mark.asyncio
+ async def test_symlink_read(self, tmp_path):
+ """Reading through a symlink should work."""
+ real_file = tmp_path / "real.txt"
+ real_file.write_text("real content")
+ symlink = tmp_path / "link.txt"
+ symlink.symlink_to(real_file)
+
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ content = await sandbox.read_file("link.txt")
+ assert content == b"real content"
+
+ @pytest.mark.asyncio
+ async def test_symlink_outside_sandbox(self, tmp_path):
+ """Symlink pointing outside working_dir — HostSandbox follows it."""
+ outside_dir = tmp_path.parent / "symlink_escape"
+ outside_dir.mkdir(exist_ok=True)
+ outside_file = outside_dir / "target.txt"
+ outside_file.write_text("escaped via symlink")
+
+ symlink = tmp_path / "evil_link.txt"
+ symlink.symlink_to(outside_file)
+
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ content = await sandbox.read_file("evil_link.txt")
+ assert content == b"escaped via symlink"
+
+ @pytest.mark.asyncio
+ async def test_unicode_filename(self, tmp_path):
+ """Unicode filenames should work."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("日本語.txt", b"Japanese content")
+ content = await sandbox.read_file("日本語.txt")
+ assert content == b"Japanese content"
+
+ @pytest.mark.asyncio
+ async def test_filename_with_spaces_and_special_chars(self, tmp_path):
+ """Filenames with spaces and special characters should work."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("file with spaces.txt", b"spaced")
+ content = await sandbox.read_file("file with spaces.txt")
+ assert content == b"spaced"
+
+ @pytest.mark.asyncio
+ async def test_deeply_nested_directory_creation(self, tmp_path):
+ """write_file should create deeply nested directories."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ deep_path = "a/b/c/d/e/f/g/h/i/j/deep.txt"
+ await sandbox.write_file(deep_path, b"deep")
+ content = await sandbox.read_file(deep_path)
+ assert content == b"deep"
+
+ @pytest.mark.asyncio
+ async def test_list_files_with_hidden_files(self, tmp_path):
+ """list_files should include hidden files (os.listdir includes them)."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("visible.txt", b"visible")
+ await sandbox.write_file(".hidden", b"hidden")
+
+ files = await sandbox.list_files(".")
+ names = [f.name for f in files]
+ assert "visible.txt" in names
+ # Native Python os.listdir includes hidden files!
+ assert ".hidden" in names
+
+ @pytest.mark.asyncio
+ async def test_read_nonexistent_with_special_chars_in_path(self, tmp_path):
+ """read_file with special chars in path should raise FileNotFoundError.
+
+ On Windows, ' and " are invalid in file paths, raising OSError instead.
+ """
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises((FileNotFoundError, OSError)):
+ await sandbox.read_file("nonexistent 'file\".txt")
+
+ @pytest.mark.asyncio
+ async def test_execute_code_with_multiline_and_quotes(self, tmp_path):
+ """execute_code should handle code with all types of quotes."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ code = """
+x = "hello 'world'"
+y = 'hello "world"'
+print(x)
+print(y)
+"""
+ result = await sandbox.execute_code(code, language="python")
+ assert result.exit_code == 0
+ assert "hello 'world'" in result.stdout
+ assert 'hello "world"' in result.stdout
+
+ @pytest.mark.asyncio
+ async def test_execute_code_with_backslashes(self, tmp_path):
+ """execute_code should handle backslashes in code."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ code = 'print("path\\\\to\\\\file")'
+ result = await sandbox.execute_code(code, language="python")
+ assert result.exit_code == 0
+ assert "path\\to\\file" in result.stdout
+
+ @pytest.mark.asyncio
+ async def test_execute_returns_correct_exit_codes(self, tmp_path):
+ """Various exit codes should be preserved."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ for code in [0, 1, 2, 127, 255]:
+ result = await sandbox.execute(f"exit {code}")
+ assert result.exit_code == code, f"Expected exit code {code}, got {result.exit_code}"
+
+ @pytest.mark.asyncio
+ async def test_file_io_uses_async_to_thread(self, tmp_path):
+ """HostSandbox wraps file I/O with asyncio.to_thread to avoid blocking.
+
+ All pathlib operations (read_bytes, write_bytes, unlink, listdir) are
+ wrapped in asyncio.to_thread, keeping the event loop responsive during
+ disk I/O.
+ """
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ large_content = b"X" * (1024 * 1024) # 1MB
+ await sandbox.write_file("blocking_test.txt", large_content)
+ content = await sandbox.read_file("blocking_test.txt")
+ assert len(content) == 1024 * 1024
+
+ @pytest.mark.asyncio
+ async def test_remove_file_nonexistent(self, tmp_path):
+ """remove_file should raise FileNotFoundError for missing files."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(FileNotFoundError):
+ await sandbox.remove_file("nonexistent.txt")
+
+ @pytest.mark.asyncio
+ async def test_remove_file_then_read(self, tmp_path):
+ """Removing a file and then reading it should raise FileNotFoundError."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("to_delete.txt", b"content")
+ await sandbox.remove_file("to_delete.txt")
+ with pytest.raises(FileNotFoundError):
+ await sandbox.read_file("to_delete.txt")
+
+ @pytest.mark.asyncio
+ async def test_remove_file_outside_sandbox(self, tmp_path):
+ """remove_file with absolute path can delete files outside sandbox."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ outside_dir = tmp_path.parent / "remove_escape"
+ outside_dir.mkdir(exist_ok=True)
+ outside_file = outside_dir / "target.txt"
+ outside_file.write_text("will be deleted")
+
+ await sandbox.remove_file(str(outside_file))
+ assert not outside_file.exists()
+
+ @pytest.mark.asyncio
+ async def test_execute_code_rejects_unsafe_language(self, tmp_path):
+ """execute_code validates language parameter against unsafe characters."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(ValueError, match="unsafe characters"):
+ await sandbox.execute_code("print(1)", language="python; rm -rf /")
+
+ @pytest.mark.asyncio
+ async def test_execute_code_rejects_path_traversal_language(self, tmp_path):
+ """execute_code rejects language with path separators."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(ValueError, match="unsafe characters"):
+ await sandbox.execute_code("1", language="../../../bin/sh")
+
+ @pytest.mark.asyncio
+ async def test_execute_code_rejects_space_injection(self, tmp_path):
+ """execute_code rejects language with spaces."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(ValueError, match="unsafe characters"):
+ await sandbox.execute_code("1", language="python -m http.server")
+
+
+class TestShellBasedSandboxHeredocEdgeCases:
+ """Edge cases in the ShellBasedSandbox heredoc implementation."""
+
+ @pytest.mark.asyncio
+ async def test_write_file_content_with_single_quotes(self):
+ """Content with single quotes is safely transported via base64."""
+
+ class MockShellSandbox(ShellBasedSandbox):
+ def __init__(self):
+ super().__init__()
+ self.last_command = ""
+
+ async def execute_streaming(self, command, timeout=None, **kwargs):
+ self.last_command = command
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def start(self):
+ self._started = True
+
+ sandbox = MockShellSandbox()
+ content = "line1\nline2\nline3"
+ await sandbox.write_file("/tmp/test.txt", content.encode())
+
+ cmd = sandbox.last_command
+ assert "base64 -d" in cmd
+ assert "/tmp/test.txt" in cmd
+
+ @pytest.mark.asyncio
+ async def test_write_file_empty_path(self):
+ """Empty path should still be quoted."""
+
+ class MockShellSandbox(ShellBasedSandbox):
+ def __init__(self):
+ super().__init__()
+ self.last_command = ""
+
+ async def execute_streaming(self, command, timeout=None, **kwargs):
+ self.last_command = command
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def start(self):
+ self._started = True
+
+ sandbox = MockShellSandbox()
+ await sandbox.write_file("", b"content")
+ assert "''" in sandbox.last_command
diff --git a/tests/strands/sandbox/test_adversarial_shared_sandbox.py b/tests/strands/sandbox/test_adversarial_shared_sandbox.py
new file mode 100644
index 000000000..709394860
--- /dev/null
+++ b/tests/strands/sandbox/test_adversarial_shared_sandbox.py
@@ -0,0 +1,136 @@
+"""Adversarial tests: Shared sandboxes and concurrent tool calls.
+
+Tests what happens when:
+- Multiple agents share the same sandbox instance
+- Multiple concurrent execute() calls hit the same sandbox
+- File operations overlap (write/read races)
+- Lifecycle races (start/stop during execution)
+"""
+
+import asyncio
+
+import pytest
+
+from strands.sandbox.base import ExecutionResult
+from strands.sandbox.host import HostSandbox
+
+
+class TestSharedSandboxConcurrentExecution:
+ """What happens when multiple coroutines call execute() on the same sandbox concurrently?"""
+
+ @pytest.mark.asyncio
+ async def test_concurrent_executes_same_sandbox(self, tmp_path):
+ """Multiple concurrent execute() calls on same sandbox should not corrupt each other."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+
+ async def run_command(cmd: str) -> ExecutionResult:
+ return await sandbox.execute(cmd)
+
+ # Run 10 concurrent commands
+ results = await asyncio.gather(
+ run_command("echo cmd0"),
+ run_command("echo cmd1"),
+ run_command("echo cmd2"),
+ run_command("echo cmd3"),
+ run_command("echo cmd4"),
+ run_command("echo cmd5"),
+ run_command("echo cmd6"),
+ run_command("echo cmd7"),
+ run_command("echo cmd8"),
+ run_command("echo cmd9"),
+ )
+
+ # Each command should have its own exit code and output
+ for i, result in enumerate(results):
+ assert result.exit_code == 0, f"cmd{i} failed: {result.stderr}"
+ assert result.stdout.strip() == f"cmd{i}", f"cmd{i} got wrong output: {result.stdout!r}"
+
+ @pytest.mark.asyncio
+ async def test_concurrent_file_write_same_file(self, tmp_path):
+ """Two concurrent writes to the same file — last write wins, no crash."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+
+ async def write_content(content: bytes):
+ await sandbox.write_file("shared.txt", content)
+
+ # Run concurrent writes
+ await asyncio.gather(
+ write_content(b"content_A"),
+ write_content(b"content_B"),
+ )
+
+ # File should exist and contain one of the values (no corruption)
+ content = await sandbox.read_file("shared.txt")
+ assert content in (b"content_A", b"content_B"), f"Corrupted content: {content!r}"
+
+ @pytest.mark.asyncio
+ async def test_concurrent_file_write_different_files(self, tmp_path):
+ """Concurrent writes to different files should all succeed."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+
+ async def write_file(name: str, content: bytes):
+ await sandbox.write_file(name, content)
+
+ await asyncio.gather(*[write_file(f"file_{i}.txt", f"content_{i}".encode()) for i in range(20)])
+
+ # All files should be written correctly
+ for i in range(20):
+ content = await sandbox.read_file(f"file_{i}.txt")
+ assert content == f"content_{i}".encode(), f"file_{i}.txt has wrong content: {content!r}"
+
+ @pytest.mark.asyncio
+ async def test_concurrent_read_write_same_file(self, tmp_path):
+ """Concurrent read + write on same file — should not crash."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("test.txt", b"initial")
+
+ async def writer():
+ for i in range(10):
+ await sandbox.write_file("test.txt", f"version_{i}".encode())
+ await asyncio.sleep(0.001)
+
+ async def reader():
+ results = []
+ for _ in range(10):
+ try:
+ content = await sandbox.read_file("test.txt")
+ results.append(content)
+ except FileNotFoundError:
+ # File might be in the middle of being overwritten
+ results.append("FILE_NOT_FOUND")
+ await asyncio.sleep(0.001)
+ return results
+
+ # This should not raise any unhandled exceptions
+ writer_task = asyncio.create_task(writer())
+ reader_task = asyncio.create_task(reader())
+ results = await reader_task
+ await writer_task
+
+ # At least some reads should succeed
+ successful_reads = [r for r in results if r != "FILE_NOT_FOUND"]
+ assert len(successful_reads) > 0
+
+
+class TestSharedSandboxBetweenAgents:
+ """What happens when two Agent instances share the same sandbox?"""
+
+ def test_two_agents_same_sandbox_instance(self):
+ """Two agents sharing the same sandbox should reference the same object."""
+ from strands import Agent
+
+ sandbox = HostSandbox(working_dir="/tmp/shared")
+ agent1 = Agent(sandbox=sandbox)
+ agent2 = Agent(sandbox=sandbox)
+ assert agent1.sandbox is agent2.sandbox
+
+ @pytest.mark.asyncio
+ async def test_shared_sandbox_working_dir_isolation(self, tmp_path):
+ """Commands from both agents should execute in the same working directory."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+
+ # Agent 1 creates a file
+ await sandbox.write_file("from_agent1.txt", b"hello from 1")
+ # Agent 2 should see it
+ content = await sandbox.read_file("from_agent1.txt")
+ assert content == b"hello from 1"
diff --git a/tests/strands/sandbox/test_agent_sandbox.py b/tests/strands/sandbox/test_agent_sandbox.py
new file mode 100644
index 000000000..a24cf976e
--- /dev/null
+++ b/tests/strands/sandbox/test_agent_sandbox.py
@@ -0,0 +1,81 @@
+"""Tests for Agent + Sandbox integration."""
+
+from strands.agent.agent import Agent
+from strands.sandbox.base import Sandbox
+from strands.sandbox.host import HostSandbox
+
+
+class TestAgentSandboxIntegration:
+ def test_agent_sandbox_defaults_to_host_sandbox(self) -> None:
+ """Agent.sandbox defaults to HostSandbox when not explicitly set."""
+ agent = Agent(model="test")
+ assert agent.sandbox is not None
+ assert isinstance(agent.sandbox, HostSandbox)
+ assert isinstance(agent.sandbox, Sandbox)
+
+ def test_agent_sandbox_accepts_host_sandbox(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ agent = Agent(model="test", sandbox=sandbox)
+ assert agent.sandbox is sandbox
+ assert isinstance(agent.sandbox, Sandbox)
+
+ def test_agent_sandbox_default_uses_cwd(self) -> None:
+ """Default HostSandbox uses the current working directory."""
+ import os
+
+ agent = Agent(model="test")
+ assert isinstance(agent.sandbox, HostSandbox)
+ assert agent.sandbox.working_dir == os.getcwd()
+
+ def test_agent_sandbox_is_accessible(self, tmp_path: object) -> None:
+ """Tools can access sandbox via agent.sandbox."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ agent = Agent(model="test", sandbox=sandbox)
+ assert agent.sandbox.working_dir == str(tmp_path)
+
+
+class TestAgentSandboxToolAccess:
+ """Critical: Verify tools can access sandbox through tool_context.agent.sandbox."""
+
+ def test_sandbox_accessible_via_agent_attribute(self, tmp_path: object) -> None:
+ """Simulates tool access pattern: tool_context.agent.sandbox."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ agent = Agent(model="test", sandbox=sandbox)
+
+ # This is the access pattern tools use: tool_context.agent.sandbox
+ accessed_sandbox = agent.sandbox
+ assert accessed_sandbox is sandbox
+ assert accessed_sandbox.working_dir == str(tmp_path)
+ assert hasattr(accessed_sandbox, "execute")
+ assert hasattr(accessed_sandbox, "read_file")
+ assert hasattr(accessed_sandbox, "write_file")
+ assert hasattr(accessed_sandbox, "remove_file")
+ assert hasattr(accessed_sandbox, "list_files")
+ assert hasattr(accessed_sandbox, "execute_code")
+
+ def test_multiple_agents_share_sandbox_correctly(self, tmp_path: object) -> None:
+ """Two agents sharing a sandbox should both access the same instance."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ agent1 = Agent(model="test", sandbox=sandbox)
+ agent2 = Agent(model="test", sandbox=sandbox)
+ assert agent1.sandbox is agent2.sandbox
+ assert agent1.sandbox.working_dir == agent2.sandbox.working_dir
+
+ def test_default_sandbox_has_all_methods(self) -> None:
+ """Default HostSandbox should have all abstract methods implemented."""
+ agent = Agent(model="test")
+ ws = agent.sandbox
+ # Verify all 6 abstract methods + 2 convenience methods exist
+ for method in [
+ "execute",
+ "execute_streaming",
+ "execute_code",
+ "execute_code_streaming",
+ "read_file",
+ "write_file",
+ "remove_file",
+ "list_files",
+ "read_text",
+ "write_text",
+ ]:
+ assert callable(getattr(ws, method)), f"Missing method: {method}"
diff --git a/tests/strands/sandbox/test_base.py b/tests/strands/sandbox/test_base.py
new file mode 100644
index 000000000..ae6e704e4
--- /dev/null
+++ b/tests/strands/sandbox/test_base.py
@@ -0,0 +1,735 @@
+"""Tests for the Sandbox ABC, ShellBasedSandbox, ExecutionResult, FileInfo, and OutputFile."""
+
+from collections.abc import AsyncGenerator
+from typing import Any
+
+import pytest
+
+from strands.sandbox.base import (
+ ExecutionResult,
+ FileInfo,
+ OutputFile,
+ Sandbox,
+ StreamChunk,
+)
+from strands.sandbox.shell_based import ShellBasedSandbox
+
+
+class ConcreteShellSandbox(ShellBasedSandbox):
+ """Minimal concrete ShellBasedSandbox implementation for testing."""
+
+ def __init__(self) -> None:
+ self.commands: list[str] = []
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ self.commands.append(command)
+ if "fail" in command:
+ yield ExecutionResult(exit_code=1, stdout="", stderr="command failed")
+ return
+ # For base64 commands (used by read_file), return valid base64 output
+ if command.startswith("base64 "):
+ import base64 as b64
+
+ stdout = b64.b64encode(b"mock file content").decode("ascii") + "\n"
+ else:
+ stdout = f"output of: {command}\n"
+ yield StreamChunk(data=stdout)
+ yield ExecutionResult(exit_code=0, stdout=stdout, stderr="")
+
+
+class TestExecutionResult:
+ def test_execution_result_fields(self) -> None:
+ result = ExecutionResult(exit_code=0, stdout="hello", stderr="")
+ assert result.exit_code == 0
+ assert result.stdout == "hello"
+ assert result.stderr == ""
+
+ def test_execution_result_error(self) -> None:
+ result = ExecutionResult(exit_code=1, stdout="", stderr="error msg")
+ assert result.exit_code == 1
+ assert result.stderr == "error msg"
+
+ def test_execution_result_equality(self) -> None:
+ r1 = ExecutionResult(exit_code=0, stdout="out", stderr="err")
+ r2 = ExecutionResult(exit_code=0, stdout="out", stderr="err")
+ assert r1 == r2
+
+ def test_execution_result_output_files_default_empty(self) -> None:
+ result = ExecutionResult(exit_code=0, stdout="", stderr="")
+ assert result.output_files == []
+
+ def test_execution_result_with_output_files(self) -> None:
+ files = [OutputFile(name="plot.png", content=b"\x89PNG", mime_type="image/png")]
+ result = ExecutionResult(exit_code=0, stdout="", stderr="", output_files=files)
+ assert len(result.output_files) == 1
+ assert result.output_files[0].name == "plot.png"
+ assert result.output_files[0].content == b"\x89PNG"
+ assert result.output_files[0].mime_type == "image/png"
+
+
+class TestFileInfo:
+ def test_file_info_file(self) -> None:
+ info = FileInfo(name="test.txt", is_dir=False, size=1024)
+ assert info.name == "test.txt"
+ assert info.is_dir is False
+ assert info.size == 1024
+
+ def test_file_info_directory(self) -> None:
+ info = FileInfo(name="subdir", is_dir=True)
+ assert info.name == "subdir"
+ assert info.is_dir is True
+ assert info.size is None # default is None now
+
+ def test_file_info_equality(self) -> None:
+ f1 = FileInfo(name="a.txt", is_dir=False, size=100)
+ f2 = FileInfo(name="a.txt", is_dir=False, size=100)
+ assert f1 == f2
+
+ def test_file_info_optional_fields_default_none(self) -> None:
+ """is_dir and size default to None when not provided."""
+ info = FileInfo(name="unknown.txt")
+ assert info.name == "unknown.txt"
+ assert info.is_dir is None
+ assert info.size is None
+
+ def test_file_info_with_only_name(self) -> None:
+ """FileInfo with only name is valid — unknown metadata."""
+ info = FileInfo(name="mystery")
+ assert info.is_dir is None
+ assert info.size is None
+
+
+class TestOutputFile:
+ def test_output_file_fields(self) -> None:
+ f = OutputFile(name="chart.svg", content=b"", mime_type="image/svg+xml")
+ assert f.name == "chart.svg"
+ assert f.content == b""
+ assert f.mime_type == "image/svg+xml"
+
+ def test_output_file_default_mime_type(self) -> None:
+ f = OutputFile(name="data.bin", content=b"\x00\x01")
+ assert f.mime_type == "application/octet-stream"
+
+
+class TestSandboxABC:
+ """Tests that Sandbox has all abstract methods and cannot be partially implemented."""
+
+ def test_cannot_instantiate_abstract(self) -> None:
+ with pytest.raises(TypeError):
+ Sandbox() # type: ignore
+
+ def test_cannot_instantiate_with_only_execute_streaming(self) -> None:
+ """A class implementing only execute_streaming() is still abstract."""
+
+ class OnlyExecute(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ with pytest.raises(TypeError):
+ OnlyExecute() # type: ignore
+
+ def test_all_abstract_methods_required(self) -> None:
+ """A class must implement all abstract methods to be concrete."""
+
+ class AllMethods(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b""
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ pass
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ # Should not raise
+ sandbox = AllMethods()
+ assert sandbox is not None
+
+ def test_missing_remove_file_is_abstract(self) -> None:
+ """A class missing remove_file() is still abstract."""
+
+ class MissingRemoveFile(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b""
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ with pytest.raises(TypeError):
+ MissingRemoveFile() # type: ignore
+
+ @pytest.mark.asyncio
+ async def test_read_text_convenience(self) -> None:
+ """Test that read_text decodes bytes from read_file."""
+
+ class TextSandbox(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b"hello world"
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ pass
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ sandbox = TextSandbox()
+ text = await sandbox.read_text("test.txt")
+ assert text == "hello world"
+ assert isinstance(text, str)
+
+ @pytest.mark.asyncio
+ async def test_write_text_convenience(self) -> None:
+ """Test that write_text encodes string to bytes for write_file."""
+
+ class TextSandbox(Sandbox):
+ def __init__(self) -> None:
+ self.written_content: bytes = b""
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b""
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ self.written_content = content
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ sandbox = TextSandbox()
+ await sandbox.write_text("test.txt", "hello world")
+ assert sandbox.written_content == b"hello world"
+
+ @pytest.mark.asyncio
+ async def test_non_streaming_execute_convenience(self) -> None:
+ """Test that execute() returns ExecutionResult directly."""
+
+ class SimpleSandbox(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield StreamChunk(data="output\n")
+ yield ExecutionResult(exit_code=0, stdout="output\n", stderr="")
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b""
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ pass
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ sandbox = SimpleSandbox()
+ result = await sandbox.execute("echo hello")
+ assert isinstance(result, ExecutionResult)
+ assert result.exit_code == 0
+ assert result.stdout == "output\n"
+
+ @pytest.mark.asyncio
+ async def test_non_streaming_execute_code_convenience(self) -> None:
+ """Test that execute_code() returns ExecutionResult directly."""
+
+ class SimpleSandbox(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield StreamChunk(data="code output\n")
+ yield ExecutionResult(exit_code=0, stdout="code output\n", stderr="")
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b""
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ pass
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ sandbox = SimpleSandbox()
+ result = await sandbox.execute_code("print(1)", language="python")
+ assert isinstance(result, ExecutionResult)
+ assert result.exit_code == 0
+ assert result.stdout == "code output\n"
+
+ @pytest.mark.asyncio
+ async def test_execute_raises_on_missing_result(self) -> None:
+ """execute() raises RuntimeError if execute_streaming yields no ExecutionResult."""
+
+ class BadSandbox(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield "just a string"
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield "just a string"
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b""
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ pass
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ sandbox = BadSandbox()
+ with pytest.raises(RuntimeError, match="did not yield an ExecutionResult"):
+ await sandbox.execute("anything")
+
+ @pytest.mark.asyncio
+ async def test_execute_code_raises_on_missing_result(self) -> None:
+ """execute_code() raises RuntimeError if execute_code_streaming yields no ExecutionResult."""
+
+ class BadSandbox(Sandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield "just a string"
+
+ async def execute_code_streaming(
+ self, code: str, language: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ yield "just a string"
+
+ async def read_file(self, path: str, **kwargs: Any) -> bytes:
+ return b""
+
+ async def write_file(self, path: str, content: bytes, **kwargs: Any) -> None:
+ pass
+
+ async def remove_file(self, path: str, **kwargs: Any) -> None:
+ pass
+
+ async def list_files(self, path: str, **kwargs: Any) -> list[FileInfo]:
+ return []
+
+ sandbox = BadSandbox()
+ with pytest.raises(RuntimeError, match="did not yield an ExecutionResult"):
+ await sandbox.execute_code("print(1)", language="python")
+
+
+class TestShellBasedSandboxABC:
+ """Tests that ShellBasedSandbox is still abstract (execute_streaming() not implemented)."""
+
+ def test_cannot_instantiate_shell_based_sandbox(self) -> None:
+ with pytest.raises(TypeError):
+ ShellBasedSandbox() # type: ignore
+
+ def test_shell_based_sandbox_only_needs_execute_streaming(self) -> None:
+ """ShellBasedSandbox requires only execute_streaming() to be concrete."""
+ sandbox = ConcreteShellSandbox()
+ assert sandbox is not None
+
+
+class TestShellBasedSandboxOperations:
+ """Tests for the shell-based default implementations."""
+
+ @pytest.mark.asyncio
+ async def test_execute_streaming_yields_lines_and_result(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ chunks: list[StreamChunk | ExecutionResult] = []
+ async for chunk in sandbox.execute_streaming("echo hello"):
+ chunks.append(chunk)
+ assert isinstance(chunks[-1], ExecutionResult)
+ assert chunks[-1].exit_code == 0
+ assert any(isinstance(c, StreamChunk) for c in chunks[:-1])
+ assert sandbox.commands == ["echo hello"]
+
+ @pytest.mark.asyncio
+ async def test_non_streaming_execute(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ result = await sandbox.execute("echo hello")
+ assert isinstance(result, ExecutionResult)
+ assert result.exit_code == 0
+ assert "echo hello" in result.stdout
+
+ @pytest.mark.asyncio
+ async def test_execute_code_streaming_default(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ chunks: list[StreamChunk | ExecutionResult] = []
+ async for chunk in sandbox.execute_code_streaming("print('hi')", language="python"):
+ chunks.append(chunk)
+ assert isinstance(chunks[-1], ExecutionResult)
+ assert chunks[-1].exit_code == 0
+
+ @pytest.mark.asyncio
+ async def test_non_streaming_execute_code(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ result = await sandbox.execute_code("print('hi')", language="python")
+ assert result.exit_code == 0
+ assert len(sandbox.commands) == 1
+ assert "python" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_execute_code_custom_language(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ result = await sandbox.execute_code("puts 'hi'", language="ruby")
+ assert result.exit_code == 0
+ assert "ruby" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_read_file_returns_bytes(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ content = await sandbox.read_file("/tmp/test.txt")
+ assert isinstance(content, bytes)
+ assert content == b"mock file content"
+ assert "base64" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_read_file_not_found(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ with pytest.raises(FileNotFoundError):
+ await sandbox.read_file("/tmp/fail.txt")
+
+ @pytest.mark.asyncio
+ async def test_write_file_accepts_bytes(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ await sandbox.write_file("/tmp/test.txt", b"hello content")
+ assert len(sandbox.commands) == 1
+ assert "/tmp/test.txt" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_write_file_failure(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ with pytest.raises(IOError):
+ await sandbox.write_file("/tmp/fail.txt", b"content")
+
+ @pytest.mark.asyncio
+ async def test_write_file_uses_base64(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ await sandbox.write_file("/tmp/test.txt", b"content with STRANDS_EOF inside")
+ assert "base64 -d" in sandbox.commands[0]
+ assert "/tmp/test.txt" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_write_file_path_is_shell_quoted(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ await sandbox.write_file("/tmp/test file.txt", b"content")
+ assert "'/tmp/test file.txt'" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_read_file_path_is_shell_quoted(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ await sandbox.read_file("/tmp/test file.txt")
+ assert "'/tmp/test file.txt'" in sandbox.commands[0]
+ assert "base64" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_remove_file_success(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ await sandbox.remove_file("/tmp/test.txt")
+ assert len(sandbox.commands) == 1
+ assert "rm" in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_remove_file_not_found(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ with pytest.raises(FileNotFoundError):
+ await sandbox.remove_file("/tmp/fail.txt")
+
+ @pytest.mark.asyncio
+ async def test_list_files_returns_file_info(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ files = await sandbox.list_files("/tmp")
+ assert len(sandbox.commands) == 1
+ assert "ls" in sandbox.commands[0]
+ # The mock returns a string output, so list_files parses it
+ for f in files:
+ assert isinstance(f, FileInfo)
+
+ @pytest.mark.asyncio
+ async def test_list_files_not_found(self) -> None:
+ sandbox = ConcreteShellSandbox()
+ with pytest.raises(FileNotFoundError):
+ await sandbox.list_files("/tmp/fail")
+
+ @pytest.mark.asyncio
+ async def test_list_files_size_is_none(self) -> None:
+ """ShellBasedSandbox.list_files returns size=None (cannot determine from ls)."""
+ sandbox = ConcreteShellSandbox()
+ files = await sandbox.list_files("/tmp")
+ for f in files:
+ assert f.size is None
+
+
+class TestShellBasedListFilesRealisticOutput:
+ @pytest.mark.asyncio
+ async def test_list_files_parses_directory_indicator(self) -> None:
+ class RealisticLsSandbox(ShellBasedSandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ stdout = ".\n..\nsubdir/\n.hidden_dir/\nfile.txt\nscript.py*\nlink.txt@\npipe_file|\n"
+ yield StreamChunk(data=stdout)
+ yield ExecutionResult(exit_code=0, stdout=stdout, stderr="")
+
+ sandbox = RealisticLsSandbox()
+ files = await sandbox.list_files("/some/path")
+
+ names = [f.name for f in files]
+ is_dir_map = {f.name: f.is_dir for f in files}
+
+ assert "subdir" in names
+ assert is_dir_map["subdir"] is True
+ assert ".hidden_dir" in names
+ assert is_dir_map[".hidden_dir"] is True
+ assert "file.txt" in names
+ assert is_dir_map["file.txt"] is False
+ assert "script.py" in names
+ assert is_dir_map["script.py"] is False
+ assert "link.txt" in names
+ assert is_dir_map["link.txt"] is False
+ assert "pipe_file" in names
+ assert is_dir_map["pipe_file"] is False
+ assert "." not in names
+ assert ".." not in names
+
+ @pytest.mark.asyncio
+ async def test_list_files_empty_directory(self) -> None:
+ class EmptyLsSandbox(ShellBasedSandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ stdout = "./\n../\n"
+ yield StreamChunk(data=stdout)
+ yield ExecutionResult(exit_code=0, stdout=stdout, stderr="")
+
+ sandbox = EmptyLsSandbox()
+ files = await sandbox.list_files("/empty")
+ assert files == []
+
+ @pytest.mark.asyncio
+ async def test_list_files_files_only_no_indicators(self) -> None:
+ class PlainLsSandbox(ShellBasedSandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ stdout = "readme.md\nsetup.py\nrequirements.txt\n"
+ yield StreamChunk(data=stdout)
+ yield ExecutionResult(exit_code=0, stdout=stdout, stderr="")
+
+ sandbox = PlainLsSandbox()
+ files = await sandbox.list_files("/project")
+ names = [f.name for f in files]
+ assert names == ["readme.md", "setup.py", "requirements.txt"]
+ assert all(not f.is_dir for f in files)
+ assert all(f.size is None for f in files) # shell-based has no size info
+
+
+class TestShellBasedWriteFileParentDirs:
+ @pytest.mark.asyncio
+ async def test_write_file_command_includes_mkdir(self) -> None:
+ class CommandCapture(ShellBasedSandbox):
+ def __init__(self) -> None:
+ self.commands: list[str] = []
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ self.commands.append(command)
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ sandbox = CommandCapture()
+ await sandbox.write_file("/tmp/deep/nested/file.txt", b"content")
+ assert len(sandbox.commands) == 1
+ assert "mkdir -p" in sandbox.commands[0]
+ assert "base64 -d" in sandbox.commands[0]
+
+
+class TestShellBasedBase64Encoding:
+ @pytest.mark.asyncio
+ async def test_write_file_encodes_exact_base64(self) -> None:
+ import base64 as b64
+
+ class CommandCapture(ShellBasedSandbox):
+ def __init__(self) -> None:
+ self.commands: list[str] = []
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ self.commands.append(command)
+ yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+ sandbox = CommandCapture()
+ original_content = b"hello world with special chars: \x00\xff\n\t"
+ await sandbox.write_file("/tmp/test.bin", original_content)
+ expected_b64 = b64.b64encode(original_content).decode("ascii")
+ assert expected_b64 in sandbox.commands[0]
+
+ @pytest.mark.asyncio
+ async def test_read_file_decodes_base64_correctly(self) -> None:
+ import base64 as b64
+
+ original_content = b"\x89PNG\r\n\x1a\n\x00\x00binary"
+
+ class Base64Sandbox(ShellBasedSandbox):
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ stdout = b64.b64encode(original_content).decode("ascii") + "\n"
+ yield StreamChunk(data=stdout)
+ yield ExecutionResult(exit_code=0, stdout=stdout, stderr="")
+
+ sandbox = Base64Sandbox()
+ content = await sandbox.read_file("/tmp/image.png")
+ assert content == original_content
+
+
+class TestShellBasedSandboxCwdPassthrough:
+ """Test that ShellBasedSandbox passes cwd through to execute_streaming."""
+
+ @pytest.mark.asyncio
+ async def test_execute_code_streaming_passes_cwd(self) -> None:
+ """execute_code_streaming should forward cwd to execute_streaming."""
+
+ class CwdTrackingSandbox(ShellBasedSandbox):
+ def __init__(self) -> None:
+ self.received_cwds: list[str | None] = []
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ self.received_cwds.append(cwd)
+ yield ExecutionResult(exit_code=0, stdout="ok", stderr="")
+
+ sandbox = CwdTrackingSandbox()
+ await sandbox.execute_code("print(1)", language="python", cwd="/custom/dir")
+ assert sandbox.received_cwds == ["/custom/dir"]
+
+ @pytest.mark.asyncio
+ async def test_execute_code_streaming_passes_none_cwd_by_default(self) -> None:
+ """When no cwd is provided, None should be passed through."""
+
+ class CwdTrackingSandbox(ShellBasedSandbox):
+ def __init__(self) -> None:
+ self.received_cwds: list[str | None] = []
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ self.received_cwds.append(cwd)
+ yield ExecutionResult(exit_code=0, stdout="ok", stderr="")
+
+ sandbox = CwdTrackingSandbox()
+ await sandbox.execute_code("print(1)", language="python")
+ assert sandbox.received_cwds == [None]
+
+
+class TestNonStreamingConveniencePassesCwd:
+ """Test that non-streaming execute/execute_code pass cwd to streaming methods."""
+
+ @pytest.mark.asyncio
+ async def test_execute_passes_cwd_to_streaming(self) -> None:
+ """The non-streaming execute() should forward cwd to execute_streaming()."""
+
+ class CwdTrackingSandbox(ShellBasedSandbox):
+ def __init__(self) -> None:
+ self.received_cwds: list[str | None] = []
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ self.received_cwds.append(cwd)
+ yield ExecutionResult(exit_code=0, stdout="ok", stderr="")
+
+ sandbox = CwdTrackingSandbox()
+ result = await sandbox.execute("echo hi", cwd="/some/path")
+ assert sandbox.received_cwds == ["/some/path"]
+ assert result.exit_code == 0
+
+ @pytest.mark.asyncio
+ async def test_execute_code_passes_cwd_to_streaming(self) -> None:
+ """The non-streaming execute_code() should forward cwd to execute_code_streaming()."""
+
+ class CwdTrackingSandbox(ShellBasedSandbox):
+ def __init__(self) -> None:
+ self.received_cwds: list[str | None] = []
+
+ async def execute_streaming(
+ self, command: str, timeout: int | None = None, cwd: str | None = None, **kwargs: Any
+ ) -> AsyncGenerator[StreamChunk | ExecutionResult, None]:
+ self.received_cwds.append(cwd)
+ yield ExecutionResult(exit_code=0, stdout="ok", stderr="")
+
+ sandbox = CwdTrackingSandbox()
+ result = await sandbox.execute_code("print(1)", language="python", cwd="/code/dir")
+ assert sandbox.received_cwds == ["/code/dir"]
+ assert result.exit_code == 0
diff --git a/tests/strands/sandbox/test_host.py b/tests/strands/sandbox/test_host.py
new file mode 100644
index 000000000..8c08ed2cf
--- /dev/null
+++ b/tests/strands/sandbox/test_host.py
@@ -0,0 +1,473 @@
+"""Tests for the HostSandbox implementation."""
+
+import asyncio
+import os
+from pathlib import Path
+
+import pytest
+
+from strands.sandbox.base import ExecutionResult, FileInfo, Sandbox, StreamChunk
+from strands.sandbox.host import HostSandbox
+
+
+class TestHostSandboxInit:
+ def test_default_working_dir(self) -> None:
+ sandbox = HostSandbox()
+ assert sandbox.working_dir == os.getcwd()
+
+ def test_custom_working_dir(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ assert sandbox.working_dir == str(tmp_path)
+
+ def test_extends_sandbox_directly(self) -> None:
+ """HostSandbox extends Sandbox directly, not ShellBasedSandbox."""
+ sandbox = HostSandbox()
+ assert isinstance(sandbox, Sandbox)
+
+ def test_does_not_extend_shell_based_sandbox(self) -> None:
+ """HostSandbox must NOT inherit from ShellBasedSandbox."""
+ from strands.sandbox.shell_based import ShellBasedSandbox
+
+ sandbox = HostSandbox()
+ assert not isinstance(sandbox, ShellBasedSandbox)
+
+
+class TestHostSandboxResolvePath:
+ def test_resolve_relative_path(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ resolved = sandbox._resolve_path("subdir/file.txt")
+ expected = Path(str(tmp_path)) / "subdir" / "file.txt"
+ assert resolved == expected
+
+ def test_resolve_absolute_path(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ abs_path = str(Path(str(tmp_path)) / "absolute" / "path.txt")
+ resolved = sandbox._resolve_path(abs_path)
+ assert str(resolved) == abs_path
+
+
+class TestHostSandboxExecute:
+ @pytest.mark.asyncio
+ async def test_execute_echo(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute("echo hello")
+ assert result.exit_code == 0
+ assert result.stdout.strip() == "hello"
+ assert result.stderr == ""
+
+ @pytest.mark.asyncio
+ async def test_execute_streams_chunks(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ chunks: list[StreamChunk | ExecutionResult] = []
+ async for chunk in sandbox.execute_streaming("echo line1 && echo line2"):
+ chunks.append(chunk)
+ str_chunks = [c.data for c in chunks if isinstance(c, StreamChunk)]
+ result_chunks = [c for c in chunks if isinstance(c, ExecutionResult)]
+ assert len(result_chunks) == 1
+ assert len(str_chunks) >= 1
+ combined = "".join(str_chunks)
+ assert "line1" in combined
+ assert "line2" in combined
+
+ @pytest.mark.asyncio
+ async def test_execute_failure(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute("exit 42")
+ assert result.exit_code == 42
+
+ @pytest.mark.asyncio
+ async def test_execute_stderr(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute("echo error >&2")
+ assert result.exit_code == 0
+ assert result.stderr.strip() == "error"
+
+ @pytest.mark.asyncio
+ async def test_execute_uses_working_dir(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ # Use python to print cwd instead of 'pwd' for cross-platform compatibility.
+ # On Windows, 'pwd' (via Git Bash) returns MSYS-style paths (/c/Users/...)
+ # which don't match the native Windows path (C:\Users\...).
+ result = await sandbox.execute('python -c "import os; print(os.getcwd())"')
+ assert result.exit_code == 0
+ assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(tmp_path))
+
+ @pytest.mark.asyncio
+ async def test_execute_timeout(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(asyncio.TimeoutError):
+ await sandbox.execute("sleep 10", timeout=1)
+
+ @pytest.mark.asyncio
+ async def test_execute_no_timeout(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute("echo fast", timeout=None)
+ assert result.exit_code == 0
+ assert result.stdout.strip() == "fast"
+
+ @pytest.mark.asyncio
+ async def test_execute_long_output_without_newlines(self, tmp_path: object) -> None:
+ """Bug 2 fix: long output lines (>64KB) no longer crash."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute("python3 -c \"import sys; sys.stdout.write('A' * 131072)\"")
+ assert result.exit_code == 0
+ assert len(result.stdout) == 131072
+
+
+class TestHostSandboxExecuteCode:
+ @pytest.mark.asyncio
+ async def test_execute_python_code(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute_code("print('hello from python')", language="python")
+ assert result.exit_code == 0
+ assert result.stdout.strip() == "hello from python"
+
+ @pytest.mark.asyncio
+ async def test_execute_code_streams(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ chunks: list[StreamChunk | ExecutionResult] = []
+ async for chunk in sandbox.execute_code_streaming("print('line1')\nprint('line2')", language="python"):
+ chunks.append(chunk)
+ assert isinstance(chunks[-1], ExecutionResult)
+ combined = "".join(c.data for c in chunks if isinstance(c, StreamChunk))
+ assert "line1" in combined
+ assert "line2" in combined
+
+ @pytest.mark.asyncio
+ async def test_execute_python_code_error(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute_code("raise ValueError('test error')", language="python")
+ assert result.exit_code != 0
+ assert "ValueError" in result.stderr
+
+ @pytest.mark.asyncio
+ async def test_execute_python_multiline(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ code = "x = 42\nprint(f'x = {x}')"
+ result = await sandbox.execute_code(code, language="python")
+ assert result.exit_code == 0
+ assert "x = 42" in result.stdout
+
+ @pytest.mark.asyncio
+ async def test_execute_code_rejects_unsafe_language(self, tmp_path: object) -> None:
+ """Language parameter with shell metacharacters raises ValueError."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(ValueError, match="unsafe characters"):
+ await sandbox.execute_code("print(1)", language="python; rm -rf /")
+
+ @pytest.mark.asyncio
+ async def test_execute_code_accepts_valid_languages(self, tmp_path: object) -> None:
+ """Valid language names with dots, hyphens, underscores pass validation."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ # python3.12-like names should be accepted
+ # (may fail to execute if interpreter not found, but validation passes)
+ with pytest.raises(ValueError, match="unsafe characters"):
+ await sandbox.execute_code("1", language="python; echo pwned")
+ # These should NOT raise ValueError (may raise FileNotFoundError at exec time)
+ try:
+ await sandbox.execute_code("1", language="python3.12")
+ except Exception as e:
+ # FileNotFoundError or non-zero exit code is expected, NOT ValueError
+ assert not isinstance(e, ValueError)
+
+ @pytest.mark.asyncio
+ async def test_execute_code_uses_subprocess_exec(self, tmp_path: object) -> None:
+ """Verify code is passed directly to interpreter, not through shell."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ # Code with shell metacharacters should be passed literally to python
+ code = "import sys; print(sys.argv)"
+ result = await sandbox.execute_code(code, language="python")
+ assert result.exit_code == 0
+
+ @pytest.mark.asyncio
+ async def test_execute_code_uses_working_dir(self, tmp_path: object) -> None:
+ """Code execution should use the sandbox working directory."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute_code("import os; print(os.getcwd())", language="python")
+ assert result.exit_code == 0
+ assert result.stdout.strip() == str(tmp_path)
+
+ @pytest.mark.asyncio
+ async def test_execute_code_with_quotes(self, tmp_path: object) -> None:
+ """Code with all types of quotes should work (no shell quoting needed)."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ code = """
+x = "hello 'world'"
+y = 'hello "world"'
+print(x)
+print(y)
+"""
+ result = await sandbox.execute_code(code, language="python")
+ assert result.exit_code == 0
+ assert "hello 'world'" in result.stdout
+ assert 'hello "world"' in result.stdout
+
+ @pytest.mark.asyncio
+ async def test_execute_code_with_backslashes(self, tmp_path: object) -> None:
+ """Code with backslashes should work (no shell escaping needed)."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ code = 'print("path\\\\to\\\\file")'
+ result = await sandbox.execute_code(code, language="python")
+ assert result.exit_code == 0
+ assert "path\\to\\file" in result.stdout
+
+
+class TestHostSandboxFileOps:
+ @pytest.mark.asyncio
+ async def test_write_and_read_file(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("test.txt", b"hello world")
+ content = await sandbox.read_file("test.txt")
+ assert content == b"hello world"
+
+ @pytest.mark.asyncio
+ async def test_read_file_absolute_path(self, tmp_path: object) -> None:
+ test_file = tmp_path / "abs_test.txt" # type: ignore[operator]
+ test_file.write_bytes(b"absolute content")
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ content = await sandbox.read_file(str(test_file))
+ assert content == b"absolute content"
+
+ @pytest.mark.asyncio
+ async def test_read_file_not_found(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(FileNotFoundError):
+ await sandbox.read_file("nonexistent.txt")
+
+ @pytest.mark.asyncio
+ async def test_write_file_creates_directories(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("subdir/nested/test.txt", b"nested content")
+ content = await sandbox.read_file("subdir/nested/test.txt")
+ assert content == b"nested content"
+
+ @pytest.mark.asyncio
+ async def test_write_file_absolute_path(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ abs_path = str(tmp_path / "abs_write.txt") # type: ignore[operator]
+ await sandbox.write_file(abs_path, b"absolute write")
+ content = await sandbox.read_file(abs_path)
+ assert content == b"absolute write"
+
+ @pytest.mark.asyncio
+ async def test_write_file_unicode(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_text("unicode.txt", "héllo wörld 🌍")
+ content = await sandbox.read_text("unicode.txt")
+ assert content == "héllo wörld 🌍"
+
+ @pytest.mark.asyncio
+ async def test_list_files(self, tmp_path: object) -> None:
+ (tmp_path / "file1.txt").write_text("a") # type: ignore[operator]
+ (tmp_path / "file2.txt").write_text("b") # type: ignore[operator]
+ (tmp_path / "file3.py").write_text("c") # type: ignore[operator]
+
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ files = await sandbox.list_files(".")
+ names = sorted([f.name for f in files])
+ assert names == ["file1.txt", "file2.txt", "file3.py"]
+ # All should be FileInfo instances
+ for f in files:
+ assert isinstance(f, FileInfo)
+ assert f.is_dir is False
+
+ @pytest.mark.asyncio
+ async def test_list_files_includes_hidden_files(self, tmp_path: object) -> None:
+ """list_files uses os.listdir which includes hidden files (unlike ls -1)."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("visible.txt", b"visible")
+ await sandbox.write_file(".hidden", b"hidden")
+
+ files = await sandbox.list_files(".")
+ names = [f.name for f in files]
+ assert "visible.txt" in names
+ assert ".hidden" in names # Native Python includes dotfiles!
+
+ @pytest.mark.asyncio
+ async def test_list_files_sorted(self, tmp_path: object) -> None:
+ """list_files returns sorted results for deterministic ordering."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("zebra.txt", b"z")
+ await sandbox.write_file("apple.txt", b"a")
+ await sandbox.write_file("mango.txt", b"m")
+
+ files = await sandbox.list_files(".")
+ names = [f.name for f in files]
+ assert names == ["apple.txt", "mango.txt", "zebra.txt"]
+
+ @pytest.mark.asyncio
+ async def test_list_files_empty_dir(self, tmp_path: object) -> None:
+ empty_dir = tmp_path / "empty" # type: ignore[operator]
+ empty_dir.mkdir()
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ files = await sandbox.list_files("empty")
+ assert files == [] # Empty list of FileInfo
+
+ @pytest.mark.asyncio
+ async def test_list_files_not_found(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ with pytest.raises(FileNotFoundError):
+ await sandbox.list_files("nonexistent")
+
+
+class TestHostSandboxExecuteCodeErrorHandling:
+ """Tests for execute_code error handling (FileNotFoundError fix)."""
+
+ @pytest.mark.asyncio
+ async def test_execute_code_nonexistent_language_returns_exit_127(self, tmp_path: object) -> None:
+ """execute_code with non-existent language returns exit 127 instead of crashing."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute_code("1", language="nonexistent-lang-12345")
+ assert result.exit_code == 127
+ assert "not found" in result.stderr.lower()
+
+ @pytest.mark.asyncio
+ async def test_execute_code_nonexistent_language_streams_result(self, tmp_path: object) -> None:
+ """execute_code yields an ExecutionResult even for non-existent languages."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ chunks: list = []
+ async for chunk in sandbox.execute_code_streaming("1", language="nonexistent-lang-xyz"):
+ chunks.append(chunk)
+ assert len(chunks) == 1
+ assert isinstance(chunks[0], ExecutionResult)
+ assert chunks[0].exit_code == 127
+
+
+class TestHostSandboxBinaryIO:
+ """Tests for binary file I/O (bytes-native read/write)."""
+
+ @pytest.mark.asyncio
+ async def test_write_and_read_binary(self, tmp_path: object) -> None:
+ """Binary content (e.g., PNG header) round-trips correctly."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ binary_content = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR"
+ await sandbox.write_file("image.png", binary_content)
+ read_back = await sandbox.read_file("image.png")
+ assert read_back == binary_content
+
+ @pytest.mark.asyncio
+ async def test_read_text_convenience(self, tmp_path: object) -> None:
+ """read_text decodes bytes to string."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("test.txt", b"hello world")
+ text = await sandbox.read_text("test.txt")
+ assert text == "hello world"
+ assert isinstance(text, str)
+
+ @pytest.mark.asyncio
+ async def test_write_text_convenience(self, tmp_path: object) -> None:
+ """write_text encodes string to bytes."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_text("test.txt", "hello world")
+ read_back = await sandbox.read_file("test.txt")
+ assert read_back == b"hello world"
+
+ @pytest.mark.asyncio
+ async def test_read_text_unicode_decode_error(self, tmp_path: object) -> None:
+ """read_text raises UnicodeDecodeError for binary content."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ await sandbox.write_file("binary.bin", b"\x89PNG\xff\xfe")
+ with pytest.raises(UnicodeDecodeError):
+ await sandbox.read_text("binary.bin")
+
+
+class TestHostSandboxFileInfoMetadata:
+ """Tests for structured FileInfo returns from list_files."""
+
+ @pytest.mark.asyncio
+ async def test_list_files_directories_have_is_dir_true(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ (tmp_path / "subdir").mkdir() # type: ignore[operator]
+ (tmp_path / "file.txt").write_bytes(b"content") # type: ignore[operator]
+ files = await sandbox.list_files(".")
+ dir_entry = next(f for f in files if f.name == "subdir")
+ file_entry = next(f for f in files if f.name == "file.txt")
+ assert dir_entry.is_dir is True
+ assert file_entry.is_dir is False
+
+ @pytest.mark.asyncio
+ async def test_list_files_reports_file_size(self, tmp_path: object) -> None:
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ (tmp_path / "sized.txt").write_bytes(b"x" * 42) # type: ignore[operator]
+ files = await sandbox.list_files(".")
+ entry = files[0]
+ assert entry.name == "sized.txt"
+ assert entry.size == 42
+
+
+class TestHostSandboxCwd:
+ """Test cwd parameter on HostSandbox execution methods."""
+
+ @pytest.mark.asyncio
+ async def test_execute_with_cwd_overrides_working_dir(self, tmp_path: object) -> None:
+ """execute() with cwd should run in the specified directory, not working_dir."""
+ base_dir = Path(str(tmp_path))
+ sandbox_dir = base_dir / "sandbox_default"
+ sandbox_dir.mkdir()
+ custom_dir = base_dir / "custom_cwd"
+ custom_dir.mkdir()
+
+ sandbox = HostSandbox(working_dir=str(sandbox_dir))
+ result = await sandbox.execute(
+ 'python3 -c "import os; print(os.getcwd())"',
+ cwd=str(custom_dir),
+ )
+ assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(custom_dir))
+
+ @pytest.mark.asyncio
+ async def test_execute_without_cwd_uses_working_dir(self, tmp_path: object) -> None:
+ """execute() without cwd should use the sandbox's working_dir."""
+ sandbox = HostSandbox(working_dir=str(tmp_path))
+ result = await sandbox.execute(
+ 'python3 -c "import os; print(os.getcwd())"',
+ )
+ assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(tmp_path))
+
+ @pytest.mark.asyncio
+ async def test_execute_code_with_cwd(self, tmp_path: object) -> None:
+ """execute_code() with cwd should run in the specified directory."""
+ base_dir = Path(str(tmp_path))
+ sandbox_dir = base_dir / "sandbox_default"
+ sandbox_dir.mkdir()
+ custom_dir = base_dir / "code_cwd"
+ custom_dir.mkdir()
+
+ sandbox = HostSandbox(working_dir=str(sandbox_dir))
+ result = await sandbox.execute_code(
+ "import os; print(os.getcwd())",
+ language="python3",
+ cwd=str(custom_dir),
+ )
+ assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(custom_dir))
+
+ @pytest.mark.asyncio
+ async def test_execute_streaming_with_cwd(self, tmp_path: object) -> None:
+ """execute_streaming() with cwd should run in the specified directory."""
+
+ base_dir = Path(str(tmp_path))
+ custom_dir = base_dir / "stream_cwd"
+ custom_dir.mkdir()
+
+ sandbox = HostSandbox(working_dir=str(base_dir))
+ chunks = []
+ async for chunk in sandbox.execute_streaming(
+ 'python3 -c "import os; print(os.getcwd())"',
+ cwd=str(custom_dir),
+ ):
+ chunks.append(chunk)
+
+ # Find the ExecutionResult
+ result = next(c for c in chunks if isinstance(c, ExecutionResult))
+ assert os.path.normpath(result.stdout.strip()) == os.path.normpath(str(custom_dir))
+
+ @pytest.mark.asyncio
+ async def test_cwd_creates_directory_if_not_exists(self, tmp_path: object) -> None:
+ """cwd directory should be created if it doesn't exist."""
+ base_dir = Path(str(tmp_path))
+ new_dir = base_dir / "brand_new_dir"
+ assert not new_dir.exists()
+
+ sandbox = HostSandbox(working_dir=str(base_dir))
+ result = await sandbox.execute("echo ok", cwd=str(new_dir))
+ assert result.exit_code == 0
+ assert new_dir.exists()
diff --git a/tests/strands/sandbox/test_noop.py b/tests/strands/sandbox/test_noop.py
new file mode 100644
index 000000000..dba7da60a
--- /dev/null
+++ b/tests/strands/sandbox/test_noop.py
@@ -0,0 +1,85 @@
+"""Tests for the NoOpSandbox implementation."""
+
+import pytest
+
+from strands.sandbox.base import Sandbox
+from strands.sandbox.noop import NoOpSandbox
+
+
+class TestNoOpSandbox:
+ def test_is_sandbox_instance(self) -> None:
+ sandbox = NoOpSandbox()
+ assert isinstance(sandbox, Sandbox)
+
+ @pytest.mark.asyncio
+ async def test_execute_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.execute("echo hello")
+
+ @pytest.mark.asyncio
+ async def test_execute_streaming_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ async for _ in sandbox.execute_streaming("echo hello"):
+ pass
+
+ @pytest.mark.asyncio
+ async def test_execute_code_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.execute_code("print(1)", language="python")
+
+ @pytest.mark.asyncio
+ async def test_execute_code_streaming_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ async for _ in sandbox.execute_code_streaming("print(1)", language="python"):
+ pass
+
+ @pytest.mark.asyncio
+ async def test_read_file_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.read_file("test.txt")
+
+ @pytest.mark.asyncio
+ async def test_write_file_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.write_file("test.txt", b"content")
+
+ @pytest.mark.asyncio
+ async def test_remove_file_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.remove_file("test.txt")
+
+ @pytest.mark.asyncio
+ async def test_list_files_raises_not_implemented(self) -> None:
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.list_files(".")
+
+ @pytest.mark.asyncio
+ async def test_read_text_raises_not_implemented(self) -> None:
+ """read_text delegates to read_file, which raises."""
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.read_text("test.txt")
+
+ @pytest.mark.asyncio
+ async def test_write_text_raises_not_implemented(self) -> None:
+ """write_text delegates to write_file, which raises."""
+ sandbox = NoOpSandbox()
+ with pytest.raises(NotImplementedError, match="Sandbox is disabled"):
+ await sandbox.write_text("test.txt", "content")
+
+ def test_agent_with_noop_sandbox(self) -> None:
+ """Agent can be constructed with NoOpSandbox."""
+ from strands.agent.agent import Agent
+
+ sandbox = NoOpSandbox()
+ agent = Agent(model="test", sandbox=sandbox)
+ assert agent.sandbox is sandbox
+ assert isinstance(agent.sandbox, NoOpSandbox)