diff --git a/vero/pyproject.toml b/vero/pyproject.toml index 5e182cc..6a88415 100644 --- a/vero/pyproject.toml +++ b/vero/pyproject.toml @@ -14,7 +14,9 @@ dependencies = [ "datasets>=4.3.0", "pydantic>=2.11.7", "python-dotenv>=1.2.2", + "pyyaml>=6.0", "requests>=2.32.5", + "rich>=13.9.4", "s3fs>=2025.9.0", "tenacity>=9.1.2", "toml>=0.10.2", @@ -37,6 +39,12 @@ docker = [ claude = [ "claude-agent-sdk>=0.1.56", ] +harbor = [ + "fastapi>=0.110", + "uvicorn>=0.27", + "httpx>=0.27", + "jinja2>=3.1.6", +] optimize = [ "async-lru>=2.0.5", "beautifulsoup4>=4.14.2", diff --git a/vero/src/vero/core/budget.py b/vero/src/vero/core/budget.py new file mode 100644 index 0000000..3d370f5 --- /dev/null +++ b/vero/src/vero/core/budget.py @@ -0,0 +1,187 @@ +"""Per-split evaluation budgets and the ledger that meters them. + +``SplitBudget`` is the public, stateful budget for one (split, dataset_id) pair. +``BudgetLedger`` owns a set of them — the keys also form the allowlist of +evaluable combinations. The ledger is in-memory by default (the in-process +``ExperimentRunnerTool``); with a ``persist_path`` it flushes every mutation to +durable JSON under a single-writer lock (the Harbor eval sidecar). 
@dataclass
class SplitBudget:
    """Stateful evaluation budget for one ``(split, dataset_id)`` pair.

    At least one of ``total_sample_budget`` / ``total_run_budget`` must be
    given. A limit left as ``None`` is treated as unlimited by the ``has_*``
    checks and is ignored by the matching ``decrement_*`` method.

    Attributes:
        split: Name of the dataset split this budget applies to.
        dataset_id: Dataset identifier (defaults to the empty string).
        total_sample_budget: Total samples that may be evaluated, or ``None``.
        remaining_sample_budget: Samples still available; starts equal to
            ``total_sample_budget`` (not an ``__init__`` argument).
        total_run_budget: Total runs allowed, or ``None``.
        remaining_run_budget: Runs still available; starts equal to
            ``total_run_budget`` (not an ``__init__`` argument).
        max_samples_per_run: Per-run sample cap, or ``None`` for no cap.

    Raises:
        AssertionError: If neither total is provided, or if any limit is
            neither an ``int`` nor ``None``.
    """

    split: str
    dataset_id: str = ""
    total_sample_budget: int | None = None
    remaining_sample_budget: int | None = field(init=False)
    total_run_budget: int | None = None
    remaining_run_budget: int | None = field(init=False)
    max_samples_per_run: int | None = None

    def __repr__(self) -> str:
        """Return a compact repr listing the identifying fields that are not ``None``."""
        shown = (
            ("split", self.split),
            ("dataset_id", self.dataset_id),
            ("total_sample_budget", self.total_sample_budget),
            ("total_run_budget", self.total_run_budget),
        )
        body = ", ".join(
            f"{name}={value}" for name, value in shown if value is not None
        )
        return f"SplitBudget({body})"

    def __post_init__(self) -> None:
        """Validate the limits, then initialise the remaining counters."""
        # Raised explicitly rather than via ``assert`` so the checks survive
        # ``python -O``. AssertionError is kept so existing callers/tests that
        # expect it keep working.
        if self.total_sample_budget is None and self.total_run_budget is None:
            raise AssertionError(
                "Either total sample budget or total run budget must be provided."
            )
        for name in ("total_sample_budget", "total_run_budget", "max_samples_per_run"):
            value = getattr(self, name)
            if value is not None and not isinstance(value, int):
                raise AssertionError(
                    f"{name} must be an int or None, got {type(value).__name__}."
                )

        # Only touch state once the configuration is known to be valid.
        self.remaining_sample_budget = self.total_sample_budget
        self.remaining_run_budget = self.total_run_budget

    def has_run_budget(self) -> bool:
        """Return True if at least one run remains (always True when unlimited)."""
        return self.remaining_run_budget is None or self.remaining_run_budget > 0

    def decrement_run_budget(self) -> None:
        """Consume one run; no-op when the run budget is unlimited."""
        if self.remaining_run_budget is not None:
            self.remaining_run_budget -= 1

    def has_sample_budget(self, num_samples: int) -> bool:
        """Return True if ``num_samples`` fits in the remaining sample budget."""
        return (
            self.remaining_sample_budget is None
            or self.remaining_sample_budget >= num_samples
        )

    def decrement_sample_budget(self, num_samples: int) -> None:
        """Consume ``num_samples`` samples; no-op when the sample budget is unlimited."""
        if self.remaining_sample_budget is not None:
            self.remaining_sample_budget -= num_samples

    def exceeds_per_run_budget(self, num_samples: int) -> bool:
        """Return True if ``num_samples`` is above the per-run cap (False when uncapped)."""
        return (
            self.max_samples_per_run is not None
            and num_samples > self.max_samples_per_run
        )
+ self.remaining_sample_budget = self.total_sample_budget + self.remaining_run_budget = self.total_run_budget + + assert ( + isinstance(self.total_sample_budget, int) + or self.total_sample_budget is None + ) + assert isinstance(self.total_run_budget, int) or self.total_run_budget is None + assert ( + isinstance(self.max_samples_per_run, int) + or self.max_samples_per_run is None + ) + + def has_run_budget(self) -> bool: + return self.remaining_run_budget is None or self.remaining_run_budget > 0 + + def decrement_run_budget(self) -> None: + if self.remaining_run_budget is not None: + self.remaining_run_budget -= 1 + + def has_sample_budget(self, num_samples: int) -> bool: + return ( + self.remaining_sample_budget is None + or self.remaining_sample_budget >= num_samples + ) + + def decrement_sample_budget(self, num_samples: int) -> None: + if self.remaining_sample_budget is not None: + self.remaining_sample_budget -= num_samples + + def exceeds_per_run_budget(self, num_samples: int) -> bool: + return ( + self.max_samples_per_run is not None + and num_samples > self.max_samples_per_run + ) + + +class BudgetLedger: + """Meters evaluation budget across (split, dataset_id) pairs. + + The keys are also the allowlist of evaluable combinations: a pair with no + budget entry is rejected by ``validate``. + + In-memory by default. Pass ``persist_path`` for the durable, crash-safe + variant used by the Harbor sidecar — every mutation is flushed under a + single-writer lock, and ``reserve`` checks-and-decrements atomically before a + run so concurrent callers cannot overspend. Budget is never refunded on error. 
+ """ + + def __init__( + self, + budgets: list[SplitBudget] | None = None, + *, + persist_path: Path | str | None = None, + ): + self._budgets: dict[tuple[str, str], SplitBudget] = { + (b.split, b.dataset_id): b for b in (budgets or []) + } + self.persist_path = Path(persist_path) if persist_path else None + self._lock = asyncio.Lock() + + def validate(self, dataset_id: str, split: str) -> None: + """Raise if (split, dataset_id) is not an allowed combination.""" + if (split, dataset_id) not in self._budgets: + allowed_keys = list(self._budgets.keys()) + raise InvalidSplitError( + f"No split budget found for the combination (dataset_id={dataset_id}, split={split}) " + f"either because it does not exist or because it is not allowed. " + f"Allowed combinations: {allowed_keys}" + ) + + def get(self, dataset_id: str, split: str) -> SplitBudget: + """Return the budget for a pair (validates membership first).""" + self.validate(dataset_id, split) + return self._budgets[(split, dataset_id)] + + def check(self, dataset_id: str, split: str, num_samples: int) -> None: + """Raise ExperimentBudgetExceeded if the request would exceed the budget.""" + budget = self.get(dataset_id, split) + if not budget.has_run_budget(): + raise ExperimentBudgetExceeded( + f"No runs left for the {split} split of the {dataset_id} dataset." + ) + if not budget.has_sample_budget(num_samples): + raise ExperimentBudgetExceeded( + f"Requested {num_samples} samples for the {split} split of the {dataset_id} dataset, " + f"but the remaining sample budget only allows for {budget.remaining_sample_budget} samples." + ) + if budget.exceeds_per_run_budget(num_samples): + raise ExperimentBudgetExceeded( + f"Requested {num_samples} samples for the {split} split of the {dataset_id} dataset, " + f"but only {budget.max_samples_per_run} are allowed per run." 
+ ) + + def record(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget: + """Decrement the budget for a completed (or attempted) run and flush.""" + budget = self.get(dataset_id, split) + budget.decrement_sample_budget(num_samples) + budget.decrement_run_budget() + self._flush() + return budget + + async def reserve( + self, dataset_id: str, split: str, num_samples: int + ) -> SplitBudget: + """Atomically check + record before a run (durable, single-writer). + + Raises InvalidSplitError / ExperimentBudgetExceeded *before* decrementing, + so a rejected request costs nothing; a reserved request is never refunded. + """ + async with self._lock: + self.check(dataset_id, split, num_samples) + return self.record(dataset_id, split, num_samples) + + def status(self) -> dict[tuple[str, str], SplitBudget]: + """Return all budgets keyed by (split, dataset_id).""" + return dict(self._budgets) + + def _flush(self) -> None: + if self.persist_path is None: + return + data = [ + { + "split": b.split, + "dataset_id": b.dataset_id, + "total_sample_budget": b.total_sample_budget, + "remaining_sample_budget": b.remaining_sample_budget, + "total_run_budget": b.total_run_budget, + "remaining_run_budget": b.remaining_run_budget, + "max_samples_per_run": b.max_samples_per_run, + } + for b in self._budgets.values() + ] + self.persist_path.parent.mkdir(parents=True, exist_ok=True) + tmp = self.persist_path.with_suffix(self.persist_path.suffix + ".tmp") + tmp.write_text(json.dumps(data, indent=2)) + tmp.replace(self.persist_path) diff --git a/vero/src/vero/core/cli.py b/vero/src/vero/core/cli.py index 419ec80..ebc2293 100644 --- a/vero/src/vero/core/cli.py +++ b/vero/src/vero/core/cli.py @@ -83,6 +83,16 @@ def main(): setup_logging() +# Optional `vero harbor` group (requires the `harbor` extra). Registered lazily so the +# base CLI works without it. 
# Optional `vero harbor` command group (requires the `harbor` extra). The import
# is attempted when this module is imported; if it fails, the base CLI is left
# without the group instead of crashing.
try:
    from vero.harbor.cli import harbor as _harbor_group
except ImportError as _harbor_import_error:
    # Still best-effort, but leave a trace: a bare `pass` makes a missing
    # `harbor` command impossible to diagnose.
    import logging as _logging

    _logging.getLogger(__name__).debug(
        "`vero harbor` commands unavailable: %s", _harbor_import_error
    )
else:
    # Kept outside the `try` so only the import itself is treated as optional.
    main.add_command(_harbor_group)
def get_non_viewable_splits(split_accesses: list[SplitAccess]) -> list[str]:
    """Return the names of splits whose rows/details must stay hidden.

    A split is hidden when its access level is ``non_viewable`` or
    ``no_access``; input order is preserved. This helper does not distinguish
    between the two levels.
    """
    hidden_levels = (SplitAccessLevel.non_viewable, SplitAccessLevel.no_access)
    hidden_splits: list[str] = []
    for entry in split_accesses:
        if entry.access in hidden_levels:
            hidden_splits.append(entry.split)
    return hidden_splits
class TaskOutput(BaseModel):
    """Serializable output of inference on a single task.

    An exception passed as ``error`` is coerced before validation: ``error``
    becomes its string form and, unless an ``error_traceback`` was supplied,
    the formatted traceback is stored in ``error_traceback``.

    Attributes:
        output: The output of the agent on the task.
        error: An error string (e.g. ``str(exception)``). For an exception
            whose message is empty, the exception class name is used instead.
        error_traceback: Full traceback string if inference raised.
        execution_trace: An optional list of spans describing the inference process.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    output: Any = None
    error: str | None = None
    error_traceback: str | None = None
    execution_trace: Sequence[Any] | None = None

    @model_validator(mode="before")
    @classmethod
    def _coerce_exception_error(cls, data: Any) -> Any:
        """Accept an exception in ``error`` and convert it to str + traceback."""
        if not isinstance(data, dict):
            return data
        err = data.get("error")
        if not isinstance(err, BaseException):
            return data

        # Copy so the caller's dict is left untouched.
        coerced = dict(data)
        # Exceptions raised without a message (e.g. ``TimeoutError()``)
        # stringify to "", which would store an empty error; fall back to the
        # class name so the record says what failed.
        coerced["error"] = str(err) or type(err).__name__
        if not coerced.get("error_traceback"):
            coerced["error_traceback"] = "".join(
                traceback.format_exception(type(err), err, err.__traceback__)
            )
        return coerced
set[str] | None = None) -> Series: """Return the sample result in a pandas representation.""" import pandas as pd diff --git a/vero/src/vero/core/task/__init__.py b/vero/src/vero/core/task/__init__.py index dd9471b..a034e4b 100644 --- a/vero/src/vero/core/task/__init__.py +++ b/vero/src/vero/core/task/__init__.py @@ -6,6 +6,7 @@ def create_task( register: bool = True, task_parameters: type | None = None, required_env_vars: list[str] | None = None, + label_fields: list[str] | None = None, ) -> VeroTask: """Create a VeroTask for use in user code. @@ -15,6 +16,10 @@ def create_task( task_parameters: Optional TaskParameters subclass for early validation. required_env_vars: Environment variables that must be set for this task to run (e.g. ``["LITELLM_BASE_URL", "LITELLM_API_KEY"]``). + label_fields: Dataset columns that hold labels/ground truth. These are + stripped from each sample before it is passed to inference, so the + (agent-authored) inference code never sees them; scoring still gets + the full row. A static, immutable property of the task definition. Returns: A new VeroTask instance. 
@@ -24,6 +29,7 @@ def create_task( register=register, task_parameters_type=task_parameters, required_env_vars=required_env_vars, + label_fields=label_fields, ) diff --git a/vero/src/vero/core/task/task.py b/vero/src/vero/core/task/task.py index c9e605f..6e1f651 100644 --- a/vero/src/vero/core/task/task.py +++ b/vero/src/vero/core/task/task.py @@ -14,7 +14,12 @@ from vero.core.db.dataset import DatasetSample from vero.core.db.result import SampleResult, TaskOutput, TaskResult from vero.core.evaluation import EvaluationParameters -from vero.core.sessions import get_vero_home_dir, save_sample_result +from vero.core.sessions import ( + get_vero_home_dir, + load_all_sample_results, + load_sample_result, + save_sample_result, +) from vero.core.utils import limited_gather, maybe_await logger = logging.getLogger(__name__) @@ -79,6 +84,7 @@ def __init__( register: bool = True, task_parameters_type: type | None = None, required_env_vars: list[str] | None = None, + label_fields: list[str] | None = None, ): """Initialize a VeroTask. @@ -90,12 +96,16 @@ def __init__( required_env_vars: Environment variables that must be set for this task to run (e.g. ``["LITELLM_BASE_URL", "LITELLM_API_KEY"]``). Checked before the evaluation subprocess starts. + label_fields: Dataset columns holding labels/ground truth. Stripped from + each sample before inference (so inference never sees them); scoring + receives the full row. Static, immutable task property. 
""" self.name = name self._functions: dict[str, Callable] = {} self._batch_functions: dict[str, Callable] = {} self._task_parameters_type = task_parameters_type self.required_env_vars: list[str] = required_env_vars or [] + self.label_fields: list[str] = label_fields or [] if register: if name in VeroTask._registry: @@ -502,76 +512,111 @@ async def evaluate_safely(task: TaskT, output: TaskOutput) -> TaskResult: # Results # ------------------------------------------------------------------------- - def compile_and_save_sample_results( - self, - evaluation_parameters: EvaluationParameters, - results: list[TaskResult | Exception], - task_data: Sequence[dict[str, JsonValue]] | None = None, - ) -> dict[str, int | float | None]: - """Compile results into SampleResult objects and save to disk. + # ------------------------------------------------------------------------- + # Per-stage persistence + # + # Each sample is persisted to its own ``samples/{id}.json`` file as it + # completes: a partial SampleResult after inference (score=None), then the + # same file updated with scoring fields. This makes every stage independently + # runnable and resumable from any partial state. + # ------------------------------------------------------------------------- - Args: - evaluation_parameters: Evaluation parameters. - results: List of evaluation results or exceptions. - task_data: Raw task data dicts for each sample (used to populate input field). + def _sessions_dir(self) -> Path: + return get_vero_home_dir() / "sessions" - Returns: - Metrics dictionary. + def _scrub_inputs(self, row: Any) -> Any: + """Strip ``label_fields`` from a sample before it reaches inference. + + Only applies to mapping rows; non-mapping rows pass through unchanged. 
""" + if not self.label_fields: + return row + try: + return {k: v for k, v in dict(row).items() if k not in self.label_fields} + except (TypeError, ValueError): + return row + + def _dataset_sample( + self, params: EvaluationParameters, sample_id: int + ) -> DatasetSample: + return DatasetSample( + sample_id=sample_id, + split=params.run.dataset_subset.split, + dataset_id=params.run.dataset_subset.dataset_id, + ) + + def _save_inference( + self, + params: EvaluationParameters, + sample_id: int, + task_data: Sequence[dict[str, JsonValue]] | None, + pos: int, + output: TaskOutput, + ) -> None: + """Persist a partial SampleResult holding only the inference output.""" + sample_input = ( + self._scrub_inputs(dict(task_data[pos])) + if task_data is not None and pos < len(task_data) + else None + ) + sample_result = SampleResult.from_task_result( + dataset_sample=self._dataset_sample(params, sample_id), + task_result=TaskResult.from_task_output(output), + commit=params.run.candidate.commit, + result_id=params.result_id, + input=sample_input, + ) + save_sample_result( + self._sessions_dir(), + params.session_id, + params.result_id, + sample_id=sample_id, + result=sample_result, + ) + + def _save_score( + self, + params: EvaluationParameters, + sample_result: SampleResult, + task_result: TaskResult, + ) -> None: + """Update a persisted SampleResult with scoring-stage fields and re-save.""" + sample_result.score = task_result.score + sample_result.feedback = task_result.feedback + sample_result.metrics = task_result.metrics + sample_result.eval_error = task_result.eval_error + sample_result.eval_trace = task_result.eval_trace + if task_result.error_traceback and sample_result.error_traceback is None: + sample_result.error_traceback = task_result.error_traceback + save_sample_result( + self._sessions_dir(), + params.session_id, + params.result_id, + sample_id=sample_result.dataset_sample.sample_id, + result=sample_result, + ) + + def compute_metrics( + self, params: 
EvaluationParameters + ) -> dict[str, int | float | None]: + """Compute metrics from the SampleResults persisted on disk.""" from vero.core.constants import default_minimum_score - metrics = { - "num_samples": len(results), + sample_results = load_all_sample_results( + self._sessions_dir(), params.session_id, params.result_id + ) + + metrics: dict[str, int | float | None] = { + "num_samples": len(sample_results), "num_errors": 0, "avg_score": 0, "avg_filled_score": None, } - - sample_results: dict[int, SampleResult] = {} - sample_ids = evaluation_parameters.run.dataset_subset.sample_ids - if sample_ids is None: - sample_ids = list(range(len(results))) - - commit = evaluation_parameters.run.candidate.commit - result_id = evaluation_parameters.result_id - - for idx, (sample_id, result) in enumerate(zip(sample_ids, results)): - dataset_sample = DatasetSample( - sample_id=sample_id, - split=evaluation_parameters.run.dataset_subset.split, - dataset_id=evaluation_parameters.run.dataset_subset.dataset_id, - ) - - sample_input = ( - dict(task_data[idx]) - if task_data is not None and idx < len(task_data) - else None - ) - common_kwargs = { - "commit": commit, - "result_id": result_id, - "input": sample_input, - } - - if isinstance(result, Exception): - error = "".join( - traceback.format_exception( - type(result), result, result.__traceback__ - ) - ) - sample_results[sample_id] = SampleResult( - dataset_sample=dataset_sample, error=error, **common_kwargs - ) - metrics["num_errors"] = metrics["num_errors"] + 1 - else: - sample_results[sample_id] = SampleResult.from_task_result( - dataset_sample=dataset_sample, task_result=result, **common_kwargs - ) - - if result.error is not None or result.eval_error is not None: - metrics["num_errors"] = metrics["num_errors"] + 1 - elif result.score is not None: - metrics["avg_score"] = metrics["avg_score"] + result.score + for sr in sample_results.values(): + if sr.error is not None or sr.eval_error is not None: + metrics["num_errors"] 
async def run_inference_stage(self, params: EvaluationParameters) -> None:
    """Run (or resume) the inference stage, persisting every sample result.

    Samples that already have a persisted result are skipped. With a
    per-sample inference function each sample is saved as soon as it
    finishes, and samples that still come back as exceptions are saved as
    error records. With only a batch function, results are saved after the
    batch returns. Inference always receives rows with ``label_fields``
    stripped.

    Raises:
        RuntimeError: If no inference function (single or batch) is registered.
    """
    tasks, task_data = self._load_and_prepare_data(params)
    subset_ids = params.run.dataset_subset.sample_ids
    sample_ids = list(range(len(tasks))) if subset_ids is None else subset_ids

    # Collect (position, sample id) for every sample with nothing on disk yet.
    store = self._sessions_dir()
    todo: list[tuple[int, int]] = []
    for index, sample_id in enumerate(sample_ids):
        persisted = load_sample_result(
            store, params.session_id, params.result_id, sample_id
        )
        if persisted is None:
            todo.append((index, sample_id))
    if not todo:
        logger.info("Inference stage: all samples already persisted; skipping")
        return

    per_sample_fn = self.get("run_inference", batch=False)
    batched_fn = self.get("run_inference", batch=True)
    if per_sample_fn is None and batched_fn is None:
        raise RuntimeError(
            "No inference function registered. "
            "Use @task.inference() or @task.inference(batch=True) to register one."
        )

    if per_sample_fn is None:
        # Batch-only path: one call for all pending samples, then persist.
        scrubbed_rows = [self._scrub_inputs(tasks[index]) for index, _ in todo]
        batch_outputs = await self.run_batch_inference(scrubbed_rows, params)
        for (index, sample_id), task_output in zip(todo, batch_outputs):
            self._save_inference(params, sample_id, task_data, index, task_output)
    else:

        async def infer_and_save(index: int, sample_id: int) -> TaskOutput:
            raw = await maybe_await(
                per_sample_fn(self._scrub_inputs(tasks[index]), params)
            )
            task_output = self.cast_to_task_output(raw)
            self._save_inference(params, sample_id, task_data, index, task_output)
            return task_output

        # Default arguments freeze the loop values for each factory.
        factories = [
            (lambda i=index, s=sample_id: infer_and_save(i, s))
            for index, sample_id in todo
        ]
        gathered = await limited_gather(
            coro_factories=factories,
            limit=params.max_concurrency,
            retry_config=params.retry_config,
            desc="Running inference",
            return_exceptions=True,
            timeout=params.sample_timeout,
            run_in_thread=params.use_threading,
        )
        # Anything still an exception here never saved a result; persist an
        # error record so the sample is not left without a file.
        for (index, sample_id), outcome in zip(todo, gathered):
            if not isinstance(outcome, Exception):
                continue
            self._save_inference(
                params, sample_id, task_data, index, TaskOutput(error=outcome)
            )

    logger.info(f"Inference stage complete: {len(todo)} samples")
async def run_scoring_stage(self, params: EvaluationParameters) -> None:
    """Run (or resume) scoring over the persisted inference outputs.

    A sample is scored only if it has a persisted inference result, that
    result carries no inference error, and it has not been scored already.
    Scores, or the exception raised while scoring, are written back onto the
    persisted sample result.

    Raises:
        RuntimeError: If no evaluation function (single or batch) is registered.
    """
    tasks, _ = self._load_and_prepare_data(params)
    subset_ids = params.run.dataset_subset.sample_ids
    sample_ids = list(range(len(tasks))) if subset_ids is None else subset_ids

    persisted = load_all_sample_results(
        self._sessions_dir(), params.session_id, params.result_id
    )
    todo: list[tuple[int, SampleResult]] = []
    for index, sample_id in enumerate(sample_ids):
        stored = persisted.get(sample_id)
        if stored is None:
            logger.warning(
                f"Scoring stage: no inference result for sample {sample_id}; skipping"
            )
            continue
        # Inference errors are terminal; already-scored samples are done.
        if stored.error is not None or stored.is_scored():
            continue
        todo.append((index, stored))
    if not todo:
        logger.info("Scoring stage: nothing to score; skipping")
        return

    per_sample_fn = self.get("run_evaluation", batch=False)
    batched_fn = self.get("run_evaluation", batch=True)
    if per_sample_fn is None and batched_fn is None:
        raise RuntimeError(
            "No evaluation function registered. "
            "Use @task.evaluation() or @task.evaluation(batch=True) to register one."
        )

    def _output(stored: SampleResult) -> TaskOutput:
        # Rebuild the inference-stage output from what was persisted.
        return TaskOutput(
            output=stored.output,
            error=stored.error,
            execution_trace=stored.execution_trace,
        )

    if per_sample_fn is None:
        # Batch-only path: score every pending sample in one call.
        batch_tasks = [tasks[index] for index, _ in todo]
        batch_outputs = [_output(stored) for _, stored in todo]
        scored = await self.run_batch_evaluation(batch_tasks, batch_outputs, params)
        for (_, stored), task_result in zip(todo, scored):
            self._save_score(params, stored, task_result)
    else:

        async def score_and_save(index: int, stored: SampleResult) -> None:
            verdict = await maybe_await(
                per_sample_fn(tasks[index], _output(stored), params)
            )
            task_result = self.cast_to_task_result(_output(stored), verdict)
            self._save_score(params, stored, task_result)

        # Default arguments freeze the loop values for each factory.
        factories = [
            (lambda i=index, r=stored: score_and_save(i, r)) for index, stored in todo
        ]
        gathered = await limited_gather(
            coro_factories=factories,
            limit=params.max_concurrency,
            retry_config=params.retry_config,
            desc="Evaluating samples",
            return_exceptions=True,
            timeout=params.sample_timeout,
            run_in_thread=params.use_threading,
        )
        # A sample whose scoring still failed gets the exception recorded on it.
        for (_, stored), outcome in zip(todo, gathered):
            if not isinstance(outcome, Exception):
                continue
            self._save_score(
                params, stored, self.cast_to_task_result(_output(stored), outcome)
            )

    logger.info(f"Scoring stage complete: {len(todo)} samples")
@@ -660,21 +837,11 @@ async def run(self, params: EvaluationParameters) -> dict[str, Any]: # Validate required functions are registered self._validate_required_functions() - # Step 1: Load and prepare data - tasks, task_data = self._load_and_prepare_data(params) - logger.info(f"Loaded {len(tasks)} samples") - - # Step 2: Run inference - outputs = await self.run_batch_inference(tasks, params) + await self.run_inference_stage(params) + await self.run_scoring_stage(params) - # Step 3: Run evaluation - results = await self.run_batch_evaluation(tasks, outputs, params) - logger.info(f"Processed {len(results)} samples") - - # Step 4: Compile and save results - metrics = self.compile_and_save_sample_results(params, results, task_data) + metrics = self.compute_metrics(params) logger.info(f"Logged results: {metrics}") - return metrics def __repr__(self) -> str: diff --git a/vero/src/vero/evaluation/__init__.py b/vero/src/vero/evaluation/__init__.py new file mode 100644 index 0000000..816e191 --- /dev/null +++ b/vero/src/vero/evaluation/__init__.py @@ -0,0 +1,19 @@ +"""Evaluation: the Evaluator (checkout + run) and the EvaluationEngine that +orchestrates it (sample resolution + budget metering). The in-process +ExperimentRunnerTool and the Harbor eval sidecar are both frontends over the engine. +""" + +from vero.evaluation.engine import EvalRequest, EvaluationEngine +from vero.evaluation.evaluator import ( + Evaluator, + isolate_project, + run_evaluation, +) + +__all__ = [ + "Evaluator", + "isolate_project", + "run_evaluation", + "EvaluationEngine", + "EvalRequest", +] diff --git a/vero/src/vero/evaluation/engine.py b/vero/src/vero/evaluation/engine.py new file mode 100644 index 0000000..5524c57 --- /dev/null +++ b/vero/src/vero/evaluation/engine.py @@ -0,0 +1,188 @@ +"""EvaluationEngine: the shared evaluation core. + +Wraps the :class:`~vero.evaluator.Evaluator` with budget metering and the +dataset/split allowlist. 
It is the single eval path used by both the in-process +``ExperimentRunnerTool`` (in-memory budget) and the Harbor eval sidecar (durable +budget + HTTP frontend). It returns the **full** ``Experiment`` — redaction, +write-routing, and human/wire formatting are the frontend's job, not the core's. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING + +from vero.core.budget import BudgetLedger, SplitBudget +from vero.core.evaluation import BaseEvaluationParameters + +if TYPE_CHECKING: + from vero.core.db.database import Experiment, ExperimentDatabase + from vero.evaluation.evaluator import Evaluator + +logger = logging.getLogger(__name__) + + +@dataclass +class EvalRequest: + """A request to evaluate a commit on a dataset split. + + Also the agent-facing wire payload in the Harbor case. ``task`` is + not a field — it is fixed config bound on the service, not agent-chosen. + """ + + dataset_id: str + split: str + commit: str | None = None # None -> resolved by the caller (e.g. 
agent repo HEAD) + sample_ids: list[int] | None = None + num_samples: int | None = None + + +class EvaluationEngine: + """Resolve samples -> meter budget -> run the Evaluator -> full Experiment.""" + + def __init__( + self, + *, + evaluator: Evaluator, + budget: BudgetLedger, + default_task: str | None = None, + db: ExperimentDatabase | None = None, + run_constraints: BaseEvaluationParameters | None = None, + session_id: str | None = None, + vero_home: Path | None = None, + ): + self.evaluator = evaluator + self.budget = budget + self.default_task = default_task + self.db = db + self.run_constraints = run_constraints or BaseEvaluationParameters() + self.session_id = session_id + self.vero_home = vero_home + + @classmethod + def from_session(cls, session) -> EvaluationEngine: + """Build a service from a bound Session (mirrors ExperimentRunnerTool.bind).""" + from copy import deepcopy + + return cls( + evaluator=session.evaluator, + budget=BudgetLedger(deepcopy(session.budget)), + default_task=session.task, + db=session.db, + run_constraints=session.evaluation_parameters, + session_id=session.session_id, + vero_home=session.vero_home, + ) + + # ------------------------------------------------------------------ + # Dataset / sample resolution (lifted from ExperimentRunnerTool) + # ------------------------------------------------------------------ + + def _get_dataset_info(self, dataset_id: str): + from vero.core.dataset import DatasetInfo + from vero.core.dataset.store import load_dataset + + sessions_dir = self.vero_home / "sessions" if self.vero_home else None + dataset_cache = self.vero_home / "datasets" if self.vero_home else None + dataset = load_dataset(sessions_dir, dataset_cache, self.session_id, dataset_id) + return DatasetInfo( + id=dataset_id, + splits={split: len(dataset[split]) for split in dataset}, + features={split: list(dataset[split].features) for split in dataset}, + ) + + def _get_samples_from_split( + self, dataset_id: str, split: str, 
num_samples: int + ) -> list[int] | None: + """First-N sample ids, or None when N covers (or exceeds) the whole split.""" + split_size = self._get_dataset_info(dataset_id).splits[split] + num_samples = min(num_samples, split_size) + if num_samples >= split_size: + return None + return list(range(num_samples)) + + def _validate_and_count_samples( + self, dataset_id: str, split: str, sample_ids: list[int] | None = None + ) -> int: + """Validate sample ids are in range; return the count (full split if None).""" + split_size = self._get_dataset_info(dataset_id).splits[split] + if sample_ids is None: + return split_size + invalid = [s for s in sample_ids if s < 0 or s >= split_size] + if invalid: + raise ValueError( + f"The provided sample ids are outside the range of the split " + f"[0, {split_size - 1}]: {invalid}" + ) + return len(sample_ids) + + def resolve_samples(self, req: EvalRequest) -> tuple[list[int] | None, int]: + """Resolve (sample_ids, count) for a request. Raises on invalid combos.""" + if req.sample_ids is not None and req.num_samples is not None: + raise ValueError( + "Cannot specify both sample_ids and num_samples. " + "Use sample_ids for specific samples, or num_samples for the first N samples." + ) + sample_ids = req.sample_ids + if req.num_samples is not None: + sample_ids = self._get_samples_from_split( + req.dataset_id, req.split, req.num_samples + ) + count = self._validate_and_count_samples(req.dataset_id, req.split, sample_ids) + return sample_ids, count + + # ------------------------------------------------------------------ + # Evaluation + # ------------------------------------------------------------------ + + async def evaluate(self, req: EvalRequest, *, admin: bool = False) -> Experiment: + """Meter (unless admin) and run one evaluation; return the full Experiment. 
+ + ``no_access`` gating is implicit: those splits are absent from the budget + ledger, so ``reserve`` raises ``InvalidSplitError`` for the agent; admin + bypasses the ledger and may evaluate anything. + """ + sample_ids, n = self.resolve_samples(req) + if not admin: + await self.budget.reserve(req.dataset_id, req.split, n) + return await self.evaluator.evaluate( + commit=req.commit, + dataset_id=req.dataset_id, + split=req.split, + task=self.default_task, + sample_ids=sample_ids, + db=self.db, + evaluation_parameters=self.run_constraints, + ) + + async def evaluate_admin( + self, + *, + task: str, + dataset_id: str, + split: str, + commit: str, + sample_ids: list[int] | None = None, + ) -> Experiment: + """Admin/verifier evaluation: explicit ``task``, no budget, no allowlist. + + Unlike :meth:`evaluate` (which is bound to ``default_task`` and metered), + this scores an arbitrary ``(task, dataset_id, split)`` — including held-out + tasks/splits the agent never had access to. Used by the verifier to score + the selected commit on its configured targets. 
+ """ + return await self.evaluator.evaluate( + commit=commit, + dataset_id=dataset_id, + split=split, + task=task, + sample_ids=sample_ids, + db=self.db, + evaluation_parameters=self.run_constraints, + ) + + def status(self) -> dict[tuple[str, str], SplitBudget]: + """Remaining budget per (split, dataset_id).""" + return self.budget.status() diff --git a/vero/src/vero/evaluation/evaluator.py b/vero/src/vero/evaluation/evaluator.py new file mode 100644 index 0000000..c13f7c0 --- /dev/null +++ b/vero/src/vero/evaluation/evaluator.py @@ -0,0 +1,836 @@ +from __future__ import annotations + +import json +import logging +import os +import random +import traceback +from pathlib import Path + +import yaml +from rich.panel import Panel +from rich.syntax import Syntax + +from vero.core.cli_adapters import UvRunParameters +from vero.core.constants import ( + evaluation_parameters_basename, + evaluation_results_basename, + pytest_report_basename, + result_metadata_basename, + samples_dir_name, +) +from vero.core.db.candidate import Candidate +from vero.core.db.database import Experiment, ExperimentDatabase +from vero.core.db.dataset import DatasetSubset +from vero.core.db.result import ExperimentResult, SampleResult +from vero.core.db.run import ExperimentRun +from vero.core.evaluation import BaseEvaluationParameters, EvaluationParameters +from vero.core.sessions import ( + clear_result_cache, + get_experiment_dir, + get_session_dir, + get_vero_home_dir, + initialize_result_store, + load_all_sample_results, + save_json_to_cache, +) +from vero.core.task.utils import get_discover_cmd, get_run_cmd +from vero.evaluation.strategy import EvalStrategy +from vero.exceptions import ExperimentRunFailedError +from vero.logging import setup_console +from vero.utils import run_subprocess_with_tee +from vero.workspace import Workspace +from vero.workspace.git import GitWorkspace + +console = setup_console() + +logger = logging.getLogger(__name__) + + +class Evaluator: + """Evaluates 
experiment runs by checking out commits and running tasks in subprocesses.""" + + def __init__( + self, + workspace: Workspace, + session_id: str, + *, + vero_home: Path | None = None, + use_copy: bool = False, + hooks: list[str] | None = None, + sync: bool = False, + subprocess_env_vars: list | Path | str | None = None, + task_project: Path | None = None, + task_module: str | None = None, + eval_strategy: EvalStrategy | None = None, + ): + self.workspace = workspace + self.session_id = session_id + self.vero_home = vero_home or get_vero_home_dir() + self.use_copy = use_copy + self.hooks = hooks if hooks is not None else ["setup_logging"] + self.sync = sync + self._subprocess_env_vars = subprocess_env_vars + self.task_project = task_project + self.task_module = task_module + # Mode-specific "produce sample results" step. None = Mode A (task.utils, + # inline below). A strategy (e.g. Harbor Mode B) is injected by the caller. + self.eval_strategy = eval_strategy + self.on_experiment: list = [] # Callbacks fired after each evaluate() + + @property + def sessions_dir(self) -> Path: + return self.vero_home / "sessions" + + @property + def dataset_cache(self) -> Path: + return self.vero_home / "datasets" + + @property + def subprocess_env(self) -> dict[str, str] | None: + """Build subprocess env on demand from var names. 
Returns None to inherit os.environ.""" + if self._subprocess_env_vars is None: + return None + from vero.utils.subprocess_env import build_subprocess_env + + return build_subprocess_env(self._subprocess_env_vars) + + def _get_subprocess_env_with_vero_home(self) -> dict[str, str] | None: + """Build subprocess env and ensure VERO_HOME_DIR is set.""" + env = self.subprocess_env + if env is not None: + env["VERO_HOME_DIR"] = str(self.vero_home) + return env + + @staticmethod + def log_evaluation_results(result: ExperimentResult) -> None: + """Logs the evaluation results to the console.""" + stats = ( + result.sample_results_statistics( + as_dict=True, convert_lists_to_strings=True + ) + or {} + ) + if len(stats) > 0: + syntax = Syntax( + yaml.dump(stats, sort_keys=False), + "yaml", + theme="monokai", + line_numbers=False, + ) + console.print( + Panel( + syntax, + title="[bold green]⚙️ Evaluation Statistics[/bold green]", + border_style="green", + ) + ) + else: + console.print(f"No ExperimentResult found for run {result.run_id}.") + + def load_sample_results_from_cache( + self, evaluation_parameters: EvaluationParameters + ) -> dict[int, SampleResult]: + """Load the sample results from the cache. + + Tries to load from per-sample files first (new format), then falls back + to the single JSON file (legacy format) for backward compatibility. + """ + sample_results = load_all_sample_results( + self.sessions_dir, self.session_id, evaluation_parameters.result_id + ) + + if not sample_results: + logger.warning( + f"No sample results found for run {evaluation_parameters.run.id}." + ) + + return sample_results + + def _get_uv_params( + self, agent_project_path: Path | str + ) -> tuple[UvRunParameters, Path | str]: + """Build UvRunParameters and determine cwd for subprocess. + + When task_project is set, runs uv in the task project and layers + the agent code on top via --with-editable. Otherwise runs in the + agent project directly (backward compat). 
+ + Returns: + (uv_params, cwd) tuple. + """ + if self.task_project: + return ( + UvRunParameters.from_env( + project=str(self.task_project), + with_editable=str(agent_project_path), + ), + self.task_project, + ) + return UvRunParameters.from_env( + project=str(agent_project_path) + ), agent_project_path + + async def _discover_tasks(self, project_path: Path | str) -> dict: + """Discover tasks via isolated subprocess. + + Args: + project_path: Path to the agent project. + + Returns: + Dictionary with package name and task metadata. + """ + uv_params, cwd = self._get_uv_params(project_path) + cmd = [*uv_params.get_cmd(), *get_discover_cmd(task_module=self.task_module)] + result = await run_subprocess_with_tee( + cmd, + timeout=60, + cwd=str(cwd), + flush=False, + tee_stdout=False, + env=self._get_subprocess_env_with_vero_home(), + ) + + if result.returncode != 0: + raise ExperimentRunFailedError( + f"Task discovery failed. Error: {result.stderr}.", + stdout=result.stdout, + stderr=result.stderr, + returncode=int(result.returncode), + ) + + return json.loads(result.stdout) + + async def _run_task( + self, + project_path: Path | str, + task_name: str, + params_file: Path, + timeout: int = 60 * 10, + ) -> dict | None: + """Execute task via isolated subprocess. + + Args: + project_path: Path to the user's project. + task_name: Name of the task to execute. + params_file: Path to JSON file containing EvaluationParameters. + timeout: Subprocess timeout in seconds. + + Returns: + Metrics dictionary from task execution, or None if parsing fails. 
+ """ + uv_params, cwd = self._get_uv_params(project_path) + cmd = [ + *uv_params.get_cmd(), + *get_run_cmd( + task_name, params_file, hooks=self.hooks, task_module=self.task_module + ), + ] + result = await run_subprocess_with_tee( + cmd, + timeout=timeout, + cwd=cwd, + flush=True, + env=self._get_subprocess_env_with_vero_home(), + ) + logger.info("Subprocess complete!") + + # Save subprocess output for debugging + log_dir = params_file.parent + if result.stderr: + (log_dir / "subprocess_stderr.log").write_text(result.stderr) + if result.stdout: + (log_dir / "subprocess_stdout.log").write_text(result.stdout) + if result.returncode != 0: + (log_dir / "subprocess_returncode.txt").write_text(str(result.returncode)) + logger.warning( + f"Subprocess exited with code {result.returncode}. " + f"Stderr: {result.stderr[:500] if result.stderr else '(empty)'}" + ) + + # Read metrics from file (written by task subprocess) + metrics_path = log_dir / "metrics.json" + if metrics_path.exists(): + try: + return json.loads(metrics_path.read_text()) + except json.JSONDecodeError: + logger.warning(f"Failed to parse {metrics_path} as JSON") + return None + else: + logger.warning(f"Metrics file not found at {metrics_path}") + return None + + async def _run_task_in_subprocess( + self, + params: EvaluationParameters, + workspace: Workspace, + ) -> None: + """Run task via vero.task_utils subprocess. + + Args: + params: Evaluation parameters (must have task set). + workspace: Workspace to run in. + + Raises: + ExperimentRunFailedError: If task discovery or execution fails. + """ + + # Discover available tasks first + try: + discovery_result = await self._discover_tasks(workspace.project_path) + except Exception as e: + error_str = "".join(traceback.format_exception(type(e), e, e.__traceback__)) + raise ExperimentRunFailedError( + f"Task discovery failed. 
Error: {error_str}.", + stdout="", + stderr=error_str, + returncode=1, + ) + + # Validate the requested task exists + available_tasks = discovery_result.get("tasks", {}) + if params.task not in available_tasks: + available_names = list(available_tasks.keys()) + raise ExperimentRunFailedError( + f"Task '{params.task}' not found in package '{discovery_result.get('package', 'unknown')}'.\n" + f"Available tasks: {available_names if available_names else '(none found)'}\n" + f"Ensure your task is registered in vero_tasks/__init__.py", + stdout="", + stderr="", + returncode=1, + ) + + # Validate required environment variables + required_env = available_tasks[params.task].get("required_env_vars", []) + if required_env: + missing = [v for v in required_env if not os.environ.get(v)] + if missing: + raise ExperimentRunFailedError( + f"Task '{params.task}' requires environment variables that are not set: " + f"{', '.join(missing)}. Set them before running.", + stdout="", + stderr="", + returncode=1, + ) + + # Run the task + result_dir = get_experiment_dir( + self.sessions_dir, self.session_id, params.result_id + ) + params_file = result_dir / evaluation_parameters_basename + logger.info( + f"Running task '{params.task}' via vero.task_utils in {workspace.project_path}" + ) + try: + metrics = await self._run_task( + workspace.project_path, + params.task, + params_file, + timeout=params.timeout, + ) + logger.info(f"Task completed with metrics: {metrics}") + except Exception as e: + error_str = "".join(traceback.format_exception(type(e), e, e.__traceback__)) + raise ExperimentRunFailedError( + f"Task execution failed. Error: {error_str}.", + stdout="", + stderr=error_str, + returncode=1, + ) + + async def run( + self, + evaluation_parameters: EvaluationParameters, + use_copy: bool | None = None, + ) -> ExperimentResult: + """Run an experiment by checking out the candidate commit and running tasks via uv. + + Args: + evaluation_parameters: The parameters for the evaluation. 
+ use_copy: Override for self.use_copy. If True, creates a temporary isolated copy + of the workspace (always clean). If False, uses the current workspace (requires clean state). + + Returns: + ExperimentResult with sample results and metadata. + """ + use_copy = use_copy if use_copy is not None else self.use_copy + + if not use_copy: + return await self._run_in_workspace(evaluation_parameters, self.workspace) + + async with self.workspace.temp_copy( + from_version=evaluation_parameters.run.candidate.commit, + ) as temp_workspace: + return await self._run_in_workspace(evaluation_parameters, temp_workspace) + + async def evaluate( + self, + commit: str, + dataset_id: str, + split: str, + task: str | None = None, + sample_ids: list[int] | None = None, + db: ExperimentDatabase | None = None, + evaluation_parameters: BaseEvaluationParameters | None = None, + use_copy: bool | None = None, + ) -> Experiment: + """Full evaluation lifecycle: resolve commit → run → create experiment → DB → hooks. + + This is the single entry point for all evaluations. Both Policy.evaluate_commit() + and ExperimentRunnerTool delegate here. + + Args: + commit: Git commit hash or ref to evaluate. + dataset_id: Dataset ID in the session store. + split: Dataset split to evaluate. + task: Task name to execute. + sample_ids: Specific sample IDs to evaluate (None = all). + db: ExperimentDatabase to record the experiment in. + evaluation_parameters: Base eval params (timeout, concurrency, etc.). + use_copy: Whether to create a temporary copy for the eval. + + Returns: + The completed Experiment with results. + """ + from vero.core.db.database import Experiment + + # Resolve commit ref to canonical version ID + try: + if isinstance(self.workspace, GitWorkspace): + full_hash = await self.workspace.resolve_ref(commit) + else: + full_hash = commit + except Exception as e: + raise ValueError( + f"Cannot resolve commit '{commit}': {e}. " + f"Make sure the commit exists in the repository." 
+ ) + + # Build candidate + candidate = None + if db is not None: + candidate = db.get_candidate((self.workspace.name, full_hash)) + if candidate is None: + candidate = Candidate(commit=full_hash, repo_name=self.workspace.name) + + # Build run + dataset_subset = DatasetSubset( + split=split, sample_ids=sample_ids, dataset_id=dataset_id + ) + run = ExperimentRun(candidate=candidate, dataset_subset=dataset_subset) + + # Build eval params + base_params = evaluation_parameters or BaseEvaluationParameters() + params = EvaluationParameters( + **base_params.model_dump(), + run=run, + dataset_id=dataset_id, + task=task, + session_id=self.session_id, + ) + + # Run + result = await self.run(params, use_copy=use_copy) + + # Create experiment + experiment = Experiment(run=run, result=result) + + # Add to DB + if db is not None: + db.add_experiment(experiment) + + # Fire post-eval callbacks (may be sync or async) + import asyncio as _asyncio + + for callback in self.on_experiment: + try: + result = callback(experiment) + if _asyncio.iscoroutine(result): + await result + except Exception as e: + logger.warning(f"on_experiment callback failed: {e}") + + return experiment + + async def _run_in_workspace( + self, params: EvaluationParameters, workspace: Workspace + ) -> ExperimentResult: + """Run an experiment by checking out the candidate commit and running tasks via uv.""" + + # We cannot execute with a dirty workspace, as this may introduce side effects on the evaluation results. + if await workspace.is_dirty(): + raise RuntimeError( + "Evaluator cannot execute. There are unsaved changes in the workspace." 
+ ) + + # Update the evaluation parameters with the dataset loader and session_id + params.session_id = self.session_id + + # Initialize the directory to store the evaluation and pytest report files + result_dir = initialize_result_store( + self.sessions_dir, self.session_id, params.result_id + ) + + save_json_to_cache( + self.sessions_dir, + self.session_id, + params.result_id, + basename=evaluation_parameters_basename, + data=params, + ) + logger.info( + f"Saved evaluation parameters to cache: {result_dir / evaluation_parameters_basename}" + ) + + # Git-specific: fetch from remote if configured + if self.sync and isinstance(workspace, GitWorkspace): + await workspace.maybe_fetch() + + # Clear any stale cached results before running to avoid reading old data if run fails + clear_result_cache( + self.sessions_dir, + self.session_id, + params.result_id, + result_basenames=[pytest_report_basename, evaluation_results_basename], + ) + + if self.eval_strategy is not None: + # Non-default strategy (e.g. Harbor Mode B): it owns staging + execution + + # collation, persisting SampleResults to the result store. + async with workspace.at(params.run.candidate.commit): + await self.eval_strategy.produce_sample_results( + workspace=workspace, params=params, result_dir=result_dir + ) + else: + # Mode A: ship data into the sandbox, run task.utils, copy results back. 
+ experiment_dir = str( + get_experiment_dir(self.sessions_dir, self.session_id, params.result_id) + ) + await workspace.sandbox.upload(experiment_dir, experiment_dir) + + # Upload dataset cache so subprocess can load it + from vero.core.dataset.store import _read_mapping + + mapping = _read_mapping(self.sessions_dir, self.session_id) + dataset_fp = mapping.get(params.dataset_id or "") + if dataset_fp: + cache_entry = str(self.dataset_cache / dataset_fp) + await workspace.sandbox.upload(cache_entry, cache_entry) + # Also upload the session datasets.json mapping + session_dir = str(get_session_dir(self.sessions_dir, self.session_id)) + datasets_json = f"{session_dir}/datasets.json" + await workspace.sandbox.upload(datasets_json, datasets_json) + + # Switch to the candidate version and run the evaluation in a subprocess + async with workspace.at(params.run.candidate.commit): + await self._run_task_in_subprocess(params, workspace) + + # Transfer results back from the sandbox + await workspace.sandbox.download(experiment_dir, experiment_dir) + + sample_results = self.load_sample_results_from_cache(params) + + if not sample_results: + raise ExperimentRunFailedError( + f"No sample results found for run {params.run.id}! Likely because execution failed.", + returncode=1, + ) + else: + result = ExperimentResult.create_with_status( + id=params.result_id, + error_rate=params.error_rate_threshold, + run_id=params.run.id, + sample_results=sample_results, + ) + + # Write result metadata to disk so the DB can be reconstructed from experiments/ + save_json_to_cache( + self.sessions_dir, + self.session_id, + params.result_id, + basename=result_metadata_basename, + data={ + "id": result.id, + "run_id": result.run_id, + "status": result.status.value, + }, + ) + + self.log_evaluation_results(result) + return result + + +def _resolve_vero_dependency(isolated_dir: Path, original_project_dir: Path) -> None: + """Resolve the vero path dependency in pyproject.toml after isolation. 
+ + When a project is isolated (copied to a new location), relative path + dependencies in ``[tool.uv.sources]`` break. This function resolves + the ``scale-vero`` dependency to an absolute path via ``uv add``. + + Raises ValueError if any *other* relative path dependencies are found, + since those are unsupported and would silently break. + """ + import subprocess + import tomllib + + pyproject_path = isolated_dir / "pyproject.toml" + if not pyproject_path.exists(): + return + + with open(pyproject_path, "rb") as f: + pyproject = tomllib.load(f) + + sources = pyproject.get("tool", {}).get("uv", {}).get("sources", {}) + if not sources: + return + + for name, source in sources.items(): + if not isinstance(source, dict) or "path" not in source: + continue + + rel_path = source["path"] + if not rel_path.startswith(".") and not rel_path.startswith("/"): + continue # Not a relative path + + if "vero" in name.lower(): + # Always resolve to the known vero package directory rather than + # trusting the relative path (which may be stale or wrong). + from vero.core.constants import PACKAGE_DIR + + abs_path = PACKAGE_DIR + editable_flag = ["--editable"] if source.get("editable") else [] + subprocess.run( + ["uv", "add", *editable_flag, "--dev", str(abs_path)], + cwd=isolated_dir, + capture_output=True, + check=True, + ) + logger.info(f"Resolved {name} dependency: {rel_path} -> {abs_path}") + else: + raise ValueError( + f"Unsupported relative path dependency '{name}' " + f"(path={rel_path!r}) in {pyproject_path}. " + f"Only vero is handled during isolation." + ) + + +def isolate_project( + project_path: Path | str, + session_id: str, + git_ref: str = "HEAD", + *, + sessions_dir: Path, +) -> Path: + """Copy a project into a fresh, standalone git repo. + + Useful when the project lives inside a monorepo or has uncommitted changes. 
+ Extracts files at *git_ref* via ``git archive`` (falling back to a plain + copy when the source is not a git repo), then ``git init`` + ``git commit`` + so the result is a clean, self-contained repository. + + Relative path dependencies on vero in pyproject.toml are resolved to + absolute paths so they remain valid after the copy. + + Args: + project_path: Path to the project directory. + session_id: Session ID (isolated copy is placed under the session dir). + git_ref: Git ref to archive from (default: HEAD). + sessions_dir: Path to the sessions root directory. + + Returns: + Path to the isolated project root. + """ + import shutil + import subprocess + + project_path = Path(project_path).resolve() + isolated_dir = (sessions_dir / session_id) / project_path.name + isolated_dir.mkdir(parents=True, exist_ok=True) + + repo_root_result = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + cwd=project_path, + capture_output=True, + text=True, + ) + + if repo_root_result.returncode == 0: + repo_root_path = Path(repo_root_result.stdout.strip()) + project_rel = project_path.relative_to(repo_root_path) + strip = len(project_rel.parts) + + archive = subprocess.Popen( + ["git", "archive", git_ref, str(project_rel)], + cwd=repo_root_path, + stdout=subprocess.PIPE, + ) + subprocess.run( + ["tar", "xf", "-", "--strip-components", str(strip)], + cwd=isolated_dir, + stdin=archive.stdout, + check=True, + ) + archive.wait() + else: + shutil.copytree(project_path, isolated_dir, dirs_exist_ok=True) + + # Resolve vero dependency before git init (so it's in the initial commit) + _resolve_vero_dependency(isolated_dir, project_path) + + subprocess.run(["git", "init"], cwd=isolated_dir, capture_output=True, check=True) + subprocess.run( + ["git", "add", "."], cwd=isolated_dir, capture_output=True, check=True + ) + subprocess.run( + [ + "git", + "-c", + "user.name=vero", + "-c", + "user.email=vero@localhost", + "commit", + "-m", + "Initial commit (isolated)", + ], + 
cwd=isolated_dir, + capture_output=True, + check=True, + ) + + if repo_root_result.returncode == 0: + subprocess.run( + ["git", "remote", "add", "origin", repo_root_result.stdout.strip()], + cwd=isolated_dir, + capture_output=True, + ) + + logger.info(f"Isolated project: {project_path} -> {isolated_dir}") + return isolated_dir + + +async def run_evaluation( + project_path: Path | str, + dataset: str | Path, + split: str, + task: str | None = None, + commit: str | None = None, + sample_ids: list[int] | None = None, + num_samples: int | None = None, + task_params: dict | None = None, + seed: int = 42, + timeout: int = 3600, + per_sample_timeout: int = 180, + create_temporary_copy: bool = False, + isolate: bool = False, + hooks: list[str] | None = None, + session_id: str | None = None, + max_concurrency: int | None = None, + subprocess_env_vars: list[str] | Path | str | None = None, + task_project: Path | str | None = None, + task_module: str | None = None, + vero_home: Path | None = None, +) -> ExperimentResult: + """Run an evaluation using the given parameters. + + Args: + project_path: Path to the agent project to evaluate. + dataset: Dataset, DatasetDict, path to saved dataset, or dataset ID string. + split: Dataset split to evaluate. + task: Task name to execute from vero_tasks module. + commit: Commit to evaluate. + sample_ids: List of sample IDs to evaluate. + num_samples: Number of samples to evaluate. + task_params: Task-specific parameters for the evaluation. + seed: Random seed for sample selection. + timeout: Overall timeout for the evaluation subprocess in seconds. + per_sample_timeout: Timeout for a single sample in seconds. + create_temporary_copy: Whether to create a temporary copy for the evaluation. + isolate: Whether to copy the project into a fresh git repo before evaluating. + hooks: List of hook names to execute before task. + session_id: Session ID. + max_concurrency: Maximum concurrent tasks. 
+ subprocess_env_vars: Environment variable names to pass to task subprocesses. + task_project: Path to a separate task project. When set, evaluator runs + uv in the task project and layers the agent via --with-editable. + task_module: Explicit Python module to import for task registration + (e.g. "my_eval_tasks.vero_tasks"). If None, auto-discovers. + vero_home: Path to the vero home directory. Defaults to ~/.vero. + + Returns: + The experiment result. + + Raises: + ExperimentRunFailedError: If the evaluation fails. + """ + from vero.core.dataset.store import resolve_and_save_dataset + + vh = vero_home or get_vero_home_dir() + sessions_dir = vh / "sessions" + dataset_cache = vh / "datasets" + + if task_params is None: + task_params = {} + + if session_id is None: + from uuid import uuid4 + + session_id = str(uuid4()) + logger.info(f"Auto-generated session ID: {session_id}") + + if isolate: + project_path = isolate_project( + project_path, session_id, sessions_dir=sessions_dir + ) + + workspace = await GitWorkspace.create(project_path) + + # Resolve and save dataset + dataset_id = resolve_and_save_dataset( + dataset, sessions_dir, dataset_cache, session_id + ) + + evaluator = Evaluator( + workspace=workspace, + use_copy=create_temporary_copy, + hooks=hooks, + session_id=session_id, + vero_home=vh, + subprocess_env_vars=subprocess_env_vars, + task_project=Path(task_project) if task_project else None, + task_module=task_module, + ) + + if commit is None: + commit = await workspace.current_version() + logger.warning(f"No commit provided, using current commit: {commit}.") + + # Sample data if num_samples is provided + if num_samples is not None and sample_ids is None: + from vero.core.dataset.store import load_dataset as _load_ds + + ds = _load_ds(sessions_dir, dataset_cache, session_id, dataset_id) + rng = random.Random(seed) + sample_ids = rng.sample(range(len(ds[split])), num_samples) + + # Build base eval params + eval_params = BaseEvaluationParameters( + 
timeout=timeout, + sample_timeout=per_sample_timeout, + task_params=task_params, + ) + if max_concurrency is not None: + eval_params.max_concurrency = max_concurrency + + experiment = await evaluator.evaluate( + commit=commit, + dataset_id=dataset_id, + split=split, + task=task, + sample_ids=sample_ids, + evaluation_parameters=eval_params, + use_copy=create_temporary_copy, + ) + + result_dir = get_experiment_dir(sessions_dir, session_id, experiment.id) + console.print(f"Result available at {result_dir / samples_dir_name}") + return experiment.result diff --git a/vero/src/vero/evaluation/strategy.py b/vero/src/vero/evaluation/strategy.py new file mode 100644 index 0000000..3eb0c24 --- /dev/null +++ b/vero/src/vero/evaluation/strategy.py @@ -0,0 +1,35 @@ +"""The evaluation strategy seam. + +The Evaluator handles the shared lifecycle (clean-tree check, result store, checkout, +ExperimentResult assembly) and delegates the mode-specific step — "produce per-sample +results for this candidate/split/sample_ids" — to an EvalStrategy. + +The default (Mode A) path is the in-process ``task.utils`` subprocess, kept inline in +the Evaluator. A non-default strategy (e.g. Harbor Mode B, injected from ``vero.harbor``) +implements this Protocol; the Evaluator never imports the strategy's module, keeping +``vero.evaluation`` harbor-agnostic. 
+""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + from vero.core.evaluation import EvaluationParameters + from vero.workspace import Workspace + + +@runtime_checkable +class EvalStrategy(Protocol): + async def produce_sample_results( + self, + *, + workspace: Workspace, + params: EvaluationParameters, + result_dir: Path, + ) -> None: + """Run the evaluation for ``params.run`` (commit/split/sample_ids) against the + checked-out ``workspace`` and persist per-sample ``SampleResult``s to the result + store (so ``Evaluator`` can assemble them into an ``ExperimentResult``).""" + ... diff --git a/vero/src/vero/evaluator.py b/vero/src/vero/evaluator.py index 5b447bb..c5e9069 100644 --- a/vero/src/vero/evaluator.py +++ b/vero/src/vero/evaluator.py @@ -1,823 +1,15 @@ -from __future__ import annotations - -import json -import logging -import os -import random -import traceback -from pathlib import Path - -import yaml -from rich.panel import Panel -from rich.syntax import Syntax - -from .core.cli_adapters import UvRunParameters -from .core.constants import ( - evaluation_parameters_basename, - evaluation_results_basename, - pytest_report_basename, - result_metadata_basename, - samples_dir_name, -) -from .core.db.candidate import Candidate -from .core.db.database import Experiment, ExperimentDatabase -from .core.db.dataset import DatasetSubset -from .core.db.result import ExperimentResult, SampleResult -from .core.db.run import ExperimentRun -from .core.evaluation import BaseEvaluationParameters, EvaluationParameters -from .core.sessions import ( - clear_result_cache, - get_experiment_dir, - get_session_dir, - get_vero_home_dir, - initialize_result_store, - load_all_sample_results, - save_json_to_cache, +"""Back-compat shim. The implementation moved to ``vero.evaluation.evaluator``. 
+ +Prefer importing from ``vero.evaluation`` going forward; this module is kept so +existing ``from vero.evaluator import ...`` imports (examples, external code) keep +working. +""" + +from vero.evaluation.evaluator import ( # noqa: F401 + Evaluator, + _resolve_vero_dependency, + isolate_project, + run_evaluation, ) -from .core.task.utils import get_discover_cmd, get_run_cmd -from .exceptions import ExperimentRunFailedError -from .logging import setup_console -from .utils import run_subprocess_with_tee -from .workspace import Workspace -from .workspace.git import GitWorkspace - -console = setup_console() - -logger = logging.getLogger(__name__) - - -class Evaluator: - """Evaluates experiment runs by checking out commits and running tasks in subprocesses.""" - - def __init__( - self, - workspace: Workspace, - session_id: str, - *, - vero_home: Path | None = None, - use_copy: bool = False, - hooks: list[str] | None = None, - sync: bool = False, - subprocess_env_vars: list | Path | str | None = None, - task_project: Path | None = None, - task_module: str | None = None, - ): - self.workspace = workspace - self.session_id = session_id - self.vero_home = vero_home or get_vero_home_dir() - self.use_copy = use_copy - self.hooks = hooks if hooks is not None else ["setup_logging"] - self.sync = sync - self._subprocess_env_vars = subprocess_env_vars - self.task_project = task_project - self.task_module = task_module - self.on_experiment: list = [] # Callbacks fired after each evaluate() - - @property - def sessions_dir(self) -> Path: - return self.vero_home / "sessions" - - @property - def dataset_cache(self) -> Path: - return self.vero_home / "datasets" - - @property - def subprocess_env(self) -> dict[str, str] | None: - """Build subprocess env on demand from var names. 
Returns None to inherit os.environ.""" - if self._subprocess_env_vars is None: - return None - from vero.utils.subprocess_env import build_subprocess_env - - return build_subprocess_env(self._subprocess_env_vars) - - def _get_subprocess_env_with_vero_home(self) -> dict[str, str] | None: - """Build subprocess env and ensure VERO_HOME_DIR is set.""" - env = self.subprocess_env - if env is not None: - env["VERO_HOME_DIR"] = str(self.vero_home) - return env - - @staticmethod - def log_evaluation_results(result: ExperimentResult) -> None: - """Logs the evaluation results to the console.""" - stats = ( - result.sample_results_statistics( - as_dict=True, convert_lists_to_strings=True - ) - or {} - ) - if len(stats) > 0: - syntax = Syntax( - yaml.dump(stats, sort_keys=False), - "yaml", - theme="monokai", - line_numbers=False, - ) - console.print( - Panel( - syntax, - title="[bold green]⚙️ Evaluation Statistics[/bold green]", - border_style="green", - ) - ) - else: - console.print(f"No ExperimentResult found for run {result.run_id}.") - - def load_sample_results_from_cache( - self, evaluation_parameters: EvaluationParameters - ) -> dict[int, SampleResult]: - """Load the sample results from the cache. - - Tries to load from per-sample files first (new format), then falls back - to the single JSON file (legacy format) for backward compatibility. - """ - sample_results = load_all_sample_results( - self.sessions_dir, self.session_id, evaluation_parameters.result_id - ) - - if not sample_results: - logger.warning( - f"No sample results found for run {evaluation_parameters.run.id}." - ) - - return sample_results - - def _get_uv_params( - self, agent_project_path: Path | str - ) -> tuple[UvRunParameters, Path | str]: - """Build UvRunParameters and determine cwd for subprocess. - - When task_project is set, runs uv in the task project and layers - the agent code on top via --with-editable. Otherwise runs in the - agent project directly (backward compat). 
- - Returns: - (uv_params, cwd) tuple. - """ - if self.task_project: - return ( - UvRunParameters.from_env( - project=str(self.task_project), - with_editable=str(agent_project_path), - ), - self.task_project, - ) - return UvRunParameters.from_env( - project=str(agent_project_path) - ), agent_project_path - - async def _discover_tasks(self, project_path: Path | str) -> dict: - """Discover tasks via isolated subprocess. - - Args: - project_path: Path to the agent project. - - Returns: - Dictionary with package name and task metadata. - """ - uv_params, cwd = self._get_uv_params(project_path) - cmd = [*uv_params.get_cmd(), *get_discover_cmd(task_module=self.task_module)] - result = await run_subprocess_with_tee( - cmd, - timeout=60, - cwd=str(cwd), - flush=False, - tee_stdout=False, - env=self._get_subprocess_env_with_vero_home(), - ) - - if result.returncode != 0: - raise ExperimentRunFailedError( - f"Task discovery failed. Error: {result.stderr}.", - stdout=result.stdout, - stderr=result.stderr, - returncode=int(result.returncode), - ) - - return json.loads(result.stdout) - - async def _run_task( - self, - project_path: Path | str, - task_name: str, - params_file: Path, - timeout: int = 60 * 10, - ) -> dict | None: - """Execute task via isolated subprocess. - - Args: - project_path: Path to the user's project. - task_name: Name of the task to execute. - params_file: Path to JSON file containing EvaluationParameters. - timeout: Subprocess timeout in seconds. - - Returns: - Metrics dictionary from task execution, or None if parsing fails. 
- """ - uv_params, cwd = self._get_uv_params(project_path) - cmd = [ - *uv_params.get_cmd(), - *get_run_cmd( - task_name, params_file, hooks=self.hooks, task_module=self.task_module - ), - ] - result = await run_subprocess_with_tee( - cmd, - timeout=timeout, - cwd=cwd, - flush=True, - env=self._get_subprocess_env_with_vero_home(), - ) - logger.info("Subprocess complete!") - - # Save subprocess output for debugging - log_dir = params_file.parent - if result.stderr: - (log_dir / "subprocess_stderr.log").write_text(result.stderr) - if result.stdout: - (log_dir / "subprocess_stdout.log").write_text(result.stdout) - if result.returncode != 0: - (log_dir / "subprocess_returncode.txt").write_text(str(result.returncode)) - logger.warning( - f"Subprocess exited with code {result.returncode}. " - f"Stderr: {result.stderr[:500] if result.stderr else '(empty)'}" - ) - - # Read metrics from file (written by task subprocess) - metrics_path = log_dir / "metrics.json" - if metrics_path.exists(): - try: - return json.loads(metrics_path.read_text()) - except json.JSONDecodeError: - logger.warning(f"Failed to parse {metrics_path} as JSON") - return None - else: - logger.warning(f"Metrics file not found at {metrics_path}") - return None - - async def _run_task_in_subprocess( - self, - params: EvaluationParameters, - workspace: Workspace, - ) -> None: - """Run task via vero.task_utils subprocess. - - Args: - params: Evaluation parameters (must have task set). - workspace: Workspace to run in. - - Raises: - ExperimentRunFailedError: If task discovery or execution fails. - """ - - # Discover available tasks first - try: - discovery_result = await self._discover_tasks(workspace.project_path) - except Exception as e: - error_str = "".join(traceback.format_exception(type(e), e, e.__traceback__)) - raise ExperimentRunFailedError( - f"Task discovery failed. 
Error: {error_str}.", - stdout="", - stderr=error_str, - returncode=1, - ) - - # Validate the requested task exists - available_tasks = discovery_result.get("tasks", {}) - if params.task not in available_tasks: - available_names = list(available_tasks.keys()) - raise ExperimentRunFailedError( - f"Task '{params.task}' not found in package '{discovery_result.get('package', 'unknown')}'.\n" - f"Available tasks: {available_names if available_names else '(none found)'}\n" - f"Ensure your task is registered in vero_tasks/__init__.py", - stdout="", - stderr="", - returncode=1, - ) - - # Validate required environment variables - required_env = available_tasks[params.task].get("required_env_vars", []) - if required_env: - missing = [v for v in required_env if not os.environ.get(v)] - if missing: - raise ExperimentRunFailedError( - f"Task '{params.task}' requires environment variables that are not set: " - f"{', '.join(missing)}. Set them before running.", - stdout="", - stderr="", - returncode=1, - ) - - # Run the task - result_dir = get_experiment_dir( - self.sessions_dir, self.session_id, params.result_id - ) - params_file = result_dir / evaluation_parameters_basename - logger.info( - f"Running task '{params.task}' via vero.task_utils in {workspace.project_path}" - ) - try: - metrics = await self._run_task( - workspace.project_path, - params.task, - params_file, - timeout=params.timeout, - ) - logger.info(f"Task completed with metrics: {metrics}") - except Exception as e: - error_str = "".join(traceback.format_exception(type(e), e, e.__traceback__)) - raise ExperimentRunFailedError( - f"Task execution failed. Error: {error_str}.", - stdout="", - stderr=error_str, - returncode=1, - ) - - async def run( - self, - evaluation_parameters: EvaluationParameters, - use_copy: bool | None = None, - ) -> ExperimentResult: - """Run an experiment by checking out the candidate commit and running tasks via uv. - - Args: - evaluation_parameters: The parameters for the evaluation. 
- use_copy: Override for self.use_copy. If True, creates a temporary isolated copy - of the workspace (always clean). If False, uses the current workspace (requires clean state). - - Returns: - ExperimentResult with sample results and metadata. - """ - use_copy = use_copy if use_copy is not None else self.use_copy - - if not use_copy: - return await self._run_in_workspace(evaluation_parameters, self.workspace) - - async with self.workspace.temp_copy( - from_version=evaluation_parameters.run.candidate.commit, - ) as temp_workspace: - return await self._run_in_workspace(evaluation_parameters, temp_workspace) - - async def evaluate( - self, - commit: str, - dataset_id: str, - split: str, - task: str | None = None, - sample_ids: list[int] | None = None, - db: ExperimentDatabase | None = None, - evaluation_parameters: BaseEvaluationParameters | None = None, - use_copy: bool | None = None, - ) -> Experiment: - """Full evaluation lifecycle: resolve commit → run → create experiment → DB → hooks. - - This is the single entry point for all evaluations. Both Policy.evaluate_commit() - and ExperimentRunnerTool delegate here. - - Args: - commit: Git commit hash or ref to evaluate. - dataset_id: Dataset ID in the session store. - split: Dataset split to evaluate. - task: Task name to execute. - sample_ids: Specific sample IDs to evaluate (None = all). - db: ExperimentDatabase to record the experiment in. - evaluation_parameters: Base eval params (timeout, concurrency, etc.). - use_copy: Whether to create a temporary copy for the eval. - - Returns: - The completed Experiment with results. - """ - from .core.db.database import Experiment - - # Resolve commit ref to canonical version ID - try: - if isinstance(self.workspace, GitWorkspace): - full_hash = await self.workspace.resolve_ref(commit) - else: - full_hash = commit - except Exception as e: - raise ValueError( - f"Cannot resolve commit '{commit}': {e}. " - f"Make sure the commit exists in the repository." 
- ) - - # Build candidate - candidate = None - if db is not None: - candidate = db.get_candidate((self.workspace.name, full_hash)) - if candidate is None: - candidate = Candidate(commit=full_hash, repo_name=self.workspace.name) - - # Build run - dataset_subset = DatasetSubset( - split=split, sample_ids=sample_ids, dataset_id=dataset_id - ) - run = ExperimentRun(candidate=candidate, dataset_subset=dataset_subset) - - # Build eval params - base_params = evaluation_parameters or BaseEvaluationParameters() - params = EvaluationParameters( - **base_params.model_dump(), - run=run, - dataset_id=dataset_id, - task=task, - session_id=self.session_id, - ) - - # Run - result = await self.run(params, use_copy=use_copy) - - # Create experiment - experiment = Experiment(run=run, result=result) - - # Add to DB - if db is not None: - db.add_experiment(experiment) - - # Fire post-eval callbacks (may be sync or async) - import asyncio as _asyncio - - for callback in self.on_experiment: - try: - result = callback(experiment) - if _asyncio.iscoroutine(result): - await result - except Exception as e: - logger.warning(f"on_experiment callback failed: {e}") - - return experiment - - async def _run_in_workspace( - self, params: EvaluationParameters, workspace: Workspace - ) -> ExperimentResult: - """Run an experiment by checking out the candidate commit and running tasks via uv.""" - - # We cannot execute with a dirty workspace, as this may introduce side effects on the evaluation results. - if await workspace.is_dirty(): - raise RuntimeError( - "Evaluator cannot execute. There are unsaved changes in the workspace." 
- ) - - # Update the evaluation parameters with the dataset loader and session_id - params.session_id = self.session_id - - # Initialize the directory to store the evaluation and pytest report files - result_dir = initialize_result_store( - self.sessions_dir, self.session_id, params.result_id - ) - - save_json_to_cache( - self.sessions_dir, - self.session_id, - params.result_id, - basename=evaluation_parameters_basename, - data=params, - ) - logger.info( - f"Saved evaluation parameters to cache: {result_dir / evaluation_parameters_basename}" - ) - - # Git-specific: fetch from remote if configured - if self.sync and isinstance(workspace, GitWorkspace): - await workspace.maybe_fetch() - - # Clear any stale cached results before running to avoid reading old data if run fails - clear_result_cache( - self.sessions_dir, - self.session_id, - params.result_id, - result_basenames=[pytest_report_basename, evaluation_results_basename], - ) - - # Transfer data into the sandbox before running - experiment_dir = str( - get_experiment_dir(self.sessions_dir, self.session_id, params.result_id) - ) - await workspace.sandbox.upload(experiment_dir, experiment_dir) - - # Upload dataset cache so subprocess can load it - from vero.core.dataset.store import _read_mapping - - mapping = _read_mapping(self.sessions_dir, self.session_id) - dataset_fp = mapping.get(params.dataset_id or "") - if dataset_fp: - cache_entry = str(self.dataset_cache / dataset_fp) - await workspace.sandbox.upload(cache_entry, cache_entry) - # Also upload the session datasets.json mapping - session_dir = str(get_session_dir(self.sessions_dir, self.session_id)) - datasets_json = f"{session_dir}/datasets.json" - await workspace.sandbox.upload(datasets_json, datasets_json) - - # Switch to the candidate version and run the evaluation in a subprocess - async with workspace.at(params.run.candidate.commit): - await self._run_task_in_subprocess(params, workspace) - - # Transfer results back from the sandbox - await 
workspace.sandbox.download(experiment_dir, experiment_dir) - - sample_results = self.load_sample_results_from_cache(params) - - if not sample_results: - raise ExperimentRunFailedError( - f"No sample results found for run {params.run.id}! Likely because execution failed.", - returncode=1, - ) - else: - result = ExperimentResult.create_with_status( - id=params.result_id, - error_rate=params.error_rate_threshold, - run_id=params.run.id, - sample_results=sample_results, - ) - - # Write result metadata to disk so the DB can be reconstructed from experiments/ - save_json_to_cache( - self.sessions_dir, - self.session_id, - params.result_id, - basename=result_metadata_basename, - data={ - "id": result.id, - "run_id": result.run_id, - "status": result.status.value, - }, - ) - - self.log_evaluation_results(result) - return result - - -def _resolve_vero_dependency(isolated_dir: Path, original_project_dir: Path) -> None: - """Resolve the vero path dependency in pyproject.toml after isolation. - - When a project is isolated (copied to a new location), relative path - dependencies in ``[tool.uv.sources]`` break. This function resolves - the ``scale-vero`` dependency to an absolute path via ``uv add``. - - Raises ValueError if any *other* relative path dependencies are found, - since those are unsupported and would silently break. 
- """ - import subprocess - import tomllib - - pyproject_path = isolated_dir / "pyproject.toml" - if not pyproject_path.exists(): - return - - with open(pyproject_path, "rb") as f: - pyproject = tomllib.load(f) - - sources = pyproject.get("tool", {}).get("uv", {}).get("sources", {}) - if not sources: - return - - for name, source in sources.items(): - if not isinstance(source, dict) or "path" not in source: - continue - - rel_path = source["path"] - if not rel_path.startswith(".") and not rel_path.startswith("/"): - continue # Not a relative path - - if "vero" in name.lower(): - # Always resolve to the known vero package directory rather than - # trusting the relative path (which may be stale or wrong). - from vero.core.constants import PACKAGE_DIR - - abs_path = PACKAGE_DIR - editable_flag = ["--editable"] if source.get("editable") else [] - subprocess.run( - ["uv", "add", *editable_flag, "--dev", str(abs_path)], - cwd=isolated_dir, - capture_output=True, - check=True, - ) - logger.info(f"Resolved {name} dependency: {rel_path} -> {abs_path}") - else: - raise ValueError( - f"Unsupported relative path dependency '{name}' " - f"(path={rel_path!r}) in {pyproject_path}. " - f"Only vero is handled during isolation." - ) - - -def isolate_project( - project_path: Path | str, - session_id: str, - git_ref: str = "HEAD", - *, - sessions_dir: Path, -) -> Path: - """Copy a project into a fresh, standalone git repo. - - Useful when the project lives inside a monorepo or has uncommitted changes. - Extracts files at *git_ref* via ``git archive`` (falling back to a plain - copy when the source is not a git repo), then ``git init`` + ``git commit`` - so the result is a clean, self-contained repository. - - Relative path dependencies on vero in pyproject.toml are resolved to - absolute paths so they remain valid after the copy. - - Args: - project_path: Path to the project directory. - session_id: Session ID (isolated copy is placed under the session dir). 
- git_ref: Git ref to archive from (default: HEAD). - sessions_dir: Path to the sessions root directory. - - Returns: - Path to the isolated project root. - """ - import shutil - import subprocess - - project_path = Path(project_path).resolve() - isolated_dir = (sessions_dir / session_id) / project_path.name - isolated_dir.mkdir(parents=True, exist_ok=True) - - repo_root_result = subprocess.run( - ["git", "rev-parse", "--show-toplevel"], - cwd=project_path, - capture_output=True, - text=True, - ) - - if repo_root_result.returncode == 0: - repo_root_path = Path(repo_root_result.stdout.strip()) - project_rel = project_path.relative_to(repo_root_path) - strip = len(project_rel.parts) - - archive = subprocess.Popen( - ["git", "archive", git_ref, str(project_rel)], - cwd=repo_root_path, - stdout=subprocess.PIPE, - ) - subprocess.run( - ["tar", "xf", "-", "--strip-components", str(strip)], - cwd=isolated_dir, - stdin=archive.stdout, - check=True, - ) - archive.wait() - else: - shutil.copytree(project_path, isolated_dir, dirs_exist_ok=True) - - # Resolve vero dependency before git init (so it's in the initial commit) - _resolve_vero_dependency(isolated_dir, project_path) - - subprocess.run(["git", "init"], cwd=isolated_dir, capture_output=True, check=True) - subprocess.run( - ["git", "add", "."], cwd=isolated_dir, capture_output=True, check=True - ) - subprocess.run( - [ - "git", - "-c", - "user.name=vero", - "-c", - "user.email=vero@localhost", - "commit", - "-m", - "Initial commit (isolated)", - ], - cwd=isolated_dir, - capture_output=True, - check=True, - ) - - if repo_root_result.returncode == 0: - subprocess.run( - ["git", "remote", "add", "origin", repo_root_result.stdout.strip()], - cwd=isolated_dir, - capture_output=True, - ) - - logger.info(f"Isolated project: {project_path} -> {isolated_dir}") - return isolated_dir - - -async def run_evaluation( - project_path: Path | str, - dataset: str | Path, - split: str, - task: str | None = None, - commit: str | None = 
None, - sample_ids: list[int] | None = None, - num_samples: int | None = None, - task_params: dict | None = None, - seed: int = 42, - timeout: int = 3600, - per_sample_timeout: int = 180, - create_temporary_copy: bool = False, - isolate: bool = False, - hooks: list[str] | None = None, - session_id: str | None = None, - max_concurrency: int | None = None, - subprocess_env_vars: list[str] | Path | str | None = None, - task_project: Path | str | None = None, - task_module: str | None = None, - vero_home: Path | None = None, -) -> ExperimentResult: - """Run an evaluation using the given parameters. - - Args: - project_path: Path to the agent project to evaluate. - dataset: Dataset, DatasetDict, path to saved dataset, or dataset ID string. - split: Dataset split to evaluate. - task: Task name to execute from vero_tasks module. - commit: Commit to evaluate. - sample_ids: List of sample IDs to evaluate. - num_samples: Number of samples to evaluate. - task_params: Task-specific parameters for the evaluation. - seed: Random seed for sample selection. - timeout: Overall timeout for the evaluation subprocess in seconds. - per_sample_timeout: Timeout for a single sample in seconds. - create_temporary_copy: Whether to create a temporary copy for the evaluation. - isolate: Whether to copy the project into a fresh git repo before evaluating. - hooks: List of hook names to execute before task. - session_id: Session ID. - max_concurrency: Maximum concurrent tasks. - subprocess_env_vars: Environment variable names to pass to task subprocesses. - task_project: Path to a separate task project. When set, evaluator runs - uv in the task project and layers the agent via --with-editable. - task_module: Explicit Python module to import for task registration - (e.g. "my_eval_tasks.vero_tasks"). If None, auto-discovers. - vero_home: Path to the vero home directory. Defaults to ~/.vero. - - Returns: - The experiment result. - - Raises: - ExperimentRunFailedError: If the evaluation fails. 
- """ - from vero.core.dataset.store import resolve_and_save_dataset - - vh = vero_home or get_vero_home_dir() - sessions_dir = vh / "sessions" - dataset_cache = vh / "datasets" - - if task_params is None: - task_params = {} - - if session_id is None: - from uuid import uuid4 - - session_id = str(uuid4()) - logger.info(f"Auto-generated session ID: {session_id}") - - if isolate: - project_path = isolate_project( - project_path, session_id, sessions_dir=sessions_dir - ) - - workspace = await GitWorkspace.create(project_path) - - # Resolve and save dataset - dataset_id = resolve_and_save_dataset( - dataset, sessions_dir, dataset_cache, session_id - ) - - evaluator = Evaluator( - workspace=workspace, - use_copy=create_temporary_copy, - hooks=hooks, - session_id=session_id, - vero_home=vh, - subprocess_env_vars=subprocess_env_vars, - task_project=Path(task_project) if task_project else None, - task_module=task_module, - ) - - if commit is None: - commit = await workspace.current_version() - logger.warning(f"No commit provided, using current commit: {commit}.") - - # Sample data if num_samples is provided - if num_samples is not None and sample_ids is None: - from vero.core.dataset.store import load_dataset as _load_ds - - ds = _load_ds(sessions_dir, dataset_cache, session_id, dataset_id) - rng = random.Random(seed) - sample_ids = rng.sample(range(len(ds[split])), num_samples) - - # Build base eval params - eval_params = BaseEvaluationParameters( - timeout=timeout, - sample_timeout=per_sample_timeout, - task_params=task_params, - ) - if max_concurrency is not None: - eval_params.max_concurrency = max_concurrency - - experiment = await evaluator.evaluate( - commit=commit, - dataset_id=dataset_id, - split=split, - task=task, - sample_ids=sample_ids, - evaluation_parameters=eval_params, - use_copy=create_temporary_copy, - ) - result_dir = get_experiment_dir(sessions_dir, session_id, experiment.id) - console.print(f"Result available at {result_dir / samples_dir_name}") - 
return experiment.result +__all__ = ["Evaluator", "isolate_project", "run_evaluation", "_resolve_vero_dependency"] diff --git a/vero/src/vero/policy.py b/vero/src/vero/policy.py index 5e630b9..247dc1a 100644 --- a/vero/src/vero/policy.py +++ b/vero/src/vero/policy.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio import json import logging from dataclasses import dataclass, field @@ -17,7 +16,7 @@ ) from vero.core.db.database import Experiment, ExperimentDatabase from vero.core.evaluation import BaseEvaluationParameters -from vero.evaluator import Evaluator +from vero.evaluation.evaluator import Evaluator from vero.filesystem import AccessRule, AccessType from vero.logging import SessionLogger, log_experiments_to_wandb from vero.sandbox import Sandbox @@ -32,6 +31,8 @@ from datasets import DatasetDict from jinja2 import Template + from vero.harbor.config import HarborConfig + DatasetT = Path | str | DatasetDict logger = logging.getLogger(__name__) @@ -140,6 +141,11 @@ class Policy: # --- Sandbox --- sandbox: Sandbox | None = None + # --- Harbor (Mode B) --- + # When set, evaluation runs a nested `harbor run` (the agent-under-test on the + # configured Harbor tasks) instead of vero-native inference/scoring. + harbor: HarborConfig | None = None + # --- Storage --- vero_home: Path | str | None = None @@ -221,7 +227,7 @@ async def init(self) -> None: # Git workspace — create via sandbox.run() git commands project_path = Path(self.project_path) if self.isolate: - from vero.evaluator import isolate_project + from vero.evaluation.evaluator import isolate_project project_path = isolate_project( project_path, self.session_id, self.ref, sessions_dir=self.sessions_dir @@ -337,6 +343,13 @@ async def init(self) -> None: self._validate_budget_splits() self.session.budget = self.budget + # Mode B: inject a HarborRunner strategy when a HarborConfig is set. 
+ eval_strategy = None + if self.harbor is not None: + from vero.harbor.runner import HarborRunner + + eval_strategy = HarborRunner(self.harbor) + # Evaluator — with explicit subprocess env self.session.evaluator = Evaluator( self.session.workspace, @@ -345,6 +358,7 @@ async def init(self) -> None: subprocess_env_vars=self.subprocess_env_vars, task_project=Path(self.task_project) if self.task_project else None, task_module=self.task_module, + eval_strategy=eval_strategy, ) # Register artifact callbacks on evaluator so they fire for all eval paths diff --git a/vero/src/vero/session.py b/vero/src/vero/session.py index 3a9b4de..d86bb7d 100644 --- a/vero/src/vero/session.py +++ b/vero/src/vero/session.py @@ -8,7 +8,7 @@ from vero.core.dataset import SplitAccess from vero.core.db import ExperimentDatabase from vero.core.evaluation import BaseEvaluationParameters -from vero.evaluator import Evaluator +from vero.evaluation.evaluator import Evaluator from vero.tools.experiment_runner import SplitBudget # noqa: E402 — direct import avoids tools/__init__.py from vero.workspace import Workspace diff --git a/vero/src/vero/tools/experiment_runner.py b/vero/src/vero/tools/experiment_runner.py index c393146..a3820da 100644 --- a/vero/src/vero/tools/experiment_runner.py +++ b/vero/src/vero/tools/experiment_runner.py @@ -2,90 +2,30 @@ import logging from dataclasses import dataclass, field +from pathlib import Path from typing import Callable, NoReturn +from vero.core.budget import BudgetLedger, SplitBudget from vero.core.db.database import Experiment, ExperimentDatabase from vero.core.evaluation import BaseEvaluationParameters -from vero.evaluator import Evaluator +from vero.evaluation.evaluator import Evaluator from vero.exceptions import ( ExperimentBudgetExceeded, ExperimentRunFailedError, - InvalidSplitError, ) +from vero.evaluation.engine import EvalRequest, EvaluationEngine from vero.tools.utils import is_tool logger = logging.getLogger(__name__) +# SplitBudget moved to 
vero.core.budget; re-exported here for the public import path. +__all__ = ["ExperimentRunnerTool", "SplitBudget"] + def _default_on_fatal(msg: str) -> NoReturn: raise RuntimeError(msg) -@dataclass -class SplitBudget: - """A stateful object that tracks the remaining budget for running experiments.""" - - split: str - dataset_id: str = "" - total_sample_budget: int | None = None - remaining_sample_budget: int | None = field(init=False) - total_run_budget: int | None = None - remaining_run_budget: int | None = field(init=False) - max_samples_per_run: int | None = None - - def __repr__(self) -> str: - repr_items = [ - ("split", self.split), - ("dataset_id", self.dataset_id), - ("total_sample_budget", self.total_sample_budget), - ("total_run_budget", self.total_run_budget), - ] - repr_items = [item for item in repr_items if item[1] is not None] - return ( - f"SplitBudget({', '.join([f'{item[0]}={item[1]}' for item in repr_items])})" - ) - - def __post_init__(self): - assert ( - self.total_sample_budget is not None or self.total_run_budget is not None - ), "Either total sample budget or total run budget must be provided." 
- self.remaining_sample_budget = self.total_sample_budget - self.remaining_run_budget = self.total_run_budget - - assert ( - isinstance(self.total_sample_budget, int) - or self.total_sample_budget is None - ) - assert isinstance(self.total_run_budget, int) or self.total_run_budget is None - assert ( - isinstance(self.max_samples_per_run, int) - or self.max_samples_per_run is None - ) - - def has_run_budget(self) -> bool: - return self.remaining_run_budget is None or self.remaining_run_budget > 0 - - def decrement_run_budget(self) -> None: - if self.remaining_run_budget is not None: - self.remaining_run_budget -= 1 - - def has_sample_budget(self, num_samples: int) -> bool: - return ( - self.remaining_sample_budget is None - or self.remaining_sample_budget >= num_samples - ) - - def decrement_sample_budget(self, num_samples: int) -> None: - if self.remaining_sample_budget is not None: - self.remaining_sample_budget -= num_samples - - def exceeds_per_run_budget(self, num_samples: int) -> bool: - return ( - self.max_samples_per_run is not None - and num_samples > self.max_samples_per_run - ) - - @dataclass class ExperimentRunnerTool: """Run target agents on tasks and get performance metrics.""" @@ -101,15 +41,25 @@ class ExperimentRunnerTool: ) _task: str | None = None db: ExperimentDatabase | None = None - _budget_map: dict[tuple[str, str], SplitBudget] = field( - default_factory=dict, repr=False - ) + _vero_home: Path | None = None + _session_id: str | None = None + # The shared evaluation core. This tool is a thin frontend over it (formats + # results for the LLM, owns on_fatal); the Harbor sidecar is the other frontend. 
+ engine: EvaluationEngine | None = field(default=None, repr=False) def __post_init__(self): - if self.split_budgets: - self._budget_map = { - (sb.split, sb.dataset_id): sb for sb in self.split_budgets - } + self._build_engine() + + def _build_engine(self) -> None: + self.engine = EvaluationEngine( + evaluator=self.evaluator, + budget=BudgetLedger(self.split_budgets or []), + default_task=self._task, + db=self.db, + run_constraints=self.run_constraints, + session_id=self._session_id, + vero_home=self._vero_home, + ) def bind(self, session) -> None: from copy import deepcopy @@ -121,21 +71,23 @@ def bind(self, session) -> None: self._vero_home = session.vero_home self.run_constraints = session.evaluation_parameters self._task = session.task - self._budget_map = {(sb.split, sb.dataset_id): sb for sb in self.split_budgets} + self._build_engine() + + @property + def _budget_ledger(self) -> BudgetLedger: + return self.engine.budget + + @property + def _budget_map(self) -> dict[tuple[str, str], SplitBudget]: + """Back-compat view of the budget ledger, keyed (split, dataset_id). + + Returns the ledger's live SplitBudget objects (mutations propagate). 
+ """ + return self.engine.budget.status() def _get_dataset_info(self, dataset_id: str): - """Get dataset info from the store.""" - from vero.core.dataset import DatasetInfo - from vero.core.dataset.store import load_dataset - - sessions_dir = self._vero_home / "sessions" if self._vero_home else None - dataset_cache = self._vero_home / "datasets" if self._vero_home else None - dataset = load_dataset(sessions_dir, dataset_cache, self._session_id, dataset_id) - return DatasetInfo( - id=dataset_id, - splits={split: len(dataset[split]) for split in dataset}, - features={split: list(dataset[split].features) for split in dataset}, - ) + """Get dataset info from the store (delegates to the shared service).""" + return self.engine._get_dataset_info(dataset_id) async def _resolve_commit(self, commit: str) -> str: """Resolve a commit reference to its full hash. @@ -165,93 +117,32 @@ async def _resolve_commit(self, commit: str) -> str: def _get_samples_from_split( self, dataset_id: str, split: str, num_samples: int ) -> list[int] | None: - """Get a list of sample ids from a split. If num_samples is greater than or equal to the size of the split, return None.""" - dataset_info = self._get_dataset_info(dataset_id) - split_size = dataset_info.splits[split] - num_samples = min(num_samples, split_size) - - if num_samples >= split_size: - return None - - sample_ids = list(range(num_samples)) - return sample_ids + """First-N sample ids, or None for the whole split (delegates to the service).""" + return self.engine._get_samples_from_split(dataset_id, split, num_samples) def _validate_and_count_samples( self, dataset_id: str, split: str, sample_ids: list[int] | None = None ) -> int: - """Validate and count the number of samples in a split. 
If sample_ids is None, return the size of the split.""" - - dataset_info = self._get_dataset_info(dataset_id) - split_size = dataset_info.splits[split] - - # If None, the full split is being evaluated - if sample_ids is None: - return split_size - - # Validate that the sample ids are within the range of the split - invalid_sample_ids = [] - for sample_id in sample_ids: - if sample_id < 0 or sample_id >= split_size: - invalid_sample_ids.append(sample_id) - - if len(invalid_sample_ids) > 0: - raise ValueError( - f"The provided sample ids are outside the range of the split [0, {split_size - 1}]: {invalid_sample_ids}" - ) - - return len(sample_ids) + """Validate + count samples (delegates to the service).""" + return self.engine._validate_and_count_samples(dataset_id, split, sample_ids) def _validate_split_access(self, dataset_id: str, split: str) -> None: """Validate that the split and dataset combination is allowed.""" - - if (split, dataset_id) not in self._budget_map: - allowed_keys = list(self._budget_map.keys()) - raise InvalidSplitError( - f"No split budget found for the combination (dataset_id={dataset_id}, split={split}) either because it does not exist or because it is not allowed. Allowed combinations: {allowed_keys}" - ) + self._budget_ledger.validate(dataset_id, split) def _check_budget( self, dataset_id: str, split: str, requested_num_samples: int - ) -> str: + ) -> None: """Check that the budget allows for the requested number of samples.""" - - # Check if this split and dataset combination is allowed - self._validate_split_access(dataset_id, split) - budget = self._budget_map[(split, dataset_id)] - - # Determine if we have enough runs left - if not budget.has_run_budget(): - raise ExperimentBudgetExceeded( - f"No runs left for the {split} split of the {dataset_id} dataset." 
- ) - - # Check against remaining sample budget - if not budget.has_sample_budget(requested_num_samples): - raise ExperimentBudgetExceeded( - f"Requested {requested_num_samples} samples for the {split} split of the {dataset_id} dataset, but the remaining sample budget only allows for {budget.remaining_sample_budget} samples." - ) - - # Check against max samples per run constraint - if budget.exceeds_per_run_budget(requested_num_samples): - raise ExperimentBudgetExceeded( - f"Requested {requested_num_samples} samples for the {split} split of the {dataset_id} dataset, but only {budget.max_samples_per_run} are allowed per run." - ) + self._budget_ledger.check(dataset_id, split, requested_num_samples) def _update_budget(self, dataset_id: str, split: str, num_samples: int) -> str: - """Update the remaining budget for a given dataset and split and return a message about the update.""" - - self._validate_split_access(dataset_id, split) - budget = self._budget_map[(split, dataset_id)] + """Decrement the budget for a given dataset and split; return a status message.""" + budget = self._budget_ledger.record(dataset_id, split, num_samples) info = "" - - # Update the remaining budget - budget.decrement_sample_budget(num_samples) if budget.total_sample_budget is not None: info += f"Used {num_samples} samples from the total {budget.total_sample_budget} sample budget. Remaining sample budget: {budget.remaining_sample_budget}. " - - # Update the remaining runs - budget.decrement_run_budget() if budget.remaining_run_budget is not None: info += f"Used 1 run from the total {budget.total_run_budget} run budget. Remaining runs: {budget.remaining_run_budget}" @@ -263,20 +154,18 @@ async def _evaluate_commit( dataset_id: str, split: str, sample_ids: list[int] | None = None, - add_to_db: bool = True, ) -> Experiment: - """Evaluate a version of the codebase specified by a Git commit on a subset of a dataset.""" + """Run one evaluation via the shared EvaluationEngine. 
+ Uses ``admin=True`` so the service does not meter the budget — this tool + owns budgeting via ``_check_budget``/``_update_budget`` (check-before, + decrement-after) to preserve its existing semantics. + """ + req = EvalRequest( + dataset_id=dataset_id, split=split, commit=commit, sample_ids=sample_ids + ) try: - return await self.evaluator.evaluate( - commit=commit, - dataset_id=dataset_id, - split=split, - task=self._task, - sample_ids=sample_ids, - db=self.db if add_to_db else None, - evaluation_parameters=self.run_constraints, - ) + return await self.engine.evaluate(req, admin=True) except ExperimentRunFailedError as e: if e.returncode >= 3: self.on_fatal(str(e)) @@ -295,8 +184,7 @@ async def check_remaining_experiment_budget( Returns: A string containing the remaining budget for the given dataset and split. """ - self._validate_split_access(dataset_id, split) - budget = self._budget_map[(split, dataset_id)] + budget = self._budget_ledger.get(dataset_id, split) info = "" if budget.total_sample_budget is not None: @@ -355,7 +243,6 @@ async def evaluate_commit( dataset_id=dataset_id, split=split, sample_ids=sample_ids, - add_to_db=True, ) except Exception as e: raise e @@ -385,7 +272,9 @@ async def evaluate_commit_on_all_splits( """ accessible_splits = [ - split for (split, ds_id) in self._budget_map.keys() if ds_id == dataset_id + split + for (split, ds_id) in self._budget_ledger.status().keys() + if ds_id == dataset_id ] logger.info( @@ -403,7 +292,7 @@ async def evaluate_commit_on_all_splits( for split in accessible_splits: full_split_size = self._validate_and_count_samples(dataset_id, split) - budget = self._budget_map.get((split, dataset_id)) + budget = self._budget_ledger.get(dataset_id, split) # Cap samples to remaining budget if needed requested_num_samples = full_split_size @@ -436,7 +325,6 @@ async def evaluate_commit_on_all_splits( dataset_id=dataset_id, split=split, sample_ids=sample_ids, - add_to_db=True, ) except Exception as e: results[split] = e 
diff --git a/vero/tests/test_budget.py b/vero/tests/test_budget.py new file mode 100644 index 0000000..c0d5047 --- /dev/null +++ b/vero/tests/test_budget.py @@ -0,0 +1,94 @@ +"""Tests for BudgetLedger (vero.core.budget).""" + +import json + +import pytest + +from vero.core.budget import BudgetLedger, SplitBudget +from vero.exceptions import ExperimentBudgetExceeded, InvalidSplitError + + +def _ledger(**kwargs): + return BudgetLedger( + [ + SplitBudget( + split="dev", dataset_id="ds1", total_sample_budget=100, total_run_budget=3 + ) + ], + **kwargs, + ) + + +class TestAllowlist: + def test_validate_allows_configured_pair(self): + _ledger().validate("ds1", "dev") # no raise + + def test_validate_rejects_unknown_pair(self): + with pytest.raises(InvalidSplitError): + _ledger().validate("ds1", "test") + with pytest.raises(InvalidSplitError): + _ledger().validate("other", "dev") + + +class TestCheck: + def test_check_passes_within_budget(self): + _ledger().check("ds1", "dev", 50) + + def test_check_rejects_over_sample_budget(self): + with pytest.raises(ExperimentBudgetExceeded): + _ledger().check("ds1", "dev", 101) + + def test_check_rejects_no_runs_left(self): + led = BudgetLedger([SplitBudget(split="dev", dataset_id="ds1", total_run_budget=1)]) + led.record("ds1", "dev", 0) # consume the one run + with pytest.raises(ExperimentBudgetExceeded): + led.check("ds1", "dev", 0) + + def test_check_rejects_over_per_run(self): + led = BudgetLedger( + [SplitBudget(split="dev", dataset_id="ds1", total_sample_budget=100, max_samples_per_run=10)] + ) + with pytest.raises(ExperimentBudgetExceeded): + led.check("ds1", "dev", 11) + + +class TestRecord: + def test_record_decrements(self): + led = _ledger() + b = led.record("ds1", "dev", 30) + assert b.remaining_sample_budget == 70 + assert b.remaining_run_budget == 2 + + +class TestReserve: + @pytest.mark.asyncio + async def test_reserve_checks_then_decrements(self): + led = _ledger() + b = await led.reserve("ds1", "dev", 40) + assert 
b.remaining_sample_budget == 60 + assert b.remaining_run_budget == 2 + + @pytest.mark.asyncio + async def test_reserve_rejects_without_decrementing(self): + led = _ledger() + with pytest.raises(ExperimentBudgetExceeded): + await led.reserve("ds1", "dev", 101) + # rejected request costs nothing + assert led.get("ds1", "dev").remaining_sample_budget == 100 + assert led.get("ds1", "dev").remaining_run_budget == 3 + + +class TestPersistence: + def test_flush_writes_durable_json(self, tmp_path): + path = tmp_path / "ledger.json" + led = _ledger(persist_path=path) + led.record("ds1", "dev", 25) + data = json.loads(path.read_text()) + entry = next(e for e in data if e["split"] == "dev" and e["dataset_id"] == "ds1") + assert entry["remaining_sample_budget"] == 75 + assert entry["remaining_run_budget"] == 2 + + def test_no_flush_when_in_memory(self, tmp_path): + led = _ledger() # persist_path=None + led.record("ds1", "dev", 25) # no file written, no error + assert not list(tmp_path.iterdir()) diff --git a/vero/tests/test_dataset_viewer.py b/vero/tests/test_dataset_viewer.py index 044f569..632ffb4 100644 --- a/vero/tests/test_dataset_viewer.py +++ b/vero/tests/test_dataset_viewer.py @@ -3,18 +3,12 @@ from __future__ import annotations import json -import tempfile -from pathlib import Path -from unittest.mock import MagicMock import pytest from datasets import Dataset, DatasetDict from vero.core.dataset import ( - DatasetInfo, DefaultSplitNames, - SplitAccess, default_split_accesses, - get_non_viewable_splits, ) from vero.core.dataset.store import save_dataset from vero.policy import Session diff --git a/vero/tests/test_e2e_optimization.py b/vero/tests/test_e2e_optimization.py index e242bab..121525b 100644 --- a/vero/tests/test_e2e_optimization.py +++ b/vero/tests/test_e2e_optimization.py @@ -49,7 +49,7 @@ async def test_matmul_kernel_evaluates(workspace): """Naive kernel evaluates correctly — all samples produce valid scores.""" kernel_dir, task_dir, dataset_path, vero_home 
= workspace - from vero.evaluator import run_evaluation + from vero.evaluation.evaluator import run_evaluation result = await run_evaluation( project_path=kernel_dir, @@ -77,7 +77,7 @@ async def test_kernel_change_changes_score(workspace): """Modifying kernel code and re-evaluating produces different scores.""" kernel_dir, task_dir, dataset_path, vero_home = workspace - from vero.evaluator import run_evaluation + from vero.evaluation.evaluator import run_evaluation # Evaluate naive kernel result_v1 = await run_evaluation( @@ -170,7 +170,7 @@ def multiply(a, b): from datasets import DatasetDict from vero.core.dataset.store import save_dataset - from vero.evaluator import Evaluator + from vero.evaluation.evaluator import Evaluator session_id = "test-workspace-eval" ds = DatasetDict.load_from_disk(str(dataset_path)) diff --git a/vero/tests/test_engine.py b/vero/tests/test_engine.py new file mode 100644 index 0000000..789927b --- /dev/null +++ b/vero/tests/test_engine.py @@ -0,0 +1,97 @@ +"""Tests for EvaluationEngine (vero.evaluation.engine) — the shared evaluation core.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from vero.core.budget import BudgetLedger, SplitBudget +from vero.core.dataset import DatasetInfo +from vero.exceptions import ExperimentBudgetExceeded, InvalidSplitError +from vero.evaluation.engine import EvalRequest, EvaluationEngine + +_DATASET_INFO = DatasetInfo( + id="ds1", splits={"dev": 100, "test": 50}, features={"dev": [], "test": []} +) + + +def _make_service(budgets=None, monkeypatch=None): + evaluator = MagicMock() + evaluator.evaluate = AsyncMock(return_value="EXPERIMENT") # sentinel + svc = EvaluationEngine( + evaluator=evaluator, + budget=BudgetLedger( + budgets + or [SplitBudget(split="dev", dataset_id="ds1", total_sample_budget=100, total_run_budget=3)] + ), + default_task="main", + session_id="s1", + ) + if monkeypatch is not None: + monkeypatch.setattr(svc, "_get_dataset_info", lambda dataset_id: 
_DATASET_INFO) + return svc + + +class TestResolveSamples: + def test_rejects_both_sample_ids_and_num_samples(self, monkeypatch): + svc = _make_service(monkeypatch=monkeypatch) + with pytest.raises(ValueError, match="both sample_ids and num_samples"): + svc.resolve_samples(EvalRequest(dataset_id="ds1", split="dev", sample_ids=[0], num_samples=1)) + + def test_num_samples_first_n(self, monkeypatch): + svc = _make_service(monkeypatch=monkeypatch) + ids, n = svc.resolve_samples(EvalRequest(dataset_id="ds1", split="dev", num_samples=5)) + assert ids == [0, 1, 2, 3, 4] and n == 5 + + def test_num_samples_full_split_is_none(self, monkeypatch): + svc = _make_service(monkeypatch=monkeypatch) + ids, n = svc.resolve_samples(EvalRequest(dataset_id="ds1", split="dev", num_samples=100)) + assert ids is None and n == 100 # None == whole split + + def test_sample_ids_out_of_range_raises(self, monkeypatch): + svc = _make_service(monkeypatch=monkeypatch) + with pytest.raises(ValueError, match="outside the range"): + svc.resolve_samples(EvalRequest(dataset_id="ds1", split="dev", sample_ids=[0, 999])) + + +class TestEvaluate: + @pytest.mark.asyncio + async def test_evaluate_meters_and_runs(self, monkeypatch): + svc = _make_service(monkeypatch=monkeypatch) + exp = await svc.evaluate(EvalRequest(dataset_id="ds1", split="dev", commit="c1", num_samples=10)) + + assert exp == "EXPERIMENT" + svc.evaluator.evaluate.assert_awaited_once() + kwargs = svc.evaluator.evaluate.await_args.kwargs + assert kwargs["commit"] == "c1" and kwargs["split"] == "dev" and kwargs["task"] == "main" + assert kwargs["sample_ids"] == list(range(10)) + # budget metered + assert svc.status()[("dev", "ds1")].remaining_run_budget == 2 + assert svc.status()[("dev", "ds1")].remaining_sample_budget == 90 + + @pytest.mark.asyncio + async def test_evaluate_budget_exhausted_does_not_run(self, monkeypatch): + # 50-sample budget; num_samples=60 caps to 60 (< split size 100) and exceeds it + svc = _make_service( + 
budgets=[SplitBudget(split="dev", dataset_id="ds1", total_sample_budget=50)], + monkeypatch=monkeypatch, + ) + with pytest.raises(ExperimentBudgetExceeded): + await svc.evaluate(EvalRequest(dataset_id="ds1", split="dev", commit="c1", num_samples=60)) + svc.evaluator.evaluate.assert_not_awaited() + + @pytest.mark.asyncio + async def test_evaluate_unknown_split_rejected(self, monkeypatch): + svc = _make_service(monkeypatch=monkeypatch) + with pytest.raises(InvalidSplitError): + await svc.evaluate(EvalRequest(dataset_id="ds1", split="test", commit="c1", num_samples=10)) + + @pytest.mark.asyncio + async def test_admin_bypasses_budget(self, monkeypatch): + svc = _make_service(monkeypatch=monkeypatch) + # 'test' isn't in the agent budget map, but admin may evaluate it + await svc.evaluate( + EvalRequest(dataset_id="ds1", split="test", commit="c1", num_samples=10), admin=True + ) + svc.evaluator.evaluate.assert_awaited_once() + # nothing metered + assert svc.status()[("dev", "ds1")].remaining_run_budget == 3 diff --git a/vero/tests/test_eval_strategy.py b/vero/tests/test_eval_strategy.py new file mode 100644 index 0000000..6bf183e --- /dev/null +++ b/vero/tests/test_eval_strategy.py @@ -0,0 +1,66 @@ +"""Tests for the Evaluator strategy seam (vero.evaluation.strategy).""" + +import contextlib +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from vero.core.db.candidate import Candidate +from vero.core.db.dataset import DatasetSample, DatasetSubset +from vero.core.db.result import SampleResult +from vero.core.db.run import ExperimentRun +from vero.core.evaluation import EvaluationParameters +from vero.core.sessions import get_vero_home_dir, save_sample_result +from vero.evaluation.evaluator import Evaluator + + +def _mock_workspace(): + ws = MagicMock() + ws.name = "repo" + ws.is_dirty = AsyncMock(return_value=False) + + @contextlib.asynccontextmanager + async def _at(commit): + yield + + ws.at = _at + return ws + + +@pytest.mark.asyncio +async def 
test_injected_strategy_produces_results(tmp_path, monkeypatch): + monkeypatch.setenv("VERO_HOME_DIR", str(tmp_path / "vero_home")) + + called = {} + + class FakeStrategy: + async def produce_sample_results(self, *, workspace, params, result_dir): + called["yes"] = True + save_sample_result( + get_vero_home_dir() / "sessions", + params.session_id, + params.result_id, + sample_id=0, + result=SampleResult( + dataset_sample=DatasetSample(sample_id=0, split="test", dataset_id="ds"), + score=1.0, + commit=params.run.candidate.commit, + result_id=params.result_id, + ), + ) + + evaluator = Evaluator(_mock_workspace(), session_id="s", eval_strategy=FakeStrategy()) + params = EvaluationParameters( + run=ExperimentRun( + candidate=Candidate(commit="c1", repo_name="repo"), + dataset_subset=DatasetSubset(split="test", dataset_id="ds", sample_ids=[0]), + ), + session_id="s", + ) + + result = await evaluator.run(params, use_copy=False) + + assert called.get("yes") is True + assert result.sample_results[0].score == 1.0 + # Mode-A staging path was NOT taken (strategy branch); sandbox untouched + evaluator.workspace.sandbox.upload.assert_not_called() diff --git a/vero/tests/test_experiment_runner.py b/vero/tests/test_experiment_runner.py index d8a33fc..f2baac5 100644 --- a/vero/tests/test_experiment_runner.py +++ b/vero/tests/test_experiment_runner.py @@ -1,6 +1,6 @@ """Tests for ExperimentRunnerTool and SplitBudget.""" -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest @@ -23,14 +23,18 @@ @pytest.fixture(autouse=True) def mock_dataset_info(monkeypatch): - """Mock _get_dataset_info to avoid dataset store dependency in tests.""" - original = ExperimentRunnerTool._get_dataset_info + """Mock _get_dataset_info to avoid dataset store dependency in tests. + + The tool delegates dataset resolution to EvaluationEngine, so patch there + (the tool's own _get_dataset_info also delegates to the service). 
+ """ + from vero.evaluation.engine import EvaluationEngine def patched_get_dataset_info(self, dataset_id): return _DEFAULT_DATASET_INFO monkeypatch.setattr( - ExperimentRunnerTool, "_get_dataset_info", patched_get_dataset_info + EvaluationEngine, "_get_dataset_info", patched_get_dataset_info ) diff --git a/vero/tests/test_external_tasks.py b/vero/tests/test_external_tasks.py index 51a730d..2c60bff 100644 --- a/vero/tests/test_external_tasks.py +++ b/vero/tests/test_external_tasks.py @@ -18,7 +18,7 @@ import pytest -from vero.evaluator import run_evaluation +from vero.evaluation.evaluator import run_evaluation def _init_git(path: Path) -> None: diff --git a/vero/tests/test_isolate_project.py b/vero/tests/test_isolate_project.py index bc94a03..d41071a 100644 --- a/vero/tests/test_isolate_project.py +++ b/vero/tests/test_isolate_project.py @@ -1,11 +1,10 @@ """Tests for project isolation with dependency resolution.""" import subprocess -from pathlib import Path import pytest -from vero.evaluator import _resolve_vero_dependency +from vero.evaluation.evaluator import _resolve_vero_dependency @pytest.fixture diff --git a/vero/tests/test_task.py b/vero/tests/test_task.py index 4d5972e..274067b 100644 --- a/vero/tests/test_task.py +++ b/vero/tests/test_task.py @@ -2,7 +2,6 @@ from __future__ import annotations -import warnings import pytest from pydantic import ValidationError @@ -369,3 +368,46 @@ async def infer(task, evaluation_parameters): params = _make_eval_params() with pytest.raises(RuntimeError, match="No evaluation function"): await t.run(params) + + +# --------------------------------------------------------------------------- +# Label scrubbing (Mode A) +# --------------------------------------------------------------------------- + + +class TestLabelScrubbing: + def test_scrub_inputs_helper(self): + t = create_task("scrub-helper", register=False, label_fields=["answer"]) + # mapping rows have label fields removed + assert t._scrub_inputs({"q": "x", "answer": 
"y"}) == {"q": "x"} + # non-mapping rows pass through unchanged + assert t._scrub_inputs("notadict") == "notadict" + # no label_fields configured -> no-op + t2 = create_task("scrub-helper-2", register=False) + assert t2._scrub_inputs({"q": "x", "answer": "y"}) == {"q": "x", "answer": "y"} + + @pytest.mark.asyncio + async def test_inference_never_sees_labels_scoring_does(self): + t = create_task("scrub-e2e", register=False, label_fields=["answer"]) + seen_by_inference = {} + + @t.load_data() + def load(evaluation_parameters): + return [{"q": "2+2", "answer": "4"}] + + @t.inference() + async def infer(task, evaluation_parameters): + seen_by_inference["keys"] = sorted(task.keys()) + return TaskOutput(output="4") + + @t.evaluation() + async def evaluate(task, output, evaluation_parameters): + # scoring receives the full row, including the label + assert "answer" in task + return TaskResult(score=1.0 if output.output == task["answer"] else 0.0) + + params = _make_eval_params(num_samples=1) + metrics = await t.run(params) + + assert seen_by_inference["keys"] == ["q"] # label stripped from inference + assert metrics["avg_score"] == 1.0 diff --git a/vero/tests/test_task_metrics.py b/vero/tests/test_task_metrics.py index dd13f73..4ae6f93 100644 --- a/vero/tests/test_task_metrics.py +++ b/vero/tests/test_task_metrics.py @@ -6,7 +6,7 @@ import pytest -from vero.evaluator import Evaluator +from vero.evaluation.evaluator import Evaluator from vero.utils.asyncio import SubprocessResult pytestmark = pytest.mark.asyncio @@ -43,8 +43,8 @@ def fake_subprocess(*args, **kwargs): returncode=0, ) - with patch("vero.evaluator.run_subprocess_with_tee", new=AsyncMock(side_effect=fake_subprocess)): - with patch("vero.evaluator.UvRunParameters.from_env", return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): + with patch("vero.evaluation.evaluator.run_subprocess_with_tee", new=AsyncMock(side_effect=fake_subprocess)): + with patch("vero.evaluation.evaluator.UvRunParameters.from_env", 
return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): result = await evaluator._run_task( Path("/fake/project"), "test_task", params_file ) @@ -59,8 +59,8 @@ async def test_run_task_returns_none_when_no_metrics_file(evaluator, experiment_ def fake_subprocess(*args, **kwargs): return SubprocessResult(args=["fake"], stdout="", stderr="", returncode=0) - with patch("vero.evaluator.run_subprocess_with_tee", new=AsyncMock(side_effect=fake_subprocess)): - with patch("vero.evaluator.UvRunParameters.from_env", return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): + with patch("vero.evaluation.evaluator.run_subprocess_with_tee", new=AsyncMock(side_effect=fake_subprocess)): + with patch("vero.evaluation.evaluator.UvRunParameters.from_env", return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): result = await evaluator._run_task( Path("/fake/project"), "test_task", params_file ) @@ -76,8 +76,8 @@ def fake_subprocess(*args, **kwargs): (tmp_path / "metrics.json").write_text("not valid json {{{") return SubprocessResult(args=["fake"], stdout="", stderr="", returncode=0) - with patch("vero.evaluator.run_subprocess_with_tee", new=AsyncMock(side_effect=fake_subprocess)): - with patch("vero.evaluator.UvRunParameters.from_env", return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): + with patch("vero.evaluation.evaluator.run_subprocess_with_tee", new=AsyncMock(side_effect=fake_subprocess)): + with patch("vero.evaluation.evaluator.UvRunParameters.from_env", return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): result = await evaluator._run_task( Path("/fake/project"), "test_task", params_file ) @@ -98,8 +98,8 @@ def fake_subprocess(*args, **kwargs): returncode=0, ) - with patch("vero.evaluator.run_subprocess_with_tee", new=AsyncMock(side_effect=fake_subprocess)): - with patch("vero.evaluator.UvRunParameters.from_env", return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): + with patch("vero.evaluation.evaluator.run_subprocess_with_tee", 
new=AsyncMock(side_effect=fake_subprocess)): + with patch("vero.evaluation.evaluator.UvRunParameters.from_env", return_value=MagicMock(get_cmd=lambda: ["uv", "run"])): await evaluator._run_task(Path("/fake/project"), "test_task", params_file) assert (tmp_path / "subprocess_stdout.log").read_text() == "some stdout" diff --git a/vero/tests/test_uv_with_editable.py b/vero/tests/test_uv_with_editable.py index 8e9b891..07171ae 100644 --- a/vero/tests/test_uv_with_editable.py +++ b/vero/tests/test_uv_with_editable.py @@ -8,7 +8,6 @@ from __future__ import annotations -import asyncio import subprocess import textwrap from pathlib import Path diff --git a/vero/uv.lock b/vero/uv.lock index 27fa2eb..b1b98ac 100644 --- a/vero/uv.lock +++ b/vero/uv.lock @@ -913,6 +913,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c1/ea/53f2148663b321f21b5a606bd5f191517cf40b7072c0497d3c92c4a13b1e/executing-2.2.1-py2.py3-none-any.whl", hash = "sha256:760643d3452b4d777d295bb167ccc74c64a81df23fb5e08eff250c425a4b2017", size = 28317, upload-time = "2025-09-01T09:48:08.5Z" }, ] +[[package]] +name = "fastapi" +version = "0.137.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "annotated-doc" }, + { name = "pydantic" }, + { name = "starlette" }, + { name = "typing-extensions" }, + { name = "typing-inspection" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d5/b1/e5b92c59d2c37817e77c1a8c2fc1f79cdcc04c68253e5406b43e3204cba7/fastapi-0.137.1.tar.gz", hash = "sha256:822360704230d9533d8d9475399613525968aa2f0b5bd2a3ccc9f18c88fd541c", size = 408293, upload-time = "2026-06-15T11:28:20.79Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/da/35/380b9a5922f4340e51c309cde09e5bd32e62f02302971bee30dc15aa0624/fastapi-0.137.1-py3-none-any.whl", hash = "sha256:64f6983c59e45c4b9fdc44e57cb8035c2451ee91ea8e8ec042aca37de7cf6b69", size = 121877, upload-time = "2026-06-15T11:28:19.523Z" }, +] + [[package]] name = "fastjsonschema" version = 
"2.21.2" @@ -4251,7 +4267,9 @@ dependencies = [ { name = "datasets" }, { name = "pydantic" }, { name = "python-dotenv" }, + { name = "pyyaml" }, { name = "requests" }, + { name = "rich" }, { name = "s3fs" }, { name = "tenacity" }, { name = "toml" }, @@ -4269,6 +4287,12 @@ evaluate = [ { name = "haikunator" }, { name = "rich" }, ] +harbor = [ + { name = "fastapi" }, + { name = "httpx" }, + { name = "jinja2" }, + { name = "uvicorn" }, +] jupyter = [ { name = "jupyter" }, ] @@ -4324,8 +4348,11 @@ requires-dist = [ { name = "datasets", specifier = ">=4.3.0" }, { name = "datasets", marker = "extra == 'optimize'", specifier = ">=4.3.0" }, { name = "docker", marker = "extra == 'docker'", specifier = ">=7.1.0" }, + { name = "fastapi", marker = "extra == 'harbor'", specifier = ">=0.110" }, { name = "haikunator", marker = "extra == 'evaluate'", specifier = ">=2.1.0" }, { name = "haikunator", marker = "extra == 'optimize'", specifier = ">=2.1.0" }, + { name = "httpx", marker = "extra == 'harbor'", specifier = ">=0.27" }, + { name = "jinja2", marker = "extra == 'harbor'", specifier = ">=3.1.6" }, { name = "jinja2", marker = "extra == 'optimize'", specifier = ">=3.1.6" }, { name = "jupyter", marker = "extra == 'jupyter'", specifier = ">=1.1.1" }, { name = "jupyterlab", marker = "extra == 'notebook'", specifier = ">=4.5.2" }, @@ -4338,7 +4365,9 @@ requires-dist = [ { name = "pydantic", specifier = ">=2.11.7" }, { name = "pypdf", marker = "extra == 'optimize'", specifier = ">=6.2.0" }, { name = "python-dotenv", specifier = ">=1.2.2" }, + { name = "pyyaml", specifier = ">=6.0" }, { name = "requests", specifier = ">=2.32.5" }, + { name = "rich", specifier = ">=13.9.4" }, { name = "rich", marker = "extra == 'evaluate'", specifier = ">=13.9.4" }, { name = "rich", marker = "extra == 'optimize'", specifier = ">=13.9.4" }, { name = "s3fs", specifier = ">=2025.9.0" }, @@ -4349,10 +4378,11 @@ requires-dist = [ { name = "toml", specifier = ">=0.10.2" }, { name = "tqdm", specifier = 
">=4.67.1" }, { name = "trafilatura", marker = "extra == 'optimize'", specifier = ">=2.0.0" }, + { name = "uvicorn", marker = "extra == 'harbor'", specifier = ">=0.27" }, { name = "wandb", marker = "extra == 'wandb'", specifier = ">=0.2.5" }, { name = "wcmatch", marker = "extra == 'optimize'", specifier = ">=10.1" }, ] -provides-extras = ["wandb", "sgp", "docker", "claude", "optimize", "jupyter", "kaggle", "evaluate", "plot", "notebook"] +provides-extras = ["wandb", "sgp", "docker", "claude", "harbor", "optimize", "jupyter", "kaggle", "evaluate", "plot", "notebook"] [package.metadata.requires-dev] dev = [