Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 146 additions & 86 deletions src/agentready/assessors/code_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
import ast
import configparser
import logging
import os
import re
import subprocess
import tomllib

from ..models.attribute import Attribute
from ..models.finding import Citation, Finding, Remediation
from ..models.repository import Repository
from ..services.scanner import MissingToolError
from ..utils.subprocess_utils import safe_subprocess_run
from ..utils.subprocess_utils import (
safe_subprocess_run,
safe_subprocess_run_stream,
sanitize_subprocess_error,
)
from .base import BaseAssessor

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -599,28 +605,30 @@ def assess(self, repository: Repository) -> Finding:
def _assess_python_complexity(self, repository: Repository) -> Finding:
"""Assess Python complexity using radon."""
try:
# Check if radon is available
# Security: Use safe_subprocess_run for validation and limits
result = safe_subprocess_run(
last_line = None
with safe_subprocess_run_stream(
["radon", "cc", str(repository.path), "-s", "-a"],
capture_output=True,
text=True,
timeout=60,
)

if result.returncode != 0:
raise MissingToolError("radon", install_command="pip install radon")
) as stream:
for line in stream:
last_line = line

# Parse radon output for average complexity
# Output format: "Average complexity: A (5.2)"
output = result.stdout
if stream.returncode != 0:
stderr_msg = sanitize_subprocess_error(
stream.stderr.strip(), repository.path
)
stdout_msg = sanitize_subprocess_error(
(last_line or "").strip(), repository.path
)
raise subprocess.CalledProcessError(
stream.returncode,
"radon",
output=stdout_msg,
stderr=stderr_msg,
)

if "Average complexity:" in output:
# Extract average value
avg_line = [
line for line in output.split("\n") if "Average complexity:" in line
][0]
avg_value = float(avg_line.split("(")[1].split(")")[0])
if last_line and last_line.startswith("Average complexity:"):
avg_value = float(last_line.split("(")[1].split(")")[0])

score = self.calculate_proportional_score(
measured_value=avg_value,
Expand Down Expand Up @@ -648,10 +656,7 @@ def _assess_python_complexity(self, repository: Repository) -> Finding:
)

except FileNotFoundError:
# radon command not found
raise MissingToolError("radon", install_command="pip install radon")
except MissingToolError:
raise # Re-raise to be caught by Scanner
except Exception as e:
return Finding.error(
self.attribute, reason=f"Complexity analysis failed: {str(e)}"
Expand All @@ -660,81 +665,77 @@ def _assess_python_complexity(self, repository: Repository) -> Finding:
def _assess_with_lizard(self, repository: Repository) -> Finding:
"""Assess complexity using lizard (multi-language)."""
try:
# Security: Use safe_subprocess_run for validation and limits
result = safe_subprocess_run(
["lizard", str(repository.path)],
capture_output=True,
text=True,
last_line = None
with safe_subprocess_run_stream(
[
"lizard",
"-i",
"-1",
"-t",
str(os.cpu_count() or 1),
str(repository.path),
],
timeout=60,
)
) as stream:
for line in stream:
last_line = line

if stream.returncode != 0:
stderr_msg = sanitize_subprocess_error(
stream.stderr.strip(), repository.path
)
stdout_msg = sanitize_subprocess_error(
(last_line or "").strip(), repository.path
)
raise subprocess.CalledProcessError(
stream.returncode,
"lizard",
output=stdout_msg,
stderr=stderr_msg,
)

if result.returncode != 0:
raise MissingToolError("lizard", install_command="pip install lizard")
try:
avg_ccn = float(last_line.split()[2])
except (AttributeError, IndexError, ValueError):
avg_ccn = None

# Parse lizard output
# This is simplified - production code should parse properly
return Finding.not_applicable(
self.attribute, reason="Lizard analysis not fully implemented"
if avg_ccn is None:
return Finding.not_applicable(
self.attribute, reason="No code to analyze with lizard"
)

score = self.calculate_proportional_score(
measured_value=avg_ccn,
threshold=10.0,
higher_is_better=False,
)
status = "pass" if score >= 75 else "fail"

return Finding(
attribute=self.attribute,
status=status,
score=score,
measured_value=f"{avg_ccn:.1f}",
threshold="<10.0",
evidence=[f"Average cyclomatic complexity (lizard): {avg_ccn:.1f}"],
remediation=(self._create_remediation() if status == "fail" else None),
error_message=None,
)

except FileNotFoundError:
# lizard command not found
raise MissingToolError("lizard", install_command="pip install lizard")
except MissingToolError:
raise
except Exception as e:
return Finding.error(
self.attribute, reason=f"Complexity analysis failed: {str(e)}"
)

def _assess_go_complexity(self, repository: Repository) -> Finding:
"""Assess Go complexity using gocyclo or golangci-lint config detection.
"""Assess Go complexity using golangci-lint config or gocyclo.

Tries gocyclo first. Falls back to checking if golangci-lint has
complexity linters (gocyclo/cyclop) enabled in config.
Checks for configured complexity linters first, then falls back
to running gocyclo directly.
"""
try:
result = safe_subprocess_run(
["gocyclo", "-avg", str(repository.path)],
capture_output=True,
text=True,
timeout=60,
)

if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split("\n")
avg_line = [line for line in lines if "Average" in line]
if avg_line:
avg_value = float(avg_line[0].split()[-1])

score = self.calculate_proportional_score(
measured_value=avg_value,
threshold=10.0,
higher_is_better=False,
)
status = "pass" if score >= 75 else "fail"

return Finding(
attribute=self.attribute,
status=status,
score=score,
measured_value=f"{avg_value:.1f}",
threshold="<10.0",
evidence=[
f"Average cyclomatic complexity (gocyclo): {avg_value:.1f}"
],
remediation=(
self._create_go_complexity_remediation()
if status == "fail"
else None
),
error_message=None,
)
except (FileNotFoundError, Exception):
pass

# Fallback: check if golangci-lint has complexity linters configured
# Search root and Go module root directories
# Check if golangci-lint has complexity linters configured
search_dirs = [repository.path] + self._find_go_module_roots(repository)
for search_dir in search_dirs:
for config_name in [
Expand Down Expand Up @@ -764,10 +765,69 @@ def _assess_go_complexity(self, repository: Repository) -> Finding:
except (OSError, UnicodeDecodeError):
continue

raise MissingToolError(
"gocyclo",
install_command="go install github.com/fzipp/gocyclo/cmd/gocyclo@latest",
)
# Fallback: run gocyclo directly
try:
last_line = None
with safe_subprocess_run_stream(
["gocyclo", "-avg", "-top", "1", str(repository.path)],
timeout=60,
) as stream:
for line in stream:
last_line = line

if stream.returncode != 0:
stderr_msg = sanitize_subprocess_error(
stream.stderr.strip(), repository.path
)
stdout_msg = sanitize_subprocess_error(
(last_line or "").strip(), repository.path
)
raise subprocess.CalledProcessError(
stream.returncode,
"gocyclo",
output=stdout_msg,
stderr=stderr_msg,
)

if last_line and last_line.startswith("Average:"):
avg_value = float(last_line.split()[-1])

score = self.calculate_proportional_score(
measured_value=avg_value,
threshold=10.0,
higher_is_better=False,
)
status = "pass" if score >= 75 else "fail"

return Finding(
attribute=self.attribute,
status=status,
score=score,
measured_value=f"{avg_value:.1f}",
threshold="<10.0",
evidence=[
f"Average cyclomatic complexity (gocyclo): {avg_value:.1f}"
],
remediation=(
self._create_go_complexity_remediation()
if status == "fail"
else None
),
error_message=None,
)
return Finding.not_applicable(
self.attribute, reason="No Go complexity data from gocyclo"
)

except FileNotFoundError:
raise MissingToolError(
"gocyclo",
install_command="go install github.com/fzipp/gocyclo/cmd/gocyclo@latest",
)
except Exception as e:
return Finding.error(
self.attribute, reason=f"Complexity analysis failed: {str(e)}"
)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
def _create_go_complexity_remediation(self) -> Remediation:
"""Create remediation guidance for Go complexity."""
Expand Down
4 changes: 4 additions & 0 deletions src/agentready/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@
)
from .subprocess_utils import (
SUBPROCESS_TIMEOUT,
StreamingSubprocess,
SubprocessSecurityError,
safe_subprocess_run,
safe_subprocess_run_stream,
sanitize_subprocess_error,
validate_repository_path,
)

__all__ = [
"safe_subprocess_run",
"safe_subprocess_run_stream",
"sanitize_subprocess_error",
"validate_repository_path",
"StreamingSubprocess",
"SubprocessSecurityError",
"SUBPROCESS_TIMEOUT",
"sanitize_path",
Expand Down
Loading