feature: add plugin smoke validation workflow #716
zouyonghe merged 15 commits into AstrBotDevs:main
Conversation
Hey - I've found 2 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="189-197" />
<code_context>
+ }
+
+
+def load_plugins_index(path: Path) -> dict[str, dict]:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ if not isinstance(data, dict):
+ raise ValueError("plugins.json must contain a JSON object")
+ result = {}
+ for key, value in data.items():
+ if isinstance(key, str) and isinstance(value, dict):
+ result[key] = value
+ return result
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Silently skipping non-dict plugin entries in plugins.json may hide configuration errors.
In `load_plugins_index`, non-dict values are silently ignored, so misconfigured entries (e.g., a string or list) just disappear from the index. Instead, consider raising an error or at least logging when a non-dict value is encountered, so configuration problems are visible rather than hidden.
```suggestion
def load_plugins_index(path: Path) -> dict[str, dict]:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError("plugins.json must contain a JSON object")
result: dict[str, dict] = {}
errors: list[str] = []
for key, value in data.items():
if not isinstance(key, str):
errors.append(f"plugins.json contains non-string key {key!r}")
continue
if not isinstance(value, dict):
errors.append(
f"plugins.json entry {key!r} must be a JSON object, "
f"got {type(value).__name__}"
)
continue
result[key] = value
if errors:
raise ValueError(
"Invalid entries found in plugins.json:\n" + "\n".join(errors)
)
return result
```
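As a quick sanity check of the stricter behavior, a throwaway script (a condensed, illustrative copy of the suggested loader, not the PR's actual code) shows the new failure mode:

```python
import json
import tempfile
from pathlib import Path

def load_plugins_index(path: Path) -> dict[str, dict]:
    # Condensed copy of the stricter loader above, for demonstration only
    # (JSON object keys are always strings, so the key check is elided here).
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("plugins.json must contain a JSON object")
    errors = [
        f"plugins.json entry {key!r} must be a JSON object, got {type(value).__name__}"
        for key, value in data.items()
        if not isinstance(value, dict)
    ]
    if errors:
        raise ValueError("Invalid entries found in plugins.json:\n" + "\n".join(errors))
    return data

with tempfile.TemporaryDirectory() as tmp_dir:
    index_path = Path(tmp_dir) / "plugins.json"
    index_path.write_text('{"good": {"repo": "x"}, "bad": "oops"}', encoding="utf-8")
    try:
        load_plugins_index(index_path)
        raised = False
    except ValueError as exc:
        raised = True
        print(exc)  # the misconfigured "bad" entry is now reported, not dropped
```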
</issue_to_address>
### Comment 2
<location path="tests/test_validate_plugins.py" line_range="62-71" />
<code_context>
+ self.assertEqual([item[0] for item in selected], ["plugin-c", "plugin-a"])
+
+
+class MetadataValidationTests(unittest.TestCase):
+ def test_reports_missing_required_metadata_fields(self):
+ module = load_validator_module()
+
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ plugin_dir = Path(tmp_dir)
+ (plugin_dir / "metadata.yaml").write_text(
+ "name: demo_plugin\nauthor: AstrBot Team\n",
+ encoding="utf-8",
+ )
+ (plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")
+
+ result = module.precheck_plugin_directory(plugin_dir)
+
+ self.assertFalse(result["ok"])
+ self.assertEqual(result["stage"], "metadata")
+ self.assertIn("desc", result["message"])
+ self.assertIn("version", result["message"])
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for `precheck_plugin_directory` success, missing `metadata.yaml`, and missing entrypoint
These tests currently only cover the partial‑metadata branch. Please also add cases for:
- Missing `metadata.yaml`: expect `ok=False`, `stage="metadata"`, and `"missing metadata.yaml"`.
- Fully valid metadata with an existing entrypoint: expect `ok=True`, `stage="precheck"`, and that `plugin_dir_name` and `metadata` are present.
- Valid metadata but no `main.py` or `<plugin_name>.py`: expect `ok=False`, `stage="entrypoint"`, with the corresponding error message.
This will exercise all `precheck_plugin_directory` branches.
Suggested implementation:
```python
self.assertEqual([item[0] for item in selected], ["plugin-c", "plugin-a"])
class MetadataValidationTests(unittest.TestCase):
def test_reports_missing_required_metadata_fields(self):
module = load_validator_module()
with tempfile.TemporaryDirectory() as tmp_dir:
plugin_dir = Path(tmp_dir)
(plugin_dir / "metadata.yaml").write_text(
"name: demo_plugin\nauthor: AstrBot Team\n",
encoding="utf-8",
)
(plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")
result = module.precheck_plugin_directory(plugin_dir)
self.assertFalse(result["ok"])
self.assertEqual(result["stage"], "metadata")
self.assertIn("desc", result["message"])
self.assertIn("version", result["message"])
def test_reports_missing_metadata_file(self):
module = load_validator_module()
with tempfile.TemporaryDirectory() as tmp_dir:
plugin_dir = Path(tmp_dir)
# Do not create metadata.yaml
(plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")
result = module.precheck_plugin_directory(plugin_dir)
self.assertFalse(result["ok"])
self.assertEqual(result["stage"], "metadata")
self.assertIn("missing metadata.yaml", result["message"])
def test_precheck_succeeds_with_valid_metadata_and_entrypoint(self):
module = load_validator_module()
with tempfile.TemporaryDirectory() as tmp_dir:
plugin_dir = Path(tmp_dir)
(plugin_dir / "metadata.yaml").write_text(
"name: demo_plugin\n"
"author: AstrBot Team\n"
"desc: Demo plugin.\n"
"version: 0.1.0\n",
encoding="utf-8",
)
(plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")
result = module.precheck_plugin_directory(plugin_dir)
self.assertTrue(result["ok"])
self.assertEqual(result["stage"], "precheck")
self.assertIn("plugin_dir_name", result)
self.assertIn("metadata", result)
self.assertEqual(result["metadata"]["name"], "demo_plugin")
def test_reports_missing_entrypoint_file(self):
module = load_validator_module()
with tempfile.TemporaryDirectory() as tmp_dir:
plugin_dir = Path(tmp_dir)
(plugin_dir / "metadata.yaml").write_text(
"name: demo_plugin\n"
"author: AstrBot Team\n"
"desc: Demo plugin.\n"
"version: 0.1.0\n",
encoding="utf-8",
)
# Intentionally do not create main.py or demo_plugin.py
result = module.precheck_plugin_directory(plugin_dir)
self.assertFalse(result["ok"])
self.assertEqual(result["stage"], "entrypoint")
self.assertIn("entrypoint", result["message"])
ROOT = Path(__file__).resolve().parents[1]
```
These tests assume that:
- `unittest`, `tempfile`, and `Path` are already imported in this file.
- `module.precheck_plugin_directory` returns a dict with keys: `"ok"`, `"stage"`, `"message"`, and on success `"plugin_dir_name"` and `"metadata"`.
If any of these imports are missing at the top of `tests/test_validate_plugins.py`, add:
```python
import unittest
import tempfile
from pathlib import Path
```
If the exact error message for missing metadata is different from `"missing metadata.yaml"`, adjust the assertion in `test_reports_missing_metadata_file` to match the actual message (for example, by asserting only a distinctive substring).
</issue_to_address>
Code Review
This pull request introduces a comprehensive plugin validation system for AstrBot, including a validation script and unit tests. The script manages repository cloning, metadata verification, and plugin loading within a worker process. Review feedback highlights several areas for improvement: enhancing the simple YAML parser to handle trailing comments, ensuring metadata validation supports non-string types like numeric versions, tightening name sanitization to prevent potential path traversal, and isolating plugin output to prevent interference with the worker's JSON communication.
```python
for raw_line in path.read_text(encoding="utf-8").splitlines():
    line = raw_line.strip()
    if not line or line.startswith("#") or ":" not in line:
        continue
    key, value = line.split(":", 1)
    result[key.strip()] = value.strip().strip("\"'")
return result
```
The simple YAML parser fallback does not handle trailing comments (e.g., `key: value  # comment`), which will cause the comment text to be included in the parsed value. This can lead to validation failures when PyYAML is not installed in the environment.
```suggestion
for raw_line in path.read_text(encoding="utf-8").splitlines():
    line = raw_line.split("#", 1)[0].strip()
    if not line or ":" not in line:
        continue
    key, value = line.split(":", 1)
    result[key.strip()] = value.strip().strip("\"'")
return result
```
```python
missing = [
    field
    for field in REQUIRED_METADATA_FIELDS
    if not isinstance(metadata.get(field), str) or not metadata[field].strip()
]
```
The type check `isinstance(metadata.get(field), str)` will fail if the YAML parser infers a non-string type for a field, such as a float or integer for the version number (e.g., `version: 1.0`). It is safer to check for existence and then validate the string representation of the value.
```suggestion
missing = [
    field
    for field in REQUIRED_METADATA_FIELDS
    if metadata.get(field) is None or not str(metadata[field]).strip()
]
```
```python
def sanitize_name(name: str) -> str:
    sanitized = re.sub(r"[^A-Za-z0-9._-]+", "-", name).strip("-")
```
Allowing the `.` character in the sanitized name could potentially allow directory traversal if a plugin name is exactly `..`. While the risk is mitigated by replacing `/` with `-`, it is safer to restrict the allowed characters to alphanumerics, underscores, and hyphens.
```suggestion
    sanitized = re.sub(r"[^A-Za-z0-9_-]+", "-", name).strip("-")
```
```python
result = asyncio.run(
    run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
)
```
If the plugin or AstrBot prints to stdout during the load check, it will pollute the worker's output and cause JSON parsing to fail in the parent process. Redirecting `sys.stdout` to `sys.stderr` during the `asyncio.run` call ensures that only the final JSON result is sent to stdout.
```python
_stdout = sys.stdout
sys.stdout = sys.stderr
try:
    result = asyncio.run(
        run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
    )
finally:
    sys.stdout = _stdout
```
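An equivalent, arguably tidier form of the same idea is `contextlib.redirect_stdout`, which restores the original stream automatically even if the load check raises. A minimal sketch, where the coroutine is a hypothetical stand-in for the real worker:

```python
import asyncio
import contextlib
import json
import sys

async def run_worker_load_check() -> dict:
    # Hypothetical stand-in for the real worker coroutine in run.py.
    print("plugin import chatter")  # would otherwise corrupt the JSON channel
    return {"ok": True, "stage": "load"}

# Anything printed during the load check is diverted to stderr.
with contextlib.redirect_stdout(sys.stderr):
    result = asyncio.run(run_worker_load_check())

# Only the final result reaches stdout for the parent process to parse.
print(json.dumps(result))
```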
@sourcery-ai review
Hey - I've found 7 issues, and left some high level feedback:
- In `run_worker`, you call `build_worker_sys_path` and then insert the entries into `sys.path` in reversed order, which means `astrbot_path` is prepended before `astrbot_root` even though `build_worker_sys_path` (and its test name) imply the opposite precedence; consider dropping the `reversed()` so the root override is searched before the codebase.
- The workflow step that detects changed plugins assumes `plugins.json` exists on the base ref and will cause `git show origin/$base_ref:plugins.json` to fail hard if it doesn't; you might want to guard this with a file-existence check or fall back to validating all plugins in that case.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `run_worker`, you call `build_worker_sys_path` and then insert the entries into `sys.path` in reversed order, which means `astrbot_path` is prepended before `astrbot_root` even though `build_worker_sys_path` (and its test name) imply the opposite precedence; consider dropping the `reversed()` so the root override is searched before the codebase.
- The workflow step that detects changed plugins assumes `plugins.json` exists on the base ref and will cause `git show origin/$base_ref:plugins.json` to fail hard if it doesn't; you might want to guard this with a file-existence check or fall back to validating all plugins in that case.
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="295" />
<code_context>
+ message=str(exc),
+ )
+
+ plugin_clone_dir = work_dir / sanitize_name(plugin)
+ try:
+ clone_plugin_repo(normalized_repo_url, plugin_clone_dir)
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential directory collision when different plugin keys sanitize to the same name.
Using only `sanitize_name(plugin)` for the directory can cause collisions (e.g. `"foo bar"` and `"foo/bar"` both mapping to `"foo_bar"`), leading to clones overwriting each other or mixing files. Consider making the directory name unique by incorporating the original plugin key and/or a short, stable hash, e.g. `sanitize_name(f"{plugin}-{hash(plugin)}")` or similar.
</issue_to_address>
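One caveat with the `hash(plugin)` idea: the built-in `hash()` is salted per process (`PYTHONHASHSEED`), so the directory name would change between runs. A stable digest avoids that. A sketch, where `unique_clone_dir_name` is an illustrative helper and the sanitizer is only a close copy of the PR's:

```python
import hashlib
import re

def sanitize_name(name: str) -> str:
    # Approximation of the PR's sanitizer, for demonstration only.
    return re.sub(r"[^A-Za-z0-9_-]+", "-", name).strip("-") or "plugin"

def unique_clone_dir_name(plugin_key: str) -> str:
    # A short, stable digest suffix keeps colliding keys apart across runs
    # (unlike hash(), which is salted per process).
    digest = hashlib.sha1(plugin_key.encode("utf-8")).hexdigest()[:8]
    return f"{sanitize_name(plugin_key)}-{digest}"

# Both keys sanitize to "foo-bar", but the digests differ.
a = unique_clone_dir_name("foo bar")
b = unique_clone_dir_name("foo/bar")
```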
### Comment 2
<location path="scripts/validate_plugins/run.py" line_range="219-224" />
<code_context>
+ return sanitized or "plugin"
+
+
+def clone_plugin_repo(repo_url: str, destination: Path) -> None:
+ subprocess.run(
+ ["git", "clone", "--depth", "1", repo_url, str(destination)],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Git clone has no timeout, which can hang the whole validation run.
Since this `subprocess.run` call has no timeout, a stalled or very slow `git clone` can block validation indefinitely. Please add a reasonable timeout (e.g., aligned with or derived from `load_timeout`, or a configurable value) to make the pipeline more resilient to flaky network or Git issues.
Suggested implementation:
```python
def clone_plugin_repo(
repo_url: str,
destination: Path,
timeout: float | None = None,
) -> None:
subprocess.run(
["git", "clone", "--depth", "1", repo_url, str(destination)],
check=True,
capture_output=True,
text=True,
timeout=timeout,
)
```
1. Identify the place(s) where `clone_plugin_repo` is called in this file and pass an appropriate `timeout` value (for example, a value derived from an existing `load_timeout` or other configuration).
2. If there is a central configuration for timeouts in the validation pipeline, add a dedicated setting for the git-clone timeout and thread it through to these call sites.
</issue_to_address>
### Comment 3
<location path="scripts/validate_plugins/run.py" line_range="18-27" />
<code_context>
+ normalized_repo_url=normalized_repo_url,
+ )
+
+ try:
+ completed = subprocess.run(
+ command,
+ check=False,
+ capture_output=True,
+ text=True,
+ timeout=load_timeout,
+ )
+ except subprocess.TimeoutExpired:
+ return build_result(
+ plugin=plugin,
</code_context>
<issue_to_address>
**suggestion:** Timeout handling for the worker doesn’t surface partial stdout/stderr, which can hinder debugging.
On timeout, this path drops any stdout/stderr the worker produced, even though those logs may contain useful diagnostics. Consider catching `subprocess.TimeoutExpired` as `exc` and, when available, adding `exc.stdout`/`exc.stderr` to `details` so timeouts are easier to debug.
</issue_to_address>
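A sketch of that handling, assuming a result-dict shape like the one used elsewhere in the script (the `ok` / `stage` / `details` keys and the `run_with_timeout` name are illustrative):

```python
import subprocess
import sys

def run_with_timeout(command: list[str], load_timeout: float) -> dict:
    try:
        completed = subprocess.run(
            command, check=False, capture_output=True, text=True, timeout=load_timeout
        )
    except subprocess.TimeoutExpired as exc:
        # TimeoutExpired carries whatever the child wrote before being killed;
        # depending on the Python version this may arrive as bytes, so decode defensively.
        def as_text(stream) -> str:
            if stream is None:
                return ""
            return stream.decode("utf-8", "replace") if isinstance(stream, bytes) else stream
        return {
            "ok": False,
            "stage": "load",
            "message": f"worker timed out after {load_timeout}s",
            "details": {"stdout": as_text(exc.stdout), "stderr": as_text(exc.stderr)},
        }
    return {
        "ok": completed.returncode == 0,
        "stage": "load",
        "details": {"stdout": completed.stdout, "stderr": completed.stderr},
    }

# The child prints partial output, then stalls past the timeout.
result = run_with_timeout(
    [sys.executable, "-c", "import time; print('partial output', flush=True); time.sleep(10)"],
    load_timeout=2.0,
)
```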
### Comment 4
<location path=".github/workflows/validate-plugin-smoke.yml" line_range="51-59" />
<code_context>
+ import subprocess
+ from pathlib import Path
+
+ base_ref = os.environ["GITHUB_BASE_REF"]
+ subprocess.run(["git", "fetch", "origin", base_ref, "--depth", "1"], check=True)
+ base = json.loads(
+ subprocess.check_output(
+ ["git", "show", f"origin/{base_ref}:plugins.json"],
+ text=True,
+ )
+ )
+ head = json.loads(Path("plugins.json").read_text(encoding="utf-8"))
+
+ changed = [
</code_context>
<issue_to_address>
**issue (bug_risk):** Diff detection assumes plugins.json exists on the base branch and is valid JSON, which can break on first-add or malformed cases.
This step always runs `git show origin/{base_ref}:plugins.json` and `json.loads` on both base and head. If `plugins.json` doesn’t exist on the base branch, `git show` will error, and any malformed JSON in either file will crash the job before validation. Consider wrapping the `git show` and JSON parsing in try/except, treating a missing/invalid base as `{}` and an invalid head as `SHOULD_VALIDATE=false`, so the workflow remains robust when `plugins.json` is new or temporarily broken.
</issue_to_address>
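A defensive version of the base-side read might look like the following sketch (the function name is illustrative, and treating a bad base as `{}` is one possible policy):

```python
import json
import subprocess

def load_base_plugins_index(base_ref: str) -> dict:
    """Best-effort read of plugins.json from the base ref.

    A missing file, a broken ref, or malformed JSON degrades to an empty
    index (so every head entry counts as changed) instead of failing the job.
    """
    try:
        raw = subprocess.check_output(
            ["git", "show", f"origin/{base_ref}:plugins.json"],
            text=True,
            stderr=subprocess.DEVNULL,
        )
        data = json.loads(raw)
        return data if isinstance(data, dict) else {}
    except (subprocess.CalledProcessError, json.JSONDecodeError, OSError):
        return {}

# A nonexistent ref (or no git repo at all) falls back to an empty index.
base = load_base_plugins_index("definitely-not-a-real-ref")
```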
### Comment 5
<location path="tests/test_validate_plugins.py" line_range="26" />
<code_context>
+ return module
+
+
+class NormalizeRepoUrlTests(unittest.TestCase):
+ def test_strips_git_suffix_trailing_slash_and_query(self):
+ module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add more edge-case coverage for `normalize_repo_url` error handling.
Current `NormalizeRepoUrlTests` only cover a happy-path GitHub URL and a non-GitHub host. Please add tests that assert:
- Non-HTTP schemes (e.g. `git://github.com/...`, `ssh://github.com/...`) raise `ValueError`.
- Missing owner and/or repo in the path (e.g. `https://github.com/`, `https://github.com/owner`, `https://github.com/owner/`) trigger the "repo URL must include owner and repository" branch.
- Empty segments or redundant slashes (e.g. `https://github.com//repo`, `https://github.com/owner//`).
- Leading/trailing whitespace is stripped as expected.
This will exercise all branches of `normalize_repo_url` and better guard against malformed URLs.
</issue_to_address>
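To make the expected branches concrete, here is a hypothetical reference implementation embodying the behavior those tests would pin down; the real `normalize_repo_url` may differ in details such as how redundant slashes are treated:

```python
from urllib.parse import urlsplit

def normalize_repo_url_sketch(url: str) -> str:
    # Illustrative only; mirrors the behaviors listed above.
    parts = urlsplit(url.strip())  # leading/trailing whitespace is stripped
    if parts.scheme not in ("http", "https") or parts.netloc != "github.com":
        raise ValueError("only HTTP(S) GitHub URLs are supported")
    segments = [seg for seg in parts.path.split("/") if seg]  # drop empty segments
    if len(segments) < 2:
        raise ValueError("repo URL must include owner and repository")
    owner, repo = segments[0], segments[1]
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]
    return f"https://github.com/{owner}/{repo}"

ok = normalize_repo_url_sketch("  https://github.com/example/demo-plugin.git/?tab=readme ")
rejected = []
for bad in (
    "git://github.com/a/b",        # non-HTTP scheme
    "https://github.com/",         # no owner/repo
    "https://github.com//repo",    # empty segment leaves only one part
    "https://github.com/owner//",  # missing repo
):
    try:
        normalize_repo_url_sketch(bad)
    except ValueError:
        rejected.append(bad)
```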
### Comment 6
<location path="tests/test_validate_plugins.py" line_range="44" />
<code_context>
+ module.normalize_repo_url("https://gitlab.com/example/demo-plugin")
+
+
+class SelectPluginsTests(unittest.TestCase):
+ def test_returns_all_plugins_when_limit_is_none(self):
+ module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Extend `select_plugins` tests to cover limits and missing plugin names.
Please add tests for:
1. `limit` handling when `requested_names` is `None` and `limit` is smaller than the total plugins (e.g., 3 plugins, `limit=1`), asserting only the first N are returned in order.
2. The case where `requested_names` includes a name not in the index, asserting a `KeyError("plugin not found: ...")` is raised.
These will fully specify the selection behavior driven by CLI arguments.
Suggested implementation:
```python
import unittest
from collections import OrderedDict
```
```python
class SelectPluginsTests(unittest.TestCase):
def test_returns_all_plugins_when_limit_is_none(self):
module = load_validator_module()
plugin_index = OrderedDict(
[
("plugin-one", object()),
("plugin-two", object()),
("plugin-three", object()),
]
)
selected = list(
module.select_plugins(
plugin_index,
requested_names=None,
limit=None,
)
)
# When limit is None and no names are requested, all plugins are returned in index order.
self.assertEqual(
selected,
list(plugin_index.values()),
)
def test_respects_limit_when_requested_names_is_none(self):
module = load_validator_module()
plugin_index = OrderedDict(
[
("plugin-one", object()),
("plugin-two", object()),
("plugin-three", object()),
]
)
selected = list(
module.select_plugins(
plugin_index,
requested_names=None,
limit=1,
)
)
# When limit is smaller than the total number of plugins, only the first N are returned in order.
self.assertEqual(
selected,
list(plugin_index.values())[:1],
)
def test_raises_key_error_for_unknown_requested_plugin(self):
module = load_validator_module()
plugin_index = {
"known-plugin": object(),
}
with self.assertRaisesRegex(KeyError, r"plugin not found: missing-plugin"):
list(
module.select_plugins(
plugin_index,
requested_names=["known-plugin", "missing-plugin"],
limit=None,
)
)
```
These tests assume:
1. `select_plugins` takes parameters `(plugin_index, requested_names=None, limit=None)`.
2. `select_plugins` returns an iterable of plugin objects (i.e., `plugin_index` values) in the index's iteration order.
3. When a requested plugin name is missing, `select_plugins` raises `KeyError` with a message containing `plugin not found: <name>`.
If `select_plugins` instead yields `(name, plugin)` tuples or plugin names, update the `selected` expectations accordingly:
- For `(name, plugin)` tuples, compare `[name for name, _ in selected]` to `list(plugin_index.keys())` (and slice for the limit case).
- For plugin names, construct `plugin_index` keys as the expected list and compare `selected` directly to those keys.
</issue_to_address>
### Comment 7
<location path="tests/test_validate_plugins.py" line_range="148" />
<code_context>
+ )
+
+
+class ReportBuilderTests(unittest.TestCase):
+ def test_build_report_counts_passed_and_failed_results(self):
+ module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding unit tests for other pure helpers like `combine_requested_names` and `sanitize_name`.
Targeted tests for these helpers would increase confidence in the CLI-to-filesystem mapping logic:
- `combine_requested_names`: ensure it merges `plugin_names` and `plugin_name_list`, trims whitespace, and ignores empty values.
- `sanitize_name`: ensure it replaces invalid chars with `-`, trims leading/trailing `-`, and returns the fallback `"plugin"` for inputs that are all invalid or whitespace.
Since these functions are pure, they should be straightforward and fast to test.
Suggested implementation:
```python
astrbot_root=Path("/tmp/astrbot-root"),
astrbot_path=Path("/tmp/AstrBot"),
)
self.assertEqual(
[Path(item) for item in sys_path_entries],
[Path("/tmp/astrbot-root").resolve(), Path("/tmp/AstrBot").resolve()],
)
self.assertEqual(
module.normalize_repo_url(
"https://github.com/example/demo-plugin.git/?tab=readme-ov-file"
),
"https://github.com/example/demo-plugin",
)
def test_rejects_non_github_urls(self):
module = load_validator_module()
class HelperFunctionTests(unittest.TestCase):
def test_combine_requested_names_merges_and_trims(self):
module = load_validator_module()
combined = module.combine_requested_names(
plugin_names=["foo", " bar ", "", "baz"],
plugin_name_list=["qux", " ", "foo "],
)
# Expect whitespace to be trimmed and empty/whitespace-only entries dropped,
# preserving order across both inputs.
self.assertEqual(combined, ["foo", "bar", "baz", "qux", "foo"])
def test_combine_requested_names_handles_none_inputs(self):
module = load_validator_module()
combined = module.combine_requested_names(
plugin_names=None,
plugin_name_list=None,
)
# With no names provided, we should get an empty list (not None).
self.assertEqual(combined, [])
def test_sanitize_name_replaces_invalid_chars_and_trims_hyphens(self):
module = load_validator_module()
sanitized = module.sanitize_name(" -invalid name!*?- ")
# - spaces and invalid chars should become '-'
# - leading/trailing '-' should be trimmed
# The expected result here assumes the implementation normalizes to:
self.assertEqual(sanitized, "invalid-name")
def test_sanitize_name_valid_name_unchanged(self):
module = load_validator_module()
self.assertEqual(module.sanitize_name("valid-name_123"), "valid-name_123")
def test_sanitize_name_all_invalid_or_whitespace_falls_back_to_plugin(self):
module = load_validator_module()
self.assertEqual(module.sanitize_name(" "), "plugin")
self.assertEqual(module.sanitize_name("!!!"), "plugin")
self.assertEqual(module.sanitize_name("---"), "plugin")
```
The new tests assume the following behavior in the implementation:
1. `combine_requested_names(plugin_names, plugin_name_list)`:
- Accepts `None` for either argument and treats it as empty.
- Strips leading/trailing whitespace from each name.
- Ignores empty or whitespace-only names.
- Returns a `list[str]` in the order: all (cleaned) `plugin_names` followed by all (cleaned) `plugin_name_list`.
2. `sanitize_name(name)`:
- Replaces characters that are not allowed in filenames/plugin identifiers with `'-'`.
- Trims leading and trailing `'-'` after replacement.
- Returns `"plugin"` if the resulting name is empty or consists entirely of invalid characters/whitespace.
If the actual implementations differ (e.g., they deduplicate names, use a different allowed-character set, or a different fallback than `"plugin"`), update the expected values in these tests to match the real behavior (or adjust the implementation to meet the behavior described in your original review comment).
</issue_to_address>
| return module | ||
|
|
||
|
|
||
| class NormalizeRepoUrlTests(unittest.TestCase): |
There was a problem hiding this comment.
suggestion (testing): Add more edge-case coverage for normalize_repo_url error handling.
Current NormalizeRepoUrlTests only cover a happy-path GitHub URL and a non-GitHub host. Please add tests that assert:
- Non-HTTP schemes (e.g.
git://github.com/...,ssh://github.com/...) raiseValueError. - Missing owner and/or repo in the path (e.g.
https://github.com/,https://github.com/owner,https://github.com/owner/) trigger the "repo URL must include owner and repository" branch. - Empty segments or redundant slashes (e.g.
https://github.com//repo,https://github.com/owner//). - Leading/trailing whitespace is stripped as expected.
This will exercise all branches of normalize_repo_url and better guard against malformed URLs.
| module.normalize_repo_url("https://gitlab.com/example/demo-plugin") | ||
|
|
||
|
|
||
| class SelectPluginsTests(unittest.TestCase): |
There was a problem hiding this comment.
suggestion (testing): Extend select_plugins tests to cover limits and missing plugin names.
Please add tests for:
limithandling whenrequested_namesisNoneandlimitis smaller than the total plugins (e.g., 3 plugins,limit=1), asserting only the first N are returned in order.- The case where
requested_namesincludes a name not in the index, asserting aKeyError("plugin not found: ...")is raised.
These will fully specify the selection behavior driven by CLI arguments.
Suggested implementation:
import unittest
from collections import OrderedDictclass SelectPluginsTests(unittest.TestCase):
def test_returns_all_plugins_when_limit_is_none(self):
module = load_validator_module()
plugin_index = OrderedDict(
[
("plugin-one", object()),
("plugin-two", object()),
("plugin-three", object()),
]
)
selected = list(
module.select_plugins(
plugin_index,
requested_names=None,
limit=None,
)
)
# When limit is None and no names are requested, all plugins are returned in index order.
self.assertEqual(
selected,
list(plugin_index.values()),
)
def test_respects_limit_when_requested_names_is_none(self):
module = load_validator_module()
plugin_index = OrderedDict(
[
("plugin-one", object()),
("plugin-two", object()),
("plugin-three", object()),
]
)
selected = list(
module.select_plugins(
plugin_index,
requested_names=None,
limit=1,
)
)
# When limit is smaller than the total number of plugins, only the first N are returned in order.
self.assertEqual(
selected,
list(plugin_index.values())[:1],
)
def test_raises_key_error_for_unknown_requested_plugin(self):
module = load_validator_module()
plugin_index = {
"known-plugin": object(),
}
with self.assertRaisesRegex(KeyError, r"plugin not found: missing-plugin"):
list(
module.select_plugins(
plugin_index,
requested_names=["known-plugin", "missing-plugin"],
limit=None,
)
)These tests assume:
select_pluginstakes parameters(plugin_index, requested_names=None, limit=None).select_pluginsreturns an iterable of plugin objects (i.e.,plugin_indexvalues) in the index's iteration order.- When a requested plugin name is missing,
select_pluginsraisesKeyErrorwith a message containingplugin not found: <name>.
If select_plugins instead yields (name, plugin) tuples or plugin names, update the selected expectations accordingly:
- For
(name, plugin)tuples, compare[name for name, _ in selected]tolist(plugin_index.keys())(and slice for the limit case). - For plugin names, construct
plugin_indexkeys as the expected list and compareselecteddirectly to those keys.
| ) | ||
|
|
||
|
|
||
| class ReportBuilderTests(unittest.TestCase): |
There was a problem hiding this comment.
suggestion (testing): Consider adding unit tests for other pure helpers like combine_requested_names and sanitize_name.
Targeted tests for these helpers would increase confidence in the CLI-to-filesystem mapping logic:
- `combine_requested_names`: ensure it merges `plugin_names` and `plugin_name_list`, trims whitespace, and ignores empty values.
- `sanitize_name`: ensure it replaces invalid chars with `-`, trims leading/trailing `-`, and returns the fallback `"plugin"` for inputs that are all invalid or whitespace.
Since these functions are pure, they should be straightforward and fast to test.
Suggested implementation:
astrbot_root=Path("/tmp/astrbot-root"),
astrbot_path=Path("/tmp/AstrBot"),
)
self.assertEqual(
[Path(item) for item in sys_path_entries],
[Path("/tmp/astrbot-root").resolve(), Path("/tmp/AstrBot").resolve()],
)
self.assertEqual(
module.normalize_repo_url(
"https://github.com/example/demo-plugin.git/?tab=readme-ov-file"
),
"https://github.com/example/demo-plugin",
)
def test_rejects_non_github_urls(self):
module = load_validator_module()
class HelperFunctionTests(unittest.TestCase):
def test_combine_requested_names_merges_and_trims(self):
module = load_validator_module()
combined = module.combine_requested_names(
plugin_names=["foo", " bar ", "", "baz"],
plugin_name_list=["qux", " ", "foo "],
)
# Expect whitespace to be trimmed and empty/whitespace-only entries dropped,
# preserving order across both inputs.
self.assertEqual(combined, ["foo", "bar", "baz", "qux", "foo"])
def test_combine_requested_names_handles_none_inputs(self):
module = load_validator_module()
combined = module.combine_requested_names(
plugin_names=None,
plugin_name_list=None,
)
# With no names provided, we should get an empty list (not None).
self.assertEqual(combined, [])
def test_sanitize_name_replaces_invalid_chars_and_trims_hyphens(self):
module = load_validator_module()
sanitized = module.sanitize_name(" -invalid name!*?- ")
# - spaces and invalid chars should become '-'
# - leading/trailing '-' should be trimmed
# The expected result here assumes the implementation normalizes to:
self.assertEqual(sanitized, "invalid-name")
def test_sanitize_name_valid_name_unchanged(self):
module = load_validator_module()
self.assertEqual(module.sanitize_name("valid-name_123"), "valid-name_123")
def test_sanitize_name_all_invalid_or_whitespace_falls_back_to_plugin(self):
module = load_validator_module()
self.assertEqual(module.sanitize_name(" "), "plugin")
self.assertEqual(module.sanitize_name("!!!"), "plugin")
        self.assertEqual(module.sanitize_name("---"), "plugin")

The new tests assume the following behavior in the implementation:
- `combine_requested_names(plugin_names, plugin_name_list)`:
  - Accepts `None` for either argument and treats it as empty.
  - Strips leading/trailing whitespace from each name.
  - Ignores empty or whitespace-only names.
  - Returns a `list[str]` in the order: all (cleaned) `plugin_names` followed by all (cleaned) `plugin_name_list`.
- `sanitize_name(name)`:
  - Replaces characters that are not allowed in filenames/plugin identifiers with `'-'`.
  - Trims leading and trailing `'-'` after replacement.
  - Returns `"plugin"` if the resulting name is empty or consists entirely of invalid characters/whitespace.

If the actual implementations differ (e.g., they deduplicate names, use a different allowed-character set, or a different fallback than "plugin"), update the expected values in these tests to match the real behavior (or adjust the implementation to meet the behavior described in your original review comment).
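One possible shape for these helpers, matching the behavior described above (a hypothetical sketch — the signatures and the allowed-character set are assumptions, not the actual implementation):

```python
import re


def combine_requested_names(plugin_names=None, plugin_name_list=None):
    """Merge both sources in order, trimming whitespace and dropping empties."""
    combined = []
    for source in (plugin_names or []), (plugin_name_list or []):
        for raw in source:
            name = (raw or "").strip()
            if name:
                combined.append(name)
    return combined


def sanitize_name(name, fallback="plugin"):
    """Replace disallowed characters with '-', trim '-', fall back if empty."""
    sanitized = re.sub(r"[^A-Za-z0-9_-]+", "-", name.strip())
    sanitized = sanitized.strip("-")
    return sanitized or fallback
```

Under these definitions, `sanitize_name(" -invalid name!*?- ")` yields `"invalid-name"` and all-invalid inputs fall back to `"plugin"`, as the suggested tests expect.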
@sourcery-ai review
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="tests/test_validate_plugins.py" line_range="154" />
<code_context>
+ )
+
+
+class HelperFunctionTests(unittest.TestCase):
+ def test_combine_requested_names_merges_trims_and_drops_empty_values(self):
+ module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for YAML loading and plugins index parsing to ensure robustness of input handling
These tests omit `load_metadata` / `_parse_simple_yaml` and `load_plugins_index`, which are key for marketplace input handling. Please also add tests that:
- Cover `_parse_simple_yaml` with comments, quoted values, and extra whitespace when `yaml` is unavailable;
- Cover `load_metadata` with both code paths: `yaml` available (mock `yaml.safe_load`) and `yaml` is `None` (fallback parser);
- Cover `load_plugins_index` with: (a) a valid object, (b) a JSON array that should raise `ValueError`, and (c) entries with non-dict values or non-string keys that should be filtered out.
This will better protect against regressions in index/metadata formats.
Suggested implementation:
```python
class HelperFunctionTests(unittest.TestCase):
def test_combine_requested_names_merges_trims_and_drops_empty_values(self):
module = load_validator_module()
# This should exercise the existing helper that merges requested names from
# multiple sources, trims whitespace, and drops empty values.
combined = module.combine_requested_names(
cli_names=[" plugin-a ", "", "plugin-b"],
env_names=["PLUGIN-C", " "],
config_names=["plugin-b", None, " "],
)
self.assertEqual(sorted(combined), ["PLUGIN-C", "plugin-a", "plugin-b"])
def test_parse_simple_yaml_handles_comments_quotes_and_whitespace_without_yaml(self):
module = load_validator_module()
# Force the module down the fallback path even if PyYAML is installed
yaml_backup = getattr(module, "yaml", None)
try:
module.yaml = None
text = """
# leading comment
key1: value1 # trailing comment
key2: " spaced value "
key3: 'another value'
key4: value-with-#-hash
"""
parsed = module._parse_simple_yaml(text)
# Fallback parser should ignore comments and whitespace, and preserve quoted values
self.assertEqual(parsed["key1"], "value1")
self.assertEqual(parsed["key2"], " spaced value ")
self.assertEqual(parsed["key3"], "another value")
self.assertEqual(parsed["key4"], "value-with-#-hash")
finally:
module.yaml = yaml_backup
def test_load_metadata_uses_yaml_safe_load_when_available(self):
module = load_validator_module()
from unittest import mock
import tempfile
import os
with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
# The actual contents are irrelevant because yaml.safe_load is mocked
f.write("name: should-be-overridden\n")
metadata_path = f.name
try:
fake_yaml = mock.Mock()
fake_yaml.safe_load.return_value = {"name": "from-yaml", "version": "1.0.0"}
with mock.patch.object(module, "yaml", fake_yaml):
metadata = module.load_metadata(metadata_path)
self.assertEqual(metadata, {"name": "from-yaml", "version": "1.0.0"})
fake_yaml.safe_load.assert_called_once()
finally:
os.remove(metadata_path)
def test_load_metadata_uses_simple_parser_when_yaml_unavailable(self):
module = load_validator_module()
import tempfile
import os
yaml_backup = getattr(module, "yaml", None)
try:
module.yaml = None
with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
f.write(
"""
name: demo-plugin
version: "0.2.3"
"""
)
metadata_path = f.name
metadata = module.load_metadata(metadata_path)
self.assertEqual(metadata.get("name"), "demo-plugin")
self.assertEqual(metadata.get("version"), "0.2.3")
finally:
module.yaml = yaml_backup
os.remove(metadata_path)
def test_load_plugins_index_accepts_valid_object(self):
module = load_validator_module()
import tempfile
import json
import os
index_obj = {
"good-plugin": {"name": "Good Plugin", "repo": "https://github.com/example/good"},
"another-plugin": {"name": "Another Plugin"},
}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
json.dump(index_obj, f)
index_path = f.name
try:
plugins = module.load_plugins_index(index_path)
# Should round-trip the valid mapping
self.assertIn("good-plugin", plugins)
self.assertIn("another-plugin", plugins)
self.assertEqual(plugins["good-plugin"]["name"], "Good Plugin")
finally:
os.remove(index_path)
def test_load_plugins_index_rejects_json_array(self):
module = load_validator_module()
import tempfile
import json
import os
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
json.dump([{"name": "array-entry"}], f)
index_path = f.name
try:
with self.assertRaises(ValueError):
module.load_plugins_index(index_path)
finally:
os.remove(index_path)
def test_load_plugins_index_filters_invalid_entries(self):
module = load_validator_module()
import tempfile
import json
import os
# Include entries that should be filtered:
# - non-dict value
# - key that is not a string (if implementation checks this)
# - dict without required fields (e.g., missing "name")
index_obj = {
"valid-plugin": {"name": "Valid Plugin", "repo": "https://github.com/example/valid"},
"not-a-dict": "just-a-string",
"missing-name": {"repo": "https://github.com/example/missing-name"},
}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
json.dump(index_obj, f)
index_path = f.name
try:
plugins = module.load_plugins_index(index_path)
# Only the fully valid entry should be present
self.assertEqual(set(plugins.keys()), {"valid-plugin"})
self.assertEqual(plugins["valid-plugin"]["name"], "Valid Plugin")
finally:
os.remove(index_path)
```
1. Ensure the `HelperFunctionTests.test_combine_requested_names_merges_trims_and_drops_empty_values` method still contains any existing assertions you rely on; if there were prior expectations different from the ones added here, merge them rather than overwrite.
2. At the top of `tests/test_validate_plugins.py`, make sure the following imports are present and not duplicated:
- `import unittest`
- `import json`
- `import os`
- `import tempfile`
- `from unittest import mock`
3. These tests assume:
- `combine_requested_names(cli_names, env_names, config_names)` exists and returns an iterable of names.
- `_parse_simple_yaml(text: str)` returns a `dict`-like object.
- `load_metadata(path: str | Path)` reads the file and uses `yaml.safe_load` when `module.yaml` is truthy, otherwise falls back to `_parse_simple_yaml`.
- `load_plugins_index(path: str | Path)` reads JSON from the given path, raises `ValueError` when the JSON root is a list, and returns a mapping of plugin IDs to dict metadata while filtering out non-dict or otherwise-invalid entries.
If the actual signatures or behaviors differ, adjust the test calls and assertions accordingly (for example, if `load_plugins_index` accepts a file object instead of a path, or if it uses a different validation rule for entries).
</issue_to_address>
suggestion (testing): Add tests for YAML loading and plugins index parsing to ensure robustness of input handling
These tests omit load_metadata / _parse_simple_yaml and load_plugins_index, which are key for marketplace input handling. Please also add tests that:
- Cover `_parse_simple_yaml` with comments, quoted values, and extra whitespace when `yaml` is unavailable;
- Cover `load_metadata` with both code paths: `yaml` available (mock `yaml.safe_load`) and `yaml` is `None` (fallback parser);
- Cover `load_plugins_index` with: (a) a valid object, (b) a JSON array that should raise `ValueError`, and (c) entries with non-dict values or non-string keys that should be filtered out.
This will better protect against regressions in index/metadata formats.
Suggested implementation:
class HelperFunctionTests(unittest.TestCase):
def test_combine_requested_names_merges_trims_and_drops_empty_values(self):
module = load_validator_module()
# This should exercise the existing helper that merges requested names from
# multiple sources, trims whitespace, and drops empty values.
combined = module.combine_requested_names(
cli_names=[" plugin-a ", "", "plugin-b"],
env_names=["PLUGIN-C", " "],
config_names=["plugin-b", None, " "],
)
self.assertEqual(sorted(combined), ["PLUGIN-C", "plugin-a", "plugin-b"])
def test_parse_simple_yaml_handles_comments_quotes_and_whitespace_without_yaml(self):
module = load_validator_module()
# Force the module down the fallback path even if PyYAML is installed
yaml_backup = getattr(module, "yaml", None)
try:
module.yaml = None
text = """
# leading comment
key1: value1 # trailing comment
key2: " spaced value "
key3: 'another value'
key4: value-with-#-hash
"""
parsed = module._parse_simple_yaml(text)
# Fallback parser should ignore comments and whitespace, and preserve quoted values
self.assertEqual(parsed["key1"], "value1")
self.assertEqual(parsed["key2"], " spaced value ")
self.assertEqual(parsed["key3"], "another value")
self.assertEqual(parsed["key4"], "value-with-#-hash")
finally:
module.yaml = yaml_backup
def test_load_metadata_uses_yaml_safe_load_when_available(self):
module = load_validator_module()
from unittest import mock
import tempfile
import os
with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
# The actual contents are irrelevant because yaml.safe_load is mocked
f.write("name: should-be-overridden\n")
metadata_path = f.name
try:
fake_yaml = mock.Mock()
fake_yaml.safe_load.return_value = {"name": "from-yaml", "version": "1.0.0"}
with mock.patch.object(module, "yaml", fake_yaml):
metadata = module.load_metadata(metadata_path)
self.assertEqual(metadata, {"name": "from-yaml", "version": "1.0.0"})
fake_yaml.safe_load.assert_called_once()
finally:
os.remove(metadata_path)
def test_load_metadata_uses_simple_parser_when_yaml_unavailable(self):
module = load_validator_module()
import tempfile
import os
yaml_backup = getattr(module, "yaml", None)
try:
module.yaml = None
with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
f.write(
"""
name: demo-plugin
version: "0.2.3"
"""
)
metadata_path = f.name
metadata = module.load_metadata(metadata_path)
self.assertEqual(metadata.get("name"), "demo-plugin")
self.assertEqual(metadata.get("version"), "0.2.3")
finally:
module.yaml = yaml_backup
os.remove(metadata_path)
def test_load_plugins_index_accepts_valid_object(self):
module = load_validator_module()
import tempfile
import json
import os
index_obj = {
"good-plugin": {"name": "Good Plugin", "repo": "https://github.com/example/good"},
"another-plugin": {"name": "Another Plugin"},
}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
json.dump(index_obj, f)
index_path = f.name
try:
plugins = module.load_plugins_index(index_path)
# Should round-trip the valid mapping
self.assertIn("good-plugin", plugins)
self.assertIn("another-plugin", plugins)
self.assertEqual(plugins["good-plugin"]["name"], "Good Plugin")
finally:
os.remove(index_path)
def test_load_plugins_index_rejects_json_array(self):
module = load_validator_module()
import tempfile
import json
import os
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
json.dump([{"name": "array-entry"}], f)
index_path = f.name
try:
with self.assertRaises(ValueError):
module.load_plugins_index(index_path)
finally:
os.remove(index_path)
def test_load_plugins_index_filters_invalid_entries(self):
module = load_validator_module()
import tempfile
import json
import os
# Include entries that should be filtered:
# - non-dict value
# - key that is not a string (if implementation checks this)
# - dict without required fields (e.g., missing "name")
index_obj = {
"valid-plugin": {"name": "Valid Plugin", "repo": "https://github.com/example/valid"},
"not-a-dict": "just-a-string",
"missing-name": {"repo": "https://github.com/example/missing-name"},
}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
json.dump(index_obj, f)
index_path = f.name
try:
plugins = module.load_plugins_index(index_path)
# Only the fully valid entry should be present
self.assertEqual(set(plugins.keys()), {"valid-plugin"})
self.assertEqual(plugins["valid-plugin"]["name"], "Valid Plugin")
finally:
os.remove(index_path)

1. Ensure the `HelperFunctionTests.test_combine_requested_names_merges_trims_and_drops_empty_values` method still contains any existing assertions you rely on; if there were prior expectations different from the ones added here, merge them rather than overwrite.
2. At the top of `tests/test_validate_plugins.py`, make sure the following imports are present and not duplicated:
   - `import unittest`
   - `import json`
   - `import os`
   - `import tempfile`
   - `from unittest import mock`
3. These tests assume:
   - `combine_requested_names(cli_names, env_names, config_names)` exists and returns an iterable of names.
   - `_parse_simple_yaml(text: str)` returns a `dict`-like object.
   - `load_metadata(path: str | Path)` reads the file and uses `yaml.safe_load` when `module.yaml` is truthy, otherwise falls back to `_parse_simple_yaml`.
   - `load_plugins_index(path: str | Path)` reads JSON from the given path, raises `ValueError` when the JSON root is a list, and returns a mapping of plugin IDs to dict metadata while filtering out non-dict or otherwise-invalid entries.

If the actual signatures or behaviors differ, adjust the test calls and assertions accordingly (for example, if `load_plugins_index` accepts a file object instead of a path, or if it uses a different validation rule for entries).
SourceryAI left a comment
Hey - I've found 3 issues, and left some high level feedback:
- In `load_plugins_index`, non-dict values in `plugins.json` are silently dropped; consider failing fast with a clear error so malformed entries don’t disappear unnoticed from validation.
- The validator script is quite large and mixes CLI parsing, filesystem/git operations, and worker logic in a single file; consider extracting helpers (e.g., clone/selection/reporting utilities or worker-only code) into separate modules to keep concerns isolated and make future changes easier.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `load_plugins_index`, non-dict values in `plugins.json` are silently dropped; consider failing fast with a clear error so malformed entries don’t disappear unnoticed from validation.
- The validator script is quite large and mixes CLI parsing, filesystem/git operations, and worker logic in a single file; consider extracting helpers (e.g., clone/selection/reporting utilities or worker-only code) into separate modules to keep concerns isolated and make future changes easier.
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="195-203" />
<code_context>
+ }
+
+
+def load_plugins_index(path: Path) -> dict[str, dict]:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ if not isinstance(data, dict):
+ raise ValueError("plugins.json must contain a JSON object")
+ result = {}
+ for key, value in data.items():
+ if isinstance(key, str) and isinstance(value, dict):
+ result[key] = value
+ return result
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Silently dropping non-dict entries in plugins.json could hide configuration mistakes.
In `load_plugins_index`, entries with non-string keys or non-dict values are silently skipped. This can hide misconfigurations in `plugins.json` and make it hard to understand why a plugin never shows up. Consider failing fast (e.g., raising an error) or at least logging these invalid entries so issues in the config are visible.
```suggestion
def load_plugins_index(path: Path) -> dict[str, dict]:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError("plugins.json must contain a JSON object")
for key, value in data.items():
if not isinstance(key, str):
raise ValueError(
f"plugins.json keys must be strings, got {type(key).__name__!r}: {key!r}"
)
if not isinstance(value, dict):
raise ValueError(
f"plugins.json values must be objects/dicts; key {key!r} has {type(value).__name__!r}"
)
return data
```
</issue_to_address>
### Comment 2
<location path=".github/workflows/validate-plugin-smoke.yml" line_range="56-65" />
<code_context>
+
+ validation_note = ""
+
+ try:
+ base_raw = subprocess.check_output(
+ ["git", "show", f"origin/{base_ref}:plugins.json"],
+ text=True,
+ stderr=subprocess.DEVNULL,
+ )
+ base = json.loads(base_raw)
+ if not isinstance(base, dict):
+ base = {}
+ except (subprocess.CalledProcessError, json.JSONDecodeError):
+ base = {}
+
+ try:
+ head = json.loads(Path("plugins.json").read_text(encoding="utf-8"))
+ if not isinstance(head, dict):
+ raise ValueError("plugins.json must contain a JSON object")
+ except (json.JSONDecodeError, ValueError) as exc:
+ changed = []
+ should_validate = False
+ validation_note = f"Skipping smoke validation because plugins.json is invalid on the PR head: {exc}"
+ else:
+ changed = [
</code_context>
<issue_to_address>
**issue (bug_risk):** Invalid plugins.json on the PR head causes smoke validation to be skipped rather than failing the workflow.
When `plugins.json` on the PR head is invalid (e.g., merge markers or bad JSON), this code sets `should_validate = False` and only records a note, so the job still succeeds and a broken `plugins.json` can land. Consider treating this as a hard failure (e.g., `sys.exit(1)` or re-raising after emitting a clear error) so that invalid configs fail the workflow instead of silently skipping validation.
</issue_to_address>
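A hard-failure variant of the head-parsing step could look like the following sketch (hypothetical; the helper name and the use of a GitHub Actions `::error::` annotation are illustrative):

```python
import json
import sys
from pathlib import Path


def load_head_plugins_index(path="plugins.json"):
    """Parse plugins.json from the PR head, failing the job if it is invalid."""
    try:
        head = json.loads(Path(path).read_text(encoding="utf-8"))
        if not isinstance(head, dict):
            raise ValueError("plugins.json must contain a JSON object")
    except (json.JSONDecodeError, ValueError) as exc:
        # Emit a workflow error annotation and exit non-zero instead of
        # silently skipping validation, so the broken file cannot land.
        print(f"::error::plugins.json is invalid on the PR head: {exc}")
        sys.exit(1)
    return head
```

With this shape, merge-conflict markers or a JSON array at the root fail the workflow step outright rather than producing a skipped-validation note.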
### Comment 3
<location path="scripts/validate_plugins/run.py" line_range="155" />
<code_context>
+ }
+
+
+def build_worker_command(
+ *,
+ script_path: Path,
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the single-use worker helpers and collapsing the tiny process-output helper into one function to make the validation script easier to follow sequentially.
You can trim some indirection around the worker helpers without changing behavior, which should make the script easier to read top‑to‑bottom.
### 1. Inline `build_worker_command`
`build_worker_command` is a single-use, linear argument list. Inlining it in `validate_plugin` removes a jump without hurting clarity:
```python
# remove this helper entirely
# def build_worker_command(...): ...
def validate_plugin(...):
...
plugin_dir_name = precheck["plugin_dir_name"]
command = [
sys.executable,
str(script_path),
"--worker",
"--astrbot-path", str(astrbot_path),
"--plugin-source-dir", str(plugin_clone_dir),
"--plugin-dir-name", plugin_dir_name,
"--normalized-repo-url", normalized_repo_url,
]
try:
completed = subprocess.run(
command,
check=False,
capture_output=True,
text=True,
timeout=load_timeout,
)
...
```
### 2. Inline `build_worker_sys_path`
This is also a trivial, single-use helper:
```python
# remove this helper entirely
# def build_worker_sys_path(*, astrbot_root: Path, astrbot_path: Path) -> list[str]:
# return [str(astrbot_root.resolve()), str(astrbot_path.resolve())]
def run_worker(args: argparse.Namespace) -> int:
...
os.environ["ASTRBOT_ROOT"] = str(astrbot_root)
os.environ.setdefault("TESTING", "true")
sys.path[:0] = [
str(astrbot_root.resolve()),
str(Path(args.astrbot_path).resolve()),
]
result = asyncio.run(
run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
)
...
```
### 3. Simplify process output helpers
You can keep the behavior but reduce the surface area by folding `_normalize_process_output` into `build_process_output_details`, since it’s only used there:
```python
# keep a single helper
def build_process_output_details(
*, stdout: str | bytes | None, stderr: str | bytes | None
) -> dict | None:
def _normalize(output: str | bytes | None) -> str | None:
if output is None:
return None
if isinstance(output, bytes):
output = output.decode("utf-8", errors="replace")
output = output.strip()
return output or None
details = {}
stdout_text = _normalize(stdout)
stderr_text = _normalize(stderr)
if stdout_text:
details["stdout"] = stdout_text
if stderr_text:
details["stderr"] = stderr_text
return details or None
```
These changes keep all existing behavior but reduce the number of tiny helpers the reader has to chase when following `validate_plugin`/`run_worker`.
</issue_to_address>

Hi @zouyonghe! 👋
Thanks for trying out Sourcery by commenting with @sourcery-ai review! 🚀
Install the sourcery-ai bot to get automatic code reviews on every pull request ✨
issue (complexity): Consider inlining the single-use worker helpers and collapsing the tiny process-output helper into one function to make the validation script easier to follow sequentially.
You can trim some indirection around the worker helpers without changing behavior, which should make the script easier to read top‑to‑bottom.
1. Inline build_worker_command
build_worker_command is a single-use, linear argument list. Inlining it in validate_plugin removes a jump without hurting clarity:
# remove this helper entirely
# def build_worker_command(...): ...
def validate_plugin(...):
...
plugin_dir_name = precheck["plugin_dir_name"]
command = [
sys.executable,
str(script_path),
"--worker",
"--astrbot-path", str(astrbot_path),
"--plugin-source-dir", str(plugin_clone_dir),
"--plugin-dir-name", plugin_dir_name,
"--normalized-repo-url", normalized_repo_url,
]
try:
completed = subprocess.run(
command,
check=False,
capture_output=True,
text=True,
timeout=load_timeout,
)
...

2. Inline `build_worker_sys_path`
This is also a trivial, single-use helper:
# remove this helper entirely
# def build_worker_sys_path(*, astrbot_root: Path, astrbot_path: Path) -> list[str]:
# return [str(astrbot_root.resolve()), str(astrbot_path.resolve())]
def run_worker(args: argparse.Namespace) -> int:
...
os.environ["ASTRBOT_ROOT"] = str(astrbot_root)
os.environ.setdefault("TESTING", "true")
sys.path[:0] = [
str(astrbot_root.resolve()),
str(Path(args.astrbot_path).resolve()),
]
result = asyncio.run(
run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
)
...

3. Simplify process output helpers
You can keep the behavior but reduce the surface area by folding _normalize_process_output into build_process_output_details, since it’s only used there:
# keep a single helper
def build_process_output_details(
*, stdout: str | bytes | None, stderr: str | bytes | None
) -> dict | None:
def _normalize(output: str | bytes | None) -> str | None:
if output is None:
return None
if isinstance(output, bytes):
output = output.decode("utf-8", errors="replace")
output = output.strip()
return output or None
details = {}
stdout_text = _normalize(stdout)
stderr_text = _normalize(stderr)
if stdout_text:
details["stdout"] = stdout_text
if stderr_text:
details["stderr"] = stderr_text
    return details or None

These changes keep all existing behavior but reduce the number of tiny helpers the reader has to chase when following `validate_plugin`/`run_worker`.
@sourcery-ai review
Hey - I've found 3 issues, and left some high level feedback:
- In the PR workflow, `ASTRBOT_REF` is hardcoded to `master` for pull_request runs; consider deriving this from the repository’s default branch or making it configurable (similar to `workflow_dispatch`) to avoid breakage if the default branch name changes.
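One way the workflow step could derive the ref instead of hardcoding it (hypothetical sketch; `resolve_astrbot_ref` is an illustrative name, and `GITHUB_BASE_REF` is the variable GitHub Actions sets to the target branch of pull_request runs):

```python
import os


def resolve_astrbot_ref(default="master"):
    """Prefer the PR's target branch when available, else fall back."""
    base_ref = os.environ.get("GITHUB_BASE_REF", "").strip()
    return base_ref or default
```

The resolved value would then be written to `GITHUB_ENV` as `ASTRBOT_REF` in place of the hardcoded `master`.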
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In the PR workflow, `ASTRBOT_REF` is hardcoded to `master` for pull_request runs; consider deriving this from the repository’s default branch or making it configurable (similar to `workflow_dispatch`) to avoid breakage if the default branch name changes.
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="132-139" />
<code_context>
+ return result
+
+
+def load_metadata(path: Path) -> dict:
+ text = path.read_text(encoding="utf-8")
+ if any(marker in text for marker in CONFLICT_MARKERS):
+ raise MetadataLoadError(
+ "could not find expected ':' (merge conflict markers found in metadata.yaml)"
+ )
+
+ if yaml is not None:
+ try:
+ loaded = yaml.safe_load(text)
+ except yaml.YAMLError as exc:
+ raise MetadataLoadError(str(exc)) from exc
+ if isinstance(loaded, dict):
+ return loaded
+ return {}
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Returning an empty dict when YAML root is not a mapping hides malformed metadata
In the YAML branch of `load_metadata`, a non-dict result from `safe_load` (e.g., list or scalar) is treated as `{}`, so structurally invalid `metadata.yaml` is indistinguishable from genuinely empty metadata and only surfaces as missing fields. Consider raising `MetadataLoadError` when `loaded` is not a dict so callers can distinguish malformed structure from empty content.
```suggestion
if yaml is not None:
try:
loaded = yaml.safe_load(text)
except yaml.YAMLError as exc:
raise MetadataLoadError(str(exc)) from exc
if loaded is None:
# Treat an entirely empty metadata.yaml as empty metadata
return {}
if not isinstance(loaded, dict):
raise MetadataLoadError(
"metadata.yaml must contain a mapping at the top level"
)
return loaded
```
</issue_to_address>
### Comment 2
<location path=".github/workflows/validate-plugin-smoke.yml" line_range="89-91" />
<code_context>
+ if not should_validate:
+ validation_note = "No plugin entries changed in plugins.json; skipping smoke validation."
+
+ with open(os.environ["GITHUB_ENV"], "a", encoding="utf-8") as handle:
+ handle.write("ASTRBOT_REF=master\n")
+ handle.write(f"PLUGIN_NAME_LIST={','.join(changed)}\n")
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Hardcoding ASTRBOT_REF to master for PRs may be misaligned with the target branch
In the PR path, `ASTRBOT_REF` is always set to `master`, ignoring `GITHUB_BASE_REF`. If the default branch changes (e.g., to `main`) or PRs target another long‑lived branch, smoke validation will run against the wrong branch. Consider deriving `ASTRBOT_REF` from `GITHUB_BASE_REF` (as with `origin/{base_ref}` above) so validation tracks the actual PR target.
```suggestion
with open(os.environ["GITHUB_ENV"], "a", encoding="utf-8") as handle:
astrbot_ref = os.environ.get("GITHUB_BASE_REF", "master")
handle.write(f"ASTRBOT_REF={astrbot_ref}\n")
handle.write(f"PLUGIN_NAME_LIST={','.join(changed)}\n")
```
</issue_to_address>
### Comment 3
<location path="tests/test_validate_plugins.py" line_range="320" />
<code_context>
+ os.remove(index_path)
+
+
+class ValidationProgressTests(unittest.TestCase):
+ def test_validate_selected_plugins_emits_progress_and_result_lines(self):
+ module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add focused unit tests for `validate_plugin` to cover repo/clone/precheck/worker error paths
`validate_selected_plugins` is only tested via a mocked `validate_plugin`, so the core validation logic isn’t directly verified. Please add a focused test class (e.g. `ValidatePluginTests`) that mocks `clone_plugin_repo`, `precheck_plugin_directory`, and `subprocess.run` to cover:
- Missing `repo` field → stage `repo_url`, `ok=False`.
- Invalid repo URL from `normalize_repo_url` → stage `repo_url`, message propagated.
- `clone_plugin_repo` raising `CalledProcessError` → stage `clone`, stderr/stdout fallback.
- `clone_plugin_repo` raising `TimeoutExpired` → stage `clone_timeout`, uses `build_process_output_details`.
- Precheck failure (e.g. `stage="metadata"`) → precheck results mapped into `build_result`.
- Successful clone + precheck → worker subprocess invoked, mocked `subprocess.run` output passed to `parse_worker_output`.
This will exercise the full validator flow without relying on real Git or AstrBot dependencies.
Suggested implementation:
```python
import os
import subprocess
from unittest import mock
```
```python
class ValidationProgressTests(unittest.TestCase):
def test_validate_selected_plugins_emits_progress_and_result_lines(self):
module = load_validator_module()
class ValidatePluginTests(unittest.TestCase):
"""Focused tests for validate_plugin covering repo/clone/precheck/worker paths."""
def setUp(self):
# All tests use the same validator module instance
self.module = load_validator_module()
# Minimal, valid plugin definition used as a base in most tests
self.base_plugin = {
"name": "demo-plugin",
"repo": "https://github.com/example/demo-plugin",
}
# Default validator settings; adjust to match the real signature/shape
self.default_settings = {}
def _call_validate_plugin(self, plugin_override=None, **kwargs):
"""Helper to invoke validate_plugin with overridable plugin/settings.
Adjust this helper to match the actual validate_plugin signature.
"""
plugin = dict(self.base_plugin)
if plugin_override:
plugin.update(plugin_override)
# The exact argument list may differ in your implementation.
# Prefer keyword args where possible to make the tests resilient to changes.
return self.module.validate_plugin(
plugin=plugin,
settings=self.default_settings,
**kwargs,
)
def test_missing_repo_field_sets_repo_url_stage_and_fails(self):
# Remove repo field from plugin
plugin = dict(self.base_plugin)
plugin.pop("repo", None)
result = self.module.validate_plugin(
plugin=plugin,
settings=self.default_settings,
)
self.assertIsInstance(result, dict)
self.assertEqual(result.get("stage"), "repo_url")
self.assertFalse(result.get("ok"))
# Message should indicate missing repo information in some way
self.assertIsInstance(result.get("message"), str)
self.assertTrue(result["message"])
def test_invalid_repo_url_from_normalize_repo_url_sets_repo_url_stage_and_propagates_message(
self,
):
error_message = "invalid repo URL"
with mock.patch.object(
self.module, "normalize_repo_url", side_effect=ValueError(error_message)
):
result = self._call_validate_plugin()
self.assertEqual(result.get("stage"), "repo_url")
self.assertFalse(result.get("ok"))
self.assertEqual(result.get("message"), error_message)
def test_clone_plugin_repo_called_process_error_sets_clone_stage_and_uses_stderr_or_stdout(
self,
):
# Simulate clone failure with CalledProcessError
cpe = subprocess.CalledProcessError(
returncode=1,
cmd=["git", "clone"],
output="clone stdout",
stderr="clone stderr",
)
with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
self.module, "clone_plugin_repo", side_effect=cpe
):
normalize_mock.return_value = self.base_plugin["repo"]
result = self._call_validate_plugin()
self.assertEqual(result.get("stage"), "clone")
self.assertFalse(result.get("ok"))
# Implementation should prefer stderr, but fall back to stdout if needed
self.assertIn("clone stderr", result.get("message", ""))
def test_clone_plugin_repo_timeout_expired_sets_clone_timeout_stage_and_uses_build_process_output_details(
self,
):
timeout_exc = subprocess.TimeoutExpired(cmd=["git", "clone"], timeout=30)
fake_details = {"stdout": "clone timeout stdout", "stderr": "clone timeout stderr"}
with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
self.module, "clone_plugin_repo", side_effect=timeout_exc
), mock.patch.object(
self.module,
"build_process_output_details",
return_value=fake_details,
) as details_mock:
normalize_mock.return_value = self.base_plugin["repo"]
result = self._call_validate_plugin()
self.assertEqual(result.get("stage"), "clone_timeout")
self.assertFalse(result.get("ok"))
# Ensure helper was used to build the details
details_mock.assert_called_once()
self.assertEqual(result.get("details"), fake_details)
def test_precheck_failure_is_mapped_into_build_result(self):
# Clone succeeds and returns a repo path; precheck fails
fake_repo_path = "/tmp/demo-plugin-repo"
precheck_result = {
"ok": False,
"stage": "metadata",
"message": "invalid metadata",
}
with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
self.module, "clone_plugin_repo", return_value=fake_repo_path
) as clone_mock, mock.patch.object(
self.module,
"precheck_plugin_directory",
return_value=precheck_result,
) as precheck_mock:
normalize_mock.return_value = self.base_plugin["repo"]
result = self._call_validate_plugin()
clone_mock.assert_called_once()
precheck_mock.assert_called_once_with(fake_repo_path, self.base_plugin, self.default_settings)
# validate_plugin should propagate/translate precheck failure into its own result
self.assertFalse(result.get("ok"))
self.assertEqual(result.get("stage"), "metadata")
self.assertIn("invalid metadata", result.get("message", ""))
def test_successful_clone_and_precheck_invokes_worker_and_passes_output_to_parse_worker_output(
self,
):
fake_repo_path = "/tmp/demo-plugin-repo"
worker_stdout = "worker stdout json or log"
worker_stderr = ""
completed = subprocess.CompletedProcess(
args=["astrbot", "validate"],
returncode=0,
stdout=worker_stdout,
stderr=worker_stderr,
)
parsed_output = {
"ok": True,
"stage": "worker",
"message": "validation successful",
}
with mock.patch.object(self.module, "normalize_repo_url") as normalize_mock, mock.patch.object(
self.module, "clone_plugin_repo", return_value=fake_repo_path
) as clone_mock, mock.patch.object(
self.module,
"precheck_plugin_directory",
return_value={"ok": True},
) as precheck_mock, mock.patch.object(
subprocess, "run", return_value=completed
) as run_mock, mock.patch.object(
self.module,
"parse_worker_output",
return_value=parsed_output,
) as parse_mock:
normalize_mock.return_value = self.base_plugin["repo"]
result = self._call_validate_plugin()
clone_mock.assert_called_once()
precheck_mock.assert_called_once_with(fake_repo_path, self.base_plugin, self.default_settings)
run_mock.assert_called_once()
parse_mock.assert_called_once_with(completed, self.base_plugin, self.default_settings)
self.assertEqual(result, parsed_output)
```
1. Adjust the import edit if `subprocess` and `mock` are already imported elsewhere in `tests/test_validate_plugins.py` to avoid duplicate imports.
2. The helper `_call_validate_plugin` assumes a signature `validate_plugin(plugin=..., settings=..., **kwargs)`. Update it to match the real `validate_plugin` function (parameter names such as `validator_settings`, `keep_repo`, `worker_timeout`, or `workdir`).
3. The tests assume `validate_plugin` returns a `dict` with at least `ok`, `stage`, `message`, and (for the timeout case) `details`. If the actual shape differs, adjust the asserted keys accordingly.
4. The `precheck_plugin_directory` expectations (`precheck_mock.assert_called_once_with(...)`) assume the function receives `(repo_path, plugin, settings)`; update to match the real signature if needed.
5. The worker invocation test assumes:
- `subprocess.run` is the function invoked by `validate_plugin` to execute the worker.
- `parse_worker_output(completed_process, plugin, settings)` is used to turn the subprocess result into a validation result.
If your implementation uses different arguments or a different helper, adjust the corresponding mocks and `assert_called_once_with` checks.
6. Place the `ValidatePluginTests` class near the other validator tests; if `ValidationProgressTests` is in a different section, you may want to move this class next to existing `ValidatePlugin`-related tests for consistency.
</issue_to_address>
@sourcery-ai review
Hey - I've left some high level feedback:
- The validator script is quite large and mixes CLI wiring, filesystem/git operations, and AstrBot runtime glue in a single file; consider splitting it into smaller, focused modules (e.g., URL/index handling, clone/precheck, worker orchestration) to improve readability and long-term maintainability.
- The `Detect changed plugins from pull request` workflow step embeds a substantial Python script inline; consider moving this logic into a tracked helper script (e.g., under `scripts/`) so it can be versioned, tested, and reused more easily.
@sourcery-ai review
Hey - I've found 3 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="100-122" />
<code_context>
+ return items[:limit]
+
+
+def _parse_simple_yaml(path: Path) -> dict:
+ def parse_value(raw_value: str) -> str:
+ value = raw_value.strip()
+ if not value:
+ return ""
+
+ if value[0] in {'"', "'"}:
+ quote = value[0]
+ end_index = value.rfind(quote)
+ if end_index > 0:
+ return value[1:end_index]
+
+ value = re.split(r"\s+#", value, maxsplit=1)[0].rstrip()
+ return value.strip("\"'")
+
+ result = {}
+ for raw_line in path.read_text(encoding="utf-8").splitlines():
+ line = raw_line.strip()
+ if not line or line.startswith("#") or ":" not in line:
+ continue
+ key, value = line.split(":", 1)
+ result[key.strip()] = parse_value(value)
+ return result
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The fallback YAML parser is very limited and may diverge from real YAML semantics in subtle cases.
This implementation only supports flat `key: value` pairs with limited quoting and comments. When PyYAML is unavailable, common patterns like multiline strings, nested mappings, lists, or values containing `:` will be misparsed or dropped. If this fallback is used outside very controlled inputs, consider either failing fast on unsupported constructs or explicitly constraining/validating the allowed format so plugin authors get clear errors instead of subtly incorrect metadata.
```suggestion
def _parse_simple_yaml(path: Path) -> dict:
"""Very small YAML subset parser used as a fallback when PyYAML is unavailable.
Supported format:
- Flat mapping of `key: value` pairs
- No indentation (no nested objects or multiline continuations)
- No lists (`- item` syntax)
- `#` starts a comment when preceded by whitespace (or at line start)
"""
def parse_value(raw_value: str) -> str:
value = raw_value.strip()
if not value:
return ""
# Quoted value – keep everything inside the outermost matching quotes.
if value[0] in {'"', "'"}:
quote = value[0]
end_index = value.rfind(quote)
if end_index > 0:
return value[1:end_index]
# Strip inline comments introduced by "#" with preceding whitespace, then trim quotes.
value = re.split(r"\s+#", value, maxsplit=1)[0].rstrip()
return value.strip("\"'")
result: dict = {}
for lineno, raw_line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
stripped = raw_line.strip()
# Skip completely empty lines
if not stripped:
continue
# Skip full-line comments
if stripped.startswith("#"):
continue
# Reject any indentation (indented lines are treated as unsupported nested/continuation lines)
if raw_line[0].isspace():
raise ValueError(
f"Unsupported YAML indentation in {path} at line {lineno}: {raw_line!r}"
)
line = stripped
# Reject list syntax explicitly
if line.startswith("-"):
raise ValueError(
f"Unsupported YAML list syntax in {path} at line {lineno}: {raw_line!r}"
)
# Require a single top-level mapping entry per line
if ":" not in line:
raise ValueError(
f"Unsupported YAML content (expected 'key: value') in {path} at line {lineno}: {raw_line!r}"
)
key, value = line.split(":", 1)
key = key.strip()
if not key:
raise ValueError(
f"Empty key is not allowed in {path} at line {lineno}: {raw_line!r}"
)
if key in result:
raise ValueError(
f"Duplicate key '{key}' in {path} at line {lineno}"
)
result[key] = parse_value(value)
return result
```
</issue_to_address>
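To make the failure mode concrete, here is a minimal, self-contained sketch of the same flat-parsing strategy (illustrative code, not the repository's `_parse_simple_yaml` itself), showing how a nested mapping is silently flattened into top-level keys:

```python
import re


def parse_simple_yaml_text(text: str) -> dict:
    """Flat `key: value` parser mirroring the fallback's strategy."""

    def parse_value(raw_value: str) -> str:
        value = raw_value.strip()
        if not value:
            return ""
        if value[0] in {'"', "'"}:
            quote = value[0]
            end_index = value.rfind(quote)
            if end_index > 0:
                return value[1:end_index]
        # Drop inline comments introduced by whitespace + "#", then trim quotes.
        value = re.split(r"\s+#", value, maxsplit=1)[0].rstrip()
        return value.strip("\"'")

    result = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        result[key.strip()] = parse_value(value)
    return result


# Flat metadata parses as intended.
print(parse_simple_yaml_text("name: demo\nversion: '1.0'  # release\n"))
# -> {'name': 'demo', 'version': '1.0'}

# A nested mapping is silently flattened: "author" becomes an empty string and
# "name" leaks into the top level -- exactly the divergence flagged above.
print(parse_simple_yaml_text("author:\n  name: alice\n"))
# -> {'author': '', 'name': 'alice'}
```

Failing fast on indentation, as the suggested replacement does, turns this silent corruption into an explicit error for plugin authors.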
### Comment 2
<location path="scripts/validate_plugins/detect_changed_plugins.py" line_range="16-22" />
<code_context>
+ASTRBOT_REMOTE_URL = "https://github.com/AstrBotDevs/AstrBot"
+
+
+def load_plugins_map(text: str, *, source_name: str) -> dict[str, dict]:
+ try:
+ data = json.loads(text)
+ except json.JSONDecodeError as exc:
+ raise ValueError(f"plugins.json is invalid on the {source_name}: {exc}") from exc
+
+ if not isinstance(data, dict):
+ raise ValueError("plugins.json must contain a JSON object")
+ return data
</code_context>
<issue_to_address>
**suggestion (bug_risk):** `load_plugins_map` only validates the top-level type and may accept malformed plugin entries.
This only checks that the JSON root is a dict; it doesn’t validate that keys are strings and values are dicts as `load_plugins_index` in `run.py` does. Consider sharing a common validation helper so malformed `plugins.json` entries are consistently rejected and don’t slip through one path but fail in another.
Suggested implementation:
```python
import os
import json
import subprocess
import sys
from pathlib import Path
```
```python
def load_plugins_map(text: str, *, source_name: str) -> dict[str, dict]:
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise ValueError(f"plugins.json is invalid on the {source_name}: {exc}") from exc
if not isinstance(data, dict):
raise ValueError("plugins.json must contain a JSON object")
for name, payload in data.items():
if not isinstance(name, str):
raise ValueError(
f"plugins.json on the {source_name} has a non-string key: {name!r}"
)
if not isinstance(payload, dict):
raise ValueError(
f"plugins.json entry {name!r} on the {source_name} must be a JSON object"
)
return data
```
To fully "share a common validation helper" with `load_plugins_index` in `run.py`, you may want to:
1. Extract the common validation logic into a shared function (e.g. `validate_plugins_map(data, source_name)`) in a module imported by both `run.py` and `detect_changed_plugins.py`.
2. Replace the inline validation loop here and in `run.py` with a call to that shared helper, keeping error messages and rules identical.
</issue_to_address>
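A shared helper along the lines suggested in step 1 might look like the sketch below (the name `validate_plugins_map` and the message wording are illustrative, not the repository's API):

```python
def validate_plugins_map(data: object, *, source_name: str) -> dict:
    """Validate a decoded plugins.json payload; shared by both loaders."""
    if not isinstance(data, dict):
        raise ValueError(f"plugins.json on the {source_name} must contain a JSON object")
    errors = []
    for name, payload in data.items():
        if not isinstance(name, str):
            errors.append(f"non-string key {name!r}")
        elif not isinstance(payload, dict):
            errors.append(
                f"entry {name!r} must be a JSON object, got {type(payload).__name__}"
            )
    if errors:
        raise ValueError(
            f"Invalid entries in plugins.json on the {source_name}: " + "; ".join(errors)
        )
    return data
```

Both call sites would then reduce to `validate_plugins_map(json.loads(text), source_name=...)`, so malformed entries fail identically on every path.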
### Comment 3
<location path="tests/test_validate_plugins.py" line_range="371" />
<code_context>
+ print_mock.assert_any_call("[2/2] FAIL plugin-b [metadata] invalid metadata.yaml", flush=True)
+
+
+class ValidatePluginTests(unittest.TestCase):
+ def setUp(self):
+ self.module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add a validate_plugin test for worker subprocess timeout
Current tests cover clone errors, precheck failures, and the successful worker path, but not the branch where `subprocess.run` raises `TimeoutExpired` (which should produce `stage="timeout"` and call `build_process_output_details`). Please add a mock-based test that forces `subprocess.run` to raise `TimeoutExpired` and asserts the `stage`, `message`, `plugin_dir_name`, and that `build_process_output_details` is called.
Suggested implementation:
```python
class ValidatePluginTests(unittest.TestCase):
def setUp(self):
self.module = load_validator_module()
self.plugin_key = "demo-plugin"
self.plugin_data = {"repo": "https://github.com/example/demo-plugin"}
self.astrbot_path = Path("/tmp/AstrBot")
self.script_path = Path("/tmp/run.py")
self.work_dir = Path("/tmp/work")
def call_validate_plugin(self, plugin_data=None):
return self.module.validate_plugin(
plugin=self.plugin_key,
plugin_data=self.plugin_data if plugin_data is None else plugin_data,
astrbot_path=self.astrbot_path,
script_path=self.script_path,
work_dir=self.work_dir,
)
def test_validate_plugin_worker_timeout(self):
timeout_exc = subprocess.TimeoutExpired(cmd=["python", str(self.script_path)], timeout=30)
timeout_exc.stdout = "worker stdout"
timeout_exc.stderr = "worker stderr"
with mock.patch(f"{self.module.__name__}.subprocess.run") as run_mock, mock.patch(
f"{self.module.__name__}.build_process_output_details"
) as details_mock:
run_mock.side_effect = timeout_exc
details_mock.return_value = "timeout details"
result = self.call_validate_plugin()
# stage should be reported as timeout
self.assertEqual(result["stage"], "timeout")
# message should reflect timeout; adjust expected text if implementation differs
self.assertIn("timeout", result["message"].lower())
# plugin_dir_name should point at this plugin; adjust if your implementation uses a different key
self.assertEqual(result["plugin_dir_name"], self.plugin_key)
# build_process_output_details should be called with stdout/stderr from the TimeoutExpired exception
details_mock.assert_called_once_with("worker stdout", "worker stderr")
```
Because only part of the file is visible, please verify and adjust the following in your codebase:
1. **`call_validate_plugin` signature**:
- Ensure `validate_plugin` actually takes `astrbot_path`, `script_path`, and `work_dir` keyword arguments.
- If the real signature differs, update the wrapper accordingly.
2. **Result structure**:
- Confirm that `validate_plugin` returns a mapping with keys `"stage"`, `"message"`, and `"plugin_dir_name"`.
- If your result object uses attributes or different keys (e.g., `result.stage` or `result.plugin_dir`), update the assertions to match.
3. **Timeout message text**:
- The test currently asserts that `"timeout"` appears in `result["message"].lower()`.
- If your implementation uses a specific message (e.g., `"worker subprocess timed out"`), you can tighten this assertion to match exactly.
4. **Patch targets**:
- The test patches `subprocess.run` and `build_process_output_details` using `self.module.__name__` as the module path.
- If those functions live in a different module (e.g., a helper module imported by the validator), adjust the patch strings to point at the actual module where they are used.
</issue_to_address>
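One small simplification to the test sketched in that comment: `subprocess.TimeoutExpired` accepts the captured streams directly in its constructor (`output` is also exposed as `.stdout`), so the post-hoc attribute assignments can be avoided:

```python
import subprocess

# Construct the exception with its captured output up front instead of
# assigning exc.stdout / exc.stderr after the fact.
exc = subprocess.TimeoutExpired(
    cmd=["python", "worker.py"],
    timeout=30,
    output="worker stdout",  # readable as exc.stdout as well
    stderr="worker stderr",
)
print(exc.stdout)  # -> worker stdout
print(exc.stderr)  # -> worker stderr
```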
@sourcery-ai review
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="486" />
<code_context>
+ )
+
+
+def validate_selected_plugins(
+ *,
+ selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `validate_selected_plugins` so each worker task writes directly into the shared results list and only returns data needed for logging, avoiding extra index bookkeeping and mappings.
You can simplify `validate_selected_plugins` by letting each task write directly into `results` and returning the minimal info needed for logging. This removes the `index`/`original_index` juggling and the `future_to_context` mapping without changing behavior.
```python
def validate_selected_plugins(
*,
selected: list[tuple[str, dict]],
astrbot_path: Path,
script_path: Path,
work_dir: Path,
clone_timeout: int,
load_timeout: int,
max_workers: int,
) -> list[dict]:
total = len(selected)
results: list[dict | None] = [None] * total
def task(pos: int, plugin: str, plugin_data: dict) -> tuple[int, str, dict]:
try:
result = validate_plugin(
plugin=plugin,
plugin_data=plugin_data,
astrbot_path=astrbot_path,
script_path=script_path,
work_dir=work_dir,
clone_timeout=clone_timeout,
load_timeout=load_timeout,
)
except Exception as exc:
result = build_result(
plugin=plugin,
repo="",
normalized_repo_url=None,
ok=False,
stage="threadpool",
message=str(exc),
details=traceback.format_exc(),
)
results[pos] = result
return pos, plugin, result
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures: set[concurrent.futures.Future] = set()
for pos, (plugin, plugin_data) in enumerate(selected):
print(f"[{pos + 1}/{total}] Queued {plugin}", flush=True)
futures.add(executor.submit(task, pos, plugin, plugin_data))
for future in concurrent.futures.as_completed(futures):
pos, plugin, result = future.result()
display_index = pos + 1
status = "PASS" if result.get("ok") else "FAIL"
stage = result.get("stage", "unknown")
message = result.get("message", "")
print(
f"[{display_index}/{total}] {status} {plugin} [{stage}] {message}",
flush=True,
)
finalized = [result for result in results if result is not None]
if len(finalized) != total:
raise RuntimeError("parallel validation finished with missing results")
return finalized
```
This keeps:
- Output indices and messages identical (`pos + 1` mirrors your previous `start=1` logic).
- The same error handling and `build_result(...)` behavior for unexpected exceptions.
- The same final sanity check on `results`.
But it removes the double bookkeeping (`index` vs `original_index`) and the extra `future_to_context` dict, making the control flow easier to follow.
</issue_to_address>
@sourcery-ai review
Hey - I've found 3 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="193-209" />
<code_context>
+ "details": str(exc),
+ }
+
+ missing = [
+ field
+ for field in REQUIRED_METADATA_FIELDS
+ if not isinstance(metadata.get(field), str) or not metadata[field].strip()
+ ]
+ if missing:
+ return {
+ "ok": False,
+ "severity": "warn",
</code_context>
<issue_to_address>
**issue (bug_risk):** Severity and `ok` flag for missing metadata fields may conflict with how failures are counted.
Here, missing required metadata yields `ok: False` but `severity: 'warn'`. Later, `build_report` and the final exit code only depend on `severity`, so these cases won’t produce a non-zero exit code. If missing required metadata should fail the smoke validation, either use `severity: 'fail'` here or update `build_report`/`main` to incorporate `ok` when computing failures and the exit code.
</issue_to_address>
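As a hedged sketch of the second option (making `ok` participate in failure counting), assuming results are the plain dicts produced by `build_result` (the helper names `count_failures` and `compute_exit_code` are illustrative, not the script's actual functions):

```python
def count_failures(results: list[dict]) -> int:
    # A result fails the run if it is explicitly severity "fail",
    # or if it reported ok=False regardless of its severity label.
    return sum(
        1
        for result in results
        if result.get("severity") == "fail" or not result.get("ok", False)
    )


def compute_exit_code(results: list[dict]) -> int:
    # Non-zero exit as soon as any result counts as a failure.
    return 1 if count_failures(results) else 0
```

Under this variant a `severity: 'warn'` result with `ok: False` would fail the run; if warnings are meant to stay non-fatal, the first option (emitting `severity: 'fail'` for missing metadata) avoids touching the counting logic at all.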
### Comment 2
<location path="scripts/validate_plugins/detect_changed_plugins.py" line_range="29-30" />
<code_context>
+ return [name for name, payload in head.items() if base.get(name) != payload]
+
+
+def fetch_base_ref(base_ref: str) -> None:
+ subprocess.run(["git", "fetch", "origin", base_ref, "--depth", "1"], check=True)
+
+
</code_context>
<issue_to_address>
**issue (bug_risk):** A failing `git fetch` for the base ref will abort detection and the workflow without a graceful fallback.
In `detect_pull_request_selection`, `fetch_base_ref` is invoked with `check=True`, so a `CalledProcessError` from `git fetch` (e.g., missing `GITHUB_BASE_REF` or transient network issues) will terminate the script. For consistency with `read_base_plugins_json` and to keep the workflow resilient, catch `CalledProcessError` around (or within) `fetch_base_ref` and fall back to `base = {}` so that only base-comparison logic is skipped instead of failing the entire job.
</issue_to_address>
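One way to implement the suggested fallback, sketched under the assumption that callers can treat an empty dict as "no base to compare against" (`load_base_index` and its injectable `fetch` parameter are illustrative, not the repository's actual API):

```python
import subprocess


def fetch_base_ref(base_ref: str) -> bool:
    """Fetch the PR base ref; return False instead of raising, so a missing
    ref or a transient network error only skips base comparison."""
    try:
        subprocess.run(
            ["git", "fetch", "origin", base_ref, "--depth", "1"],
            check=True,
        )
        return True
    except (subprocess.CalledProcessError, OSError):
        return False


def load_base_index(base_ref: str, read_base, fetch=fetch_base_ref) -> dict:
    # Fall back to an empty base index when the fetch fails, so only the
    # base-comparison step is skipped rather than the entire job failing.
    if not fetch(base_ref):
        return {}
    return read_base()
```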
### Comment 3
<location path="scripts/validate_plugins/run.py" line_range="492" />
<code_context>
+ )
+
+
+def validate_selected_plugins(
+ *,
+ selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `validate_selected_plugins` by inlining the worker task and using zero-based indices plus a small status-formatting helper to avoid the current index juggling and indirection.
You can simplify `validate_selected_plugins` without changing behavior by removing the index gymnastics and the inner `task` wrapper, and by extracting the status formatting into a small helper.
### 1. Remove `task` and off‑by‑one indexing
You can capture `index` and `plugin` directly in the closure and use zero‑based indexes internally, only adding 1 for display:
```python
def _format_status(result: dict) -> tuple[str, str, str]:
severity = result.get("severity")
if severity is None:
severity = "pass" if result.get("ok") else "fail"
status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
stage = result.get("stage", "unknown")
message = result.get("message", "")
return status, stage, message
def validate_selected_plugins(
*,
selected: list[tuple[str, dict]],
astrbot_path: Path,
script_path: Path,
work_dir: Path,
clone_timeout: int,
load_timeout: int,
max_workers: int,
) -> list[dict]:
total = len(selected)
results: list[dict | None] = [None] * total
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_idx: dict[concurrent.futures.Future, tuple[int, str]] = {}
for idx, (plugin, plugin_data) in enumerate(selected):
display_index = idx + 1
print(f"[{display_index}/{total}] Queued {plugin}", flush=True)
future = executor.submit(
validate_plugin,
plugin=plugin,
plugin_data=plugin_data,
astrbot_path=astrbot_path,
script_path=script_path,
work_dir=work_dir,
clone_timeout=clone_timeout,
load_timeout=load_timeout,
)
future_to_idx[future] = (idx, plugin)
for future in concurrent.futures.as_completed(future_to_idx):
idx, plugin = future_to_idx[future]
try:
result = future.result()
except Exception as exc:
result = build_result(
plugin=plugin,
repo="",
normalized_repo_url=None,
ok=False,
stage="threadpool",
message=str(exc),
details=traceback.format_exc(),
)
results[idx] = result
status, stage, message = _format_status(result)
display_index = idx + 1
print(f"[{display_index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)
finalized = [result for result in results if result is not None]
if len(finalized) != total:
raise RuntimeError("parallel validation finished with missing results")
return finalized
```
This keeps:
- Output order stable (`results` indexed by original position).
- The same printed messages and indices (still 1‑based in logs).
- Error handling semantics identical (`threadpool` fallback on exception).
But removes the `task` indirection, `original_index` tuple unpacking, and the `index - 1` offset logic, making the function easier to follow.
</issue_to_address>
@sourcery-ai review
Hey - I've found 2 issues, and left some high level feedback:
- The
plugin_dir_nametaken frommetadata['name']is used directly as a filesystem path inrun_workerandprecheck_plugin_directorywithout sanitization, so it would be safer to validate/normalize it (e.g. reject path separators and..) before using it inplugin_store / plugin_dir_namewhile still preserving the original name for reporting. - The validator script in
scripts/validate_plugins/run.pyhas grown quite large and mixes CLI handling, I/O, concurrency, and AstrBot worker logic; consider splitting it into smaller modules (e.g. metadata handling, process/worker orchestration, reporting) to make future changes and reuse easier.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `plugin_dir_name` taken from `metadata['name']` is used directly as a filesystem path in `run_worker` and `precheck_plugin_directory` without sanitization, so it would be safer to validate/normalize it (e.g. reject path separators and `..`) before using it in `plugin_store / plugin_dir_name` while still preserving the original name for reporting.
- The validator script in `scripts/validate_plugins/run.py` has grown quite large and mixes CLI handling, I/O, concurrency, and AstrBot worker logic; consider splitting it into smaller modules (e.g. metadata handling, process/worker orchestration, reporting) to make future changes and reuse easier.
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="25-34" />
<code_context>
+
+
+def resolve_astrbot_ref() -> str:
+ try:
+ default_head = subprocess.check_output(
+ ["git", "ls-remote", "--symref", ASTRBOT_REMOTE_URL, "HEAD"],
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The `error` value from `PluginManager.load` is used directly as a message and may not always be a string.
In `run_worker_load_check`, the failure branch does `message=error or "plugin load failed"`. If `error` is not a string (e.g., an exception or structured object), this will put a non-string into the JSON result, conflicting with the `build_result` contract and hurting readability. Consider `message = str(error) if error else "plugin load failed"` and, if you need richer data, place it in the `details` field instead.
</issue_to_address>
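A minimal sketch of the suggested normalization, returning both a plain-string message and optional structured details (`normalize_error_message` is a hypothetical helper, not an existing function in the script):

```python
def normalize_error_message(error, default: str = "plugin load failed"):
    # Return (message, details): message is always a plain string for the
    # JSON result; any structured error value is preserved for the details field.
    if error is None or error == "":
        return default, None
    if isinstance(error, str):
        return error, None
    return str(error) or default, repr(error)
```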
### Comment 2
<location path="scripts/validate_plugins/run.py" line_range="492" />
<code_context>
+ )
+
+
+def validate_selected_plugins(
+ *,
+ selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `validate_selected_plugins` by inlining the worker call and relying on the existing future-to-index mapping instead of returning indices from a wrapper task.
`validate_selected_plugins` can be simplified without changing behavior by removing the extra `task` wrapper, the `(index, result)` pairing, and the “missing results” post-check. You already track indices in `future_to_context`, so you don’t need to also return indices from the worker.
A more direct version:
```python
def validate_selected_plugins(
*,
selected: list[tuple[str, dict]],
astrbot_path: Path,
script_path: Path,
work_dir: Path,
clone_timeout: int,
load_timeout: int,
max_workers: int,
) -> list[dict]:
total = len(selected)
results: list[dict | None] = [None] * total
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {}
for index, (plugin, plugin_data) in enumerate(selected, start=1):
print(f"[{index}/{total}] Queued {plugin}", flush=True)
future = executor.submit(
validate_plugin,
plugin=plugin,
plugin_data=plugin_data,
astrbot_path=astrbot_path,
script_path=script_path,
work_dir=work_dir,
clone_timeout=clone_timeout,
load_timeout=load_timeout,
)
future_to_context[future] = (index, plugin)
for future in concurrent.futures.as_completed(future_to_context):
index, plugin = future_to_context[future]
try:
result = future.result()
except Exception as exc:
result = build_result(
plugin=plugin,
repo="",
normalized_repo_url=None,
ok=False,
stage="threadpool",
message=str(exc),
details=traceback.format_exc(),
)
results[index - 1] = result
severity = result.get("severity", "pass" if result.get("ok") else "fail")
status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
stage = result.get("stage", "unknown")
message = result.get("message", "")
print(f"[{index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)
# All futures are iterated, so all indices 1..total are filled
return [r for r in results if r is not None]
```
This keeps:
- parallelism and as-completed logging,
- ordered result list,
- error handling (threadpool exceptions → `build_result`),
while removing:
- the inner `task` function,
- the index piggybacked in the result,
- the extra “missing results” runtime check.
</issue_to_address>
@sourcery-ai review
Hey - I've found 2 issues, and left some high level feedback:
- Consider validating numeric CLI arguments like
--max-workers,--clone-timeout, and--load-timeoutto ensure they are positive (e.g., rejectingmax_workers <= 0), since values like--max-workers 0will causeThreadPoolExecutorto fail at runtime invalidate_selected_plugins. - In
precheck_plugin_directoryandbuild_result, results withseverity='warn'still haveok=False; if warnings are meant to be non-fatal, you may want to setok=Truefor those cases or otherwise clarify/align the semantics so downstream consumers aren’t forced to treat warnings as failures.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider validating numeric CLI arguments like `--max-workers`, `--clone-timeout`, and `--load-timeout` to ensure they are positive (e.g., rejecting `max_workers <= 0`), since values like `--max-workers 0` will cause `ThreadPoolExecutor` to fail at runtime in `validate_selected_plugins`.
- In `precheck_plugin_directory` and `build_result`, results with `severity='warn'` still have `ok=False`; if warnings are meant to be non-fatal, you may want to set `ok=True` for those cases or otherwise clarify/align the semantics so downstream consumers aren’t forced to treat warnings as failures.
## Individual Comments
### Comment 1
<location path="tests/test_validate_plugins.py" line_range="469" />
<code_context>
+ print_mock.assert_any_call("[3/3] PASS plugin-c [load] c", flush=True)
+
+
+class ValidatePluginTests(unittest.TestCase):
+ def setUp(self):
+ self.module = load_validator_module()
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for the worker subprocess timeout path in `validate_plugin`.
`validate_plugin` has a branch that handles `subprocess.TimeoutExpired`, mapping it to `stage="timeout"` and using `build_process_output_details(exc.stdout, exc.stderr)`. `ValidatePluginTests` currently don’t exercise this. Please add a test that patches `subprocess.run` to raise `TimeoutExpired` and asserts that the result has `stage == "timeout"`, includes the `plugin_dir_name`, and that `build_process_output_details` is called with the expected arguments, so the load-timeout behavior is covered and stable.
Suggested implementation:
```python
class ValidatePluginTests(unittest.TestCase):
def setUp(self):
self.module = load_validator_module()
self.plugin_key = "demo-plugin"
self.plugin_data = {"repo": "https://github.com/example/demo-plugin"}
self.astrbot_path = Path("/tmp/AstrBot")
self.script_path = Path("/tmp/run.py")
self.work_dir = Path("/tmp/work")
def call_validate_plugin(self, plugin_data=None):
return self.module.validate_plugin(
plugin=self.plugin_key,
plugin_data=self.plugin_data if plugin_data is None else plugin_data,
)
def test_validate_plugin_load_timeout_uses_timeout_stage_and_builds_output_details(self):
# Prepare a TimeoutExpired instance with stdout/stderr set so we can
# assert what gets forwarded into build_process_output_details.
timeout_exc = subprocess.TimeoutExpired(
cmd=[sys.executable, str(self.script_path)],
timeout=300,
)
timeout_exc.stdout = "timeout-stdout"
timeout_exc.stderr = "timeout-stderr"
with mock.patch.object(self.module, "subprocess") as subprocess_mod, \
mock.patch.object(self.module, "build_process_output_details") as build_details:
# Ensure the validate_plugin `except subprocess.TimeoutExpired` branch
# matches this class and that `run` raises the configured exception.
subprocess_mod.TimeoutExpired = subprocess.TimeoutExpired
subprocess_mod.run.side_effect = timeout_exc
result = self.call_validate_plugin()
self.assertEqual(result["stage"], "timeout")
# The timeout result should still include the plugin_dir_name so callers
# can attribute the failure correctly.
self.assertIn("plugin_dir_name", result)
self.assertEqual(result["plugin_dir_name"], "demo-plugin")
build_details.assert_called_once_with("timeout-stdout", "timeout-stderr")
```
To make this compile and run, ensure the following are present at the top of `tests/test_validate_plugins.py` (if they are not already):
1. Imports:
- `import sys`
- `import subprocess`
- `from unittest import mock` (or `import unittest.mock as mock` and adjust usage accordingly).
2. The helper `call_validate_plugin` likely needs to pass through all required parameters that `validate_plugin` expects (e.g., `astrbot_path`, `script_path`, `work_dir`, and any others used elsewhere in the file). If those are not already being passed, extend the call accordingly so the new test exercises the real code path.
3. If `build_process_output_details` is not defined directly on `self.module` but imported from another module, adjust the `mock.patch.object(self.module, "build_process_output_details")` line to patch the correct import location (e.g., `mock.patch("validator_module.build_process_output_details")` using the actual module path).
</issue_to_address>
### Comment 2
<location path="scripts/validate_plugins/run.py" line_range="512" />
<code_context>
+ )
+
+
+def validate_selected_plugins(
+ *,
+ selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying validate_selected_plugins by submitting validate_plugin directly to the executor and using the future-to-context mapping instead of an inner task wrapper and index plumbing.
You can simplify `validate_selected_plugins` by removing the inner `task` function and the extra `(index, result)` plumbing. The `ThreadPoolExecutor` already gives you futures; you only need a mapping to the original index and plugin name.
This reduces nesting and avoids the `original_index` variable while preserving ordering, logging, and error handling:
```python
def validate_selected_plugins(
*,
selected: list[tuple[str, dict]],
astrbot_path: Path,
script_path: Path,
work_dir: Path,
clone_timeout: int,
load_timeout: int,
max_workers: int,
) -> list[dict]:
total = len(selected)
results: list[dict | None] = [None] * total
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {}
for index, (plugin, plugin_data) in enumerate(selected, start=1):
print(f"[{index}/{total}] Queued {plugin}", flush=True)
future = executor.submit(
validate_plugin,
plugin=plugin,
plugin_data=plugin_data,
astrbot_path=astrbot_path,
script_path=script_path,
work_dir=work_dir,
clone_timeout=clone_timeout,
load_timeout=load_timeout,
)
future_to_context[future] = (index, plugin)
for future in concurrent.futures.as_completed(future_to_context):
index, plugin = future_to_context[future]
try:
result = future.result()
except Exception as exc:
result = build_result(
plugin=plugin,
repo="",
normalized_repo_url=None,
ok=False,
stage="threadpool",
message=str(exc),
details=traceback.format_exc(),
)
results[index - 1] = result
severity = result.get("severity", "pass" if result.get("ok") else "fail")
status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
stage = result.get("stage", "unknown")
message = result.get("message", "")
print(f"[{index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)
finalized = [result for result in results if result is not None]
if len(finalized) != total:
raise RuntimeError("parallel validation finished with missing results")
return finalized
```
This keeps the same functionality (including ordered results and log format) but removes one level of indirection and simplifies the error path.
</issue_to_address>
| print_mock.assert_any_call("[3/3] PASS plugin-c [load] c", flush=True) | ||
|
|
||
|
|
||
| class ValidatePluginTests(unittest.TestCase): |
There was a problem hiding this comment.
suggestion (testing): Add a test for the worker subprocess timeout path in validate_plugin.
validate_plugin has a branch that handles subprocess.TimeoutExpired, mapping it to stage="timeout" and using build_process_output_details(exc.stdout, exc.stderr). ValidatePluginTests currently don’t exercise this. Please add a test that patches subprocess.run to raise TimeoutExpired and asserts that the result has stage == "timeout", includes the plugin_dir_name, and that build_process_output_details is called with the expected arguments, so the load-timeout behavior is covered and stable.
Suggested implementation:
class ValidatePluginTests(unittest.TestCase):
def setUp(self):
self.module = load_validator_module()
self.plugin_key = "demo-plugin"
self.plugin_data = {"repo": "https://github.com/example/demo-plugin"}
self.astrbot_path = Path("/tmp/AstrBot")
self.script_path = Path("/tmp/run.py")
self.work_dir = Path("/tmp/work")
def call_validate_plugin(self, plugin_data=None):
return self.module.validate_plugin(
plugin=self.plugin_key,
plugin_data=self.plugin_data if plugin_data is None else plugin_data,
)
def test_validate_plugin_load_timeout_uses_timeout_stage_and_builds_output_details(self):
# Prepare a TimeoutExpired instance with stdout/stderr set so we can
# assert what gets forwarded into build_process_output_details.
timeout_exc = subprocess.TimeoutExpired(
cmd=[sys.executable, str(self.script_path)],
timeout=300,
)
timeout_exc.stdout = "timeout-stdout"
timeout_exc.stderr = "timeout-stderr"
with mock.patch.object(self.module, "subprocess") as subprocess_mod, \
mock.patch.object(self.module, "build_process_output_details") as build_details:
# Ensure the validate_plugin `except subprocess.TimeoutExpired` branch
# matches this class and that `run` raises the configured exception.
subprocess_mod.TimeoutExpired = subprocess.TimeoutExpired
subprocess_mod.run.side_effect = timeout_exc
result = self.call_validate_plugin()
self.assertEqual(result["stage"], "timeout")
# The timeout result should still include the plugin_dir_name so callers
# can attribute the failure correctly.
self.assertIn("plugin_dir_name", result)
self.assertEqual(result["plugin_dir_name"], "demo-plugin")
build_details.assert_called_once_with("timeout-stdout", "timeout-stderr")To make this compile and run, ensure the following are present at the top of tests/test_validate_plugins.py (if they are not already):
-
Imports:
import sysimport subprocessfrom unittest import mock(orimport unittest.mock as mockand adjust usage accordingly).
-
The helper
call_validate_pluginlikely needs to pass through all required parameters thatvalidate_pluginexpects (e.g.,astrbot_path,script_path,work_dir, and any others used elsewhere in the file). If those are not already being passed, extend the call accordingly so the new test exercises the real code path. -
If
build_process_output_detailsis not defined directly onself.modulebut imported from another module, adjust themock.patch.object(self.module, "build_process_output_details")line to patch the correct import location (e.g.,mock.patch("validator_module.build_process_output_details")using the actual module path).
| ) | ||
|
|
||
|
|
||
| def validate_selected_plugins( |
There was a problem hiding this comment.
issue (complexity): Consider simplifying validate_selected_plugins by submitting validate_plugin directly to the executor and using the future-to-context mapping instead of an inner task wrapper and index plumbing.
You can simplify validate_selected_plugins by removing the inner task function and the extra (index, result) plumbing. The ThreadPoolExecutor already gives you futures; you only need a mapping to the original index and plugin name.
This reduces nesting and avoids the original_index variable while preserving ordering, logging, and error handling:
```python
def validate_selected_plugins(
    *,
    selected: list[tuple[str, dict]],
    astrbot_path: Path,
    script_path: Path,
    work_dir: Path,
    clone_timeout: int,
    load_timeout: int,
    max_workers: int,
) -> list[dict]:
    total = len(selected)
    results: list[dict | None] = [None] * total
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {}
        for index, (plugin, plugin_data) in enumerate(selected, start=1):
            print(f"[{index}/{total}] Queued {plugin}", flush=True)
            future = executor.submit(
                validate_plugin,
                plugin=plugin,
                plugin_data=plugin_data,
                astrbot_path=astrbot_path,
                script_path=script_path,
                work_dir=work_dir,
                clone_timeout=clone_timeout,
                load_timeout=load_timeout,
            )
            future_to_context[future] = (index, plugin)
        for future in concurrent.futures.as_completed(future_to_context):
            index, plugin = future_to_context[future]
            try:
                result = future.result()
            except Exception as exc:
                result = build_result(
                    plugin=plugin,
                    repo="",
                    normalized_repo_url=None,
                    ok=False,
                    stage="threadpool",
                    message=str(exc),
                    details=traceback.format_exc(),
                )
            results[index - 1] = result
            severity = result.get("severity", "pass" if result.get("ok") else "fail")
            status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
            stage = result.get("stage", "unknown")
            message = result.get("message", "")
            print(f"[{index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)
    finalized = [result for result in results if result is not None]
    if len(finalized) != total:
        raise RuntimeError("parallel validation finished with missing results")
    return finalized
```

This keeps the same functionality (including ordered results and log format) but removes one level of indirection and simplifies the error path.
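The severity-to-status mapping used in the log line can be checked in isolation; the helper name `status_for` is introduced here only for illustration, the logic itself is lifted from the suggestion above.

```python
def status_for(result: dict) -> str:
    # Mirrors the log-line logic: derive severity (defaulting from "ok"),
    # then map it to a printable label, falling back to FAIL.
    severity = result.get("severity", "pass" if result.get("ok") else "fail")
    return {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")

print(status_for({"ok": True}))                      # PASS
print(status_for({"ok": False}))                     # FAIL
print(status_for({"ok": True, "severity": "warn"}))  # WARN
print(status_for({"severity": "bogus"}))             # FAIL (unknown severity)
```

Note that an unknown `severity` string degrades to FAIL rather than raising, which keeps the progress log robust against malformed results.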
@sourcery-ai review
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="scripts/validate_plugins/run.py" line_range="521" />
<code_context>
+ )
+
+
+def validate_selected_plugins(
+ *,
+ selected: list[tuple[str, dict]],
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the thread pool indexing logic and tightening the worker stdout contract so result collection and worker output parsing are straightforward and avoid extra bookkeeping.
1. **Parallel validation index handling is more complex than needed**
You’re manually juggling 1-based indices, storing `(index, plugin)` in a map, returning `(original_index, result)` from `task`, then subtracting 1 to index into `results`. You can keep the 1-based user-facing progress output while using 0-based indices internally and avoid passing indices through the worker function.
This keeps behavior identical (including printed `[1/…]` counters) but reduces the mental overhead:
```python
def validate_selected_plugins(...):
total = len(selected)
results: list[dict | None] = [None] * total
def task(plugin: str, plugin_data: dict) -> dict:
return validate_plugin(
plugin=plugin,
plugin_data=plugin_data,
astrbot_path=astrbot_path,
script_path=script_path,
work_dir=work_dir,
clone_timeout=clone_timeout,
load_timeout=load_timeout,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {}
for index, (plugin, plugin_data) in enumerate(selected):
human_index = index + 1
print(f"[{human_index}/{total}] Queued {plugin}", flush=True)
future = executor.submit(task, plugin, plugin_data)
future_to_context[future] = (index, plugin)
for future in concurrent.futures.as_completed(future_to_context):
index, plugin = future_to_context[future]
human_index = index + 1
try:
result = future.result()
except Exception as exc:
result = build_result(
plugin=plugin,
repo="",
normalized_repo_url=None,
ok=False,
stage="threadpool",
message=str(exc),
details=traceback.format_exc(),
)
results[index] = result
severity = result.get("severity", "pass" if result.get("ok") else "fail")
status = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}.get(severity, "FAIL")
stage = result.get("stage", "unknown")
message = result.get("message", "")
print(f"[{human_index}/{total}] {status} {plugin} [{stage}] {message}", flush=True)
finalized = [result for result in results if result is not None]
if len(finalized) != total:
raise RuntimeError("parallel validation finished with missing results")
return finalized
```
2. **Worker output parsing can be simplified with a tighter contract**
`parse_worker_output` currently scans stdout backwards to find a JSON line and mutates it to inject `plugin`, `repo`, and `normalized_repo_url`. If you can tighten the contract so that the worker always prints a *single* JSON line (and sends any debug/log output to stderr), the parsing logic becomes much simpler and easier to reason about.
On the worker side:
```python
def run_worker(args: argparse.Namespace) -> int:
...
try:
result = asyncio.run(
run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
)
except Exception as exc:
result = build_result(
plugin=args.plugin_dir_name,
repo=args.normalized_repo_url,
normalized_repo_url=args.normalized_repo_url,
ok=False,
stage="worker",
message=str(exc),
plugin_dir_name=args.plugin_dir_name,
details=traceback.format_exc(),
)
finally:
shutil.rmtree(temp_root, ignore_errors=True)
# Ensure only the structured result is printed on stdout
print(json.dumps(result, ensure_ascii=False))
return 0 if result["ok"] else 1
```
And then `parse_worker_output` can assume “one JSON object per run” and skip the reverse scan and mutation:
```python
def parse_worker_output(
*,
plugin: str,
repo: str,
normalized_repo_url: str,
completed: subprocess.CompletedProcess[str],
plugin_dir_name: str,
) -> dict:
stdout = completed.stdout.strip()
if stdout:
try:
payload = json.loads(stdout)
except json.JSONDecodeError:
payload = None
if isinstance(payload, dict):
# Optionally still enforce/override identity fields here
payload.setdefault("plugin", plugin)
payload.setdefault("repo", repo)
payload.setdefault("normalized_repo_url", normalized_repo_url)
payload.setdefault("plugin_dir_name", plugin_dir_name)
return payload
stderr = completed.stderr.strip()
message = stderr or stdout or "worker returned no structured output"
return build_result(
plugin=plugin,
repo=repo,
normalized_repo_url=normalized_repo_url,
ok=False,
stage="worker",
message=message,
plugin_dir_name=plugin_dir_name,
)
```
This keeps all current capabilities (structured result, fallback to error result) but removes the need for scanning stdout line-by-line and retrofitting identity fields.
</issue_to_address>
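The tightened contract — exactly one JSON object on stdout, logs on stderr — can be sanity-checked with a throwaway subprocess; the inline worker body below is a stand-in, not the real `run.py` worker.

```python
import json
import subprocess
import sys

# Stand-in worker: debug output goes to stderr; stdout carries one JSON object.
worker_code = (
    "import json, sys\n"
    "print('debug: loading plugin', file=sys.stderr)\n"
    "print(json.dumps({'ok': True, 'stage': 'load', 'message': ''}))\n"
)

completed = subprocess.run(
    [sys.executable, "-c", worker_code],
    capture_output=True,
    text=True,
    check=True,
)

# With the tighter contract, no reverse line scan of stdout is needed.
payload = json.loads(completed.stdout.strip())
payload.setdefault("plugin", "example_plugin")  # retrofit identity field if absent

print(payload)  # {'ok': True, 'stage': 'load', 'message': '', 'plugin': 'example_plugin'}
```

Because the debug line went to stderr, `completed.stdout` parses cleanly in a single `json.loads` call.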
This change adds an MVP validation path for marketplace plugins so the repository can catch more than malformed `plugins.json` entries or unreachable repository URLs.

Today the collection only verifies index syntax and repository reachability. That lets broken plugins enter the market even when they are missing `metadata.yaml`, lack a valid entrypoint, or fail during AstrBot's real plugin loading path. The result is that users can install plugins that immediately fail to import or initialize.

The root cause is that marketplace validation stops before any plugin-structure or AstrBot-runtime checks happen. There was no reusable validator that clones a plugin repository, checks the expected files, and asks AstrBot's `PluginManager.load()` whether the plugin can actually load.

This PR adds that missing validation layer in two parts. First, it introduces `scripts/validate_plugins/run.py`, which normalizes GitHub repository URLs, selects target plugins from `plugins.json`, clones each repository, runs metadata and entrypoint prechecks, and then launches a subprocess worker that places the plugin into a temporary AstrBot root and executes `PluginManager.load(specified_dir_name=...)`. Second, it adds `.github/workflows/validate-plugin-smoke.yml`, which runs this validator for changed plugin entries on pull requests and supports manual `workflow_dispatch` runs for targeted checks or limited sweeps.

The validator emits a structured JSON report with stage-specific failures such as `repo_url`, `clone`, `metadata`, `entrypoint`, `load`, `worker`, and `timeout`, so maintainers can tell why a plugin failed instead of only seeing a generic CI failure. The PR also includes focused unit tests for URL normalization, plugin selection, metadata validation, worker command construction, report aggregation, and worker output parsing.

Validation used for this change:

- `python3 -m unittest tests.test_validate_plugins -v`
- `python3 scripts/validate_plugins/run.py --help`
- `ruby -e 'require "yaml"; YAML.load_file(".github/workflows/validate-plugin-smoke.yml"); puts "workflow_ok"'`

Summary by Sourcery
Introduce an automated plugin smoke validation pipeline for marketplace entries, including a CLI validator, change-detection helper, and CI workflow integration to run validations on PRs, schedules, and manual triggers.
New Features:
Enhancements:
Tests: