From 080e31a06762389b476e543f882ddc8e3c7a8e98 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Thu, 9 Apr 2026 20:47:07 +0530 Subject: [PATCH] chore: add ruff linting, replace RL scanner, and bump dependencies --- .github/actions/rl-scanner/action.yml | 71 ---------------- .github/workflows/publish.yml | 54 +++++++++--- .github/workflows/rl-scanner.yml | 83 ------------------ .github/workflows/test.yml | 15 +--- .gitignore | 2 + .ruff.toml | 18 ++++ fastapi_plugin/__init__.py | 3 +- fastapi_plugin/fast_api_client.py | 17 ++-- fastapi_plugin/utils.py | 47 +++++----- poetry.lock | 30 ++++++- pyproject.toml | 1 + tests/conftest.py | 7 +- tests/test_bearer_token_validation.py | 26 +++--- tests/test_client_initialization.py | 14 +-- tests/test_configuration_modes.py | 60 ++++++------- tests/test_dpop_authentication.py | 92 ++++++++++---------- tests/test_reverse_proxy.py | 118 +++++++++++++------------- tests/test_utils.py | 50 +++++------ 18 files changed, 312 insertions(+), 396 deletions(-) delete mode 100644 .github/actions/rl-scanner/action.yml delete mode 100644 .github/workflows/rl-scanner.yml create mode 100644 .ruff.toml diff --git a/.github/actions/rl-scanner/action.yml b/.github/actions/rl-scanner/action.yml deleted file mode 100644 index 7a2b774..0000000 --- a/.github/actions/rl-scanner/action.yml +++ /dev/null @@ -1,71 +0,0 @@ -name: "Reversing Labs Scanner" -description: "Runs the Reversing Labs scanner on a specified artifact." -inputs: - artifact-path: - description: "Path to the artifact to be scanned." - required: true - version: - description: "Version of the artifact." - required: true - -runs: - using: "composite" - steps: - - name: Set up Python - uses: actions/setup-python@v4 - with: - python-version: "3.10" - - - name: Install Python dependencies - shell: bash - run: | - pip install boto3 requests - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - role-to-assume: ${{ env.PRODSEC_TOOLS_ARN }} - aws-region: us-east-1 - mask-aws-account-id: true - - - name: Install RL Wrapper - shell: bash - run: | - pip install rl-wrapper>=1.0.6 --index-url "https://${{ env.PRODSEC_TOOLS_USER }}:${{ env.PRODSEC_TOOLS_TOKEN }}@a0us.jfrog.io/artifactory/api/pypi/python-local/simple" - - - name: Run RL Scanner - shell: bash - env: - RLSECURE_LICENSE: ${{ env.RLSECURE_LICENSE }} - RLSECURE_SITE_KEY: ${{ env.RLSECURE_SITE_KEY }} - SIGNAL_HANDLER_TOKEN: ${{ env.SIGNAL_HANDLER_TOKEN }} - PYTHONUNBUFFERED: 1 - run: | - if [ ! -f "${{ inputs.artifact-path }}" ]; then - echo "Artifact not found: ${{ inputs.artifact-path }}" - exit 1 - fi - - rl-wrapper \ - --artifact "${{ inputs.artifact-path }}" \ - --name "${{ github.event.repository.name }}" \ - --version "${{ inputs.version }}" \ - --repository "${{ github.repository }}" \ - --commit "${{ github.sha }}" \ - --build-env "github_actions" \ - --suppress_output - - # Check the outcome of the scanner - if [ $? -ne 0 ]; then - echo "RL Scanner failed." - echo "scan-status=failed" >> $GITHUB_ENV - exit 1 - else - echo "RL Scanner passed." - echo "scan-status=success" >> $GITHUB_ENV - fi - -outputs: - scan-status: - description: "The outcome of the scan process." - value: ${{ env.scan-status }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 73e2926..5b0fbe2 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -12,17 +12,49 @@ permissions: jobs: rl-scanner: - uses: ./.github/workflows/rl-scanner.yml - with: - python-version: "3.10" - artifact-name: "auth0-fastapi-api.tgz" - secrets: - RLSECURE_LICENSE: ${{ secrets.RLSECURE_LICENSE }} - RLSECURE_SITE_KEY: ${{ secrets.RLSECURE_SITE_KEY }} - SIGNAL_HANDLER_TOKEN: ${{ secrets.SIGNAL_HANDLER_TOKEN }} - PRODSEC_TOOLS_USER: ${{ secrets.PRODSEC_TOOLS_USER }} - PRODSEC_TOOLS_TOKEN: ${{ secrets.PRODSEC_TOOLS_TOKEN }} - PRODSEC_TOOLS_ARN: ${{ secrets.PRODSEC_TOOLS_ARN }} + if: github.event_name == 'workflow_dispatch' || (github.event_name == 'pull_request' && github.event.pull_request.merged && startsWith(github.event.pull_request.head.ref, 'release/')) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + fetch-tags: true + + - name: Configure Python + uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Build artifact + run: | + pip install --user --upgrade pip + pip install --user pipx + pipx ensurepath + pipx install poetry + poetry config virtualenvs.in-project true + poetry install --with dev + poetry build + tar -czvf auth0-fastapi-api.tgz * + + - name: Get version + id: get_version + uses: ./.github/actions/get-version + + - name: Run RL Scanner + uses: auth0/devsecops-tooling/.github/actions/rl-scan@main + with: + artifact-name: "auth0-fastapi-api" + artifact-path: "auth0-fastapi-api.tgz" + version: ${{ steps.get_version.outputs.version }} + RLSECURE_LICENSE: ${{ secrets.RLSECURE_LICENSE }} + RLSECURE_SITE_KEY: ${{ secrets.RLSECURE_SITE_KEY }} + SIGNAL_HANDLER_TOKEN: ${{ secrets.SIGNAL_HANDLER_TOKEN }} + SIGNAL_HANDLER_DOMAIN: ${{ secrets.SIGNAL_HANDLER_DOMAIN }} + PRODSEC_TOOLS_ARN: ${{ secrets.PRODSEC_TOOLS_ARN }} + PRODSEC_TOOLS_USER: ${{ secrets.PRODSEC_TOOLS_USER }} + PRODSEC_TOOLS_TOKEN: ${{ secrets.PRODSEC_TOOLS_TOKEN }} + PRODSEC_PYTHON_TOOLS_REPO: ${{ secrets.PRODSEC_PYTHON_TOOLS_REPO }} + publish-pypi: if: github.event_name == 'workflow_dispatch' || (github.event_name == 'pull_request' && github.event.pull_request.merged && startsWith(github.event.pull_request.head.ref, 'release/')) name: "PyPI" diff --git a/.github/workflows/rl-scanner.yml b/.github/workflows/rl-scanner.yml deleted file mode 100644 index 482ce02..0000000 --- a/.github/workflows/rl-scanner.yml +++ /dev/null @@ -1,83 +0,0 @@ -name: RL-Secure Workflow - -on: - workflow_call: - inputs: - python-version: - required: true - type: string - artifact-name: - required: true - type: string - secrets: - RLSECURE_LICENSE: - required: true - RLSECURE_SITE_KEY: - required: true - SIGNAL_HANDLER_TOKEN: - required: true - PRODSEC_TOOLS_USER: - required: true - PRODSEC_TOOLS_TOKEN: - required: true - PRODSEC_TOOLS_ARN: - required: true - -jobs: - rl-scanner: - if: github.event_name == 'workflow_dispatch' || (github.event_name == 'pull_request' && github.event.pull_request.merged && startsWith(github.event.pull_request.head.ref, 'release/')) - runs-on: ubuntu-latest - outputs: - scan-status: ${{ steps.rl-scan-conclusion.outcome }} - - steps: - - uses: actions/checkout@v6 - with: - fetch-depth: 0 - fetch-tags: true - - - name: Configure Python - uses: actions/setup-python@v6 - with: - python-version: ${{ inputs.python-version }} - - - name: Configure dependencies - run: | - pip install --user --upgrade pip - pip install --user pipx - pipx ensurepath - pipx install poetry==1.4.2 - pip install --upgrade pip - pip install boto3 requests - poetry config virtualenvs.in-project true - poetry install --with dev - poetry self add "poetry-dynamic-versioning[plugin]==1.1.1" - - - name: Build release - run: | - poetry build - - - name: Create tgz build artifact - run: | - tar -czvf ${{ inputs.artifact-name }} * - - - name: Get Artifact Version - id: get_version - uses: ./.github/actions/get-version - - - name: Run RL Scanner - id: rl-scan-conclusion - uses: ./.github/actions/rl-scanner - with: - artifact-path: "$(pwd)/${{ inputs.artifact-name }}" - version: "${{ steps.get_version.outputs.version }}" - env: - RLSECURE_LICENSE: ${{ secrets.RLSECURE_LICENSE }} - RLSECURE_SITE_KEY: ${{ secrets.RLSECURE_SITE_KEY }} - SIGNAL_HANDLER_TOKEN: ${{ secrets.SIGNAL_HANDLER_TOKEN }} - PRODSEC_TOOLS_USER: ${{ secrets.PRODSEC_TOOLS_USER }} - PRODSEC_TOOLS_TOKEN: ${{ secrets.PRODSEC_TOOLS_TOKEN }} - PRODSEC_TOOLS_ARN: ${{ secrets.PRODSEC_TOOLS_ARN }} - - - name: Output scan result - run: echo "scan-status=${{ steps.rl-scan-conclusion.outcome }}" >> $GITHUB_ENV diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0ec6298..10530fd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -67,19 +67,12 @@ jobs: run: | poetry run pytest - # - name: Run lint - # run: | - # pipx install black==23.3.0 - # pipx install flake8==5.0.4 - # pipx install isort==5.11.5 - # pipx install pyupgrade==3.3.2 - # black . --check - # flake8 . --count --show-source --statistics - # isort . --diff --profile black - # pyupgrade . --py37-plus --keep-runtime-typing + - name: Run lint + run: | + poetry run ruff check . - if: ${{ matrix.python-version == '3.10' }} name: Upload coverage - uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # pin@5.5.2 + uses: codecov/codecov-action@57e3a136b779b570ffcdbf80b3bdc90e7fab3de2 # pin@6.0.0 with: token: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 4825a42..4ac8747 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,5 @@ integration_test*.py .coverage coverage.xml +# AI Tools +.claude/ diff --git a/.ruff.toml b/.ruff.toml new file mode 100644 index 0000000..7ad6af6 --- /dev/null +++ b/.ruff.toml @@ -0,0 +1,18 @@ +line-length = 100 +target-version = "py39" + +[lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "UP", # pyupgrade + "S", # bandit (security) +] +ignore = ["E501", "B904", "B008"] # Line too long (handled by formatter), Exception handling without from, Depends() in defaults (FastAPI convention) + +[lint.per-file-ignores] +"tests/*" = ["S101", "S105", "S106", "S107"] # Allow assert, ignore hardcoded password warnings in test files diff --git a/fastapi_plugin/__init__.py b/fastapi_plugin/__init__.py index f3d08b0..0a1b85c 100644 --- a/fastapi_plugin/__init__.py +++ b/fastapi_plugin/__init__.py @@ -1,4 +1,3 @@ -from .fast_api_client import Auth0FastAPI from auth0_api_python import ( CacheAdapter, ConfigurationError, @@ -8,6 +7,8 @@ InMemoryCache, ) +from .fast_api_client import Auth0FastAPI + __all__ = [ "Auth0FastAPI", "CacheAdapter", diff --git a/fastapi_plugin/fast_api_client.py b/fastapi_plugin/fast_api_client.py index 2d73d9f..cd45817 100644 --- a/fastapi_plugin/fast_api_client.py +++ b/fastapi_plugin/fast_api_client.py @@ -1,11 +1,10 @@ -from typing import Optional, List, Union, Dict, Callable -from fastapi import Request, HTTPException -from starlette.responses import Response - -from .utils import validate_scopes, http_exception, get_canonical_url +from typing import Callable, Optional, Union from auth0_api_python.api_client import ApiClient, ApiClientOptions, BaseAuthError from auth0_api_python.cache import CacheAdapter +from fastapi import Request + +from .utils import get_canonical_url, http_exception, validate_scopes class Auth0FastAPI: @@ -18,7 +17,7 @@ def __init__( self, domain: Optional[str] = None, audience: str = "", - domains: Optional[Union[List[str], Callable]] = None, + domains: Optional[Union[list[str], Callable]] = None, client_id=None, client_secret=None, custom_fetch=None, @@ -73,7 +72,7 @@ def __init__( def require_auth( self, - scopes: Optional[Union[str, List[str]]] = None + scopes: Optional[Union[str, list[str]]] = None ): """ Returns an async FastAPI dependency that: @@ -83,7 +82,7 @@ def require_auth( 4) Raises HTTPException on error 5) On success, returns the decoded claims """ - async def _dependency(request: Request) -> Dict: + async def _dependency(request: Request) -> dict: try: claims = await self.api_client.verify_request( headers=dict(request.headers), @@ -97,7 +96,7 @@ async def _dependency(request: Request) -> Dict: error_desc=e.get_error_description(), headers=e.get_headers() ) - except Exception as e: + except Exception: # Handle any unexpected errors raise http_exception( status_code=500, diff --git a/fastapi_plugin/utils.py b/fastapi_plugin/utils.py index dbec367..2f3ef35 100644 --- a/fastapi_plugin/utils.py +++ b/fastapi_plugin/utils.py @@ -1,23 +1,24 @@ -from typing import Optional, List, Union, Dict -from fastapi import Request, HTTPException -from starlette.responses import Response +from typing import Optional from urllib.parse import urlparse, urlunparse +from fastapi import HTTPException, Request + + def http_exception( status_code: int, error: str, error_desc: str, - headers: Optional[Dict[str, str]] = None + headers: Optional[dict[str, str]] = None ) -> HTTPException: """ Construct an HTTPException with appropriate headers. - + Args: status_code: HTTP status code error: OAuth2/DPoP error code - error_desc: Human-readable error description + error_desc: Human-readable error description headers: Optional headers dict (e.g., from BaseAuthError.get_headers()) - + Note: When used with BaseAuthError, pass the headers from get_headers() to ensure correct WWW-Authenticate challenges are included. """ @@ -33,7 +34,7 @@ def http_exception( def _should_trust_proxy(request: Request) -> bool: """ Determines if X-Forwarded-* headers should be trusted. - + Returns: bool: True if proxy headers should be trusted """ @@ -47,42 +48,42 @@ def _should_trust_proxy(request: Request) -> bool: def _parse_forwarded_host(forwarded_host: Optional[str]) -> Optional[str]: """ Parses X-Forwarded-Host header, handling multiple comma-separated values. - + Args: forwarded_host: Value of X-Forwarded-Host header - + Returns: The first host value, or None if empty """ if not forwarded_host: return None - + # Handle comma-separated values (multiple proxies) comma_index = forwarded_host.find(',') if comma_index != -1: forwarded_host = forwarded_host[:comma_index].rstrip() - + return forwarded_host.strip() or None def get_canonical_url(request: Request) -> str: """ Constructs the canonical URL for DPoP validation, securely handling reverse proxy headers. - + Args: request: FastAPI/Starlette Request object - + Returns: Canonical URL string matching what the client used - + """ # Start with the direct connection URL parsed = urlparse(str(request.url)) - + # Default to direct request values scheme = parsed.scheme netloc = parsed.netloc path = parsed.path - + # Only process X-Forwarded headers if proxy is trusted if _should_trust_proxy(request): # X-Forwarded-Proto: Override scheme if present @@ -91,13 +92,13 @@ def get_canonical_url(request: Request) -> str: proto = forwarded_proto.strip().lower() if proto in ("http", "https"): scheme = proto - + # X-Forwarded-Host: Override host, handling multiple proxies forwarded_host = request.headers.get("x-forwarded-host") parsed_host = _parse_forwarded_host(forwarded_host) if parsed_host: netloc = parsed_host - + # X-Forwarded-Prefix: Prepend path prefix forwarded_prefix = request.headers.get("x-forwarded-prefix", "").strip() if forwarded_prefix and not any([ @@ -108,7 +109,7 @@ def get_canonical_url(request: Request) -> str: if not forwarded_prefix.startswith("/"): forwarded_prefix = "/" + forwarded_prefix path = forwarded_prefix.rstrip("/") + path - + canonical_url = urlunparse(( scheme, netloc, @@ -117,10 +118,10 @@ def get_canonical_url(request: Request) -> str: parsed.query, "" # No fragment in DPoP htu claim )) - + return canonical_url -def validate_scopes(claims: Dict, required_scopes: List[str]) -> bool: +def validate_scopes(claims: dict, required_scopes: list[str]) -> bool: """ Verifies the 'scope' claim (a space-delimited string) includes all required_scopes. """ @@ -129,4 +130,4 @@ def validate_scopes(claims: Dict, required_scopes: List[str]) -> bool: return False token_scopes = scope_str.split() # space-delimited - return all(req in token_scopes for req in required_scopes) \ No newline at end of file + return all(req in token_scopes for req in required_scopes) diff --git a/poetry.lock b/poetry.lock index d15de1b..833b702 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1872,6 +1872,34 @@ pygments = ">=2.13.0,<3.0.0" [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "ruff" +version = "0.15.10" +description = "An extremely fast Python linter and code formatter, written in Rust." +optional = false +python-versions = ">=3.7" +groups = ["dev"] +files = [ + {file = "ruff-0.15.10-py3-none-linux_armv6l.whl", hash = "sha256:0744e31482f8f7d0d10a11fcbf897af272fefdfcb10f5af907b18c2813ff4d5f"}, + {file = "ruff-0.15.10-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b1e7c16ea0ff5a53b7c2df52d947e685973049be1cdfe2b59a9c43601897b22e"}, + {file = "ruff-0.15.10-py3-none-macosx_11_0_arm64.whl", hash = "sha256:93cc06a19e5155b4441dd72808fdf84290d84ad8a39ca3b0f994363ade4cebb1"}, + {file = "ruff-0.15.10-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:83e1dd04312997c99ea6965df66a14fb4f03ba978564574ffc68b0d61fd3989e"}, + {file = "ruff-0.15.10-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8154d43684e4333360fedd11aaa40b1b08a4e37d8ffa9d95fee6fa5b37b6fab1"}, + {file = "ruff-0.15.10-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8ab88715f3a6deb6bde6c227f3a123410bec7b855c3ae331b4c006189e895cef"}, + {file = "ruff-0.15.10-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a768ff5969b4f44c349d48edf4ab4f91eddb27fd9d77799598e130fb628aa158"}, + {file = "ruff-0.15.10-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0ee3ef42dab7078bda5ff6a1bcba8539e9857deb447132ad5566a038674540d0"}, + {file = "ruff-0.15.10-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51cb8cc943e891ba99989dd92d61e29b1d231e14811db9be6440ecf25d5c1609"}, + {file = "ruff-0.15.10-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:e59c9bdc056a320fb9ea1700a8d591718b8faf78af065484e801258d3a76bc3f"}, + {file = "ruff-0.15.10-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:136c00ca2f47b0018b073f28cb5c1506642a830ea941a60354b0e8bc8076b151"}, + {file = "ruff-0.15.10-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8b80a2f3c9c8a950d6237f2ca12b206bccff626139be9fa005f14feb881a1ae8"}, + {file = "ruff-0.15.10-py3-none-musllinux_1_2_i686.whl", hash = "sha256:e3e53c588164dc025b671c9df2462429d60357ea91af7e92e9d56c565a9f1b07"}, + {file = "ruff-0.15.10-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:b0c52744cf9f143a393e284125d2576140b68264a93c6716464e129a3e9adb48"}, + {file = "ruff-0.15.10-py3-none-win32.whl", hash = "sha256:d4272e87e801e9a27a2e8df7b21011c909d9ddd82f4f3281d269b6ba19789ca5"}, + {file = "ruff-0.15.10-py3-none-win_amd64.whl", hash = "sha256:28cb32d53203242d403d819fd6983152489b12e4a3ae44993543d6fe62ab42ed"}, + {file = "ruff-0.15.10-py3-none-win_arm64.whl", hash = "sha256:601d1610a9e1f1c2165a4f561eeaa2e2ea1e97f3287c5aa258d3dab8b57c6188"}, + {file = "ruff-0.15.10.tar.gz", hash = "sha256:d1f86e67ebfdef88e00faefa1552b5e510e1d35f3be7d423dc7e84e63788c94e"}, +] + [[package]] name = "secretstorage" version = "3.3.3" @@ -2143,4 +2171,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "47d97385e53eb22d8da50dbfeef1f5a76b1b0196201ce53d09683f959d1625c4" +content-hash = "af0c687df05f06a0914d7ee57d23563ba4712dce25490e7bb8ebd2c710b00ecd" diff --git a/pyproject.toml b/pyproject.toml index f165f7a..35279ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ pytest-cov = "^4.0" pytest-asyncio = ">=0.20.3,<0.27.0" pytest-mock = "^3.15.1" pytest-httpx = "^0.35.0" +ruff = ">=0.1" twine = "^6.2.0" uvicorn = ">=0.34.0" diff --git a/tests/conftest.py b/tests/conftest.py index 7a589df..b80a59d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,7 @@ -from typing import Dict, List from pytest_httpx import HTTPXMock -from .test_utils import PUBLIC_DPOP_JWK, PRIVATE_JWK +from .test_utils import PRIVATE_JWK, PUBLIC_DPOP_JWK # RSA public key used across all test domains (shared key for simplicity) RSA_PUBLIC_KEY = { @@ -37,7 +36,7 @@ def setup_mocks(httpx_mock: HTTPXMock): ) -def setup_mcd_mocks(httpx_mock: HTTPXMock, domains: List[str]): +def setup_mcd_mocks(httpx_mock: HTTPXMock, domains: list[str]): """Setup OIDC and JWKS mocks for multiple domains in MCD tests. Each domain gets its own .well-known/openid-configuration and @@ -69,4 +68,4 @@ def setup_mcd_mocks(httpx_mock: HTTPXMock, domains: List[str]): PUBLIC_DPOP_JWK ] } - ) \ No newline at end of file + ) diff --git a/tests/test_bearer_token_validation.py b/tests/test_bearer_token_validation.py index 683136d..f52f086 100644 --- a/tests/test_bearer_token_validation.py +++ b/tests/test_bearer_token_validation.py @@ -3,14 +3,14 @@ Tests core JWT validation logic including issuer, expiration, and scope checks. """ import pytest -from fastapi import FastAPI, Depends -from pytest_httpx import HTTPXMock +from fastapi import Depends, FastAPI from fastapi.testclient import TestClient +from pytest_httpx import HTTPXMock from fastapi_plugin.fast_api_client import Auth0FastAPI -from .test_utils import generate_token -from .conftest import setup_mocks, setup_mcd_mocks +from .conftest import setup_mcd_mocks, setup_mocks +from .test_utils import generate_token # ============================================================================= # Missing Token Tests @@ -28,7 +28,7 @@ async def test_route(claims=Depends(auth0.require_auth())): client = TestClient(app) response = client.get("/test") - + assert response.status_code == 400 json_body = response.json() assert json_body["detail"]["error"] == "invalid_request" @@ -42,7 +42,7 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_valid_bearer_token_authentication(httpx_mock: HTTPXMock): """Test successful authentication with a valid Bearer token.""" setup_mocks(httpx_mock) - + access_token = await generate_token( domain="auth0.local", user_id="user_123", @@ -77,7 +77,7 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_missing_issuer_claim(httpx_mock: HTTPXMock): """Test that tokens without 'iss' claim are rejected with 401.""" setup_mocks(httpx_mock) - + access_token = await generate_token( domain="auth0.local", user_id="user_123", @@ -108,7 +108,7 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_invalid_issuer_claim(httpx_mock: HTTPXMock): """Test that tokens with mismatched issuer are rejected with 401.""" setup_mocks(httpx_mock) - + access_token = await generate_token( domain="auth0.local", user_id="user_123", @@ -130,7 +130,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "/test", headers={"Authorization": f"Bearer {access_token}"} ) - + assert response.status_code == 401 @@ -142,7 +142,7 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_missing_expiration_claim(httpx_mock: HTTPXMock): """Test that tokens without 'exp' claim are rejected with 401.""" setup_mocks(httpx_mock) - + access_token = await generate_token( domain="auth0.local", user_id="user_123", @@ -164,7 +164,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "/test", headers={"Authorization": f"Bearer {access_token}"} ) - + assert response.status_code == 401 @@ -176,7 +176,7 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_insufficient_scope(httpx_mock: HTTPXMock): """Test that tokens with insufficient scopes are rejected with 403.""" setup_mocks(httpx_mock) - + access_token = await generate_token( domain="auth0.local", user_id="user_123", @@ -199,7 +199,7 @@ async def test_route(claims=Depends(auth0.require_auth(scopes="valid"))): "/test", headers={"Authorization": f"Bearer {access_token}"} ) - + assert response.status_code == 403 json_body = response.json() assert json_body["detail"]["error"] == "insufficient_scope" diff --git a/tests/test_client_initialization.py b/tests/test_client_initialization.py index 2748dc1..13991d8 100644 --- a/tests/test_client_initialization.py +++ b/tests/test_client_initialization.py @@ -3,9 +3,9 @@ Tests constructor parameters, default values, and configuration options. """ import pytest -from fastapi_plugin.fast_api_client import Auth0FastAPI -from fastapi_plugin import ConfigurationError, InMemoryCache +from fastapi_plugin import ConfigurationError, InMemoryCache +from fastapi_plugin.fast_api_client import Auth0FastAPI # ============================================================================= # Client Credentials Configuration @@ -15,14 +15,14 @@ def test_initialization_with_client_credentials(): """Test that Auth0FastAPI accepts and stores client_id and client_secret.""" client_id = "test_client_id" client_secret = "test_client_secret" - + auth0 = Auth0FastAPI( domain="auth0.local", audience="test_audience", client_id=client_id, client_secret=client_secret ) - + options = auth0.api_client.options assert options.client_id == client_id assert options.client_secret == client_secret @@ -35,7 +35,7 @@ def test_initialization_with_client_credentials(): def test_dpop_default_configuration(): """Test that DPoP has correct default configuration values.""" auth0 = Auth0FastAPI(domain="auth0.local", audience="test") - + assert auth0.api_client.options.dpop_enabled is True assert auth0.api_client.options.dpop_required is False assert auth0.api_client.options.dpop_iat_leeway == 30 @@ -49,7 +49,7 @@ def test_dpop_disabled_configuration(): audience="test", dpop_enabled=False ) - + assert auth0.api_client.options.dpop_enabled is False @@ -61,7 +61,7 @@ def test_dpop_custom_timing_configuration(): dpop_iat_leeway=60, dpop_iat_offset=600 ) - + assert auth0.api_client.options.dpop_iat_leeway == 60 assert auth0.api_client.options.dpop_iat_offset == 600 diff --git a/tests/test_configuration_modes.py b/tests/test_configuration_modes.py index 2b0cef4..3f3aaa8 100644 --- a/tests/test_configuration_modes.py +++ b/tests/test_configuration_modes.py @@ -3,18 +3,18 @@ Tests the runtime behavior of different DPoP configuration modes. """ import pytest -from fastapi import FastAPI, Depends -from pytest_httpx import HTTPXMock +from fastapi import Depends, FastAPI from fastapi.testclient import TestClient +from pytest_httpx import HTTPXMock from fastapi_plugin.fast_api_client import Auth0FastAPI + +from .conftest import setup_mocks from .test_utils import ( - generate_token, - generate_dpop_proof, generate_dpop_bound_token, + generate_dpop_proof, + generate_token, ) -from .conftest import setup_mocks - # ============================================================================= # DPoP Required Mode Tests @@ -30,24 +30,24 @@ async def test_dpop_required_mode_rejects_bearer(): audience="", issuer="https://auth0.local/" ) - + app = FastAPI() auth0 = Auth0FastAPI( domain="auth0.local", audience="", dpop_required=True # DPoP required mode ) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) response = client.get( "/test", headers={"Authorization": f"Bearer {bearer_token}"} ) - + assert response.status_code == 400 json_body = response.json() assert json_body["detail"]["error"] == "invalid_request" @@ -57,20 +57,20 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_dpop_required_mode_accepts_dpop(httpx_mock: HTTPXMock): """Test that DPoP tokens are accepted when dpop_required=True.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + dpop_proof = await generate_dpop_proof( http_method="GET", http_url="http://testserver/test", access_token=access_token ) - + app = FastAPI() auth0 = Auth0FastAPI( domain="auth0.local", @@ -81,7 +81,7 @@ async def test_dpop_required_mode_accepts_dpop(httpx_mock: HTTPXMock): @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) response = client.get( "/test", @@ -90,7 +90,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "DPoP": dpop_proof } ) - + assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -108,24 +108,24 @@ async def test_dpop_disabled_mode_rejects_dpop(): audience="", issuer="https://auth0.local/" ) - + dpop_proof = await generate_dpop_proof( http_method="GET", http_url="http://testserver/test", access_token=access_token ) - + app = FastAPI() auth0 = Auth0FastAPI( domain="auth0.local", audience="", dpop_enabled=False # DPoP disabled ) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) response = client.get( "/test", @@ -134,7 +134,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "DPoP": dpop_proof } ) - + assert response.status_code == 400 json_body = response.json() assert json_body["detail"]["error"] == "invalid_request" @@ -148,31 +148,31 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_bearer_only_mode_accepts_bearer(httpx_mock: HTTPXMock): """Test that Bearer tokens work when DPoP is disabled.""" setup_mocks(httpx_mock) - + bearer_token = await generate_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + app = FastAPI() auth0 = Auth0FastAPI( domain="auth0.local", audience="", dpop_enabled=False # Bearer only mode ) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) response = client.get( "/test", headers={"Authorization": f"Bearer {bearer_token}"} ) - + assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -185,7 +185,7 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_dpop_bound_token_with_bearer_scheme_fails(httpx_mock: HTTPXMock): """Test that DPoP-bound tokens fail when using Bearer scheme.""" setup_mocks(httpx_mock) - + # Generate a DPoP-bound token (has cnf claim) dpop_token = await generate_dpop_bound_token( domain="auth0.local", @@ -193,25 +193,25 @@ async def test_dpop_bound_token_with_bearer_scheme_fails(httpx_mock: HTTPXMock): audience="", issuer="https://auth0.local/" ) - + app = FastAPI() auth0 = Auth0FastAPI( domain="auth0.local", audience="", dpop_enabled=True ) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) # Try to use DPoP-bound token with Bearer scheme (should fail) response = client.get( "/test", headers={"Authorization": f"Bearer {dpop_token}"} ) - + assert response.status_code == 401 json_body = response.json() assert json_body["detail"]["error"] == "invalid_token" diff --git a/tests/test_dpop_authentication.py b/tests/test_dpop_authentication.py index cc55e76..175cba8 100644 --- a/tests/test_dpop_authentication.py +++ b/tests/test_dpop_authentication.py @@ -3,43 +3,41 @@ Focuses on DPoP-specific authentication flows and error cases. """ import pytest -from fastapi import FastAPI, Depends -from pytest_httpx import HTTPXMock +from fastapi import Depends, FastAPI from fastapi.testclient import TestClient +from pytest_httpx import HTTPXMock from fastapi_plugin.fast_api_client import Auth0FastAPI -from .test_utils import ( - generate_dpop_proof, - generate_dpop_bound_token -) -from .conftest import setup_mocks, setup_mcd_mocks + +from .conftest import setup_mcd_mocks, setup_mocks +from .test_utils import generate_dpop_bound_token, generate_dpop_proof @pytest.mark.asyncio async def test_dpop_authentication_success(httpx_mock: HTTPXMock): """Test successful DPoP authentication with valid token and proof.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + dpop_proof = await generate_dpop_proof( http_method="GET", http_url="http://testserver/test", access_token=access_token ) - + app = FastAPI() auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) response = client.get( "/test", @@ -48,7 +46,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "DPoP": dpop_proof } ) - + assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -57,27 +55,27 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_dpop_authentication_missing_dpop_header(httpx_mock: HTTPXMock): """Test DPoP request fails early when DPoP header is missing (before token validation).""" # No setup_mocks needed - request fails before JWKS lookup - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + app = FastAPI() auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) response = client.get( "/test", headers={"Authorization": f"DPoP {access_token}"} # Missing DPoP header ) - + assert response.status_code == 400 json_body = response.json() assert "invalid_request" in json_body["detail"]["error"] @@ -87,21 +85,21 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_dpop_authentication_invalid_dpop_proof(httpx_mock: HTTPXMock): """Test DPoP request fails early with malformed DPoP proof (before token validation).""" # No setup_mocks needed - request fails before JWKS lookup - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + app = FastAPI() auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) response = client.get( "/test", @@ -110,7 +108,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "DPoP": "invalid.jwt.proof" } ) - + assert response.status_code == 400 json_body = response.json() assert "invalid_dpop_proof" in json_body["detail"]["error"] @@ -120,28 +118,28 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_dpop_authentication_url_mismatch(httpx_mock: HTTPXMock): """Test DPoP proof fails when htu doesn't match request URL.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + # Generate proof for WRONG URL dpop_proof = await generate_dpop_proof( http_method="GET", http_url="http://testserver/wrong-url", # Wrong URL access_token=access_token ) - + app = FastAPI() auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) response = client.get( "/test", @@ -150,7 +148,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "DPoP": dpop_proof } ) - + assert response.status_code == 400 json_body = response.json() assert "invalid_dpop_proof" in json_body["detail"]["error"] @@ -160,28 +158,28 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_dpop_authentication_method_mismatch(httpx_mock: HTTPXMock): """Test DPoP proof fails when htm doesn't match request method.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + # Generate proof for POST but send GET dpop_proof = await generate_dpop_proof( http_method="POST", # Wrong method http_url="http://testserver/test", access_token=access_token ) - + app = FastAPI() auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) response = client.get( "/test", @@ -190,7 +188,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "DPoP": dpop_proof } ) - + assert response.status_code == 400 json_body = response.json() assert "invalid_dpop_proof" in json_body["detail"]["error"] @@ -200,7 +198,7 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_dpop_with_scope_validation(httpx_mock: HTTPXMock): """Test that scope validation works with DPoP tokens.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", @@ -208,20 +206,20 @@ async def test_dpop_with_scope_validation(httpx_mock: HTTPXMock): issuer="https://auth0.local/", claims={"scope": "read:data write:data"} ) - + dpop_proof = await generate_dpop_proof( http_method="GET", http_url="http://testserver/test", access_token=access_token ) - + app = FastAPI() auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth(scopes=["read:data"]))): return {"user": claims["sub"]} - + client = TestClient(app) response = client.get( "/test", @@ -230,7 +228,7 @@ async def test_route(claims=Depends(auth0.require_auth(scopes=["read:data"]))): "DPoP": dpop_proof } ) - + assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -239,7 +237,7 @@ async def test_route(claims=Depends(auth0.require_auth(scopes=["read:data"]))): async def test_dpop_with_invalid_scope(httpx_mock: HTTPXMock): """Test that scope validation fails with insufficient scopes.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", @@ -247,20 +245,20 @@ async def test_dpop_with_invalid_scope(httpx_mock: HTTPXMock): issuer="https://auth0.local/", claims={"scope": "read:data"} ) - + dpop_proof = await generate_dpop_proof( http_method="GET", http_url="http://testserver/test", access_token=access_token ) - + app = FastAPI() auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth(scopes=["write:data"]))): return "OK" - + client = TestClient(app) response = client.get( "/test", @@ -269,7 +267,7 @@ async def test_route(claims=Depends(auth0.require_auth(scopes=["write:data"]))): "DPoP": dpop_proof } ) - + assert response.status_code == 403 json_body = response.json() assert json_body["detail"]["error"] == "insufficient_scope" diff --git a/tests/test_reverse_proxy.py b/tests/test_reverse_proxy.py index 87c6037..3c267d8 100644 --- a/tests/test_reverse_proxy.py +++ b/tests/test_reverse_proxy.py @@ -3,49 +3,47 @@ Tests the get_canonical_url functionality and X-Forwarded-* header handling. """ import pytest -from fastapi import FastAPI, Depends, Request -from pytest_httpx import HTTPXMock +from fastapi import Depends, FastAPI, Request from fastapi.testclient import TestClient +from pytest_httpx import HTTPXMock from fastapi_plugin.fast_api_client import Auth0FastAPI from fastapi_plugin.utils import get_canonical_url -from .test_utils import ( - generate_dpop_proof, - generate_dpop_bound_token -) + from .conftest import setup_mocks +from .test_utils import generate_dpop_bound_token, generate_dpop_proof @pytest.mark.asyncio async def test_reverse_proxy_with_trust_enabled(httpx_mock: HTTPXMock): """Test that X-Forwarded headers are used when trust_proxy=True.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + # Generate DPoP proof for the PUBLIC URL dpop_proof = await generate_dpop_proof( http_method="GET", http_url="https://api.example.com/test", # Public URL access_token=access_token ) - + app = FastAPI() app.state.trust_proxy = True # Enable proxy trust - + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) - + # Request comes to internal URL but with X-Forwarded headers response = client.get( "/test", @@ -56,7 +54,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "X-Forwarded-Host": "api.example.com" } ) - + assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -65,32 +63,32 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_reverse_proxy_without_trust_fails(httpx_mock: HTTPXMock): """Test that X-Forwarded headers are IGNORED when trust_proxy=False.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + # Generate DPoP proof for the PUBLIC URL dpop_proof = await generate_dpop_proof( http_method="GET", http_url="https://api.example.com/test", access_token=access_token ) - + app = FastAPI() # trust_proxy defaults to False - + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return "OK" - + client = TestClient(app) - + # Headers are present but should be ignored response = client.get( "/test", @@ -101,7 +99,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "X-Forwarded-Host": "api.example.com" } ) - + # Should fail because proof expects https://api.example.com # but app sees http://testserver (headers ignored) assert response.status_code == 400 @@ -113,32 +111,32 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_reverse_proxy_with_path_prefix(httpx_mock: HTTPXMock): """Test X-Forwarded-Prefix handling.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + # Generate DPoP proof with prefix dpop_proof = await generate_dpop_proof( http_method="GET", http_url="https://api.example.com/api/v1/test", access_token=access_token ) - + app = FastAPI() app.state.trust_proxy = True - + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) - + response = client.get( "/test", headers={ @@ -149,7 +147,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "X-Forwarded-Prefix": "/api/v1" } ) - + assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -158,32 +156,32 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_reverse_proxy_with_trailing_slash_prefix(httpx_mock: HTTPXMock): """Test X-Forwarded-Prefix with trailing slash is handled correctly.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + # Generate DPoP proof WITHOUT trailing slash dpop_proof = await generate_dpop_proof( http_method="GET", http_url="https://api.example.com/api/v1/test", access_token=access_token ) - + app = FastAPI() app.state.trust_proxy = True - + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) - + response = client.get( "/test", headers={ @@ -194,7 +192,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "X-Forwarded-Prefix": "/api/v1/" # With trailing slash } ) - + # Should still work - trailing slash should be stripped assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -204,31 +202,31 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_reverse_proxy_multiple_hosts(httpx_mock: HTTPXMock): """Test that first host is used from comma-separated X-Forwarded-Host.""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + dpop_proof = await generate_dpop_proof( http_method="GET", http_url="https://client.example.com/test", access_token=access_token ) - + app = FastAPI() app.state.trust_proxy = True - + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) - + response = client.get( "/test", headers={ @@ -238,7 +236,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "X-Forwarded-Host": "client.example.com, proxy1.internal, proxy2.internal" } ) - + assert response.status_code == 200 json_response = response.json() assert json_response["user"] == "user_123" @@ -247,32 +245,32 @@ async def test_route(claims=Depends(auth0.require_auth())): async def test_reverse_proxy_partial_headers(httpx_mock: HTTPXMock): """Test with only X-Forwarded-Proto (partial headers).""" setup_mocks(httpx_mock) - + access_token = await generate_dpop_bound_token( domain="auth0.local", user_id="user_123", audience="", issuer="https://auth0.local/" ) - + # Proof expects https but with testserver host dpop_proof = await generate_dpop_proof( http_method="GET", http_url="https://testserver/test", access_token=access_token ) - + app = FastAPI() app.state.trust_proxy = True - + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - + @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} - + client = TestClient(app) - + response = client.get( "/test", headers={ @@ -281,7 +279,7 @@ async def test_route(claims=Depends(auth0.require_auth())): "X-Forwarded-Proto": "https" # Only proto, no host } ) - + assert response.status_code == 200 assert response.json()["user"] == "user_123" @@ -290,12 +288,12 @@ def test_get_canonical_url_with_proxy(): """Test get_canonical_url uses X-Forwarded headers when trust_proxy=True.""" app = FastAPI() app.state.trust_proxy = True - + @app.get("/test") async def test_route(request: Request): canonical_url = get_canonical_url(request) return {"url": canonical_url} - + client = TestClient(app) response = client.get( "/test", @@ -304,7 +302,7 @@ async def test_route(request: Request): "X-Forwarded-Host": "api.example.com" } ) - + assert response.status_code == 200 url = response.json()["url"] assert "https://api.example.com/test" == url @@ -314,12 +312,12 @@ def test_get_canonical_url_security_without_trust(): """Test that malicious X-Forwarded headers are ignored without trust.""" app = FastAPI() # trust_proxy = False (default) - + @app.get("/test") async def test_route(request: Request): canonical_url = get_canonical_url(request) return {"url": canonical_url} - + client = TestClient(app) response = client.get( "/test", @@ -328,7 +326,7 @@ async def test_route(request: Request): "X-Forwarded-Host": "attacker.com" } ) - + assert response.status_code == 200 url = response.json()["url"] # Malicious headers should be ignored diff --git a/tests/test_utils.py b/tests/test_utils.py index a996bea..30153bf 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,10 @@ -import time -import hashlib import base64 +import hashlib import secrets -from typing import Optional, Dict, Any, Union -from authlib.jose import JsonWebKey, jwt +import time +from typing import Any, Optional, Union +from authlib.jose import JsonWebKey, jwt # A private RSA JWK for test usage (Bearer tokens). PRIVATE_JWK = { @@ -50,7 +50,7 @@ async def generate_token( issuer: Union[str, bool, None] = None, iat: bool = True, exp: bool = True, - claims: Optional[Dict[str, Any]] = None, + claims: Optional[dict[str, Any]] = None, expiration_time: int = 3600, token_type: str = "bearer" ) -> str: @@ -89,7 +89,7 @@ async def generate_token( if audience: token_claims["aud"] = audience - + # Add DPoP binding for DPoP tokens if token_type == "dpop": jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) @@ -115,7 +115,7 @@ def generate_jti() -> str: """Generate a random JTI (JWT ID) for DPoP proof.""" return base64url_encode(secrets.token_bytes(16)) -def calculate_jwk_thumbprint(jwk_dict: Dict[str, Any]) -> str: +def calculate_jwk_thumbprint(jwk_dict: dict[str, Any]) -> str: """Calculate JWK thumbprint for DPoP proof.""" # For EC P-256 keys, thumbprint is calculated from crv, kty, x, y thumbprint_jwk = { @@ -138,23 +138,23 @@ async def generate_dpop_proof( ) -> str: """ Generate a DPoP proof JWT for testing. - + Args: http_method: HTTP method (GET, POST, etc.) http_url: Full HTTP URL access_token: Access token to bind (for ath claim) nonce: Server nonce for DPoP proof iat_offset: Offset for iat claim (for testing expired proofs) - + Returns: DPoP proof JWT string """ current_time = int(time.time()) + iat_offset jti = generate_jti() - + # Calculate JWK thumbprint for jkt claim jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) - + # DPoP proof claims proof_claims = { "jti": jti, @@ -163,15 +163,15 @@ async def generate_dpop_proof( "iat": current_time, "jkt": jkt } - + # Add access token hash if provided if access_token: proof_claims["ath"] = sha256_hash(access_token) - + # Add nonce if provided if nonce: proof_claims["nonce"] = nonce - + # Create header with public key header = { "alg": "ES256", @@ -183,7 +183,7 @@ async def generate_dpop_proof( "y": PUBLIC_DPOP_JWK["y"] } } - + # Sign with private key key = JsonWebKey.import_key(PRIVATE_DPOP_JWK) proof_jwt = jwt.encode(header, proof_claims, key) @@ -197,14 +197,14 @@ async def generate_dpop_bound_token( issuer: Union[str, bool, None] = None, iat: bool = True, exp: bool = True, - claims: Optional[Dict[str, Any]] = None, + claims: Optional[dict[str, Any]] = None, expiration_time: int = 3600, cnf_jkt: Optional[str] = None ) -> str: """ Generate a DPoP-bound access token for testing. Similar to generate_token but includes cnf claim with jkt. - + Args: domain: Auth0 domain user_id: Subject claim @@ -215,33 +215,33 @@ async def generate_dpop_bound_token( claims: Additional claims expiration_time: Token expiration in seconds cnf_jkt: JWK thumbprint for confirmation claim (auto-calculated if None) - + Returns: DPoP-bound access token JWT string """ token_claims = dict(claims or {}) token_claims.setdefault("sub", user_id) - + if iat: token_claims["iat"] = int(time.time()) - + if exp: token_claims["exp"] = int(time.time()) + expiration_time - + if issuer is not False: token_claims["iss"] = issuer if isinstance(issuer, str) else f"https://{domain}/" - + if audience: token_claims["aud"] = audience - + # Add DPoP binding if cnf_jkt is None: cnf_jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) - + token_claims["cnf"] = { "jkt": cnf_jkt } - + # Sign with RS256 (access tokens are still RS256, only DPoP proofs are ES256) key = JsonWebKey.import_key(PRIVATE_JWK) header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]}