From f932870b27e141c059c36b8e1d934cdf35f01c0d Mon Sep 17 00:00:00 2001 From: Varun Ursekar Date: Wed, 24 Jun 2026 11:09:45 -0700 Subject: [PATCH] Harbor eval sidecar: verifier, token auth, and the HTTP API (Mode A) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The evaluation engine, run in a sidecar container, with the trust boundary that makes an optimization run leaderboard-gradeable: - `EvaluationSidecar` (server.py): agent-facing handlers — commit transfer from the untrusted agent repo (git fetch, hooks off, object copy) and tier-gated write-routing of results across the agent-readable and admin volumes. - `Verifier` (verifier.py): commit selection (submit | auto_best) + hidden-split scoring. - Per-trial admin token (auth.py), written root:600 so the optimizer (de-privileged) cannot read it; only the verifier (root, shared mode) can. - FastAPI surface (app.py): /eval, /submit, /status for the agent (metered, redacted); /finalize for the verifier (token-gated). `vero harbor serve` (serve.py) assembles the engine + sidecar + verifier from a ServeConfig and runs it under uvicorn. - `vero harbor` CLI clients (cli.py): serve | eval | submit | status | finalize (build/run land with the compiler). HarborConfig + the Mode-B dataset partition helpers (config.py, dataset.py) are included so the harbor package imports cleanly. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- vero/src/vero/harbor/__init__.py | 19 +++ vero/src/vero/harbor/app.py | 91 ++++++++++++++ vero/src/vero/harbor/auth.py | 39 ++++++ vero/src/vero/harbor/cli.py | 127 ++++++++++++++++++++ vero/src/vero/harbor/config.py | 35 ++++++ vero/src/vero/harbor/dataset.py | 80 +++++++++++++ vero/src/vero/harbor/protocol.py | 106 ++++++++++++++++ vero/src/vero/harbor/serve.py | 170 ++++++++++++++++++++++++++ vero/src/vero/harbor/server.py | 186 +++++++++++++++++++++++++++++ vero/src/vero/harbor/verifier.py | 114 ++++++++++++++++++ vero/tests/test_harbor_app.py | 85 +++++++++++++ vero/tests/test_harbor_cli.py | 82 +++++++++++++ vero/tests/test_harbor_dataset.py | 49 ++++++++ vero/tests/test_harbor_protocol.py | 86 +++++++++++++ vero/tests/test_harbor_serve.py | 143 ++++++++++++++++++++++ vero/tests/test_harbor_server.py | 124 +++++++++++++++++++ vero/tests/test_harbor_transfer.py | 85 +++++++++++++ vero/tests/test_harbor_verifier.py | 112 +++++++++++++++++ 18 files changed, 1733 insertions(+) create mode 100644 vero/src/vero/harbor/__init__.py create mode 100644 vero/src/vero/harbor/app.py create mode 100644 vero/src/vero/harbor/auth.py create mode 100644 vero/src/vero/harbor/cli.py create mode 100644 vero/src/vero/harbor/config.py create mode 100644 vero/src/vero/harbor/dataset.py create mode 100644 vero/src/vero/harbor/protocol.py create mode 100644 vero/src/vero/harbor/serve.py create mode 100644 vero/src/vero/harbor/server.py create mode 100644 vero/src/vero/harbor/verifier.py create mode 100644 vero/tests/test_harbor_app.py create mode 100644 vero/tests/test_harbor_cli.py create mode 100644 vero/tests/test_harbor_dataset.py create mode 100644 vero/tests/test_harbor_protocol.py create mode 100644 vero/tests/test_harbor_serve.py create mode 100644 vero/tests/test_harbor_server.py create mode 100644 vero/tests/test_harbor_transfer.py create mode 100644 vero/tests/test_harbor_verifier.py diff --git 
a/vero/src/vero/harbor/__init__.py b/vero/src/vero/harbor/__init__.py new file mode 100644 index 0000000..43df555 --- /dev/null +++ b/vero/src/vero/harbor/__init__.py @@ -0,0 +1,19 @@ +"""Harbor integration: the sidecar-specific frontend over the shared +EvaluationEngine, plus Mode B (Harbor-delegated eval). The `harbor` SDK is an +optional extra, imported lazily (only registry enumeration / nested runs need it — +config, dataset compilation, and the sidecar handlers do not). +""" + +from vero.harbor.config import HarborConfig +from vero.harbor.dataset import ( + build_harbor_dataset, + enumerate_local_task_names, + validate_partition, +) + +__all__ = [ + "HarborConfig", + "build_harbor_dataset", + "enumerate_local_task_names", + "validate_partition", +] diff --git a/vero/src/vero/harbor/app.py b/vero/src/vero/harbor/app.py new file mode 100644 index 0000000..e98bcce --- /dev/null +++ b/vero/src/vero/harbor/app.py @@ -0,0 +1,91 @@ +"""FastAPI app for the eval sidecar — the HTTP surface over the (transport-agnostic) +EvaluationSidecar handlers + the admin `finalize` over the Verifier. + +Two roles over one app: agent (`/eval`, `/submit`, `/status`; unauthenticated, metered, +redacted) and admin (`/finalize`; bearer-token gated). `vero harbor serve` runs +this under uvicorn in the eval-sidecar container. 
class EvalBody(BaseModel):
    """JSON body accepted by ``POST /eval``.

    The validated fields are forwarded unchanged as keyword arguments to
    ``EvalRequest``.
    """

    dataset_id: str
    split: str
    commit: str | None = None
    sample_ids: list[int] | None = None
    num_samples: int | None = None


class SubmitBody(BaseModel):
    """JSON body accepted by ``POST /submit``."""

    commit: str | None = None


def create_app(
    *,
    sidecar: EvaluationSidecar,
    verifier: Verifier,
    admin_token: str,
) -> FastAPI:
    """Build the FastAPI app exposing the sidecar handlers and the admin finalize.

    Args:
        sidecar: Handles the agent endpoints (``/eval``, ``/submit``, ``/status``).
        verifier: Handles the admin ``/finalize`` endpoint.
        admin_token: Token that the ``Authorization`` header of ``/finalize``
            must carry (checked with ``check_admin``).

    Returns:
        The configured application. Known domain errors are rendered as
        ``{"error": "<message>"}`` with a 4xx status instead of a 500.
    """
    # Imported locally so this block does not depend on the module's import list.
    from vero.harbor.server import CommitTransferError

    app = FastAPI(title="vero eval sidecar")

    def _error_handler(status_code: int):
        """Return a handler that renders any exception as a JSON error body."""

        def _handle(request, exc):
            return JSONResponse(status_code=status_code, content={"error": str(exc)})

        return _handle

    # Known errors -> agent-facing status codes.
    known_errors = (
        (ExperimentBudgetExceeded, 429),
        (InvalidSplitError, 400),
        (SubmitDisabledError, 409),
        (NoCandidateError, 409),
        # A commit ref the sidecar cannot fetch is caused by the request, so it
        # is reported as a client error rather than an unhandled 500.
        (CommitTransferError, 400),
    )
    for exc_type, status_code in known_errors:
        app.add_exception_handler(exc_type, _error_handler(status_code))

    @app.get("/health")
    async def health():
        return {"ok": True}

    # --- agent endpoints (unauthenticated; metered + redacted) ---
    @app.post("/eval")
    async def eval_(body: EvalBody):
        summary = await sidecar.evaluate(EvalRequest(**body.model_dump()), admin=False)
        return summary.to_dict()

    @app.post("/submit")
    async def submit(body: SubmitBody):
        return await sidecar.submit(commit=body.commit)

    @app.get("/status")
    async def status():
        return sidecar.status().to_dict()

    # --- admin endpoint (bearer-token gated) ---
    @app.post("/finalize")
    async def finalize(authorization: str | None = Header(default=None)):
        if not check_admin(authorization, admin_token):
            raise HTTPException(status_code=403, detail="admin token required")
        return await verifier.finalize()

    return app
_BEARER = "Bearer "


def generate_token() -> str:
    """Return a fresh random URL-safe admin token (``secrets.token_urlsafe(32)``)."""
    return secrets.token_urlsafe(32)


def write_admin_token(path: Path | str, token: str, *, mode: int = 0o600) -> Path:
    """Write the token to ``path`` with restrictive perms (caller runs as root so the
    file is root-owned and unreadable by ``agent.user``).

    The file is created with ``mode`` from the start and the permissions are
    re-applied *before* the token is written, so the secret never exists on disk
    with looser (umask-default or pre-existing) permissions.

    Args:
        path: Destination file; missing parent directories are created.
        token: Token text to store.
        mode: Permission bits for the file.

    Returns:
        The destination as a ``Path``.
    """
    import os

    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(p, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as fh:
        # O_CREAT only applies `mode` to a newly created file (and masks it with
        # the umask), so set the bits explicitly while the file is still empty.
        p.chmod(mode)
        fh.write(token)
    return p


def read_admin_token(path: Path | str) -> str:
    """Return the token stored at ``path`` with surrounding whitespace stripped."""
    return Path(path).read_text().strip()


def check_admin(authorization: str | None, expected_token: str) -> bool:
    """Constant-time check of an ``Authorization: Bearer <token>`` header.

    Args:
        authorization: Raw header value, or ``None`` when the header is absent.
        expected_token: The admin token the header must carry.

    Returns:
        ``True`` only when the header has the ``Bearer `` prefix and the rest
        equals a non-empty ``expected_token``.
    """
    # Fail closed: an empty expected token must never authenticate "Bearer ".
    if not expected_token:
        return False
    if not authorization or not authorization.startswith(_BEARER):
        return False
    presented = authorization[len(_BEARER):]
    # compare_digest raises TypeError for non-ASCII str arguments; comparing the
    # encoded bytes keeps the check constant-time and turns such input into a
    # plain rejection.
    return secrets.compare_digest(
        presented.encode("utf-8"), expected_token.encode("utf-8")
    )
def _base_url() -> str:
    """Return the sidecar base URL from ``VERO_EVAL_URL`` without a trailing slash.

    Raises:
        click.ClickException: If the variable is unset or empty.
    """
    url = os.environ.get("VERO_EVAL_URL")
    if not url:
        raise click.ClickException("VERO_EVAL_URL is not set (the eval sidecar URL).")
    return url.rstrip("/")


def _request(method: str, path: str, *, payload: dict | None = None, headers: dict | None = None):
    """Send one HTTP request to the sidecar and return the decoded JSON response.

    Raises:
        click.ClickException: If the response status is 400 or higher; the
            message carries the status code and response text.
    """
    import httpx

    # timeout=None: no client-side timeout is applied to the request.
    resp = httpx.request(
        method, f"{_base_url()}{path}", json=payload, headers=headers or {}, timeout=None
    )
    if resp.status_code >= 400:
        raise click.ClickException(f"{method} {path} -> {resp.status_code}: {resp.text}")
    return resp.json()


@click.group()
def harbor() -> None:
    """Vero ⇄ Harbor: optimization-as-a-Harbor-task commands."""


@harbor.command("serve")
@click.option("--config", "config_path", required=True, help="Path to the ServeConfig JSON.")
def serve_cmd(config_path):
    """Eval-sidecar entrypoint: assemble the engine/sidecar/verifier and serve (uvicorn)."""
    from vero.harbor.serve import serve

    serve(config_path)


@harbor.command("build")
@click.option("-c", "--config", "config_path", required=True, help="Path to build.yaml.")
@click.option("-o", "--out", required=True, help="Output task directory.")
def build_cmd(config_path, out):
    """Compile a build.yaml into a runnable Harbor optimization task directory."""
    from vero.harbor.build import BuildConfig, compile_task

    task_dir = compile_task(BuildConfig.from_file(config_path), out)
    click.echo(f"Compiled task -> {task_dir}")


@harbor.command("run", context_settings={"ignore_unknown_options": True})
@click.option("-c", "--config", "config_path", required=True, help="Path to build.yaml.")
@click.option("-a", "--agent", required=True, help="Optimizer agent (passed to harbor run).")
@click.option("-m", "--model", default=None, help="Model for the optimizer agent.")
@click.option("-e", "--environment", "provider", default="docker", show_default=True)
@click.argument("extra", nargs=-1, type=click.UNPROCESSED)
def run_cmd(config_path, agent, model, provider, extra):
    """Build to a temp dir, then `harbor run` it (build + run convenience)."""
    import subprocess
    import tempfile

    from vero.harbor.build import BuildConfig, compile_task

    task_dir = compile_task(BuildConfig.from_file(config_path), Path(tempfile.mkdtemp()) / "task")
    cmd = ["uvx", "harbor", "run", "-p", str(task_dir), "-a", agent, "-e", provider]
    if model:
        cmd += ["-m", model]
    # Unrecognized options/arguments are passed through to `harbor run` verbatim.
    cmd += list(extra)
    click.echo(f"$ {' '.join(cmd)}")
    # Exit with the child's return code.
    raise SystemExit(subprocess.call(cmd))


@harbor.command("eval")
@click.option("--dataset-id", required=True)
@click.option("--split", required=True)
@click.option("--commit", default=None, help="Defaults to the agent repo HEAD.")
@click.option("--num-samples", type=int, default=None)
@click.option("--sample-ids", default=None, help="Comma-separated sample ids.")
def eval_cmd(dataset_id, split, commit, num_samples, sample_ids):
    """Spend one evaluation of your commit on a split (agent)."""
    payload: dict = {"dataset_id": dataset_id, "split": split}
    if commit:
        payload["commit"] = commit
    if num_samples is not None:
        payload["num_samples"] = num_samples
    if sample_ids:
        # Report malformed ids as a CLI error instead of a raw ValueError
        # traceback; blank items (e.g. a trailing comma) are ignored.
        tokens = [tok.strip() for tok in sample_ids.split(",") if tok.strip()]
        try:
            parsed_ids = [int(tok) for tok in tokens]
        except ValueError as err:
            raise click.ClickException(
                f"--sample-ids must be comma-separated integers, got {sample_ids!r}"
            ) from err
        if not parsed_ids:
            raise click.ClickException(
                f"--sample-ids must be comma-separated integers, got {sample_ids!r}"
            )
        payload["sample_ids"] = parsed_ids
    click.echo(json.dumps(_request("POST", "/eval", payload=payload), indent=2))


@harbor.command("submit")
@click.option("--commit", default=None, help="Defaults to the agent repo HEAD.")
def submit_cmd(commit):
    """Nominate a commit and end the optimization run (agent; if enabled)."""
    click.echo(json.dumps(_request("POST", "/submit", payload={"commit": commit}), indent=2))


@harbor.command("status")
def status_cmd():
    """Show remaining budget, evaluable splits, and whether submit is enabled (agent)."""
    click.echo(json.dumps(_request("GET", "/status"), indent=2))
@dataclass
class HarborConfig:
    """User-controllable settings for a Mode-B ``harbor run`` invocation.

    ``task_source`` is either a registry reference (``org/name[@ver]``) or a
    local path to a task directory/dataset; which one it is gets decided by
    whether the path exists on disk (see ``is_registry``).
    """

    task_source: str  # registry ref "org/name[@ver]" OR a local path to a task dir/dataset
    agent_import_path: str  # module path to the candidate agent, e.g. "pkg.mod:Class"
    model: str | None = None
    environment: str = "modal"  # cloud provider (docker allowed for local testing)
    n_attempts: int = 1
    max_retries: int = 2
    reward_key: str | None = None  # primary reward; default pass -> reward -> mean
    extra_args: list[str] = field(default_factory=list)  # passthrough harbor run flags

    @property
    def is_registry(self) -> bool:
        """True when ``task_source`` is not an existing local path (so: a registry ref)."""
        local_candidate = Path(self.task_source).expanduser()
        return not local_candidate.exists()

    def source_args(self) -> list[str]:
        """Return the source selector: ``-p <expanded path>`` (local) or ``-d <task_source>`` (registry)."""
        if not self.is_registry:
            expanded = Path(self.task_source).expanduser()
            return ["-p", str(expanded)]
        return ["-d", self.task_source]
def enumerate_local_task_names(task_source: str | Path) -> list[str]:
    """List the task names found in a local Harbor task source.

    A path that directly contains ``task.toml`` is a single task and yields its
    own directory name. Otherwise the sorted names of the immediate
    subdirectories that contain ``task.toml`` are returned.

    Raises:
        ValueError: If the path is neither a task dir nor a directory.
    """
    marker = "task.toml"
    root = Path(task_source).expanduser()
    if (root / marker).exists():
        return [root.name]
    if not root.is_dir():
        raise ValueError(f"Local task source is not a directory: {root}")
    names = [
        child.name
        for child in root.iterdir()
        if child.is_dir() and (child / marker).exists()
    ]
    names.sort()
    return names


async def enumerate_registry_task_names(
    ref: str, *, registry_url: str | None = None
) -> list[str]:
    """List the task names in a registry dataset given as ``org/name[@version]``.

    The ``harbor`` SDK is imported inside the function, so it is only required
    when this function is actually called.
    """
    from harbor.models.job.config import RegistryDatasetConfig
    from harbor.models.registry import RemoteRegistryInfo

    # Everything after the first "@" is the version; empty means unspecified.
    name, _, version = ref.partition("@")
    registry = RemoteRegistryInfo(url=registry_url) if registry_url else None
    config = RegistryDatasetConfig(
        registry=registry,
        name=name,
        version=version or None,
    )
    task_configs = await config.get_task_configs()
    return sorted(tc.path.name for tc in task_configs)


def validate_partition(partition: dict[str, list[str]], available: list[str]) -> None:
    """Check that every task name used by any split is present in ``available``.

    Raises:
        ValueError: Listing (sorted) the referenced names that are unavailable.
    """
    known = set(available)
    missing = {
        name
        for names in partition.values()
        for name in names
        if name not in known
    }
    if not missing:
        return
    raise ValueError(
        f"Partition references task names not found in the source: {sorted(missing)}"
    )
@dataclass
class StatusSummary:
    """Payload returned for a status call.

    The agent is told whether submit is enabled (``submit_enabled``), not which
    selection strategy the verifier uses internally.
    """

    submit_enabled: bool
    # one dict per (split, dataset_id): tier, agent-evaluable flag, remaining budget
    splits: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the summary as a plain dict."""
        return asdict(self)


def tier_for_split(split: str, split_accesses: list[SplitAccess]) -> SplitAccessLevel:
    """Return the access level of the first entry matching ``split``.

    Splits with no entry in ``split_accesses`` default to ``viewable``.
    """
    matching = [entry.access for entry in split_accesses if entry.split == split]
    if matching:
        return matching[0]
    return SplitAccessLevel.viewable


def summarize_experiment(
    experiment: Experiment,
    *,
    result_path: str | None,
    budget_remaining: SplitBudget | None = None,
) -> EvalSummary:
    """Reduce an Experiment to an EvalSummary: identifiers, sample count and score only."""
    run = experiment.run
    subset = run.dataset_subset
    result = experiment.result
    return EvalSummary(
        commit=run.candidate.commit,
        split=subset.split,
        dataset_id=subset.dataset_id,
        n_samples=len(result.sample_results),
        mean_score=result.score(),
        result_path=result_path,
        budget_remaining=budget_remaining,
    )


def build_status(
    *,
    submit_enabled: bool,
    budget: dict[tuple[str, str], SplitBudget],
    split_accesses: list[SplitAccess],
) -> StatusSummary:
    """Assemble the agent-facing status from the budget ledger and the split tiers.

    One entry is produced per ``(split, dataset_id)`` key of ``budget``, in the
    ledger's iteration order; splits without a budget entry are not listed.
    """

    def _entry(key: tuple[str, str], split_budget: SplitBudget) -> dict:
        split, dataset_id = key
        tier = tier_for_split(split, split_accesses)
        return {
            "split": split,
            "dataset_id": dataset_id,
            "tier": str(tier),
            "agent_evaluable": tier != SplitAccessLevel.no_access,
            "remaining_sample_budget": split_budget.remaining_sample_budget,
            "remaining_run_budget": split_budget.remaining_run_budget,
        }

    rows = [_entry(key, split_budget) for key, split_budget in budget.items()]
    return StatusSummary(submit_enabled=submit_enabled, splits=rows)
# One split-access entry of the serve config; `access` is converted to
# SplitAccessLevel in build_components.
class _SplitAccessCfg(BaseModel):
    split: str
    access: str  # "viewable" | "non_viewable" | "no_access"


# One scoring target of the serve config; its fields are passed as keyword
# arguments to VerificationTarget in build_components.
class _TargetCfg(BaseModel):
    task: str | None = None
    dataset_id: str
    split: str
    reward_key: str = "reward"
    sample_ids: list[int] | None = None  # passed through to VerificationTarget


class ServeConfig(BaseModel):
    """Everything the sidecar needs to assemble itself. Baked by the compiler."""

    repo_path: str  # sidecar's own repo (baseline target) = the engine workspace
    agent_repo_path: str  # mounted agent workspace (commit-transfer source)
    session_id: str
    dataset_id: str  # already registered in the sidecar's VERO_HOME
    split_accesses: list[_SplitAccessCfg]
    budgets: list[dict]  # SplitBudget kwargs

    # Mode A
    task: str | None = None
    task_project: str | None = None
    task_module: str | None = None
    # Mode B
    harbor: dict | None = None  # HarborConfig kwargs

    # selection / reward
    # NOTE(review): reward_mode is a free-form str here, while Verifier annotates
    # it as Literal["submit", "auto_best"] — nothing in this class validates it.
    reward_mode: str = "auto_best"
    selection_split: str = "validation"
    targets: list[_TargetCfg] = Field(default_factory=list)
    base_commit: str | None = None
    submit_enabled: bool = False

    # volumes / token
    agent_volume: str
    admin_volume: str
    admin_token_path: str

    # eval params
    timeout: int = 600
    sample_timeout: int = 180
    max_concurrency: int = 20
    use_copy: bool = True  # isolate each eval in a temp copy (clean tree, concurrency-safe)

    host: str = "0.0.0.0"
    port: int = 8000

    @classmethod
    def from_file(cls, path: Path | str) -> ServeConfig:
        """Load and validate a ServeConfig from the JSON file at ``path``."""
        return cls.model_validate_json(Path(path).read_text())


async def build_components(config: ServeConfig) -> tuple[EvaluationSidecar, Verifier, str]:
    """Assemble the sidecar + verifier (sharing one engine) and the admin token.

    A fresh token is generated on every call and written to
    ``config.admin_token_path``.

    Returns:
        ``(sidecar, verifier, token)``.
    """
    vero_home = get_vero_home_dir()
    workspace = await GitWorkspace.create(config.repo_path)

    # The ledger is given a persist path on the admin volume.
    budget = BudgetLedger(
        [SplitBudget(**b) for b in config.budgets],
        persist_path=Path(config.admin_volume) / "ledger.json",
    )

    # Mode B only: a HarborRunner is passed to the Evaluator as eval_strategy.
    # The imports are local, so they run only when `harbor` is configured.
    eval_strategy = None
    if config.harbor is not None:
        from vero.harbor.runner import HarborRunner
        from vero.harbor.config import HarborConfig

        eval_strategy = HarborRunner(HarborConfig(**config.harbor))

    evaluator = Evaluator(
        workspace,
        config.session_id,
        vero_home=vero_home,
        use_copy=config.use_copy,
        task_project=Path(config.task_project) if config.task_project else None,
        task_module=config.task_module,
        eval_strategy=eval_strategy,
    )

    db = ExperimentDatabase(id=config.session_id)  # shared by engine (writes) + verifier (reads)
    engine = EvaluationEngine(
        evaluator=evaluator,
        budget=budget,
        default_task=config.task,
        db=db,
        run_constraints=BaseEvaluationParameters(
            timeout=config.timeout,
            sample_timeout=config.sample_timeout,
            max_concurrency=config.max_concurrency,
        ),
        session_id=config.session_id,
        vero_home=vero_home,
    )

    # Convert the access strings from the config into SplitAccessLevel values.
    split_accesses = [
        SplitAccess(split=s.split, access=SplitAccessLevel(s.access))
        for s in config.split_accesses
    ]
    sidecar = EvaluationSidecar(
        engine=engine,
        split_accesses=split_accesses,
        agent_repo_path=Path(config.agent_repo_path),
        agent_volume=Path(config.agent_volume),
        admin_volume=Path(config.admin_volume),
        submit_enabled=config.submit_enabled,
    )
    verifier = Verifier(
        engine=engine,
        admin_volume=Path(config.admin_volume),
        reward_mode=config.reward_mode,  # type: ignore[arg-type]
        targets=[VerificationTarget(**t.model_dump()) for t in config.targets],
        selection_split=config.selection_split,
        base_commit=config.base_commit,
    )

    # The same token value is written to disk and returned to the caller.
    token = generate_token()
    write_admin_token(config.admin_token_path, token)
    return sidecar, verifier, token


async def build_app(config: ServeConfig):
    """Build the components for ``config`` and wrap them in the FastAPI app."""
    sidecar, verifier, token = await build_components(config)
    return create_app(sidecar=sidecar, verifier=verifier, admin_token=token)


def serve(config_path: Path | str) -> None:
    """Sidecar entrypoint: build the app and run it under uvicorn."""
    import asyncio

    import uvicorn

    config = ServeConfig.from_file(config_path)
    # NOTE(review): the app is built inside a temporary asyncio.run() loop and
    # then served by uvicorn.run(), which is a separate call — confirm nothing
    # created during build_app is tied to the first event loop.
    app = asyncio.run(build_app(config))
    logger.info(f"Serving eval sidecar on {config.host}:{config.port}")
    uvicorn.run(app, host=config.host, port=config.port)
class CommitTransferError(RuntimeError):
    """Raised when a commit cannot be fetched from the agent's mounted repo."""


class SubmitDisabledError(RuntimeError):
    """Raised when submit() is called but the task does not use submit selection."""


class EvaluationSidecar:
    """Agent-facing handlers over the EvaluationEngine.

    Wraps the engine with: commit transfer (mounted agent repo -> sidecar repo),
    result write-routing by split tier, and aggregate-safe responses. The
    ``admin`` flag is forwarded to the engine and suppresses agent-volume writes.
    """

    def __init__(
        self,
        *,
        engine: EvaluationEngine,
        split_accesses: list[SplitAccess],
        agent_repo_path: Path,
        agent_volume: Path,
        admin_volume: Path,
        submit_enabled: bool = False,
    ):
        self.engine = engine
        self.split_accesses = split_accesses
        self.agent_repo_path = Path(agent_repo_path)
        self.agent_volume = Path(agent_volume)
        self.admin_volume = Path(admin_volume)
        self.submit_enabled = submit_enabled
        # Created lazily on first transfer (see _transfer_commit).
        self._transfer_lock = None

    # ------------------------------------------------------------------
    # Handlers (the HTTP layer resolves `admin` from auth and calls these)
    # ------------------------------------------------------------------

    async def evaluate(self, req: EvalRequest, *, admin: bool = False) -> EvalSummary:
        """Transfer the requested commit, evaluate it, and return the aggregate summary.

        Args:
            req: The evaluation request; ``req.commit`` may be ``None`` (agent HEAD).
            admin: Forwarded to the engine; when true nothing is written to the
                agent volume and no remaining budget is reported.

        Raises:
            CommitTransferError: If the commit cannot be fetched from the agent repo.
        """
        sha = await self._transfer_commit(req.commit)
        exp = await self.engine.evaluate(replace(req, commit=sha), admin=admin)
        result_path = self._route_results(exp, admin=admin)
        budget_remaining = None
        if not admin:
            try:
                budget_remaining = self.engine.budget.get(req.dataset_id, req.split)
            except InvalidSplitError:
                # Best-effort: without a ledger entry the summary simply
                # carries no remaining budget.
                pass
        return summarize_experiment(
            exp, result_path=result_path, budget_remaining=budget_remaining
        )

    async def submit(self, commit: str | None = None) -> dict:
        """Record the agent's nominated commit; terminal. No score returned.

        The resolved sha is written to ``submission.json`` on the admin volume.

        Raises:
            SubmitDisabledError: If submit is not enabled for this task.
            CommitTransferError: If the commit cannot be fetched from the agent repo.
        """
        if not self.submit_enabled:
            raise SubmitDisabledError(
                "This task does not use submit-based selection; submit is disabled."
            )
        sha = await self._transfer_commit(commit)
        self.admin_volume.mkdir(parents=True, exist_ok=True)
        (self.admin_volume / "submission.json").write_text(
            json.dumps({"commit": sha}, indent=2)
        )
        return {"submitted_commit": sha}

    def status(self) -> StatusSummary:
        """Return the agent-facing status built from the budget ledger and split tiers."""
        return build_status(
            submit_enabled=self.submit_enabled,
            budget=self.engine.budget.status(),
            split_accesses=self.split_accesses,
        )

    # ------------------------------------------------------------------
    # Trust-boundary mechanics
    # ------------------------------------------------------------------

    @staticmethod
    def _validate_ref(target: str) -> None:
        """Reject refs that git would not treat as a plain source ref.

        The ref is supplied by the untrusted agent and is passed to ``git fetch``
        as a positional argument. A leading ``-`` would be parsed as an option,
        ``:`` / leading ``+`` / leading ``^`` are refspec syntax that can write
        or exclude refs in the sidecar repo, and whitespace or control
        characters never occur in a ref name.

        Raises:
            CommitTransferError: If ``target`` is not a plain ref.
        """
        unsafe = (
            target.startswith(("-", "+", "^"))
            or ":" in target
            or any(ch.isspace() or not ch.isprintable() for ch in target)
        )
        if unsafe:
            raise CommitTransferError(
                f"invalid commit ref {target!r}: expected a plain ref name or sha"
            )

    async def _transfer_commit(self, ref: str | None) -> str:
        """Fetch ``ref`` (default agent HEAD) from the mounted agent repo into the
        sidecar's own repo and return its resolved sha.

        The agent repo is untrusted: the ref is validated before it reaches git,
        hooks are disabled, and the repo is addressed through a ``file://`` URL.
        The fetch and the ``FETCH_HEAD`` lookup are two separate awaits, so they
        run under a lock; otherwise a concurrent transfer could overwrite
        ``FETCH_HEAD`` in between and the wrong sha would be returned.

        Raises:
            CommitTransferError: If the ref is rejected or either git command fails.
        """
        import asyncio

        target = ref or "HEAD"
        self._validate_ref(target)

        # Created on first use; there is no await between the check and the
        # assignment, so two coroutines cannot both create a lock.
        lock = getattr(self, "_transfer_lock", None)
        if lock is None:
            lock = self._transfer_lock = asyncio.Lock()

        workspace = self.engine.evaluator.workspace
        root = workspace.root
        async with lock:
            fetch = await workspace.sandbox.run(
                [
                    "git",
                    "-c",
                    "core.hooksPath=/dev/null",
                    "-c",
                    "protocol.file.allow=always",
                    "-C",
                    root,
                    "fetch",
                    "--no-tags",
                    "--no-recurse-submodules",
                    f"file://{self.agent_repo_path}",
                    target,
                ],
                timeout=120,
            )
            if fetch.returncode != 0:
                raise CommitTransferError(
                    f"git fetch of {target!r} from agent repo failed: {fetch.stderr}"
                )
            rev = await workspace.sandbox.run(
                ["git", "-C", root, "rev-parse", "FETCH_HEAD"], timeout=30
            )
            if rev.returncode != 0:
                raise CommitTransferError(f"rev-parse FETCH_HEAD failed: {rev.stderr}")
        return rev.stdout.strip()

    def _route_results(self, experiment: Experiment, *, admin: bool) -> str | None:
        """Write the agent-visible projection of an experiment by split tier.

        What is written under ``<agent_volume>/results/<split>__<commit[:12]>``:
          - viewable: ``summary.json`` plus one ``<sample_id>.json`` per sample
          - no_access: nothing
          - any other tier: ``summary.json`` only
        Admin/verifier evals never write to the agent volume.

        Returns:
            The agent-volume directory written, or ``None`` if nothing was written.
        """
        if admin:
            return None
        split = experiment.run.dataset_subset.split
        tier = tier_for_split(split, self.split_accesses)
        if tier == SplitAccessLevel.no_access:
            return None

        commit = experiment.run.candidate.commit
        dest = self.agent_volume / "results" / f"{split}__{commit[:12]}"
        dest.mkdir(parents=True, exist_ok=True)

        # The summary holds aggregates only (count, mean score, status).
        (dest / "summary.json").write_text(
            json.dumps(
                {
                    "split": split,
                    "commit": commit,
                    "n_samples": len(experiment.result.sample_results),
                    "mean_score": experiment.result.score(),
                    "status": str(experiment.result.status),
                },
                indent=2,
            )
        )
        if tier == SplitAccessLevel.viewable:
            for sample_id, sample_result in experiment.result.sample_results.items():
                (dest / f"{sample_id}.json").write_text(
                    sample_result.model_dump_json(indent=2)
                )
        return str(dest)
logger = logging.getLogger(__name__)


class NoCandidateError(RuntimeError):
    """Raised when no commit can be selected (no submission / no experiments)."""


@dataclass
class VerificationTarget:
    """One scoring target -> one named reward in reward.json."""

    task: str | None  # None in Mode B (the nested harbor strategy ignores the vero task)
    dataset_id: str
    split: str
    reward_key: str
    sample_ids: list[int] | None = None  # None = full split


class Verifier:
    """Select the candidate commit and score it on the configured targets."""

    def __init__(
        self,
        *,
        engine: EvaluationEngine,
        admin_volume: Path,
        reward_mode: Literal["submit", "auto_best"],
        targets: list[VerificationTarget],
        selection_split: str = "validation",
        base_commit: str | None = None,
    ):
        """Store the selection policy and the scoring targets.

        Args:
            engine: Engine used for admin evaluations and, in ``auto_best``
                mode, for reading the experiment database.
            admin_volume: Directory holding ``submission.json``.
            reward_mode: ``"submit"`` scores the recorded submission; any
                other value selects the best recorded candidate.
            targets: Targets to score; one reward key each.
            selection_split: Split whose recorded scores drive ``auto_best``.
            base_commit: Commit excluded from ``auto_best`` selection.
        """
        self.engine = engine
        self.admin_volume = Path(admin_volume)
        self.reward_mode = reward_mode
        self.targets = targets
        self.selection_split = selection_split
        self.base_commit = base_commit

    async def finalize(self) -> dict[str, float]:
        """Select the commit and score it on every target -> {reward_key: score}.

        Raises:
            NoCandidateError: if no commit can be selected.
        """
        sha = self._select_commit()
        logger.info("Verifier selected commit %s (mode=%s)", sha, self.reward_mode)
        rewards: dict[str, float] = {}
        for target in self.targets:
            exp = await self.engine.evaluate_admin(
                task=target.task,
                dataset_id=target.dataset_id,
                split=target.split,
                commit=sha,
                sample_ids=target.sample_ids,
            )
            score = exp.result.score()
            # A missing score is reported as the configured minimum.
            rewards[target.reward_key] = (
                float(score) if score is not None else default_minimum_score
            )
        return rewards

    def _select_commit(self) -> str:
        """Dispatch on ``reward_mode`` to the matching selection strategy."""
        if self.reward_mode == "submit":
            return self._submitted_commit()
        return self._best_from_db()

    def _submitted_commit(self) -> str:
        """Return the commit recorded in ``submission.json``.

        Every way the record can be unusable — missing file, invalid JSON, a
        non-object payload, or an absent / empty / non-string ``commit`` — is
        reported as ``NoCandidateError`` so callers have one failure type to
        handle.
        """
        path = self.admin_volume / "submission.json"
        try:
            raw = path.read_text()
        except FileNotFoundError:
            raise NoCandidateError(
                "submit mode but no submission.json — the agent never submitted a commit."
            ) from None
        try:
            record = json.loads(raw)
        except json.JSONDecodeError as err:
            raise NoCandidateError(f"submission.json is not valid JSON: {err}") from err
        commit = record.get("commit") if isinstance(record, dict) else None
        if not isinstance(commit, str) or not commit:
            raise NoCandidateError("submission.json has no commit.")
        return commit

    def _best_from_db(self) -> str:
        """Best candidate by recorded score on the selection split (excludes baseline)."""
        if self.engine.db is None:
            raise NoCandidateError("auto_best mode but no experiment database.")
        df = self.engine.db.get_experiments_df(fill_score=default_minimum_score)
        if df.empty or "dataset_subset_split" not in df.columns:
            raise NoCandidateError("auto_best mode but no experiments recorded.")

        split_df = df[df["dataset_subset_split"] == self.selection_split]
        if self.base_commit is not None:
            # NOTE(review): exact string comparison — presumably both sides are
            # full shas; confirm against how candidates are recorded.
            split_df = split_df[split_df["candidate_commit"] != self.base_commit]
        if split_df.empty:
            raise NoCandidateError(
                f"auto_best mode but no candidate experiments on split "
                f"'{self.selection_split}'."
            )
        # Highest score wins; ties go to the most recently created candidate.
        best = split_df.sort_values(
            by=["mean_score", "candidate_created_at"], ascending=[False, False]
        ).iloc[0]
        return str(best["candidate_commit"])
class _Resp:
    """Minimal stand-in for an HTTP response: status, text body, parsed JSON."""

    def __init__(self, status_code, data):
        # Keep the parsed payload and its serialized form side by side.
        serialized = json.dumps(data)
        self._data = data
        self.text = serialized
        self.status_code = status_code

    def json(self):
        """Return the payload the response was constructed with."""
        payload = self._data
        return payload
def test_eval_posts_and_prints(monkeypatch):
    """`eval` POSTs the request body to /eval and prints the JSON response."""
    monkeypatch.setenv("VERO_EVAL_URL", "http://sidecar:8000")
    captured: dict = {}
    _patch_httpx(monkeypatch, _Resp(200, {"mean_score": 0.5}), captured)

    argv = ["eval", "--dataset-id", "ds", "--split", "train", "--num-samples", "3"]
    outcome = CliRunner().invoke(harbor, argv)

    assert outcome.exit_code == 0
    assert captured["method"] == "POST"
    assert captured["url"].endswith("/eval")
    expected_body = {"dataset_id": "ds", "split": "train", "num_samples": 3}
    assert captured["json"] == expected_body
    printed = json.loads(outcome.output)
    assert printed["mean_score"] == 0.5
def _make_task_dir(root, name):
    """Create ``root/name`` containing a minimal ``task.toml`` and return it."""
    task_dir = root / name
    task_dir.mkdir(parents=True)
    manifest = task_dir / "task.toml"
    manifest.write_text("[task]\nname='x'\n")
    return task_dir
class TestTier:
    """tier_for_split: listed splits use their tier, unlisted ones are viewable."""

    def test_listed_split_tier(self):
        accesses = [SplitAccess.no_access("test"), SplitAccess.non_viewable("validation")]
        test_tier = tier_for_split("test", accesses)
        validation_tier = tier_for_split("validation", accesses)
        assert test_tier == SplitAccessLevel.no_access
        assert validation_tier == SplitAccessLevel.non_viewable

    def test_unlisted_defaults_viewable(self):
        no_accesses = []
        assert tier_for_split("train", no_accesses) == SplitAccessLevel.viewable
class TestBuildStatus:
    """build_status: one entry per budgeted split, annotated with its tier."""

    def test_lists_budgeted_splits_with_tier(self):
        train_budget = SplitBudget(split="train", dataset_id="ds1", total_run_budget=10)
        validation_budget = SplitBudget(
            split="validation", dataset_id="ds1", total_run_budget=3
        )
        budget = {
            ("train", "ds1"): train_budget,
            ("validation", "ds1"): validation_budget,
        }
        # Only validation is listed, so train falls back to the default tier.
        accesses = [SplitAccess.non_viewable("validation")]

        status = build_status(submit_enabled=True, budget=budget, split_accesses=accesses)

        assert status.submit_enabled is True
        by_split = {}
        for entry in status.splits:
            by_split[entry["split"]] = entry
        train = by_split["train"]
        validation = by_split["validation"]
        assert train["tier"] == str(SplitAccessLevel.viewable)
        assert train["agent_evaluable"] is True
        assert validation["tier"] == str(SplitAccessLevel.non_viewable)
        assert validation["agent_evaluable"] is True
        assert validation["remaining_run_budget"] == 3
def _git(path: Path, *args: str) -> str:
    """Run ``git <args>`` in ``path`` with a fixed identity; return stripped stdout."""
    command = ["git", "-c", "user.name=t", "-c", "user.email=t@t"]
    command.extend(args)
    completed = subprocess.run(
        command,
        cwd=path,
        capture_output=True,
        check=True,  # a failing git command raises CalledProcessError
        text=True,
    )
    return completed.stdout.strip()
@pytest.fixture
def fixture(tmp_path, monkeypatch):
    """Build an agent repo, a task project and a saved dataset under ``tmp_path``.

    Returns ``(agent_dir, head, task_dir, dataset_id, tmp_path)``.
    """
    from vero.core.constants import PACKAGE_DIR
    from datasets import Dataset, DatasetDict

    vero_home = tmp_path / "vero_home"
    sessions_dir = vero_home / "sessions"
    datasets_dir = vero_home / "datasets"
    sessions_dir.mkdir(parents=True)
    datasets_dir.mkdir(parents=True)
    monkeypatch.setenv("VERO_HOME_DIR", str(vero_home))

    agent_dir, head = _create_agent(tmp_path)
    task_dir = _create_task_project(tmp_path, PACKAGE_DIR)

    rows = {"question": ["6*7?", "2+2?"], "expected": ["42", "4"]}
    dataset = DatasetDict({"test": Dataset.from_dict(rows)})
    saved_at = tmp_path / "ds"
    dataset.save_to_disk(str(saved_at))
    dataset_id = resolve_and_save_dataset(
        str(saved_at), sessions_dir, datasets_dir, "sess"
    )
    return agent_dir, head, task_dir, dataset_id, tmp_path
"agent_vol"), + admin_volume=str(tmp / "admin_vol"), + admin_token_path=str(tmp / "admin_vol" / "token"), + timeout=300, + ) + + +@pytest.mark.asyncio +async def test_serve_assembles_and_evaluates_and_finalizes(fixture): + agent_dir, head, task_dir, dataset_id, tmp = fixture + config = _serve_config(agent_dir, head, task_dir, dataset_id, tmp) + + sidecar, verifier, token = await build_components(config) + assert token and (tmp / "admin_vol" / "token").read_text() == token + + # real eval (no LLM): sample 0 expects "42", agent solves -> "42" -> score 1.0 + exp = await sidecar.engine.evaluate( + EvalRequest(dataset_id=dataset_id, split="test", commit=head, sample_ids=[0]) + ) + assert exp.result.sample_results[0].score == 1.0 + + # verifier selects the (only) candidate on "test" and scores it on the test target + rewards = await verifier.finalize() + assert rewards["reward"] == 1.0 diff --git a/vero/tests/test_harbor_server.py b/vero/tests/test_harbor_server.py new file mode 100644 index 0000000..16986a4 --- /dev/null +++ b/vero/tests/test_harbor_server.py @@ -0,0 +1,124 @@ +"""Tests for vero.harbor.server.EvaluationSidecar — handlers, tier-routing, submit.""" + +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from vero.core.budget import BudgetLedger, SplitBudget +from vero.core.dataset.base import SplitAccess +from vero.core.db.candidate import Candidate +from vero.core.db.dataset import DatasetSample, DatasetSubset +from vero.core.db.database import Experiment +from vero.core.db.result import ( + ExperimentResult, + ExperimentResultStatus, + SampleResult, +) +from vero.core.db.run import ExperimentRun +from vero.harbor.server import EvaluationSidecar, SubmitDisabledError +from vero.evaluation.engine import EvalRequest + + +def _experiment(split: str, commit: str = "abcdef123456") -> Experiment: + run = ExperimentRun( + candidate=Candidate(commit=commit, repo_name="r"), + dataset_subset=DatasetSubset(split=split, dataset_id="ds1"), + 
) + sample_results = { + i: SampleResult( + dataset_sample=DatasetSample(sample_id=i, split=split, dataset_id="ds1"), + score=float(i % 2), + feedback=f"Expected: secret-{i}", # label-bearing: must NOT reach agent on partial + ) + for i in range(3) + } + return Experiment( + run=run, + result=ExperimentResult( + run_id=run.id, status=ExperimentResultStatus.SUCCESS, sample_results=sample_results + ), + ) + + +def _sidecar(tmp_path, *, split, submit_enabled=False): + engine = MagicMock() + engine.evaluate = AsyncMock(return_value=_experiment(split)) + engine.budget = BudgetLedger( + [SplitBudget(split=split, dataset_id="ds1", total_run_budget=5, total_sample_budget=100)] + ) + sidecar = EvaluationSidecar( + engine=engine, + split_accesses=[SplitAccess.non_viewable("validation"), SplitAccess.no_access("test")], + agent_repo_path=tmp_path / "agent_repo", + agent_volume=tmp_path / "agent_vol", + admin_volume=tmp_path / "admin_vol", + submit_enabled=submit_enabled, + ) + # Stub the git transfer (integration-tested separately); pin the sha. 
class TestRouting:
    """What lands on the agent volume for each access tier."""

    @pytest.mark.asyncio
    async def test_visible_split_writes_full_per_sample(self, tmp_path):
        # "train" is not listed in split_accesses, so it defaults to viewable.
        sidecar = _sidecar(tmp_path, split="train")
        request = EvalRequest(dataset_id="ds1", split="train")
        summary = await sidecar.evaluate(request)

        dest = tmp_path / "agent_vol" / "results" / "train__abcdef123456"
        assert (dest / "summary.json").exists()
        per_sample = [dest / f"{i}.json" for i in range(3)]
        assert all(path.exists() for path in per_sample)
        assert summary.result_path == str(dest)
        assert summary.n_samples == 3

    @pytest.mark.asyncio
    async def test_partial_split_writes_summary_only_no_labels(self, tmp_path):
        # "validation" is non_viewable -> partial projection.
        sidecar = _sidecar(tmp_path, split="validation")
        request = EvalRequest(dataset_id="ds1", split="validation")
        summary = await sidecar.evaluate(request)

        dest = tmp_path / "agent_vol" / "results" / "validation__abcdef123456"
        summary_file = dest / "summary.json"
        assert summary_file.exists()
        # NO per-sample files -> the label-bearing feedback never reaches the agent
        per_sample = list(dest.glob("[0-9]*.json"))
        assert not per_sample
        assert "secret-" not in summary_file.read_text()
        assert summary.result_path == str(dest)

    @pytest.mark.asyncio
    async def test_admin_eval_writes_nothing_to_agent_volume(self, tmp_path):
        # "test" is no_access; only the admin path may evaluate it.
        sidecar = _sidecar(tmp_path, split="test")
        request = EvalRequest(dataset_id="ds1", split="test")
        summary = await sidecar.evaluate(request, admin=True)

        agent_vol = tmp_path / "agent_vol"
        written = list(agent_vol.rglob("*.json")) if agent_vol.exists() else []
        assert not written
        assert summary.result_path is None
        # admin call bypasses metering
        assert summary.budget_remaining is None
"submission.json").read_text()) + assert rec["commit"] == "abcdef123456" # the transferred sha + assert out["submitted_commit"] == "abcdef123456" + + @pytest.mark.asyncio + async def test_submit_disabled_raises(self, tmp_path): + sidecar = _sidecar(tmp_path, split="train", submit_enabled=False) + with pytest.raises(SubmitDisabledError): + await sidecar.submit(commit="x") + + +class TestStatus: + def test_status_reports_submit_and_splits(self, tmp_path): + sidecar = _sidecar(tmp_path, split="train", submit_enabled=True) + status = sidecar.status() + assert status.submit_enabled is True + assert status.splits[0]["split"] == "train" + assert status.splits[0]["remaining_run_budget"] == 5 diff --git a/vero/tests/test_harbor_transfer.py b/vero/tests/test_harbor_transfer.py new file mode 100644 index 0000000..00afae5 --- /dev/null +++ b/vero/tests/test_harbor_transfer.py @@ -0,0 +1,85 @@ +"""Integration test for EvaluationSidecar._transfer_commit (real git repos). + +Validates that a commit is fetched from the (untrusted) mounted agent repo into +the sidecar's own repo and resolved to its sha — the one server.py piece +that can't be unit-tested with mocks. 
+""" + +import subprocess +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from vero.harbor.server import EvaluationSidecar +from vero.sandbox import LocalSandbox +from vero.workspace.git import GitWorkspace + + +def _git(cwd: Path, *args: str) -> str: + return subprocess.run( + ["git", "-c", "user.email=t@t", "-c", "user.name=t", *args], + cwd=cwd, + check=True, + capture_output=True, + text=True, + ).stdout.strip() + + +def _init_repo(path: Path, content: str) -> str: + path.mkdir(parents=True, exist_ok=True) + _git(path, "init", "-q") + (path / "f.txt").write_text(content) + _git(path, "add", "f.txt") + _git(path, "commit", "-q", "-m", "c") + return _git(path, "rev-parse", "HEAD") + + +async def _sidecar_for(sidecar_repo: Path, agent_repo: Path, tmp_path: Path): + sandbox = await LocalSandbox.create(root=tmp_path) + workspace = await GitWorkspace.from_path(sandbox, str(sidecar_repo)) + engine = MagicMock() + engine.evaluator.workspace = workspace + return EvaluationSidecar( + engine=engine, + split_accesses=[], + agent_repo_path=agent_repo, + agent_volume=tmp_path / "av", + admin_volume=tmp_path / "adv", + ) + + +@pytest.mark.asyncio +async def test_transfer_fetches_agent_head_into_sidecar_repo(tmp_path): + agent_repo = tmp_path / "agent" + sidecar_repo = tmp_path / "sidecar" + agent_head = _init_repo(agent_repo, "agent work") + _init_repo(sidecar_repo, "sidecar base") + + sidecar = await _sidecar_for(sidecar_repo, agent_repo, tmp_path) + sha = await sidecar._transfer_commit(None) # default = agent HEAD + + assert sha == agent_head + # the fetched commit object now lives in the sidecar's own repo (tamper-evident copy) + assert ( + subprocess.run( + ["git", "-C", str(sidecar_repo), "cat-file", "-e", sha], capture_output=True + ).returncode + == 0 + ) + + +@pytest.mark.asyncio +async def test_transfer_explicit_ref(tmp_path): + agent_repo = tmp_path / "agent" + sidecar_repo = tmp_path / "sidecar" + _init_repo(agent_repo, "first") + 
def _engine(scores_by_call):
    """Return a mock engine whose ``evaluate_admin`` yields one score per call.

    The i-th awaited call returns an object whose ``result.score()`` is
    ``scores_by_call[i]``.
    """
    experiments = []
    for value in scores_by_call:
        score_fn = MagicMock(return_value=value)
        experiments.append(MagicMock(result=MagicMock(score=score_fn)))
    engine = MagicMock()
    engine.evaluate_admin = AsyncMock(side_effect=experiments)
    return engine
class TestMultiTarget:
    """finalize scores every configured target under its own reward key."""

    @pytest.mark.asyncio
    async def test_finalize_emits_multiple_reward_keys(self, tmp_path):
        submission = tmp_path / "submission.json"
        submission.write_text(json.dumps({"commit": "c1"}))
        engine = _engine([0.9, 0.4])
        in_domain = VerificationTarget(
            task="t", dataset_id="ds1", split="test", reward_key="in_domain"
        )
        held_out = VerificationTarget(
            task="t2", dataset_id="ds2", split="test", reward_key="held_out"
        )
        verifier = Verifier(
            engine=engine,
            admin_volume=tmp_path,
            reward_mode="submit",
            targets=[in_domain, held_out],
        )

        rewards = await verifier.finalize()

        assert rewards == {"in_domain": 0.9, "held_out": 0.4}
        assert engine.evaluate_admin.await_count == 2
selection_split="validation", + base_commit="base", + targets=[VerificationTarget(task="t", dataset_id="ds1", split="test", reward_key="reward")], + ) + await v.finalize() + # baseline excluded even though it scored higher + assert engine.evaluate_admin.await_args.kwargs["commit"] == "agent"