Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/python/ess-auth/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[project]
name = "ess-auth"
version = "0.1.0"
description = "OAuth2 client-credentials helper and JWT verification utilities"
requires-python = ">=3.12,<3.13"
dependencies = ["httpx>=0.27", "PyJWT>=2.0.0"]

[project.optional-dependencies]
jwks = ["PyJWT[crypto]>=2.0.0"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/ess_auth"]
12 changes: 12 additions & 0 deletions packages/python/ess-auth/src/ess_auth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""OAuth2 client-credentials authentication and JWT verification utilities."""

from .jwt import JWKSVerifier, decode_jwt, generate_jwt
from .token_helper import AuthenticationError, TokenHelper

__all__ = [
"AuthenticationError",
"JWKSVerifier",
"TokenHelper",
"decode_jwt",
"generate_jwt",
]
3 changes: 3 additions & 0 deletions packages/python/ess-auth/src/ess_auth/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Default configuration constants."""

EXPIRATION_WINDOW = 120 # seconds before expiry to trigger refresh
187 changes: 187 additions & 0 deletions packages/python/ess-auth/src/ess_auth/jwt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""JWT token generation, decoding, and JWKS verification."""

from __future__ import annotations

import logging
import time

import jwt as pyjwt

try:
from jwt import PyJWKClient as _PyJWKClient
except ImportError:
_PyJWKClient = None # type: ignore[assignment,misc]

from .token_helper import AuthenticationError

logger = logging.getLogger(__name__)

_DEFAULT_ALGORITHM = "HS256"
_SECONDS_PER_DAY = 86400


def generate_jwt(
subject: str,
secret: str,
*,
expires_days: int = 30,
algorithm: str = _DEFAULT_ALGORITHM,
**extra_claims: object,
) -> str:
"""Generate an HS256 JWT.

Parameters
----------
subject:
The ``sub`` claim — identifies the token holder (e.g., a user ID).
secret:
The signing secret (must match ``AUTH_SECRET`` on the server).
expires_days:
Token lifetime in days (default: 30).
algorithm:
JWT algorithm (default: HS256).
**extra_claims:
Additional claims merged into the payload.

Returns
-------
str
The encoded JWT string.
"""
now = int(time.time())
payload: dict[str, object] = {
"sub": subject,
"iat": now,
"exp": now + expires_days * _SECONDS_PER_DAY,
**extra_claims,
}
return pyjwt.encode(payload, secret, algorithm=algorithm)


def decode_jwt(
token: str,
secret: str,
*,
algorithm: str = _DEFAULT_ALGORITHM,
) -> dict:
"""Decode and verify an HS256 JWT.

Parameters
----------
token:
The encoded JWT string.
secret:
The signing secret used to verify the signature.
algorithm:
Expected JWT algorithm (default: HS256).

Returns
-------
dict
The decoded payload.

Raises
------
jwt.InvalidTokenError
If the token is invalid, expired, or the signature doesn't match.
"""
return pyjwt.decode(token, secret, algorithms=[algorithm])


class JWKSVerifier:
"""Verify JWTs against a JWKS endpoint (RS256) with optional HS256 fallback.

Parameters
----------
jwks_uri:
URL of the JWKS endpoint for RS256 public key discovery.
issuer:
Expected ``iss`` claim value.
audience:
Expected ``aud`` claim value.
hs256_secret:
Optional shared secret for HS256 fallback. When *None*, only RS256
is attempted.
"""

def __init__(
self,
jwks_uri: str,
issuer: str,
audience: str,
*,
hs256_secret: str | None = None,
) -> None:
if _PyJWKClient is None:
msg = (
"JWKS verification requires the 'cryptography' package. "
"Install it with: pip install 'ess-auth[jwks]'"
)
raise ImportError(msg)
self._jwks_client = _PyJWKClient(jwks_uri, cache_keys=True)
self._issuer = issuer
self._audience = audience
self._hs256_secret = hs256_secret or None

def verify(self, token: str) -> dict:
"""Decode and verify a JWT token.

Tries RS256 (JWKS) first. If that fails and an HS256 secret is
configured, falls back to HS256 verification.

Returns
-------
dict
The decoded JWT payload.

Raises
------
AuthenticationError
If verification fails for all configured methods.
"""
try:
signing_key = self._jwks_client.get_signing_key_from_jwt(token)
return pyjwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
audience=self._audience,
issuer=self._issuer,
)
except (pyjwt.InvalidTokenError, pyjwt.PyJWKClientError):
logger.debug("RS256 verification failed, trying fallback", exc_info=True)

if self._hs256_secret:
try:
return pyjwt.decode(token, self._hs256_secret, algorithms=["HS256"])
except pyjwt.InvalidTokenError:
pass

raise AuthenticationError("Invalid token")

@staticmethod
def extract_token(
headers: dict[bytes, bytes],
authorization: str | None = None,
) -> str:
"""Extract raw JWT from HTTP headers.

Checks ``authorization`` first (parsed ``Authorization: Bearer`` value),
then falls back to the ``x-api-key`` header.

Raises
------
AuthenticationError
If no token is found.
"""
token_value = authorization
if not token_value:
api_key = headers.get(b"x-api-key", b"").decode()
if api_key:
token_value = api_key
if not token_value:
raise AuthenticationError("Missing authorization")
scheme, _, value = token_value.strip().partition(" ")
if scheme.lower() == "bearer" and value:
return value.strip()
return token_value.strip()
112 changes: 112 additions & 0 deletions packages/python/ess-auth/src/ess_auth/token_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""OAuth2 Client Credentials token helper with automatic caching and refresh."""

from __future__ import annotations

import logging
import time

import httpx

from .config import EXPIRATION_WINDOW

logger = logging.getLogger(__name__)


class AuthenticationError(Exception):
"""Raised when a token request fails."""


class TokenHelper:
"""Manages OAuth2 client-credentials tokens with caching and auto-refresh.

Tokens are cached and transparently refreshed when they are within
``refresh_margin`` seconds of expiry.

Parameters
----------
client_id:
OAuth2 client ID.
secret:
OAuth2 client secret.
token_url:
Token endpoint URL.
refresh_margin:
Seconds before expiry to trigger a proactive refresh.
"""

def __init__(
self,
client_id: str,
secret: str,
*,
token_url: str,
refresh_margin: int = EXPIRATION_WINDOW,
) -> None:
self._client_id = client_id
self._secret = secret
self._token_url = token_url
self._refresh_margin = refresh_margin

self._access_token: str | None = None
self._expires_at: float = 0.0

@property
def token(self) -> str | None:
"""The current cached access token, or ``None`` if not yet fetched."""
return self._access_token

@property
def is_expired(self) -> bool:
"""Whether the cached token is missing or within the refresh window."""
return self._access_token is None or time.monotonic() >= (
self._expires_at - self._refresh_margin
)

def fetch_token(self, http: httpx.Client | None = None) -> str:
"""Return a valid access token, refreshing if necessary.

Parameters
----------
http:
Optional ``httpx.Client`` to reuse. A short-lived client is
created automatically when *None*.
"""
if not self.is_expired:
assert self._access_token is not None # noqa: S101 # nosec B101 # narrowing assertion after None check
return self._access_token

if http is not None:
self._refresh(http)
else:
with httpx.Client(timeout=30.0) as client:
self._refresh(client)

assert self._access_token is not None # noqa: S101 # nosec B101 # narrowing assertion after None check
return self._access_token

def fetch_token_info(self, http: httpx.Client | None = None) -> tuple[str, float]:
"""Return ``(token, expires_at_monotonic)``."""
token = self.fetch_token(http)
return token, self._expires_at

def _refresh(self, http: httpx.Client) -> None:
logger.info("Refreshing OAuth2 access token from %s", self._token_url)
try:
response = http.post(
self._token_url,
data={"grant_type": "client_credentials"},
auth=(self._client_id, self._secret),
)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
raise AuthenticationError(
f"Token request failed: HTTP {exc.response.status_code}"
) from exc
except httpx.HTTPError as exc:
raise AuthenticationError(f"Token request failed: {exc}") from exc

payload = response.json()
self._access_token = payload["access_token"]
expires_in = int(payload.get("expires_in", 3600))
self._expires_at = time.monotonic() + expires_in
logger.info("Token acquired, expires in %ds", expires_in)
64 changes: 64 additions & 0 deletions packages/python/ess-browser/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# ess-browser

Shared Playwright browser session with SSO handling.

```python
from ess_browser import BrowserSession

with BrowserSession(headed=True) as session:
page = session.new_page()
page.goto("https://example.com/")
session.wait_for_login(page, "https://example.com/")
```

## Installation

From the workspace root:

```bash
uv sync --all-packages
```

## API

### `BrowserSession`

Context manager that launches a persistent Chrome browser session.

| Parameter | Default | Description |
|---|---|---|
| `headed` | `False` | Show the browser window |
| `profile_dir` | `~/.ess-browser/profile` | Browser profile for session persistence |
| `viewport` | `1280x900` | Browser viewport size |
| `enable_extensions` | `False` | Load Chrome extensions from the profile (headed mode only) |
| `init_scripts` | `[]` | Additional JS snippets injected into every page |
| `skip_duo_update_prompt` | `True` | Auto-dismiss the Duo "update Chrome" nag |

Methods:

- `new_page() -> Page` -- open a new browser tab
- `wait_for_login(page, target_url, timeout_ms=300_000)` -- block until SSO completes
- `is_auth_redirect(current_url, target_url) -> bool` -- check if on an SSO page

#### Duo prompt auto-dismiss

By default, an init script auto-clicks the "Skip for now" button on
`duosecurity.com` pages so the Chrome-update nag doesn't block automated
flows. Pass `skip_duo_update_prompt=False` to disable this behavior for
manual or debugging sessions.

#### Chrome extensions

Set `enable_extensions=True` to load extensions installed in the profile
directory. This only has an effect in headed mode; Playwright ignores
extensions when running headless.

### `ess_browser.auth`

Lower-level SSO detection functions:

- `AUTH_DOMAINS` -- tuple of known SSO/IdP domain patterns
- `DUO_SKIP_UPDATE_SCRIPT` -- JS snippet that dismisses the Duo update prompt
- `LOGIN_TIMEOUT_MS` -- default timeout (5 minutes)
- `is_auth_redirect(current_url, target_url) -> bool`
- `wait_for_login(page, target_url, timeout_ms=LOGIN_TIMEOUT_MS)`
Loading
Loading