Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ authors = [
]
requires-python = ">=3.13"
dependencies = [
"microplex[calibrate] @ git+https://github.com/PolicyEngine/microplex.git@773106e3a159a0417ed15025b507ab05c0b93b5d",
"microplex[calibrate] @ git+https://github.com/PolicyEngine/microplex.git@cad505289da23e1e7a5eded3c67a248cd8d1b8e4",
"duckdb>=1.2",
"h5py>=3.10",
"requests>=2.31",
Expand Down
221 changes: 219 additions & 2 deletions src/microplex_us/pipelines/check_export_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@

import argparse
import json
import re
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any

# Path to the committed contract shipped alongside this module.
DEFAULT_CONTRACT_PATH = Path(__file__).with_name("ecps_export_contract.json")
# Default spec YAML for the spec.variables manifest check. ``parents[1]`` of
# the resolved module path is the parent of this module's directory, so the
# file is looked up at ``<that parent>/specs/us-2024.yaml``.
DEFAULT_SPEC_PATH = Path(__file__).resolve().parents[1] / "specs" / "us-2024.yaml"

SIGNED_NUMERIC_SUPPORT_COLUMNS = frozenset(
{
Expand Down Expand Up @@ -113,6 +115,28 @@ def ok(self) -> bool:
return not self.issues


@dataclass
class SpecVariableManifestDiff:
    """Outcome of comparing a spec's ``variables`` block with the contract.

    The three ``*_count`` fields record how many names were seen on each
    side of the comparison; the three list fields name the mismatches.
    """

    # Spec file the manifest was read from, stored as a string.
    spec_path: str
    # Number of columns listed as required by the contract.
    required_contract_count: int
    # Number of variables declared in imputation ``vars`` lists.
    declared_imputation_count: int
    # Number of keys found under the spec's ``variables`` section.
    variable_manifest_count: int
    # Required contract columns absent from ``spec.variables``.
    missing_required: list[str]
    # Declared imputation variables absent from ``spec.variables``.
    missing_declared_imputation: list[str]
    # ``spec.variables`` entries that are neither required nor declared.
    extra_variables: list[str]

    @property
    def ok(self) -> bool:
        """Return True only when all three mismatch lists are empty."""
        mismatches = (
            self.missing_required,
            self.missing_declared_imputation,
            self.extra_variables,
        )
        return not any(mismatches)


def compute_column_diff(
present: set[str],
*,
Expand Down Expand Up @@ -229,6 +253,138 @@ def compute_support_diff(
)


def compute_spec_variable_manifest_diff(
    *,
    contract: dict,
    spec_path: Path = DEFAULT_SPEC_PATH,
) -> SpecVariableManifestDiff:
    """Check that ``spec.variables`` matches the contract and imputation vars.

    Args:
        contract: Loaded contract mapping; its ``"required"`` entry is read
            and each element is converted to ``str``.
        spec_path: Spec YAML file to inspect (read as UTF-8).

    Returns:
        A ``SpecVariableManifestDiff`` with the set sizes and the sorted
        names that are missing from, or extra in, the ``variables`` section.

    Raises:
        ValueError: If no keys are found under the ``variables`` section.
    """
    spec_text = spec_path.read_text(encoding="utf-8")
    manifest = _parse_top_level_mapping_keys(spec_text, "variables")
    if not manifest:
        raise ValueError(f"Spec {spec_path} is missing a variables mapping.")

    required_columns = {str(name) for name in contract["required"]}
    imputed = _parse_imputation_vars(spec_text)

    # Anything in the manifest outside this union is reported as "extra".
    allowed = required_columns | imputed
    absent_required = sorted(required_columns - manifest)
    absent_imputed = sorted(imputed - manifest)
    unexpected = sorted(manifest - allowed)

    return SpecVariableManifestDiff(
        spec_path=str(spec_path),
        required_contract_count=len(required_columns),
        declared_imputation_count=len(imputed),
        variable_manifest_count=len(manifest),
        missing_required=absent_required,
        missing_declared_imputation=absent_imputed,
        extra_variables=unexpected,
    )


def _top_level_section_lines(text: str, section: str) -> list[str]:
"""Return lines in a simple top-level YAML section.

This module is intentionally importable with only the column-parity
job's minimal dependencies, so the fast manifest gate avoids PyYAML.
The parser only needs the committed spec's shape: top-level sections,
mapping keys under ``variables:``, and imputation ``vars`` lists.
"""
section_header = f"{section}:"
lines = text.splitlines()
for index, line in enumerate(lines):
if line.strip() == section_header and not line.startswith((" ", "\t")):
body: list[str] = []
for candidate in lines[index + 1 :]:
stripped = candidate.strip()
if (
stripped
and not candidate.startswith((" ", "\t"))
and re.match(r"^[A-Za-z_][A-Za-z0-9_-]*:", stripped)
):
break
body.append(candidate)
return body
return []


def _parse_top_level_mapping_keys(text: str, section: str) -> set[str]:
    """Parse direct mapping keys from a top-level section.

    The indentation of the direct children is taken from the section itself
    (the smallest space indentation of any non-blank, non-comment line)
    instead of being hard-coded, so the result does not depend on the indent
    width the spec file happens to use. Keys nested more deeply are ignored.

    Args:
        text: Full YAML document text.
        section: Name of the top-level section to inspect.

    Returns:
        The set of identifier-like keys declared directly under ``section``;
        empty when the section is missing or has no such keys.
    """
    key_pattern = re.compile(r"^ +([A-Za-z_][A-Za-z0-9_]*):(?:\s|$)")
    child_indent: int | None = None
    keys_by_indent: dict[int, set[str]] = {}

    for line in _top_level_section_lines(text, section):
        content = line.strip()
        if not content or content.startswith("#"):
            continue  # blank lines and comments carry no structure
        indent = len(line) - len(line.lstrip(" "))
        if indent == 0:
            continue  # unindented residue cannot be a child of the section
        if child_indent is None or indent < child_indent:
            child_indent = indent
        match = key_pattern.match(line)
        if match:
            keys_by_indent.setdefault(indent, set()).add(match.group(1))

    if child_indent is None:
        return set()
    return keys_by_indent.get(child_indent, set())


def _parse_inline_list(raw: str) -> list[str]:
"""Parse the simple YAML inline list form used in tests."""
stripped = raw.strip()
if not stripped.startswith("[") or not stripped.endswith("]"):
return []
body = stripped[1:-1].strip()
if not body:
return []
return [
parsed
for item in body.split(",")
if (parsed := _parse_simple_yaml_scalar(item)) is not None
]


def _parse_simple_yaml_scalar(raw: str) -> str | None:
"""Parse a simple YAML scalar variable name with optional inline comment."""
value = raw.strip()
quote: str | None = None
unquoted = []
for index, char in enumerate(value):
if char in {"'", '"'}:
if quote is None:
quote = char
elif quote == char:
quote = None
if char == "#" and quote is None and (index == 0 or value[index - 1].isspace()):
break
unquoted.append(char)
value = "".join(unquoted).strip()
if (
len(value) >= 2
and value[0] == value[-1]
and value[0] in {"'", '"'}
):
value = value[1:-1].strip()
if re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", value):
return value
return None


def _parse_imputation_vars(text: str) -> set[str]:
    """Parse variable names from imputation step ``vars`` lists.

    Both list forms are supported: the inline form ``vars: [a, b]`` and the
    block form, where ``vars:`` is followed by ``- name`` lines.

    Block items are recognised relative to the indentation of their
    ``vars:`` key rather than by fixed column patterns. A fixed pattern for
    "a new step starts here" that is indistinguishable from the pattern for
    "a list item" closes the block before any item is read, so block-style
    lists would never be collected.

    A ``vars:`` key is accepted at any indentation inside the section,
    including as the first key of a step (``- vars: ...``).

    Args:
        text: Full YAML document text.

    Returns:
        The set of identifier-like names found in ``vars`` lists under the
        top-level ``imputation`` section. Entries that are not plain names
        are ignored.
    """
    variables: set[str] = set()
    # Column of the ``vars:`` key whose block list is currently being read,
    # or None when no block list is open.
    vars_indent: int | None = None

    for line in _top_level_section_lines(text, "imputation"):
        content = line.strip()
        if not content or content.startswith("#"):
            continue  # blank lines and comments neither open nor close a list
        indent = len(line) - len(line.lstrip(" "))

        if vars_indent is not None:
            item = re.match(r"^-\s+(.+)$", content)
            # YAML lets a block sequence sit at the same column as its key
            # or deeper; a dash at a shallower column belongs to an outer
            # sequence (for example the next imputation step).
            if item and indent >= vars_indent:
                if parsed := _parse_simple_yaml_scalar(item.group(1)):
                    variables.add(parsed)
                continue
            if indent > vars_indent:
                continue  # nested content of an item; not a variable name
            vars_indent = None  # list ended; this line may open a new one

        key = re.match(r"^(-\s+)?vars:\s*(.*)$", content)
        if key is None:
            continue
        # With the "- vars:" form the key column sits after the dash prefix.
        key_indent = indent + len(key.group(1) or "")
        value = key.group(2).strip()
        if not value or value.startswith("#"):
            vars_indent = key_indent
            continue
        inline = re.match(r"^(\[.*\])(?:\s+#.*)?$", value)
        if inline:
            variables.update(_parse_inline_list(inline.group(1)))

    return variables


def _h5_column_values(
handle: Any,
column: str,
Expand Down Expand Up @@ -416,6 +572,7 @@ def _format_report(
n_required: int,
n_forbidden: int,
support_diff: SupportDiff | None = None,
spec_diff: SpecVariableManifestDiff | None = None,
) -> str:
"""Build a human-readable report for the diff."""
lines = [
Expand Down Expand Up @@ -452,7 +609,29 @@ def _format_report(
),
]
)
ok = diff.ok and (support_diff is None or support_diff.ok)
if spec_diff is not None:
lines.extend(
[
"",
" spec variable manifest:",
f" spec: {spec_diff.spec_path}",
f" required contract variables: {spec_diff.required_contract_count}",
f" declared imputation variables: {spec_diff.declared_imputation_count}",
f" spec.variables count: {spec_diff.variable_manifest_count}",
f" missing_required ({len(spec_diff.missing_required)}):",
*_bullet_lines(spec_diff.missing_required),
" missing_declared_imputation "
f"({len(spec_diff.missing_declared_imputation)}):",
*_bullet_lines(spec_diff.missing_declared_imputation),
f" extra_variables ({len(spec_diff.extra_variables)}):",
*_bullet_lines(spec_diff.extra_variables),
]
)
ok = (
diff.ok
and (support_diff is None or support_diff.ok)
and (spec_diff is None or spec_diff.ok)
)
lines.extend(["", " RESULT: " + ("PASS" if ok else "FAIL")])
return "\n".join(lines)

Expand Down Expand Up @@ -522,6 +701,20 @@ def main(argv: list[str] | None = None) -> int:
default=str(DEFAULT_CONTRACT_PATH),
help="Override the contract JSON (default: committed contract).",
)
parser.add_argument(
"--spec",
metavar="FILE",
help=(
"Spec YAML whose variables block must cover the contract and "
"declared imputation vars. Defaults to the committed US spec when "
"using the committed contract."
),
)
parser.add_argument(
"--skip-spec-variable-manifest",
action="store_true",
help="Skip the spec.variables manifest coverage check.",
)
parser.add_argument(
"--support-baseline",
metavar="H5",
Expand Down Expand Up @@ -593,6 +786,21 @@ def main(argv: list[str] | None = None) -> int:
optional=optional,
excluded=excluded,
)
contract_path = Path(args.contract).resolve()
spec_path = None
if not args.skip_spec_variable_manifest:
if args.spec:
spec_path = Path(args.spec)
elif contract_path == DEFAULT_CONTRACT_PATH.resolve():
spec_path = DEFAULT_SPEC_PATH
spec_diff = (
None
if spec_path is None
else compute_spec_variable_manifest_diff(
contract=contract,
spec_path=Path(spec_path),
)
)
support_diff = None
if args.support_baseline:
support_exempt = set(contract.get("support_exemptions", [])) | set(
Expand All @@ -619,9 +827,18 @@ def main(argv: list[str] | None = None) -> int:
n_required=len(required),
n_forbidden=len(forbidden),
support_diff=support_diff,
spec_diff=spec_diff,
)
)
return (
0
if (
diff.ok
and (support_diff is None or support_diff.ok)
and (spec_diff is None or spec_diff.ok)
)
else 1
)
return 0 if diff.ok and (support_diff is None or support_diff.ok) else 1


if __name__ == "__main__":
Expand Down
22 changes: 19 additions & 3 deletions src/microplex_us/pipelines/mp300k_artifact_gates.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ def _column_contract_gate(
from microplex_us.pipelines.check_export_columns import (
DEFAULT_CONTRACT_PATH,
compute_column_diff,
compute_spec_variable_manifest_diff,
load_contract,
)

Expand All @@ -535,6 +536,7 @@ def _column_contract_gate(
optional=optional,
excluded=excluded,
)
spec_diff = compute_spec_variable_manifest_diff(contract=contract)
satisfied_count = len(required) - len(diff.missing_required)
contract_share = float(satisfied_count / len(required)) if required else None
metrics = {
Expand All @@ -553,23 +555,37 @@ def _column_contract_gate(
# informational, matching check_export_columns.
"extra_candidate_column_count": len(diff.extra_unknown),
"column_contract_share": contract_share,
"spec_variable_manifest_count": spec_diff.variable_manifest_count,
"spec_required_contract_column_count": spec_diff.required_contract_count,
"spec_declared_imputation_variable_count": spec_diff.declared_imputation_count,
"spec_missing_required_column_count": len(spec_diff.missing_required),
"spec_missing_declared_imputation_count": len(
spec_diff.missing_declared_imputation
),
"spec_extra_variable_count": len(spec_diff.extra_variables),
}
details = {
"missing_contract_columns": diff.missing_required,
"forbidden_present_columns": diff.forbidden_present,
"extra_unknown_columns": diff.extra_unknown,
"extra_candidate_columns": diff.extra_unknown,
"spec_variable_manifest": {
"spec_path": spec_diff.spec_path,
"missing_required": spec_diff.missing_required,
"missing_declared_imputation": spec_diff.missing_declared_imputation,
"extra_variables": spec_diff.extra_variables,
},
}
if diff.missing_required or diff.forbidden_present:
if diff.missing_required or diff.forbidden_present or not spec_diff.ok:
return _gate(
"fail",
"candidate H5 leaf-input column set violates the frozen eCPS contract",
"candidate H5 leaf-input column set or spec manifest violates the frozen eCPS contract",
metrics=metrics,
details=details,
)
return _gate(
"pass",
"candidate H5 leaf-input column set satisfies the frozen eCPS contract",
"candidate H5 leaf-input column set and spec manifest satisfy the frozen eCPS contract",
metrics=metrics,
details=details,
)
Expand Down
Loading
Loading