-
Notifications
You must be signed in to change notification settings - Fork 3
π‘οΈ Sentinel: [CRITICAL] Fix timing attack vulnerability in token validation #711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
matdev83
wants to merge
1
commit into
dev
Choose a base branch
from
fix-timing-attack-token-validation-11785717866638839489
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| ## 2024-05-23 - Prevent Timing Attacks in Token Validation | ||
| **Vulnerability:** API key and token validation in `AuthMiddleware` and `APIKeyMiddleware` used standard Python string equality (`!=`) and set membership (`in`) checks, which leak comparison time and could allow attackers to guess tokens via timing attacks. | ||
| **Learning:** Security middleware in Python must always use constant-time comparisons for secrets, especially in custom ASGI/FastAPI middleware that might bypass standard framework-level protections. Standard string comparison is susceptible to timing side channels. | ||
| **Prevention:** Always use `secrets.compare_digest(a, b)` when validating authentication tokens, API keys, passwords, or other secrets. Ensure inputs are strings and not `None` before comparing to avoid `TypeError`. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
188 changes: 94 additions & 94 deletions
188
src/core/transport/fastapi/adapters/response/other_response_builder.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,94 +1,94 @@ | ||
| """Other response builder for non-JSON responses.""" | ||
| from __future__ import annotations | ||
| import json | ||
| from typing import Any | ||
| from fastapi.responses import Response | ||
| from src.core.domain.responses import ResponseEnvelope | ||
| from src.core.transport.fastapi.adapters.protocols import ( | ||
| IHeaderSanitizer, | ||
| IUsageHeaderInjector, | ||
| ) | ||
| from src.core.transport.fastapi.adapters.sanitization.header_sanitizer import ( | ||
| HeaderSanitizer, | ||
| ) | ||
| from src.core.transport.fastapi.adapters.usage.header_injector import ( | ||
| UsageHeaderInjector, | ||
| ) | ||
| class OtherResponseBuilder: | ||
| """Build FastAPI Response for non-JSON content types. | ||
| Handles binary, text, and other non-JSON response types with | ||
| appropriate header sanitization. | ||
| """ | ||
| def __init__( | ||
| self, | ||
| header_sanitizer: IHeaderSanitizer | None = None, | ||
| usage_header_injector: IUsageHeaderInjector | None = None, | ||
| ) -> None: | ||
| """Initialize other response builder. | ||
| Args: | ||
| header_sanitizer: Optional header sanitizer. Creates default if not provided. | ||
| usage_header_injector: Optional usage header injector. Creates default if not provided. | ||
| """ | ||
| self._header_sanitizer = header_sanitizer or HeaderSanitizer() | ||
| self._usage_header_injector = usage_header_injector or UsageHeaderInjector() | ||
| def build(self, envelope: ResponseEnvelope) -> Response: | ||
| """Build Response from envelope. | ||
| Args: | ||
| envelope: Response envelope | ||
| Returns: | ||
| FastAPI Response | ||
| """ | ||
| # Get content and media type | ||
| content = envelope.content | ||
| media_type = getattr(envelope, "media_type", None) or "application/octet-stream" | ||
| # Inject canonical usage headers if available (Requirement 5.5) | ||
| headers = envelope.headers or {} | ||
| # Extract usage dict from envelope if available (for fallback) | ||
| usage_dict: dict[str, Any] | None = None | ||
| if envelope.usage: | ||
| from src.core.domain.usage_summary import UsageSummary | ||
| if isinstance(envelope.usage, UsageSummary): | ||
| usage_dict = envelope.usage.to_legacy_dict() | ||
| headers = self._usage_header_injector.inject_headers( | ||
| headers, usage_dict or {}, canonical_usage=envelope.canonical_usage | ||
| ) | ||
| # Sanitize headers | ||
| safe_headers = self._header_sanitizer.sanitize(headers) | ||
| # Handle content conversion | ||
| content_bytes: bytes | ||
| if isinstance(content, bytes): | ||
| content_bytes = content | ||
| elif isinstance(content, str): | ||
| content_bytes = content.encode("utf-8") | ||
| else: | ||
| # For iterables and other non-string content, use JSON serialization | ||
| # to ensure consistent formatting (double quotes, proper escaping) | ||
| try: | ||
| content_bytes = json.dumps(content).encode("utf-8") | ||
| except (TypeError, ValueError): | ||
| # Fallback to string if JSON serialization fails | ||
| content_bytes = str(content).encode("utf-8") | ||
| # Create response | ||
| return Response( | ||
| content=content_bytes, | ||
| status_code=envelope.status_code or 200, | ||
| media_type=media_type, | ||
| headers=safe_headers, | ||
| ) | ||
| """Other response builder for non-JSON responses.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| from typing import Any | ||
|
|
||
| from fastapi.responses import Response | ||
|
|
||
| from src.core.domain.responses import ResponseEnvelope | ||
| from src.core.transport.fastapi.adapters.protocols import ( | ||
| IHeaderSanitizer, | ||
| IUsageHeaderInjector, | ||
| ) | ||
| from src.core.transport.fastapi.adapters.sanitization.header_sanitizer import ( | ||
| HeaderSanitizer, | ||
| ) | ||
| from src.core.transport.fastapi.adapters.usage.header_injector import ( | ||
| UsageHeaderInjector, | ||
| ) | ||
|
|
||
|
|
||
| class OtherResponseBuilder: | ||
| """Build FastAPI Response for non-JSON content types. | ||
|
|
||
| Handles binary, text, and other non-JSON response types with | ||
| appropriate header sanitization. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| header_sanitizer: IHeaderSanitizer | None = None, | ||
| usage_header_injector: IUsageHeaderInjector | None = None, | ||
| ) -> None: | ||
| """Initialize other response builder. | ||
|
|
||
| Args: | ||
| header_sanitizer: Optional header sanitizer. Creates default if not provided. | ||
| usage_header_injector: Optional usage header injector. Creates default if not provided. | ||
| """ | ||
| self._header_sanitizer = header_sanitizer or HeaderSanitizer() | ||
| self._usage_header_injector = usage_header_injector or UsageHeaderInjector() | ||
|
|
||
| def build(self, envelope: ResponseEnvelope) -> Response: | ||
| """Build Response from envelope. | ||
|
|
||
| Args: | ||
| envelope: Response envelope | ||
|
|
||
| Returns: | ||
| FastAPI Response | ||
| """ | ||
| # Get content and media type | ||
| content = envelope.content | ||
| media_type = getattr(envelope, "media_type", None) or "application/octet-stream" | ||
|
|
||
| # Inject canonical usage headers if available (Requirement 5.5) | ||
| headers = envelope.headers or {} | ||
| # Extract usage dict from envelope if available (for fallback) | ||
| usage_dict: dict[str, Any] | None = None | ||
| if envelope.usage: | ||
| from src.core.domain.usage_summary import UsageSummary | ||
|
|
||
| if isinstance(envelope.usage, UsageSummary): | ||
| usage_dict = envelope.usage.to_legacy_dict() | ||
| headers = self._usage_header_injector.inject_headers( | ||
| headers, usage_dict or {}, canonical_usage=envelope.canonical_usage | ||
| ) | ||
|
|
||
| # Sanitize headers | ||
| safe_headers = self._header_sanitizer.sanitize(headers) | ||
|
|
||
| # Handle content conversion | ||
| content_bytes: bytes | ||
| if isinstance(content, bytes): | ||
| content_bytes = content | ||
| elif isinstance(content, str): | ||
| content_bytes = content.encode("utf-8") | ||
| else: | ||
| # For iterables and other non-string content, use JSON serialization | ||
| # to ensure consistent formatting (double quotes, proper escaping) | ||
| try: | ||
| content_bytes = json.dumps(content).encode("utf-8") | ||
| except (TypeError, ValueError): | ||
| # Fallback to string if JSON serialization fails | ||
| content_bytes = str(content).encode("utf-8") | ||
|
|
||
| # Create response | ||
| return Response( | ||
| content=content_bytes, | ||
| status_code=envelope.status_code or 200, | ||
| media_type=media_type, | ||
| headers=safe_headers, | ||
| ) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replacing
api_key in all_valid_keyswith a fullforloop over every key makes authentication cost grow linearly with the number of configured keys on every request; this can become a noticeable throughput regression (and CPU amplification vector) in deployments that keep many tenant keys in memory. The same pattern appears in both thedispatchand ASGI__call__paths, so all API-key-protected traffic pays this cost. Consider preserving constant-time comparison semantics without scanning the entire key set per request.Useful? React with πΒ / π.