diff --git a/src/agentready/assessors/code_quality.py b/src/agentready/assessors/code_quality.py index 76564dc4..200a3b74 100644 --- a/src/agentready/assessors/code_quality.py +++ b/src/agentready/assessors/code_quality.py @@ -3,14 +3,20 @@ import ast import configparser import logging +import os import re +import subprocess import tomllib from ..models.attribute import Attribute from ..models.finding import Citation, Finding, Remediation from ..models.repository import Repository from ..services.scanner import MissingToolError -from ..utils.subprocess_utils import safe_subprocess_run +from ..utils.subprocess_utils import ( + safe_subprocess_run, + safe_subprocess_run_stream, + sanitize_subprocess_error, +) from .base import BaseAssessor logger = logging.getLogger(__name__) @@ -599,28 +605,30 @@ def assess(self, repository: Repository) -> Finding: def _assess_python_complexity(self, repository: Repository) -> Finding: """Assess Python complexity using radon.""" try: - # Check if radon is available - # Security: Use safe_subprocess_run for validation and limits - result = safe_subprocess_run( + last_line = None + with safe_subprocess_run_stream( ["radon", "cc", str(repository.path), "-s", "-a"], - capture_output=True, - text=True, timeout=60, - ) - - if result.returncode != 0: - raise MissingToolError("radon", install_command="pip install radon") + ) as stream: + for line in stream: + last_line = line - # Parse radon output for average complexity - # Output format: "Average complexity: A (5.2)" - output = result.stdout + if stream.returncode != 0: + stderr_msg = sanitize_subprocess_error( + stream.stderr.strip(), repository.path + ) + stdout_msg = sanitize_subprocess_error( + (last_line or "").strip(), repository.path + ) + raise subprocess.CalledProcessError( + stream.returncode, + "radon", + output=stdout_msg, + stderr=stderr_msg, + ) - if "Average complexity:" in output: - # Extract average value - avg_line = [ - line for line in output.split("\n") if "Average complexity:" in line - ][0] - avg_value = float(avg_line.split("(")[1].split(")")[0]) + if last_line and last_line.startswith("Average complexity:"): + avg_value = float(last_line.split("(")[1].split(")")[0]) score = self.calculate_proportional_score( measured_value=avg_value, @@ -648,10 +656,7 @@ def _assess_python_complexity(self, repository: Repository) -> Finding: ) except FileNotFoundError: - # radon command not found raise MissingToolError("radon", install_command="pip install radon") - except MissingToolError: - raise # Re-raise to be caught by Scanner except Exception as e: return Finding.error( self.attribute, reason=f"Complexity analysis failed: {str(e)}" @@ -660,81 +665,77 @@ def _assess_python_complexity(self, repository: Repository) -> Finding: def _assess_with_lizard(self, repository: Repository) -> Finding: """Assess complexity using lizard (multi-language).""" try: - # Security: Use safe_subprocess_run for validation and limits - result = safe_subprocess_run( - ["lizard", str(repository.path)], - capture_output=True, - text=True, + last_line = None + with safe_subprocess_run_stream( + [ + "lizard", + "-i", + "-1", + "-t", + str(os.cpu_count() or 1), + str(repository.path), + ], timeout=60, - ) + ) as stream: + for line in stream: + last_line = line + + if stream.returncode != 0: + stderr_msg = sanitize_subprocess_error( + stream.stderr.strip(), repository.path + ) + stdout_msg = sanitize_subprocess_error( + (last_line or "").strip(), repository.path + ) + raise subprocess.CalledProcessError( + stream.returncode, + "lizard", + output=stdout_msg, + stderr=stderr_msg, + ) - if result.returncode != 0: - raise MissingToolError("lizard", install_command="pip install lizard") + try: + avg_ccn = float(last_line.split()[2]) + except (AttributeError, IndexError, ValueError): + avg_ccn = None - # Parse lizard output - # This is simplified - production code should parse properly - return Finding.not_applicable( - self.attribute, reason="Lizard analysis not fully implemented" + if avg_ccn is None: + return Finding.not_applicable( + self.attribute, reason="No code to analyze with lizard" + ) + + score = self.calculate_proportional_score( + measured_value=avg_ccn, + threshold=10.0, + higher_is_better=False, + ) + status = "pass" if score >= 75 else "fail" + + return Finding( + attribute=self.attribute, + status=status, + score=score, + measured_value=f"{avg_ccn:.1f}", + threshold="<10.0", + evidence=[f"Average cyclomatic complexity (lizard): {avg_ccn:.1f}"], + remediation=(self._create_remediation() if status == "fail" else None), + error_message=None, ) except FileNotFoundError: - # lizard command not found raise MissingToolError("lizard", install_command="pip install lizard") - except MissingToolError: - raise except Exception as e: return Finding.error( self.attribute, reason=f"Complexity analysis failed: {str(e)}" ) def _assess_go_complexity(self, repository: Repository) -> Finding: - """Assess Go complexity using gocyclo or golangci-lint config detection. + """Assess Go complexity using golangci-lint config or gocyclo. - Tries gocyclo first. Falls back to checking if golangci-lint has - complexity linters (gocyclo/cyclop) enabled in config. + Checks for configured complexity linters first, then falls back + to running gocyclo directly. """ - try: - result = safe_subprocess_run( - ["gocyclo", "-avg", str(repository.path)], - capture_output=True, - text=True, - timeout=60, - ) - - if result.returncode == 0 and result.stdout.strip(): - lines = result.stdout.strip().split("\n") - avg_line = [line for line in lines if "Average" in line] - if avg_line: - avg_value = float(avg_line[0].split()[-1]) - - score = self.calculate_proportional_score( - measured_value=avg_value, - threshold=10.0, - higher_is_better=False, - ) - status = "pass" if score >= 75 else "fail" - - return Finding( - attribute=self.attribute, - status=status, - score=score, - measured_value=f"{avg_value:.1f}", - threshold="<10.0", - evidence=[ - f"Average cyclomatic complexity (gocyclo): {avg_value:.1f}" - ], - remediation=( - self._create_go_complexity_remediation() - if status == "fail" - else None - ), - error_message=None, - ) - except (FileNotFoundError, Exception): - pass - - # Fallback: check if golangci-lint has complexity linters configured - # Search root and Go module root directories + # Check if golangci-lint has complexity linters configured search_dirs = [repository.path] + self._find_go_module_roots(repository) for search_dir in search_dirs: for config_name in [ @@ -764,10 +765,69 @@ def _assess_go_complexity(self, repository: Repository) -> Finding: except (OSError, UnicodeDecodeError): continue - raise MissingToolError( - "gocyclo", - install_command="go install github.com/fzipp/gocyclo/cmd/gocyclo@latest", - ) + # Fallback: run gocyclo directly + try: + last_line = None + with safe_subprocess_run_stream( + ["gocyclo", "-avg", "-top", "1", str(repository.path)], + timeout=60, + ) as stream: + for line in stream: + last_line = line + + if stream.returncode != 0: + stderr_msg = sanitize_subprocess_error( + stream.stderr.strip(), repository.path + ) + stdout_msg = sanitize_subprocess_error( + (last_line or "").strip(), repository.path + ) + raise subprocess.CalledProcessError( + stream.returncode, + "gocyclo", + output=stdout_msg, + stderr=stderr_msg, + ) + + if last_line and last_line.startswith("Average:"): + avg_value = float(last_line.split()[-1]) + + score = self.calculate_proportional_score( + measured_value=avg_value, + threshold=10.0, + higher_is_better=False, + ) + status = "pass" if score >= 75 else "fail" + + return Finding( + attribute=self.attribute, + status=status, + score=score, + measured_value=f"{avg_value:.1f}", + threshold="<10.0", + evidence=[ + f"Average cyclomatic complexity (gocyclo): {avg_value:.1f}" + ], + remediation=( + self._create_go_complexity_remediation() + if status == "fail" + else None + ), + error_message=None, + ) + return Finding.not_applicable( + self.attribute, reason="No Go complexity data from gocyclo" + ) + + except FileNotFoundError: + raise MissingToolError( + "gocyclo", + install_command="go install github.com/fzipp/gocyclo/cmd/gocyclo@latest", + ) + except Exception as e: + return Finding.error( + self.attribute, reason=f"Complexity analysis failed: {str(e)}" + ) def _create_go_complexity_remediation(self) -> Remediation: """Create remediation guidance for Go complexity.""" diff --git a/src/agentready/utils/__init__.py b/src/agentready/utils/__init__.py index 4f8d0c74..b039255a 100644 --- a/src/agentready/utils/__init__.py +++ b/src/agentready/utils/__init__.py @@ -9,16 +9,20 @@ ) from .subprocess_utils import ( SUBPROCESS_TIMEOUT, + StreamingSubprocess, SubprocessSecurityError, safe_subprocess_run, + safe_subprocess_run_stream, sanitize_subprocess_error, validate_repository_path, ) __all__ = [ "safe_subprocess_run", + "safe_subprocess_run_stream", "sanitize_subprocess_error", "validate_repository_path", + "StreamingSubprocess", "SubprocessSecurityError", "SUBPROCESS_TIMEOUT", "sanitize_path", diff --git a/src/agentready/utils/subprocess_utils.py b/src/agentready/utils/subprocess_utils.py index a7e99c27..ceaf3d26 100644 --- a/src/agentready/utils/subprocess_utils.py +++ b/src/agentready/utils/subprocess_utils.py @@ -10,6 +10,8 @@ import getpass import logging import subprocess +import threading +import time from pathlib import Path from typing import Any @@ -61,19 +63,21 @@ def validate_repository_path(path: Path) -> Path: return resolved -def sanitize_subprocess_error(error: Exception, repo_path: Path | None = None) -> str: +def sanitize_subprocess_error( + error: Exception | str, repo_path: Path | None = None +) -> str: """Sanitize error message to prevent information leakage. Security: Redacts absolute paths, usernames, and sensitive data. Args: - error: Exception to sanitize + error: Exception or string to sanitize repo_path: Optional repository path to redact Returns: Sanitized error message """ - msg = str(error) + msg = error if isinstance(error, str) else str(error) # Redact absolute paths if repo_path: @@ -100,6 +104,230 @@ def sanitize_subprocess_error(error: Exception, repo_path: Path | None = None) - return msg +class StreamingSubprocess: + """Iterator/context-manager wrapper around Popen that yields stdout lines. + + Provides the same security guardrails as safe_subprocess_run but streams + output incrementally instead of buffering. stderr is accumulated in a + background thread and available via the .stderr property after iteration. + + Usage:: + + with safe_subprocess_run_stream(["cmd", "arg"], timeout=60) as stream: + for line in stream: + print(line, end="") + print(stream.returncode) + print(stream.stderr) + """ + + def __init__( + self, + process: subprocess.Popen, + timeout: int, + max_output_size: int, + cwd: Path | None, + ): + self._process = process + self._deadline = time.monotonic() + timeout + self._timeout = timeout + self._max_output_size = max_output_size + self._cwd = cwd + self._stderr_bytes_read = 0 + self._stderr_chunks: list[str] = [] + self._stderr_error: SubprocessSecurityError | None = None + self._closed = False + self._timed_out = False + + self._stderr_thread = threading.Thread(target=self._read_stderr, daemon=True) + self._stderr_thread.start() + + self._watchdog_thread = threading.Thread(target=self._watchdog, daemon=True) + self._watchdog_thread.start() + + @property + def returncode(self) -> int | None: + return self._process.returncode + + @property + def stderr(self) -> str: + if self._stderr_thread.is_alive(): + self._stderr_thread.join(timeout=2) + return "".join(self._stderr_chunks) + + def __iter__(self): + return self + + def __next__(self) -> str: + if self._closed or self._process.stdout is None: + raise StopIteration + + if self._timed_out: + self._cleanup() + raise subprocess.TimeoutExpired(self._process.args, self._timeout) + + if self._stderr_error: + self._cleanup() + raise self._stderr_error + + line = self._process.stdout.readline() + + if self._timed_out: + self._cleanup() + raise subprocess.TimeoutExpired(self._process.args, self._timeout) + + if not line: + self._finalize() + if self._stderr_error: + raise self._stderr_error + raise StopIteration + + if self._stderr_error: + self._cleanup() + raise self._stderr_error + + return line + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + + def close(self): + if self._closed: + return + self._closed = True + self._cleanup() + + def _read_stderr(self): + try: + for line in self._process.stderr: + self._stderr_bytes_read += len(line.encode("utf-8")) + if self._stderr_bytes_read > self._max_output_size: + self._stderr_error = SubprocessSecurityError( + f"Subprocess stderr too large: " + f"{self._stderr_bytes_read} bytes " + f"(max: {self._max_output_size})" + ) + break + self._stderr_chunks.append(line) + except (OSError, ValueError): + pass + + def _watchdog(self): + remaining = self._deadline - time.monotonic() + if remaining > 0: + try: + self._process.wait(timeout=remaining) + return + except subprocess.TimeoutExpired: + pass + if self._process.poll() is None: + self._timed_out = True + sanitized = sanitize_subprocess_error( + subprocess.TimeoutExpired(self._process.args, self._timeout), + self._cwd, + ) + logger.error(f"Subprocess timeout ({self._timeout}s): {sanitized}") + self._terminate() + + def _terminate(self): + if self._process.poll() is not None: + return + self._process.terminate() + try: + self._process.wait(timeout=5) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait() + + def _cleanup(self): + self._terminate() + if self._process.stdout: + try: + self._process.stdout.close() + except OSError: + pass + if self._process.stderr: + try: + self._process.stderr.close() + except OSError: + pass + self._stderr_thread.join(timeout=2) + self._watchdog_thread.join(timeout=2) + + def _finalize(self): + self._process.wait(timeout=10) + self._stderr_thread.join(timeout=5) + self.close() + + +def safe_subprocess_run_stream( + cmd: list[str], + cwd: Path | None = None, + timeout: int | None = None, + max_output_size: int | None = None, + **kwargs: Any, +) -> StreamingSubprocess: + """Run subprocess with security guardrails, streaming stdout line-by-line. + + Same security features as safe_subprocess_run, but yields output + incrementally via a StreamingSubprocess iterator instead of buffering. + + Args: + cmd: Command and arguments (list form, never shell=True) + cwd: Working directory (validated if provided) + timeout: Timeout in seconds (default: SUBPROCESS_TIMEOUT) + max_output_size: Max cumulative bytes per stream (default: MAX_OUTPUT_SIZE) + **kwargs: Additional subprocess.Popen() arguments + + Returns: + StreamingSubprocess that yields str lines from stdout. + Access .returncode and .stderr after iteration completes. + + Raises: + SubprocessSecurityError: If shell=True or validation fails + subprocess.TimeoutExpired: If command exceeds timeout during iteration + """ + if timeout is None: + timeout = kwargs.pop("timeout", SUBPROCESS_TIMEOUT) + + if kwargs.get("shell"): + raise SubprocessSecurityError("shell=True is forbidden for security") + + if cwd: + cwd_path = Path(cwd) + if (cwd_path / ".git").exists() or (cwd_path / ".git").is_file(): + try: + cwd = validate_repository_path(cwd_path) + except SubprocessSecurityError: + logger.debug(f"Repository validation skipped for: {cwd}") + + if max_output_size is None: + max_output_size = MAX_OUTPUT_SIZE + + kwargs.pop("capture_output", None) + kwargs.pop("text", None) + + logger.debug(f"Executing streaming subprocess: {' '.join(cmd)} in {cwd}") + + try: + process = subprocess.Popen( + cmd, + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + **kwargs, + ) + except Exception as e: + sanitized = sanitize_subprocess_error(e, cwd) + logger.error(f"Subprocess error: {sanitized}") + raise + + return StreamingSubprocess(process, timeout, max_output_size, cwd) + + def safe_subprocess_run( cmd: list[str], cwd: Path | None = None, diff --git a/tests/unit/utils/test_subprocess_utils.py b/tests/unit/utils/test_subprocess_utils.py index 3dc39674..8922f6aa 100644 --- a/tests/unit/utils/test_subprocess_utils.py +++ b/tests/unit/utils/test_subprocess_utils.py @@ -11,6 +11,7 @@ SUBPROCESS_TIMEOUT, SubprocessSecurityError, safe_subprocess_run, + safe_subprocess_run_stream, sanitize_subprocess_error, validate_repository_path, ) @@ -398,6 +399,147 @@ def test_run_with_text_output(self): assert isinstance(result.stdout, str) +class TestSafeSubprocessRunStream: + """Test streaming subprocess execution.""" + + def test_stream_basic_output(self): + """Test basic line-by-line streaming.""" + with safe_subprocess_run_stream(["seq", "5"]) as stream: + lines = list(stream) + assert lines == ["1\n", "2\n", "3\n", "4\n", "5\n"] + assert stream.returncode == 0 + + def test_stream_returncode_success(self): + """Test returncode is 0 on success.""" + with safe_subprocess_run_stream(["true"]) as stream: + list(stream) + assert stream.returncode == 0 + + def test_stream_returncode_failure(self): + """Test returncode is non-zero on failure.""" + with safe_subprocess_run_stream(["false"]) as stream: + list(stream) + assert stream.returncode != 0 + + def test_stream_stderr_accumulation(self): + """Test stderr is accumulated separately.""" + with safe_subprocess_run_stream( + ["bash", "-c", "echo out; echo err >&2"] + ) as stream: + stdout_lines = list(stream) + assert "out\n" in stdout_lines + assert "err\n" in stream.stderr + + def test_stream_shell_forbidden(self): + """Test that shell=True is blocked.""" + with pytest.raises(SubprocessSecurityError, match="shell=True is forbidden"): + safe_subprocess_run_stream(["echo test"], shell=True) + + def test_stream_timeout(self): + """Test timeout enforcement during streaming.""" + with pytest.raises(subprocess.TimeoutExpired): + with safe_subprocess_run_stream(["sleep", "10"], timeout=1) as stream: + list(stream) + + def test_stream_stderr_size_limit(self): + """Test stderr size limit enforcement.""" + with pytest.raises(SubprocessSecurityError, match="stderr too large"): + with safe_subprocess_run_stream( + ["bash", "-c", "head -c 200 /dev/zero >&2"], + max_output_size=100, + ) as stream: + for _ in stream: + pass + + def test_stream_context_manager_cleanup(self): + """Test process is cleaned up when exiting context manager.""" + stream = safe_subprocess_run_stream(["sleep", "60"], timeout=60) + with stream: + pass + assert stream._process.poll() is not None + + def test_stream_early_close(self): + """Test explicit close during iteration.""" + with safe_subprocess_run_stream(["yes"], timeout=10) as stream: + next(stream) + stream.close() + assert stream._process.poll() is not None + + def test_stream_break_from_loop(self): + """Test breaking from for-loop cleans up process.""" + with safe_subprocess_run_stream(["yes"], timeout=10) as stream: + for line in stream: + break + assert stream._process.poll() is not None + + def test_stream_cwd_validation(self, tmp_path): + """Test cwd path validation for git repos.""" + repo = tmp_path / "repo" + repo.mkdir() + (repo / ".git").mkdir() + + with safe_subprocess_run_stream(["echo", "test"], cwd=repo) as stream: + lines = list(stream) + assert lines == ["test\n"] + assert stream.returncode == 0 + + def test_stream_kwargs_passthrough(self): + """Test additional kwargs are passed to Popen.""" + with safe_subprocess_run_stream( + ["echo", "test"], + env={"TEST_VAR": "value", "PATH": "/usr/bin:/bin"}, + ) as stream: + lines = list(stream) + assert lines == ["test\n"] + + def test_stream_strips_capture_output_kwarg(self): + """Test capture_output kwarg is silently stripped.""" + with safe_subprocess_run_stream( + ["echo", "test"], capture_output=True + ) as stream: + lines = list(stream) + assert lines == ["test\n"] + + def test_stream_strips_text_kwarg(self): + """Test text kwarg is silently stripped.""" + with safe_subprocess_run_stream(["echo", "test"], text=False) as stream: + lines = list(stream) + assert lines == ["test\n"] + assert isinstance(lines[0], str) + + def test_stream_command_not_found(self): + """Test handling of command not found.""" + with pytest.raises(FileNotFoundError): + safe_subprocess_run_stream(["nonexistent-command-12345"]) + + def test_stream_default_timeout(self): + """Test default timeout constant is used.""" + with safe_subprocess_run_stream(["echo", "test"]) as stream: + list(stream) + assert stream.returncode == 0 + + def test_stream_empty_output(self): + """Test command that produces no stdout.""" + with safe_subprocess_run_stream(["true"]) as stream: + lines = list(stream) + assert lines == [] + assert stream.returncode == 0 + + def test_stream_close_idempotent(self): + """Test calling close multiple times is safe.""" + with safe_subprocess_run_stream(["echo", "test"]) as stream: + list(stream) + stream.close() + stream.close() + + def test_stream_iteration_after_close(self): + """Test iterating after close raises StopIteration.""" + stream = safe_subprocess_run_stream(["echo", "test"]) + stream.close() + with pytest.raises(StopIteration): + next(stream) + + class TestSecurityConstants: """Test security constants are properly defined."""