Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions vero/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ dependencies = [
"datasets>=4.3.0",
"pydantic>=2.11.7",
"python-dotenv>=1.2.2",
"pyyaml>=6.0",
"requests>=2.32.5",
"rich>=13.9.4",
"s3fs>=2025.9.0",
"tenacity>=9.1.2",
"toml>=0.10.2",
Expand All @@ -37,6 +39,12 @@ docker = [
claude = [
"claude-agent-sdk>=0.1.56",
]
harbor = [
"fastapi>=0.110",
"uvicorn>=0.27",
"httpx>=0.27",
"jinja2>=3.1.6",
]
optimize = [
"async-lru>=2.0.5",
"beautifulsoup4>=4.14.2",
Expand Down
187 changes: 187 additions & 0 deletions vero/src/vero/core/budget.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""Per-split evaluation budgets and the ledger that meters them.

``SplitBudget`` is the public, stateful budget for one (split, dataset_id) pair.
``BudgetLedger`` owns a set of them — the keys also form the allowlist of
evaluable combinations. The ledger is in-memory by default (the in-process
``ExperimentRunnerTool``); with a ``persist_path`` it flushes every mutation to
durable JSON under a single-writer lock (the Harbor eval sidecar).
"""

from __future__ import annotations

import asyncio
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path

from vero.exceptions import ExperimentBudgetExceeded, InvalidSplitError

logger = logging.getLogger(__name__)


@dataclass
class SplitBudget:
    """Stateful evaluation budget for a single (split, dataset_id) pair.

    A budget may cap the total number of samples, the total number of runs, or
    both; at least one of the two totals must be given. A limit left as ``None``
    is treated as unlimited by the ``has_*`` / ``exceeds_*`` checks.
    ``max_samples_per_run`` optionally caps the size of any single run.

    The ``remaining_*`` counters are not constructor arguments: they are
    initialised from the matching ``total_*`` value and then decremented in
    place as budget is spent.

    Raises:
        AssertionError: If neither total is provided, or if any limit is
            neither an ``int`` nor ``None``.
    """

    split: str
    dataset_id: str = ""
    total_sample_budget: int | None = None
    remaining_sample_budget: int | None = field(init=False)
    total_run_budget: int | None = None
    remaining_run_budget: int | None = field(init=False)
    max_samples_per_run: int | None = None

    def __repr__(self) -> str:
        """Return a compact repr of the identifying fields and the totals that are set."""
        candidates = (
            ("split", self.split),
            ("dataset_id", self.dataset_id),
            ("total_sample_budget", self.total_sample_budget),
            ("total_run_budget", self.total_run_budget),
        )
        # Entries whose value is None (an unset total) are left out entirely.
        rendered = ", ".join(
            f"{name}={value}" for name, value in candidates if value is not None
        )
        return f"SplitBudget({rendered})"

    def __post_init__(self) -> None:
        """Validate the configured limits and seed the remaining counters."""
        # Raised explicitly instead of via ``assert`` so the validation still
        # runs under ``python -O``. AssertionError is kept so callers observe
        # the same exception type as before.
        if self.total_sample_budget is None and self.total_run_budget is None:
            raise AssertionError(
                "Either total sample budget or total run budget must be provided."
            )
        for name in ("total_sample_budget", "total_run_budget", "max_samples_per_run"):
            value = getattr(self, name)
            if value is not None and not isinstance(value, int):
                raise AssertionError(
                    f"{name} must be an int or None, got {type(value).__name__}."
                )

        self.remaining_sample_budget = self.total_sample_budget
        self.remaining_run_budget = self.total_run_budget

    def has_run_budget(self) -> bool:
        """Return True if runs are unlimited or at least one run is left."""
        return self.remaining_run_budget is None or self.remaining_run_budget > 0

    def decrement_run_budget(self) -> None:
        """Consume one run; a no-op when runs are unlimited."""
        if self.remaining_run_budget is not None:
            self.remaining_run_budget -= 1

    def has_sample_budget(self, num_samples: int) -> bool:
        """Return True if samples are unlimited or ``num_samples`` still fit."""
        return (
            self.remaining_sample_budget is None
            or self.remaining_sample_budget >= num_samples
        )

    def decrement_sample_budget(self, num_samples: int) -> None:
        """Consume ``num_samples`` samples; a no-op when samples are unlimited."""
        if self.remaining_sample_budget is not None:
            self.remaining_sample_budget -= num_samples

    def exceeds_per_run_budget(self, num_samples: int) -> bool:
        """Return True if a per-run cap is set and ``num_samples`` is above it."""
        return (
            self.max_samples_per_run is not None
            and num_samples > self.max_samples_per_run
        )


class BudgetLedger:
"""Meters evaluation budget across (split, dataset_id) pairs.

The keys are also the allowlist of evaluable combinations: a pair with no
budget entry is rejected by ``validate``.

In-memory by default. Pass ``persist_path`` for the durable, crash-safe
variant used by the Harbor sidecar — every mutation is flushed under a
single-writer lock, and ``reserve`` checks-and-decrements atomically before a
run so concurrent callers cannot overspend. Budget is never refunded on error.
"""

def __init__(
self,
budgets: list[SplitBudget] | None = None,
*,
persist_path: Path | str | None = None,
):
self._budgets: dict[tuple[str, str], SplitBudget] = {
(b.split, b.dataset_id): b for b in (budgets or [])
}
self.persist_path = Path(persist_path) if persist_path else None
self._lock = asyncio.Lock()

def validate(self, dataset_id: str, split: str) -> None:
"""Raise if (split, dataset_id) is not an allowed combination."""
if (split, dataset_id) not in self._budgets:
allowed_keys = list(self._budgets.keys())
raise InvalidSplitError(
f"No split budget found for the combination (dataset_id={dataset_id}, split={split}) "
f"either because it does not exist or because it is not allowed. "
f"Allowed combinations: {allowed_keys}"
)

def get(self, dataset_id: str, split: str) -> SplitBudget:
"""Return the budget for a pair (validates membership first)."""
self.validate(dataset_id, split)
return self._budgets[(split, dataset_id)]

def check(self, dataset_id: str, split: str, num_samples: int) -> None:
"""Raise ExperimentBudgetExceeded if the request would exceed the budget."""
budget = self.get(dataset_id, split)
if not budget.has_run_budget():
raise ExperimentBudgetExceeded(
f"No runs left for the {split} split of the {dataset_id} dataset."
)
if not budget.has_sample_budget(num_samples):
raise ExperimentBudgetExceeded(
f"Requested {num_samples} samples for the {split} split of the {dataset_id} dataset, "
f"but the remaining sample budget only allows for {budget.remaining_sample_budget} samples."
)
if budget.exceeds_per_run_budget(num_samples):
raise ExperimentBudgetExceeded(
f"Requested {num_samples} samples for the {split} split of the {dataset_id} dataset, "
f"but only {budget.max_samples_per_run} are allowed per run."
)

def record(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget:
"""Decrement the budget for a completed (or attempted) run and flush."""
budget = self.get(dataset_id, split)
budget.decrement_sample_budget(num_samples)
budget.decrement_run_budget()
self._flush()
return budget

async def reserve(
self, dataset_id: str, split: str, num_samples: int
) -> SplitBudget:
"""Atomically check + record before a run (durable, single-writer).

Raises InvalidSplitError / ExperimentBudgetExceeded *before* decrementing,
so a rejected request costs nothing; a reserved request is never refunded.
"""
async with self._lock:
self.check(dataset_id, split, num_samples)
return self.record(dataset_id, split, num_samples)

def status(self) -> dict[tuple[str, str], SplitBudget]:
"""Return all budgets keyed by (split, dataset_id)."""
return dict(self._budgets)

def _flush(self) -> None:
if self.persist_path is None:
return
data = [
{
"split": b.split,
"dataset_id": b.dataset_id,
"total_sample_budget": b.total_sample_budget,
"remaining_sample_budget": b.remaining_sample_budget,
"total_run_budget": b.total_run_budget,
"remaining_run_budget": b.remaining_run_budget,
"max_samples_per_run": b.max_samples_per_run,
}
for b in self._budgets.values()
]
self.persist_path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.persist_path.with_suffix(self.persist_path.suffix + ".tmp")
tmp.write_text(json.dumps(data, indent=2))
tmp.replace(self.persist_path)
14 changes: 12 additions & 2 deletions vero/src/vero/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def main():
setup_logging()


# Optional `vero harbor` group (requires the `harbor` extra). Registered only when
# its import succeeds, so the base CLI keeps working without the extra installed.
try:
from vero.harbor.cli import harbor as _harbor_group

main.add_command(_harbor_group)
except ImportError:
pass


@main.group()
def init():
"""Initialize evaluation scaffolds for your uv project."""
Expand Down Expand Up @@ -578,7 +588,7 @@ def check(
if errors:
click.echo("\n Skipping task discovery (project issues above)")
else:
from vero.evaluator import Evaluator
from vero.evaluation.evaluator import Evaluator
from vero.workspace.git import GitWorkspace

async def _discover():
Expand Down Expand Up @@ -760,7 +770,7 @@ def evaluate(
"""Run an evaluation on an agent codebase."""
import asyncio

from vero.evaluator import run_evaluation
from vero.evaluation.evaluator import run_evaluation

asyncio.run(
run_evaluation(
Expand Down
26 changes: 22 additions & 4 deletions vero/src/vero/core/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ class DefaultSplitNames(StrEnum):


class SplitAccessLevel(StrEnum):
"""Access levels for dataset splits."""
"""Access levels for dataset splits.

Three tiers of increasing restriction:
- viewable: rows materialized + full per-sample results visible.
- non_viewable: no rows, but the split can be evaluated and summary stats seen.
- no_access: no rows, no summary, and not agent-evaluable (admin/verifier only).
"""

viewable = "viewable"
non_viewable = "non_viewable"
no_access = "no_access"


@dataclass
Expand All @@ -40,17 +47,28 @@ def viewable(cls, split: str) -> SplitAccess:
def non_viewable(cls, split: str) -> SplitAccess:
return cls(split=split, access=SplitAccessLevel.non_viewable)

@classmethod
def no_access(cls, split: str) -> SplitAccess:
    """Build a ``SplitAccess`` for ``split`` at the ``no_access`` level."""
    level = SplitAccessLevel.no_access
    return cls(split=split, access=level)


default_split_accesses = (
SplitAccess.non_viewable(DefaultSplitNames.test),
SplitAccess.no_access(DefaultSplitNames.test),
SplitAccess.non_viewable(DefaultSplitNames.validation),
)


def get_non_viewable_splits(split_accesses: list[SplitAccess]) -> list[str]:
"""Extract non-viewable splits from a list of SplitAccess."""
"""Splits whose rows/details are not viewable (non_viewable and no_access).

no_access is strictly more restrictive than non_viewable, so it is excluded
everywhere non_viewable is. The non_viewable/no_access distinction (summary +
agent-evaluable vs. not) is enforced in the evaluation engine, not here.
"""
return [
sa.split for sa in split_accesses if sa.access == SplitAccessLevel.non_viewable
sa.split
for sa in split_accesses
if sa.access in (SplitAccessLevel.non_viewable, SplitAccessLevel.no_access)
]


Expand Down
60 changes: 38 additions & 22 deletions vero/src/vero/core/db/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import json
import logging
import traceback
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, Sequence
from uuid import uuid4

from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field, model_validator

from vero.core.constants import default_maximum_score, default_minimum_score
from vero.core.db.dataset import DatasetSample
Expand All @@ -20,20 +19,42 @@
logger = logging.getLogger(__name__)


@dataclass
class TaskOutput:
"""Non-serializable output of an agent on a single task. Used within a subprocess to collate the outputs of the inference process.
class TaskOutput(BaseModel):
"""Serializable output of inference on a single task.

Persisted between the inference and scoring stages, so it must be
JSON-serializable. An ``Exception`` passed to ``error`` is coerced to its
string form, with the traceback captured into ``error_traceback``.

Attributes:
output: The output of the agent on the task.
error: An optional error string, e.g. the traceback of the error.
execution_trace: An optional list of spans indicating details of the inference process.
error: An error string (e.g. ``str(exception)``).
error_traceback: Full traceback string if inference raised.
execution_trace: An optional list of spans describing the inference process.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

output: Any = None
error: Exception | None = None
error: str | None = None
error_traceback: str | None = None
execution_trace: Sequence[Any] | None = None

@model_validator(mode="before")
@classmethod
def _coerce_exception_error(cls, data: Any) -> Any:
"""Accept an Exception in ``error`` and convert it to str + traceback."""
if isinstance(data, dict):
err = data.get("error")
if isinstance(err, BaseException):
data = dict(data)
data["error"] = str(err)
if not data.get("error_traceback"):
data["error_traceback"] = "".join(
traceback.format_exception(type(err), err, err.__traceback__)
)
return data


class TaskResult(BaseModel):
"""Serializable evaluation result for a single task. Used across processes for long-term storage of evaluation results.
Expand Down Expand Up @@ -62,21 +83,12 @@ class TaskResult(BaseModel):

@classmethod
def from_task_output(cls, task_output: TaskOutput, **kwargs: Any) -> TaskResult:
"""Create a TaskResult from a TaskOutput."""

if isinstance(task_output.error, Exception):
kwargs["error"] = str(task_output.error)
kwargs["error_traceback"] = "".join(
traceback.format_exception(
type(task_output.error),
task_output.error,
task_output.error.__traceback__,
)
)

kwargs["execution_trace"] = task_output.execution_trace
"""Create a TaskResult from a (serializable) TaskOutput."""
kwargs["output"] = task_output.output

kwargs["execution_trace"] = task_output.execution_trace
if task_output.error is not None:
kwargs.setdefault("error", task_output.error)
kwargs.setdefault("error_traceback", task_output.error_traceback)
return cls(**kwargs)


Expand Down Expand Up @@ -113,6 +125,10 @@ def is_error(self) -> bool:
or self.error_traceback is not None
)

def is_scored(self) -> bool:
    """Report whether the scoring stage has run for this sample.

    A sample counts as scored once either ``score`` or ``eval_error`` is set.
    """
    still_unscored = self.score is None and self.eval_error is None
    return not still_unscored

def as_pandas_series(self, exclude: set[str] | None = None) -> Series:
"""Return the sample result in a pandas representation."""
import pandas as pd
Expand Down
Loading