
feature: add plugin smoke validation workflow#716

Merged
zouyonghe merged 15 commits into AstrBotDevs:main from zouyonghe:main
Apr 24, 2026

Conversation

@zouyonghe (Member) commented Apr 24, 2026

This change adds an MVP validation path for marketplace plugins so the repository can catch more than malformed plugins.json entries or unreachable repository URLs.

Today the collection only verifies index syntax and repository reachability. That lets broken plugins enter the market even when they are missing metadata.yaml, lack a valid entrypoint, or fail during AstrBot's real plugin loading path. The result is that users can install plugins that immediately fail to import or initialize.

The root cause is that marketplace validation stops before any plugin-structure or AstrBot-runtime checks happen. There was no reusable validator that clones a plugin repository, checks the expected files, and asks AstrBot's PluginManager.load() whether the plugin can actually load.

This PR adds that missing validation layer in two parts. First, it introduces scripts/validate_plugins/run.py, which normalizes GitHub repository URLs, selects target plugins from plugins.json, clones each repository, runs metadata and entrypoint prechecks, and then launches a subprocess worker that places the plugin into a temporary AstrBot root and executes PluginManager.load(specified_dir_name=...). Second, it adds .github/workflows/validate-plugin-smoke.yml, which runs this validator for changed plugin entries on pull requests and supports manual workflow_dispatch runs for targeted checks or limited sweeps.
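The per-plugin flow described above can be sketched roughly as follows. This is an illustrative outline, not the script's actual API: `normalize_repo_url` and `validate_plugin` are assumed names, and the worker invocation (`validate_plugins.worker`) is a placeholder for however run.py actually re-enters itself.

```python
import json
import subprocess
import sys
from pathlib import Path
from urllib.parse import urlsplit


def normalize_repo_url(url: str) -> str:
    # Illustrative normalization: require a GitHub HTTP(S) URL, drop the
    # query string, trailing slash, and ".git" suffix.
    parts = urlsplit(url.strip())
    if parts.scheme not in ("http", "https") or parts.netloc != "github.com":
        raise ValueError(f"unsupported repo URL: {url}")
    path = parts.path.rstrip("/")
    if path.endswith(".git"):
        path = path[: -len(".git")]
    return f"https://github.com{path}"


def validate_plugin(name: str, repo_url: str, work_dir: Path, timeout: float = 120.0) -> dict:
    # Clone shallowly, run prechecks, then delegate the real load check to a
    # subprocess worker that prints a JSON result on stdout.
    clone_dir = work_dir / name
    subprocess.run(
        ["git", "clone", "--depth", "1", normalize_repo_url(repo_url), str(clone_dir)],
        check=True, capture_output=True, text=True,
    )
    # ... metadata.yaml and entrypoint prechecks would run here ...
    completed = subprocess.run(
        [sys.executable, "-m", "validate_plugins.worker", name],  # hypothetical worker entry
        capture_output=True, text=True, timeout=timeout,
    )
    return json.loads(completed.stdout)
```

The subprocess boundary is the point of the design: a plugin that crashes or hangs inside `PluginManager.load()` only takes down the worker, and the timeout turns a hang into a reportable failure.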

The validator emits a structured JSON report with stage-specific failures such as repo_url, clone, metadata, entrypoint, load, worker, and timeout, so maintainers can tell why a plugin failed instead of only seeing a generic CI failure. The PR also includes focused unit tests for URL normalization, plugin selection, metadata validation, worker command construction, report aggregation, and worker output parsing.
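As a concrete (hypothetical) illustration of such a report: only the stage names come from the description above, while the surrounding keys and values are made up for the example.

```python
import json

# Hypothetical aggregated report. The stage values (repo_url, clone, metadata,
# entrypoint, load, worker, timeout) are the ones listed above; the rest of
# the shape is illustrative.
report = {
    "total": 2,
    "passed": 1,
    "failed": 1,
    "results": [
        {"plugin": "demo_plugin", "ok": True, "stage": "load"},
        {
            "plugin": "broken_plugin",
            "ok": False,
            "stage": "metadata",
            "message": "missing required fields: desc, version",
        },
    ],
}
print(json.dumps(report, indent=2))
```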

Validation used for this change:

  • python3 -m unittest tests.test_validate_plugins -v
  • python3 scripts/validate_plugins/run.py --help
  • ruby -e 'require "yaml"; YAML.load_file(".github/workflows/validate-plugin-smoke.yml"); puts "workflow_ok"'

Summary by Sourcery

Introduce an automated plugin smoke validation pipeline for marketplace entries, including a CLI validator, change-detection helper, and CI workflow integration to run validations on PRs, schedules, and manual triggers.

New Features:

  • Add a standalone Python validator that clones plugin repositories, performs metadata and entrypoint checks, runs AstrBot plugin load in an isolated worker, and emits structured JSON results.
  • Introduce a helper script to detect which plugins changed in a pull request by diffing plugins.json and exposing the selection via GitHub Actions environment.
  • Create a GitHub Actions workflow to run plugin smoke validation for changed or selected plugins against a specified AstrBot ref, with configurable concurrency and limits.

Enhancements:

  • Centralize plugins.json parsing and validation logic into a reusable helper module shared by validation and change-detection scripts.

Tests:

  • Add comprehensive unit tests covering URL normalization, plugin selection, metadata parsing and validation, worker command and output handling, change detection, GitHub env writing, and overall report aggregation.


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 2 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="189-197" />
<code_context>
+    }
+
+
+def load_plugins_index(path: Path) -> dict[str, dict]:
+    data = json.loads(path.read_text(encoding="utf-8"))
+    if not isinstance(data, dict):
+        raise ValueError("plugins.json must contain a JSON object")
+    result = {}
+    for key, value in data.items():
+        if isinstance(key, str) and isinstance(value, dict):
+            result[key] = value
+    return result
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Silently skipping non-dict plugin entries in plugins.json may hide configuration errors.

In `load_plugins_index`, non-dict values are silently ignored, so misconfigured entries (e.g., a string or list) just disappear from the index. Instead, consider raising an error or at least logging when a non-dict value is encountered, so configuration problems are visible rather than hidden.

```suggestion
def load_plugins_index(path: Path) -> dict[str, dict]:
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("plugins.json must contain a JSON object")

    result: dict[str, dict] = {}
    errors: list[str] = []

    for key, value in data.items():
        if not isinstance(key, str):
            errors.append(f"plugins.json contains non-string key {key!r}")
            continue

        if not isinstance(value, dict):
            errors.append(
                f"plugins.json entry {key!r} must be a JSON object, "
                f"got {type(value).__name__}"
            )
            continue

        result[key] = value

    if errors:
        raise ValueError(
            "Invalid entries found in plugins.json:\n" + "\n".join(errors)
        )

    return result
```
</issue_to_address>

### Comment 2
<location path="tests/test_validate_plugins.py" line_range="62-71" />
<code_context>
+        self.assertEqual([item[0] for item in selected], ["plugin-c", "plugin-a"])
+
+
+class MetadataValidationTests(unittest.TestCase):
+    def test_reports_missing_required_metadata_fields(self):
+        module = load_validator_module()
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            plugin_dir = Path(tmp_dir)
+            (plugin_dir / "metadata.yaml").write_text(
+                "name: demo_plugin\nauthor: AstrBot Team\n",
+                encoding="utf-8",
+            )
+            (plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")
+
+            result = module.precheck_plugin_directory(plugin_dir)
+
+        self.assertFalse(result["ok"])
+        self.assertEqual(result["stage"], "metadata")
+        self.assertIn("desc", result["message"])
+        self.assertIn("version", result["message"])
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for `precheck_plugin_directory` success, missing `metadata.yaml`, and missing entrypoint

These tests currently only cover the partial‑metadata branch. Please also add cases for:

- Missing `metadata.yaml`: expect `ok=False`, `stage="metadata"`, and `"missing metadata.yaml"`.
- Fully valid metadata with an existing entrypoint: expect `ok=True`, `stage="precheck"`, and that `plugin_dir_name` and `metadata` are present.
- Valid metadata but no `main.py` or `<plugin_name>.py`: expect `ok=False`, `stage="entrypoint"`, with the corresponding error message.

This will exercise all `precheck_plugin_directory` branches.

Suggested implementation:

```python
        self.assertEqual([item[0] for item in selected], ["plugin-c", "plugin-a"])



class MetadataValidationTests(unittest.TestCase):
    def test_reports_missing_required_metadata_fields(self):
        module = load_validator_module()

        with tempfile.TemporaryDirectory() as tmp_dir:
            plugin_dir = Path(tmp_dir)
            (plugin_dir / "metadata.yaml").write_text(
                "name: demo_plugin\nauthor: AstrBot Team\n",
                encoding="utf-8",
            )
            (plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")

            result = module.precheck_plugin_directory(plugin_dir)

        self.assertFalse(result["ok"])
        self.assertEqual(result["stage"], "metadata")
        self.assertIn("desc", result["message"])
        self.assertIn("version", result["message"])

    def test_reports_missing_metadata_file(self):
        module = load_validator_module()

        with tempfile.TemporaryDirectory() as tmp_dir:
            plugin_dir = Path(tmp_dir)
            # Do not create metadata.yaml
            (plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")

            result = module.precheck_plugin_directory(plugin_dir)

        self.assertFalse(result["ok"])
        self.assertEqual(result["stage"], "metadata")
        self.assertIn("missing metadata.yaml", result["message"])

    def test_precheck_succeeds_with_valid_metadata_and_entrypoint(self):
        module = load_validator_module()

        with tempfile.TemporaryDirectory() as tmp_dir:
            plugin_dir = Path(tmp_dir)
            (plugin_dir / "metadata.yaml").write_text(
                "name: demo_plugin\n"
                "author: AstrBot Team\n"
                "desc: Demo plugin.\n"
                "version: 0.1.0\n",
                encoding="utf-8",
            )
            (plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")

            result = module.precheck_plugin_directory(plugin_dir)

        self.assertTrue(result["ok"])
        self.assertEqual(result["stage"], "precheck")
        self.assertIn("plugin_dir_name", result)
        self.assertIn("metadata", result)
        self.assertEqual(result["metadata"]["name"], "demo_plugin")

    def test_reports_missing_entrypoint_file(self):
        module = load_validator_module()

        with tempfile.TemporaryDirectory() as tmp_dir:
            plugin_dir = Path(tmp_dir)
            (plugin_dir / "metadata.yaml").write_text(
                "name: demo_plugin\n"
                "author: AstrBot Team\n"
                "desc: Demo plugin.\n"
                "version: 0.1.0\n",
                encoding="utf-8",
            )
            # Intentionally do not create main.py or demo_plugin.py

            result = module.precheck_plugin_directory(plugin_dir)

        self.assertFalse(result["ok"])
        self.assertEqual(result["stage"], "entrypoint")
        self.assertIn("entrypoint", result["message"])



ROOT = Path(__file__).resolve().parents[1]

```

These tests assume that:
- `unittest`, `tempfile`, and `Path` are already imported in this file.
- `module.precheck_plugin_directory` returns a dict with keys: `"ok"`, `"stage"`, `"message"`, and on success `"plugin_dir_name"` and `"metadata"`.

If any of these imports are missing at the top of `tests/test_validate_plugins.py`, add:

```python
import unittest
import tempfile
from pathlib import Path
```

If the exact error message for missing metadata is different from `"missing metadata.yaml"`, adjust the assertion in `test_reports_missing_metadata_file` to match the actual message (for example, by asserting only a distinctive substring).
</issue_to_address>



@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a comprehensive plugin validation system for AstrBot, including a validation script and unit tests. The script manages repository cloning, metadata verification, and plugin loading within a worker process. Review feedback highlights several areas for improvement: enhancing the simple YAML parser to handle trailing comments, ensuring metadata validation supports non-string types like numeric versions, tightening name sanitization to prevent potential path traversal, and isolating plugin output to prevent interference with the worker's JSON communication.

Comment thread scripts/validate_plugins/run.py Outdated
Comment on lines +95 to +101
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        result[key.strip()] = value.strip().strip("\"'")
    return result

medium

The simple YAML parser fallback does not handle trailing comments (e.g., key: value # comment), which will cause the comment text to be included in the parsed value. This can lead to validation failures when PyYAML is not installed in the environment.

Suggested change (before):

    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        result[key.strip()] = value.strip().strip("\"'")
    return result

Suggested change (after):

    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.split("#", 1)[0].strip()
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)
        result[key.strip()] = value.strip().strip("\"'")
    return result

Comment on lines +123 to +127
    missing = [
        field
        for field in REQUIRED_METADATA_FIELDS
        if not isinstance(metadata.get(field), str) or not metadata[field].strip()
    ]

medium

The type check isinstance(metadata.get(field), str) will fail if the YAML parser infers a non-string type for a field, such as a float or integer for the version number (e.g., version: 1.0). It is safer to check for existence and then validate the string representation of the value.

Suggested change (before):

    missing = [
        field
        for field in REQUIRED_METADATA_FIELDS
        if not isinstance(metadata.get(field), str) or not metadata[field].strip()
    ]

Suggested change (after):

    missing = [
        field
        for field in REQUIRED_METADATA_FIELDS
        if metadata.get(field) is None or not str(metadata[field]).strip()
    ]
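The mismatch is easy to demonstrate without PyYAML by hand-constructing what a YAML loader would typically return for `version: 1.0`:

```python
# A YAML loader typically types `version: 1.0` as a float, not a str.
metadata = {"name": "demo_plugin", "author": "AstrBot Team", "desc": "Demo.", "version": 1.0}

REQUIRED_METADATA_FIELDS = ("name", "author", "desc", "version")

# Current check: the numeric version is reported as missing.
strict_missing = [
    field
    for field in REQUIRED_METADATA_FIELDS
    if not isinstance(metadata.get(field), str) or not metadata[field].strip()
]

# Suggested check: any non-None value with a non-empty string form passes.
lenient_missing = [
    field
    for field in REQUIRED_METADATA_FIELDS
    if metadata.get(field) is None or not str(metadata[field]).strip()
]

print(strict_missing)   # ['version']
print(lenient_missing)  # []
```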



def sanitize_name(name: str) -> str:
    sanitized = re.sub(r"[^A-Za-z0-9._-]+", "-", name).strip("-")

security-medium medium

Allowing the . character in the sanitized name could potentially allow directory traversal if a plugin name is exactly `..`. While the risk is mitigated by replacing / with -, it is safer to restrict the allowed characters to alphanumerics, underscores, and hyphens.

Suggested change (before):

    sanitized = re.sub(r"[^A-Za-z0-9._-]+", "-", name).strip("-")

Suggested change (after):

    sanitized = re.sub(r"[^A-Za-z0-9_-]+", "-", name).strip("-")
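A quick sketch of the difference between the two variants; the `allow_dots` switch is an illustration device, and the fallback to `"plugin"` mirrors the `or "plugin"` default visible in the validator's code context below.

```python
import re


def sanitize_name(name: str, allow_dots: bool = False) -> str:
    # Sketch of the two regex variants discussed above; the `or "plugin"`
    # fallback keeps fully-stripped names non-empty.
    pattern = r"[^A-Za-z0-9._-]+" if allow_dots else r"[^A-Za-z0-9_-]+"
    return re.sub(pattern, "-", name).strip("-") or "plugin"


print(sanitize_name("..", allow_dots=True))   # '..' survives, a traversal risk
print(sanitize_name("..", allow_dots=False))  # collapses to the 'plugin' fallback
```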

Comment on lines +483 to +485
        result = asyncio.run(
            run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
        )

medium

If the plugin or AstrBot prints to stdout during the load check, it will pollute the worker's output and cause JSON parsing to fail in the parent process. Redirecting sys.stdout to sys.stderr during the asyncio.run call ensures that only the final JSON result is sent to stdout.

        _stdout = sys.stdout
        sys.stdout = sys.stderr
        try:
            result = asyncio.run(
                run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
            )
        finally:
            sys.stdout = _stdout
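An equivalent, slightly tidier variant uses `contextlib.redirect_stdout`; `run_noisy_check` below is only a stand-in for the actual load call.

```python
import contextlib
import json
import sys


def run_noisy_check() -> dict:
    # Stand-in for the real load check; plugins may print freely during import.
    print("plugin chatter that would corrupt the JSON protocol")
    return {"ok": True, "stage": "load"}


# Divert anything the plugin prints to stderr for the duration of the check.
with contextlib.redirect_stdout(sys.stderr):
    result = run_noisy_check()

# Only the final JSON result reaches the real stdout.
print(json.dumps(result))
```

The context manager guarantees stdout is restored even if the check raises, which the manual `try/finally` also achieves but with more bookkeeping.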

@zouyonghe (Member, Author) commented:

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 7 issues, and left some high level feedback:

Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `run_worker`, you call `build_worker_sys_path` and then insert the entries into `sys.path` in reversed order, which means `astrbot_path` is prepended before `astrbot_root` even though `build_worker_sys_path` (and its test name) imply the opposite precedence; consider dropping the `reversed()` so the root override is searched before the codebase.
- The workflow step that detects changed plugins assumes `plugins.json` exists on the base ref and will cause `git show origin/$base_ref:plugins.json` to fail hard if it doesn't; you might want to guard this with a file-existence check or fall back to validating all plugins in that case.
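For reference, the interaction between iteration order and `list.insert(0, ...)`; the entry names are placeholders, and which variant is correct in `run_worker` depends on how the entries are actually inserted into `sys.path`.

```python
# Intended precedence: the temporary AstrBot root should shadow the codebase.
entries = ["astrbot_root", "astrbot_path"]

# Front-inserting while iterating in reverse preserves the list's order.
path_with_reversed = []
for entry in reversed(entries):
    path_with_reversed.insert(0, entry)
print(path_with_reversed)  # ['astrbot_root', 'astrbot_path']

# Front-inserting in forward order flips the precedence.
path_without = []
for entry in entries:
    path_without.insert(0, entry)
print(path_without)  # ['astrbot_path', 'astrbot_root']
```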

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="295" />
<code_context>
+            message=str(exc),
+        )
+
+    plugin_clone_dir = work_dir / sanitize_name(plugin)
+    try:
+        clone_plugin_repo(normalized_repo_url, plugin_clone_dir)
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential directory collision when different plugin keys sanitize to the same name.

Using only `sanitize_name(plugin)` for the directory can cause collisions (e.g. `"foo bar"` and `"foo/bar"` both mapping to `"foo_bar"`), leading to clones overwriting each other or mixing files. Consider making the directory name unique by incorporating the original plugin key and/or a short, stable hash, e.g. `sanitize_name(f"{plugin}-{hash(plugin)}")` or similar.
</issue_to_address>
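One caveat to the suggestion: Python's built-in `hash()` is randomized per process for strings (PYTHONHASHSEED), so `hash(plugin)` would produce different directory names on every run. A stable digest avoids that; the helper names below are hypothetical.

```python
import hashlib
import re


def sanitize_name(name: str) -> str:
    # Mirrors the shape of the PR's sanitizer; fallback keeps the name non-empty.
    return re.sub(r"[^A-Za-z0-9_-]+", "-", name).strip("-") or "plugin"


def unique_clone_dir_name(plugin_key: str) -> str:
    # Append a short, stable digest of the raw key so distinct keys that
    # sanitize to the same string still get distinct clone directories.
    digest = hashlib.sha1(plugin_key.encode("utf-8")).hexdigest()[:8]
    return f"{sanitize_name(plugin_key)}-{digest}"


a = unique_clone_dir_name("foo bar")
b = unique_clone_dir_name("foo/bar")
print(a, b)
assert a != b  # same sanitized stem, different digests
```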

### Comment 2
<location path="scripts/validate_plugins/run.py" line_range="219-224" />
<code_context>
+    return sanitized or "plugin"
+
+
+def clone_plugin_repo(repo_url: str, destination: Path) -> None:
+    subprocess.run(
+        ["git", "clone", "--depth", "1", repo_url, str(destination)],
+        check=True,
+        capture_output=True,
+        text=True,
+    )
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Git clone has no timeout, which can hang the whole validation run.

Since this `subprocess.run` call has no timeout, a stalled or very slow `git clone` can block validation indefinitely. Please add a reasonable timeout (e.g., aligned with or derived from `load_timeout`, or a configurable value) to make the pipeline more resilient to flaky network or Git issues.

Suggested implementation:

```python
def clone_plugin_repo(
    repo_url: str,
    destination: Path,
    timeout: float | None = None,
) -> None:
    subprocess.run(
        ["git", "clone", "--depth", "1", repo_url, str(destination)],
        check=True,
        capture_output=True,
        text=True,
        timeout=timeout,
    )

```

1. Identify the place(s) where `clone_plugin_repo` is called in this file and pass an appropriate `timeout` value (for example, a value derived from an existing `load_timeout` or other configuration).
2. If there is a central configuration for timeouts in the validation pipeline, add a dedicated setting for the git-clone timeout and thread it through to these call sites.
</issue_to_address>

### Comment 3
<location path="scripts/validate_plugins/run.py" line_range="18-27" />
<code_context>
+        normalized_repo_url=normalized_repo_url,
+    )
+
+    try:
+        completed = subprocess.run(
+            command,
+            check=False,
+            capture_output=True,
+            text=True,
+            timeout=load_timeout,
+        )
+    except subprocess.TimeoutExpired:
+        return build_result(
+            plugin=plugin,
</code_context>
<issue_to_address>
**suggestion:** Timeout handling for the worker doesn’t surface partial stdout/stderr, which can hinder debugging.

On timeout, this path drops any stdout/stderr the worker produced, even though those logs may contain useful diagnostics. Consider catching `subprocess.TimeoutExpired` as `exc` and, when available, adding `exc.stdout`/`exc.stderr` to `details` so timeouts are easier to debug.
</issue_to_address>
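A sketch of that pattern: `build_result` from the PR is replaced by a plain dict, and the bytes/None handling reflects how `subprocess.TimeoutExpired` exposes whatever the child managed to write before being killed.

```python
import subprocess
import sys


def run_worker_with_diagnostics(command: list[str], load_timeout: float) -> dict:
    # Sketch: surface partial stdout/stderr when the worker times out.
    def _text(stream) -> str:
        if stream is None:
            return ""
        # TimeoutExpired may expose captured output as bytes even in text mode.
        return stream.decode("utf-8", "replace") if isinstance(stream, bytes) else stream

    try:
        completed = subprocess.run(
            command, check=False, capture_output=True, text=True, timeout=load_timeout,
        )
    except subprocess.TimeoutExpired as exc:
        return {
            "ok": False,
            "stage": "timeout",
            "details": {"stdout": _text(exc.stdout), "stderr": _text(exc.stderr)},
        }
    return {
        "ok": completed.returncode == 0,
        "stage": "load",
        "details": {"stdout": completed.stdout, "stderr": completed.stderr},
    }


# A worker that prints something, then hangs past the timeout.
result = run_worker_with_diagnostics(
    [sys.executable, "-c", "import time; print('starting', flush=True); time.sleep(10)"],
    load_timeout=1.0,
)
print(result["stage"])  # timeout
```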

### Comment 4
<location path=".github/workflows/validate-plugin-smoke.yml" line_range="51-59" />
<code_context>
+          import subprocess
+          from pathlib import Path
+
+          base_ref = os.environ["GITHUB_BASE_REF"]
+          subprocess.run(["git", "fetch", "origin", base_ref, "--depth", "1"], check=True)
+          base = json.loads(
+              subprocess.check_output(
+                  ["git", "show", f"origin/{base_ref}:plugins.json"],
+                  text=True,
+              )
+          )
+          head = json.loads(Path("plugins.json").read_text(encoding="utf-8"))
+
+          changed = [
</code_context>
<issue_to_address>
**issue (bug_risk):** Diff detection assumes plugins.json exists on the base branch and is valid JSON, which can break on first-add or malformed cases.

This step always runs `git show origin/{base_ref}:plugins.json` and `json.loads` on both base and head. If `plugins.json` doesn’t exist on the base branch, `git show` will error, and any malformed JSON in either file will crash the job before validation. Consider wrapping the `git show` and JSON parsing in try/except, treating a missing/invalid base as `{}` and an invalid head as `SHOULD_VALIDATE=false`, so the workflow remains robust when `plugins.json` is new or temporarily broken.
</issue_to_address>
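One way to keep that parsing robust is to funnel both failure modes through a small pure helper; `parse_index_or_empty` is a hypothetical name, and the caller would wrap the `git show` in try/except and pass `None` when it fails.

```python
import json
from typing import Optional


def parse_index_or_empty(raw: Optional[str]) -> dict:
    """Treat a missing or malformed plugins.json as an empty index.

    `raw` would be the output of `git show origin/<base>:plugins.json`,
    or None when that command failed (e.g. the file is absent on the
    base ref). Hypothetical helper, not the workflow's current code.
    """
    if raw is None:
        return {}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}


print(parse_index_or_empty(None))                    # {}
print(parse_index_or_empty("not json"))              # {}
print(parse_index_or_empty('{"p": {"repo": "x"}}'))  # {'p': {'repo': 'x'}}
```

With a missing or broken base treated as `{}`, every head entry simply counts as changed, which degrades to "validate everything" instead of a hard job failure.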

### Comment 5
<location path="tests/test_validate_plugins.py" line_range="26" />
<code_context>
+    return module
+
+
+class NormalizeRepoUrlTests(unittest.TestCase):
+    def test_strips_git_suffix_trailing_slash_and_query(self):
+        module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add more edge-case coverage for `normalize_repo_url` error handling.

Current `NormalizeRepoUrlTests` only cover a happy-path GitHub URL and a non-GitHub host. Please add tests that assert:

- Non-HTTP schemes (e.g. `git://github.com/...`, `ssh://github.com/...`) raise `ValueError`.
- Missing owner and/or repo in the path (e.g. `https://github.com/`, `https://github.com/owner`, `https://github.com/owner/`) trigger the "repo URL must include owner and repository" branch.
- Empty segments or redundant slashes (e.g. `https://github.com//repo`, `https://github.com/owner//`).
- Leading/trailing whitespace is stripped as expected.

This will exercise all branches of `normalize_repo_url` and better guard against malformed URLs.
</issue_to_address>
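For illustration, a strict normalizer covering those branches might look like the following. This is a hypothetical reimplementation for test-planning purposes, not the function in `scripts/validate_plugins/run.py`, whose messages and exact behavior may differ.

```python
from urllib.parse import urlsplit


def normalize_repo_url(url: str) -> str:
    # Hypothetical strict normalizer, for illustration only.
    parts = urlsplit(url.strip())  # leading/trailing whitespace stripped first
    if parts.scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if parts.netloc.lower() != "github.com":
        raise ValueError("only github.com repositories are supported")
    segments = [s for s in parts.path.split("/") if s]  # drops empty segments
    if len(segments) < 2:
        raise ValueError("repo URL must include owner and repository")
    owner, repo = segments[0], segments[1]
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]
    return f"https://github.com/{owner}/{repo}"
```

Each suggested test case maps to one branch here: scheme check, host check, owner/repo count (which also covers `//` and trailing empty segments), and whitespace stripping.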

### Comment 6
<location path="tests/test_validate_plugins.py" line_range="44" />
<code_context>
+            module.normalize_repo_url("https://gitlab.com/example/demo-plugin")
+
+
+class SelectPluginsTests(unittest.TestCase):
+    def test_returns_all_plugins_when_limit_is_none(self):
+        module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Extend `select_plugins` tests to cover limits and missing plugin names.

Please add tests for:

1. `limit` handling when `requested_names` is `None` and `limit` is smaller than the total plugins (e.g., 3 plugins, `limit=1`), asserting only the first N are returned in order.
2. The case where `requested_names` includes a name not in the index, asserting a `KeyError("plugin not found: ...")` is raised.

These will fully specify the selection behavior driven by CLI arguments.

Suggested implementation:

```python
import unittest
from collections import OrderedDict

```

```python
class SelectPluginsTests(unittest.TestCase):
    def test_returns_all_plugins_when_limit_is_none(self):
        module = load_validator_module()

        plugin_index = OrderedDict(
            [
                ("plugin-one", object()),
                ("plugin-two", object()),
                ("plugin-three", object()),
            ]
        )

        selected = list(
            module.select_plugins(
                plugin_index,
                requested_names=None,
                limit=None,
            )
        )

        # When limit is None and no names are requested, all plugins are returned in index order.
        self.assertEqual(
            selected,
            list(plugin_index.values()),
        )

    def test_respects_limit_when_requested_names_is_none(self):
        module = load_validator_module()

        plugin_index = OrderedDict(
            [
                ("plugin-one", object()),
                ("plugin-two", object()),
                ("plugin-three", object()),
            ]
        )

        selected = list(
            module.select_plugins(
                plugin_index,
                requested_names=None,
                limit=1,
            )
        )

        # When limit is smaller than the total number of plugins, only the first N are returned in order.
        self.assertEqual(
            selected,
            list(plugin_index.values())[:1],
        )

    def test_raises_key_error_for_unknown_requested_plugin(self):
        module = load_validator_module()

        plugin_index = {
            "known-plugin": object(),
        }

        with self.assertRaisesRegex(KeyError, r"plugin not found: missing-plugin"):
            list(
                module.select_plugins(
                    plugin_index,
                    requested_names=["known-plugin", "missing-plugin"],
                    limit=None,
                )
            )

```

These tests assume:
1. `select_plugins` takes parameters `(plugin_index, requested_names=None, limit=None)`.
2. `select_plugins` returns an iterable of plugin objects (i.e., `plugin_index` values) in the index's iteration order.
3. When a requested plugin name is missing, `select_plugins` raises `KeyError` with a message containing `plugin not found: <name>`.

If `select_plugins` instead yields `(name, plugin)` tuples or plugin names, update the `selected` expectations accordingly:
- For `(name, plugin)` tuples, compare `[name for name, _ in selected]` to `list(plugin_index.keys())` (and slice for the limit case).
- For plugin names, construct `plugin_index` keys as the expected list and compare `selected` directly to those keys.
</issue_to_address>

### Comment 7
<location path="tests/test_validate_plugins.py" line_range="148" />
<code_context>
+        )
+
+
+class ReportBuilderTests(unittest.TestCase):
+    def test_build_report_counts_passed_and_failed_results(self):
+        module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding unit tests for other pure helpers like `combine_requested_names` and `sanitize_name`.

Targeted tests for these helpers would increase confidence in the CLI-to-filesystem mapping logic:

- `combine_requested_names`: ensure it merges `plugin_names` and `plugin_name_list`, trims whitespace, and ignores empty values.
- `sanitize_name`: ensure it replaces invalid chars with `-`, trims leading/trailing `-`, and returns the fallback `"plugin"` for inputs that are all invalid or whitespace.

Since these functions are pure, they should be straightforward and fast to test.

Suggested implementation:

```python
            astrbot_root=Path("/tmp/astrbot-root"),
            astrbot_path=Path("/tmp/AstrBot"),
        )

        self.assertEqual(
            [Path(item) for item in sys_path_entries],
            [Path("/tmp/astrbot-root").resolve(), Path("/tmp/AstrBot").resolve()],
        )




        self.assertEqual(
            module.normalize_repo_url(
                "https://github.com/example/demo-plugin.git/?tab=readme-ov-file"
            ),
            "https://github.com/example/demo-plugin",
        )

    def test_rejects_non_github_urls(self):
        module = load_validator_module()


class HelperFunctionTests(unittest.TestCase):
    def test_combine_requested_names_merges_and_trims(self):
        module = load_validator_module()

        combined = module.combine_requested_names(
            plugin_names=["foo", "  bar  ", "", "baz"],
            plugin_name_list=["qux", "   ", "foo "],
        )

        # Expect whitespace to be trimmed and empty/whitespace-only entries dropped,
        # preserving order across both inputs.
        self.assertEqual(combined, ["foo", "bar", "baz", "qux", "foo"])

    def test_combine_requested_names_handles_none_inputs(self):
        module = load_validator_module()

        combined = module.combine_requested_names(
            plugin_names=None,
            plugin_name_list=None,
        )

        # With no names provided, we should get an empty list (not None).
        self.assertEqual(combined, [])

    def test_sanitize_name_replaces_invalid_chars_and_trims_hyphens(self):
        module = load_validator_module()

        sanitized = module.sanitize_name("  -invalid name!*?-  ")

        # - spaces and invalid chars should become '-'
        # - leading/trailing '-' should be trimmed
        # The expected result here assumes the implementation normalizes to:
        self.assertEqual(sanitized, "invalid-name")

    def test_sanitize_name_valid_name_unchanged(self):
        module = load_validator_module()

        self.assertEqual(module.sanitize_name("valid-name_123"), "valid-name_123")

    def test_sanitize_name_all_invalid_or_whitespace_falls_back_to_plugin(self):
        module = load_validator_module()

        self.assertEqual(module.sanitize_name("   "), "plugin")
        self.assertEqual(module.sanitize_name("!!!"), "plugin")
        self.assertEqual(module.sanitize_name("---"), "plugin")

```

The new tests assume the following behavior in the implementation:

1. `combine_requested_names(plugin_names, plugin_name_list)`:
   - Accepts `None` for either argument and treats it as empty.
   - Strips leading/trailing whitespace from each name.
   - Ignores empty or whitespace-only names.
   - Returns a `list[str]` in the order: all (cleaned) `plugin_names` followed by all (cleaned) `plugin_name_list`.

2. `sanitize_name(name)`:
   - Replaces characters that are not allowed in filenames/plugin identifiers with `'-'`.
   - Trims leading and trailing `'-'` after replacement.
   - Returns `"plugin"` if the resulting name is empty or consists entirely of invalid characters/whitespace.

If the actual implementations differ (e.g., they deduplicate names, use a different allowed-character set, or a different fallback than `"plugin"`), update the expected values in these tests to match the real behavior (or adjust the implementation to meet the behavior described in your original review comment).
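For orientation, here is a self-contained sketch of helpers that satisfy the behavior described above. The actual implementations in `scripts/validate_plugins/run.py` may differ, so treat these as assumptions rather than the PR's code:

```python
import re


def combine_requested_names(plugin_names=None, plugin_name_list=None):
    """Merge both CLI name sources, trimming whitespace and dropping empties."""
    combined = []
    for source in (plugin_names or [], plugin_name_list or []):
        for raw in source:
            name = (raw or "").strip()
            if name:
                combined.append(name)
    return combined


def sanitize_name(name):
    """Map a plugin name to a filesystem-safe directory name."""
    # Collapse runs of disallowed characters to '-' and trim edge hyphens;
    # fall back to "plugin" when nothing usable remains.
    cleaned = re.sub(r"[^A-Za-z0-9_-]+", "-", name.strip()).strip("-")
    return cleaned or "plugin"
```

Under these assumptions, `combine_requested_names(["foo", "  bar  "], ["foo "])` preserves order and duplicates, returning `["foo", "bar", "foo"]`, and `sanitize_name("  -invalid name!*?-  ")` returns `"invalid-name"`.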
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

return module


class NormalizeRepoUrlTests(unittest.TestCase):

suggestion (testing): Add more edge-case coverage for normalize_repo_url error handling.

Current NormalizeRepoUrlTests only cover a happy-path GitHub URL and a non-GitHub host. Please add tests that assert:

  • Non-HTTP schemes (e.g. git://github.com/..., ssh://github.com/...) raise ValueError.
  • Missing owner and/or repo in the path (e.g. https://github.com/, https://github.com/owner, https://github.com/owner/) trigger the "repo URL must include owner and repository" branch.
  • Empty segments or redundant slashes (e.g. https://github.com//repo, https://github.com/owner//).
  • Leading/trailing whitespace is stripped as expected.

This will exercise all branches of normalize_repo_url and better guard against malformed URLs.

module.normalize_repo_url("https://gitlab.com/example/demo-plugin")
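These edge cases can be sketched in a self-contained block. The `normalize_repo_url` below is a hypothetical stand-in mirroring the branches described above (the real function is only reachable through `load_validator_module()`), so the expected outcomes are assumptions to be checked against the actual implementation:

```python
from urllib.parse import urlsplit


def normalize_repo_url(url: str) -> str:
    # Hypothetical stand-in for the validator's normalizer, for illustration only.
    parts = urlsplit(url.strip())  # leading/trailing whitespace is stripped
    if parts.scheme not in ("http", "https"):
        raise ValueError(f"unsupported URL scheme: {parts.scheme!r}")
    if parts.hostname != "github.com":
        raise ValueError("only github.com repositories are supported")
    segments = [seg for seg in parts.path.split("/") if seg]  # drop empty segments
    if len(segments) < 2:
        raise ValueError("repo URL must include owner and repository")
    owner, repo = segments[0], segments[1]
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]
    return f"https://github.com/{owner}/{repo}"


assert normalize_repo_url("  https://github.com/example/demo-plugin.git  ") == (
    "https://github.com/example/demo-plugin"
)

for bad in (
    "git://github.com/example/demo-plugin",   # non-HTTP scheme
    "ssh://github.com/example/demo-plugin",   # non-HTTP scheme
    "https://github.com/",                    # missing owner and repo
    "https://github.com/owner",               # missing repo
    "https://github.com//repo",               # empty owner segment
):
    try:
        normalize_repo_url(bad)
    except ValueError:
        pass
    else:
        raise AssertionError(f"expected ValueError for {bad!r}")
```

Each `bad` entry exercises one error branch that the suggested tests should pin down.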



@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 1 issue

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="tests/test_validate_plugins.py" line_range="154" />
<code_context>
+            )
+
+
+class HelperFunctionTests(unittest.TestCase):
+    def test_combine_requested_names_merges_trims_and_drops_empty_values(self):
+        module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for YAML loading and plugins index parsing to ensure robustness of input handling

These tests omit `load_metadata` / `_parse_simple_yaml` and `load_plugins_index`, which are key for marketplace input handling. Please also add tests that:
- Cover `_parse_simple_yaml` with comments, quoted values, and extra whitespace when `yaml` is unavailable;
- Cover `load_metadata` with both code paths: `yaml` available (mock `yaml.safe_load`) and `yaml` is `None` (fallback parser);
- Cover `load_plugins_index` with: (a) a valid object, (b) a JSON array that should raise `ValueError`, and (c) entries with non-dict values or non-string keys that should be filtered out.
This will better protect against regressions in index/metadata formats.

Suggested implementation:

```python
class HelperFunctionTests(unittest.TestCase):
    def test_combine_requested_names_merges_trims_and_drops_empty_values(self):
        module = load_validator_module()
        # Exercise the helper that merges requested names from both CLI sources,
        # trims whitespace, and drops empty or missing values.
        combined = module.combine_requested_names(
            plugin_names=[" plugin-a ", "", "plugin-b"],
            plugin_name_list=["PLUGIN-C", " ", None],
        )
        self.assertEqual(combined, ["plugin-a", "plugin-b", "PLUGIN-C"])

    def test_parse_simple_yaml_handles_comments_quotes_and_whitespace_without_yaml(self):
        module = load_validator_module()
        # Force the module down the fallback path even if PyYAML is installed
        yaml_backup = getattr(module, "yaml", None)
        try:
            module.yaml = None
            text = """
                # leading comment

                key1: value1      # trailing comment
                key2: " spaced value "
                key3: 'another value'
                key4: value-with-#-hash
            """
            parsed = module._parse_simple_yaml(text)

            # Fallback parser should ignore comments and whitespace, and preserve quoted values
            self.assertEqual(parsed["key1"], "value1")
            self.assertEqual(parsed["key2"], " spaced value ")
            self.assertEqual(parsed["key3"], "another value")
            self.assertEqual(parsed["key4"], "value-with-#-hash")
        finally:
            module.yaml = yaml_backup

    def test_load_metadata_uses_yaml_safe_load_when_available(self):
        module = load_validator_module()
        from unittest import mock
        import tempfile
        import os

        with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
            # The actual contents are irrelevant because yaml.safe_load is mocked
            f.write("name: should-be-overridden\n")
            metadata_path = f.name

        try:
            fake_yaml = mock.Mock()
            fake_yaml.safe_load.return_value = {"name": "from-yaml", "version": "1.0.0"}

            with mock.patch.object(module, "yaml", fake_yaml):
                metadata = module.load_metadata(metadata_path)

            self.assertEqual(metadata, {"name": "from-yaml", "version": "1.0.0"})
            fake_yaml.safe_load.assert_called_once()
        finally:
            os.remove(metadata_path)

    def test_load_metadata_uses_simple_parser_when_yaml_unavailable(self):
        module = load_validator_module()
        import tempfile
        import os

        yaml_backup = getattr(module, "yaml", None)
        try:
            module.yaml = None

            with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
                f.write(
                    """
                    name: demo-plugin
                    version: "0.2.3"
                    """
                )
                metadata_path = f.name

            metadata = module.load_metadata(metadata_path)
            self.assertEqual(metadata.get("name"), "demo-plugin")
            self.assertEqual(metadata.get("version"), "0.2.3")
        finally:
            module.yaml = yaml_backup
            os.remove(metadata_path)

    def test_load_plugins_index_accepts_valid_object(self):
        module = load_validator_module()
        import tempfile
        import json
        import os

        index_obj = {
            "good-plugin": {"name": "Good Plugin", "repo": "https://github.com/example/good"},
            "another-plugin": {"name": "Another Plugin"},
        }

        with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
            json.dump(index_obj, f)
            index_path = f.name

        try:
            plugins = module.load_plugins_index(index_path)
            # Should round-trip the valid mapping
            self.assertIn("good-plugin", plugins)
            self.assertIn("another-plugin", plugins)
            self.assertEqual(plugins["good-plugin"]["name"], "Good Plugin")
        finally:
            os.remove(index_path)

    def test_load_plugins_index_rejects_json_array(self):
        module = load_validator_module()
        import tempfile
        import json
        import os

        with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
            json.dump([{"name": "array-entry"}], f)
            index_path = f.name

        try:
            with self.assertRaises(ValueError):
                module.load_plugins_index(index_path)
        finally:
            os.remove(index_path)

    def test_load_plugins_index_filters_invalid_entries(self):
        module = load_validator_module()
        import tempfile
        import json
        import os

        # Include one entry that should be filtered (a non-dict value) alongside a
        # dict entry that merely lacks optional fields such as "name". JSON object
        # keys are always strings, so non-string keys cannot occur in practice.
        index_obj = {
            "valid-plugin": {"name": "Valid Plugin", "repo": "https://github.com/example/valid"},
            "not-a-dict": "just-a-string",
            "missing-name": {"repo": "https://github.com/example/missing-name"},
        }

        with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
            json.dump(index_obj, f)
            index_path = f.name

        try:
            plugins = module.load_plugins_index(index_path)
            # load_plugins_index only drops non-dict values; dict entries are kept
            # even when optional fields such as "name" are missing.
            self.assertEqual(set(plugins.keys()), {"valid-plugin", "missing-name"})
            self.assertEqual(plugins["valid-plugin"]["name"], "Valid Plugin")
        finally:
            os.remove(index_path)

```

1. Ensure the `HelperFunctionTests.test_combine_requested_names_merges_trims_and_drops_empty_values` method still contains any existing assertions you rely on; if there were prior expectations different from the ones added here, merge them rather than overwrite.
2. At the top of `tests/test_validate_plugins.py`, make sure the following imports are present and not duplicated:
   - `import unittest`
   - `import json`
   - `import os`
   - `import tempfile`
   - `from unittest import mock`
3. These tests assume:
   - `combine_requested_names(plugin_names, plugin_name_list)` exists and returns a list of cleaned names.
   - `_parse_simple_yaml(text: str)` returns a `dict`-like object.
   - `load_metadata(path: str | Path)` reads the file and uses `yaml.safe_load` when `module.yaml` is truthy, otherwise falls back to `_parse_simple_yaml`.
   - `load_plugins_index(path: str | Path)` reads JSON from the given path, raises `ValueError` when the JSON root is a list, and returns a mapping of plugin IDs to dict metadata while filtering out non-dict or otherwise-invalid entries.
   If the actual signatures or behaviors differ, adjust the test calls and assertions accordingly (for example, if `load_plugins_index` accepts a file object instead of a path, or if it uses a different validation rule for entries).
</issue_to_address>




@SourceryAI SourceryAI left a comment


Hey - I've found 3 issues, and left some high level feedback:

  • In load_plugins_index, non-dict values in plugins.json are silently dropped; consider failing fast with a clear error so malformed entries don’t disappear unnoticed from validation.
  • The validator script is quite large and mixes CLI parsing, filesystem/git operations, and worker logic in a single file; consider extracting helpers (e.g., clone/selection/reporting utilities or worker-only code) into separate modules to keep concerns isolated and make future changes easier.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `load_plugins_index`, non-dict values in `plugins.json` are silently dropped; consider failing fast with a clear error so malformed entries don’t disappear unnoticed from validation.
- The validator script is quite large and mixes CLI parsing, filesystem/git operations, and worker logic in a single file; consider extracting helpers (e.g., clone/selection/reporting utilities or worker-only code) into separate modules to keep concerns isolated and make future changes easier.

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="195-203" />
<code_context>
+    }
+
+
+def load_plugins_index(path: Path) -> dict[str, dict]:
+    data = json.loads(path.read_text(encoding="utf-8"))
+    if not isinstance(data, dict):
+        raise ValueError("plugins.json must contain a JSON object")
+    result = {}
+    for key, value in data.items():
+        if isinstance(key, str) and isinstance(value, dict):
+            result[key] = value
+    return result
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Silently dropping non-dict entries in plugins.json could hide configuration mistakes.

In `load_plugins_index`, entries with non-string keys or non-dict values are silently skipped. This can hide misconfigurations in `plugins.json` and make it hard to understand why a plugin never shows up. Consider failing fast (e.g., raising an error) or at least logging these invalid entries so issues in the config are visible.

```suggestion
def load_plugins_index(path: Path) -> dict[str, dict]:
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("plugins.json must contain a JSON object")

    for key, value in data.items():
        if not isinstance(key, str):
            raise ValueError(
                f"plugins.json keys must be strings, got {type(key).__name__!r}: {key!r}"
            )
        if not isinstance(value, dict):
            raise ValueError(
                f"plugins.json values must be objects/dicts; key {key!r} has {type(value).__name__!r}"
            )

    return data
```
</issue_to_address>

### Comment 2
<location path=".github/workflows/validate-plugin-smoke.yml" line_range="56-65" />
<code_context>
+
+          validation_note = ""
+
+          try:
+              base_raw = subprocess.check_output(
+                  ["git", "show", f"origin/{base_ref}:plugins.json"],
+                  text=True,
+                  stderr=subprocess.DEVNULL,
+              )
+              base = json.loads(base_raw)
+              if not isinstance(base, dict):
+                  base = {}
+          except (subprocess.CalledProcessError, json.JSONDecodeError):
+              base = {}
+
+          try:
+              head = json.loads(Path("plugins.json").read_text(encoding="utf-8"))
+              if not isinstance(head, dict):
+                  raise ValueError("plugins.json must contain a JSON object")
+          except (json.JSONDecodeError, ValueError) as exc:
+              changed = []
+              should_validate = False
+              validation_note = f"Skipping smoke validation because plugins.json is invalid on the PR head: {exc}"
+          else:
+              changed = [
</code_context>
<issue_to_address>
**issue (bug_risk):** Invalid plugins.json on the PR head causes smoke validation to be skipped rather than failing the workflow.

When `plugins.json` on the PR head is invalid (e.g., merge markers or bad JSON), this code sets `should_validate = False` and only records a note, so the job still succeeds and a broken `plugins.json` can land. Consider treating this as a hard failure (e.g., `sys.exit(1)` or re-raising after emitting a clear error) so that invalid configs fail the workflow instead of silently skipping validation.
</issue_to_address>

### Comment 3
<location path="scripts/validate_plugins/run.py" line_range="155" />
<code_context>
+    }
+
+
+def build_worker_command(
+    *,
+    script_path: Path,
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the single-use worker helpers and collapsing the tiny process-output helper into one function to make the validation script easier to follow sequentially.

You can trim some indirection around the worker helpers without changing behavior, which should make the script easier to read top‑to‑bottom.

### 1. Inline `build_worker_command`

`build_worker_command` is a single-use, linear argument list. Inlining it in `validate_plugin` removes a jump without hurting clarity:

```python
# remove this helper entirely
# def build_worker_command(...): ...

def validate_plugin(...):
    ...
    plugin_dir_name = precheck["plugin_dir_name"]

    command = [
        sys.executable,
        str(script_path),
        "--worker",
        "--astrbot-path", str(astrbot_path),
        "--plugin-source-dir", str(plugin_clone_dir),
        "--plugin-dir-name", plugin_dir_name,
        "--normalized-repo-url", normalized_repo_url,
    ]

    try:
        completed = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            timeout=load_timeout,
        )
    ...
```

### 2. Inline `build_worker_sys_path`

This is also a trivial, single-use helper:

```python
# remove this helper entirely
# def build_worker_sys_path(*, astrbot_root: Path, astrbot_path: Path) -> list[str]:
#     return [str(astrbot_root.resolve()), str(astrbot_path.resolve())]

def run_worker(args: argparse.Namespace) -> int:
    ...
    os.environ["ASTRBOT_ROOT"] = str(astrbot_root)
    os.environ.setdefault("TESTING", "true")
    sys.path[:0] = [
        str(astrbot_root.resolve()),
        str(Path(args.astrbot_path).resolve()),
    ]

    result = asyncio.run(
        run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
    )
    ...
```

### 3. Simplify process output helpers

You can keep the behavior but reduce the surface area by folding `_normalize_process_output` into `build_process_output_details`, since it’s only used there:

```python
# keep a single helper
def build_process_output_details(
    *, stdout: str | bytes | None, stderr: str | bytes | None
) -> dict | None:
    def _normalize(output: str | bytes | None) -> str | None:
        if output is None:
            return None
        if isinstance(output, bytes):
            output = output.decode("utf-8", errors="replace")
        output = output.strip()
        return output or None

    details = {}
    stdout_text = _normalize(stdout)
    stderr_text = _normalize(stderr)
    if stdout_text:
        details["stdout"] = stdout_text
    if stderr_text:
        details["stderr"] = stderr_text
    return details or None
```

These changes keep all existing behavior but reduce the number of tiny helpers the reader has to chase when following `validate_plugin`/`run_worker`.
</issue_to_address>

Hi @zouyonghe! 👋

Thanks for trying out Sourcery by commenting with @sourcery-ai review! 🚀

Install the sourcery-ai bot to get automatic code reviews on every pull request ✨

Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@zouyonghe
Member Author

@sourcery-ai review

@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 3 issues, and left some high level feedback:

  • In the PR workflow, ASTRBOT_REF is hardcoded to master for pull_request runs; consider deriving this from the repository’s default branch or making it configurable (similar to workflow_dispatch) to avoid breakage if the default branch name changes.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In the PR workflow, `ASTRBOT_REF` is hardcoded to `master` for pull_request runs; consider deriving this from the repository’s default branch or making it configurable (similar to `workflow_dispatch`) to avoid breakage if the default branch name changes.

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="132-139" />
<code_context>
+    return result
+
+
+def load_metadata(path: Path) -> dict:
+    text = path.read_text(encoding="utf-8")
+    if any(marker in text for marker in CONFLICT_MARKERS):
+        raise MetadataLoadError(
+            "could not find expected ':' (merge conflict markers found in metadata.yaml)"
+        )
+
+    if yaml is not None:
+        try:
+            loaded = yaml.safe_load(text)
+        except yaml.YAMLError as exc:
+            raise MetadataLoadError(str(exc)) from exc
+        if isinstance(loaded, dict):
+            return loaded
+        return {}
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Returning an empty dict when YAML root is not a mapping hides malformed metadata

In the YAML branch of `load_metadata`, a non-dict result from `safe_load` (e.g., list or scalar) is treated as `{}`, so structurally invalid `metadata.yaml` is indistinguishable from genuinely empty metadata and only surfaces as missing fields. Consider raising `MetadataLoadError` when `loaded` is not a dict so callers can distinguish malformed structure from empty content.

```suggestion
    if yaml is not None:
        try:
            loaded = yaml.safe_load(text)
        except yaml.YAMLError as exc:
            raise MetadataLoadError(str(exc)) from exc

        if loaded is None:
            # Treat an entirely empty metadata.yaml as empty metadata
            return {}

        if not isinstance(loaded, dict):
            raise MetadataLoadError(
                "metadata.yaml must contain a mapping at the top level"
            )

        return loaded
```
</issue_to_address>

### Comment 2
<location path=".github/workflows/validate-plugin-smoke.yml" line_range="89-91" />
<code_context>
+              if not should_validate:
+                  validation_note = "No plugin entries changed in plugins.json; skipping smoke validation."
+
+          with open(os.environ["GITHUB_ENV"], "a", encoding="utf-8") as handle:
+              handle.write("ASTRBOT_REF=master\n")
+              handle.write(f"PLUGIN_NAME_LIST={','.join(changed)}\n")
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Hardcoding ASTRBOT_REF to master for PRs may be misaligned with the target branch

In the PR path, `ASTRBOT_REF` is always set to `master`, ignoring `GITHUB_BASE_REF`. If the default branch changes (e.g., to `main`) or PRs target another long‑lived branch, smoke validation will run against the wrong branch. Consider deriving `ASTRBOT_REF` from `GITHUB_BASE_REF` (as with `origin/{base_ref}` above) so validation tracks the actual PR target.

```suggestion
          with open(os.environ["GITHUB_ENV"], "a", encoding="utf-8") as handle:
              astrbot_ref = os.environ.get("GITHUB_BASE_REF", "master")
              handle.write(f"ASTRBOT_REF={astrbot_ref}\n")
              handle.write(f"PLUGIN_NAME_LIST={','.join(changed)}\n")
```
</issue_to_address>

### Comment 3
<location path="tests/test_validate_plugins.py" line_range="320" />
<code_context>
+            os.remove(index_path)
+
+
+class ValidationProgressTests(unittest.TestCase):
+    def test_validate_selected_plugins_emits_progress_and_result_lines(self):
+        module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add focused unit tests for `validate_plugin` to cover repo/clone/precheck/worker error paths

`validate_selected_plugins` is only tested via a mocked `validate_plugin`, so the core validation logic isn’t directly verified. Please add a focused test class (e.g. `ValidatePluginTests`) that mocks `clone_plugin_repo`, `precheck_plugin_directory`, and `subprocess.run` to cover:

- Missing `repo` field → stage `repo_url`, `ok=False`.
- Invalid repo URL from `normalize_repo_url` → stage `repo_url`, message propagated.
- `clone_plugin_repo` raising `CalledProcessError` → stage `clone`, stderr/stdout fallback.
- `clone_plugin_repo` raising `TimeoutExpired` → stage `clone_timeout`, uses `build_process_output_details`.
- Precheck failure (e.g. `stage="metadata"`) → precheck results mapped into `build_result`.
- Successful clone + precheck → worker subprocess invoked, mocked `subprocess.run` output passed to `parse_worker_output`.

This will exercise the full validator flow without relying on real Git or AstrBot dependencies.

Suggested implementation:

```python
import os
import subprocess
from unittest import mock

```

```python
class ValidationProgressTests(unittest.TestCase):
    def test_validate_selected_plugins_emits_progress_and_result_lines(self):
        module = load_validator_module()


class ValidatePluginTests(unittest.TestCase):
    """Focused tests for validate_plugin covering repo/clone/precheck/worker paths."""

    def setUp(self):
        # All tests use the same validator module instance
        self.module = load_validator_module()

        # Minimal, valid plugin definition used as a base in most tests
        self.base_plugin = {
            "name": "demo-plugin",
            "repo": "https://github.com/example/demo-plugin",
        }

        # Default validator settings; adjust to match the real signature/shape
        self.default_settings = {}

    def _call_validate_plugin(self, plugin_override=None, **kwargs):
        """Helper to invoke validate_plugin with overridable plugin/settings.

        Adjust this helper to match the actual validate_plugin signature.
        """
        plugin = dict(self.base_plugin)
        if plugin_override:
            plugin.update(plugin_override)

        # The exact argument list may differ in your implementation.
        # Prefer keyword args where possible to make the tests resilient to changes.
        return self.module.validate_plugin(
            plugin=plugin,
            settings=self.default_settings,
            **kwargs,
        )

    def test_missing_repo_field_sets_repo_url_stage_and_fails(self):
        # Remove repo field from plugin
        plugin = dict(self.base_plugin)
        plugin.pop("repo", None)

        result = self.module.validate_plugin(
            plugin=plugin,
            settings=self.default_settings,
        )

        self.assertIsInstance(result, dict)
        self.assertEqual(result.get("stage"), "repo_url")
        self.assertFalse(result.get("ok"))
        # Message should indicate missing repo information in some way
        self.assertIsInstance(result.get("message"), str)
        self.assertTrue(result["message"])

    def test_invalid_repo_url_from_normalize_repo_url_sets_repo_url_stage_and_propagates_message(
        self,
    ):
        error_message = "invalid repo URL"

        with mock.patch.object(
            self.module, "normalize_repo_url", side_effect=ValueError(error_message)
        ):
            result = self._call_validate_plugin()

        self.assertEqual(result.get("stage"), "repo_url")
        self.assertFalse(result.get("ok"))
        self.assertEqual(result.get("message"), error_message)

    def test_clone_plugin_repo_called_process_error_sets_clone_stage_and_uses_stderr_or_stdout(
        self,
    ):
        # Simulate clone failure with CalledProcessError
        cpe = subprocess.CalledProcessError(
            returncode=1,
            cmd=["git", "clone"],
            output="clone stdout",
            stderr="clone stderr",
        )

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", side_effect=cpe
        ):
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        self.assertEqual(result.get("stage"), "clone")
        self.assertFalse(result.get("ok"))
        # Implementation should prefer stderr, but fall back to stdout if needed
        self.assertIn("clone stderr", result.get("message", ""))

    def test_clone_plugin_repo_timeout_expired_sets_clone_timeout_stage_and_uses_build_process_output_details(
        self,
    ):
        timeout_exc = subprocess.TimeoutExpired(cmd=["git", "clone"], timeout=30)
        fake_details = {"stdout": "clone timeout stdout", "stderr": "clone timeout stderr"}

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", side_effect=timeout_exc
        ), mock.patch.object(
            self.module,
            "build_process_output_details",
            return_value=fake_details,
        ) as details_mock:
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        self.assertEqual(result.get("stage"), "clone_timeout")
        self.assertFalse(result.get("ok"))
        # Ensure helper was used to build the details
        details_mock.assert_called_once()
        self.assertEqual(result.get("details"), fake_details)

    def test_precheck_failure_is_mapped_into_build_result(self):
        # Clone succeeds and returns a repo path; precheck fails
        fake_repo_path = "/tmp/demo-plugin-repo"
        precheck_result = {
            "ok": False,
            "stage": "metadata",
            "message": "invalid metadata",
        }

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", return_value=fake_repo_path
        ) as clone_mock, mock.patch.object(
            self.module,
            "precheck_plugin_directory",
            return_value=precheck_result,
        ) as precheck_mock:
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        clone_mock.assert_called_once()
        precheck_mock.assert_called_once_with(fake_repo_path, self.base_plugin, self.default_settings)
        # validate_plugin should propagate/translate precheck failure into its own result
        self.assertFalse(result.get("ok"))
        self.assertEqual(result.get("stage"), "metadata")
        self.assertIn("invalid metadata", result.get("message", ""))

    def test_successful_clone_and_precheck_invokes_worker_and_passes_output_to_parse_worker_output(
        self,
    ):
        fake_repo_path = "/tmp/demo-plugin-repo"
        worker_stdout = "worker stdout json or log"
        worker_stderr = ""
        completed = subprocess.CompletedProcess(
            args=["astrbot", "validate"],
            returncode=0,
            stdout=worker_stdout,
            stderr=worker_stderr,
        )
        parsed_output = {
            "ok": True,
            "stage": "worker",
            "message": "validation successful",
        }

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", return_value=fake_repo_path
        ) as clone_mock, mock.patch.object(
            self.module,
            "precheck_plugin_directory",
            return_value={"ok": True},
        ) as precheck_mock, mock.patch.object(
            subprocess, "run", return_value=completed
        ) as run_mock, mock.patch.object(
            self.module,
            "parse_worker_output",
            return_value=parsed_output,
        ) as parse_mock:
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        clone_mock.assert_called_once()
        precheck_mock.assert_called_once_with(fake_repo_path, self.base_plugin, self.default_settings)
        run_mock.assert_called_once()
        parse_mock.assert_called_once_with(completed, self.base_plugin, self.default_settings)

        self.assertEqual(result, parsed_output)

```

1. Adjust the import edit if `subprocess` and `mock` are already imported elsewhere in `tests/test_validate_plugins.py` to avoid duplicate imports.
2. The helper `_call_validate_plugin` assumes a signature `validate_plugin(plugin=..., settings=..., **kwargs)`. Update it to match the real `validate_plugin` function (parameter names such as `validator_settings`, `keep_repo`, `worker_timeout`, or `workdir`).
3. The tests assume `validate_plugin` returns a `dict` with at least `ok`, `stage`, `message`, and (for the timeout case) `details`. If the actual shape differs, adjust the asserted keys accordingly.
4. The `precheck_plugin_directory` expectations (`precheck_mock.assert_called_once_with(...)`) assume the function receives `(repo_path, plugin, settings)`; update to match the real signature if needed.
5. The worker invocation test assumes:
   - `subprocess.run` is the function invoked by `validate_plugin` to execute the worker.
   - `parse_worker_output(completed_process, plugin, settings)` is used to turn the subprocess result into a validation result.
   If your implementation uses different arguments or a different helper, adjust the corresponding mocks and `assert_called_once_with` checks.
6. Place the `ValidatePluginTests` class near the other validator tests; if `ValidationProgressTests` is in a different section, you may want to move this class next to existing `ValidatePlugin`-related tests for consistency.
</issue_to_address>


Comment thread scripts/validate_plugins/run.py Outdated
Comment thread .github/workflows/validate-plugin-smoke.yml Outdated
os.remove(index_path)


class ValidationProgressTests(unittest.TestCase):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add focused unit tests for validate_plugin to cover repo/clone/precheck/worker error paths

validate_selected_plugins is only tested via a mocked validate_plugin, so the core validation logic isn’t directly verified. Please add a focused test class (e.g. ValidatePluginTests) that mocks clone_plugin_repo, precheck_plugin_directory, and subprocess.run to cover:

  • Missing repo field → stage repo_url, ok=False.
  • Invalid repo URL from normalize_repo_url → stage repo_url, message propagated.
  • clone_plugin_repo raising CalledProcessError → stage clone, stderr/stdout fallback.
  • clone_plugin_repo raising TimeoutExpired → stage clone_timeout, uses build_process_output_details.
  • Precheck failure (e.g. stage="metadata") → precheck results mapped into build_result.
  • Successful clone + precheck → worker subprocess invoked, mocked subprocess.run output passed to parse_worker_output.

This will exercise the full validator flow without relying on real Git or AstrBot dependencies.

Suggested implementation:

import os
import subprocess
from unittest import mock
class ValidationProgressTests(unittest.TestCase):
    def test_validate_selected_plugins_emits_progress_and_result_lines(self):
        module = load_validator_module()


class ValidatePluginTests(unittest.TestCase):
    """Focused tests for validate_plugin covering repo/clone/precheck/worker paths."""

    def setUp(self):
        # All tests use the same validator module instance
        self.module = load_validator_module()

        # Minimal, valid plugin definition used as a base in most tests
        self.base_plugin = {
            "name": "demo-plugin",
            "repo": "https://github.com/example/demo-plugin",
        }

        # Default validator settings; adjust to match the real signature/shape
        self.default_settings = {}

    def _call_validate_plugin(self, plugin_override=None, **kwargs):
        """Helper to invoke validate_plugin with overridable plugin/settings.

        Adjust this helper to match the actual validate_plugin signature.
        """
        plugin = dict(self.base_plugin)
        if plugin_override:
            plugin.update(plugin_override)

        # The exact argument list may differ in your implementation.
        # Prefer keyword args where possible to make the tests resilient to changes.
        return self.module.validate_plugin(
            plugin=plugin,
            settings=self.default_settings,
            **kwargs,
        )

    def test_missing_repo_field_sets_repo_url_stage_and_fails(self):
        # Remove repo field from plugin
        plugin = dict(self.base_plugin)
        plugin.pop("repo", None)

        result = self.module.validate_plugin(
            plugin=plugin,
            settings=self.default_settings,
        )

        self.assertIsInstance(result, dict)
        self.assertEqual(result.get("stage"), "repo_url")
        self.assertFalse(result.get("ok"))
        # Message should indicate missing repo information in some way
        self.assertIsInstance(result.get("message"), str)
        self.assertTrue(result["message"])

    def test_invalid_repo_url_from_normalize_repo_url_sets_repo_url_stage_and_propagates_message(
        self,
    ):
        error_message = "invalid repo URL"

        with mock.patch.object(
            self.module, "normalize_repo_url", side_effect=ValueError(error_message)
        ):
            result = self._call_validate_plugin()

        self.assertEqual(result.get("stage"), "repo_url")
        self.assertFalse(result.get("ok"))
        self.assertEqual(result.get("message"), error_message)

    def test_clone_plugin_repo_called_process_error_sets_clone_stage_and_uses_stderr_or_stdout(
        self,
    ):
        # Simulate clone failure with CalledProcessError
        cpe = subprocess.CalledProcessError(
            returncode=1,
            cmd=["git", "clone"],
            output="clone stdout",
            stderr="clone stderr",
        )

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", side_effect=cpe
        ):
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        self.assertEqual(result.get("stage"), "clone")
        self.assertFalse(result.get("ok"))
        # Implementation should prefer stderr, but fall back to stdout if needed
        self.assertIn("clone stderr", result.get("message", ""))

    def test_clone_plugin_repo_timeout_expired_sets_clone_timeout_stage_and_uses_build_process_output_details(
        self,
    ):
        timeout_exc = subprocess.TimeoutExpired(cmd=["git", "clone"], timeout=30)
        fake_details = {"stdout": "clone timeout stdout", "stderr": "clone timeout stderr"}

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", side_effect=timeout_exc
        ), mock.patch.object(
            self.module,
            "build_process_output_details",
            return_value=fake_details,
        ) as details_mock:
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        self.assertEqual(result.get("stage"), "clone_timeout")
        self.assertFalse(result.get("ok"))
        # Ensure helper was used to build the details
        details_mock.assert_called_once()
        self.assertEqual(result.get("details"), fake_details)

    def test_precheck_failure_is_mapped_into_build_result(self):
        # Clone succeeds and returns a repo path; precheck fails
        fake_repo_path = "/tmp/demo-plugin-repo"
        precheck_result = {
            "ok": False,
            "stage": "metadata",
            "message": "invalid metadata",
        }

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", return_value=fake_repo_path
        ) as clone_mock, mock.patch.object(
            self.module,
            "precheck_plugin_directory",
            return_value=precheck_result,
        ) as precheck_mock:
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        clone_mock.assert_called_once()
        precheck_mock.assert_called_once_with(fake_repo_path, self.base_plugin, self.default_settings)
        # validate_plugin should propagate/translate precheck failure into its own result
        self.assertFalse(result.get("ok"))
        self.assertEqual(result.get("stage"), "metadata")
        self.assertIn("invalid metadata", result.get("message", ""))

    def test_successful_clone_and_precheck_invokes_worker_and_passes_output_to_parse_worker_output(
        self,
    ):
        fake_repo_path = "/tmp/demo-plugin-repo"
        worker_stdout = "worker stdout json or log"
        worker_stderr = ""
        completed = subprocess.CompletedProcess(
            args=["astrbot", "validate"],
            returncode=0,
            stdout=worker_stdout,
            stderr=worker_stderr,
        )
        parsed_output = {
            "ok": True,
            "stage": "worker",
            "message": "validation successful",
        }

        with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
            self.module, "clone_plugin_repo", return_value=fake_repo_path
        ) as clone_mock, mock.patch.object(
            self.module,
            "precheck_plugin_directory",
            return_value={"ok": True},
        ) as precheck_mock, mock.patch.object(
            subprocess, "run", return_value=completed
        ) as run_mock, mock.patch.object(
            self.module,
            "parse_worker_output",
            return_value=parsed_output,
        ) as parse_mock:
            normalize_mock.return_value = self.base_plugin["repo"]

            result = self._call_validate_plugin()

        clone_mock.assert_called_once()
        precheck_mock.assert_called_once_with(fake_repo_path, self.base_plugin, self.default_settings)
        run_mock.assert_called_once()
        parse_mock.assert_called_once_with(completed, self.base_plugin, self.default_settings)

        self.assertEqual(result, parsed_output)

Notes for adapting these tests:

  1. Skip the import additions if subprocess and mock are already imported elsewhere in tests/test_validate_plugins.py, to avoid duplicate imports.
  2. The helper _call_validate_plugin assumes the signature validate_plugin(plugin=..., settings=..., **kwargs). Update it to match the real validate_plugin function (parameter names such as validator_settings, keep_repo, worker_timeout, or workdir).
  3. The tests assume validate_plugin returns a dict with at least ok, stage, message, and (for the timeout case) details. If the actual shape differs, adjust the asserted keys accordingly.
  4. The precheck_plugin_directory expectations (precheck_mock.assert_called_once_with(...)) assume the function receives (repo_path, plugin, settings); update them to match the real signature if needed.
  5. The worker invocation test assumes that subprocess.run is the function validate_plugin invokes to execute the worker, and that parse_worker_output(completed_process, plugin, settings) turns the subprocess result into a validation result. If your implementation uses different arguments or a different helper, adjust the corresponding mocks and assert_called_once_with checks.
  6. Place the ValidatePluginTests class near the other validator tests; if ValidationProgressTests lives in a different section, consider moving this class next to the existing ValidatePlugin-related tests for consistency.
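
To make item 2 above concrete, here is a minimal sketch of the call helper, exercised against a stub module. The keyword names plugin/settings, the class name, and the stub are assumptions for illustration; rename them to match the real validate_plugin signature:

```python
from types import SimpleNamespace


class _ValidatePluginCallSketch:
    """Sketch of the _call_validate_plugin helper; attribute names mirror
    the test class above (module, base_plugin, default_settings)."""

    def __init__(self, module, base_plugin, default_settings):
        self.module = module
        self.base_plugin = base_plugin
        self.default_settings = default_settings

    def _call_validate_plugin(self, **overrides):
        # Baseline kwargs assume validate_plugin(plugin=..., settings=..., **kwargs);
        # rename these keys if the real signature uses e.g. validator_settings.
        kwargs = {"plugin": self.base_plugin, "settings": self.default_settings}
        kwargs.update(overrides)
        return self.module.validate_plugin(**kwargs)


# Exercise the helper against a stub module that just echoes its kwargs.
stub_module = SimpleNamespace(validate_plugin=lambda **kwargs: kwargs)
sketch = _ValidatePluginCallSketch(
    stub_module,
    base_plugin={"repo": "https://github.com/example/demo-plugin"},
    default_settings={"worker_timeout": 30},
)
call_kwargs = sketch._call_validate_plugin(keep_repo=True)
```

Per-test overrides (like keep_repo above) merge over the baseline, so each test only spells out what it changes.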

@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've left some high level feedback:

  • The validator script is quite large and mixes CLI wiring, filesystem/git operations, and AstrBot runtime glue in a single file; consider splitting it into smaller, focused modules (e.g., URL/index handling, clone/precheck, worker orchestration) to improve readability and long-term maintainability.
  • The Detect changed plugins from pull request workflow step embeds a substantial Python script inline; consider moving this logic into a tracked helper script (e.g., under scripts/) so it can be versioned, tested, and reused more easily.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The validator script is quite large and mixes CLI wiring, filesystem/git operations, and AstrBot runtime glue in a single file; consider splitting it into smaller, focused modules (e.g., URL/index handling, clone/precheck, worker orchestration) to improve readability and long-term maintainability.
- The `Detect changed plugins from pull request` workflow step embeds a substantial Python script inline; consider moving this logic into a tracked helper script (e.g., under `scripts/`) so it can be versioned, tested, and reused more easily.

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 3 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="100-122" />
<code_context>
+    return items[:limit]
+
+
+def _parse_simple_yaml(path: Path) -> dict:
+    def parse_value(raw_value: str) -> str:
+        value = raw_value.strip()
+        if not value:
+            return ""
+
+        if value[0] in {'"', "'"}:
+            quote = value[0]
+            end_index = value.rfind(quote)
+            if end_index > 0:
+                return value[1:end_index]
+
+        value = re.split(r"\s+#", value, maxsplit=1)[0].rstrip()
+        return value.strip("\"'")
+
+    result = {}
+    for raw_line in path.read_text(encoding="utf-8").splitlines():
+        line = raw_line.strip()
+        if not line or line.startswith("#") or ":" not in line:
+            continue
+        key, value = line.split(":", 1)
+        result[key.strip()] = parse_value(value)
+    return result
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The fallback YAML parser is very limited and may diverge from real YAML semantics in subtle cases.

This implementation only supports flat `key: value` pairs with limited quoting and comments. When PyYAML is unavailable, common patterns like multiline strings, nested mappings, lists, or values containing `:` will be misparsed or dropped. If this fallback is used outside very controlled inputs, consider either failing fast on unsupported constructs or explicitly constraining/validating the allowed format so plugin authors get clear errors instead of subtly incorrect metadata.

```suggestion
def _parse_simple_yaml(path: Path) -> dict:
    """Very small YAML subset parser used as a fallback when PyYAML is unavailable.

    Supported format:
    - Flat mapping of `key: value` pairs
    - No indentation (no nested objects or multiline continuations)
    - No lists (`- item` syntax)
    - `#` starts a comment when preceded by whitespace (or at line start)
    """
    def parse_value(raw_value: str) -> str:
        value = raw_value.strip()
        if not value:
            return ""

        # Quoted value – keep everything inside the outermost matching quotes.
        if value[0] in {'"', "'"}:
            quote = value[0]
            end_index = value.rfind(quote)
            if end_index > 0:
                return value[1:end_index]

        # Strip inline comments introduced by "#" with preceding whitespace, then trim quotes.
        value = re.split(r"\s+#", value, maxsplit=1)[0].rstrip()
        return value.strip("\"'")

    result: dict = {}
    for lineno, raw_line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        stripped = raw_line.strip()
        # Skip completely empty lines
        if not stripped:
            continue
        # Skip full-line comments
        if stripped.startswith("#"):
            continue

        # Reject any indentation (indented lines are treated as unsupported nested/continuation lines)
        if raw_line[0].isspace():
            raise ValueError(
                f"Unsupported YAML indentation in {path} at line {lineno}: {raw_line!r}"
            )

        line = stripped

        # Reject list syntax explicitly
        if line.startswith("-"):
            raise ValueError(
                f"Unsupported YAML list syntax in {path} at line {lineno}: {raw_line!r}"
            )

        # Require a single top-level mapping entry per line
        if ":" not in line:
            raise ValueError(
                f"Unsupported YAML content (expected 'key: value') in {path} at line {lineno}: {raw_line!r}"
            )

        key, value = line.split(":", 1)
        key = key.strip()
        if not key:
            raise ValueError(
                f"Empty key is not allowed in {path} at line {lineno}: {raw_line!r}"
            )

        if key in result:
            raise ValueError(
                f"Duplicate key '{key}' in {path} at line {lineno}"
            )

        result[key] = parse_value(value)

    return result
```
</issue_to_address>

### Comment 2
<location path="scripts/validate_plugins/detect_changed_plugins.py" line_range="16-22" />
<code_context>
+ASTRBOT_REMOTE_URL = "https://github.com/AstrBotDevs/AstrBot"
+
+
+def load_plugins_map(text: str, *, source_name: str) -> dict[str, dict]:
+    try:
+        data = json.loads(text)
+    except json.JSONDecodeError as exc:
+        raise ValueError(f"plugins.json is invalid on the {source_name}: {exc}") from exc
+
+    if not isinstance(data, dict):
+        raise ValueError("plugins.json must contain a JSON object")
+    return data
</code_context>
<issue_to_address>
**suggestion (bug_risk):** `load_plugins_map` only validates the top-level type and may accept malformed plugin entries.

This only checks that the JSON root is a dict; it doesn’t validate that keys are strings and values are dicts as `load_plugins_index` in `run.py` does. Consider sharing a common validation helper so malformed `plugins.json` entries are consistently rejected and don’t slip through one path but fail in another.

Suggested implementation:

```python
import os
import json
import subprocess
import sys
from pathlib import Path

```

```python
def load_plugins_map(text: str, *, source_name: str) -> dict[str, dict]:
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"plugins.json is invalid on the {source_name}: {exc}") from exc

    if not isinstance(data, dict):
        raise ValueError("plugins.json must contain a JSON object")

    for name, payload in data.items():
        if not isinstance(name, str):
            raise ValueError(
                f"plugins.json on the {source_name} has a non-string key: {name!r}"
            )
        if not isinstance(payload, dict):
            raise ValueError(
                f"plugins.json entry {name!r} on the {source_name} must be a JSON object"
            )

    return data

```

To fully "share a common validation helper" with `load_plugins_index` in `run.py`, you may want to:
1. Extract the common validation logic into a shared function (e.g. `validate_plugins_map(data, source_name)`) in a module imported by both `run.py` and `detect_changed_plugins.py`.
2. Replace the inline validation loop here and in `run.py` with a call to that shared helper, keeping error messages and rules identical.
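
A minimal sketch of that shared helper (the name validate_plugins_map follows the reviewer's suggestion and is not existing code; the exact error wording should be kept identical to whatever run.py emits today):

```python
def validate_plugins_map(data: object, *, source_name: str) -> dict:
    """Validate an already-parsed plugins.json payload.

    Intended to be called by both load_plugins_map in
    detect_changed_plugins.py and load_plugins_index in run.py, so
    malformed entries are rejected identically on every path.
    """
    if not isinstance(data, dict):
        raise ValueError(
            f"plugins.json on the {source_name} must contain a JSON object"
        )
    for name, payload in data.items():
        if not isinstance(name, str):
            raise ValueError(
                f"plugins.json on the {source_name} has a non-string key: {name!r}"
            )
        if not isinstance(payload, dict):
            raise ValueError(
                f"plugins.json entry {name!r} on the {source_name} must be a JSON object"
            )
    return data


valid = validate_plugins_map(
    {"demo-plugin": {"repo": "https://example.com"}}, source_name="head"
)
try:
    validate_plugins_map({"demo-plugin": "not-an-object"}, source_name="base")
    rejected = False
except ValueError:
    rejected = True
```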
</issue_to_address>

### Comment 3
<location path="tests/test_validate_plugins.py" line_range="371" />
<code_context>
+        print_mock.assert_any_call("[2/2] FAIL plugin-b [metadata] invalid metadata.yaml", flush=True)
+
+
+class ValidatePluginTests(unittest.TestCase):
+    def setUp(self):
+        self.module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add a validate_plugin test for worker subprocess timeout

Current tests cover clone errors, precheck failures, and the successful worker path, but not the branch where `subprocess.run` raises `TimeoutExpired` (which should produce `stage="timeout"` and call `build_process_output_details`). Please add a mock-based test that forces `subprocess.run` to raise `TimeoutExpired` and asserts the `stage`, `message`, `plugin_dir_name`, and that `build_process_output_details` is called.

Suggested implementation:

```python
class ValidatePluginTests(unittest.TestCase):
    def setUp(self):
        self.module = load_validator_module()
        self.plugin_key = "demo-plugin"
        self.plugin_data = {"repo": "https://github.com/example/demo-plugin"}
        self.astrbot_path = Path("/tmp/AstrBot")
        self.script_path = Path("/tmp/run.py")
        self.work_dir = Path("/tmp/work")

    def call_validate_plugin(self, plugin_data=None):
        return self.module.validate_plugin(
            plugin=self.plugin_key,
            plugin_data=self.plugin_data if plugin_data is None else plugin_data,
            astrbot_path=self.astrbot_path,
            script_path=self.script_path,
            work_dir=self.work_dir,
        )

    def test_validate_plugin_worker_timeout(self):
        timeout_exc = subprocess.TimeoutExpired(cmd=["python", str(self.script_path)], timeout=30)
        timeout_exc.stdout = "worker stdout"
        timeout_exc.stderr = "worker stderr"

        with mock.patch(f"{self.module.__name__}.subprocess.run") as run_mock, mock.patch(
            f"{self.module.__name__}.build_process_output_details"
        ) as details_mock:
            run_mock.side_effect = timeout_exc
            details_mock.return_value = "timeout details"

            result = self.call_validate_plugin()

        # stage should be reported as timeout
        self.assertEqual(result["stage"], "timeout")

        # message should reflect timeout; adjust expected text if implementation differs
        self.assertIn("timeout", result["message"].lower())

        # plugin_dir_name should point at this plugin; adjust if your implementation uses a different key
        self.assertEqual(result["plugin_dir_name"], self.plugin_key)

        # build_process_output_details should be called with stdout/stderr from the TimeoutExpired exception
        details_mock.assert_called_once_with("worker stdout", "worker stderr")

```

Because only part of the file is visible, please verify and adjust the following in your codebase:

1. **`call_validate_plugin` signature**:  
   - Ensure `validate_plugin` actually takes `astrbot_path`, `script_path`, and `work_dir` keyword arguments.  
   - If the real signature differs, update the wrapper accordingly.

2. **Result structure**:  
   - Confirm that `validate_plugin` returns a mapping with keys `"stage"`, `"message"`, and `"plugin_dir_name"`.  
   - If your result object uses attributes or different keys (e.g., `result.stage` or `result.plugin_dir`), update the assertions to match.

3. **Timeout message text**:  
   - The test currently asserts that `"timeout"` appears in `result["message"].lower()`.  
   - If your implementation uses a specific message (e.g., `"worker subprocess timed out"`), you can tighten this assertion to match exactly.

4. **Patch targets**:  
   - The test patches `subprocess.run` and `build_process_output_details` using `self.module.__name__` as the module path.  
   - If those functions live in a different module (e.g., a helper module imported by the validator), adjust the patch strings to point at the actual module where they are used.
</issue_to_address>


Comment thread scripts/validate_plugins/run.py
Comment thread scripts/validate_plugins/detect_changed_plugins.py Outdated

@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 1 issue

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="486" />
<code_context>
+    )
+
+
+def validate_selected_plugins(
+    *,
+    selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `validate_selected_plugins` so each worker task writes directly into the shared results list and only returns data needed for logging, avoiding extra index bookkeeping and mappings.

You can simplify `validate_selected_plugins` by letting each task write directly into `results` and returning the minimal info needed for logging. This removes the `index`/`original_index` juggling and the `future_to_context` mapping without changing behavior.

```python
def validate_selected_plugins(
    *,
    selected: list[tuple[str, dict]],
    astrbot_path: Path,
    script_path: Path,
    work_dir: Path,
    clone_timeout: int,
    load_timeout: int,
    max_workers: int,
) -> list[dict]:
    total = len(selected)
    results: list[dict | None] = [None] * total

    def task(pos: int, plugin: str, plugin_data: dict) -> tuple[int, str, dict]:
        try:
            result = validate_plugin(
                plugin=plugin,
                plugin_data=plugin_data,
                astrbot_path=astrbot_path,
                script_path=script_path,
                work_dir=work_dir,
                clone_timeout=clone_timeout,
                load_timeout=load_timeout,
            )
        except Exception as exc:
            result = build_result(
                plugin=plugin,
                repo="",
                normalized_repo_url=None,
                ok=False,
                stage="threadpool",
                message=str(exc),
                details=traceback.format_exc(),
            )

        results[pos] = result
        return pos, plugin, result

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures: set[concurrent.futures.Future] = set()

        for pos, (plugin, plugin_data) in enumerate(selected):
            print(f"[{pos + 1}/{total}] Queued {plugin}", flush=True)
            futures.add(executor.submit(task, pos, plugin, plugin_data))

        for future in concurrent.futures.as_completed(futures):
            pos, plugin, result = future.result()
            display_index = pos + 1
            status = "PASS" if result.get("ok") else "FAIL"
            stage = result.get("stage", "unknown")
            message = result.get("message", "")
            print(
                f"[{display_index}/{total}] {status} {plugin} [{stage}] {message}",
                flush=True,
            )

    finalized = [result for result in results if result is not None]
    if len(finalized) != total:
        raise RuntimeError("parallel validation finished with missing results")

    return finalized
```

This keeps:

- Output indices and messages identical (`pos + 1` mirrors your previous `start=1` logic).
- The same error handling and `build_result(...)` behavior for unexpected exceptions.
- The same final sanity check on `results`.

But it removes the double bookkeeping (`index` vs `original_index`) and the extra `future_to_context` dict, making the control flow easier to follow.
</issue_to_address>



@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 3 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="193-209" />
<code_context>
+            "details": str(exc),
+        }
+
+    missing = [
+        field
+        for field in REQUIRED_METADATA_FIELDS
+        if not isinstance(metadata.get(field), str) or not metadata[field].strip()
+    ]
+    if missing:
+        return {
+            "ok": False,
+            "severity": "warn",
</code_context>
<issue_to_address>
**issue (bug_risk):** Severity and `ok` flag for missing metadata fields may conflict with how failures are counted.

Here, missing required metadata yields `ok: False` but `severity: 'warn'`. Later, `build_report` and the final exit code only depend on `severity`, so these cases won’t produce a non-zero exit code. If missing required metadata should fail the smoke validation, either use `severity: 'fail'` here or update `build_report`/`main` to incorporate `ok` when computing failures and the exit code.
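
One way to resolve the conflict, sketched under the assumption that results carry the ok and severity keys described above (build_report's real field names may differ):

```python
def count_failures(results: list[dict]) -> int:
    # Treat a result as failing when either its severity is "fail" or its
    # ok flag is False, so warn-severity results with ok=False (the missing
    # required-metadata case) still fail the smoke validation.
    return sum(
        1
        for result in results
        if result.get("severity") == "fail" or not result.get("ok", True)
    )


results = [
    {"ok": True, "severity": "pass"},
    {"ok": False, "severity": "warn"},  # missing required metadata
    {"ok": False, "severity": "fail"},
]
failures = count_failures(results)
exit_code = 1 if failures else 0
```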
</issue_to_address>

### Comment 2
<location path="scripts/validate_plugins/detect_changed_plugins.py" line_range="29-30" />
<code_context>
+    return [name for name, payload in head.items() if base.get(name) != payload]
+
+
+def fetch_base_ref(base_ref: str) -> None:
+    subprocess.run(["git", "fetch", "origin", base_ref, "--depth", "1"], check=True)
+
+
</code_context>
<issue_to_address>
**issue (bug_risk):** A failing `git fetch` for the base ref will abort detection and the workflow without a graceful fallback.

In `detect_pull_request_selection`, `fetch_base_ref` is invoked with `check=True`, so a `CalledProcessError` from `git fetch` (e.g., missing `GITHUB_BASE_REF` or transient network issues) will terminate the script. For consistency with `read_base_plugins_json` and to keep the workflow resilient, catch `CalledProcessError` around (or within) `fetch_base_ref` and fall back to `base = {}` so that only base-comparison logic is skipped instead of failing the entire job.
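
A sketch of that graceful fallback (the runner hook is an assumption added here purely so the sketch can be exercised offline; the real fetch_base_ref can simply catch CalledProcessError around subprocess.run):

```python
import subprocess


def fetch_base_ref(base_ref: str, *, runner=subprocess.run) -> bool:
    """Fetch the PR base ref, returning False instead of raising so the
    caller can fall back to an empty base map rather than aborting the job."""
    try:
        runner(["git", "fetch", "origin", base_ref, "--depth", "1"], check=True)
    except subprocess.CalledProcessError:
        return False
    return True


# Simulate a failing fetch without touching git or the network.
def _failing_runner(cmd, check):
    raise subprocess.CalledProcessError(returncode=128, cmd=cmd)


fetched = fetch_base_ref("main", runner=_failing_runner)
base = {} if not fetched else None  # skip base comparison when fetch fails
```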
</issue_to_address>

### Comment 3
<location path="scripts/validate_plugins/run.py" line_range="492" />
<code_context>
+    )
+
+
+def validate_selected_plugins(
+    *,
+    selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `validate_selected_plugins` by inlining the worker task and using zero-based indices plus a small status-formatting helper to avoid the current index juggling and indirection.

You can simplify `validate_selected_plugins` without changing behavior by removing the index gymnastics and the inner `task` wrapper, and by extracting the status formatting into a small helper.

### 1. Remove `task` and off‑by‑one indexing

You can capture `index` and `plugin` directly in the closure and use zero‑based indexes internally, only adding 1 for display:

```python
def _format_status(result: dict) -> tuple[str, str, str]:
    severity = result.get("severity")
    if severity is None:
        severity = "pass" if result.get("ok") else "fail"
    status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
    stage = result.get("stage", "unknown")
    message = result.get("message", "")
    return status, stage, message


def validate_selected_plugins(
    *,
    selected: list[tuple[str, dict]],
    astrbot_path: Path,
    script_path: Path,
    work_dir: Path,
    clone_timeout: int,
    load_timeout: int,
    max_workers: int,
) -> list[dict]:
    total = len(selected)
    results: list[dict | None] = [None] * total

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_idx: dict[concurrent.futures.Future, tuple[int, str]] = {}

        for idx, (plugin, plugin_data) in enumerate(selected):
            display_index = idx + 1
            print(f"[{display_index}/{total}] Queued {plugin}", flush=True)
            future = executor.submit(
                validate_plugin,
                plugin=plugin,
                plugin_data=plugin_data,
                astrbot_path=astrbot_path,
                script_path=script_path,
                work_dir=work_dir,
                clone_timeout=clone_timeout,
                load_timeout=load_timeout,
            )
            future_to_idx[future] = (idx, plugin)

        for future in concurrent.futures.as_completed(future_to_idx):
            idx, plugin = future_to_idx[future]
            try:
                result = future.result()
            except Exception as exc:
                result = build_result(
                    plugin=plugin,
                    repo="",
                    normalized_repo_url=None,
                    ok=False,
                    stage="threadpool",
                    message=str(exc),
                    details=traceback.format_exc(),
                )

            results[idx] = result
            status, stage, message = _format_status(result)
            display_index = idx + 1
            print(f"[{display_index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)

    finalized = [result for result in results if result is not None]
    if len(finalized) != total:
        raise RuntimeError("parallel validation finished with missing results")

    return finalized
```

This keeps:

- Output order stable (`results` indexed by original position).
- The same printed messages and indices (still 1‑based in logs).
- Error handling semantics identical (`threadpool` fallback on exception).

But removes the `task` indirection, `original_index` tuple unpacking, and the `index - 1` offset logic, making the function easier to follow.
</issue_to_address>




@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 2 issues, and left some high level feedback:

  • The plugin_dir_name taken from metadata['name'] is used directly as a filesystem path in run_worker and precheck_plugin_directory without sanitization, so it would be safer to validate/normalize it (e.g. reject path separators and ..) before using it in plugin_store / plugin_dir_name while still preserving the original name for reporting.
  • The validator script in scripts/validate_plugins/run.py has grown quite large and mixes CLI handling, I/O, concurrency, and AstrBot worker logic; consider splitting it into smaller modules (e.g. metadata handling, process/worker orchestration, reporting) to make future changes and reuse easier.
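
A possible shape for that sanitization check, assuming a hypothetical helper name (the allowed character set is illustrative, not from the PR):

```python
import re


def safe_plugin_dir_name(name: str) -> str:
    """Reject metadata names that could escape the plugin store directory."""
    if not name or name in {".", ".."}:
        raise ValueError(f"invalid plugin dir name: {name!r}")
    if "/" in name or "\\" in name or "\x00" in name:
        raise ValueError(f"path separators not allowed: {name!r}")
    # Conservative whitelist; widen it if marketplace names need more.
    if not re.fullmatch(r"[A-Za-z0-9._-]+", name):
        raise ValueError(f"unexpected characters in plugin dir name: {name!r}")
    return name
```

The original metadata `name` can still be kept alongside for reporting; only the filesystem path component needs the strict form.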
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The `plugin_dir_name` taken from `metadata['name']` is used directly as a filesystem path in `run_worker` and `precheck_plugin_directory` without sanitization, so it would be safer to validate/normalize it (e.g. reject path separators and `..`) before using it in `plugin_store / plugin_dir_name` while still preserving the original name for reporting.
- The validator script in `scripts/validate_plugins/run.py` has grown quite large and mixes CLI handling, I/O, concurrency, and AstrBot worker logic; consider splitting it into smaller modules (e.g. metadata handling, process/worker orchestration, reporting) to make future changes and reuse easier.

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="25-34" />
<code_context>
+
+
+def resolve_astrbot_ref() -> str:
+    try:
+        default_head = subprocess.check_output(
+            ["git", "ls-remote", "--symref", ASTRBOT_REMOTE_URL, "HEAD"],
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The `error` value from `PluginManager.load` is used directly as a message and may not always be a string.

In `run_worker_load_check`, the failure branch does `message=error or "plugin load failed"`. If `error` is not a string (e.g., an exception or structured object), this will put a non-string into the JSON result, conflicting with the `build_result` contract and hurting readability. Consider `message = str(error) if error else "plugin load failed"` and, if you need richer data, place it in the `details` field instead.
</issue_to_address>
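
The suggested coercion could be factored like this (`normalize_load_error` is a hypothetical helper; the real fix only needs the `str(error)` fallback):

```python
def normalize_load_error(error: object) -> tuple:
    """Coerce whatever PluginManager.load returned into (message, details)."""
    if error is None:
        return "plugin load failed", None
    if isinstance(error, str):
        return error or "plugin load failed", None
    # Exceptions and structured objects: keep a readable summary in
    # message and the full repr in details.
    return str(error) or "plugin load failed", repr(error)
```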

### Comment 2
<location path="scripts/validate_plugins/run.py" line_range="492" />
<code_context>
+    )
+
+
+def validate_selected_plugins(
+    *,
+    selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `validate_selected_plugins` by inlining the worker call and relying on the existing future-to-index mapping instead of returning indices from a wrapper task.

`validate_selected_plugins` can be simplified without changing behavior by removing the extra `task` wrapper, the `(index, result)` pairing, and the “missing results” post-check. You already track indices in `future_to_context`, so you don’t need to also return indices from the worker.

A more direct version:

```python
def validate_selected_plugins(
    *,
    selected: list[tuple[str, dict]],
    astrbot_path: Path,
    script_path: Path,
    work_dir: Path,
    clone_timeout: int,
    load_timeout: int,
    max_workers: int,
) -> list[dict]:
    total = len(selected)
    results: list[dict | None] = [None] * total

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {}

        for index, (plugin, plugin_data) in enumerate(selected, start=1):
            print(f"[{index}/{total}] Queued {plugin}", flush=True)
            future = executor.submit(
                validate_plugin,
                plugin=plugin,
                plugin_data=plugin_data,
                astrbot_path=astrbot_path,
                script_path=script_path,
                work_dir=work_dir,
                clone_timeout=clone_timeout,
                load_timeout=load_timeout,
            )
            future_to_context[future] = (index, plugin)

        for future in concurrent.futures.as_completed(future_to_context):
            index, plugin = future_to_context[future]
            try:
                result = future.result()
            except Exception as exc:
                result = build_result(
                    plugin=plugin,
                    repo="",
                    normalized_repo_url=None,
                    ok=False,
                    stage="threadpool",
                    message=str(exc),
                    details=traceback.format_exc(),
                )

            results[index - 1] = result
            severity = result.get("severity", "pass" if result.get("ok") else "fail")
            status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
            stage = result.get("stage", "unknown")
            message = result.get("message", "")
            print(f"[{index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)

    # All futures are iterated, so all indices 1..total are filled
    return [r for r in results if r is not None]
```

This keeps:

- parallelism and as-completed logging,
- ordered result list,
- error handling (threadpool exceptions → `build_result`),

while removing:

- the inner `task` function,
- the index piggybacked in the result,
- the extra “missing results” runtime check.
</issue_to_address>



@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 2 issues, and left some high level feedback:

  • Consider validating numeric CLI arguments like --max-workers, --clone-timeout, and --load-timeout to ensure they are positive (e.g., rejecting max_workers <= 0), since values like --max-workers 0 will cause ThreadPoolExecutor to fail at runtime in validate_selected_plugins.
  • In precheck_plugin_directory and build_result, results with severity='warn' still have ok=False; if warnings are meant to be non-fatal, you may want to set ok=True for those cases or otherwise clarify/align the semantics so downstream consumers aren’t forced to treat warnings as failures.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider validating numeric CLI arguments like `--max-workers`, `--clone-timeout`, and `--load-timeout` to ensure they are positive (e.g., rejecting `max_workers <= 0`), since values like `--max-workers 0` will cause `ThreadPoolExecutor` to fail at runtime in `validate_selected_plugins`.
- In `precheck_plugin_directory` and `build_result`, results with `severity='warn'` still have `ok=False`; if warnings are meant to be non-fatal, you may want to set `ok=True` for those cases or otherwise clarify/align the semantics so downstream consumers aren’t forced to treat warnings as failures.
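
One common argparse pattern for the positivity check, sketched with the flag names mentioned above (defaults are illustrative):

```python
import argparse


def positive_int(value: str) -> int:
    """argparse type that rejects zero and negative values."""
    number = int(value)
    if number <= 0:
        raise argparse.ArgumentTypeError(
            f"expected a positive integer, got {value!r}"
        )
    return number


parser = argparse.ArgumentParser()
parser.add_argument("--max-workers", type=positive_int, default=4)
parser.add_argument("--clone-timeout", type=positive_int, default=120)
parser.add_argument("--load-timeout", type=positive_int, default=120)

args = parser.parse_args(["--max-workers", "8"])
```

With this, `--max-workers 0` fails at argument parsing with a clear usage error instead of surfacing later as a `ThreadPoolExecutor` `ValueError`.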

## Individual Comments

### Comment 1
<location path="tests/test_validate_plugins.py" line_range="469" />
<code_context>
+        print_mock.assert_any_call("[3/3] PASS plugin-c [load] c", flush=True)
+
+
+class ValidatePluginTests(unittest.TestCase):
+    def setUp(self):
+        self.module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for the worker subprocess timeout path in `validate_plugin`.

`validate_plugin` has a branch that handles `subprocess.TimeoutExpired`, mapping it to `stage="timeout"` and using `build_process_output_details(exc.stdout, exc.stderr)`. `ValidatePluginTests` currently don’t exercise this. Please add a test that patches `subprocess.run` to raise `TimeoutExpired` and asserts that the result has `stage == "timeout"`, includes the `plugin_dir_name`, and that `build_process_output_details` is called with the expected arguments, so the load-timeout behavior is covered and stable.

Suggested implementation:

```python
class ValidatePluginTests(unittest.TestCase):
    def setUp(self):
        self.module = load_validator_module()
        self.plugin_key = "demo-plugin"
        self.plugin_data = {"repo": "https://github.com/example/demo-plugin"}
        self.astrbot_path = Path("/tmp/AstrBot")
        self.script_path = Path("/tmp/run.py")
        self.work_dir = Path("/tmp/work")

    def call_validate_plugin(self, plugin_data=None):
        return self.module.validate_plugin(
            plugin=self.plugin_key,
            plugin_data=self.plugin_data if plugin_data is None else plugin_data,
        )

    def test_validate_plugin_load_timeout_uses_timeout_stage_and_builds_output_details(self):
        # Prepare a TimeoutExpired instance with stdout/stderr set so we can
        # assert what gets forwarded into build_process_output_details.
        timeout_exc = subprocess.TimeoutExpired(
            cmd=[sys.executable, str(self.script_path)],
            timeout=300,
        )
        timeout_exc.stdout = "timeout-stdout"
        timeout_exc.stderr = "timeout-stderr"

        with mock.patch.object(self.module, "subprocess") as subprocess_mod, \
             mock.patch.object(self.module, "build_process_output_details") as build_details:
            # Ensure the validate_plugin `except subprocess.TimeoutExpired` branch
            # matches this class and that `run` raises the configured exception.
            subprocess_mod.TimeoutExpired = subprocess.TimeoutExpired
            subprocess_mod.run.side_effect = timeout_exc

            result = self.call_validate_plugin()

        self.assertEqual(result["stage"], "timeout")
        # The timeout result should still include the plugin_dir_name so callers
        # can attribute the failure correctly.
        self.assertIn("plugin_dir_name", result)
        self.assertEqual(result["plugin_dir_name"], "demo-plugin")
        build_details.assert_called_once_with("timeout-stdout", "timeout-stderr")

```

To make this compile and run, ensure the following are present at the top of `tests/test_validate_plugins.py` (if they are not already):

1. Imports:
   - `import sys`
   - `import subprocess`
   - `from unittest import mock` (or `import unittest.mock as mock` and adjust usage accordingly).

2. The helper `call_validate_plugin` likely needs to pass through all required parameters that `validate_plugin` expects (e.g., `astrbot_path`, `script_path`, `work_dir`, and any others used elsewhere in the file). If those are not already being passed, extend the call accordingly so the new test exercises the real code path.

3. If `build_process_output_details` is not defined directly on `self.module` but imported from another module, adjust the `mock.patch.object(self.module, "build_process_output_details")` line to patch the correct import location (e.g., `mock.patch("validator_module.build_process_output_details")` using the actual module path).
</issue_to_address>

### Comment 2
<location path="scripts/validate_plugins/run.py" line_range="512" />
<code_context>
+    )
+
+
+def validate_selected_plugins(
+    *,
+    selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying validate_selected_plugins by submitting validate_plugin directly to the executor and using the future-to-context mapping instead of an inner task wrapper and index plumbing.

You can simplify `validate_selected_plugins` by removing the inner `task` function and the extra `(index, result)` plumbing. The `ThreadPoolExecutor` already gives you futures; you only need a mapping to the original index and plugin name.

This reduces nesting and avoids the `original_index` variable while preserving ordering, logging, and error handling:

```python
def validate_selected_plugins(
    *,
    selected: list[tuple[str, dict]],
    astrbot_path: Path,
    script_path: Path,
    work_dir: Path,
    clone_timeout: int,
    load_timeout: int,
    max_workers: int,
) -> list[dict]:
    total = len(selected)
    results: list[dict | None] = [None] * total

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {}

        for index, (plugin, plugin_data) in enumerate(selected, start=1):
            print(f"[{index}/{total}] Queued {plugin}", flush=True)
            future = executor.submit(
                validate_plugin,
                plugin=plugin,
                plugin_data=plugin_data,
                astrbot_path=astrbot_path,
                script_path=script_path,
                work_dir=work_dir,
                clone_timeout=clone_timeout,
                load_timeout=load_timeout,
            )
            future_to_context[future] = (index, plugin)

        for future in concurrent.futures.as_completed(future_to_context):
            index, plugin = future_to_context[future]
            try:
                result = future.result()
            except Exception as exc:
                result = build_result(
                    plugin=plugin,
                    repo="",
                    normalized_repo_url=None,
                    ok=False,
                    stage="threadpool",
                    message=str(exc),
                    details=traceback.format_exc(),
                )

            results[index - 1] = result
            severity = result.get("severity", "pass" if result.get("ok") else "fail")
            status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
            stage = result.get("stage", "unknown")
            message = result.get("message", "")
            print(f"[{index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)

    finalized = [result for result in results if result is not None]
    if len(finalized) != total:
        raise RuntimeError("parallel validation finished with missing results")

    return finalized
```

This keeps the same functionality (including ordered results and log format) but removes one level of indirection and simplifies the error path.
</issue_to_address>




@zouyonghe
Member Author

@sourcery-ai review


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 1 issue

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="521" />
<code_context>
+    )
+
+
+def validate_selected_plugins(
+    *,
+    selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the thread pool indexing logic and tightening the worker stdout contract so result collection and worker output parsing are straightforward and avoid extra bookkeeping.

1. **Parallel validation index handling is more complex than needed**

You’re manually juggling 1-based indices, storing `(index, plugin)` in a map, returning `(original_index, result)` from `task`, then subtracting 1 to index into `results`. You can keep the 1-based user-facing progress output while using 0-based indices internally and avoid passing indices through the worker function.

This keeps behavior identical (including printed `[1/…]` counters) but reduces the mental overhead:

```python
def validate_selected_plugins(...):
    total = len(selected)
    results: list[dict | None] = [None] * total

    def task(plugin: str, plugin_data: dict) -> dict:
        return validate_plugin(
            plugin=plugin,
            plugin_data=plugin_data,
            astrbot_path=astrbot_path,
            script_path=script_path,
            work_dir=work_dir,
            clone_timeout=clone_timeout,
            load_timeout=load_timeout,
        )

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {}

        for index, (plugin, plugin_data) in enumerate(selected):
            human_index = index + 1
            print(f"[{human_index}/{total}] Queued {plugin}", flush=True)
            future = executor.submit(task, plugin, plugin_data)
            future_to_context[future] = (index, plugin)

        for future in concurrent.futures.as_completed(future_to_context):
            index, plugin = future_to_context[future]
            human_index = index + 1
            try:
                result = future.result()
            except Exception as exc:
                result = build_result(
                    plugin=plugin,
                    repo="",
                    normalized_repo_url=None,
                    ok=False,
                    stage="threadpool",
                    message=str(exc),
                    details=traceback.format_exc(),
                )

            results[index] = result
            severity = result.get("severity", "pass" if result.get("ok") else "fail")
            status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
            stage = result.get("stage", "unknown")
            message = result.get("message", "")
            print(f"[{human_index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)

    finalized = [result for result in results if result is not None]
    if len(finalized) != total:
        raise RuntimeError("parallel validation finished with missing results")
    return finalized
```

2. **Worker output parsing can be simplified with a tighter contract**

`parse_worker_output` currently scans stdout backwards to find a JSON line and mutates it to inject `plugin`, `repo`, and `normalized_repo_url`. If you can tighten the contract so that the worker always prints a *single* JSON line (and sends any debug/log output to stderr), the parsing logic becomes much simpler and easier to reason about.

On the worker side:

```python
def run_worker(args: argparse.Namespace) -> int:
    ...
    try:
        result = asyncio.run(
            run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
        )
    except Exception as exc:
        result = build_result(
            plugin=args.plugin_dir_name,
            repo=args.normalized_repo_url,
            normalized_repo_url=args.normalized_repo_url,
            ok=False,
            stage="worker",
            message=str(exc),
            plugin_dir_name=args.plugin_dir_name,
            details=traceback.format_exc(),
        )
    finally:
        shutil.rmtree(temp_root, ignore_errors=True)

    # Ensure only the structured result is printed on stdout
    print(json.dumps(result, ensure_ascii=False))
    return 0 if result["ok"] else 1
```

And then `parse_worker_output` can assume “one JSON object per run” and skip the reverse scan and mutation:

```python
def parse_worker_output(
    *,
    plugin: str,
    repo: str,
    normalized_repo_url: str,
    completed: subprocess.CompletedProcess[str],
    plugin_dir_name: str,
) -> dict:
    stdout = completed.stdout.strip()
    if stdout:
        try:
            payload = json.loads(stdout)
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            # Optionally still enforce/override identity fields here
            payload.setdefault("plugin", plugin)
            payload.setdefault("repo", repo)
            payload.setdefault("normalized_repo_url", normalized_repo_url)
            payload.setdefault("plugin_dir_name", plugin_dir_name)
            return payload

    stderr = completed.stderr.strip()
    message = stderr or stdout or "worker returned no structured output"
    return build_result(
        plugin=plugin,
        repo=repo,
        normalized_repo_url=normalized_repo_url,
        ok=False,
        stage="worker",
        message=message,
        plugin_dir_name=plugin_dir_name,
    )
```

This keeps all current capabilities (structured result, fallback to error result) but removes the need for scanning stdout line-by-line and retrofitting identity fields.
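The tightened contract is easy to exercise with synthetic `CompletedProcess` objects. In this sketch, `build_result` is a minimal stand-in for the validator's real helper (assumed field names only), and `parse_worker_output` is the simplified version from above:

```python
import json
import subprocess

def build_result(*, plugin, repo, normalized_repo_url, ok, stage, message,
                 plugin_dir_name=None, details=None) -> dict:
    # Stand-in for the validator's build_result: one flat result dict.
    return {
        "plugin": plugin, "repo": repo,
        "normalized_repo_url": normalized_repo_url,
        "ok": ok, "stage": stage, "message": message,
        "plugin_dir_name": plugin_dir_name, "details": details,
    }

def parse_worker_output(*, plugin, repo, normalized_repo_url,
                        completed, plugin_dir_name) -> dict:
    stdout = completed.stdout.strip()
    if stdout:
        try:
            payload = json.loads(stdout)
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            payload.setdefault("plugin", plugin)
            payload.setdefault("repo", repo)
            payload.setdefault("normalized_repo_url", normalized_repo_url)
            payload.setdefault("plugin_dir_name", plugin_dir_name)
            return payload
    stderr = completed.stderr.strip()
    message = stderr or stdout or "worker returned no structured output"
    return build_result(plugin=plugin, repo=repo,
                        normalized_repo_url=normalized_repo_url,
                        ok=False, stage="worker", message=message,
                        plugin_dir_name=plugin_dir_name)

# Happy path: the worker printed exactly one JSON object on stdout.
ok_proc = subprocess.CompletedProcess(
    args=["worker"], returncode=0,
    stdout='{"ok": true, "stage": "load", "message": "loaded"}', stderr="")
ok_result = parse_worker_output(
    plugin="demo_plugin", repo="https://github.com/u/r",
    normalized_repo_url="https://github.com/u/r",
    completed=ok_proc, plugin_dir_name="demo_plugin")

# Failure path: no parseable stdout, so stderr becomes the message.
bad_proc = subprocess.CompletedProcess(
    args=["worker"], returncode=1, stdout="", stderr="Traceback: boom")
bad_result = parse_worker_output(
    plugin="demo_plugin", repo="https://github.com/u/r",
    normalized_repo_url="https://github.com/u/r",
    completed=bad_proc, plugin_dir_name="demo_plugin")
```

Here `ok_result` carries the worker's payload with identity fields filled in, and `bad_result` is a synthesized `stage="worker"` failure, so both paths stay structured without any reverse scan over stdout.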
</issue_to_address>


@zouyonghe zouyonghe merged commit 6147ad4 into AstrBotDevs:main Apr 24, 2026
2 checks passed