def _get_collection_name(graph_path: str) -> str:
    """Return the display name of a collection graph file.

    The name is the last path component of ``graph_path`` with its final
    suffix removed (``graphs/coll-a.json`` -> ``coll-a``).
    """
    return pathlib.Path(graph_path).stem


def _get_collection_packages(graph_path: str) -> set[NormalizedName]:
    """Load the graph at ``graph_path`` and return its canonical package names.

    Every node contributes its ``canonicalized_name``; nodes whose name equals
    ``ROOT`` are left out.
    """
    graph = DependencyGraph.from_file(graph_path)
    return {
        node.canonicalized_name
        for node in graph.get_all_nodes()
        if node.canonicalized_name != ROOT
    }


def _find_shared_packages(
    collections: dict[str, set[NormalizedName]],
    min_collections: int,
    display_names: dict[str, str] | None = None,
) -> list[dict[str, typing.Any]]:
    """Find packages that appear in at least ``min_collections`` collections.

    Args:
        collections: Mapping of collection key to the package names it contains.
        min_collections: Minimum number of collections a package must appear in.
        display_names: Optional mapping of collection key to the label reported
            in the result; keys without an entry are reported as-is.

    Returns:
        One dict per qualifying package with the keys ``package``,
        ``collections`` (sorted labels of the containing collections) and
        ``count``. Entries are sorted by ``count`` descending, then by package
        name ascending.
    """
    # Resolve each collection's label once instead of once per package.
    labels = {
        key: (display_names.get(key, key) if display_names else key)
        for key in collections
    }

    # Single pass over all memberships: package -> labels of its collections.
    membership: dict[NormalizedName, list[str]] = {}
    for key, pkgs in collections.items():
        for pkg in pkgs:
            membership.setdefault(pkg, []).append(labels[key])

    results: list[dict[str, typing.Any]] = [
        {"package": pkg, "collections": sorted(names), "count": len(names)}
        for pkg, names in membership.items()
        if len(names) >= min_collections
    ]
    results.sort(key=lambda entry: (-entry["count"], entry["package"]))
    return results
def _compute_collection_impact(
    collections: dict[str, set[NormalizedName]],
    base_package_names: set[NormalizedName],
    display_names: dict[str, str] | None = None,
) -> list[dict[str, typing.Any]]:
    """Compute, per collection, what is left after removing the base packages.

    Args:
        collections: Mapping of collection key to the package names it contains.
        base_package_names: Packages considered part of the base collection.
        display_names: Optional mapping of collection key to the label reported
            in the result; keys without an entry are reported as-is.

    Returns:
        One dict per collection with ``collection``, ``total_packages``,
        ``base_packages``, ``remaining_packages``, ``reduction_percentage``
        (share of the collection covered by the base, rounded to one decimal,
        ``0.0`` for an empty collection) and ``remaining``. ``remaining`` lists
        each leftover package with the number of collections it appears in,
        sorted by that count descending and then by name. The outer list is
        sorted by ``remaining_packages`` descending, then collection label.
    """
    # Count in how many collections each package appears, in one pass.
    pkg_counts: dict[NormalizedName, int] = {}
    for pkgs in collections.values():
        for pkg in pkgs:
            pkg_counts[pkg] = pkg_counts.get(pkg, 0) + 1

    result: list[dict[str, typing.Any]] = []
    for key, pkgs in collections.items():
        coll_name = display_names.get(key, key) if display_names else key
        base_pkgs = pkgs & base_package_names
        remaining_pkgs = pkgs - base_package_names
        remaining_detail: list[dict[str, typing.Any]] = [
            {"package": pkg, "collection_count": pkg_counts[pkg]}
            for pkg in remaining_pkgs
        ]
        remaining_detail.sort(
            key=lambda row: (-row["collection_count"], row["package"])
        )
        result.append(
            {
                "collection": coll_name,
                "total_packages": len(pkgs),
                "base_packages": len(base_pkgs),
                "remaining_packages": len(remaining_pkgs),
                # Guard against division by zero for an empty collection.
                "reduction_percentage": (
                    round(len(base_pkgs) / len(pkgs) * 100, 1) if pkgs else 0.0
                ),
                "remaining": remaining_detail,
            }
        )
    result.sort(key=lambda entry: (-entry["remaining_packages"], entry["collection"]))
    return result


def _build_candidates_table(
    candidates: list[dict[str, typing.Any]],
    total_collections: int,
    collection_names: list[str],
    min_collections: int,
    base_packages: set[NormalizedName] | None,
) -> Table:
    """Build the table listing every candidate package and where it appears."""
    from rich.markup import escape

    # Collection names come from file names; escape them so that square
    # brackets are printed literally instead of being parsed as Rich markup.
    title = (
        f"Base collection candidates "
        f"(threshold: {min_collections}/{total_collections} collections)\n"
        f"Collections: {', '.join(escape(name) for name in sorted(collection_names))}"
    )
    table = Table(title=title, box=rich.box.MARKDOWN, title_justify="left")
    table.add_column("Package", justify="left", no_wrap=True)
    table.add_column("Collections", justify="right", no_wrap=True)
    table.add_column("Coverage", justify="right", no_wrap=True)
    table.add_column("Appears In", justify="left")
    if base_packages is not None:
        table.add_column("In Base", justify="center", no_wrap=True)

    for entry in candidates:
        pkg = entry["package"]
        count = entry["count"]
        row = [
            pkg,
            f"{count}/{total_collections}",
            f"{(count / total_collections) * 100:.1f}%",
            ", ".join(escape(name) for name in entry["collections"]),
        ]
        if base_packages is not None:
            row.append("yes" if pkg in base_packages else "no")
        table.add_row(*row)
    return table


def _build_impact_table(impact: list[dict[str, typing.Any]]) -> Table:
    """Build the per-collection summary of packages covered by the base."""
    from rich.markup import escape

    table = Table(
        title="Collection Impact", box=rich.box.MARKDOWN, title_justify="left"
    )
    table.add_column("Collection", justify="left", no_wrap=True)
    table.add_column("Total Pkgs", justify="right", no_wrap=True)
    table.add_column("In Base", justify="right", no_wrap=True)
    table.add_column("Remaining", justify="right", no_wrap=True)
    table.add_column("% Saved", justify="right", no_wrap=True)
    for entry in impact:
        table.add_row(
            escape(entry["collection"]),
            str(entry["total_packages"]),
            str(entry["base_packages"]),
            str(entry["remaining_packages"]),
            f"{entry['reduction_percentage']:.1f}%",
        )
    return table


def _build_remaining_table(
    impact: list[dict[str, typing.Any]], total_collections: int
) -> Table:
    """Build the table of packages left outside the base, listed once each."""
    # A package may remain in several collections; keep its first entry only.
    unique_rows: dict[NormalizedName, dict[str, typing.Any]] = {}
    for entry in impact:
        for pkg_entry in entry["remaining"]:
            unique_rows.setdefault(pkg_entry["package"], pkg_entry)
    rows = sorted(
        unique_rows.values(),
        key=lambda row: (-row["collection_count"], row["package"]),
    )

    table = Table(
        title="Remaining Packages (not in proposed base)",
        box=rich.box.MARKDOWN,
        title_justify="left",
    )
    table.add_column("Package", justify="left", no_wrap=True)
    table.add_column("Collections", justify="right", no_wrap=True)
    table.add_column("Coverage", justify="right", no_wrap=True)
    for row in rows:
        count = row["collection_count"]
        table.add_row(
            row["package"],
            f"{count}/{total_collections}",
            f"{(count / total_collections) * 100:.1f}%",
        )
    return table


def _build_base_only_table(base_only_packages: set[NormalizedName]) -> Table:
    """Build the table of existing base packages that are not candidates."""
    table = Table(
        title="Existing Base Packages (carried forward, not new candidates)",
        box=rich.box.MARKDOWN,
        title_justify="left",
    )
    table.add_column("Package", justify="left", no_wrap=True)
    for pkg in sorted(base_only_packages):
        table.add_row(str(pkg))
    return table


def _suggest_base_table(
    candidates: list[dict[str, typing.Any]],
    total_collections: int,
    collection_names: list[str],
    min_collections: int,
    base_packages: set[NormalizedName] | None,
    total_unique_packages: int,
    impact: list[dict[str, typing.Any]],
    base_only_packages: set[NormalizedName],
) -> None:
    """Print the suggest-base results as Rich tables on the global console.

    Args:
        candidates: Entries with ``package``, ``collections`` and ``count``.
        total_collections: Number of collections analysed; the denominator of
            every coverage figure.
        collection_names: Labels of the analysed collections.
        min_collections: Threshold a package had to reach to be a candidate.
        base_packages: Packages of the existing base graph, or ``None`` when no
            base graph was given (the "In Base" column and the "Already in
            base" line are then omitted).
        total_unique_packages: Number of distinct packages over all collections.
        impact: Per-collection impact entries with the keys ``collection``,
            ``total_packages``, ``base_packages``, ``remaining_packages``,
            ``reduction_percentage`` and ``remaining``.
        base_only_packages: Existing base packages that are not candidates; the
            last table is printed only when this set is non-empty.
    """
    already_in_base = (
        0
        if base_packages is None
        else sum(1 for entry in candidates if entry["package"] in base_packages)
    )
    new_candidates = len(candidates) - already_in_base

    console = rich.get_console()
    console.print(
        _build_candidates_table(
            candidates,
            total_collections,
            collection_names,
            min_collections,
            base_packages,
        )
    )
    console.print(f"\nTotal unique packages: {total_unique_packages}")
    console.print(f"Packages in >= {min_collections} collections: {len(candidates)}")
    if base_packages is not None:
        console.print(f"Already in base: {already_in_base}")
    console.print(f"New candidates: {new_candidates}")

    console.print(_build_impact_table(impact))
    console.print(_build_remaining_table(impact, total_collections))
    if base_only_packages:
        console.print(_build_base_only_table(base_only_packages))
def _suggest_base_json(
    candidates: list[dict[str, typing.Any]],
    total_collections: int,
    collection_names: list[str],
    min_collections: int,
    base_packages: set[NormalizedName] | None,
    base_graph: str | None,
    total_unique_packages: int,
    impact: list[dict[str, typing.Any]],
    base_only_packages: set[NormalizedName],
) -> None:
    """Write the suggest-base results to stdout as an indented JSON document.

    The document has the keys ``metadata``, ``candidates`` and
    ``collection_impact``; ``base_only_packages`` is added only when that set
    is non-empty. ``metadata.base_graph`` is present only when ``base_graph``
    is not ``None``, and each candidate carries ``in_base`` only when
    ``base_packages`` is not ``None``.

    Args:
        candidates: Entries with ``package``, ``collections`` and ``count``.
        total_collections: Number of collections analysed; the denominator of
            ``coverage_percentage``.
        collection_names: Labels of the analysed collections (emitted sorted).
        min_collections: Threshold a package had to reach to be a candidate.
        base_packages: Packages of the existing base graph, or ``None``.
        base_graph: Path of the existing base graph as given, or ``None``.
        total_unique_packages: Number of distinct packages over all collections.
        impact: Per-collection impact entries, emitted unchanged.
        base_only_packages: Existing base packages that are not candidates.
    """
    metadata: dict[str, typing.Any] = {
        "total_collections": total_collections,
        "total_unique_packages": total_unique_packages,
        "packages_meeting_threshold": len(candidates),
        "collections": sorted(collection_names),
        "min_collections": min_collections,
    }
    if base_graph is not None:
        metadata["base_graph"] = base_graph

    described: list[dict[str, typing.Any]] = []
    for entry in candidates:
        count = entry["count"]
        candidate: dict[str, typing.Any] = {
            "package": entry["package"],
            "collections": entry["collections"],
            "collection_count": count,
            "coverage_percentage": round((count / total_collections) * 100, 1),
        }
        if base_packages is not None:
            candidate["in_base"] = entry["package"] in base_packages
        described.append(candidate)

    output: dict[str, typing.Any] = {
        "metadata": metadata,
        "candidates": described,
        "collection_impact": impact,
    }
    if base_only_packages:
        output["base_only_packages"] = sorted(str(p) for p in base_only_packages)

    json.dump(output, sys.stdout, indent=2)
    # json.dump does not terminate the document; end the last line so shells
    # and line-oriented tools see complete output.
    sys.stdout.write("\n")
def _suggest_base_impl(
    collection_graphs: tuple[str, ...],
    base_graph: str | None,
    min_collections: int | None,
    output_format: str,
) -> None:
    """Core implementation for suggest_base, testable without a click context.

    Args:
        collection_graphs: Paths of the collection graph files to compare.
        base_graph: Path of an existing base graph, or ``None``.
        min_collections: Minimum number of collections a package must appear
            in; ``None`` selects half of the analysed collections, rounded up,
            but never less than 2.
        output_format: ``"json"`` for JSON output; any other value prints
            tables.

    Raises:
        click.UsageError: If fewer than 2 graphs are given, if fewer than 2
            distinct non-empty collections remain after loading, or if
            ``min_collections`` is below 2 or above the number of graphs
            given / collections analysed.
    """
    # Validate the arguments first so that obvious mistakes fail before any
    # graph file is read.
    if len(collection_graphs) < 2:
        raise click.UsageError("At least 2 collection graphs are required")
    if min_collections is not None:
        if min_collections < 2:
            raise click.UsageError("--min-collections must be >= 2")
        if min_collections > len(collection_graphs):
            raise click.UsageError(
                f"--min-collections ({min_collections}) cannot exceed number of graphs ({len(collection_graphs)})"
            )

    # Load each collection, keyed by resolved path to avoid stem collisions
    collections: dict[str, set[NormalizedName]] = {}
    display_names: dict[str, str] = {}
    for path in collection_graphs:
        key = str(pathlib.Path(path).resolve())
        if key in collections:
            # Same file given twice: it can only count as one collection.
            logger.warning("Collection %s given more than once, ignoring", path)
            continue
        name = _get_collection_name(path)
        pkgs = _get_collection_packages(path)
        if not pkgs:
            logger.warning("Collection %s is empty, skipping", name)
            continue
        collections[key] = pkgs
        display_names[key] = name

    # Everything below (coverage, "N/total" figures) is relative to the
    # collections actually analysed, so the threshold must be checked against
    # that number rather than the number of command-line arguments.
    total = len(collections)
    if total < 2:
        raise click.UsageError(
            f"At least 2 distinct non-empty collection graphs are required, got {total}"
        )
    if min_collections is None:
        min_collections = max(2, math.ceil(total / 2))
    elif min_collections > total:
        raise click.UsageError(
            f"--min-collections ({min_collections}) cannot exceed number of non-empty graphs ({total})"
        )

    # Load base graph if provided
    base_packages: set[NormalizedName] | None = None
    if base_graph:
        base_packages = _get_collection_packages(base_graph)

    total_unique_packages = len(set().union(*collections.values()))
    candidates = _find_shared_packages(collections, min_collections, display_names)

    candidate_names: set[NormalizedName] = {entry["package"] for entry in candidates}
    # The full proposed base includes existing base packages (all carried forward)
    proposed_base: set[NormalizedName] = (
        candidate_names | base_packages if base_packages else candidate_names
    )
    # Packages carried from the existing base that are not new candidates
    base_only_packages: set[NormalizedName] = (
        base_packages - candidate_names if base_packages else set()
    )
    impact = _compute_collection_impact(collections, proposed_base, display_names)

    names = list(display_names.values())
    if output_format == "json":
        _suggest_base_json(
            candidates,
            total,
            names,
            min_collections,
            base_packages,
            base_graph,
            total_unique_packages,
            impact,
            base_only_packages,
        )
    else:
        _suggest_base_table(
            candidates,
            total,
            names,
            min_collections,
            base_packages,
            total_unique_packages,
            impact,
            base_only_packages,
        )
@graph.command()
@click.option(
    "--base",
    "base_graph",
    type=str,
    default=None,
    help="Existing base collection graph to enhance",
)
@click.option(
    "--min-collections",
    type=int,
    default=None,
    # The implementation uses max(2, ceil(n / 2)); say so instead of "50%".
    help=(
        "Minimum collections a package must appear in "
        "(default: half of the collections, rounded up, at least 2)"
    ),
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["table", "json"]),
    default="table",
    help="Output format (default: table)",
)
@click.argument("collection_graphs", nargs=-1, required=True)
@click.pass_obj
def suggest_base(
    wkctx: context.WorkContext,
    collection_graphs: tuple[str, ...],
    base_graph: str | None,
    min_collections: int | None,
    output_format: str,
) -> None:
    """Suggest packages for a shared base collection.

    Analyzes COLLECTION_GRAPHS (2 or more graph files) to identify packages
    appearing across multiple collections. These are candidates for factoring
    into a base collection built once and reused.
    """
    # wkctx is supplied by click.pass_obj but is not needed here; all work is
    # delegated so the logic can be exercised without a click context.
    _suggest_base_impl(collection_graphs, base_graph, min_collections, output_format)
def _make_graph_file(tmp_path: pathlib.Path, stem: str, packages: list[str]) -> str:
    """Write a minimal graph JSON file containing the given top-level packages.

    Each package is added at version 1.0.0 as a top-level dependency. The file
    is written to ``tmp_path / f"{stem}.json"`` and its path is returned as a
    string.
    """
    graph = dependency_graph.DependencyGraph()
    for pkg in packages:
        graph.add_dependency(
            parent_name=None,
            parent_version=None,
            req_type=RequirementType.TOP_LEVEL,
            req=Requirement(pkg),
            req_version=Version("1.0.0"),
            download_url=f"https://example.com/{pkg}-1.0.0.tar.gz",
        )
    path = tmp_path / f"{stem}.json"
    # Explicit encoding keeps the fixture independent of the platform default.
    with path.open("w", encoding="utf-8") as f:
        graph.serialize(f)
    return str(path)


# ---------------------------------------------------------------------------
# Unit tests for helper functions
# ---------------------------------------------------------------------------


def test_get_collection_packages(tmp_path: pathlib.Path) -> None:
    """_get_collection_packages returns normalized names, excluding ROOT."""
    path = _make_graph_file(tmp_path, "col-a", ["package-one", "PackageTwo"])
    result = _get_collection_packages(path)
    assert NormalizedName("package-one") in result
    assert NormalizedName("packagetwo") in result
    assert NormalizedName("") not in result  # ROOT excluded


def test_find_shared_packages_basic() -> None:
    """Basic overlap: packages in >= 2 of 3 collections are returned."""
    collections: dict[str, set[NormalizedName]] = {
        "a": {NormalizedName("b"), NormalizedName("c"), NormalizedName("d")},
        "b": {NormalizedName("b"), NormalizedName("c"), NormalizedName("e")},
        "c": {NormalizedName("c"), NormalizedName("f")},
    }
    results = _find_shared_packages(collections, min_collections=2)
    packages = {r["package"] for r in results}
    # d, e and f each appear in a single collection and must be absent.
    assert packages == {NormalizedName("b"), NormalizedName("c")}


def test_find_shared_packages_threshold() -> None:
    """min_collections=3 returns only packages in all 3 collections."""
    collections: dict[str, set[NormalizedName]] = {
        "a": {NormalizedName("b"), NormalizedName("c"), NormalizedName("d")},
        "b": {
            NormalizedName("b"),
            NormalizedName("c"),
            NormalizedName("d"),
            NormalizedName("e"),
        },
        "c": {NormalizedName("c"), NormalizedName("d"), NormalizedName("f")},
    }
    results = _find_shared_packages(collections, min_collections=3)
    packages = {r["package"] for r in results}
    assert NormalizedName("c") in packages
    assert NormalizedName("d") in packages
    assert NormalizedName("b") not in packages  # only in 2


def test_find_shared_packages_sorting() -> None:
    """Results sorted by count desc then package name asc."""
    collections: dict[str, set[NormalizedName]] = {
        "a": {NormalizedName("z"), NormalizedName("m"), NormalizedName("a")},
        "b": {NormalizedName("z"), NormalizedName("m"), NormalizedName("a")},
        "c": {NormalizedName("z")},
    }
    results = _find_shared_packages(collections, min_collections=2)
    # z is in 3 collections; a and m are in 2 and tie-break alphabetically.
    # Pin the whole sequence so dropped or extra entries are caught too.
    assert [r["package"] for r in results] == [
        NormalizedName("z"),
        NormalizedName("a"),
        NormalizedName("m"),
    ]
    assert [r["count"] for r in results] == [3, 2, 2]


# ---------------------------------------------------------------------------
# Command output tests
# ---------------------------------------------------------------------------


def test_suggest_base_table_output(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """suggest_base command produces table output with key strings."""
    path_a = _make_graph_file(tmp_path, "coll-a", ["pkg-shared", "pkg-only-a"])
    path_b = _make_graph_file(tmp_path, "coll-b", ["pkg-shared", "pkg-only-b"])

    _suggest_base_impl(
        collection_graphs=(path_a, path_b),
        base_graph=None,
        min_collections=2,
        output_format="table",
    )

    captured = capsys.readouterr()
    assert "pkg-shared" in captured.out
    assert "Total unique packages: 3" in captured.out
    assert "Packages in >= 2 collections: 1" in captured.out
    # pkg-only-* appear in the Remaining Packages section, not the candidates table
    assert "pkg-only-a" in captured.out
    assert "pkg-only-b" in captured.out
def test_suggest_base_dynamic_default_min_collections(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """Default --min-collections is 50% of provided graphs (rounded up)."""
    # Four graphs, so the default threshold is ceil(4/2) = 2.
    # pkg-shared-ab is in A and B (2/4); pkg-shared-abc is in A, B and C (3/4).
    contents = {
        "coll-a": ["pkg-shared-ab", "pkg-shared-abc"],
        "coll-b": ["pkg-shared-ab", "pkg-shared-abc"],
        "coll-c": ["pkg-shared-abc"],
        "coll-d": ["pkg-only-d"],
    }
    graph_paths = tuple(
        _make_graph_file(tmp_path, stem, pkgs) for stem, pkgs in contents.items()
    )

    # min_collections=None asks for the dynamic default.
    _suggest_base_impl(
        collection_graphs=graph_paths,
        base_graph=None,
        min_collections=None,
        output_format="json",
    )

    report = json.loads(capsys.readouterr().out)
    assert report["metadata"]["min_collections"] == 2
    found = {candidate["package"] for candidate in report["candidates"]}
    assert "pkg-shared-ab" in found
    assert "pkg-shared-abc" in found
    assert "pkg-only-d" not in found


def test_suggest_base_json_output(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """suggest_base command produces valid JSON output."""
    graph_paths = (
        _make_graph_file(tmp_path, "coll-a", ["pkg-shared", "pkg-only-a"]),
        _make_graph_file(tmp_path, "coll-b", ["pkg-shared", "pkg-only-b"]),
    )

    _suggest_base_impl(
        collection_graphs=graph_paths,
        base_graph=None,
        min_collections=2,
        output_format="json",
    )

    report = json.loads(capsys.readouterr().out)
    assert "metadata" in report
    assert "candidates" in report

    meta = report["metadata"]
    assert meta["total_collections"] == 2
    assert meta["total_unique_packages"] == 3
    assert meta["packages_meeting_threshold"] == 1
    assert meta["min_collections"] == 2

    assert len(report["candidates"]) == 1
    only = report["candidates"][0]
    assert only["package"] == "pkg-shared"
    assert only["collection_count"] == 2
    assert only["coverage_percentage"] == 100.0
    # No base graph was given, so the flag must be absent.
    assert "in_base" not in only


def test_suggest_base_with_base_graph(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """--base flag marks packages that are already in the base graph."""
    graph_paths = (
        _make_graph_file(tmp_path, "coll-a", ["pkg-shared", "pkg-new"]),
        _make_graph_file(tmp_path, "coll-b", ["pkg-shared", "pkg-new"]),
    )
    base_path = _make_graph_file(tmp_path, "base", ["pkg-shared"])

    _suggest_base_impl(
        collection_graphs=graph_paths,
        base_graph=base_path,
        min_collections=2,
        output_format="json",
    )

    report = json.loads(capsys.readouterr().out)
    in_base_by_pkg = {c["package"]: c["in_base"] for c in report["candidates"]}
    assert in_base_by_pkg["pkg-shared"] is True
    assert in_base_by_pkg["pkg-new"] is False
    assert report["metadata"]["base_graph"] == base_path


def test_suggest_base_too_few_graphs(tmp_path: pathlib.Path) -> None:
    """Error raised when fewer than 2 graphs are provided."""
    single = _make_graph_file(tmp_path, "coll-a", ["pkg-a"])

    with pytest.raises(click.UsageError, match="At least 2 collection graphs"):
        _suggest_base_impl(
            collection_graphs=(single,),
            base_graph=None,
            min_collections=2,
            output_format="table",
        )


def test_suggest_base_invalid_min_collections(tmp_path: pathlib.Path) -> None:
    """Error raised when --min-collections exceeds number of graphs."""
    graph_paths = (
        _make_graph_file(tmp_path, "coll-a", ["pkg-a"]),
        _make_graph_file(tmp_path, "coll-b", ["pkg-b"]),
    )

    with pytest.raises(click.UsageError, match="cannot exceed number of graphs"):
        _suggest_base_impl(
            collection_graphs=graph_paths,
            base_graph=None,
            min_collections=3,
            output_format="table",
        )
# ---------------------------------------------------------------------------
# Tests for _compute_collection_impact
# ---------------------------------------------------------------------------


def test_compute_collection_impact_basic() -> None:
    """Remaining counts and per-package cross-collection counts are correct."""
    # Three collections with a known overlap: pkg-shared is in all of them and
    # forms the base; pkg-ab is in a and b; the pkg-only-* are in one each.
    shared = NormalizedName("pkg-shared")
    in_a_and_b = NormalizedName("pkg-ab")
    only_a = NormalizedName("pkg-only-a")
    collections: dict[str, set[NormalizedName]] = {
        "coll-a": {shared, in_a_and_b, only_a},
        "coll-b": {shared, in_a_and_b, NormalizedName("pkg-only-b")},
        "coll-c": {shared, NormalizedName("pkg-only-c")},
    }

    impact = _compute_collection_impact(collections, {shared})
    entries = {row["collection"]: row for row in impact}

    # Per-collection totals
    coll_a = entries["coll-a"]
    assert coll_a["total_packages"] == 3
    assert coll_a["base_packages"] == 1
    assert coll_a["remaining_packages"] == 2
    assert entries["coll-b"]["remaining_packages"] == 2
    assert entries["coll-c"]["remaining_packages"] == 1

    # Cross-collection counts of what remains in coll-a
    counts = {row["package"]: row["collection_count"] for row in coll_a["remaining"]}
    assert counts[in_a_and_b] == 2
    assert counts[only_a] == 1

    # 1 of 3 packages moved to the base: 33.3%
    assert coll_a["reduction_percentage"] == 33.3


def test_compute_collection_impact_sorting() -> None:
    """Results sorted by remaining_packages desc, then collection name asc."""
    shared = NormalizedName("pkg-shared")
    collections: dict[str, set[NormalizedName]] = {
        "coll-z": {shared, NormalizedName("r1")},
        "coll-a": {shared, NormalizedName("r2"), NormalizedName("r3")},
        "coll-m": {shared},
    }

    impact = _compute_collection_impact(collections, {shared})

    # coll-a keeps 2 packages, coll-z keeps 1 and coll-m keeps none.
    assert impact[0]["collection"] == "coll-a"
    assert impact[1]["collection"] == "coll-z"
    assert impact[2]["collection"] == "coll-m"


def test_suggest_base_table_includes_impact(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """Table output includes Collection Impact section."""
    graph_paths = (
        _make_graph_file(tmp_path, "coll-a", ["pkg-shared", "pkg-only-a"]),
        _make_graph_file(tmp_path, "coll-b", ["pkg-shared", "pkg-only-b"]),
    )

    _suggest_base_impl(
        collection_graphs=graph_paths,
        base_graph=None,
        min_collections=2,
        output_format="table",
    )

    printed = capsys.readouterr().out
    assert "Collection Impact" in printed
    assert "Total Pkgs" in printed
    assert "In Base" in printed
    assert "Remaining" in printed
    assert "% Saved" in printed
    # Rich may word-wrap the title, so only its prefix is checked.
    assert "Remaining Packages (not in proposed" in printed
def test_suggest_base_json_includes_impact(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """JSON output includes collection_impact key with correct structure."""
    graph_paths = (
        _make_graph_file(tmp_path, "coll-a", ["pkg-shared", "pkg-only-a"]),
        _make_graph_file(tmp_path, "coll-b", ["pkg-shared", "pkg-only-b"]),
    )

    _suggest_base_impl(
        collection_graphs=graph_paths,
        base_graph=None,
        min_collections=2,
        output_format="json",
    )

    report = json.loads(capsys.readouterr().out)
    assert "collection_impact" in report
    rows = report["collection_impact"]
    assert len(rows) == 2

    # Both collections hold 2 packages: the shared one (in the base) and one
    # that remains.
    rows_by_name = {row["collection"]: row for row in rows}
    for name in ("coll-a", "coll-b"):
        row = rows_by_name[name]
        assert row["total_packages"] == 2
        assert row["base_packages"] == 1
        assert row["remaining_packages"] == 1
        assert row["reduction_percentage"] == 50.0
        assert len(row["remaining"]) == 1


def test_suggest_base_json_base_only_packages(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """Packages in --base that are not candidates appear in base_only_packages."""
    # pkg-shared is in both collections and therefore a candidate;
    # pkg-base-only exists only in the base graph.
    graph_paths = (
        _make_graph_file(tmp_path, "coll-a", ["pkg-shared", "pkg-only-a"]),
        _make_graph_file(tmp_path, "coll-b", ["pkg-shared", "pkg-only-b"]),
    )
    base_path = _make_graph_file(tmp_path, "base", ["pkg-shared", "pkg-base-only"])

    _suggest_base_impl(
        collection_graphs=graph_paths,
        base_graph=base_path,
        min_collections=2,
        output_format="json",
    )

    report = json.loads(capsys.readouterr().out)

    assert "base_only_packages" in report
    carried = report["base_only_packages"]
    # In the base but not a candidate
    assert "pkg-base-only" in carried
    # A candidate that is also in the base is not "base only"
    assert "pkg-shared" not in carried


def test_suggest_base_json_base_only_impacts_collection_impact(
    capsys: pytest.CaptureFixture[str], tmp_path: pathlib.Path
) -> None:
    """Base-only packages count toward collection impact when --base is provided."""
    # pkg-shared is a candidate; pkg-base-only is not, but coll-a contains it.
    graph_paths = (
        _make_graph_file(
            tmp_path, "coll-a", ["pkg-shared", "pkg-base-only", "pkg-only-a"]
        ),
        _make_graph_file(tmp_path, "coll-b", ["pkg-shared", "pkg-only-b"]),
    )
    base_path = _make_graph_file(tmp_path, "base", ["pkg-shared", "pkg-base-only"])

    _suggest_base_impl(
        collection_graphs=graph_paths,
        base_graph=base_path,
        min_collections=2,
        output_format="json",
    )

    report = json.loads(capsys.readouterr().out)
    rows_by_name = {row["collection"]: row for row in report["collection_impact"]}

    # coll-a: 3 packages, of which pkg-shared and pkg-base-only belong to the
    # proposed base, leaving 1.
    coll_a = rows_by_name["coll-a"]
    assert coll_a["base_packages"] == 2
    assert coll_a["remaining_packages"] == 1

    # coll-b: 2 packages, of which only pkg-shared belongs to the proposed base.
    coll_b = rows_by_name["coll-b"]
    assert coll_b["base_packages"] == 1
    assert coll_b["remaining_packages"] == 1