diff --git a/docs/reference/authentication.md b/docs/reference/authentication.md new file mode 100644 index 0000000000..b9fc62a51c --- /dev/null +++ b/docs/reference/authentication.md @@ -0,0 +1,181 @@ +# Authentication + +Specify CLI uses **opt-in authentication** for HTTP requests to catalog +sources, extension downloads, and release checks. No credentials are +sent unless you explicitly configure them. + +## Configuration + +Create `~/.specify/auth.json` to enable authentication: + +```json +{ + "providers": [ + { + "hosts": ["github.com", "api.github.com", "raw.githubusercontent.com", "codeload.github.com"], + "provider": "github", + "auth": "bearer", + "token_env": "GH_TOKEN" + } + ] +} +``` + +> **Security:** Restrict the file to owner-only access: +> ``` +> chmod 600 ~/.specify/auth.json +> ``` + +Without this file, all HTTP requests are unauthenticated. + +## Fields + +Each entry in the `providers` array has the following fields: + +| Field | Required | Description | +|---|---|---| +| `hosts` | Yes | Array of hostnames this entry applies to. Supports wildcards (e.g. `*.visualstudio.com`). | +| `provider` | Yes | Built-in provider key: `github` or `azure-devops`. | +| `auth` | Yes | Auth scheme (see below). | +| `token` | No | Token value (inline). Use `token_env` instead when possible. | +| `token_env` | No | Environment variable name to read the token from. | + +For `azure-ad` auth, additional fields are required: + +| Field | Required | Description | +|---|---|---| +| `tenant_id` | Yes | Azure AD tenant ID. | +| `client_id` | Yes | Service principal client ID. | +| `client_secret_env` | Yes | Environment variable containing the client secret. | + +Either `token` or `token_env` must be set for `bearer` and `basic-pat` schemes. + +## Providers and auth schemes + +### GitHub (`github`) + +| Scheme | Header | Use for | +|---|---|---| +| `bearer` | `Authorization: Bearer ` | PATs, fine-grained PATs, OAuth tokens, GitHub App tokens | + +**Example — PAT via environment variable:** + +```json +{ + "hosts": ["github.com", "api.github.com", "raw.githubusercontent.com", "codeload.github.com"], + "provider": "github", + "auth": "bearer", + "token_env": "GH_TOKEN" +} +``` + +### Azure DevOps (`azure-devops`) + +| Scheme | Header | Use for | +|---|---|---| +| `basic-pat` | `Authorization: Basic base64(:)` | Personal Access Tokens | +| `bearer` | `Authorization: Bearer ` | Pre-acquired OAuth / Azure AD tokens | +| `azure-cli` | `Authorization: Bearer ` | Token acquired via `az account get-access-token` | +| `azure-ad` | `Authorization: Bearer ` | Token acquired via OAuth2 client credentials flow | + +**Example — PAT via environment variable:** + +```json +{ + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "basic-pat", + "token_env": "AZURE_DEVOPS_PAT" +} +``` + +**Example — Azure CLI (interactive login):** + +```json +{ + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "azure-cli" +} +``` + +Requires `az login` to have been run beforehand. + +**Example — Azure AD service principal (CI/automation):** + +```json +{ + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "azure-ad", + "tenant_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "client_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "client_secret_env": "AZURE_CLIENT_SECRET" +} +``` + +## Multiple entries + +You can configure multiple entries for different hosts or organizations: + +```json +{ + "providers": [ + { + "hosts": ["github.com", "api.github.com", "raw.githubusercontent.com", "codeload.github.com"], + "provider": "github", + "auth": "bearer", + "token_env": "GH_TOKEN" + }, + { + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "basic-pat", + "token_env": "AZURE_DEVOPS_PAT" + } + ] +} +``` + +## How it works + +1. For each outbound HTTP request, the URL hostname is matched against + the `hosts` patterns in `auth.json`. +2. If a match is found, the corresponding provider resolves the token + and attaches the appropriate `Authorization` header. +3. If the request receives a 401 or 403, the next matching entry is tried. +4. After all matching entries are exhausted, an unauthenticated request + is attempted as a final fallback. +5. On redirects, the `Authorization` header is stripped if the redirect + target leaves the entry's declared hosts — preventing credential + leakage to CDNs or third-party services. + +## Template + +A reference `auth.json` with GitHub pre-configured: + +```json +{ + "providers": [ + { + "hosts": [ + "github.com", + "api.github.com", + "raw.githubusercontent.com", + "codeload.github.com" + ], + "provider": "github", + "auth": "bearer", + "token_env": "GH_TOKEN" + } + ] +} +``` + +To use it: + +```bash +mkdir -p ~/.specify +# Copy the JSON above into ~/.specify/auth.json +chmod 600 ~/.specify/auth.json +``` diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index d5f5aba2d5..2fc5509b62 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -1721,22 +1721,14 @@ def _fetch_latest_release_tag() -> tuple[str | None, str | None]: On anything else — including a malformed response body — the exception propagates; there is no catch-all (research D-006). """ - req = urllib.request.Request( - GITHUB_API_LATEST, - headers={"Accept": "application/vnd.github+json"}, - ) - token = None - for env_var in ("GH_TOKEN", "GITHUB_TOKEN"): - candidate = os.environ.get(env_var) - if candidate is not None: - candidate = candidate.strip() - if candidate: - token = candidate - break - if token: - req.add_header("Authorization", f"Bearer {token}") + from .authentication.http import open_url + try: - with urllib.request.urlopen(req, timeout=5) as resp: + with open_url( + GITHUB_API_LATEST, + timeout=5, + extra_headers={"Accept": "application/vnd.github+json"}, + ) as resp: payload = json.loads(resp.read().decode("utf-8")) tag = payload.get("tag_name") if not isinstance(tag, str) or not tag: @@ -1745,7 +1737,7 @@ def _fetch_latest_release_tag() -> tuple[str | None, str | None]: except urllib.error.HTTPError as e: # Order matters: HTTPError is a subclass of URLError. if e.code == 403: - return None, "rate limited (try setting GH_TOKEN or GITHUB_TOKEN)" + return None, "rate limited (try setting GH_TOKEN and configuring ~/.specify/auth.json)" return None, f"HTTP {e.code}" except (urllib.error.URLError, OSError): return None, "offline or timeout" @@ -2633,7 +2625,9 @@ def preset_add( with tempfile.TemporaryDirectory() as tmpdir: zip_path = Path(tmpdir) / "preset.zip" try: - with urllib.request.urlopen(from_url, timeout=60) as response: + from specify_cli.authentication.http import open_url as _open_url + + with _open_url(from_url, timeout=60) as response: zip_path.write_bytes(response.read()) except urllib.error.URLError as e: console.print(f"[red]Error:[/red] Failed to download: {e}") @@ -3637,7 +3631,9 @@ def extension_add( zip_path = download_dir / f"{extension}-url-download.zip" try: - with urllib.request.urlopen(from_url, timeout=60) as response: + from specify_cli.authentication.http import open_url as _open_url + + with _open_url(from_url, timeout=60) as response: zip_data = response.read() zip_path.write_bytes(zip_data) @@ -4927,7 +4923,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: if source.startswith("http://") or source.startswith("https://"): from ipaddress import ip_address from urllib.parse import urlparse - from urllib.request import urlopen # noqa: S310 + from specify_cli.authentication.http import open_url as _open_url parsed_src = urlparse(source) src_host = parsed_src.hostname or "" @@ -4944,7 +4940,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: import tempfile try: - with urlopen(source, timeout=30) as resp: # noqa: S310 + with _open_url(source, timeout=30) as resp: final_url = resp.geturl() final_parsed = urlparse(final_url) final_host = final_parsed.hostname or "" @@ -5040,10 +5036,10 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: workflow_file = workflow_dir / "workflow.yml" try: - from urllib.request import urlopen # noqa: S310 — URL comes from catalog + from specify_cli.authentication.http import open_url as _open_url workflow_dir.mkdir(parents=True, exist_ok=True) - with urlopen(workflow_url, timeout=30) as response: # noqa: S310 + with _open_url(workflow_url, timeout=30) as response: # Validate final URL after redirects final_url = response.geturl() final_parsed = urlparse(final_url) diff --git a/src/specify_cli/authentication/__init__.py b/src/specify_cli/authentication/__init__.py new file mode 100644 index 0000000000..b4963af76b --- /dev/null +++ b/src/specify_cli/authentication/__init__.py @@ -0,0 +1,50 @@ +"""Authentication provider registry for multi-platform support. + +Credentials are **opt-in only**. No authentication headers are sent unless +the user creates ``~/.specify/auth.json`` mapping hosts to providers. +Provider classes define *how* to authenticate (Bearer, Basic-PAT, etc.) +while the config file defines *where* and *with what credentials*. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .base import AuthProvider + +# Maps provider key → AuthProvider class instance. +AUTH_REGISTRY: dict[str, AuthProvider] = {} + + +def _register(provider: AuthProvider) -> None: + """Register a provider instance in the global registry. + + Raises ``ValueError`` for falsy keys and ``KeyError`` for duplicates. + """ + key = provider.key + if not key: + raise ValueError("Cannot register provider with an empty key.") + if key in AUTH_REGISTRY: + raise KeyError(f"Provider with key {key!r} is already registered.") + AUTH_REGISTRY[key] = provider + + +def get_provider(key: str) -> AuthProvider | None: + """Return the provider for *key*, or ``None`` if not registered.""" + return AUTH_REGISTRY.get(key) + + +# -- Register built-in providers ----------------------------------------- + + +def _register_builtins() -> None: + """Register all built-in authentication providers (alphabetical).""" + from .azure_devops import AzureDevOpsAuth + from .github import GitHubAuth + + _register(AzureDevOpsAuth()) + _register(GitHubAuth()) + + +_register_builtins() diff --git a/src/specify_cli/authentication/azure_devops.py b/src/specify_cli/authentication/azure_devops.py new file mode 100644 index 0000000000..136d4f2904 --- /dev/null +++ b/src/specify_cli/authentication/azure_devops.py @@ -0,0 +1,117 @@ +"""Azure DevOps authentication provider.""" + +from __future__ import annotations + +import base64 +import json as _json +import os +import subprocess +from typing import TYPE_CHECKING + +from .base import AuthProvider + +if TYPE_CHECKING: + from .config import AuthConfigEntry + +# Azure DevOps resource ID for OAuth / Azure AD token acquisition. +_ADO_RESOURCE_ID = "499b84ac-1321-427f-aa17-267ca6975798" + + +class AzureDevOpsAuth(AuthProvider): + """Azure DevOps authentication provider. + + Supports four auth schemes: + + * ``basic-pat`` — PAT with empty username, Base64-encoded as ``:`` + * ``bearer`` — pre-acquired OAuth / Azure AD token + * ``azure-cli`` — acquires a token via ``az account get-access-token`` + * ``azure-ad`` — acquires a token via OAuth2 client credentials flow + """ + + key = "azure-devops" + supported_auth_schemes = ("basic-pat", "bearer", "azure-cli", "azure-ad") + + def auth_headers(self, token: str, auth_scheme: str) -> dict[str, str]: + """Build the ``Authorization`` header for the given scheme.""" + if auth_scheme == "basic-pat": + encoded = base64.b64encode(f":{token}".encode("ascii")).decode("ascii") + return {"Authorization": f"Basic {encoded}"} + if auth_scheme in ("bearer", "azure-cli", "azure-ad"): + return {"Authorization": f"Bearer {token}"} + raise ValueError( + f"AzureDevOpsAuth does not support auth scheme {auth_scheme!r}" + ) + + def resolve_token(self, entry: AuthConfigEntry) -> str | None: + """Resolve token, with special handling for azure-cli and azure-ad.""" + if entry.auth == "azure-cli": + return self._acquire_via_az_cli() + if entry.auth == "azure-ad": + return self._acquire_via_client_credentials(entry) + return super().resolve_token(entry) + + # -- Token acquisition ------------------------------------------------ + + @staticmethod + def _acquire_via_az_cli() -> str | None: + """Run ``az account get-access-token`` and return the access token.""" + try: + result = subprocess.run( # noqa: S603, S607 + [ + "az", + "account", + "get-access-token", + "--resource", + _ADO_RESOURCE_ID, + "--output", + "json", + ], + capture_output=True, + text=True, + timeout=30, + check=False, + ) + if result.returncode != 0: + return None + payload = _json.loads(result.stdout) + token = payload.get("accessToken", "").strip() + return token or None + except (OSError, _json.JSONDecodeError, KeyError): + return None + + @staticmethod + def _acquire_via_client_credentials(entry: AuthConfigEntry) -> str | None: + """Acquire a token via OAuth2 client credentials flow.""" + import urllib.error + import urllib.request + + if not entry.tenant_id or not entry.client_id or not entry.client_secret_env: + return None + client_secret = os.environ.get(entry.client_secret_env, "").strip() + if not client_secret: + return None + + url = ( + f"https://login.microsoftonline.com/{entry.tenant_id}" + "/oauth2/v2.0/token" + ) + from urllib.parse import urlencode + body = urlencode({ + "grant_type": "client_credentials", + "client_id": entry.client_id, + "client_secret": client_secret, + "scope": f"{_ADO_RESOURCE_ID}/.default", + }).encode("utf-8") + + req = urllib.request.Request( + url, + data=body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + try: + with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310 + payload = _json.loads(resp.read().decode("utf-8")) + token = payload.get("access_token", "").strip() + return token or None + except (urllib.error.URLError, OSError, _json.JSONDecodeError, KeyError): + return None diff --git a/src/specify_cli/authentication/base.py b/src/specify_cli/authentication/base.py new file mode 100644 index 0000000000..d6e0f4b118 --- /dev/null +++ b/src/specify_cli/authentication/base.py @@ -0,0 +1,57 @@ +"""Abstract base class for authentication providers.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .config import AuthConfigEntry + + +class AuthProvider(ABC): + """Abstract base class every authentication provider must implement. + + Subclasses must set: + + * ``key`` — unique provider identifier (e.g. ``"github"``, ``"azure-devops"``) + * ``supported_auth_schemes`` — tuple of auth scheme strings this provider handles + + And implement: + + * ``auth_headers(token, auth_scheme)`` — build headers from a resolved token + * ``resolve_token(entry)`` — obtain the token for a config entry + """ + + key: str = "" + """Unique provider identifier.""" + + supported_auth_schemes: tuple[str, ...] = () + """Auth schemes this provider supports (e.g. ``("bearer",)``).""" + + @abstractmethod + def auth_headers(self, token: str, auth_scheme: str) -> dict[str, str]: + """Build authentication headers for *token* using *auth_scheme*. + + Must return a dict with at least an ``Authorization`` key. + """ + + def resolve_token(self, entry: AuthConfigEntry) -> str | None: + """Resolve the token for *entry*. + + Default implementation reads from ``entry.token`` directly + or from the environment variable named by ``entry.token_env``. + Override for schemes that acquire tokens dynamically + (e.g. ``azure-cli``, ``azure-ad``). + """ + import os + + if entry.token: + return entry.token.strip() or None + if entry.token_env: + val = os.environ.get(entry.token_env) + if val is not None: + val = val.strip() + if val: + return val + return None diff --git a/src/specify_cli/authentication/config.py b/src/specify_cli/authentication/config.py new file mode 100644 index 0000000000..6ded5d2af9 --- /dev/null +++ b/src/specify_cli/authentication/config.py @@ -0,0 +1,195 @@ +"""Authentication configuration loader. + +Reads ``~/.specify/auth.json`` to determine which hosts receive credentials +and which provider/auth-scheme to use. No credentials are sent without +an explicit opt-in via this file. +""" + +from __future__ import annotations + +import json +import stat +from dataclasses import dataclass +from fnmatch import fnmatch +from pathlib import Path +from urllib.parse import urlparse + + +@dataclass(frozen=True) +class AuthConfigEntry: + """A single provider entry from ``auth.json``.""" + + hosts: tuple[str, ...] + provider: str + auth: str + token: str | None = None + token_env: str | None = None + # Azure AD service-principal fields + tenant_id: str | None = None + client_id: str | None = None + client_secret_env: str | None = None + + +_VALID_AUTH_SCHEMES = frozenset({ + "bearer", + "basic-pat", + "azure-cli", + "azure-ad", +}) + + +def _default_config_path() -> Path: + """Return ``~/.specify/auth.json``.""" + return Path.home() / ".specify" / "auth.json" + + +def load_auth_config( + path: Path | None = None, +) -> list[AuthConfigEntry]: + """Load and validate ``auth.json``, returning configured entries. + + Returns an empty list when the file does not exist — this means + all HTTP requests will be unauthenticated (opt-in model). + + Raises ``ValueError`` on schema violations. Callers that want + misconfigurations to fail fast can allow this exception to + propagate; higher-level HTTP helpers may instead catch it, + warn, and continue with unauthenticated requests. + """ + config_path = path or _default_config_path() + + if not config_path.is_file(): + return [] + + # Warn (but don't fail) if the file is world-readable. + try: + mode = config_path.stat().st_mode + if mode & (stat.S_IRGRP | stat.S_IROTH): + import warnings + + warnings.warn( + f"{config_path} is readable by group/others. " + "Consider restricting with: chmod 600 " + f"{config_path}", + UserWarning, + stacklevel=2, + ) + except OSError: + pass # stat failed — skip permission check + + raw = json.loads(config_path.read_text(encoding="utf-8")) + + if not isinstance(raw, dict): + raise ValueError(f"auth.json must be a JSON object, got {type(raw).__name__}") + + providers_raw = raw.get("providers") + if not isinstance(providers_raw, list): + raise ValueError("auth.json must contain a 'providers' array") + + entries: list[AuthConfigEntry] = [] + for i, entry_raw in enumerate(providers_raw): + if not isinstance(entry_raw, dict): + raise ValueError(f"providers[{i}]: must be a JSON object") + + hosts = entry_raw.get("hosts") + if not isinstance(hosts, list) or not hosts: + raise ValueError(f"providers[{i}]: 'hosts' must be a non-empty array") + if not all(isinstance(h, str) and h.strip() for h in hosts): + raise ValueError(f"providers[{i}]: each host must be a non-empty string") + # Normalize hosts: strip whitespace and lowercase + hosts = [h.strip().lower() for h in hosts] + + provider = entry_raw.get("provider", "") + if not isinstance(provider, str) or not provider: + raise ValueError(f"providers[{i}]: 'provider' must be a non-empty string") + + auth = entry_raw.get("auth", "") + if not isinstance(auth, str) or not auth: + raise ValueError(f"providers[{i}]: 'auth' must be a non-empty string") + if auth not in _VALID_AUTH_SCHEMES: + raise ValueError( + f"providers[{i}]: unsupported auth scheme {auth!r}; " + f"valid: {sorted(_VALID_AUTH_SCHEMES)}" + ) + + token = entry_raw.get("token") + token_env = entry_raw.get("token_env") + + # Validate token/token_env types + if token is not None and (not isinstance(token, str) or not token.strip()): + raise ValueError(f"providers[{i}]: 'token' must be a non-empty string") + if token_env is not None and (not isinstance(token_env, str) or not token_env.strip()): + raise ValueError(f"providers[{i}]: 'token_env' must be a non-empty string") + + # Validate provider+scheme compatibility + from . import get_provider as _get_provider + _prov = _get_provider(provider) + if _prov is None: + from . import AUTH_REGISTRY + raise ValueError( + f"providers[{i}]: unknown provider {provider!r}; " + f"registered: {sorted(AUTH_REGISTRY.keys())}" + ) + if auth not in _prov.supported_auth_schemes: + raise ValueError( + f"providers[{i}]: provider {provider!r} does not support " + f"auth scheme {auth!r}; supported: {list(_prov.supported_auth_schemes)}" + ) + + # Validate token source based on auth scheme + if auth in ("bearer", "basic-pat"): + if not token and not token_env: + raise ValueError( + f"providers[{i}]: auth={auth!r} requires 'token' or 'token_env'" + ) + elif auth == "azure-ad": + tenant_id = entry_raw.get("tenant_id") + client_id = entry_raw.get("client_id") + client_secret_env = entry_raw.get("client_secret_env") + if not all([tenant_id, client_id, client_secret_env]): + raise ValueError( + f"providers[{i}]: auth='azure-ad' requires " + "'tenant_id', 'client_id', and 'client_secret_env'" + ) + for field_name, field_val in [ + ("tenant_id", tenant_id), + ("client_id", client_id), + ("client_secret_env", client_secret_env), + ]: + if not isinstance(field_val, str) or not field_val.strip(): + raise ValueError( + f"providers[{i}]: '{field_name}' must be a non-empty string" + ) + # azure-cli needs no extra fields + + entries.append( + AuthConfigEntry( + hosts=tuple(hosts), + provider=provider, + auth=auth, + token=token, + token_env=token_env, + tenant_id=entry_raw.get("tenant_id"), + client_id=entry_raw.get("client_id"), + client_secret_env=entry_raw.get("client_secret_env"), + ) + ) + + return entries + + +def find_entries_for_url( + url: str, entries: list[AuthConfigEntry] +) -> list[AuthConfigEntry]: + """Return entries whose ``hosts`` match the hostname of *url*.""" + hostname = (urlparse(url).hostname or "").lower() + if not hostname: + return [] + return [ + e + for e in entries + if any( + pattern == hostname or fnmatch(hostname, pattern) + for pattern in e.hosts + ) + ] diff --git a/src/specify_cli/authentication/github.py b/src/specify_cli/authentication/github.py new file mode 100644 index 0000000000..3797d82926 --- /dev/null +++ b/src/specify_cli/authentication/github.py @@ -0,0 +1,24 @@ +"""GitHub authentication provider.""" + +from __future__ import annotations + +from .base import AuthProvider + + +class GitHubAuth(AuthProvider): + """GitHub authentication provider. + + Supports the ``bearer`` auth scheme, used for PATs, fine-grained PATs, + OAuth tokens, and GitHub App installation tokens. + """ + + key = "github" + supported_auth_schemes = ("bearer",) + + def auth_headers(self, token: str, auth_scheme: str) -> dict[str, str]: + """Return ``Authorization: Bearer ``.""" + if auth_scheme != "bearer": + raise ValueError( + f"GitHubAuth does not support auth scheme {auth_scheme!r}" + ) + return {"Authorization": f"Bearer {token}"} diff --git a/src/specify_cli/authentication/http.py b/src/specify_cli/authentication/http.py new file mode 100644 index 0000000000..e68222e182 --- /dev/null +++ b/src/specify_cli/authentication/http.py @@ -0,0 +1,138 @@ +"""Authenticated HTTP helpers driven by ``~/.specify/auth.json``. + +No credentials are sent unless the user has created ``auth.json``. +For each outbound URL the helper matches the hostname against +configured entries, resolves the token via the appropriate provider +class, and attaches auth headers. Redirect safety is enforced: +the ``Authorization`` header is stripped when a redirect leaves the +entry's declared hosts. On 401/403 the next matching entry is tried, +then unauthenticated. +""" + +from __future__ import annotations + +import urllib.error +import urllib.request +from fnmatch import fnmatch +from urllib.parse import urlparse + +from . import get_provider +from .config import AuthConfigEntry, find_entries_for_url, load_auth_config + + +_config_override: list[AuthConfigEntry] | None = None + + +def _load_config() -> list[AuthConfigEntry]: + """Load auth config, using override if set (for testing).""" + if _config_override is not None: + return _config_override + try: + return load_auth_config() + except (ValueError, OSError) as exc: + import warnings + warnings.warn( + f"Failed to load ~/.specify/auth.json: {exc}. " + "All requests will be unauthenticated.", + UserWarning, + stacklevel=2, + ) + return [] + + +def _hostname_in_hosts(hostname: str, hosts: tuple[str, ...]) -> bool: + """Return True if *hostname* matches any pattern in *hosts*.""" + hostname = hostname.lower() + return any(p == hostname or fnmatch(hostname, p) for p in hosts) + + +class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler): + """Drop ``Authorization`` when a redirect leaves the entry's declared hosts.""" + + def __init__(self, hosts: tuple[str, ...]) -> None: + super().__init__() + self._hosts = hosts + + def redirect_request(self, req, fp, code, msg, headers, newurl): + original_auth = ( + req.get_header("Authorization") + or req.unredirected_hdrs.get("Authorization") + ) + new_req = super().redirect_request(req, fp, code, msg, headers, newurl) + if new_req is not None: + hostname = (urlparse(newurl).hostname or "").lower() + if _hostname_in_hosts(hostname, self._hosts): + if original_auth: + new_req.add_unredirected_header("Authorization", original_auth) + else: + new_req.headers.pop("Authorization", None) + new_req.unredirected_hdrs.pop("Authorization", None) + return new_req + + +def build_request(url: str, extra_headers: dict[str, str] | None = None) -> urllib.request.Request: + """Build a :class:`~urllib.request.Request`, attaching auth when config matches. + + Uses the first matching entry from ``auth.json`` whose token resolves. + Returns a plain request when no entry matches or the file doesn't exist. + """ + headers: dict[str, str] = {} + if extra_headers: + # Strip Authorization from extra_headers to prevent bypass + headers.update({k: v for k, v in extra_headers.items() if k.lower() != "authorization"}) + # Auth headers applied last — cannot be overridden by extra_headers + entries = find_entries_for_url(url, _load_config()) + for entry in entries: + provider = get_provider(entry.provider) + if provider is None: + continue + token = provider.resolve_token(entry) + if token: + headers.update(provider.auth_headers(token, entry.auth)) + break + return urllib.request.Request(url, headers=headers) + + +def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None = None): + """Open *url* with config-driven auth, redirect stripping, and fallthrough. + + 1. Find ``auth.json`` entries whose hosts match the URL. + 2. For each entry, resolve the token and try the request. + 3. On 401/403 move to the next matching entry. + 4. After all entries exhausted (or none matched), try unauthenticated. + 5. Non-auth errors (404, 500, network) raise immediately. + + *extra_headers* (e.g. ``Accept``) are merged into every attempt. + """ + entries = find_entries_for_url(url, _load_config()) + + def _make_req(auth_headers: dict[str, str]) -> urllib.request.Request: + merged = {} + if extra_headers: + # Strip Authorization from extra_headers to prevent bypass + merged.update({k: v for k, v in extra_headers.items() if k.lower() != "authorization"}) + # Auth headers applied last — cannot be overridden by extra_headers + merged.update(auth_headers) + return urllib.request.Request(url, headers=merged) + + # Try each matching entry + for entry in entries: + provider = get_provider(entry.provider) + if provider is None: + continue + token = provider.resolve_token(entry) + if not token: + continue + + req = _make_req(provider.auth_headers(token, entry.auth)) + opener = urllib.request.build_opener(_StripAuthOnRedirect(entry.hosts)) + try: + return opener.open(req, timeout=timeout) + except urllib.error.HTTPError as exc: + if exc.code in (401, 403): + continue # try next entry + raise + + # No entry worked (or none matched) — unauthenticated fallback + req = _make_req({}) + return urllib.request.urlopen(req, timeout=timeout) # noqa: S310 diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index a419ebf1d2..5f8e196282 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -1546,20 +1546,20 @@ def _validate_catalog_url(self, url: str) -> None: raise ValidationError("Catalog URL must be a valid URL with a host.") def _make_request(self, url: str): - """Build a urllib Request, adding a GitHub auth header when available. + """Build a urllib Request, adding auth headers when a provider matches. - Delegates to :func:`specify_cli._github_http.build_github_request`. + Delegates to :func:`specify_cli.authentication.http.build_request`. """ - from specify_cli._github_http import build_github_request - return build_github_request(url) + from specify_cli.authentication.http import build_request + return build_request(url) def _open_url(self, url: str, timeout: int = 10): - """Open a URL with GitHub auth, stripping the header on cross-host redirects. + """Open a URL with provider-based auth, trying each configured provider. - Delegates to :func:`specify_cli._github_http.open_github_url`. + Delegates to :func:`specify_cli.authentication.http.open_url`. """ - from specify_cli._github_http import open_github_url - return open_github_url(url, timeout) + from specify_cli.authentication.http import open_url + return open_url(url, timeout) def _load_catalog_config(self, config_path: Path) -> Optional[List[CatalogEntry]]: """Load catalog stack configuration from a YAML file. diff --git a/src/specify_cli/integrations/catalog.py b/src/specify_cli/integrations/catalog.py index 2faa69ae96..908813258a 100644 --- a/src/specify_cli/integrations/catalog.py +++ b/src/specify_cli/integrations/catalog.py @@ -256,7 +256,9 @@ def _fetch_single_catalog( pass # Cache cleanup is best-effort; ignore deletion failures. try: - with urllib.request.urlopen(entry.url, timeout=10) as resp: + from specify_cli.authentication.http import open_url + + with open_url(entry.url, timeout=10) as resp: # Validate final URL after redirects final_url = resp.geturl() if final_url != entry.url: diff --git a/src/specify_cli/presets.py b/src/specify_cli/presets.py index 27054a77fc..6f9d0dea82 100644 --- a/src/specify_cli/presets.py +++ b/src/specify_cli/presets.py @@ -1845,20 +1845,20 @@ def _validate_catalog_url(self, url: str) -> None: ) def _make_request(self, url: str): - """Build a urllib Request, adding a GitHub auth header when available. + """Build a urllib Request, adding auth headers when a provider matches. - Delegates to :func:`specify_cli._github_http.build_github_request`. + Delegates to :func:`specify_cli.authentication.http.build_request`. """ - from specify_cli._github_http import build_github_request - return build_github_request(url) + from specify_cli.authentication.http import build_request + return build_request(url) def _open_url(self, url: str, timeout: int = 10): - """Open a URL with GitHub auth, stripping the header on cross-host redirects. + """Open a URL with provider-based auth, trying each configured provider. - Delegates to :func:`specify_cli._github_http.open_github_url`. + Delegates to :func:`specify_cli.authentication.http.open_url`. """ - from specify_cli._github_http import open_github_url - return open_github_url(url, timeout) + from specify_cli.authentication.http import open_url + return open_url(url, timeout) def _load_catalog_config(self, config_path: Path) -> Optional[List[PresetCatalogEntry]]: """Load catalog stack configuration from a YAML file. diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index da5c60b5c8..213b443e3d 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -322,7 +322,7 @@ def _fetch_single_catalog( # Fetch from URL — validate scheme before opening and after redirects from urllib.parse import urlparse - from urllib.request import urlopen + from specify_cli.authentication.http import open_url as _open_url def _validate_catalog_url(url: str) -> None: parsed = urlparse(url) @@ -337,7 +337,7 @@ def _validate_catalog_url(url: str) -> None: _validate_catalog_url(entry.url) try: - with urlopen(entry.url, timeout=30) as resp: # noqa: S310 + with _open_url(entry.url, timeout=30) as resp: _validate_catalog_url(resp.geturl()) data = json.loads(resp.read().decode("utf-8")) except Exception as exc: diff --git a/tests/auth_helpers.py b/tests/auth_helpers.py new file mode 100644 index 0000000000..babc43e406 --- /dev/null +++ b/tests/auth_helpers.py @@ -0,0 +1,21 @@ +"""Shared test helpers for authentication config injection.""" + +from __future__ import annotations + +from specify_cli.authentication.config import AuthConfigEntry + + +def make_github_auth_entry(token_env: str = "GH_TOKEN") -> AuthConfigEntry: + """Build a GitHub ``AuthConfigEntry`` for testing.""" + return AuthConfigEntry( + hosts=("github.com", "api.github.com", "raw.githubusercontent.com", "codeload.github.com"), + provider="github", + auth="bearer", + token_env=token_env, + ) + + +def inject_github_config(monkeypatch, token_env: str = "GH_TOKEN") -> None: + """Inject a GitHub auth.json config entry into the auth HTTP module.""" + from specify_cli.authentication import http as _auth_http + monkeypatch.setattr(_auth_http, "_config_override", [make_github_auth_entry(token_env)]) diff --git a/tests/conftest.py b/tests/conftest.py index 9e8ffaae59..42ca6f707f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -66,3 +66,15 @@ def _has_working_bash() -> bool: def strip_ansi(text: str) -> str: """Remove ANSI escape codes from Rich-formatted CLI output.""" return _ANSI_ESCAPE_RE.sub("", text) + + +# --------------------------------------------------------------------------- +# Auth config isolation — prevents tests from reading ~/.specify/auth.json +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _isolate_auth_config(monkeypatch): + """Ensure no test reads the real ~/.specify/auth.json.""" + from specify_cli.authentication import http as _auth_http + monkeypatch.setattr(_auth_http, "_config_override", []) diff --git a/tests/integrations/test_integration_catalog.py b/tests/integrations/test_integration_catalog.py index 6d82a6c390..de01b40fc2 100644 --- a/tests/integrations/test_integration_catalog.py +++ b/tests/integrations/test_integration_catalog.py @@ -128,12 +128,12 @@ class TestCatalogFetch: """Tests that use a local HTTP server stub via monkeypatch.""" def _patch_urlopen(self, monkeypatch, catalog_data): - """Patch urllib.request.urlopen to return *catalog_data*.""" + """Patch authentication.http.urllib.request.urlopen to return *catalog_data*.""" class FakeResponse: def __init__(self, data, url=""): self._data = json.dumps(data).encode() - self._url = url + self._url = url if isinstance(url, str) else url.full_url def read(self): return self._data @@ -147,11 +147,12 @@ def __enter__(self): def __exit__(self, *a): pass - def fake_urlopen(url, timeout=10): + def fake_urlopen(req, timeout=10): + url = req if isinstance(req, str) else req.full_url return FakeResponse(catalog_data, url) - import urllib.request - monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) + import specify_cli.authentication.http as _auth_http + monkeypatch.setattr(_auth_http.urllib.request, "urlopen", fake_urlopen) def test_fetch_and_search_all(self, tmp_path, monkeypatch): monkeypatch.setenv("HOME", str(tmp_path)) @@ -448,12 +449,12 @@ def test_list_catalog_flag(self, tmp_path, monkeypatch): }, } - import urllib.request + import specify_cli.authentication.http as _auth_http class FakeResponse: def __init__(self, data, url=""): self._data = json.dumps(data).encode() - self._url = url + self._url = url if isinstance(url, str) else url.full_url def read(self): return self._data def geturl(self): @@ -463,7 +464,8 @@ def __enter__(self): def __exit__(self, *a): pass - monkeypatch.setattr(urllib.request, "urlopen", lambda url, timeout=10: FakeResponse(catalog, url)) + monkeypatch.setattr(_auth_http.urllib.request, "urlopen", + lambda req, timeout=10: FakeResponse(catalog, req if isinstance(req, str) else req.full_url)) old = os.getcwd() try: diff --git a/tests/test_authentication.py b/tests/test_authentication.py new file mode 100644 index 0000000000..056d457e7e --- /dev/null +++ b/tests/test_authentication.py @@ -0,0 +1,762 @@ +"""Tests for the authentication provider registry and config-driven HTTP helpers. + +Covers: +- Config loading (auth.json parsing, validation, permission warning) +- Registry mechanics (_register, get_provider, duplicate/empty-key guards) +- GitHubAuth — bearer headers +- AzureDevOpsAuth — basic-pat, bearer, azure-cli, azure-ad headers +- Host matching (find_entries_for_url) +- open_url — config-driven auth with fallthrough and redirect stripping +- build_request — single-shot request construction +- _fetch_latest_release_tag() delegation +""" + +from __future__ import annotations + +import base64 +import json + +import pytest + +from specify_cli.authentication import AUTH_REGISTRY, _register, get_provider +from specify_cli.authentication.azure_devops import AzureDevOpsAuth +from specify_cli.authentication.base import AuthProvider +from specify_cli.authentication.config import ( + AuthConfigEntry, + find_entries_for_url, + load_auth_config, +) +from specify_cli.authentication.github import GitHubAuth + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _github_entry(token_env: str = "GH_TOKEN", token: str | None = None) -> AuthConfigEntry: + """Build a standard GitHub config entry.""" + return AuthConfigEntry( + hosts=("github.com", "api.github.com", "raw.githubusercontent.com", "codeload.github.com"), + provider="github", + auth="bearer", + token=token, + token_env=token_env if token is None else None, + ) + + +def _ado_basic_entry(token_env: str = "AZURE_DEVOPS_PAT") -> AuthConfigEntry: + """Build an ADO basic-pat config entry.""" + return AuthConfigEntry( + hosts=("dev.azure.com",), + provider="azure-devops", + auth="basic-pat", + token_env=token_env, + ) + + +class _StubProvider(AuthProvider): + """Minimal concrete provider for registry mechanics tests.""" + + key = "stub-provider" + supported_auth_schemes = ("bearer",) + + def auth_headers(self, token: str, auth_scheme: str) -> dict[str, str]: + return {"Authorization": f"Bearer {token}"} + + +# --------------------------------------------------------------------------- +# Config loading +# --------------------------------------------------------------------------- + + +class TestLoadAuthConfig: + def test_missing_file_returns_empty(self, tmp_path): + assert load_auth_config(tmp_path / "nonexistent.json") == [] + + def test_valid_github_config(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{ + "hosts": ["github.com"], + "provider": "github", + "auth": "bearer", + "token_env": "GH_TOKEN", + }] + })) + entries = load_auth_config(cfg) + assert len(entries) == 1 + assert entries[0].provider == "github" + assert entries[0].auth == "bearer" + assert entries[0].token_env == "GH_TOKEN" + + def test_valid_ado_config(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{ + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "basic-pat", + "token_env": "AZURE_DEVOPS_PAT", + }] + })) + entries = load_auth_config(cfg) + assert len(entries) == 1 + assert entries[0].provider == "azure-devops" + assert entries[0].auth == "basic-pat" + + def test_inline_token(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{ + "hosts": ["github.com"], + "provider": "github", + "auth": "bearer", + "token": "ghp_inline_token", + }] + })) + entries = load_auth_config(cfg) + assert entries[0].token == "ghp_inline_token" + + def test_azure_ad_config(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{ + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "azure-ad", + "tenant_id": "tid", + "client_id": "cid", + "client_secret_env": "SECRET", + }] + })) + entries = load_auth_config(cfg) + assert entries[0].auth == "azure-ad" + assert entries[0].tenant_id == "tid" + + def test_azure_cli_config(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{ + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "azure-cli", + }] + })) + entries = load_auth_config(cfg) + assert entries[0].auth == "azure-cli" + + def test_multiple_entries(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [ + {"hosts": ["github.com"], "provider": "github", "auth": "bearer", "token_env": "GH_TOKEN"}, + {"hosts": ["dev.azure.com"], "provider": "azure-devops", "auth": "basic-pat", "token_env": "ADO_PAT"}, + ] + })) + entries = load_auth_config(cfg) + assert len(entries) == 2 + + # -- Negative: validation errors -- + + def test_invalid_json_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text("not json") + with pytest.raises(json.JSONDecodeError): + load_auth_config(cfg) + + def test_not_object_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text("[]") + with pytest.raises(ValueError, match="JSON object"): + load_auth_config(cfg) + + def test_missing_providers_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({"foo": "bar"})) + with pytest.raises(ValueError, match="providers"): + load_auth_config(cfg) + + def test_empty_hosts_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{"hosts": [], "provider": "github", "auth": "bearer", "token_env": "X"}] + })) + with pytest.raises(ValueError, match="non-empty"): + load_auth_config(cfg) + + def test_missing_provider_key_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{"hosts": ["github.com"], "auth": "bearer", "token_env": "X"}] + })) + with pytest.raises(ValueError, match="provider"): + load_auth_config(cfg) + + def test_unsupported_auth_scheme_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{"hosts": ["github.com"], "provider": "github", "auth": "ntlm", "token_env": "X"}] + })) + with pytest.raises(ValueError, match="unsupported"): + load_auth_config(cfg) + + def test_bearer_without_token_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{"hosts": ["github.com"], "provider": "github", "auth": "bearer"}] + })) + with pytest.raises(ValueError, match="token"): + load_auth_config(cfg) + + def test_azure_ad_missing_fields_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{ + "hosts": ["dev.azure.com"], + "provider": "azure-devops", + "auth": "azure-ad", + "tenant_id": "tid", + }] + })) + with pytest.raises(ValueError, match="azure-ad"): + load_auth_config(cfg) + + def test_unknown_provider_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{"hosts": ["example.com"], "provider": "gitlab", "auth": "bearer", "token_env": "X"}] + })) + with pytest.raises(ValueError, match="unknown provider"): + load_auth_config(cfg) + + def test_incompatible_provider_scheme_raises(self, tmp_path): + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{ + "hosts": ["github.com"], + "provider": "github", + "auth": "basic-pat", + "token_env": "X", + }] + })) + with pytest.raises(ValueError, match="does not support"): + load_auth_config(cfg) + + def test_world_readable_warns(self, tmp_path): + import stat + + cfg = tmp_path / "auth.json" + cfg.write_text(json.dumps({ + "providers": [{"hosts": ["github.com"], "provider": "github", "auth": "bearer", "token_env": "GH_TOKEN"}] + })) + cfg.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) + with pytest.warns(UserWarning, match="readable by group"): + load_auth_config(cfg) + + +# --------------------------------------------------------------------------- +# Host matching +# --------------------------------------------------------------------------- + + +class TestFindEntriesForUrl: + def test_exact_match(self): + entry = _github_entry() + result = find_entries_for_url("https://github.com/org/repo", [entry]) + assert result == [entry] + + def test_wildcard_match(self): + entry = AuthConfigEntry( + hosts=("*.visualstudio.com",), + provider="azure-devops", + auth="basic-pat", + token_env="ADO_PAT", + ) + result = find_entries_for_url("https://myorg.visualstudio.com/project", [entry]) + assert result == [entry] + + def test_no_match_returns_empty(self): + entry = _github_entry() + result = find_entries_for_url("https://evil.example.com/file", [entry]) + assert result == [] + + def test_no_match_for_lookalike_host(self): + entry = _github_entry() + result = find_entries_for_url("https://github.com.evil.com/file", [entry]) + assert result == [] + + def test_empty_url_returns_empty(self): + assert find_entries_for_url("", [_github_entry()]) == [] + + def test_empty_entries_returns_empty(self): + assert find_entries_for_url("https://github.com/org/repo", []) == [] + + def test_multiple_matches_returned(self): + e1 = _github_entry(token_env="GH_TOKEN") + e2 = _github_entry(token_env="GITHUB_TOKEN") + result = find_entries_for_url("https://github.com/org/repo", [e1, e2]) + assert len(result) == 2 + + +# --------------------------------------------------------------------------- +# Registry mechanics +# --------------------------------------------------------------------------- + + +class TestAuthRegistry: + def test_github_registered(self): + assert "github" in AUTH_REGISTRY + + def test_azure_devops_registered(self): + assert "azure-devops" in AUTH_REGISTRY + + def test_get_provider_returns_github(self): + assert isinstance(get_provider("github"), GitHubAuth) + + def test_get_provider_returns_azure_devops(self): + assert isinstance(get_provider("azure-devops"), AzureDevOpsAuth) + + def test_get_provider_unknown_returns_none(self): + assert get_provider("does-not-exist") is None + + def test_register_duplicate_raises_key_error(self): + class _UniqueStub(_StubProvider): + key = "__test_duplicate__" + + try: + _register(_UniqueStub()) + with pytest.raises(KeyError, match="already registered"): + _register(_UniqueStub()) + finally: + AUTH_REGISTRY.pop("__test_duplicate__", None) + + def test_register_empty_key_raises_value_error(self): + class _EmptyKey(_StubProvider): + key = "" + + with pytest.raises(ValueError, match="empty key"): + _register(_EmptyKey()) + + +# --------------------------------------------------------------------------- +# GitHubAuth +# --------------------------------------------------------------------------- + + +class TestGitHubAuth: + def test_bearer_headers(self): + assert GitHubAuth().auth_headers("my-token", "bearer") == {"Authorization": "Bearer my-token"} + + def test_unsupported_scheme_raises(self): + with pytest.raises(ValueError, match="basic-pat"): + GitHubAuth().auth_headers("tok", "basic-pat") + + def test_resolve_token_from_env(self, monkeypatch): + monkeypatch.setenv("GH_TOKEN", "env-token") + assert GitHubAuth().resolve_token(_github_entry()) == "env-token" + + def test_resolve_token_inline(self): + assert GitHubAuth().resolve_token(_github_entry(token="inline-tok")) == "inline-tok" + + def test_resolve_token_strips_whitespace(self, monkeypatch): + monkeypatch.setenv("GH_TOKEN", " my-token ") + assert GitHubAuth().resolve_token(_github_entry()) == "my-token" + + def test_resolve_token_empty_env_returns_none(self, monkeypatch): + monkeypatch.setenv("GH_TOKEN", " ") + assert GitHubAuth().resolve_token(_github_entry()) is None + + def test_resolve_token_missing_env_returns_none(self, monkeypatch): + monkeypatch.delenv("GH_TOKEN", raising=False) + assert GitHubAuth().resolve_token(_github_entry()) is None + + def test_key(self): + assert GitHubAuth.key == "github" + + def test_supported_schemes(self): + assert GitHubAuth.supported_auth_schemes == ("bearer",) + + +# --------------------------------------------------------------------------- +# AzureDevOpsAuth +# --------------------------------------------------------------------------- + + +class TestAzureDevOpsAuth: + def test_basic_pat_headers(self): + headers = AzureDevOpsAuth().auth_headers("my-pat", "basic-pat") + encoded = base64.b64encode(b":my-pat").decode("ascii") + assert headers == {"Authorization": f"Basic {encoded}"} + + def test_basic_pat_format(self): + header = AzureDevOpsAuth().auth_headers("test-pat", "basic-pat")["Authorization"] + raw = base64.b64decode(header[len("Basic "):]).decode("ascii") + assert raw == ":test-pat" + + def test_bearer_headers(self): + assert AzureDevOpsAuth().auth_headers("tok", "bearer") == {"Authorization": "Bearer tok"} + + def test_azure_cli_headers(self): + assert AzureDevOpsAuth().auth_headers("tok", "azure-cli") == {"Authorization": "Bearer tok"} + + def test_azure_ad_headers(self): + assert AzureDevOpsAuth().auth_headers("tok", "azure-ad") == {"Authorization": "Bearer tok"} + + def test_unsupported_scheme_raises(self): + with pytest.raises(ValueError): + AzureDevOpsAuth().auth_headers("tok", "ntlm") + + def test_resolve_token_basic_pat(self, monkeypatch): + monkeypatch.setenv("AZURE_DEVOPS_PAT", "my-pat") + assert AzureDevOpsAuth().resolve_token(_ado_basic_entry()) == "my-pat" + + def test_resolve_token_strips_whitespace(self, monkeypatch): + monkeypatch.setenv("AZURE_DEVOPS_PAT", " my-pat ") + assert AzureDevOpsAuth().resolve_token(_ado_basic_entry()) == "my-pat" + + def test_resolve_token_missing_returns_none(self, monkeypatch): + monkeypatch.delenv("AZURE_DEVOPS_PAT", raising=False) + assert AzureDevOpsAuth().resolve_token(_ado_basic_entry()) is None + + def test_key(self): + assert AzureDevOpsAuth.key == "azure-devops" + + def test_supported_schemes(self): + schemes = AzureDevOpsAuth.supported_auth_schemes + assert "basic-pat" in schemes + assert "bearer" in schemes + assert "azure-cli" in schemes + assert "azure-ad" in schemes + + def test_resolve_token_azure_cli_success(self): + """azure-cli acquires token via az CLI.""" + from unittest.mock import patch, MagicMock + entry = AuthConfigEntry( + hosts=("dev.azure.com",), provider="azure-devops", auth="azure-cli", + ) + result = MagicMock() + result.returncode = 0 + result.stdout = '{"accessToken": "cli-acquired-token"}' + with patch("specify_cli.authentication.azure_devops.subprocess.run", return_value=result): + assert AzureDevOpsAuth().resolve_token(entry) == "cli-acquired-token" + + def test_resolve_token_azure_cli_failure_returns_none(self): + """azure-cli returns None when az CLI fails.""" + from unittest.mock import patch, MagicMock + entry = AuthConfigEntry( + hosts=("dev.azure.com",), provider="azure-devops", auth="azure-cli", + ) + result = MagicMock() + result.returncode = 1 + result.stdout = "" + with patch("specify_cli.authentication.azure_devops.subprocess.run", return_value=result): + assert AzureDevOpsAuth().resolve_token(entry) is None + + def test_resolve_token_azure_cli_not_installed_returns_none(self): + """azure-cli returns None when az is not installed.""" + from unittest.mock import patch + entry = AuthConfigEntry( + hosts=("dev.azure.com",), provider="azure-devops", auth="azure-cli", + ) + with patch("specify_cli.authentication.azure_devops.subprocess.run", side_effect=OSError("not found")): + assert AzureDevOpsAuth().resolve_token(entry) is None + + def test_resolve_token_azure_ad_success(self, monkeypatch): + """azure-ad acquires token via OAuth2 client credentials.""" + from unittest.mock import patch, MagicMock + monkeypatch.setenv("MY_SECRET", "secret-value") + entry = AuthConfigEntry( + hosts=("dev.azure.com",), provider="azure-devops", auth="azure-ad", + tenant_id="tid", client_id="cid", client_secret_env="MY_SECRET", + ) + mock_resp = MagicMock() + mock_resp.read.return_value = b'{"access_token": "ad-acquired-token"}' + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + with patch("urllib.request.urlopen", return_value=mock_resp): + assert AzureDevOpsAuth().resolve_token(entry) == "ad-acquired-token" + + def test_resolve_token_azure_ad_missing_secret_returns_none(self, monkeypatch): + """azure-ad returns None when client secret env var is missing.""" + monkeypatch.delenv("MY_SECRET", raising=False) + entry = AuthConfigEntry( + hosts=("dev.azure.com",), provider="azure-devops", auth="azure-ad", + tenant_id="tid", client_id="cid", client_secret_env="MY_SECRET", + ) + assert AzureDevOpsAuth().resolve_token(entry) is None + + def test_resolve_token_azure_ad_network_error_returns_none(self, monkeypatch): + """azure-ad returns None on network errors.""" + import urllib.error + from unittest.mock import patch + monkeypatch.setenv("MY_SECRET", "secret-value") + entry = AuthConfigEntry( + hosts=("dev.azure.com",), provider="azure-devops", auth="azure-ad", + tenant_id="tid", client_id="cid", client_secret_env="MY_SECRET", + ) + with patch("urllib.request.urlopen", + side_effect=urllib.error.URLError("connection refused")): + assert AzureDevOpsAuth().resolve_token(entry) is None + + +# --------------------------------------------------------------------------- +# open_url / build_request — positive tests +# --------------------------------------------------------------------------- + + +class TestAuthenticatedHttp: + def _set_config(self, monkeypatch, entries): + from specify_cli.authentication import http as _mod + monkeypatch.setattr(_mod, "_config_override", entries) + + def test_build_request_attaches_auth_for_matching_host(self, monkeypatch): + from specify_cli.authentication.http import build_request + monkeypatch.setenv("GH_TOKEN", "my-token") + self._set_config(monkeypatch, [_github_entry()]) + req = build_request("https://github.com/org/repo") + assert req.get_header("Authorization") == "Bearer my-token" + + def test_build_request_no_auth_for_non_matching_host(self, monkeypatch): + from specify_cli.authentication.http import build_request + monkeypatch.setenv("GH_TOKEN", "my-token") + self._set_config(monkeypatch, [_github_entry()]) + req = build_request("https://evil.example.com/file") + assert "Authorization" not in req.headers + + def test_build_request_no_auth_when_no_config(self, monkeypatch): + from specify_cli.authentication.http import build_request + self._set_config(monkeypatch, []) + req = build_request("https://github.com/org/repo") + assert "Authorization" not in req.headers + + def test_build_request_extra_headers(self, monkeypatch): + from specify_cli.authentication.http import build_request + monkeypatch.setenv("GH_TOKEN", "my-token") + self._set_config(monkeypatch, [_github_entry()]) + req = build_request("https://github.com/api", extra_headers={"Accept": "application/json"}) + assert req.get_header("Accept") == "application/json" + assert req.get_header("Authorization") == "Bearer my-token" + + def test_open_url_attaches_auth_for_matching_host(self, monkeypatch): + from unittest.mock import MagicMock, patch + from specify_cli.authentication.http import open_url + monkeypatch.setenv("GH_TOKEN", "my-token") + self._set_config(monkeypatch, [_github_entry()]) + captured = {} + mock_opener = MagicMock() + def fake_open(req, timeout=None): + captured["req"] = req + resp = MagicMock(); resp.__enter__ = lambda s: s; resp.__exit__ = MagicMock(return_value=False) + return resp + mock_opener.open.side_effect = fake_open + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): + open_url("https://github.com/org/repo/catalog.json") + assert captured["req"].get_header("Authorization") == "Bearer my-token" + + def test_open_url_no_auth_for_non_matching_host(self, monkeypatch): + from unittest.mock import MagicMock, patch + from specify_cli.authentication.http import open_url + monkeypatch.setenv("GH_TOKEN", "my-token") + self._set_config(monkeypatch, [_github_entry()]) + captured = {} + def fake_urlopen(req, timeout=None): + captured["req"] = req + resp = MagicMock(); resp.__enter__ = lambda s: s; resp.__exit__ = MagicMock(return_value=False) + return resp + with patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=fake_urlopen): + open_url("https://example.com/file.json") + assert captured["req"].get_header("Authorization") is None + + def test_open_url_no_auth_when_no_config(self, monkeypatch): + from unittest.mock import MagicMock, patch + from specify_cli.authentication.http import open_url + self._set_config(monkeypatch, []) + captured = {} + def fake_urlopen(req, timeout=None): + captured["req"] = req + resp = MagicMock(); resp.__enter__ = lambda s: s; resp.__exit__ = MagicMock(return_value=False) + return resp + with patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=fake_urlopen): + open_url("https://github.com/org/repo") + assert captured["req"].get_header("Authorization") is None + + def test_open_url_falls_through_on_401(self, monkeypatch): + import urllib.error + from unittest.mock import MagicMock, patch + from specify_cli.authentication.http import open_url + monkeypatch.setenv("GH_TOKEN", "bad-token") + self._set_config(monkeypatch, [_github_entry()]) + call_count = 0 + def fake_side_effect(req, timeout=None): + nonlocal call_count; call_count += 1 + if call_count == 1: + raise urllib.error.HTTPError("url", 401, "Unauthorized", {}, None) + resp = MagicMock(); resp.__enter__ = lambda s: s; resp.__exit__ = MagicMock(return_value=False) + return resp + mock_opener = MagicMock(); mock_opener.open.side_effect = fake_side_effect + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener), \ + patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=fake_side_effect): + open_url("https://github.com/org/repo") + assert call_count == 2 + + +# --------------------------------------------------------------------------- +# open_url — negative tests +# --------------------------------------------------------------------------- + + +class TestAuthenticatedHttpNegative: + def _set_config(self, monkeypatch, entries): + from specify_cli.authentication import http as _mod + monkeypatch.setattr(_mod, "_config_override", entries) + + def test_500_raises_immediately(self, monkeypatch): + import urllib.error + from unittest.mock import MagicMock, patch + from specify_cli.authentication.http import open_url + monkeypatch.setenv("GH_TOKEN", "tok") + self._set_config(monkeypatch, [_github_entry()]) + mock_opener = MagicMock() + mock_opener.open.side_effect = urllib.error.HTTPError("url", 500, "ISE", {}, None) + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): + with pytest.raises(urllib.error.HTTPError, match="500"): + open_url("https://github.com/org/repo") + + def test_404_raises_immediately(self, monkeypatch): + import urllib.error + from unittest.mock import MagicMock, patch + from specify_cli.authentication.http import open_url + monkeypatch.setenv("GH_TOKEN", "tok") + self._set_config(monkeypatch, [_github_entry()]) + mock_opener = MagicMock() + mock_opener.open.side_effect = urllib.error.HTTPError("url", 404, "Not Found", {}, None) + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): + with pytest.raises(urllib.error.HTTPError, match="404"): + open_url("https://github.com/org/repo") + + def test_urlerror_propagates(self, monkeypatch): + import urllib.error + from unittest.mock import patch + from specify_cli.authentication.http import open_url + self._set_config(monkeypatch, []) + with patch("specify_cli.authentication.http.urllib.request.urlopen", + side_effect=urllib.error.URLError("refused")): + with pytest.raises(urllib.error.URLError): + open_url("https://example.com/file") + + def test_timeout_propagates(self, monkeypatch): + import socket + from unittest.mock import patch + from specify_cli.authentication.http import open_url + self._set_config(monkeypatch, []) + with patch("specify_cli.authentication.http.urllib.request.urlopen", + side_effect=socket.timeout("timed out")): + with pytest.raises(socket.timeout): + open_url("https://example.com/file") + + +# --------------------------------------------------------------------------- +# Redirect stripping +# --------------------------------------------------------------------------- + + +class TestRedirectStripping: + def test_redirect_within_hosts_preserves_auth(self): + from specify_cli.authentication.http import _StripAuthOnRedirect + from urllib.request import Request + import io + handler = _StripAuthOnRedirect(("github.com", "codeload.github.com")) + req = Request("https://github.com/org/repo", headers={"Authorization": "Bearer tok"}) + new_req = handler.redirect_request(req, io.BytesIO(b""), 302, "Found", {}, + "https://codeload.github.com/org/repo/zip") + assert new_req is not None + auth = new_req.get_header("Authorization") or new_req.unredirected_hdrs.get("Authorization") + assert auth == "Bearer tok" + + def test_redirect_outside_hosts_strips_auth(self): + from specify_cli.authentication.http import _StripAuthOnRedirect + from urllib.request import Request + import io + handler = _StripAuthOnRedirect(("github.com",)) + req = Request("https://github.com/org/repo", headers={"Authorization": "Bearer tok"}) + new_req = handler.redirect_request(req, io.BytesIO(b""), 302, "Found", {}, + "https://objects.githubusercontent.com/asset") + assert new_req is not None + assert new_req.headers.get("Authorization") is None + assert new_req.unredirected_hdrs.get("Authorization") is None + + def test_multi_hop_redirect_within_hosts_preserves_auth(self): + """Auth survives a multi-hop redirect chain within allowed hosts.""" + from specify_cli.authentication.http import _StripAuthOnRedirect + from urllib.request import Request + import io + hosts = ("github.com", "codeload.github.com", "objects-origin.githubusercontent.com") + handler = _StripAuthOnRedirect(hosts) + + # First hop: github.com → codeload.github.com + req1 = Request("https://github.com/org/repo", headers={"Authorization": "Bearer tok"}) + req2 = handler.redirect_request(req1, io.BytesIO(b""), 302, "Found", {}, + "https://codeload.github.com/org/repo/zip") + assert req2 is not None + auth2 = req2.get_header("Authorization") or req2.unredirected_hdrs.get("Authorization") + assert auth2 == "Bearer tok" + + # Second hop: codeload.github.com → objects-origin.githubusercontent.com + req3 = handler.redirect_request(req2, io.BytesIO(b""), 302, "Found", {}, + "https://objects-origin.githubusercontent.com/asset") + assert req3 is not None + auth3 = req3.get_header("Authorization") or req3.unredirected_hdrs.get("Authorization") + assert auth3 == "Bearer tok" + + +# --------------------------------------------------------------------------- +# _fetch_latest_release_tag delegation +# --------------------------------------------------------------------------- + + +class TestFetchLatestReleaseTagDelegation: + def _set_config(self, monkeypatch, entries): + from specify_cli.authentication import http as _mod + monkeypatch.setattr(_mod, "_config_override", entries) + + def _capture_request(self): + import json as _json + from unittest.mock import MagicMock + captured: dict = {} + def side_effect(req, timeout=None): + captured["request"] = req + body = _json.dumps({"tag_name": "v9.9.9"}).encode() + resp = MagicMock(); resp.read.return_value = body + cm = MagicMock(); cm.__enter__.return_value = resp; cm.__exit__.return_value = False + return cm + return captured, side_effect + + def test_gh_token_forwarded_when_configured(self, monkeypatch): + from unittest.mock import MagicMock, patch + from specify_cli import _fetch_latest_release_tag + monkeypatch.setenv("GH_TOKEN", "forwarded-sentinel") + self._set_config(monkeypatch, [_github_entry()]) + captured, side_effect = self._capture_request() + mock_opener = MagicMock(); mock_opener.open.side_effect = side_effect + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): + _fetch_latest_release_tag() + assert captured["request"].get_header("Authorization") == "Bearer forwarded-sentinel" + + def test_no_config_means_no_auth(self, monkeypatch): + from unittest.mock import patch + from specify_cli import _fetch_latest_release_tag + self._set_config(monkeypatch, []) + captured, side_effect = self._capture_request() + with patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect): + _fetch_latest_release_tag() + assert captured["request"].get_header("Authorization") is None + + def test_accept_header_present(self, monkeypatch): + from unittest.mock import patch + from specify_cli import _fetch_latest_release_tag + self._set_config(monkeypatch, []) + captured, side_effect = self._capture_request() + with patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect): + _fetch_latest_release_tag() + assert captured["request"].get_header("Accept") == "application/vnd.github+json" diff --git a/tests/test_extensions.py b/tests/test_extensions.py index c5be0ab4f3..1434ba309d 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -2453,6 +2453,10 @@ def _make_catalog(self, temp_dir): (project_dir / ".specify").mkdir() return ExtensionCatalog(project_dir) + def _inject_github_config(self, monkeypatch, token_env="GH_TOKEN"): + from tests.auth_helpers import inject_github_config + inject_github_config(monkeypatch, token_env) + def test_make_request_no_token_no_auth_header(self, temp_dir, monkeypatch): """Without a token, requests carry no Authorization header.""" monkeypatch.delenv("GITHUB_TOKEN", raising=False) @@ -2473,6 +2477,7 @@ def test_make_request_whitespace_github_token_falls_back_to_gh_token(self, temp_ """When GITHUB_TOKEN is whitespace-only, GH_TOKEN is used as fallback.""" monkeypatch.setenv("GITHUB_TOKEN", " ") monkeypatch.setenv("GH_TOKEN", "ghp_fallback") + self._inject_github_config(monkeypatch, token_env="GH_TOKEN") catalog = self._make_catalog(temp_dir) req = catalog._make_request("https://raw.githubusercontent.com/org/repo/main/catalog.json") assert req.get_header("Authorization") == "Bearer ghp_fallback" @@ -2481,6 +2486,7 @@ def test_make_request_github_token_added_for_raw_githubusercontent(self, temp_di """GITHUB_TOKEN is attached for raw.githubusercontent.com URLs.""" monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") monkeypatch.delenv("GH_TOKEN", raising=False) + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = self._make_catalog(temp_dir) req = catalog._make_request("https://raw.githubusercontent.com/org/repo/main/catalog.json") assert req.get_header("Authorization") == "Bearer ghp_testtoken" @@ -2489,49 +2495,40 @@ def test_make_request_gh_token_fallback(self, temp_dir, monkeypatch): """GH_TOKEN is used when GITHUB_TOKEN is absent.""" monkeypatch.delenv("GITHUB_TOKEN", raising=False) monkeypatch.setenv("GH_TOKEN", "ghp_ghtoken") + self._inject_github_config(monkeypatch, token_env="GH_TOKEN") catalog = self._make_catalog(temp_dir) req = catalog._make_request("https://github.com/org/repo/releases/download/v1/ext.zip") assert req.get_header("Authorization") == "Bearer ghp_ghtoken" - def test_make_request_github_token_takes_precedence_over_gh_token(self, temp_dir, monkeypatch): - """GITHUB_TOKEN takes precedence over GH_TOKEN when both are set.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_primary") - monkeypatch.setenv("GH_TOKEN", "ghp_secondary") + def test_make_request_gh_token_takes_precedence_over_github_token(self, temp_dir, monkeypatch): + """When auth.json uses GH_TOKEN, that token is used regardless of GITHUB_TOKEN.""" + monkeypatch.setenv("GITHUB_TOKEN", "ghp_secondary") + monkeypatch.setenv("GH_TOKEN", "ghp_primary") + self._inject_github_config(monkeypatch, token_env="GH_TOKEN") catalog = self._make_catalog(temp_dir) req = catalog._make_request("https://api.github.com/repos/org/repo") assert req.get_header("Authorization") == "Bearer ghp_primary" - def test_make_request_token_not_added_for_non_github_url(self, temp_dir, monkeypatch): - """Auth header is never attached to non-GitHub URLs to prevent credential leakage.""" + def test_make_request_no_auth_for_non_matching_host(self, temp_dir, monkeypatch): + """Auth is NOT attached to hosts not listed in auth.json.""" monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = self._make_catalog(temp_dir) req = catalog._make_request("https://internal.example.com/catalog.json") assert "Authorization" not in req.headers - def test_make_request_token_not_added_for_github_lookalike_host(self, temp_dir, monkeypatch): - """Auth header is not attached to hosts that include github.com as a suffix.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") - catalog = self._make_catalog(temp_dir) - req = catalog._make_request("https://github.com.evil.com/org/repo/releases/download/v1/ext.zip") - assert "Authorization" not in req.headers - - def test_make_request_token_not_added_for_github_in_path(self, temp_dir, monkeypatch): - """Auth header is not attached when github.com appears only in the URL path.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") - catalog = self._make_catalog(temp_dir) - req = catalog._make_request("https://evil.example.com/github.com/org/repo/releases/download/v1/ext.zip") - assert "Authorization" not in req.headers - - def test_make_request_token_not_added_for_github_in_query(self, temp_dir, monkeypatch): - """Auth header is not attached when github.com appears only in the query string.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + def test_make_request_no_auth_when_no_config(self, temp_dir, monkeypatch): + """No auth header when no auth.json config exists.""" + monkeypatch.delenv("GITHUB_TOKEN", raising=False) + monkeypatch.delenv("GH_TOKEN", raising=False) catalog = self._make_catalog(temp_dir) - req = catalog._make_request("https://evil.example.com/download?source=https://github.com/org/repo/v1/ext.zip") + req = catalog._make_request("https://github.com/org/repo/releases/download/v1/ext.zip") assert "Authorization" not in req.headers def test_make_request_token_added_for_api_github_com(self, temp_dir, monkeypatch): """GITHUB_TOKEN is attached for api.github.com URLs.""" monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = self._make_catalog(temp_dir) req = catalog._make_request("https://api.github.com/repos/org/repo/releases/assets/1") assert req.get_header("Authorization") == "Bearer ghp_testtoken" @@ -2539,49 +2536,17 @@ def test_make_request_token_added_for_api_github_com(self, temp_dir, monkeypatch def test_make_request_token_added_for_codeload_github_com(self, temp_dir, monkeypatch): """GITHUB_TOKEN is attached for codeload.github.com URLs (GitHub archive redirects).""" monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = self._make_catalog(temp_dir) req = catalog._make_request("https://codeload.github.com/org/repo/zip/refs/tags/v1.0.0") assert req.get_header("Authorization") == "Bearer ghp_testtoken" - def test_redirect_preserves_auth_for_github_to_codeload(self): - """Auth header is preserved when GitHub redirects to codeload.github.com.""" - from specify_cli._github_http import _StripAuthOnRedirect - from urllib.request import Request - import io - - handler = _StripAuthOnRedirect() - original_url = "https://github.com/org/repo/archive/refs/tags/v1.zip" - redirect_url = "https://codeload.github.com/org/repo/zip/refs/tags/v1" - req = Request(original_url, headers={"Authorization": "Bearer ghp_test"}) - fp = io.BytesIO(b"") - new_req = handler.redirect_request(req, fp, 302, "Found", {}, redirect_url) - assert new_req is not None - auth = new_req.get_header("Authorization") or new_req.unredirected_hdrs.get("Authorization") - assert auth == "Bearer ghp_test" - - def test_redirect_strips_auth_for_github_to_external(self): - """Auth header is stripped when GitHub redirects to a non-GitHub host.""" - from specify_cli._github_http import _StripAuthOnRedirect - from urllib.request import Request - import io - - handler = _StripAuthOnRedirect() - original_url = "https://github.com/org/repo/releases/download/v1/asset.zip" - redirect_url = "https://objects.githubusercontent.com/github-production-release-asset/12345" - req = Request(original_url, headers={"Authorization": "Bearer ghp_test"}) - fp = io.BytesIO(b"") - new_req = handler.redirect_request(req, fp, 302, "Found", {}, redirect_url) - assert new_req is not None - auth_header = new_req.headers.get("Authorization") - auth_unredirected = new_req.unredirected_hdrs.get("Authorization") - assert auth_header is None - assert auth_unredirected is None - def test_fetch_single_catalog_sends_auth_header(self, temp_dir, monkeypatch): - """_fetch_single_catalog passes Authorization header via opener for GitHub URLs.""" + """_fetch_single_catalog passes Authorization header when a provider is configured.""" from unittest.mock import patch, MagicMock monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = self._make_catalog(temp_dir) catalog_data = {"schema_version": "1.0", "extensions": {}} @@ -2589,6 +2554,7 @@ def test_fetch_single_catalog_sends_auth_header(self, temp_dir, monkeypatch): mock_response.read.return_value = json.dumps(catalog_data).encode() mock_response.__enter__ = lambda s: s mock_response.__exit__ = MagicMock(return_value=False) + mock_response.geturl.return_value = "https://raw.githubusercontent.com/org/repo/main/catalog.json" captured = {} mock_opener = MagicMock() @@ -2606,17 +2572,18 @@ def fake_open(req, timeout=None): install_allowed=True, ) - with patch("urllib.request.build_opener", return_value=mock_opener): + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): catalog._fetch_single_catalog(entry, force_refresh=True) assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken" def test_download_extension_sends_auth_header(self, temp_dir, monkeypatch): - """download_extension passes Authorization header via opener for GitHub URLs.""" + """download_extension passes Authorization header when a provider is configured.""" from unittest.mock import patch, MagicMock import zipfile, io monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = self._make_catalog(temp_dir) # Build a minimal valid ZIP in memory @@ -2631,7 +2598,6 @@ def test_download_extension_sends_auth_header(self, temp_dir, monkeypatch): mock_response.__exit__ = MagicMock(return_value=False) captured = {} - mock_opener = MagicMock() def fake_open(req, timeout=None): @@ -2648,7 +2614,7 @@ def fake_open(req, timeout=None): } with patch.object(catalog, "get_extension_info", return_value=ext_info), \ - patch("urllib.request.build_opener", return_value=mock_opener): + patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): catalog.download_extension("test-ext", target_dir=temp_dir) assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken" diff --git a/tests/test_presets.py b/tests/test_presets.py index 4b167ed9be..f74cd59bf4 100644 --- a/tests/test_presets.py +++ b/tests/test_presets.py @@ -1223,6 +1223,10 @@ def test_same_priority_sorted_alphabetically(self, project_dir): class TestPresetCatalog: """Test template catalog functionality.""" + def _inject_github_config(self, monkeypatch, token_env="GH_TOKEN"): + from tests.auth_helpers import inject_github_config + inject_github_config(monkeypatch, token_env) + def test_default_catalog_url(self, project_dir): """Test default catalog URL.""" catalog = PresetCatalog(project_dir) @@ -1417,6 +1421,7 @@ def test_make_request_whitespace_github_token_falls_back_to_gh_token(self, proje """When GITHUB_TOKEN is whitespace-only, GH_TOKEN is used as fallback.""" monkeypatch.setenv("GITHUB_TOKEN", " ") monkeypatch.setenv("GH_TOKEN", "ghp_fallback") + self._inject_github_config(monkeypatch, token_env="GH_TOKEN") catalog = PresetCatalog(project_dir) req = catalog._make_request("https://raw.githubusercontent.com/org/repo/main/catalog.json") assert req.get_header("Authorization") == "Bearer ghp_fallback" @@ -1425,6 +1430,7 @@ def test_make_request_github_token_added_for_github_url(self, project_dir, monke """GITHUB_TOKEN is attached for raw.githubusercontent.com URLs.""" monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") monkeypatch.delenv("GH_TOKEN", raising=False) + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = PresetCatalog(project_dir) req = catalog._make_request("https://raw.githubusercontent.com/org/repo/main/catalog.json") assert req.get_header("Authorization") == "Bearer ghp_testtoken" @@ -1433,58 +1439,50 @@ def test_make_request_gh_token_fallback(self, project_dir, monkeypatch): """GH_TOKEN is used when GITHUB_TOKEN is absent.""" monkeypatch.delenv("GITHUB_TOKEN", raising=False) monkeypatch.setenv("GH_TOKEN", "ghp_ghtoken") + self._inject_github_config(monkeypatch, token_env="GH_TOKEN") catalog = PresetCatalog(project_dir) req = catalog._make_request("https://github.com/org/repo/releases/download/v1/pack.zip") assert req.get_header("Authorization") == "Bearer ghp_ghtoken" - def test_make_request_github_token_takes_precedence(self, project_dir, monkeypatch): - """GITHUB_TOKEN takes precedence over GH_TOKEN when both are set.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_primary") - monkeypatch.setenv("GH_TOKEN", "ghp_secondary") + def test_make_request_gh_token_takes_precedence(self, project_dir, monkeypatch): + """When auth.json uses GH_TOKEN, that token is used regardless of GITHUB_TOKEN.""" + monkeypatch.setenv("GITHUB_TOKEN", "ghp_secondary") + monkeypatch.setenv("GH_TOKEN", "ghp_primary") + self._inject_github_config(monkeypatch, token_env="GH_TOKEN") catalog = PresetCatalog(project_dir) req = catalog._make_request("https://api.github.com/repos/org/repo") assert req.get_header("Authorization") == "Bearer ghp_primary" def test_make_request_token_added_for_codeload_github_com(self, project_dir, monkeypatch): - """GITHUB_TOKEN is attached for codeload.github.com URLs (GitHub archive redirects).""" + """GITHUB_TOKEN is attached for codeload.github.com URLs.""" monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = PresetCatalog(project_dir) req = catalog._make_request("https://codeload.github.com/org/repo/zip/refs/tags/v1.0.0") assert req.get_header("Authorization") == "Bearer ghp_testtoken" - def test_make_request_token_not_added_for_non_github_url(self, project_dir, monkeypatch): - """Auth header is never attached to non-GitHub URLs to prevent credential leakage.""" + def test_make_request_no_auth_for_non_matching_host(self, project_dir, monkeypatch): + """Auth is NOT attached to hosts not listed in auth.json.""" monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = PresetCatalog(project_dir) req = catalog._make_request("https://internal.example.com/catalog.json") assert "Authorization" not in req.headers - def test_make_request_token_not_added_for_github_lookalike_host(self, project_dir, monkeypatch): - """Auth header is not attached to hosts that include github.com as a suffix.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") - catalog = PresetCatalog(project_dir) - req = catalog._make_request("https://github.com.evil.com/org/repo/releases/download/v1/pack.zip") - assert "Authorization" not in req.headers - - def test_make_request_token_not_added_for_github_in_path(self, project_dir, monkeypatch): - """Auth header is not attached when github.com appears only in the URL path.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") - catalog = PresetCatalog(project_dir) - req = catalog._make_request("https://evil.example.com/github.com/org/repo/releases/download/v1/pack.zip") - assert "Authorization" not in req.headers - - def test_make_request_token_not_added_for_github_in_query(self, project_dir, monkeypatch): - """Auth header is not attached when github.com appears only in the query string.""" - monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + def test_make_request_no_auth_when_no_config(self, project_dir, monkeypatch): + """No auth header when no auth.json config exists.""" + monkeypatch.delenv("GITHUB_TOKEN", raising=False) + monkeypatch.delenv("GH_TOKEN", raising=False) catalog = PresetCatalog(project_dir) - req = catalog._make_request("https://evil.example.com/download?source=https://github.com/org/repo/v1/pack.zip") + req = catalog._make_request("https://github.com/org/repo/releases/download/v1/pack.zip") assert "Authorization" not in req.headers def test_fetch_single_catalog_sends_auth_header(self, project_dir, monkeypatch): - """_fetch_single_catalog passes Authorization header via opener for GitHub URLs.""" + """_fetch_single_catalog passes Authorization header when configured.""" from unittest.mock import patch, MagicMock monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = PresetCatalog(project_dir) catalog_data = {"schema_version": "1.0", "presets": {}} @@ -1492,6 +1490,7 @@ def test_fetch_single_catalog_sends_auth_header(self, project_dir, monkeypatch): mock_response.read.return_value = json.dumps(catalog_data).encode() mock_response.__enter__ = lambda s: s mock_response.__exit__ = MagicMock(return_value=False) + mock_response.geturl.return_value = "https://raw.githubusercontent.com/org/repo/main/presets/catalog.json" captured = {} mock_opener = MagicMock() @@ -1509,16 +1508,17 @@ def fake_open(req, timeout=None): install_allowed=True, ) - with patch("urllib.request.build_opener", return_value=mock_opener): + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): catalog._fetch_single_catalog(entry, force_refresh=True) assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken" def test_download_pack_sends_auth_header(self, project_dir, monkeypatch): - """download_pack passes Authorization header via opener for GitHub URLs.""" + """download_pack passes Authorization header when configured.""" from unittest.mock import patch, MagicMock monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken") + self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") catalog = PresetCatalog(project_dir) import io @@ -1550,7 +1550,7 @@ def fake_open(req, timeout=None): } with patch.object(catalog, "get_pack_info", return_value=pack_info), \ - patch("urllib.request.build_opener", return_value=mock_opener): + patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): catalog.download_pack("test-pack", target_dir=project_dir) assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken" diff --git a/tests/test_upgrade.py b/tests/test_upgrade.py index 28a0ce6414..1e85805db8 100644 --- a/tests/test_upgrade.py +++ b/tests/test_upgrade.py @@ -69,7 +69,7 @@ def test_stub_makes_no_network_call(self): # If the stub ever starts calling urllib, this patch's side_effect # would fire and the assertion below would fail. with patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=AssertionError("stub must not hit the network"), ): result = runner.invoke(app, ["self", "upgrade"]) @@ -138,7 +138,7 @@ def test_empty_string_passthrough(self): class TestUserStory1: def test_newer_available_prints_update_and_install_command(self): with patch("specify_cli._get_installed_version", return_value="0.7.4"), patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", return_value=_mock_urlopen_response({"tag_name": "v0.9.0"}), ): result = runner.invoke(app, ["self", "check"]) @@ -151,7 +151,7 @@ def test_newer_available_prints_update_and_install_command(self): def test_up_to_date_prints_current_only(self): with patch("specify_cli._get_installed_version", return_value="0.9.0"), patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", return_value=_mock_urlopen_response({"tag_name": "v0.9.0"}), ): result = runner.invoke(app, ["self", "check"]) @@ -163,7 +163,7 @@ def test_up_to_date_prints_current_only(self): def test_dev_build_ahead_of_release_is_up_to_date(self): with patch("specify_cli._get_installed_version", return_value="0.7.5.dev0"), patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", return_value=_mock_urlopen_response({"tag_name": "v0.7.4"}), ): result = runner.invoke(app, ["self", "check"]) @@ -174,7 +174,7 @@ def test_dev_build_ahead_of_release_is_up_to_date(self): def test_unknown_installed_still_prints_latest_and_reinstall(self): with patch("specify_cli._get_installed_version", return_value="unknown"), patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", return_value=_mock_urlopen_response({"tag_name": "v0.7.4"}), ): result = runner.invoke(app, ["self", "check"]) @@ -186,7 +186,7 @@ def test_unknown_installed_still_prints_latest_and_reinstall(self): def test_unparseable_tag_routes_to_indeterminate(self): with patch("specify_cli._get_installed_version", return_value="0.7.4"), patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", return_value=_mock_urlopen_response({"tag_name": "not-a-version"}), ): result = runner.invoke(app, ["self", "check"]) @@ -200,7 +200,7 @@ def test_unparseable_tag_routes_to_indeterminate(self): class TestFailureCategorization: def test_urlerror_maps_to_offline(self): with patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=urllib.error.URLError("no route to host"), ): tag, reason = _fetch_latest_release_tag() @@ -209,7 +209,7 @@ def test_urlerror_maps_to_offline(self): def test_timeout_maps_to_offline(self): with patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=TimeoutError(), ): tag, reason = _fetch_latest_release_tag() @@ -218,17 +218,17 @@ def test_timeout_maps_to_offline(self): def test_403_maps_to_rate_limited(self): with patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=_http_error(403, "rate limited"), ): tag, reason = _fetch_latest_release_tag() assert tag is None - assert reason == "rate limited (try setting GH_TOKEN or GITHUB_TOKEN)" + assert reason == "rate limited (try setting GH_TOKEN and configuring ~/.specify/auth.json)" @pytest.mark.parametrize("code", [404, 500, 502]) def test_other_http_uses_code_string(self, code): with patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=_http_error(code, "oops"), ): tag, reason = _fetch_latest_release_tag() @@ -238,7 +238,7 @@ def test_other_http_uses_code_string(self, code): def test_generic_exception_propagates(self): # Per research D-006, no catch-all exists; RuntimeError MUST bubble. with patch( - "specify_cli.urllib.request.urlopen", + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=RuntimeError("boom"), ): with pytest.raises(RuntimeError): @@ -247,7 +247,7 @@ def test_generic_exception_propagates(self): _FAILURE_CASES = [ ("offline or timeout", urllib.error.URLError("down")), - ("rate limited (try setting GH_TOKEN or GITHUB_TOKEN)", _http_error(403)), + ("rate limited (try setting GH_TOKEN and configuring ~/.specify/auth.json)", _http_error(403)), ("HTTP 500", _http_error(500)), ] @@ -258,22 +258,22 @@ def test_failure_prints_installed_plus_one_line_reason( self, expected_reason, side_effect ): with patch("specify_cli._get_installed_version", return_value="0.7.4"), patch( - "specify_cli.urllib.request.urlopen", side_effect=side_effect + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect ): result = runner.invoke(app, ["self", "check"]) output = strip_ansi(result.output) assert "Installed: 0.7.4" in output - if expected_reason == "rate limited (try setting GH_TOKEN or GITHUB_TOKEN)": + if expected_reason == "rate limited (try setting GH_TOKEN and configuring ~/.specify/auth.json)": assert "Could not check latest release: rate limited" in output assert "GH_TOKEN" in output - assert "GITHUB_TOKEN" in output + assert "auth.json" in output else: assert f"Could not check latest release: {expected_reason}" in output @pytest.mark.parametrize("_expected_reason, side_effect", _FAILURE_CASES) def test_failure_exits_zero(self, _expected_reason, side_effect): with patch("specify_cli._get_installed_version", return_value="0.7.4"), patch( - "specify_cli.urllib.request.urlopen", side_effect=side_effect + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect ): result = runner.invoke(app, ["self", "check"]) assert result.exit_code == 0 @@ -283,7 +283,7 @@ def test_failure_output_contains_no_traceback_no_url( self, _expected_reason, side_effect ): with patch("specify_cli._get_installed_version", return_value="0.7.4"), patch( - "specify_cli.urllib.request.urlopen", side_effect=side_effect + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect ): result = runner.invoke(app, ["self", "check"]) combined = (result.output or "") + (result.stderr or "") @@ -302,12 +302,20 @@ def _side_effect(req, timeout=None): return captured, _side_effect +def _inject_github_config(monkeypatch, token_env="GH_TOKEN"): + from tests.auth_helpers import inject_github_config + inject_github_config(monkeypatch, token_env) + + class TestUserStory3: def test_gh_token_attached_as_bearer_header(self, monkeypatch): monkeypatch.setenv("GH_TOKEN", SENTINEL_GH_TOKEN) monkeypatch.delenv("GITHUB_TOKEN", raising=False) + _inject_github_config(monkeypatch, token_env="GH_TOKEN") captured, side_effect = _capture_request_via_urlopen() - with patch("specify_cli.urllib.request.urlopen", side_effect=side_effect): + mock_opener = MagicMock() + mock_opener.open.side_effect = side_effect + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): _fetch_latest_release_tag() req = captured["request"] assert req.get_header("Authorization") == f"Bearer {SENTINEL_GH_TOKEN}" @@ -315,8 +323,11 @@ def test_gh_token_attached_as_bearer_header(self, monkeypatch): def test_github_token_used_when_gh_token_unset(self, monkeypatch): monkeypatch.delenv("GH_TOKEN", raising=False) monkeypatch.setenv("GITHUB_TOKEN", SENTINEL_GITHUB_TOKEN) + _inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") captured, side_effect = _capture_request_via_urlopen() - with patch("specify_cli.urllib.request.urlopen", side_effect=side_effect): + mock_opener = MagicMock() + mock_opener.open.side_effect = side_effect + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): _fetch_latest_release_tag() req = captured["request"] assert req.get_header("Authorization") == f"Bearer {SENTINEL_GITHUB_TOKEN}" @@ -325,7 +336,7 @@ def test_no_authorization_header_when_both_unset(self, monkeypatch): monkeypatch.delenv("GH_TOKEN", raising=False) monkeypatch.delenv("GITHUB_TOKEN", raising=False) captured, side_effect = _capture_request_via_urlopen() - with patch("specify_cli.urllib.request.urlopen", side_effect=side_effect): + with patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect): _fetch_latest_release_tag() req = captured["request"] assert req.get_header("Authorization") is None @@ -333,8 +344,9 @@ def test_no_authorization_header_when_both_unset(self, monkeypatch): def test_empty_string_gh_token_treated_as_unset(self, monkeypatch): monkeypatch.setenv("GH_TOKEN", "") monkeypatch.delenv("GITHUB_TOKEN", raising=False) + _inject_github_config(monkeypatch, token_env="GH_TOKEN") captured, side_effect = _capture_request_via_urlopen() - with patch("specify_cli.urllib.request.urlopen", side_effect=side_effect): + with patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect): _fetch_latest_release_tag() req = captured["request"] assert req.get_header("Authorization") is None @@ -342,8 +354,9 @@ def test_empty_string_gh_token_treated_as_unset(self, monkeypatch): def test_whitespace_only_gh_token_treated_as_unset(self, monkeypatch): monkeypatch.setenv("GH_TOKEN", " ") monkeypatch.delenv("GITHUB_TOKEN", raising=False) + _inject_github_config(monkeypatch, token_env="GH_TOKEN") captured, side_effect = _capture_request_via_urlopen() - with patch("specify_cli.urllib.request.urlopen", side_effect=side_effect): + with patch("specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect): _fetch_latest_release_tag() req = captured["request"] assert req.get_header("Authorization") is None @@ -351,8 +364,11 @@ def test_whitespace_only_gh_token_treated_as_unset(self, monkeypatch): def test_whitespace_only_gh_token_falls_back_to_github_token(self, monkeypatch): monkeypatch.setenv("GH_TOKEN", " ") monkeypatch.setenv("GITHUB_TOKEN", SENTINEL_GITHUB_TOKEN) + _inject_github_config(monkeypatch, token_env="GITHUB_TOKEN") captured, side_effect = _capture_request_via_urlopen() - with patch("specify_cli.urllib.request.urlopen", side_effect=side_effect): + mock_opener = MagicMock() + mock_opener.open.side_effect = side_effect + with patch("specify_cli.authentication.http.urllib.request.build_opener", return_value=mock_opener): _fetch_latest_release_tag() req = captured["request"] assert req.get_header("Authorization") == f"Bearer {SENTINEL_GITHUB_TOKEN}" @@ -364,7 +380,7 @@ def test_gh_token_never_appears_in_failure_output( monkeypatch.setenv("GH_TOKEN", SENTINEL_GH_TOKEN) monkeypatch.delenv("GITHUB_TOKEN", raising=False) with patch("specify_cli._get_installed_version", return_value="0.7.4"), patch( - "specify_cli.urllib.request.urlopen", side_effect=side_effect + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect ): result = runner.invoke(app, ["self", "check"]) combined = strip_ansi((result.output or "") + (result.stderr or "")) @@ -377,7 +393,7 @@ def test_github_token_never_appears_in_failure_output( monkeypatch.delenv("GH_TOKEN", raising=False) monkeypatch.setenv("GITHUB_TOKEN", SENTINEL_GITHUB_TOKEN) with patch("specify_cli._get_installed_version", return_value="0.7.4"), patch( - "specify_cli.urllib.request.urlopen", side_effect=side_effect + "specify_cli.authentication.http.urllib.request.urlopen", side_effect=side_effect ): result = runner.invoke(app, ["self", "check"]) combined = strip_ansi((result.output or "") + (result.stderr or ""))