name: Validate Plugin Smoke

on:
  pull_request:
    paths:
      - "plugins.json"
      - "scripts/validate_plugins/**"
      - ".github/workflows/validate-plugin-smoke.yml"
  workflow_dispatch:
    inputs:
      plugin_names:
        description: "Comma-separated plugin keys from plugins.json"
        required: false
        default: ""
      plugin_limit:
        description: "Validate the first N plugins when plugin_names is empty"
        required: false
        default: "20"
      astrbot_ref:
        description: "AstrBot git ref to validate against"
        required: false
        default: "master"

jobs:
  validate-plugin-smoke:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Set manual validation inputs
        if: github.event_name == 'workflow_dispatch'
        # Pass the dispatch inputs through `env:` rather than interpolating
        # `${{ inputs.* }}` directly into the script body, so a crafted input
        # value cannot inject shell commands or extra GITHUB_ENV lines.
        env:
          INPUT_ASTRBOT_REF: ${{ inputs.astrbot_ref }}
          INPUT_PLUGIN_NAMES: ${{ inputs.plugin_names }}
          INPUT_PLUGIN_LIMIT: ${{ inputs.plugin_limit }}
        run: |
          {
            echo "ASTRBOT_REF=$INPUT_ASTRBOT_REF"
            echo "PLUGIN_NAME_LIST=$INPUT_PLUGIN_NAMES"
            echo "PLUGIN_LIMIT=$INPUT_PLUGIN_LIMIT"
            echo "SHOULD_VALIDATE=true"
          } >> "$GITHUB_ENV"

      - name: Detect changed plugins from pull request
        if: github.event_name == 'pull_request'
        run: |
          python - <<'PY'
          import json
          import os
          import subprocess
          from pathlib import Path

          base_ref = os.environ["GITHUB_BASE_REF"]
          subprocess.run(["git", "fetch", "origin", base_ref, "--depth", "1"], check=True)
          try:
              base_raw = subprocess.check_output(
                  ["git", "show", f"origin/{base_ref}:plugins.json"],
                  text=True,
              )
          except subprocess.CalledProcessError:
              # plugins.json does not exist on the base branch yet; treat
              # every entry in this PR as newly added.
              base_raw = "{}"
          base = json.loads(base_raw)
          head = json.loads(Path("plugins.json").read_text(encoding="utf-8"))

          changed = [
              name
              for name, payload in head.items()
              if base.get(name) != payload
          ]

          with open(os.environ["GITHUB_ENV"], "a", encoding="utf-8") as handle:
              handle.write("ASTRBOT_REF=master\n")
              handle.write(f"PLUGIN_NAME_LIST={','.join(changed)}\n")
              handle.write("PLUGIN_LIMIT=\n")
              handle.write(f"SHOULD_VALIDATE={'true' if changed else 'false'}\n")
          PY

      - name: Show PR diff selection
        if: github.event_name == 'pull_request'
        run: |
          if [ "$SHOULD_VALIDATE" != "true" ]; then
            printf '%s\n' "No plugin entries changed in plugins.json; skipping smoke validation."
          else
            printf 'Selected plugins: %s\n' "$PLUGIN_NAME_LIST"
          fi

      - name: Set up Python
        if: env.SHOULD_VALIDATE == 'true'
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install validator dependencies
        if: env.SHOULD_VALIDATE == 'true'
        run: python -m pip install --upgrade pip pyyaml

      - name: Clone AstrBot
        if: env.SHOULD_VALIDATE == 'true'
        run: git clone --depth 1 --branch "$ASTRBOT_REF" "https://github.com/AstrBotDevs/AstrBot" ".cache/AstrBot"

      - name: Install AstrBot dependencies
        if: env.SHOULD_VALIDATE == 'true'
        run: python -m pip install -r ".cache/AstrBot/requirements.txt"

      - name: Run plugin smoke validator
        if: env.SHOULD_VALIDATE == 'true'
        run: |
          args=(
            --astrbot-path ".cache/AstrBot"
            --report-path "validation-report.json"
          )

          if [ -n "${PLUGIN_NAME_LIST:-}" ]; then
            args+=(--plugin-name-list "$PLUGIN_NAME_LIST")
          fi

          if [ -n "${PLUGIN_LIMIT:-}" ]; then
            args+=(--limit "$PLUGIN_LIMIT")
          fi

          python scripts/validate_plugins/run.py "${args[@]}"

      - name: Upload validation report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: validation-report
          path: validation-report.json
          if-no-files-found: warn
+import argparse +import asyncio +import json +import os +import re +import shutil +import subprocess +import sys +import tempfile +import traceback +from pathlib import Path +from urllib.parse import urlparse + +try: + import yaml +except ImportError: # pragma: no cover - optional in local unit tests + yaml = None + + +REQUIRED_METADATA_FIELDS = ("name", "desc", "version", "author") + + +def build_result( + *, + plugin: str, + repo: str, + normalized_repo_url: str | None, + ok: bool, + stage: str, + message: str, + plugin_dir_name: str | None = None, + details: dict | str | None = None, +) -> dict: + result = { + "plugin": plugin, + "repo": repo, + "normalized_repo_url": normalized_repo_url, + "ok": ok, + "stage": stage, + "message": message, + } + if plugin_dir_name: + result["plugin_dir_name"] = plugin_dir_name + if details is not None: + result["details"] = details + return result + + +def normalize_repo_url(repo_url: str) -> str: + parsed = urlparse(repo_url.strip()) + if parsed.scheme not in {"http", "https"}: + raise ValueError("repo URL must use http or https") + if parsed.netloc.lower() != "github.com": + raise ValueError("repo URL must point to github.com") + + parts = [part for part in parsed.path.split("/") if part] + if len(parts) != 2: + raise ValueError("repo URL must include owner and repository") + + owner, repo = parts[0], parts[1] + if repo.endswith(".git"): + repo = repo[:-4] + if not owner or not repo: + raise ValueError("repo URL owner or repository is empty") + + return f"https://github.com/{owner}/{repo}" + + +def select_plugins( + *, + plugins: dict, + requested_names: list[str] | None, + limit: int | None, +) -> list[tuple[str, dict]]: + if requested_names: + selected = [] + for name in requested_names: + if name not in plugins: + raise KeyError(f"plugin not found: {name}") + selected.append((name, plugins[name])) + return selected + + items = list(plugins.items()) + if limit is None: + return items + return items[:limit] + + +def 
REQUIRED_METADATA_FIELDS = ("name", "desc", "version", "author")


def _parse_simple_yaml(path: Path) -> dict:
    """Minimal fallback parser for flat ``key: value`` YAML when PyYAML is absent."""
    result: dict = {}
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        result[key.strip()] = value.strip().strip("\"'")
    return result


def load_metadata(path: Path) -> dict:
    """Load metadata.yaml, preferring PyYAML and falling back to the flat parser.

    Returns {} when the YAML document is not a mapping.
    """
    try:
        # Local import keeps this block usable whether or not PyYAML is installed.
        import yaml
    except ImportError:  # pragma: no cover - optional in local unit tests
        yaml = None
    if yaml is not None:
        loaded = yaml.safe_load(path.read_text(encoding="utf-8"))
        return loaded if isinstance(loaded, dict) else {}
    return _parse_simple_yaml(path)


def _metadata_field_text(value) -> str:
    """Normalize a metadata scalar to stripped text; '' means missing/unusable."""
    if value is None or isinstance(value, (dict, list)):
        return ""
    return str(value).strip()


def precheck_plugin_directory(plugin_dir: Path) -> dict:
    """Validate metadata.yaml and the entrypoint file before loading the plugin.

    Returns a dict with ``ok``/``stage``/``message``; on success it also
    carries the parsed ``metadata`` and the ``plugin_dir_name``.
    """
    metadata_path = plugin_dir / "metadata.yaml"
    if not metadata_path.exists():
        return {
            "ok": False,
            "stage": "metadata",
            "message": "missing metadata.yaml",
        }

    metadata = load_metadata(metadata_path)
    # Fix: YAML parses unquoted scalars such as `version: 1.0` as numbers,
    # so requiring `str` instances rejected valid metadata. Coerce scalars
    # to text and only treat truly empty/absent values as missing.
    missing = [
        field
        for field in REQUIRED_METADATA_FIELDS
        if not _metadata_field_text(metadata.get(field))
    ]
    if missing:
        return {
            "ok": False,
            "stage": "metadata",
            "message": f"missing required metadata fields: {', '.join(missing)}",
        }

    plugin_name = _metadata_field_text(metadata["name"])
    entry_candidates = (plugin_dir / "main.py", plugin_dir / f"{plugin_name}.py")
    if not any(candidate.exists() for candidate in entry_candidates):
        return {
            "ok": False,
            "stage": "entrypoint",
            "message": f"missing main.py or {plugin_name}.py",
        }

    return {
        "ok": True,
        "stage": "precheck",
        "message": "ok",
        "metadata": metadata,
        "plugin_dir_name": plugin_name,
    }


def build_worker_command(
    *,
    script_path: Path,
    astrbot_path: Path,
    plugin_source_dir: Path,
    plugin_dir_name: str,
    normalized_repo_url: str,
) -> list[str]:
    """Build the argv for re-invoking this script in ``--worker`` mode."""
    return [
        sys.executable,
        str(script_path),
        "--worker",
        "--astrbot-path",
        str(astrbot_path),
        "--plugin-source-dir",
        str(plugin_source_dir),
        "--plugin-dir-name",
        plugin_dir_name,
        "--normalized-repo-url",
        normalized_repo_url,
    ]
passed = sum(1 for result in results if result.get("ok")) + failed = len(results) - passed + return { + "summary": { + "total": len(results), + "passed": passed, + "failed": failed, + }, + "results": results, + } + + +def load_plugins_index(path: Path) -> dict[str, dict]: + data = json.loads(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError("plugins.json must contain a JSON object") + result = {} + for key, value in data.items(): + if isinstance(key, str) and isinstance(value, dict): + result[key] = value + return result + + +def combine_requested_names( + plugin_names: list[str] | None, + plugin_name_list: str | None, +) -> list[str]: + names = list(plugin_names or []) + if plugin_name_list: + names.extend(part.strip() for part in plugin_name_list.split(",")) + return [name for name in names if name] + + +def sanitize_name(name: str) -> str: + sanitized = re.sub(r"[^A-Za-z0-9._-]+", "-", name).strip("-") + return sanitized or "plugin" + + +def clone_plugin_repo(repo_url: str, destination: Path) -> None: + subprocess.run( + ["git", "clone", "--depth", "1", repo_url, str(destination)], + check=True, + capture_output=True, + text=True, + ) + + +def parse_worker_output( + *, + plugin: str, + repo: str, + normalized_repo_url: str, + completed: subprocess.CompletedProcess[str], + plugin_dir_name: str, +) -> dict: + stdout = completed.stdout.strip() + if stdout: + try: + payload = json.loads(stdout) + except json.JSONDecodeError: + payload = None + if isinstance(payload, dict): + payload["plugin"] = plugin + payload["repo"] = repo + payload["normalized_repo_url"] = normalized_repo_url + payload.setdefault("plugin_dir_name", plugin_dir_name) + return payload + + stderr = completed.stderr.strip() + message = stderr or stdout or "worker returned no structured output" + return build_result( + plugin=plugin, + repo=repo, + normalized_repo_url=normalized_repo_url, + ok=False, + stage="worker", + message=message, + 
def validate_plugin(
    *,
    plugin: str,
    plugin_data: dict,
    astrbot_path: Path,
    script_path: Path,
    work_dir: Path,
    load_timeout: int,
) -> dict:
    """Run the full pipeline for one plugin: URL check, clone, precheck, worker.

    Each stage short-circuits with a failure record; the worker subprocess
    result is parsed into the final record on success.
    """
    repo_url = plugin_data.get("repo")
    if not isinstance(repo_url, str) or not repo_url.strip():
        return build_result(
            plugin=plugin,
            repo="",
            normalized_repo_url=None,
            ok=False,
            stage="repo_url",
            message="missing repo field",
        )

    try:
        normalized = normalize_repo_url(repo_url)
    except ValueError as exc:
        return build_result(
            plugin=plugin,
            repo=repo_url,
            normalized_repo_url=None,
            ok=False,
            stage="repo_url",
            message=str(exc),
        )

    clone_dir = work_dir / sanitize_name(plugin)
    try:
        clone_plugin_repo(normalized, clone_dir)
    except subprocess.CalledProcessError as exc:
        failure = exc.stderr.strip() or exc.stdout.strip() or str(exc)
        return build_result(
            plugin=plugin,
            repo=repo_url,
            normalized_repo_url=normalized,
            ok=False,
            stage="clone",
            message=failure,
        )

    precheck = precheck_plugin_directory(clone_dir)
    if not precheck["ok"]:
        return build_result(
            plugin=plugin,
            repo=repo_url,
            normalized_repo_url=normalized,
            ok=False,
            stage=precheck["stage"],
            message=precheck["message"],
        )

    dir_name = precheck["plugin_dir_name"]
    worker_argv = build_worker_command(
        script_path=script_path,
        astrbot_path=astrbot_path,
        plugin_source_dir=clone_dir,
        plugin_dir_name=dir_name,
        normalized_repo_url=normalized,
    )

    try:
        # The worker runs in a subprocess so a hanging or crashing plugin
        # cannot take down the orchestrator.
        completed = subprocess.run(
            worker_argv,
            check=False,
            capture_output=True,
            text=True,
            timeout=load_timeout,
        )
    except subprocess.TimeoutExpired:
        return build_result(
            plugin=plugin,
            repo=repo_url,
            normalized_repo_url=normalized,
            ok=False,
            stage="timeout",
            message=f"worker timed out after {load_timeout} seconds",
            plugin_dir_name=dir_name,
        )

    return parse_worker_output(
        plugin=plugin,
        repo=repo_url,
        normalized_repo_url=normalized,
        completed=completed,
        plugin_dir_name=dir_name,
    )
class NullStub:
    """Inert placeholder: absorbs attribute access, calls, await, and iteration.

    Used so plugin code that pokes at unsupported Context surface area gets a
    harmless falsy object instead of raising.
    """

    def __getattr__(self, name: str) -> "NullStub":
        del name
        return self

    def __call__(self, *args, **kwargs) -> "NullStub":
        del args, kwargs
        return self

    def __await__(self):
        async def _identity():
            return self

        return _identity().__await__()

    def __iter__(self):
        return iter(())

    def __bool__(self) -> bool:
        return False


class DummyContext:
    """Minimal stand-in for AstrBot's Context, sufficient for load-only checks."""

    def __init__(self) -> None:
        self._star_manager = None

    def get_all_stars(self):
        # Use the live registry when AstrBot is importable; otherwise report
        # no stars rather than failing.
        try:
            from astrbot.core.star.star import star_registry

            return list(star_registry)
        except Exception:
            return []

    def get_registered_star(self, star_name: str):
        candidates = (
            star
            for star in self.get_all_stars()
            if getattr(star, "name", None) == star_name
        )
        return next(candidates, None)

    def activate_llm_tool(self, name: str) -> bool:
        del name
        return True

    def deactivate_llm_tool(self, name: str) -> bool:
        del name
        return True

    def register_llm_tool(self, name: str, func_args, desc: str, func_obj) -> None:
        del name, func_args, desc, func_obj

    def unregister_llm_tool(self, name: str) -> None:
        del name

    def __getattr__(self, name: str) -> NullStub:
        del name
        return NullStub()
async def run_worker_load_check(plugin_dir_name: str, normalized_repo_url: str) -> dict:
    """Import AstrBot's PluginManager and attempt to load one plugin directory.

    Every failure mode is converted into a structured result record rather
    than an exception, so the worker always prints valid JSON.
    """
    try:
        from astrbot.core.star.star_manager import PluginManager
    except Exception as exc:
        return build_result(
            plugin=plugin_dir_name,
            repo=normalized_repo_url,
            normalized_repo_url=normalized_repo_url,
            ok=False,
            stage="astrbot_import",
            message=str(exc),
            plugin_dir_name=plugin_dir_name,
            details=traceback.format_exc(),
        )

    manager = PluginManager(DummyContext(), {})

    try:
        success, error = await manager.load(specified_dir_name=plugin_dir_name)
    except Exception as exc:
        return build_result(
            plugin=plugin_dir_name,
            repo=normalized_repo_url,
            normalized_repo_url=normalized_repo_url,
            ok=False,
            stage="load",
            message=str(exc),
            plugin_dir_name=plugin_dir_name,
            details=traceback.format_exc(),
        )

    if not success:
        return build_result(
            plugin=plugin_dir_name,
            repo=normalized_repo_url,
            normalized_repo_url=normalized_repo_url,
            ok=False,
            stage="load",
            message=error or "plugin load failed",
            plugin_dir_name=plugin_dir_name,
            details=manager.failed_plugin_dict.get(plugin_dir_name),
        )

    return build_result(
        plugin=plugin_dir_name,
        repo=normalized_repo_url,
        normalized_repo_url=normalized_repo_url,
        ok=True,
        stage="load",
        message="plugin loaded successfully",
        plugin_dir_name=plugin_dir_name,
    )


def run_worker(args: argparse.Namespace) -> int:
    """Worker entry point: stage the plugin under a throwaway AstrBot root and load it.

    Returns 0 on a successful load, 1 otherwise; the result record is printed
    as JSON on stdout for the orchestrator to parse.
    """
    staging_root = Path(tempfile.mkdtemp(prefix="astrbot-plugin-worker-"))
    try:
        astrbot_root = staging_root / "astrbot-root"
        # PluginManager discovers plugins under <root>/data/plugins.
        plugins_dir = astrbot_root / "data" / "plugins"
        config_dir = astrbot_root / "data" / "config"
        plugins_dir.mkdir(parents=True, exist_ok=True)
        config_dir.mkdir(parents=True, exist_ok=True)

        shutil.copytree(
            Path(args.plugin_source_dir).resolve(),
            plugins_dir / args.plugin_dir_name,
            dirs_exist_ok=True,
        )

        # Environment and sys.path must be in place BEFORE AstrBot is first
        # imported inside run_worker_load_check.
        os.environ["ASTRBOT_ROOT"] = str(astrbot_root)
        os.environ.setdefault("TESTING", "true")
        sys.path.insert(0, str(Path(args.astrbot_path).resolve()))

        outcome = asyncio.run(
            run_worker_load_check(args.plugin_dir_name, args.normalized_repo_url)
        )
    except Exception as exc:
        outcome = build_result(
            plugin=args.plugin_dir_name,
            repo=args.normalized_repo_url,
            normalized_repo_url=args.normalized_repo_url,
            ok=False,
            stage="worker",
            message=str(exc),
            plugin_dir_name=args.plugin_dir_name,
            details=traceback.format_exc(),
        )
    finally:
        shutil.rmtree(staging_root, ignore_errors=True)

    print(json.dumps(outcome, ensure_ascii=False))
    return 0 if outcome["ok"] else 1
def build_parser() -> argparse.ArgumentParser:
    """CLI definition shared by the orchestrator and ``--worker`` modes."""
    parser = argparse.ArgumentParser(description="Validate AstrBot plugins")
    parser.add_argument("--plugins-json", default="plugins.json")
    parser.add_argument("--plugin-name", action="append", dest="plugin_names")
    parser.add_argument("--plugin-name-list")
    parser.add_argument("--limit", type=int)
    parser.add_argument("--astrbot-path")
    parser.add_argument("--report-path", default="validation-report.json")
    parser.add_argument("--work-dir")
    parser.add_argument("--load-timeout", type=int, default=300)
    parser.add_argument("--worker", action="store_true")
    parser.add_argument("--plugin-source-dir")
    parser.add_argument("--plugin-dir-name")
    parser.add_argument("--normalized-repo-url")
    return parser


def main() -> int:
    """Orchestrator entry point; returns the process exit code.

    Exit code 0 means every selected plugin passed the smoke check.
    """
    parser = build_parser()
    args = parser.parse_args()

    if args.worker:
        required = {
            "--astrbot-path": args.astrbot_path,
            "--plugin-source-dir": args.plugin_source_dir,
            "--plugin-dir-name": args.plugin_dir_name,
            "--normalized-repo-url": args.normalized_repo_url,
        }
        absent = [flag for flag, value in required.items() if not value]
        if absent:
            parser.error(f"worker mode requires: {', '.join(absent)}")
        return run_worker(args)

    if not args.astrbot_path:
        parser.error("--astrbot-path is required")

    requested = combine_requested_names(args.plugin_names, args.plugin_name_list)
    selected = select_plugins(
        plugins=load_plugins_index(Path(args.plugins_json)),
        requested_names=requested or None,
        limit=args.limit,
    )

    # Use the caller-provided work dir when given; otherwise manage a
    # temporary one that is cleaned up after validation.
    managed_tmp = None
    if args.work_dir:
        work_dir = Path(args.work_dir)
    else:
        managed_tmp = tempfile.TemporaryDirectory(prefix="astrbot-plugin-validate-")
        work_dir = Path(managed_tmp.name)
    work_dir.mkdir(parents=True, exist_ok=True)

    try:
        results = [
            validate_plugin(
                plugin=key,
                plugin_data=payload,
                astrbot_path=Path(args.astrbot_path).resolve(),
                script_path=Path(__file__).resolve(),
                work_dir=work_dir,
                load_timeout=args.load_timeout,
            )
            for key, payload in selected
        ]
    finally:
        if managed_tmp is not None:
            managed_tmp.cleanup()

    report = build_report(results)
    report_path = Path(args.report_path)
    report_path.write_text(
        json.dumps(report, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )

    print(
        json.dumps(
            {"report_path": str(report_path), "summary": report["summary"]},
            ensure_ascii=False,
        )
    )
    return 0 if report["summary"]["failed"] == 0 else 1
import importlib.util
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path


# Resolve the validator script relative to this test file so the suite works
# from any working directory.
ROOT = Path(__file__).resolve().parents[1]
MODULE_PATH = ROOT / "scripts" / "validate_plugins" / "run.py"


def load_validator_module():
    """Import scripts/validate_plugins/run.py as a standalone module object."""
    if not MODULE_PATH.exists():
        raise AssertionError(f"validator script missing: {MODULE_PATH}")

    spec = importlib.util.spec_from_file_location("validate_plugins_run", MODULE_PATH)
    if spec is None or spec.loader is None:
        raise AssertionError("unable to load validator module spec")

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


class NormalizeRepoUrlTests(unittest.TestCase):
    def test_strips_git_suffix_trailing_slash_and_query(self):
        module = load_validator_module()

        normalized = module.normalize_repo_url(
            "https://github.com/example/demo-plugin.git/?tab=readme-ov-file"
        )

        self.assertEqual(normalized, "https://github.com/example/demo-plugin")

    def test_rejects_non_github_urls(self):
        module = load_validator_module()

        with self.assertRaises(ValueError):
            module.normalize_repo_url("https://gitlab.com/example/demo-plugin")
class SelectPluginsTests(unittest.TestCase):
    def test_prefers_explicit_names_in_requested_order(self):
        module = load_validator_module()
        plugins = {
            "plugin-a": {"repo": "https://github.com/example/plugin-a"},
            "plugin-b": {"repo": "https://github.com/example/plugin-b"},
            "plugin-c": {"repo": "https://github.com/example/plugin-c"},
        }

        selected = module.select_plugins(
            plugins=plugins,
            requested_names=["plugin-c", "plugin-a"],
            limit=None,
        )

        selected_keys = [key for key, _ in selected]
        self.assertEqual(selected_keys, ["plugin-c", "plugin-a"])


class MetadataValidationTests(unittest.TestCase):
    def test_reports_missing_required_metadata_fields(self):
        module = load_validator_module()

        with tempfile.TemporaryDirectory() as tmp_dir:
            plugin_dir = Path(tmp_dir)
            (plugin_dir / "metadata.yaml").write_text(
                "name: demo_plugin\nauthor: AstrBot Team\n",
                encoding="utf-8",
            )
            (plugin_dir / "main.py").write_text("print('hello')\n", encoding="utf-8")

            result = module.precheck_plugin_directory(plugin_dir)

        self.assertFalse(result["ok"])
        self.assertEqual(result["stage"], "metadata")
        self.assertIn("desc", result["message"])
        self.assertIn("version", result["message"])


class WorkerCommandTests(unittest.TestCase):
    def test_build_worker_command_contains_required_arguments(self):
        module = load_validator_module()

        command = module.build_worker_command(
            script_path=Path("/tmp/run.py"),
            astrbot_path=Path("/tmp/astrbot"),
            plugin_source_dir=Path("/tmp/plugin-src"),
            plugin_dir_name="demo_plugin",
            normalized_repo_url="https://github.com/example/demo-plugin",
        )

        self.assertEqual(command[0], sys.executable)
        self.assertEqual(command[1], "/tmp/run.py")
        expected_flags = (
            "--worker",
            "--astrbot-path",
            "--plugin-source-dir",
            "--plugin-dir-name",
            "--normalized-repo-url",
        )
        for flag in expected_flags:
            self.assertIn(flag, command)
class ReportBuilderTests(unittest.TestCase):
    def test_build_report_counts_passed_and_failed_results(self):
        module = load_validator_module()
        records = [
            {"plugin": "plugin-a", "ok": True, "stage": "load", "message": "ok"},
            {"plugin": "plugin-b", "ok": False, "stage": "metadata", "message": "missing desc"},
        ]

        report = module.build_report(records)

        summary = report["summary"]
        self.assertEqual(summary["total"], 2)
        self.assertEqual(summary["passed"], 1)
        self.assertEqual(summary["failed"], 1)
        self.assertEqual(report["results"][1]["plugin"], "plugin-b")


class WorkerOutputParsingTests(unittest.TestCase):
    def test_parse_worker_output_keeps_market_plugin_key(self):
        module = load_validator_module()
        completed = subprocess.CompletedProcess(
            args=["python3", "run.py"],
            returncode=1,
            stdout='{"plugin": "demo_plugin", "ok": false, "stage": "load", "message": "boom"}',
            stderr="",
        )

        result = module.parse_worker_output(
            plugin="market-plugin-key",
            repo="https://github.com/example/demo-plugin?tab=readme-ov-file",
            normalized_repo_url="https://github.com/example/demo-plugin",
            completed=completed,
            plugin_dir_name="demo_plugin",
        )

        # The orchestrator's market key must win over the worker's own name.
        self.assertEqual(result["plugin"], "market-plugin-key")
        self.assertEqual(result["plugin_dir_name"], "demo_plugin")


if __name__ == "__main__":
    unittest.main()