Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 4 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## 2024-05-23 - Prevent Timing Attacks in Token Validation
**Vulnerability:** API key and token validation in `AuthMiddleware` and `APIKeyMiddleware` used standard Python string equality (`!=`) and set membership (`in`) checks, which leak comparison time and could allow attackers to guess tokens via timing attacks.
**Learning:** Security middleware in Python must always use constant-time comparisons for secrets, especially in custom ASGI/FastAPI middleware that might bypass standard framework-level protections. Standard string comparison is susceptible to timing side channels.
**Prevention:** Always use `secrets.compare_digest(a, b)` when validating authentication tokens, API keys, passwords, or other secrets. Ensure inputs are strings and not `None` before comparing to avoid `TypeError`.
43 changes: 39 additions & 4 deletions src/core/security/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
import math
import secrets
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
Expand Down Expand Up @@ -219,7 +220,16 @@ async def dispatch(
all_valid_keys: set[str] = self.valid_keys | app_state_keys

method = request.method
if not api_key or api_key not in all_valid_keys:
is_valid_key = False
if api_key:
for valid_key in all_valid_keys:
if isinstance(valid_key, str) and secrets.compare_digest(
Comment on lines +225 to +226
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Restore O(1) API key lookup path

Replacing api_key in all_valid_keys with a full for loop over every key makes authentication cost grow linearly with the number of configured keys on every request; this can become a noticeable throughput regression (and CPU amplification vector) in deployments that keep many tenant keys in memory. The same pattern appears in both the dispatch and ASGI __call__ paths, so all API-key-protected traffic pays this cost. Consider preserving constant-time comparison semantics without scanning the entire key set per request.

Useful? React with πŸ‘Β / πŸ‘Ž.

api_key, valid_key
):
is_valid_key = True
break

if not is_valid_key:
logger.warning(
"Invalid or missing API key for %s %s from client %s",
method,
Expand Down Expand Up @@ -525,7 +535,16 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
f"API Key authentication is enabled key_count={len(all_valid_keys)}"
)
method = scope.get("method", "UNKNOWN")
if not api_key or api_key not in all_valid_keys:
is_valid_key = False
if api_key:
for valid_key in all_valid_keys:
if isinstance(valid_key, str) and secrets.compare_digest(
api_key, valid_key
):
is_valid_key = True
break

if not is_valid_key:
logger.warning(
"Invalid or missing API key for %s %s from client %s",
method,
Expand Down Expand Up @@ -659,7 +678,15 @@ async def dispatch(
method = request.method

# Validate the token
if not token or token != self.valid_token:
is_valid_token = False
if (
token
and isinstance(self.valid_token, str)
and secrets.compare_digest(token, self.valid_token)
):
is_valid_token = True

if not is_valid_token:
logger.warning(
"Invalid or missing auth token for %s %s from client %s",
method,
Expand Down Expand Up @@ -763,7 +790,15 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
method = scope.get("method", "UNKNOWN")

# Validate the token
if not token or token != self.valid_token:
is_valid_token = False
if (
token
and isinstance(self.valid_token, str)
and secrets.compare_digest(token, self.valid_token)
):
is_valid_token = True

if not is_valid_token:
logger.warning(
"Invalid or missing auth token for %s %s from client %s",
method,
Expand Down
188 changes: 94 additions & 94 deletions src/core/transport/fastapi/adapters/response/other_response_builder.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,94 @@
"""Other response builder for non-JSON responses."""
from __future__ import annotations
import json
from typing import Any
from fastapi.responses import Response
from src.core.domain.responses import ResponseEnvelope
from src.core.transport.fastapi.adapters.protocols import (
IHeaderSanitizer,
IUsageHeaderInjector,
)
from src.core.transport.fastapi.adapters.sanitization.header_sanitizer import (
HeaderSanitizer,
)
from src.core.transport.fastapi.adapters.usage.header_injector import (
UsageHeaderInjector,
)
class OtherResponseBuilder:
"""Build FastAPI Response for non-JSON content types.
Handles binary, text, and other non-JSON response types with
appropriate header sanitization.
"""
def __init__(
self,
header_sanitizer: IHeaderSanitizer | None = None,
usage_header_injector: IUsageHeaderInjector | None = None,
) -> None:
"""Initialize other response builder.
Args:
header_sanitizer: Optional header sanitizer. Creates default if not provided.
usage_header_injector: Optional usage header injector. Creates default if not provided.
"""
self._header_sanitizer = header_sanitizer or HeaderSanitizer()
self._usage_header_injector = usage_header_injector or UsageHeaderInjector()
def build(self, envelope: ResponseEnvelope) -> Response:
"""Build Response from envelope.
Args:
envelope: Response envelope
Returns:
FastAPI Response
"""
# Get content and media type
content = envelope.content
media_type = getattr(envelope, "media_type", None) or "application/octet-stream"
# Inject canonical usage headers if available (Requirement 5.5)
headers = envelope.headers or {}
# Extract usage dict from envelope if available (for fallback)
usage_dict: dict[str, Any] | None = None
if envelope.usage:
from src.core.domain.usage_summary import UsageSummary
if isinstance(envelope.usage, UsageSummary):
usage_dict = envelope.usage.to_legacy_dict()
headers = self._usage_header_injector.inject_headers(
headers, usage_dict or {}, canonical_usage=envelope.canonical_usage
)
# Sanitize headers
safe_headers = self._header_sanitizer.sanitize(headers)
# Handle content conversion
content_bytes: bytes
if isinstance(content, bytes):
content_bytes = content
elif isinstance(content, str):
content_bytes = content.encode("utf-8")
else:
# For iterables and other non-string content, use JSON serialization
# to ensure consistent formatting (double quotes, proper escaping)
try:
content_bytes = json.dumps(content).encode("utf-8")
except (TypeError, ValueError):
# Fallback to string if JSON serialization fails
content_bytes = str(content).encode("utf-8")
# Create response
return Response(
content=content_bytes,
status_code=envelope.status_code or 200,
media_type=media_type,
headers=safe_headers,
)
"""Other response builder for non-JSON responses."""

from __future__ import annotations

import json
from typing import Any

from fastapi.responses import Response

from src.core.domain.responses import ResponseEnvelope
from src.core.transport.fastapi.adapters.protocols import (
IHeaderSanitizer,
IUsageHeaderInjector,
)
from src.core.transport.fastapi.adapters.sanitization.header_sanitizer import (
HeaderSanitizer,
)
from src.core.transport.fastapi.adapters.usage.header_injector import (
UsageHeaderInjector,
)


class OtherResponseBuilder:
"""Build FastAPI Response for non-JSON content types.

Handles binary, text, and other non-JSON response types with
appropriate header sanitization.
"""

def __init__(
self,
header_sanitizer: IHeaderSanitizer | None = None,
usage_header_injector: IUsageHeaderInjector | None = None,
) -> None:
"""Initialize other response builder.

Args:
header_sanitizer: Optional header sanitizer. Creates default if not provided.
usage_header_injector: Optional usage header injector. Creates default if not provided.
"""
self._header_sanitizer = header_sanitizer or HeaderSanitizer()
self._usage_header_injector = usage_header_injector or UsageHeaderInjector()

def build(self, envelope: ResponseEnvelope) -> Response:
"""Build Response from envelope.

Args:
envelope: Response envelope

Returns:
FastAPI Response
"""
# Get content and media type
content = envelope.content
media_type = getattr(envelope, "media_type", None) or "application/octet-stream"

# Inject canonical usage headers if available (Requirement 5.5)
headers = envelope.headers or {}
# Extract usage dict from envelope if available (for fallback)
usage_dict: dict[str, Any] | None = None
if envelope.usage:
from src.core.domain.usage_summary import UsageSummary

if isinstance(envelope.usage, UsageSummary):
usage_dict = envelope.usage.to_legacy_dict()
headers = self._usage_header_injector.inject_headers(
headers, usage_dict or {}, canonical_usage=envelope.canonical_usage
)

# Sanitize headers
safe_headers = self._header_sanitizer.sanitize(headers)

# Handle content conversion
content_bytes: bytes
if isinstance(content, bytes):
content_bytes = content
elif isinstance(content, str):
content_bytes = content.encode("utf-8")
else:
# For iterables and other non-string content, use JSON serialization
# to ensure consistent formatting (double quotes, proper escaping)
try:
content_bytes = json.dumps(content).encode("utf-8")
except (TypeError, ValueError):
# Fallback to string if JSON serialization fails
content_bytes = str(content).encode("utf-8")

# Create response
return Response(
content=content_bytes,
status_code=envelope.status_code or 200,
media_type=media_type,
headers=safe_headers,
)
Loading
Loading